使用Kafka连接器

Flink为读写Kafka主题提供了专门的Kafka连接器,支持将Kafka做为Flink流应用程序的数据源和Data Sink。Flink Kafka消费者集成了Flink的检查点机制来提供精确一次的处理语义。为了实现这一点,Flink并不完全依赖Kafka的消费者组的偏移跟踪,而是在内部跟踪和检查这些偏移。

Kafka源

Kafka源提供了一个构建类来构造KafkaSource的实例。

下面的代码片段展示了如何构建一个KafkaSource来消费主题“input-topic”的最早偏移量的消息,带有消费组“my-group”,并且只将message的值反序列化为字符串。

KafkaSource source = KafkaSource.builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

【示例】使用Kafka自带的生产者脚本向Kafka的words主题写数据。编写一个Flink流应用程序,消费Kafka中words主题的数据。

1、在IntelliJ IDEA中创建一个Flink项目,使用flink-quickstart-java项目模板。(Flink项目创建过程,请参见2.2节)

2、在pom.xml中设置依赖。

3、创建流应用程序主类。代码如下。

Scala代码:

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

/**
 * Kafka Source使用(最新 1.13 版本)
 *
 * 读取 Kafka中 words 主题的数据
 */
object KafkaSourceDemo {

  def main(args: Array[String]) {
    // 设置流执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 定义Kafka数据源
    val source = KafkaSource.builder[String]
      .setBootstrapServers("192.168.190.133:9092")
      .setTopics("words")
      .setGroupId("group-test")
      .setStartingOffsets(OffsetsInitializer.earliest)
      .setValueOnlyDeserializer(new SimpleStringSchema)
      .build

    env
      // 指定Kafka数据源
      .fromSource(source, WatermarkStrategy.noWatermarks[String], "Kafka Source")
      .flatMap(new FlatMapFunction[String, String]() {
        @throws[Exception]
        override def flatMap(s: String, collector: Collector[String]): Unit = {
          for (word <- s.split("\\W+")) {
            collector.collect(word)
          }
        }
      })
      // 输出从Kafka words topic拉取的数据
      .print

    // 触发流程序执行
    env.execute("Flink Kafka Source")
  }
}

Java代码:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Kafka Source使用(最新 1.13 版本)
 *
 * 读取 Kafka中 words 主题的数据
 */
public class KafkaSourceDemo {

	public static void main(String[] args) throws Exception {
		// 设置流执行环境
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// 定义Kafka数据源
		KafkaSource<String> source = KafkaSource.<String>builder()
				.setBootstrapServers("192.168.190.133:9092")
				.setTopics("words")
				.setGroupId("group-test")
				.setStartingOffsets(OffsetsInitializer.earliest())
				.setValueOnlyDeserializer(new SimpleStringSchema())
				.build();

		env
				// 指定Kafka数据源
				.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
				// 做flatMap转换
				.flatMap(new FlatMapFunction<String, String>() {
					@Override
					public void flatMap(String s, Collector<String> collector) throws Exception {
						for(String word : s.split("\\W+")){
							collector.collect(word);
						}
					}
				})
				// 输出从Kafka words topic拉取的数据
				.print();

		// 触发流程序执行
		env.execute("Flink Kafka Source");
	}
}

4、执行程序。请按以下步骤执行。

1)启动zookeeper服务和kafka服务。打开一个终端窗口,启动ZooKeeper(不要关闭)

$ ./bin/zookeeper-server-start.sh config/zookeeper.properties

2)打开另一个终端窗口,启动Kafka服务(不要关闭)

$ ./bin/kafka-server-start.sh config/server.properties

3)打开第三个终端窗口,在Kafka中创建一个名为"words"的主题(topic)

$ ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic words

4)查看已经创建的Topic:

$ ./bin/kafka-topics.sh --list --bootstrap-server localhost:9092

5)运行我们上面编写的流执行程序(相当于Kafka的消费者程序)。

6)在第三个终端窗口,执行Kafka自带的生产者脚本:

$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic words

7)然后随意输入一些句子,单词之间以空格分隔开。如下:

good good study
day day up

5、可以得到如下的输出结果:

8> good
8> good
8> study
8> day
8> day
8> up

Kafka Sink

Flink的Kafka生产者被称为FlinkKafkaProducer,通过它可以将记录流写入一个或多个Kafka主题,这时Kafka作为Flink流程序的Data Sink。

FlinkKafkaProducer的构造函数接受以下参数:

  • 一个默认的输出主题,事件应该在其中写入;
  • SerializationSchema / KafkaSerializationSchema用于将数据序列化到Kafka中;
  • Kafka客户端的属性。以下属性是必需的:
    • “bootstrap.servers” (用逗号分隔的Kafka broker列表)。
    • 一个容错语义。

【示例】编写一个Flink流应用程序,将数据写入到Kafka中的指定主题words。使用Kafka自带的消费者脚本来查看写入的内容。

请按以下步骤操作。

1、在IntelliJ IDEA中创建一个Flink项目,使用flink-quickstart-java项目模板。(Flink项目创建过程,请参见2.2节)

2、在pom.xml中设置依赖。

3、创建流应用程序主类。代码如下。

Scala代码:

import java.util.Properties

import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.util.Collector

/**
 * 将流数据写入到 Kafka中 的 words 主题
 */
object KafkaProducerDemo {
  def main(args: Array[String]) {
    // 设置流执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 启用检查点
    env.enableCheckpointing(50000)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "192.168.190.133:9092")

    // 构造Flink Kafka Sink,默认使用FlinkKafkaProducer.Semantic.AT_LEAST_ONCE语义
    val myProducer = new FlinkKafkaProducer[String](
            "words",                    // 目标topic
            new SimpleStringSchema,     // 序列化schema
            properties                  // producer配置
         )

    // 将数据写入到Kafka的"words"主题
    env
      .fromElements("good good study", "day day up")
      .flatMap(new FlatMapFunction[String, String]() {
        @throws[Exception]
        override def flatMap(s: String, out: Collector[String]): Unit = {
          for (word <- s.split("\\W+")) {
            out.collect(word)
          }
        }
      })
      .addSink(myProducer)

    // 触发流程序执行
    env.execute("Flink Streaming Scala API Skeleton")
  }
}

Java代码:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

/**
 * 将流数据写入到 Kafka中 的 words 主题
 */
public class KafkaProcuderDemo {

	public static void main(String[] args) throws Exception {
		// 设置流执行环境
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// 启用检查点
		env.enableCheckpointing(50000);

		Properties properties = new Properties();
		properties.setProperty("bootstrap.servers", "192.168.190.133:9092");

		// 构造Flink Kafka Sink,默认使用FlinkKafkaProducer.Semantic.AT_LEAST_ONCE语义
		FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<String>(
				"words",                  	                  	// 目标topic
				new SimpleStringSchema(), 				// 序列化schema
				properties                  	                	// producer配置
		);

		// 将数据写入到Kafka的"words"主题
		env
				.fromElements("good good study","day day up")
				.flatMap(new FlatMapFunction<String, String>() {
					@Override
					public void flatMap(String s, Collector out) throws Exception {
						for(String word : s.split("\\W+")){
							out.collect(word);
						}
					}
				})
				.addSink(myProducer);

		// 触发流程序执行
		env.execute("Kafka Flink Producer Demo");
	}
}

默认FlinkKafkaProducer使用的是AT_LEAST_ONCE语义,这要求启用检查点。随着Flink的检查点启用,Flink Kafka消费者将消费一个主题的记录,并定期检查点所有的Kafka偏移,以及其他操作的状态。如果检查点被禁用,Kafka消费者会定期向Zookeeper提交偏移量。

4、执行程序。请按以下步骤执行。

1)启动zookeeper服务和kafka服务。打开一个终端窗口,启动ZooKeeper(不要关闭)

$ ./bin/zookeeper-server-start.sh config/zookeeper.properties

2)打开另一个终端窗口,启动Kafka服务(不要关闭)

$ ./bin/kafka-server-start.sh config/server.properties

3)打开第三个终端窗口,在Kafka中创建一个名为“words”的主题(topic)

$ ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic words

4)查看已经创建的Topic:

$ ./bin/kafka-topics.sh --list --bootstrap-server localhost:9092

5)在第三个终端窗口,执行Kafka自带的消费者脚本:

$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic words

6)运行我们上面编写的流执行程序(相当于Kafka的生产者程序)。

5、在第三个终端窗口(运行Kafka消费者脚本的窗口),可以看到如下的输出内容:

day
day
up
good
good
study

在上面的代码中,我们使用默认的Semantic.AT_LEAST_ONCE语义,这时可以简单地指定序列化模式为SimpleStringSchema。但是,如果我们要指定FlinkKafkaProducer使用EXACTLY_ONCE语义,那么就需要自定义序列化模式,它需要实现KafkaSerializationSchema接口。


《PySpark原理深入与编程实战》