第 5 章 DataStream API 实时分析是当前比较热门的数据处理技术,因为许多不同领域的数据都需 要进行实时处理、计算。随着大数据技术在各行各业的广泛应用,对海量数据 进行实时分析的需求越来越多,同时,数据处理的业务逻辑也越来越复杂。传 统的批处理方式和早期的流处理框架(如Storm)越来越难以在延迟性、吞吐量、 容错能力以及使用便捷性等方面满足业务日益严苛的要求。在这种形式下,新 型流处理框架Flink通过创造性地把现代大规模并行处理技术应用到流处理 中,极大地改善了以前的流处理框架所存在的问题。为了满足实时计算需求, Flink提供了数据流处理API,即DataStream API,它基于GoogleDataflow 模 型,支持原生数据流处理,可以让用户灵活且高效地编写流应用程序。虽然 Spark也提供了流计算的支持,但是,相比较而言,Flink在流计算上有明显优 势,核心架构和模型也更透彻和灵活一些。 本章将重点介绍如何利用DataStream API开发流式应用。首先介绍 DataStream 编程模型(包括数据源、数据转换、数据输出)和窗口的划分;其次介 绍时间概念,包括事件生成时间、事件接入时间和事件处理时间;再次介绍窗口 计算,包括窗口类型和窗口计算函数;最后介绍水位线、延迟数据处理和状态 编程。 5.1 DataStream 编程模型 Flink流处理程序的基本运行流程包括以下5个步骤。 (1)创建流处理执行环境。 (2)创建数据源。 (3)指定对接收的数据进行转换操作的逻辑。 (4)指定数据计算的输出结果方式。 (5)程序触发执行。 第(1)步中创建流处理执行环境的方式如下: val env =StreamExecutionEnvironment.getExecutionEnvironment 从上述步骤中可以看出,真正需要操作的只有3个过程:创建数据源、指定 第5章 DataStream API 1 37 对接收的数据进行转换操作的逻辑、指定数据计算的输出结果方式。为了支持这3个过 程的操作,Flink提供了一套功能完整的数据流处理API,即DataStream API。Datastream API主要包含3个模块:数据源、数据转换和数据输出。数据源模块(Source)定义了输 入接入功能,可以将各种数据源接入Flink系统中,并将接入数据转换成DataStream 数 据集。数据转换模块(Transformation)定义了对DataStream 数据集执行的各种转换操 作,如map、flatMap、filter、reduce等。数据输出模块(Sink)负责把数据输出到文件或其 他系统中(如Kafka)。 此外,需要在pom.xml文件中引入flink-streaming-scala_2.12依赖库,具体如下: org.apache.flink flink-streaming-scala_2.12 1.11.2 5.1.1 数据源 数据源模块定义了DataStream API中的数据输入操作,Flink将数据源主要分为两 种类型:内置数据源和第三方数据源。内置数据源包括文件数据源、Socket数据源和集 合数据源。第三方数据源包括Kafka、AmazonKinesisStreams、RabbitMQ、NiFi等。 1.内置数据源 内置数据源在Flink系统内部已经实现,不需要引入其他依赖库,用户可以直接调用 相关方法使用。 1)文件数据源 Flink支持从文件中读取数据,它会逐行读取数据并将其转换成DataStream 返回。 可以使用readTextFile(path)方法直接读取文本文件,其中,path表示文本文件的路径。 以下是一个具体实例: package cn.edu.xmu.dblab import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment object FileSource{ def main(args: Array[String]): Unit ={ //获取执行环境 val env =StreamExecutionEnvironment.getExecutionEnvironment //加载或创建数据源 val dataStream =env.readTextFile("file:///usr/local/flink/README.txt") 1 38 Flink编程基础(Scala版) //打印输出 dataStream.print() //程序触发执行 env.execute() } } 2)Socket数据源 Flink可以通过调用socketTextStream 方法从Socket端口中接入数据,在调用 socketTextStream 方法时,一般需要提供两个参数,即IP地址和端口,下面是一个实例: val socketDataStream =env.socketTextStream(“localhost”,9999) 4.3.3节中的实例已经演示了Socket数据源的应用场景,这里不再赘述。 3)集合数据源 Flink可以直接将Java或Scala程序中集合类转换成DataStream 数据集,这里给出 两个具体实例。 使用fromElements方法从元素集合中创建DataStream 数据集,语句如下: val dataStream =env.fromElements(Tuple2(1L,3L),Tuple2(1L,5L)) 使用fromCollection方法从列表创建DataStream 数据集,语句如下: val dataStream =env.fromCollection(List(1,2,3)) 2.Kafka数据源 1)Kafka简介 Kafka是一种高吞吐量的分布式发布订阅消息系统,为了更好地理解和使用Kafka, 这里介绍一下Kafka的相关概念。 (1)Broker:Kafka集群包含一个或多个服务器,这些服务器被称为Broker。 (2)Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。 物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个 Broker上,但用户只需指定消息的Topic,即可生产或消费数据,而不必关心数据存于 何处。 (3)Partition:是物理上的概念,每个Topic包含一个或多个Partition。 (4)Producer:负责发布消息到KafkaBroker。 (5)Consumer:消息消费者,向KafkaBroker读取消息的客户端。 (6)ConsumerGroup:每个Consumer属于一个特定的ConsumerGroup,可为每个 Consumer指定GroupName,若不指定GroupName,则属于默认的Group。 2)Kafka准备工作 访问Kafka官网下载页面(https://kafka.apache.org/downloads),下载Kafka稳定 第5章 DataStream API 1 39 版本kafka_2.12-2.6.0.tgz,或者直接到本教程官网“下载专区”栏目的“软件”目录中下载 安装文件kafka_2.12-2.6.0.tgz。下载完安装文件以后,就可以安装到Linux系统中,具体 安装过程可以参照本教程官网“实验指南”栏目的“Kafka的安装和使用方法”。为了让 Flink应用程序能够顺利使用Kafka数据源,在下载Kafka安装文件的时候要注意, Kafka版本号一定要和自己计算机上已经安装的Scala版本号一致才可以。本教程安装 的Flink版本号是1.11.2,Scala版本号是2.12,所以,一定要选择Kafka版本号是2.12开 头的。例如,到Kafka官网中,可以下载安装文件kafka_2.12-2.6.0.tgz,前面的2.12就是 支持的Scala版本号,后面的2.6.0是Kafka自身的版本号。 首先需要启动Kafka,登录Linux系统(本教程统一使用hadoop用户登录),打开一 个终端,输入下面命令启动Zookeeper服务: $cd /usr/local/kafka $./bin/zookeeper-server-start.sh config/zookeeper.properties 注意,执行上面命令以后,终端窗口会返回一堆信息,然后停住不动,没有回到Shell 命令提示符状态,这时,不要误以为是死机了,而是Zookeeper服务器已经启动,正在处于 服务状态。所以,不要关闭这个终端窗口,一旦关闭,Zookeeper服务就停止了。 另外打开第二个终端,然后输入下面命令启动Kafka服务: $cd /usr/local/kafka $./bin/kafka-server-start.sh config/server.properties 同样,执行上面命令以后,终端窗口会返回一堆信息,然后停住不动,没有回到Shell 命令提示符状态,这时,同样不要误以为是死机了,而是Kafka服务器已经启动,正在处于 服务状态。所以,不要关闭这个终端窗口,一旦关闭,Kafka服务就停止了。 当然,还有一种方式是采用下面加了“&”的命令: $cd /usr/local/kafka $bin/kafka-server-start.sh config/server.properties & 这样,Kafka就会在后台运行,即使关闭了这个终端。不过,采用这种方式时,有时候 我们常忘记还有Kafka在后台运行,所以,建议暂时不要用这种命令形式。 下面先测试一下Kafka是否可以正常使用。再打开第三个终端,然后输入下面命令 创建一个自定义名称为wordsendertest的Topic: $cd /usr/local/kafka $./bin/kafka-topics.sh --create --zookeeper localhost: 2181 \ >--replication-factor 1 --partitions 1 --topic wordsendertest #这个Topic 叫wordsendertest, 2181 是Zookeeper 默认的端口号, - - partitions 是 Topic 里面的分区数,--replication-factor 是备份的数量,在Kafka 集群中使用,由于这里 是单机版,所以不用备份 #可以用list 列出所有创建的Topic,来查看上面创建的Topic 是否存在 $./bin/kafka-topics.sh --list --zookeeper localhost: 2181 这个名称为wordsendertest的Topic,就是专门负责采集发送一些单词的。 1 40 Flink编程基础(Scala版) 下面用生产者(Producer)来产生一些数据,在当前终端内继续输入下面命令: $./bin/kafka-console-producer.sh --broker-list localhost: 9092 \ >--topic wordsendertest 上面命令执行后,就可以在当前终端(假设名称为“生产者终端”)内输入一些英文 单词: hello hadoop hello spark 这些单词就是数据源,会被Kafka捕捉到以后发送给消费者。现在可以启动一个消 费者,来查看刚才生产者产生的数据。另外打开第四个终端,输入下面命令: $cd /usr/local/kafka $./bin/kafka-console-consumer.sh --bootstrap-server localhost: 9092 \ >--topic wordsendertest --from-beginning 可以看到,屏幕上会显示如下结果,也就是刚才在另外一个终端里面输入的内容: hello hadoop hello spark 注意,到这里为止,前面打开的所有Linux终端窗口都不要关闭,以供后面步骤继续 使用。 3)编写Flink程序使用Kafka数据源 在~/flinkapp/src/main/scala目录下新建代码文件KafkaWordCount.scala,内容 如下: package cn.edu.xmu.dblab import java.util.Properties import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.streaming.api.windowing.time.Time object KafkaWordCount { def main(args: Array[String]): Unit ={ val kafkaProps =new Properties() //Kafka 的一些属性 kafkaProps.setProperty("bootstrap.servers", "localhost: 9092") //所在的消费组 kafkaProps.setProperty("group.id", "group1") //获取当前的执行环境 第5章 DataStream API 1 41 val env =StreamExecutionEnvironment.getExecutionEnvironment //创建Kafka 的消费者,wordsendertest 是要消费的Topic val kafkaSource = new FlinkKafkaConsumer [String]( " wordsendertest ", new SimpleStringSchema,kafkaProps) //设置从最新的offset 开始消费 kafkaSource.setStartFromLatest() //自动提交offset kafkaSource.setCommitOffsetsOnCheckpoints(true) //绑定数据源 val stream =env.addSource(kafkaSource) //设置转换操作逻辑 val text = stream. flatMap { _. toLowerCase ( ). split ( " \ \ W +") filter { _. nonEmpty} } .map{(_,1)} .keyBy(0) .timeWindow(Time.seconds(5)) .sum(1) //打印输出 text.print() //程序触发执行 env.execute("Kafka Word Count") } } 在这个KafkaWordCount程序中,FlinkKafkaConsumer的构造函数有3个参数。第 一个参数定义的是读入的目标Topic的名称。第二个参数是一个DeserializationSchema 或KeyedDeserializationSchema对象。Kafka中的消息是以纯字节消息存储的,所以需要 被反序列化为Java或Scala对象。这里用到的SimpleStringSchema对象是一个内置的 DeserializationSchema对象,可以将字节数据反序列化为一个String对象。第三个参数 是一个Properties对象,用于配置Kafka的客户端,该对象至少要包含两个条目,即 bootstrap.servers与group.id。 另外,在FlinkKafkaConsumer开始读Kafka消息时,可以配置它的读起始位置,有 以下4种。 (1)setStartFromGroupOffsets()。默认读取上次保存的offset信息,若是第一次启 动应用,读取不到上次的offset信息,则会根据参数auto.offset.reset的值来进行数据 读取。 (2)setStartFromEarliest()。从最早的数据开始进行消费,忽略存储的offset信息。 1 42 Flink编程基础(Scala版) (3)setStartFromLatest()。从最新的数据进行消费,忽略存储的offset信息。 (4)setStartFromSpecificOffsets(Map)。从指定位 置进行消费。 KafkaWordCount程序中,“设置转换操作逻辑”部分的代码用于实现词频统计,里面 用到了flatMap、map、keyBy、timeWindow和sum 操作。 下面在~/flinkapp目录下再新建一个pom.xml文件,内容如下: cn.edu.xmu.dblab wordcount 4.0.0 WordCount jar 1.0 alimaven aliyun maven http://maven.aliyun.com/nexus/content/groups/public/ org.apache.flink flink-scala_2.12 1.11.2 org.apache.flink flink-streaming-scala_2.12 1.11.2 org.apache.flink flink-clients_2.12 1.11.2 org.apache.flink flink-connector-kafka_2.12 1.11.2 第5章 DataStream API 1 43 net.alchim31.maven scala-maven-plugin 3.4.6 compile org.apache.maven.plugins maven-assembly-plugin 3.0.0 jar-with-dependencies make-assembly package single 在这个pom.xml文件中,添加了一个新的依赖flink-connector-kafka_2.12,用于实现 Flink和Kafka之间的连接。 使用Maven工具对KafkaWordCount程序进行编译打包,打包成功以后,新建一个 Linux终端,执行如下命令运行程序(确认已经启动Flink): $cd ~/flinkapp $/usr/local/flink/bin/flink run \ >--class cn.edu.xmu.dblab.KafkaWordCount \ 1 44 Flink编程基础(Scala版) >./target/wordcount-1.0-jar-with-dependencies.jar 注意,运行KafkaWordCount程序需要依赖外部JAR 包(用于支持Flink和Kafka 之间的连接),因此,这里需要提交wordcount-1.0-jar-with-dependencies.jar,而不是提交 wordcount-1.0.jar。 在前面已经打开的“生产者终端”内,继续输入以下内容(每输入一行后按Enter键): hello wuhan hello xiamen 然后,新建一个Linux终端,执行如下命令: $cd /usr/local/flink/log $tail -f flink*.out 可以看到屏幕上会输出如下信息: ==>flink-hadoop-taskexecutor-0-ubuntu.out <== (hello,1) (wuhan,1) (hello,1) (xiamen,1) 上述信息就是KafkaWordCount程序的词频统计结果。 3.HDFS数据源 HDFS在大数据领域具有广泛的应用,Flink也经常需要读取来自HDFS的数据。 为了演示方便,需要在Linux系统中提前启动HDFS(假设使用hadoop 用户名登录 Linux系统,Hadoop系统版本为3.1.3),并在HDFS的/user/hadoop目录中创建一个文 本文件word.txt,里面包含如下3行内容: hello hadoop hello spark hello flink 在~/flinkapp/src/main/scala 目录下新建代码文件ReadHDFSFile.scala,内容 如下: package cn.edu.xmu.dblab import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment object ReadHDFSFile{ def main(args: Array[String]): Unit ={ //获取执行环境 第5章 DataStream API 1 45 val env =StreamExecutionEnvironment.getExecutionEnvironment //加载或创建数据源 val dataStream = env. readTextFile ( " hdfs://localhost: 9000/user/hadoop/ word.txt") //打印输出 dataStream.print() //程序触发执行 env.execute() } } 为了让Flink能够支持访问HDFS,需要在pom.xml中添加依赖hadoop-common和 hadoop-client,具体内容如下: cn.edu.xmu.dblab wordcount 4.0.0 WordCount jar 1.0 alimaven aliyun maven http://maven.aliyun.com/nexus/content/groups/public/ org.apache.flink flink-scala_2.12 1.11.2 org.apache.flink flink-streaming-scala_2.12 1.11.2 org.apache.flink flink-clients_2.12