Using a Custom Serializer
In Kafka, we can create our own serializers and deserializers so that we can produce and consume different data types such as JSON, POJOs, and Avro. In this lesson, we will demonstrate how to produce and consume "User" POJO objects. To stream POJO objects, we need to create a custom serializer and deserializer.
1. Create a Maven project and add dependencies
First, create a Maven project and add the following dependencies to the pom.xml file:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
</dependency>
<dependency>
    <groupId>org.codehaus.jackson</groupId>
    <artifactId>jackson-mapper-asl</artifactId>
    <version>1.9.13</version>
</dependency>
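If you build with sbt instead of Maven, the equivalent dependency declarations (same coordinates and versions as the pom above) would look roughly like this:

```scala
// build.sbt -- sbt equivalent of the Maven dependencies above
libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka-clients" % "2.4.1",
  "org.codehaus.jackson" % "jackson-mapper-asl" % "1.9.13"
)
```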
2. Write the source code
2.1) First, create the User POJO class.
// A simple POJO. The no-arg constructor and the setters are needed so that
// Jackson can instantiate and populate the object during deserialization.
class User() {
  private var name: String = ""
  private var age: Int = 0

  def this(name: String, age: Int) {
    this()
    this.name = name
    this.age = age
  }

  def getName: String = this.name
  def getAge: Int = this.age
  def setName(name: String): Unit = this.name = name
  def setAge(age: Int): Unit = this.age = age

  override def toString: String = "User(" + name + ", " + age + ")"
}
2.2) Create a User serializer class by implementing Kafka's Serializer interface.
import java.util
import com.sparkbyexamples.kafka.beans.User
import org.apache.kafka.common.serialization.Serializer
import org.codehaus.jackson.map.ObjectMapper

class UserSerializer extends Serializer[User] {

  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {}

  // Serialize the User object to a JSON string and return its bytes.
  override def serialize(topic: String, data: User): Array[Byte] = {
    if (data == null)
      null
    else {
      val objectMapper = new ObjectMapper()
      objectMapper.writeValueAsString(data).getBytes
    }
  }

  override def close(): Unit = {}
}
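To make concrete what lands on the wire, here is a minimal sketch (plain Scala, no Jackson) of the JSON bytes the serializer produces for a User. The field names name and age are an assumption inferred from the POJO's getters; Jackson derives the actual property names the same way.

```scala
// Hand-rolled sketch of the JSON the serializer emits for a User.
// Field names "name" and "age" are assumed from the POJO's getters.
def userToJsonBytes(name: String, age: Int): Array[Byte] =
  s"""{"name":"$name","age":$age}""".getBytes("UTF-8")

val bytes = userToJsonBytes("My Name - 0", 0)
// Decoding the bytes gives back a readable JSON string
val json = new String(bytes, "UTF-8")
```

Kafka itself only ever sees the byte array; the JSON structure is entirely a contract between this serializer and the matching deserializer.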
2.3) Create a User deserializer class by implementing Kafka's Deserializer interface.
import java.util
import com.sparkbyexamples.kafka.beans.User
import org.apache.kafka.common.serialization.Deserializer
import org.codehaus.jackson.map.ObjectMapper

class UserDeserializer extends Deserializer[User] {

  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {}

  // Parse the JSON bytes back into a User object.
  // Guard against null payloads (e.g. tombstone records).
  override def deserialize(topic: String, bytes: Array[Byte]): User = {
    if (bytes == null)
      null
    else {
      val mapper = new ObjectMapper()
      mapper.readValue(bytes, classOf[User])
    }
  }

  override def close(): Unit = {}
}
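The deserializer performs the inverse of the serializer: recover the fields from the JSON bytes. A minimal sketch of that inverse in plain Scala (no Jackson), assuming the same hypothetical field layout as above:

```scala
// Hand-rolled sketch of what the deserializer undoes: recover name and age
// from the JSON bytes. The exact field layout is an assumption.
def userFromJsonBytes(bytes: Array[Byte]): (String, Int) = {
  val json = new String(bytes, "UTF-8")
  val Pattern = """\{"name":"(.*)","age":(\d+)\}""".r
  json match { case Pattern(n, a) => (n, a.toInt) }
}

val (name, age) = userFromJsonBytes("""{"name":"Alice","age":30}""".getBytes("UTF-8"))
```

In the real class, Jackson's ObjectMapper does this mapping generically via the User class's no-arg constructor and setters.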
2.4) Create a Kafka consumer program and use UserDeserializer for the value.deserializer property.
import java.time.Duration
import java.util.Properties
import com.sparkbyexamples.kafka.beans.User
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

object KafkaConsumerWithUserObject {

  def main(args: Array[String]): Unit = {
    val prop: Properties = new Properties()
    prop.put("group.id", "test")
    prop.put("bootstrap.servers", "xueai8:9092")
    prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    prop.put("value.deserializer", "com.xueai8.UserDeserializer")
    prop.put("enable.auto.commit", "true")
    prop.put("auto.commit.interval.ms", "1000")

    val consumer = new KafkaConsumer[String, User](prop)
    val topics = List("my-example-topic")
    try {
      consumer.subscribe(topics.asJava)
      while (true) {
        // poll(long) is deprecated since Kafka 2.0; use the Duration overload
        val records = consumer.poll(Duration.ofMillis(100))
        for (record <- records.asScala) {
          println("Topic: " + record.topic() + ", Key: " + record.key() +
            ", Value: " + record.value().getName +
            ", Offset: " + record.offset() + ", Partition: " + record.partition())
        }
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      consumer.close()
    }
  }
}
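The consumer leans on scala.collection.JavaConverters to bridge Kafka's Java API: subscribe expects a java.util.Collection (hence topics.asJava), and ConsumerRecords is a Java Iterable (hence records.asScala). A minimal illustration of both directions with plain collections:

```scala
import scala.collection.JavaConverters._

// Scala List -> java.util.List, as in consumer.subscribe(topics.asJava)
val topics = List("my-example-topic")
val javaTopics: java.util.List[String] = topics.asJava

// java.util.List -> Scala collection, as in records.asScala
val javaRecords = java.util.Arrays.asList("r1", "r2", "r3")
val scalaRecords = javaRecords.asScala
```

These conversions are thin wrappers, not copies, so iterating records.asScala in the poll loop adds no meaningful overhead.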
2.5) Create a Kafka producer and specify UserSerializer for the value.serializer property.
import java.util.Properties
import com.sparkbyexamples.kafka.beans.User
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object KafkaProducerWithUserObject {

  def main(args: Array[String]): Unit = {
    val props: Properties = new Properties()
    props.put("bootstrap.servers", "xueai8:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "com.xueai8.UserSerializer")
    props.put("acks", "all")

    val producer = new KafkaProducer[String, User](props)
    try {
      for (i <- 0 to 100) {
        val user = new User("My Name - " + i, i)
        val record = new ProducerRecord[String, User]("my-example-topic", i.toString, user)
        // send() is asynchronous; get() blocks until the broker acknowledges
        val metadata = producer.send(record).get()
        printf("sent record(key=%s value=%s) meta(partition=%d, offset=%d)\n",
          record.key(), record.value(), metadata.partition(), metadata.offset())
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      producer.close()
    }
  }
}
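producer.send() returns a java.util.concurrent.Future[RecordMetadata], and calling get() blocks the loop until the broker acknowledges each record, which is why the offsets print strictly in order. The blocking semantics of Future.get can be illustrated with a plain executor, independent of Kafka:

```scala
import java.util.concurrent.{Callable, Executors}

val pool = Executors.newSingleThreadExecutor()
// submit() returns immediately with a Future, like producer.send()
val future = pool.submit(new Callable[Int] { def call(): Int = 21 * 2 })
// get() blocks until the task completes, like the get() after send()
val result = future.get()
pool.shutdown()
```

Blocking on every send keeps the example simple but serializes the requests; for throughput, Kafka's send() also accepts a Callback so records can be acknowledged asynchronously.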
3. Run the test
Follow the steps below.
3.1) Start ZooKeeper
In the first terminal window, run the following commands to start ZooKeeper:
$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./bin/zookeeper-server-start.sh config/zookeeper.properties
3.2) Start Kafka
In the second terminal window, run the following commands to start Kafka:
$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./bin/kafka-server-start.sh config/server.properties
3.3) Create the topic my-example-topic
In the third terminal window, run the following commands to create a topic named my-example-topic:
$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./bin/kafka-topics.sh --create \
    --replication-factor 1 \
    --partitions 1 \
    --topic my-example-topic \
    --zookeeper localhost:2181

# Verify
$ ./bin/kafka-topics.sh --list --zookeeper localhost:2181
3.4) Run the consumer program KafkaConsumerWithUserObject.scala in the IDE; the Kafka consumer starts polling the my-example-topic topic.
3.5) Run the Kafka producer program KafkaProducerWithUserObject.scala in the IDE; it prints output like the following:
sent record(key=0 value=User(My Name - 0, 0)) meta(partition=0, offset=47)
sent record(key=1 value=User(My Name - 1, 1)) meta(partition=0, offset=48)
sent record(key=2 value=User(My Name - 2, 2)) meta(partition=0, offset=49)
sent record(key=3 value=User(My Name - 3, 3)) meta(partition=0, offset=50)
sent record(key=4 value=User(My Name - 4, 4)) meta(partition=0, offset=51)
sent record(key=5 value=User(My Name - 5, 5)) meta(partition=0, offset=52)
sent record(key=6 value=User(My Name - 6, 6)) meta(partition=0, offset=53)
sent record(key=7 value=User(My Name - 7, 7)) meta(partition=0, offset=54)
sent record(key=8 value=User(My Name - 8, 8)) meta(partition=0, offset=55)
sent record(key=9 value=User(My Name - 9, 9)) meta(partition=0, offset=56)
sent record(key=10 value=User(My Name - 10, 10)) meta(partition=0, offset=57)
sent record(key=11 value=User(My Name - 11, 11)) meta(partition=0, offset=58)
sent record(key=12 value=User(My Name - 12, 12)) meta(partition=0, offset=59)
......
3.6) Switch back to the console window where the consumer program KafkaConsumerWithUserObject.scala is running. You can see it print the messages received from the my-example-topic topic, as shown below:
Topic: my-example-topic, Key: 0, Value: xueai8.com 0, Offset: 31, Partition: 0
Topic: my-example-topic, Key: 0, Value: My Name - 0, Offset: 47, Partition: 0
Topic: my-example-topic, Key: 1, Value: My Name - 1, Offset: 48, Partition: 0
Topic: my-example-topic, Key: 2, Value: My Name - 2, Offset: 49, Partition: 0
Topic: my-example-topic, Key: 3, Value: My Name - 3, Offset: 50, Partition: 0
Topic: my-example-topic, Key: 4, Value: My Name - 4, Offset: 51, Partition: 0
Topic: my-example-topic, Key: 5, Value: My Name - 5, Offset: 52, Partition: 0
Topic: my-example-topic, Key: 6, Value: My Name - 6, Offset: 53, Partition: 0
Topic: my-example-topic, Key: 7, Value: My Name - 7, Offset: 54, Partition: 0
Topic: my-example-topic, Key: 8, Value: My Name - 8, Offset: 55, Partition: 0
Topic: my-example-topic, Key: 9, Value: My Name - 9, Offset: 56, Partition: 0
Topic: my-example-topic, Key: 10, Value: My Name - 10, Offset: 57, Partition: 0
Topic: my-example-topic, Key: 11, Value: My Name - 11, Offset: 58, Partition: 0
Topic: my-example-topic, Key: 12, Value: My Name - 12, Offset: 59, Partition: 0
......