创建Kafka生产者和消费者程序_使用Scala API

在本节课中,我们将学习如何使用Scala API编写Kafka生产者(发送消息到Kafka主题)和Kafka消费者(从主题读取消息)。Kafka生产者以记录record的形式发送消息到Kafka主题,每个记录record是一个键值对连同主题名称一起,消费者从一个主题接收消息。

1. 创建Maven项目,添加依赖

首先创建一个Maven项目,并修改pom.xml文件,添加对kafka-clients的依赖:

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>

2. 编写Kafka生产者程序

这个Kafka Producer scala示例将消息作为Record发布到一个主题。Record是一个键-值对,其中键是可选的,值是必须的。在这个例子中,键和值都是字符串,因此我们使用StringSerializer。

创建一个名为KafkaProducerApp 的Scala object,编辑代码如下:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object KafkaProducerApp {
  def main(args: Array[String]): Unit = {
    val props:Properties = new Properties()
    props.put("bootstrap.servers","xueai8:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("acks","all")

    val producer = new KafkaProducer[String, String](props)
    val topic = "my-example-topic"

    try {
      for (i <- 0 to 15) {
        val record = new ProducerRecord[String, String](topic, i.toString, "xueai8.com " + i)
        val metadata = producer.send(record)
        printf(s"sent record(key=%s value=%s) " +
          "meta(partition=%d, offset=%d)\n",
          record.key(), record.value(),
          metadata.get().partition(),
          metadata.get().offset())
      }
    }catch{
      case e:Exception => e.printStackTrace()
    }finally {
      producer.close()
    }
  }
}

KafkaProducer的send方法返回我们能找到的元数据,哪个分区消息已写入并偏移。

3. 编写Kafka消费者程序

这个Kafka Consumer scala示例订阅一个主题并接收到一个到达主题的消息(记录record)。此消息包含键、值、分区和偏移量。Kafka中的所有消息都是序列化的,因此,消费者应该使用反序列化器来转换为适当的数据类型。这里我们对键和值都使用了StringDeserializer。

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

object KafkaConsumerApp {

  def main(args: Array[String]): Unit = {
    val props:Properties = new Properties()
    props.put("group.id", "test")
    props.put("bootstrap.servers","xueai8:9092")
    props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
    props.put("enable.auto.commit", "true")
    props.put("auto.commit.interval.ms", "1000")

    val consumer = new KafkaConsumer(props)
    val topics = List("my-example-topic")

    try {
      consumer.subscribe(topics.asJava)   // 订阅主题
      while (true) {
        val records = consumer.poll(10)   // 轮询
        for (record <- records.asScala) {
          println("Topic: " + record.topic() +
            ",Key: " + record.key() +
            ",Value: " + record.value() +
            ", Offset: " + record.offset() +
            ", Partition: " + record.partition())
        }
      }
    }catch{
      case e:Exception => e.printStackTrace()
    }finally {
      consumer.close()
    }
  }
}

4. 执行测试

请按以下步骤执行。

4.1)启动 Zookeeper

在第1个终端窗口,执行以下命令,启动Zookeeper:

$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./bin/zookeeper-server-start.sh config/zookeeper.properties

4.2)启动 Zookeeper

在第2个终端窗口,执行以下命令,启动Kafka:

$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./bin/kafka-server-start.sh config/server.properties

4.3)创建topic主题 my-example-topic

在第3个终端窗口,执行以下命令,创建名为my-example-topic的topic主题:

$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./bin/kafka-topics.sh --create \
  --replication-factor 1 \
  --partitions 1 \
  --topic my-example-topic \
  --zookeeper localhost:2181
# 查看
$ ./bin/kafka-topics.sh --list --zookeeper localhost:2181

4.4)在IDE中执行程序KafkaConsumerApp.scala,Kafka消费者程序开始轮询my-example-topic主题。

4.5)在IDE中运行Kafka生产者程序KafkaProducerApp.scala,输出内容如下:

sent record(key=0 value=xueai8.com 0) meta(partition=0, offset=15)
sent record(key=1 value=xueai8.com 1) meta(partition=0, offset=16)
sent record(key=2 value=xueai8.com 2) meta(partition=0, offset=17)
sent record(key=3 value=xueai8.com 3) meta(partition=0, offset=18)
sent record(key=4 value=xueai8.com 4) meta(partition=0, offset=19)
sent record(key=5 value=xueai8.com 5) meta(partition=0, offset=20)
sent record(key=6 value=xueai8.com 6) meta(partition=0, offset=21)
sent record(key=7 value=xueai8.com 7) meta(partition=0, offset=22)
sent record(key=8 value=xueai8.com 8) meta(partition=0, offset=23)
sent record(key=9 value=xueai8.com 9) meta(partition=0, offset=24)
sent record(key=10 value=xueai8.com 10) meta(partition=0, offset=25)
sent record(key=11 value=xueai8.com 11) meta(partition=0, offset=26)
sent record(key=12 value=xueai8.com 12) meta(partition=0, offset=27)
sent record(key=13 value=xueai8.com 13) meta(partition=0, offset=28)
sent record(key=14 value=xueai8.com 14) meta(partition=0, offset=29)
sent record(key=15 value=xueai8.com 15) meta(partition=0, offset=30)

4.6)回到KafkaConsumerApp.scala消费者程序运行的控制台窗口,可以看到这个消费者程序输出从my-example-topic主题收到的消息,如下所示:

Topic: my-example-topic,Key: 0,Value: xueai8.com 0, Offset: 31, Partition: 0
Topic: my-example-topic,Key: 1,Value: xueai8.com 1, Offset: 32, Partition: 0
Topic: my-example-topic,Key: 2,Value: xueai8.com 2, Offset: 33, Partition: 0
Topic: my-example-topic,Key: 3,Value: xueai8.com 3, Offset: 34, Partition: 0
Topic: my-example-topic,Key: 4,Value: xueai8.com 4, Offset: 35, Partition: 0
Topic: my-example-topic,Key: 5,Value: xueai8.com 5, Offset: 36, Partition: 0
Topic: my-example-topic,Key: 6,Value: xueai8.com 6, Offset: 37, Partition: 0
Topic: my-example-topic,Key: 7,Value: xueai8.com 7, Offset: 38, Partition: 0
Topic: my-example-topic,Key: 8,Value: xueai8.com 8, Offset: 39, Partition: 0
Topic: my-example-topic,Key: 9,Value: xueai8.com 9, Offset: 40, Partition: 0
Topic: my-example-topic,Key: 10,Value: xueai8.com 10, Offset: 41, Partition: 0
Topic: my-example-topic,Key: 11,Value: xueai8.com 11, Offset: 42, Partition: 0
Topic: my-example-topic,Key: 12,Value: xueai8.com 12, Offset: 43, Partition: 0
Topic: my-example-topic,Key: 13,Value: xueai8.com 13, Offset: 44, Partition: 0
Topic: my-example-topic,Key: 14,Value: xueai8.com 14, Offset: 45, Partition: 0
Topic: my-example-topic,Key: 15,Value: xueai8.com 15, Offset: 46, Partition: 0

《PySpark原理深入与编程实战》