Creating Kafka Producer and Consumer Programs with the Scala API
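In this lesson we will learn how to use the Scala API to write two programs:

- a Kafka producer, which sends messages to a Kafka topic;
- a Kafka consumer, which reads messages from a topic.

The producer sends messages to the topic as records. Each record is a key-value pair together with the name of the topic it is sent to. The consumer receives those messages from the topic.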
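To make the record structure concrete, here is a small sketch that only constructs records. Nothing is sent, so no broker is needed, but it does assume the kafka-clients dependency from step 1 is on the classpath.

The object name `RecordShapes` and the sample keys and values are illustrative. The three constructors shown are ones that `ProducerRecord` provides in kafka-clients.

```scala
import org.apache.kafka.clients.producer.ProducerRecord

// Hypothetical object: shows the shapes a record can take
object RecordShapes {
  val topic = "my-example-topic"

  // Topic + value only: the key is null, so the producer chooses the partition
  val valueOnly = new ProducerRecord[String, String](topic, "hello")

  // Topic + key + value: with the default partitioner, records that share
  // a key are written to the same partition
  val keyed = new ProducerRecord[String, String](topic, "user-1", "hello")

  // Topic + explicit partition + key + value: the partition is fixed by the caller
  val pinned = new ProducerRecord[String, String](topic, Integer.valueOf(0), "user-1", "hello")
}
```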
1. Create a Maven project and add the dependency

First create a Maven project, then edit pom.xml to add the kafka-clients dependency:
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
</dependency>
```
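If you build with sbt rather than Maven, you can declare the same dependency in build.sbt. This sketch assumes an sbt project. Because kafka-clients is a plain Java library, it uses a single `%`, which adds no Scala version suffix to the artifact name.

```scala
// build.sbt: sbt equivalent of the Maven dependency above
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.4.1"
```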
2. Write the Kafka producer program
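This Scala producer example publishes messages to a topic as records. A record is a key-value pair: the key is optional, while the value holds the message itself. In this example both the key and the value are strings, so we use StringSerializer for both.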
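The serializer choice is passed to the producer through its Properties. Instead of raw string keys such as "key.serializer", kafka-clients also exposes the property names as constants on `ProducerConfig`. With the constants, a misspelled name becomes a compile error. With raw strings, Kafka only logs a warning about an unknown configuration.

This is a minimal sketch: `ProducerProps` and `build` are hypothetical names, and the settings mirror the ones used in this section.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer

// Hypothetical helper: builds producer settings from ProducerConfig constants
object ProducerProps {
  def build(bootstrapServers: String): Properties = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    // classOf[...].getName yields the fully qualified class name Kafka expects
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.ACKS_CONFIG, "all")
    props
  }
}
```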
Create a Scala object named KafkaProducerApp with the following code:
```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object KafkaProducerApp {
  def main(args: Array[String]): Unit = {
    val props: Properties = new Properties()
    props.put("bootstrap.servers", "xueai8:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("acks", "all") // wait until all in-sync replicas have the record

    val producer = new KafkaProducer[String, String](props)
    val topic = "my-example-topic"
    try {
      for (i <- 0 to 15) {
        val record = new ProducerRecord[String, String](topic, i.toString, "xueai8.com " + i)
        // send() is asynchronous; get() blocks until the broker acknowledges the record
        val metadata = producer.send(record).get()
        printf("sent record(key=%s value=%s) meta(partition=%d, offset=%d)\n",
          record.key(), record.value(), metadata.partition(), metadata.offset())
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      producer.close()
    }
  }
}
```
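KafkaProducer's send method returns a Future<RecordMetadata>. Calling get() on it waits for the broker's acknowledgement. The resulting metadata tells us which partition the message was written to and at which offset.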
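Calling get() on every Future, as KafkaProducerApp does, turns each send into a synchronous round trip. A common alternative is to pass a `Callback` to `send` and let the producer batch records in the background. The callback receives the same metadata, or the exception if the send failed.

This sketch assumes kafka-clients 2.4.1. It uses `MockProducer` from kafka-clients, so it runs without a broker. `AsyncSendDemo` and `run` are illustrative names. Against a real cluster you would create a `KafkaProducer` with the properties shown above instead.

```scala
import org.apache.kafka.clients.producer.{Callback, MockProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.StringSerializer
import scala.collection.mutable.ArrayBuffer

// Hypothetical demo of non-blocking sends with a completion callback
object AsyncSendDemo {
  def run(count: Int): List[String] = {
    // autoComplete = true: the mock completes each send immediately
    val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
    val acked = ArrayBuffer.empty[String]
    try {
      for (i <- 0 until count) {
        val record = new ProducerRecord[String, String]("my-example-topic", i.toString, s"xueai8.com $i")
        // No get(): the callback fires when the send completes (or fails)
        producer.send(record, new Callback {
          override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
            if (exception != null) exception.printStackTrace()
            else acked += s"key=${record.key()} partition=${metadata.partition()} offset=${metadata.offset()}"
        })
      }
      producer.flush() // make sure every pending send has completed
    } finally {
      producer.close()
    }
    acked.toList
  }
}
```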
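3. Write the Kafka consumer program

This Scala consumer example subscribes to a topic and receives the messages (records) that arrive on it. Each message carries a key, a value, a partition and an offset. Kafka stores all messages as serialized bytes, so the consumer must use deserializers to convert them back into the proper data types. Here we use StringDeserializer for both the key and the value.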
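The serializer on the producer side and the deserializer on the consumer side must match. The following sketch shows the round trip that StringSerializer and StringDeserializer perform: text to UTF-8 bytes and back. `SerdeRoundTrip`, `toBytes` and `fromBytes` are illustrative names; the two classes come from kafka-clients.

```scala
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}

// Hypothetical helper showing what the producer and consumer do with each value
object SerdeRoundTrip {
  private val serializer = new StringSerializer
  private val deserializer = new StringDeserializer

  // Producer side, before sending: String -> bytes (UTF-8 by default)
  def toBytes(topic: String, value: String): Array[Byte] = serializer.serialize(topic, value)

  // Consumer side, after polling: bytes -> String
  def fromBytes(topic: String, bytes: Array[Byte]): String = deserializer.deserialize(topic, bytes)
}
```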
```scala
import java.time.Duration
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

object KafkaConsumerApp {
  def main(args: Array[String]): Unit = {
    val props: Properties = new Properties()
    props.put("group.id", "test")
    props.put("bootstrap.servers", "xueai8:9092")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("enable.auto.commit", "true")      // commit consumed offsets automatically
    props.put("auto.commit.interval.ms", "1000") // every 1000 ms

    // Type parameters are required: without them Scala infers KafkaConsumer[Nothing, Nothing]
    val consumer = new KafkaConsumer[String, String](props)
    val topics = List("my-example-topic")
    try {
      consumer.subscribe(topics.asJava) // subscribe to the topic
      while (true) {
        // poll(Duration) replaces the deprecated poll(long)
        val records = consumer.poll(Duration.ofMillis(10))
        for (record <- records.asScala) {
          println("Topic: " + record.topic() + ",Key: " + record.key() + ",Value: " + record.value() +
            ", Offset: " + record.offset() + ", Partition: " + record.partition())
        }
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      consumer.close()
    }
  }
}
```
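In KafkaConsumerApp the `while (true)` loop never exits normally. The `finally` block, and therefore `consumer.close()`, only runs if an exception escapes. Stopping the program with Ctrl+C does not run it.

The KafkaConsumer documentation describes calling `wakeup()` from another thread for this situation. That call makes a blocked `poll()` throw `WakeupException`. Below is a sketch of the pattern, assuming kafka-clients 2.4.1. `PollLoop`, `run` and `handle` are illustrative names. The function takes the `Consumer` interface, so it also works with `MockConsumer` in tests.

```scala
import java.time.Duration
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord}
import org.apache.kafka.common.errors.WakeupException

// Hypothetical poll loop that stops cleanly when consumer.wakeup() is called
object PollLoop {
  def run(consumer: Consumer[String, String])(handle: ConsumerRecord[String, String] => Unit): Unit = {
    try {
      while (true) {
        val records = consumer.poll(Duration.ofMillis(100))
        for (record <- records.asScala) handle(record)
      }
    } catch {
      case _: WakeupException => // expected: another thread asked us to stop
    } finally {
      consumer.close() // leaves the group and releases resources
    }
  }
}
```

To wire this into KafkaConsumerApp, you could first capture the main thread. Then register `sys.addShutdownHook { consumer.wakeup(); mainThread.join() }`, and call `PollLoop.run(consumer)` with the existing `println` as the handler.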
4. Run the test

Follow the steps below.

4.1) Start ZooKeeper

In the first terminal window, run the following commands to start ZooKeeper:

```shell
$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./bin/zookeeper-server-start.sh config/zookeeper.properties
```
4.2) Start Kafka

In the second terminal window, run the following commands to start Kafka:

```shell
$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./bin/kafka-server-start.sh config/server.properties
```
4.3) Create the topic my-example-topic

In the third terminal window, run the following commands to create a topic named my-example-topic:
```shell
$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./bin/kafka-topics.sh --create \
    --replication-factor 1 \
    --partitions 1 \
    --topic my-example-topic \
    --zookeeper localhost:2181
# list the topics to verify
$ ./bin/kafka-topics.sh --list --zookeeper localhost:2181
```
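You can also create the same topic from Scala code with the AdminClient API in kafka-clients. The settings are the same as in the command above: one partition and replication factor 1.

This sketch assumes the broker started in step 4.2 is reachable at the address you pass in, for example xueai8:9092 as in the programs above. `TopicSetup`, `spec` and `create` are illustrative names. Only `create` talks to the broker; `spec` just builds the topic description.

```scala
import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

// Hypothetical helper mirroring the kafka-topics.sh --create command
object TopicSetup {
  // 1 partition, replication factor 1 (replication factor is a Short)
  def spec(name: String): NewTopic = new NewTopic(name, 1, 1.toShort)

  // Sends the request to the broker (not ZooKeeper) and blocks until it answers;
  // get() throws an ExecutionException if, for example, the topic already exists
  def create(bootstrapServers: String, topic: NewTopic): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    val admin = AdminClient.create(props)
    try admin.createTopics(Collections.singletonList(topic)).all().get()
    finally admin.close()
  }
}
```

Usage, with a broker running: `TopicSetup.create("xueai8:9092", TopicSetup.spec("my-example-topic"))`.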
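4.4) In the IDE, run KafkaConsumerApp.scala. The consumer starts polling the my-example-topic topic.

4.5) In the IDE, run the producer program KafkaProducerApp.scala. It prints output like the following. The exact offsets depend on how many messages the topic already contains.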
```
sent record(key=0 value=xueai8.com 0) meta(partition=0, offset=15)
sent record(key=1 value=xueai8.com 1) meta(partition=0, offset=16)
sent record(key=2 value=xueai8.com 2) meta(partition=0, offset=17)
sent record(key=3 value=xueai8.com 3) meta(partition=0, offset=18)
sent record(key=4 value=xueai8.com 4) meta(partition=0, offset=19)
sent record(key=5 value=xueai8.com 5) meta(partition=0, offset=20)
sent record(key=6 value=xueai8.com 6) meta(partition=0, offset=21)
sent record(key=7 value=xueai8.com 7) meta(partition=0, offset=22)
sent record(key=8 value=xueai8.com 8) meta(partition=0, offset=23)
sent record(key=9 value=xueai8.com 9) meta(partition=0, offset=24)
sent record(key=10 value=xueai8.com 10) meta(partition=0, offset=25)
sent record(key=11 value=xueai8.com 11) meta(partition=0, offset=26)
sent record(key=12 value=xueai8.com 12) meta(partition=0, offset=27)
sent record(key=13 value=xueai8.com 13) meta(partition=0, offset=28)
sent record(key=14 value=xueai8.com 14) meta(partition=0, offset=29)
sent record(key=15 value=xueai8.com 15) meta(partition=0, offset=30)
```
4.6) Switch back to the console where KafkaConsumerApp.scala is running. The consumer prints the messages it received from the my-example-topic topic:
```
Topic: my-example-topic,Key: 0,Value: xueai8.com 0, Offset: 31, Partition: 0
Topic: my-example-topic,Key: 1,Value: xueai8.com 1, Offset: 32, Partition: 0
Topic: my-example-topic,Key: 2,Value: xueai8.com 2, Offset: 33, Partition: 0
Topic: my-example-topic,Key: 3,Value: xueai8.com 3, Offset: 34, Partition: 0
Topic: my-example-topic,Key: 4,Value: xueai8.com 4, Offset: 35, Partition: 0
Topic: my-example-topic,Key: 5,Value: xueai8.com 5, Offset: 36, Partition: 0
Topic: my-example-topic,Key: 6,Value: xueai8.com 6, Offset: 37, Partition: 0
Topic: my-example-topic,Key: 7,Value: xueai8.com 7, Offset: 38, Partition: 0
Topic: my-example-topic,Key: 8,Value: xueai8.com 8, Offset: 39, Partition: 0
Topic: my-example-topic,Key: 9,Value: xueai8.com 9, Offset: 40, Partition: 0
Topic: my-example-topic,Key: 10,Value: xueai8.com 10, Offset: 41, Partition: 0
Topic: my-example-topic,Key: 11,Value: xueai8.com 11, Offset: 42, Partition: 0
Topic: my-example-topic,Key: 12,Value: xueai8.com 12, Offset: 43, Partition: 0
Topic: my-example-topic,Key: 13,Value: xueai8.com 13, Offset: 44, Partition: 0
Topic: my-example-topic,Key: 14,Value: xueai8.com 14, Offset: 45, Partition: 0
Topic: my-example-topic,Key: 15,Value: xueai8.com 15, Offset: 46, Partition: 0
```