使用Kafka连接器
Flink为读写Kafka主题提供了专门的Kafka连接器,支持将Kafka做为Flink流应用程序的数据源和Data Sink。Flink Kafka消费者集成了Flink的检查点机制来提供精确一次的处理语义。为了实现这一点,Flink并不完全依赖Kafka的消费者组的偏移跟踪,而是在内部跟踪和检查这些偏移。
Kafka源
Kafka源提供了一个构建类来构造KafkaSource的实例。
下面的代码片段展示了如何构建一个KafkaSource来消费主题“input-topic”的最早偏移量的消息,带有消费组“my-group”,并且只将message的值反序列化为字符串。
KafkaSourcesource = KafkaSource. builder() .setBootstrapServers(brokers) .setTopics("input-topic") .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
【示例】使用Kafka自带的生产者脚本向Kafka的words主题写数据。编写一个Flink流应用程序,消费Kafka中words主题的数据。
1、在IntelliJ IDEA中创建一个Flink项目,使用flink-quickstart-java项目模板。(Flink项目创建过程,请参见2.2节)
2、在pom.xml中设置依赖。
3、创建流应用程序主类。代码如下。
Scala代码:
import org.apache.flink.api.common.eventtime.WatermarkStrategy import org.apache.flink.api.common.functions.FlatMapFunction import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.connector.kafka.source.KafkaSource import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer import org.apache.flink.streaming.api.scala._ import org.apache.flink.util.Collector /** * Kafka Source使用(最新 1.13 版本) * * 读取 Kafka中 words 主题的数据 */ object KafkaSourceDemo { def main(args: Array[String]) { // 设置流执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 定义Kafka数据源 val source = KafkaSource.builder[String] .setBootstrapServers("192.168.190.133:9092") .setTopics("words") .setGroupId("group-test") .setStartingOffsets(OffsetsInitializer.earliest) .setValueOnlyDeserializer(new SimpleStringSchema) .build env // 指定Kafka数据源 .fromSource(source, WatermarkStrategy.noWatermarks[String], "Kafka Source") .flatMap(new FlatMapFunction[String, String]() { @throws[Exception] override def flatMap(s: String, collector: Collector[String]): Unit = { for (word <- s.split("\\W+")) { collector.collect(word) } } }) // 输出从Kafka words topic拉取的数据 .print // 触发流程序执行 env.execute("Flink Kafka Source") } }
Java代码:
import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; /** * Kafka Source使用(最新 1.13 版本) * * 读取 Kafka中 words 主题的数据 */ public class KafkaSourceDemo { public static void main(String[] args) throws Exception { // 设置流执行环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 定义Kafka数据源 KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers("192.168.190.133:9092") .setTopics("words") .setGroupId("group-test") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); env // 指定Kafka数据源 .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source") // 做flatMap转换 .flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String s, Collector<String> collector) throws Exception { for(String word : s.split("\\W+")){ collector.collect(word); } } }) // 输出从Kafka words topic拉取的数据 .print(); // 触发流程序执行 env.execute("Flink Kafka Source"); } }
4、执行程序。请按以下步骤执行。
1)启动zookeeper服务和kafka服务。打开一个终端窗口,启动ZooKeeper(不要关闭)
$ ./bin/zookeeper-server-start.sh config/zookeeper.properties
2)打开另一个终端窗口,启动Kafka服务(不要关闭)
$ ./bin/kafka-server-start.sh config/server.properties
3)打开第三个终端窗口,在Kafka中创建一个名为"words"的主题(topic)
$ ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic words
4)查看已经创建的Topic:
$ ./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
5)运行我们上面编写的流执行程序(相当于Kafka的消费者程序)。
6)在第三个终端窗口,执行Kafka自带的生产者脚本:
$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic words
7)然后随意输入一些句子,单词之间以空格分隔开。如下:
good good study day day up
5、可以得到如下的输出结果:
8> good 8> good 8> study 8> day 8> day 8> up
Kafka Sink
Flink的Kafka生产者被称为FlinkKafkaProducer,通过它可以将记录流写入一个或多个Kafka主题,这时Kafka作为Flink流程序的Data Sink。
FlinkKafkaProducer的构造函数接受以下参数:
- 一个默认的输出主题,事件应该在其中写入;
- SerializationSchema / KafkaSerializationSchema用于将数据序列化到Kafka中;
- Kafka客户端的属性。以下属性是必需的:
- “bootstrap.servers” (用逗号分隔的Kafka broker列表)。
- 一个容错语义。
【示例】编写一个Flink流应用程序,将数据写入到Kafka中的指定主题words。使用Kafka自带的消费者脚本来查看写入的内容。
请按以下步骤操作。
1、在IntelliJ IDEA中创建一个Flink项目,使用flink-quickstart-java项目模板。(Flink项目创建过程,请参见2.2节)
2、在pom.xml中设置依赖。
3、创建流应用程序主类。代码如下。
Scala代码:
import java.util.Properties import org.apache.flink.api.common.functions.FlatMapFunction import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer import org.apache.flink.util.Collector /** * 将流数据写入到 Kafka中 的 words 主题 */ object KafkaProducerDemo { def main(args: Array[String]) { // 设置流执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 启用检查点 env.enableCheckpointing(50000) val properties = new Properties() properties.setProperty("bootstrap.servers", "192.168.190.133:9092") // 构造Flink Kafka Sink,默认使用FlinkKafkaProducer.Semantic.AT_LEAST_ONCE语义 val myProducer = new FlinkKafkaProducer[String]( "words", // 目标topic new SimpleStringSchema, // 序列化schema properties // producer配置 ) // 将数据写入到Kafka的"words"主题 env .fromElements("good good study", "day day up") .flatMap(new FlatMapFunction[String, String]() { @throws[Exception] override def flatMap(s: String, out: Collector[String]): Unit = { for (word <- s.split("\\W+")) { out.collect(word) } } }) .addSink(myProducer) // 触发流程序执行 env.execute("Flink Streaming Scala API Skeleton") } }
Java代码:
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema; import org.apache.flink.util.Collector; import org.apache.kafka.clients.producer.ProducerRecord; import java.nio.charset.StandardCharsets; import java.util.Properties; /** * 将流数据写入到 Kafka中 的 words 主题 */ public class KafkaProcuderDemo { public static void main(String[] args) throws Exception { // 设置流执行环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 启用检查点 env.enableCheckpointing(50000); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "192.168.190.133:9092"); // 构造Flink Kafka Sink,默认使用FlinkKafkaProducer.Semantic.AT_LEAST_ONCE语义 FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<String>( "words", // 目标topic new SimpleStringSchema(), // 序列化schema properties // producer配置 ); // 将数据写入到Kafka的"words"主题 env .fromElements("good good study","day day up") .flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String s, Collectorout) throws Exception { for(String word : s.split("\\W+")){ out.collect(word); } } }) .addSink(myProducer); // 触发流程序执行 env.execute("Kafka Flink Producer Demo"); } }
默认FlinkKafkaProducer使用的是AT_LEAST_ONCE语义,这要求启用检查点。随着Flink的检查点启用,Flink Kafka消费者将消费一个主题的记录,并定期检查点所有的Kafka偏移,以及其他操作的状态。如果检查点被禁用,Kafka消费者会定期向Zookeeper提交偏移量。
4、执行程序。请按以下步骤执行。
1)启动zookeeper服务和kafka服务。打开一个终端窗口,启动ZooKeeper(不要关闭)
$ ./bin/zookeeper-server-start.sh config/zookeeper.properties
2)打开另一个终端窗口,启动Kafka服务(不要关闭)
$ ./bin/kafka-server-start.sh config/server.properties
3)打开第三个终端窗口,在Kafka中创建一个名为“words”的主题(topic)
$ ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic words
4)查看已经创建的Topic:
$ ./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
5)在第三个终端窗口,执行Kafka自带的消费者脚本:
$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic words
6)运行我们上面编写的流执行程序(相当于Kafka的生产者程序)。
5、在第三个终端窗口(运行Kafka消费者脚本的窗口),可以看到如下的输出内容:
day day up good good study
在上面的代码中,我们使用默认的Semantic.AT_LEAST_ONCE语义,这时可以简单地指定序列化模式为SimpleStringSchema。但是,如果我们要指定FlinkKafkaProducer使用EXACTLY_ONCE语义,那么就需要自定义序列化模式,它需要实现KafkaSerializationSchema