第
5
章
DataStream API
实时分析是当前比较热门的数据处理技术,因为许多不同领域的数据都需
要进行实时处理、计算。随着大数据技术在各行各业的广泛应用,对海量数据
进行实时分析的需求越来越多,同时,数据处理的业务逻辑也越来越复杂。传
统的批处理方式和早期的流处理框架(如Storm)越来越难以在延迟性、吞吐量、
容错能力以及使用便捷性等方面满足业务日益严苛的要求。在这种形式下,新
型流处理框架Flink通过创造性地把现代大规模并行处理技术应用到流处理
中,极大地改善了以前的流处理框架所存在的问题。为了满足实时计算需求,
Flink提供了数据流处理API,即DataStream API,它基于GoogleDataflow 模
型,支持原生数据流处理,可以让用户灵活且高效地编写流应用程序。虽然
Spark也提供了流计算的支持,但是,相比较而言,Flink在流计算上有明显优
势,核心架构和模型也更透彻和灵活一些。
本章将重点介绍如何利用DataStream API开发流式应用。首先介绍
DataStream 编程模型(包括数据源、数据转换、数据输出)和窗口的划分;其次介
绍时间概念,包括事件生成时间、事件接入时间和事件处理时间;再次介绍窗口
计算,包括窗口类型和窗口计算函数;最后介绍水位线、延迟数据处理和状态
编程。
5.1 DataStream 编程模型
Flink流处理程序的基本运行流程包括以下5个步骤。
(1)创建流处理执行环境。
(2)创建数据源。
(3)指定对接收的数据进行转换操作的逻辑。
(4)指定数据计算的输出结果方式。
(5)程序触发执行。
第(1)步中创建流处理执行环境的方式如下:
val env =StreamExecutionEnvironment.getExecutionEnvironment
从上述步骤中可以看出,真正需要操作的只有3个过程:创建数据源、指定
第5章 DataStream API 1 37
对接收的数据进行转换操作的逻辑、指定数据计算的输出结果方式。为了支持这3个过
程的操作,Flink提供了一套功能完整的数据流处理API,即DataStream API。Datastream
API主要包含3个模块:数据源、数据转换和数据输出。数据源模块(Source)定义了输
入接入功能,可以将各种数据源接入Flink系统中,并将接入数据转换成DataStream 数
据集。数据转换模块(Transformation)定义了对DataStream 数据集执行的各种转换操
作,如map、flatMap、filter、reduce等。数据输出模块(Sink)负责把数据输出到文件或其
他系统中(如Kafka)。
此外,需要在pom.xml文件中引入flink-streaming-scala_2.12依赖库,具体如下:
org.apache.flink
flink-streaming-scala_2.12
1.11.2
5.1.1 数据源
数据源模块定义了DataStream API中的数据输入操作,Flink将数据源主要分为两
种类型:内置数据源和第三方数据源。内置数据源包括文件数据源、Socket数据源和集
合数据源。第三方数据源包括Kafka、AmazonKinesisStreams、RabbitMQ、NiFi等。
1.内置数据源
内置数据源在Flink系统内部已经实现,不需要引入其他依赖库,用户可以直接调用
相关方法使用。
1)文件数据源
Flink支持从文件中读取数据,它会逐行读取数据并将其转换成DataStream 返回。
可以使用readTextFile(path)方法直接读取文本文件,其中,path表示文本文件的路径。
以下是一个具体实例:
package cn.edu.xmu.dblab
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object FileSource{
def main(args: Array[String]): Unit ={
//获取执行环境
val env =StreamExecutionEnvironment.getExecutionEnvironment
//加载或创建数据源
val dataStream =env.readTextFile("file:///usr/local/flink/README.txt")
1 38 Flink编程基础(Scala版)
//打印输出
dataStream.print()
//程序触发执行
env.execute()
}
}
2)Socket数据源
Flink可以通过调用socketTextStream 方法从Socket端口中接入数据,在调用
socketTextStream 方法时,一般需要提供两个参数,即IP地址和端口,下面是一个实例:
val socketDataStream =env.socketTextStream(“localhost”,9999)
4.3.3节中的实例已经演示了Socket数据源的应用场景,这里不再赘述。
3)集合数据源
Flink可以直接将Java或Scala程序中集合类转换成DataStream 数据集,这里给出
两个具体实例。
使用fromElements方法从元素集合中创建DataStream 数据集,语句如下:
val dataStream =env.fromElements(Tuple2(1L,3L),Tuple2(1L,5L))
使用fromCollection方法从列表创建DataStream 数据集,语句如下:
val dataStream =env.fromCollection(List(1,2,3))
2.Kafka数据源
1)Kafka简介
Kafka是一种高吞吐量的分布式发布订阅消息系统,为了更好地理解和使用Kafka,
这里介绍一下Kafka的相关概念。
(1)Broker:Kafka集群包含一个或多个服务器,这些服务器被称为Broker。
(2)Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。
物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个
Broker上,但用户只需指定消息的Topic,即可生产或消费数据,而不必关心数据存于
何处。
(3)Partition:是物理上的概念,每个Topic包含一个或多个Partition。
(4)Producer:负责发布消息到KafkaBroker。
(5)Consumer:消息消费者,向KafkaBroker读取消息的客户端。
(6)ConsumerGroup:每个Consumer属于一个特定的ConsumerGroup,可为每个
Consumer指定GroupName,若不指定GroupName,则属于默认的Group。
2)Kafka准备工作
访问Kafka官网下载页面(https://kafka.apache.org/downloads),下载Kafka稳定
第5章 DataStream API 1 39
版本kafka_2.12-2.6.0.tgz,或者直接到本教程官网“下载专区”栏目的“软件”目录中下载
安装文件kafka_2.12-2.6.0.tgz。下载完安装文件以后,就可以安装到Linux系统中,具体
安装过程可以参照本教程官网“实验指南”栏目的“Kafka的安装和使用方法”。为了让
Flink应用程序能够顺利使用Kafka数据源,在下载Kafka安装文件的时候要注意,
Kafka版本号一定要和自己计算机上已经安装的Scala版本号一致才可以。本教程安装
的Flink版本号是1.11.2,Scala版本号是2.12,所以,一定要选择Kafka版本号是2.12开
头的。例如,到Kafka官网中,可以下载安装文件kafka_2.12-2.6.0.tgz,前面的2.12就是
支持的Scala版本号,后面的2.6.0是Kafka自身的版本号。
首先需要启动Kafka,登录Linux系统(本教程统一使用hadoop用户登录),打开一
个终端,输入下面命令启动Zookeeper服务:
$cd /usr/local/kafka
$./bin/zookeeper-server-start.sh config/zookeeper.properties
注意,执行上面命令以后,终端窗口会返回一堆信息,然后停住不动,没有回到Shell
命令提示符状态,这时,不要误以为是死机了,而是Zookeeper服务器已经启动,正在处于
服务状态。所以,不要关闭这个终端窗口,一旦关闭,Zookeeper服务就停止了。
另外打开第二个终端,然后输入下面命令启动Kafka服务:
$cd /usr/local/kafka
$./bin/kafka-server-start.sh config/server.properties
同样,执行上面命令以后,终端窗口会返回一堆信息,然后停住不动,没有回到Shell
命令提示符状态,这时,同样不要误以为是死机了,而是Kafka服务器已经启动,正在处于
服务状态。所以,不要关闭这个终端窗口,一旦关闭,Kafka服务就停止了。
当然,还有一种方式是采用下面加了“&”的命令:
$cd /usr/local/kafka
$bin/kafka-server-start.sh config/server.properties &
这样,Kafka就会在后台运行,即使关闭了这个终端。不过,采用这种方式时,有时候
我们常忘记还有Kafka在后台运行,所以,建议暂时不要用这种命令形式。
下面先测试一下Kafka是否可以正常使用。再打开第三个终端,然后输入下面命令
创建一个自定义名称为wordsendertest的Topic:
$cd /usr/local/kafka
$./bin/kafka-topics.sh --create --zookeeper localhost: 2181 \
>--replication-factor 1 --partitions 1 --topic wordsendertest
#这个Topic 叫wordsendertest, 2181 是Zookeeper 默认的端口号, - - partitions 是
Topic 里面的分区数,--replication-factor 是备份的数量,在Kafka 集群中使用,由于这里
是单机版,所以不用备份
#可以用list 列出所有创建的Topic,来查看上面创建的Topic 是否存在
$./bin/kafka-topics.sh --list --zookeeper localhost: 2181
这个名称为wordsendertest的Topic,就是专门负责采集发送一些单词的。
1 40 Flink编程基础(Scala版)
下面用生产者(Producer)来产生一些数据,在当前终端内继续输入下面命令:
$./bin/kafka-console-producer.sh --broker-list localhost: 9092 \
>--topic wordsendertest
上面命令执行后,就可以在当前终端(假设名称为“生产者终端”)内输入一些英文
单词:
hello hadoop
hello spark
这些单词就是数据源,会被Kafka捕捉到以后发送给消费者。现在可以启动一个消
费者,来查看刚才生产者产生的数据。另外打开第四个终端,输入下面命令:
$cd /usr/local/kafka
$./bin/kafka-console-consumer.sh --bootstrap-server localhost: 9092 \
>--topic wordsendertest --from-beginning
可以看到,屏幕上会显示如下结果,也就是刚才在另外一个终端里面输入的内容:
hello hadoop
hello spark
注意,到这里为止,前面打开的所有Linux终端窗口都不要关闭,以供后面步骤继续
使用。
3)编写Flink程序使用Kafka数据源
在~/flinkapp/src/main/scala目录下新建代码文件KafkaWordCount.scala,内容
如下:
package cn.edu.xmu.dblab
import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.windowing.time.Time
object KafkaWordCount {
def main(args: Array[String]): Unit ={
val kafkaProps =new Properties()
//Kafka 的一些属性
kafkaProps.setProperty("bootstrap.servers", "localhost: 9092")
//所在的消费组
kafkaProps.setProperty("group.id", "group1")
//获取当前的执行环境
第5章 DataStream API 1 41
val env =StreamExecutionEnvironment.getExecutionEnvironment
//创建Kafka 的消费者,wordsendertest 是要消费的Topic
val kafkaSource = new FlinkKafkaConsumer [String]( " wordsendertest ", new
SimpleStringSchema,kafkaProps)
//设置从最新的offset 开始消费
kafkaSource.setStartFromLatest()
//自动提交offset
kafkaSource.setCommitOffsetsOnCheckpoints(true)
//绑定数据源
val stream =env.addSource(kafkaSource)
//设置转换操作逻辑
val text = stream. flatMap { _. toLowerCase ( ). split ( " \ \ W +") filter { _.
nonEmpty} }
.map{(_,1)}
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
//打印输出
text.print()
//程序触发执行
env.execute("Kafka Word Count")
}
}
在这个KafkaWordCount程序中,FlinkKafkaConsumer的构造函数有3个参数。第
一个参数定义的是读入的目标Topic的名称。第二个参数是一个DeserializationSchema
或KeyedDeserializationSchema对象。Kafka中的消息是以纯字节消息存储的,所以需要
被反序列化为Java或Scala对象。这里用到的SimpleStringSchema对象是一个内置的
DeserializationSchema对象,可以将字节数据反序列化为一个String对象。第三个参数
是一个Properties对象,用于配置Kafka的客户端,该对象至少要包含两个条目,即
bootstrap.servers与group.id。
另外,在FlinkKafkaConsumer开始读Kafka消息时,可以配置它的读起始位置,有
以下4种。
(1)setStartFromGroupOffsets()。默认读取上次保存的offset信息,若是第一次启
动应用,读取不到上次的offset信息,则会根据参数auto.offset.reset的值来进行数据
读取。
(2)setStartFromEarliest()。从最早的数据开始进行消费,忽略存储的offset信息。
1 42 Flink编程基础(Scala版)
(3)setStartFromLatest()。从最新的数据进行消费,忽略存储的offset信息。
(4)setStartFromSpecificOffsets(Map)。从指定位
置进行消费。
KafkaWordCount程序中,“设置转换操作逻辑”部分的代码用于实现词频统计,里面
用到了flatMap、map、keyBy、timeWindow和sum 操作。
下面在~/flinkapp目录下再新建一个pom.xml文件,内容如下:
cn.edu.xmu.dblab
wordcount
4.0.0
WordCount
jar
1.0
alimaven
aliyun maven
http://maven.aliyun.com/nexus/content/groups/public/
org.apache.flink
flink-scala_2.12
1.11.2
org.apache.flink
flink-streaming-scala_2.12
1.11.2
org.apache.flink
flink-clients_2.12
1.11.2
org.apache.flink
flink-connector-kafka_2.12
1.11.2
第5章 DataStream API 1 43
net.alchim31.maven
scala-maven-plugin
3.4.6
compile
org.apache.maven.plugins
maven-assembly-plugin
3.0.0
jar-with-dependencies
make-assembly
package
single
在这个pom.xml文件中,添加了一个新的依赖flink-connector-kafka_2.12,用于实现
Flink和Kafka之间的连接。
使用Maven工具对KafkaWordCount程序进行编译打包,打包成功以后,新建一个
Linux终端,执行如下命令运行程序(确认已经启动Flink):
$cd ~/flinkapp
$/usr/local/flink/bin/flink run \
>--class cn.edu.xmu.dblab.KafkaWordCount \
1 44 Flink编程基础(Scala版)
>./target/wordcount-1.0-jar-with-dependencies.jar
注意,运行KafkaWordCount程序需要依赖外部JAR 包(用于支持Flink和Kafka
之间的连接),因此,这里需要提交wordcount-1.0-jar-with-dependencies.jar,而不是提交
wordcount-1.0.jar。
在前面已经打开的“生产者终端”内,继续输入以下内容(每输入一行后按Enter键):
hello wuhan
hello xiamen
然后,新建一个Linux终端,执行如下命令:
$cd /usr/local/flink/log
$tail -f flink*.out
可以看到屏幕上会输出如下信息:
==>flink-hadoop-taskexecutor-0-ubuntu.out <==
(hello,1)
(wuhan,1)
(hello,1)
(xiamen,1)
上述信息就是KafkaWordCount程序的词频统计结果。
3.HDFS数据源
HDFS在大数据领域具有广泛的应用,Flink也经常需要读取来自HDFS的数据。
为了演示方便,需要在Linux系统中提前启动HDFS(假设使用hadoop 用户名登录
Linux系统,Hadoop系统版本为3.1.3),并在HDFS的/user/hadoop目录中创建一个文
本文件word.txt,里面包含如下3行内容:
hello hadoop
hello spark
hello flink
在~/flinkapp/src/main/scala 目录下新建代码文件ReadHDFSFile.scala,内容
如下:
package cn.edu.xmu.dblab
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object ReadHDFSFile{
def main(args: Array[String]): Unit ={
//获取执行环境
第5章 DataStream API 1 45
val env =StreamExecutionEnvironment.getExecutionEnvironment
//加载或创建数据源
val dataStream = env. readTextFile ( " hdfs://localhost: 9000/user/hadoop/
word.txt")
//打印输出
dataStream.print()
//程序触发执行
env.execute()
}
}
为了让Flink能够支持访问HDFS,需要在pom.xml中添加依赖hadoop-common和
hadoop-client,具体内容如下:
cn.edu.xmu.dblab
wordcount
4.0.0
WordCount
jar
1.0
alimaven
aliyun maven
http://maven.aliyun.com/nexus/content/groups/public/
org.apache.flink
flink-scala_2.12
1.11.2
org.apache.flink
flink-streaming-scala_2.12
1.11.2
org.apache.flink
flink-clients_2.12