学习目标
. 掌握热门品类Top10分析实现思路。
. 掌握如何创建Spark连接并读取数据集。
. 掌握利用Spark获取业务数据。
. 掌握利用Spark统计品类的行为类型。
. 掌握利用Spark过滤品类的行为类型。
. 掌握利用Spark合并相同品类的行为类型。
. 掌握利用Spark根据品类的行为类型进行排序。
. 掌握将数据持久化到HBase数据库。
. 熟悉通过SparkOnYARN 运行程序。
品类指商品所属分类,用户在访问电商网站时,通常会产生很多行为,如查看商品的信
息、将感兴趣的商品加入购物车和购买商品等,这些行为都将作为数据被网站存储。本章通
过对电商网站存储的用户行为数据进行分析,从而统计出排名前十的热门品类。
3.1 数据集分析
某电商网站2019年11月产生的用户行为数据存储在文件user_session.txt,该文件中
的每一行数据都表示一个用户行为,所有行为都与商品和用户有关。由于原始数据集较大
(13.7GB),对硬件配置要求较高,考虑到读者学习的便捷性,从原始数据集中抽取了500万
条数据(约1GB)进行分析。虽然数据比较多,但是数据内容格式基本类似,这里选取其中
一条数据进行分析,具体如下。
{"user_session":"0000007c- adbf- 4ed7- af17- d1fef9763d67","event_type":"view","category_
id":"2053013553090134275","user _id":"560165420","product _id":"8900305","address _name":
"Maryland","event_time":"2019-11-18 09:16:19"}
上述数据包含很多字段,每个字段都代表特定的含义,具体介绍如下。
. user_session:用于标识用户行为的唯一值。
.event_type:表示用户行为的类型,包括view(查看)、cart(加入购物车)和purchase
(购买)行为。
7
4
Spark项目实战
.categoryid:表示商品品类ID 。
.userid: 示用户ID 。表(_)
.prouct_id:表示商品ID 。d(_)
.addres_name:表示产生事件的区域。
.event_time:表示产生事件的具体时间
。
注:本书的配套资源会为读者提供数据集文件usrssin.
x
e_eott。
3.实现思路分析
2
用户在访问电商网站时,通常会针对商品产生很多行为事件,如查看、加入购物车和购
买。首先需要分别统计各个品类商品的查看次数、加入购物车次数以及购买次数。接下来,
将同一品类中商品的查看、加入购物车以及购买次数进行合并。然后,自定义排序规则、按
照各个品类中商品的查看、加入购物车和购买次数进行降序排序,获取排名前十的品类,就
是热门品类Top10 。排序时,优先按照各个品类商品的查看次数降序排列,如果查看次数相
同,则按照各个品类商品的加入购物车次数进行降序排列。如果查看次数和加入购车次数
都相同,那么按照各品类商品的购买次数进行降序排列。最后,将同一品类中商品的查看、
加入购物车和购买次数映射到自定义排序规则中进行排序处理。有关热门品类Top10的
分析过程如图3-1所示。
图3-
1
热门品类Top10的分析过程
针对图3-1中热门品类Top10的分析过程讲解如下。
.读取/转换:读取数据集中的行为类型(event_type)和品类ID(category_id)数据,为
第3章热门品类Top10分析75
了便于后续聚合处理时,将相同Key的Value值进行累加,计算每个品类中不同行
为出现的总次数,这里需要对输出结果的数据格式进行转换处理,将行为类型和品
类ID作为Key,值1作为Value。
.聚合:统计各个品类的查看、加入购物车和购买次数。
.过滤/转换:将聚合结果进行过滤处理,并分为3部分数据,第一部分数据包含各个
品类查看次数,第二部分数据包含各个品类加入购物车次数,第三部分包含各个品
类购买次数。对过滤后的3部分数据进行转换处理,去除数据中的行为类型字段。
此步目的是后续合并操作时,明确同一品类中不同行为类型所处位置。
.合并:将Key值相同的Value进行合并处理,目的是将相同品类的查看次数、加入
购物车次数和购买次数合并到一行。
t)、
.排序:对每个品类中查看次数(viewcoun加入购物车次数(cartcount)和购买次数
(purchasecount)进行排序处理,在排序过程会涉及3类值的排序,因此这里需要使
用Spark的二次排序,在排序操作时使用自定义排序的方式进行处理。
3.实现热门品类Tp10
3o
实现热门品类Top10分析的程序由Java编程语言实现。目前,Java的主流开发工具
有两种:Eclipse工具和InteliJIDEA工具。我们可以在这两种开发工具中编写Java代码
来实现热门品类Top10分析。由于InteliJIDEA工具内置了很多优秀的插件,在智能代码
助手、代码自动提示、重构、CVS整合、代码分析等方面有着不错的表现,因此本项目将使用
InteliJIDEA作为Java开发工具。
3.1
创建项目
3.
本项目使用的IJIDEA版本为2018.读者可通过IJIDEA官网下载并安装
nteli3, nteli
程序,关于InteliJIDEA的下载安装这里不做赘述(注意:安装InteliJIDEA之前需要
安
装JDK并在系统环境变量中配置JDK,本项目使用的JDK版本为1.
8)。
Maven便于维护和管理项目依赖,因此本项目将通过构建Maven现目实现相关需求。
接下来,详细讲解如何在InteliJIDEA中构建Maven项目SparkProject,具体步骤如下。
1.创建Maven项目
打开InteliJIDEA开发工具,进入InteliJIDEA欢迎界面,具体如图3-2所示。
在图3-2中单击Configure右侧的下拉箭头,依次选择ProjectDefaults→Project
Structure命令,配置项目使用的JDK,如图3-3所示。
在图3-3中配置完JDK后,单击OK按钮返回InteliJIDEA欢迎界面。
单击图3-2中的CreateNewProject按钮创建新项目,在弹出的NewProject窗口左侧
选择Maven,即创建Maven项目,如图3-4所示。
在图3-4中,单击Next按钮,配置Maven项目的组织名(GroupId)和项目工程名
(ArtifactId),如图3-5所示。
76
Spark项目实战
图3-
2
InteliJIDEA
欢迎界面
图3-
3
配置项目使用的JDK
在图3-5中,单击Next按钮,配置项目名称(Projectname)和项目本地的存放目录
(Projectlocation),如图3-6所示。
在图3-6中,单击Finish按钮,完成项目SparkProject的创建。项目SparkProject的初
始结构如图3-7所示。
第3章热门品类Top10 分析77
图3-
4
创建Maven项目
图3-
5
配置组织名和工程名
2.
导入依赖
本项目所需要的依赖包括JSON 、HBe和Spa在文件pom.l中添加这些依赖方
式,具体代码如文件3-1所示。
asrk, xm
Spark项78 目实战
图3-6 配置项目名称和本地存放目录
图3-7 项目SparkProject的初始结构
文件3-1 pom.xml
1
2
3
4 io.netty
5 netty-all
6 4.1.18.Final
7
8
9
10
11
12
13 com.alibaba
第3章 热门品类Top10分析 79
14 fastjson
15 1.2.62
16
17
18
19 org.apache.hbase
20 hbase-client
21 1.2.1
22
23
24 org.apache.hbase
25 hbase-common
26 1.2.1
27
28
29
30 org.apache.spark
31 spark-core_2.11
32 2.3.2
33
34
35 io.netty
36 netty
37
38
39
40
41
42
43
44 org.apache.maven.plugins
45 maven-compiler-plugin
46
47 1.8
48 1.8
49
50
51
52 maven-assembly-plugin
53
54 false
55
56 jar-with-dependencies
57
58
59
60
61 cn.itcast.top10.CategoryTop10
62
63
Spark项80 目实战
64
65
66
67 make-assembly
68 package
69
70 assembly
71
72
73
74
75
76
文件3-1中:第1~9行代码主要是对项目中Netty依赖进行多版本管理,避免本地运
行出现多个版本的Netty,导致程序出现NoSuchMethodError异常;第12~16行代码引入
JSON 依赖,用于解析JSON 数据;第18~27行代码引入HBase依赖,用于操作HBase数
据库;第29~39行代码引入Spark依赖,用于开发Spark数据分析程序;第43~50行代码
指定Maven编译的JDK版本,如果不指定,Maven3默认用JDK1.5,Maven2默认用JDK
1.3;第51~74行代码配置程序打包方式并指定程序主类。
3.创建项目目录
在项目SparkProject中右击java目录,在弹出的快捷菜单中依次选择New→Package,
从而新建Package包,具体如图3-8所示。
图3-8 新建Package包的步骤
第3章热门品类Top10分析81
通过如图3-8所示的操作后,会弹出NewPackage对话框,在文本输入框Enternew
pakgame中输入cn.tattp10设置Pakge名称,用于存放实现热门品类Top10分
caenics.oca
析的Java文件,如图3-9所示。
图3-
9
设置Package名称
在图3-9中单击OK按钮完成Package包的创建。
4.创建程序主类
右击包cn.itattp10,在弹出的快捷菜单中依次选择New→Jva新建Jaa类,
cs.oaaCls
v
具体如图3-10所示。
图3-10
新建Java类
通过如图3-10所示的操作后,会弹出CreateNewClas
对话框,在文本框Name中输
入CategoryTop10设置类名称,在类中实现热门品类Top10分析,具体如图3-11所示。
3.2
创建Sprk连接并读取数据集
3.a
在类CategoryTop10中定义main()方法,该方法是Java程序执行的入口,在main()方
法中实现Spark程序,具体代码如文件3-2所示。
Spark项82 目实战
图3-11 设置Java类名称
文件3-2 CategoryTop10.java
1 public class CategoryTop10{
2 public static void main(String[] arg){
3 //实现热门品类Top10 分析
4 }
5 }
在文件3-2的main()方法中,创建JavaSparkContext和SparkConf对象,JavaSparkContext
对象用于实现Spark程序,SparkConf对象用于配置Spark程序相关参数,具体代码如下。
1 SparkConf conf =new SparkConf();
2 //设置Application 名称为top10_category
3 conf.setAppName("top10_category");
4 JavaSparkContext sc =new JavaSparkContext(conf);
在文件3-2的main()方法中,调用JavaSparkContext对象的textFile()方法读取外部
文件,将文件中的数据加载到textFileRDD,具体代码如下。
JavaRDD textFileRDD =sc.textFile(arg[0]);
上述代码中,通过变量arg[0]指定文件路径,目的是执行提交Spark程序到YARN 集
群运行的命令中,通过参数指定文件路径。
3.3.3 获取业务数据
在文件3-2的main()方法中,使用mapToPair()算子转换textFileRDD的每一行数据,
用于获取每一行数据中的行为类型和品类ID 数据,将转换结果加载到transformRDD,具
体代码如下。
1 JavaPairRDD,Integer> transformRDD =textFileRDD
2 .mapToPair(
3 new PairFunction<
4 String,
5 Tuple2, Integer>() {
6 @Override
7 public Tuple2, Integer> call(String s)
8 throws Exception {
第3章 热门品类Top10分析 83
9 //将数据转换为JSON 对象
10 JSONObject json =JSONObject.parseObject(s);
11 String category_id =json.getString("category_id");
12 String event_type =json.getString("event_type");
13 return new Tuple2<>(
14 new Tuple2<>(category_id,event_type),
15 new Integer(1));
16 }
17 });
上述代码中,首先将textFileRDD中的每一行数据转换为JSON 对象;然后获取JSON
对象中的category_id(品类ID)和event_type(行为类型);最后将category_id、event_type
和值1添加到Tuple2对象中。
3.3.4 统计品类的行为类型
在文件3-2的main()方法中,使用reduceByKey()算子对transformRDD 进行聚合操
作,用于统计每个品类中商品被查看、加入购物车和购买的次数,将统计结果加载到
aggregationRDD,具体代码如下。
1 JavaPairRDD, Integer> aggregationRDD =
2 transformRDD.reduceByKey(
3 new Function2() {
4 @Override
5 public Integer call(Integer integer1, Integer integer2)
6 throws Exception {
7 return integer1 +integer2;
8 }
9 });
3.3.5 过滤品类的行为类型
在文件3-2的main()方法中,首先使用filter()算子过滤aggregationRDD每一行数据
中行为类型为加入购物车和购买的数据,只保留行为类型为查看的数据,然后使用
mapToPair()算子对过滤结果进行转换,获取每一行数据中品类被查看次数和品类ID 数
据,最终将转换结果加载到getViewCategoryRDD,具体代码如下。
1 JavaPairRDD getViewCategoryRDD =aggregationRDD
2 .filter(new Function, Integer>
3 , Boolean>() {
4 @Override
5 public Boolean call(Tuple2
6 , Integer> tuple2) throws Exception {
7 //获取行为类型
8 String action =tuple2._1._2;
9 return action.equals("view");
10 }
Spark项84 目实战
11 }).mapToPair(
12 new PairFunction
13 , Integer>, String, Integer>() {
14 @Override
15 public Tuple2
16 call(Tuple2, Integer> tuple2)
17 throws Exception {
18 return new Tuple2<>(tuple2._1._1,tuple2._2);
19 }
20 });
上述代码中,第9行通过equals()方法判断获取的行为类型是否为view(查看)并将判
断结果作为返回值,若返回值为True,则进行后续转换操作。
在文件3-2的main()方法中,首先使用filter()算子过滤aggregationRDD每一行数据
中行为类型为查看和购买的数据,只保留行为类型为加入购物车的数据,然后使用
mapToPair()算子对过滤结果进行转换,获取每一行数据中品类被加入购物车的次数和品
类ID数据,最终将转换结果加载到getCartCategoryRDD,具体代码如下。
1 JavaPairRDD getCartCategoryRDD =aggregationRDD
2 .filter(new Function, Integer>
3 , Boolean>() {
4 @Override
5 public Boolean call(Tuple2
6 , Integer> tuple2) throws Exception {
7 String action =tuple2._1._2;
8 return action.equals("cart");
9 }
10 }).mapToPair(new PairFunction
11 , Integer>, String, Integer>() {
12 @Override
13 public Tuple2
14 call(Tuple2, Integer> tuple2)
15 throws Exception {
16 return new Tuple2<>(tuple2._1._1,tuple2._2);
17 }
18 });
上述代码中,第8行通过equals()方法判断获取的行为类型是否为cart(加入购物车)
并将判断结果作为返回值,若返回值为True,则进行后续转换操作。
在文件3-2的main()方法中,首先使用filter()算子过滤aggregationRDD每一行数据
中行为类型为查看和加入购物车的数据,只保留行为类型为购买的数据,然后使用
mapToPair()算子对过滤结果进行转换,获取每一行数据中品类被购买次数和品类ID 数
据,最终将转换结果加载到getPurchaseCategoryRDD,具体代码如下。
第3章 热门品类Top10分析 85
1 JavaPairRDD getPurchaseCategoryRDD =aggregationRDD
2 .filter(new Function, Integer>
3 , Boolean>() {
4 @Override
5 public Boolean call(Tuple2
6 , Integer> tuple2) throws Exception {
7 String action =tuple2._1._2;
8 return action.equals("purchase");
9 }
10 }).mapToPair(new PairFunction
11 , Integer>, String, Integer>() {
12 @Override
13 public Tuple2
14 call(Tuple2, Integer> tuple2)
15 throws Exception {
16 return new Tuple2<>(tuple2._1._1,tuple2._2);
17 }
18 });
上述代码中,第8行通过equals()方法判断获取的行为类型是否为purchase(购买)并
将判断结果作为返回值,若返回值为True,则进行后续转换操作。
3.3.6 合并相同品类的行为类型
在文件3-2 的main()方法中,使用leftOuterJoin()(左外连接)算子合并
getViewCategoryRDD、getCartCategoryRDD 和getPurchaseCategoryRDD,用于合并同一
品类的查看次数、加入购物车次数和购买次数,将合并结果加载到joinCategoryRDD,具体
代码如下。
1 JavaPairRDD>>
2 tmpJoinCategoryRDD =getViewCategoryRDD
3 .leftOuterJoin(getCartCategoryRDD);
4 JavaPairRDD>,
6 Optional>> joinCategoryRDD =
7 tmpJoinCategoryRDD.leftOuterJoin(getPurchaseCategoryRDD);
上述代码中,首先通过leftOuterJoin()算子合并getViewCategoryRDD和getCartCategoryRDD,
将合并结果加载到tmpJoinCategoryRDD,然后通过leftOuterJoin()算子合并
tmpJoinCategoryRDD和getPurchaseCategoryRDD,将合并结果加载到joinCategoryRDD。
Optional类是一个包含有可选值的包装类,它既可以含有对象也可以为空,主要为了解
决空指针异常的问题,因为某些品类中的商品可能被查看但并未被购买或加入购物车。
3.3.7 根据品类的行为类型进行排序
在包cn.itcast.top10中创建文件CategorySortKey.java,用于实现自定义排序。在类
CategorySortKey中继承比较器接口Comparable 和序列化接口Serializable,并实现
Spark项86 目实战
Comparable接口的compareTo()方法,具体代码如文件3-3所示。
文件3-3 CategorySortKey.java
1 import java.io.Serializable;
2 public class CategorySortKey implements Comparable
3 ,Serializable{
4 //查看次数
5 private int viewCount;
6 //加入购物车次数
7 private int cartCount;
8 //购买次数
9 private int purchaseCount;
10 //定义类的构造方法
11 public CategorySortKey(
12 int viewcount,
13 int cartCount,
14 int purchaseCount)
15 {
16 this.viewCount =viewcount;
17 this.cartCount =cartCount;
18 this.purchaseCount =purchaseCount;
19 }
20 //定义属性的getter 和setter 方法
21 .
22 @Override
23 public int compareTo(CategorySortKey other) {
24 if(viewCount -other.getViewCount() !=0) {
25 return (int) (viewCount -other.getViewCount());
26 } else if(cartCount -other.getCartCount() !=0) {
27 return (int) (cartCount -other.getCartCount());
28 } else if(purchaseCount -other.getPurchaseCount() !=0) {
29 return (int) (purchaseCount -other.getPurchaseCount());
30 }
31 return 0;
32 }
33 }
在文件3-3中,第22~32行代码,重写接口Comparable的compareTo()方法,在方法
内部实现对象的比较,比较的规则为返回值等于0表示相等;返回值小于0表示小于;返回
值大于0表示大于。比较的优先级按照viewCount、cartCount和purchaseCount的顺序。
在文件3-2 的main()方法中,使用mapTopair()算子转换joinCategoryRDD,将
joinCategoryRDD中品类被查看次数、加入购物车次数和购买次数映射到自定义排序类
CategorySortKey,通过transCategoryRDD加载转换结果,具体代码如下。
1 JavaPairRDD transCategoryRDD =joinCategoryRDD
2 .mapToPair(new PairFunction>,Optional>>,
第3章 热门品类Top10分析 87
4 CategorySortKey,String>() {
5 @Override
6 public Tuple2 call(Tuple2>,
8 Optional>> tuple2) throws Exception {
9 String category_id =tuple2._1;
10 int viewcount =tuple2._2._1._1;
11 int cartcount =0;
12 int purchasecount =0;
13 //判断品类被加入购物车次数是否为空
14 if (tuple2._2._1._2.isPresent()){
15 cartcount =tuple2._2._1._2.get().intValue();
16 }
17 //判断品类被购买次数是否为空
18 if (tuple2._2._2.isPresent()){
19 purchasecount =tuple2._2._2.get().intValue();
20 }
21 /*将viewcount、cartcount 和purchasecount 映射到
22 类CategorySortKey 的构造方法中*/
23 CategorySortKey sortKey =
24 new CategorySortKey(viewcount, cartcount, purchasecount);
25 return new Tuple2<>(sortKey,category_id);
26 }
27 });
上述代码中的isPresent()方法用于判断Optional类型的数据是否为空,若值为空则通
过get()方法获取值,并通过intValue()方法指定获取的值为Int类型。
在文件3-2的main()方法中,通过sortByKey()算子对transCategoryRDD进行排序操
作,使transCategoryRDD中品类被查看次数、加入购物车次数和购买次数根据自定义排序
类CategorySortKey指定的排序规则进行排序,将排序结果加载到sortedCategoryRDD,具
体代码如下。
JavaPairRDD sortedCategoryRDD =
transCategoryRDD.sortByKey(false);
上述代码中,sortByKey()算子的参数为false,表示使用自定义排序类的比较方式进行
排序。在
文件3-2的main()方法中,使用take()算子获取sortedCategoryRDD前10个元素,
即热门品类Top10分析结果,将分析结果加载到top10CategoryList,具体代码如下。
List> top10CategoryList =
sortedCategoryRDD.take(10);
上述代码中,take()算子的参数为10,表示获取sortedCategoryRDD前10个元素。
3.3.8 数据持久化
本项目使用HBase数据库作为数据持久化工具,HBase分布式数据库通过HDFS和
Spark项88 目实战
ZooKeeper实现数据的高可用和冗余,从而确保数据库和数据的安全性。接下来,分步骤讲
解如何将热门品类Top10分析结果持久化到HBase数据库中。
1.封装HBase工具类
为了避免后续环节重复编写数据库连接和数据库操作的相关代码,这里将HBase数据
库连接工具类和HBase数据库操作工具类进行封装,具体实现步骤如下。
(1)在项目SparkProject的java目录新建Package包cn.itcast.hbase,用于存放实现数
据持久化的Java文件。在包cn.itcast.hbase下创建文件HbaseConnect.java,用于实现封装
HBase数据库连接工具类,具体代码如文件3-4所示。
文件3-4 HbaseConnect.java
1 import org.apache.hadoop.conf.Configuration;
2 import org.apache.hadoop.hbase.HBaseConfiguration;
3 import org.apache.hadoop.hbase.MasterNotRunningException;
4 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
5 import org.apache.hadoop.hbase.client.Connection;
6 import org.apache.hadoop.hbase.client.ConnectionFactory;
7 import org.apache.hadoop.hbase.client.HBaseAdmin;
8 import java.io.IOException;
9 public class HbaseConnect {
10 public static Configuration conf;
11 public static Connection conn;
12 public static HBaseAdmin hbaseAdmin;
13 static {
14 //创建HBase 配置信息
15 conf =HBaseConfiguration.create();
16 //配置ZooKeeper 集群地址
17 conf.set("hbase.zookeeper.quorum", "spark01,spark02,spark03");
18 //配置ZooKeeper 端口号
19 conf.set("hbase.zookeeper.property.clientPort", "2181");
20 try {
21 //通过HBase 配置获取HBase 数据库连接对象
22 conn =ConnectionFactory.createConnection(conf);
23 } catch (IOException e) {
24 e.printStackTrace();
25 }
26 }
27 public static HBaseAdmin getHBaseAdmin() throws IOException{
28 try {
29 //获取HBase 数据库操作对象
30 hbaseAdmin =(HBaseAdmin)(conn.getAdmin());
31 } catch (MasterNotRunningException e) {
32 e.printStackTrace();
33 } catch (ZooKeeperConnectionException e) {
34 e.printStackTrace();
35 }
36 return hbaseAdmin;
第3章 热门品类Top10分析 89
37 }
38 public static Connection getConnection(){
39 return conn;
40 }
41 public static synchronized void closeConnection(){
42 if(conn!=null){
43 try {
44 conn.close();
45 } catch (IOException e) {
46 e.printStackTrace();
47 }
48 }
49 }
50 }
在文件3-3中,第38~40行代码创建返回值类型为Connection的方法getConnection(),
用于获取HBase数据库连接;第41~49行代码创建方法closeConnection(),用于关闭
HBase数据库连接。
需要注意的是,若运行项目SparkProject的环境中未配置IP 映射,则需要在配置
Zookeeper集群地址时使用IP地址而不是主机名。
(2)在项目SparkProject的包cn.itcast.hbase下创建文件HbaseUtils.java,用于实现
封装HBase数据库操作工具类,具体代码如文件3-5所示。
文件3-5 HbaseUtils.java
1 import org.apache.hadoop.hbase.HColumnDescriptor;
2 import org.apache.hadoop.hbase.HTableDescriptor;
3 import org.apache.hadoop.hbase.TableName;
4 import org.apache.hadoop.hbase.client.HBaseAdmin;
5 import org.apache.hadoop.hbase.client.Put;
6 import org.apache.hadoop.hbase.client.Table;
7 import org.apache.hadoop.hbase.util.Bytes;
8 import java.io.IOException;
9 public class HbaseUtils {
10 public static void createTable(String tableName,
11 String... columFamilys)
12 throws IOException {
13 //获取HBase 数据表操作对象
14 HBaseAdmin admin =HbaseConnect.getHBaseAdmin();
15 //判断表是否存在
16 if (admin.tableExists(tableName)){
17 //关闭表
18 admin.disableTable(tableName);
19 //删除表
20 admin.deleteTable(tableName);
21 }
22 //HTableDescriptor 类包含了表的名字以及表的列族信息
Spark项90 目实战
23 HTableDescriptor hd
24 =new HTableDescriptor(TableName.valueOf(tableName));
25 for (String cf : columFamilys) {
26 hd.addFamily(new HColumnDescriptor(cf));
27 }
28 //通过createTable()方法创建HBase 数据表
29 admin.createTable(hd);
30 admin.close();
31 }
32 public static void putsToHBase(String tableName,
33 String rowkey,
34 String cf,
35 String[] column,
36 String[] value)
37 throws Exception {
38 //获取指定HBase 数据表的操作对象
39 Table table =HbaseConnect
40 .getConnection()
41 .getTable(TableName.valueOf(tableName));
42 //通过Put 对象存储插入数据表的内容
43 Put puts =new Put(rowkey.getBytes());
44 for (int i =0;i>
2 top10CategoryList) throws Exception
3 {
4 //创建数据表top10 和列族top10_category
5 HbaseUtils.createTable("top10","top10_category");
6 //创建数组column,用于存储数据表top10 的列名
7 String[] column =
8 {"category_id","viewcount","cartcount","purchasecount"};
9 String viewcount ="";
10 String cartcount ="";
11 String purchasecount ="";
12 String category_id ="";
13 int count =0;
14 //遍历集合top10CategoryList
15 for (Tuple2 top10: top10CategoryList) {
16 count++;
17 //获取查看次数
18 viewcount =String.valueOf(top10._1.getViewCount());
19 //获取加入购物车次数
20 cartcount =String.valueOf(top10._1.getCartCount());
21 //获取购买次数
22 purchasecount =String.valueOf(top10._1.getPurchaseCount());
23 //获取品类ID
24 category_id =top10._2;
25 //创建数组value,用于存储数据表top10 的值
26 String[] value =
27 {category_id,viewcount,cartcount,purchasecount};
28 HbaseUtils.putsToHBase("top10",
29 "rowkey_top"+count,
30 "top10_category",
31 column,
32 value);
33 }
34 }
上述代码中,第28~32行,调用HBase数据库操作工具类的putToHBase()方法,用于
持久化热门品类Top10数据。putToHBase()方法包含5个参数:其中第1个参数为字符
串top10,表示数据表名称;第2个参数为字符串对象count和字符串rowkey_top,表示数
据表的行键;第3个参数为字符串top10_category,表示数据表的列族;第4个参数为数组
column,数组中的每一个元素表示数据表的列名;第5个参数为数组value,数组中的每一个
元素表示数据表的值。
在文件3-2的main()方法中,调用方法top10ToHbase()并传入参数top10CategoryList,
用于在Spark程序中实现top10ToHbase()方法,将热门品类Top10 分析结果持久化到
HBase数据库中的数据表top10,具体代码如下。
Spark项92 目实战
1 //通过try…catch 抛出异常
2 try {
3 top10ToHbase(top10CategoryList);
4 } catch (Exception e) {
5 e.printStackTrace();
6 }
7 //关闭HBase 数据库连接
8 HbaseConnect.closeConnection();
9 //关闭JavaSparkContext 连接
10 sc.close();
3.4 运行程序
热门品类Top10分析程序编写完成后,需要在IntelliJIDEA 中将程序封装成jar包,并
上传到集群环境中,通过spark-submit将程序提交到YARN 中运行,具体步骤如下。
1.封装jar包
在IntelliJIDEA 主界面单击右侧Maven选项卡打开Maven窗口,如图3-12和图3-13
所示。
图3-12 Maven选项卡
在Maven窗口单击,展开Lifecycle目录,如图3-14所示。
双击Lifecycle目录中的package选项,IntelliJIDEA 会自动将程序封装成jar包,封装
完成后,若出现BUILDSUCCESS内容,则证明成功封装热门品类Top10分析程序为jar