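Using Custom Serializers

In Kafka, we can write our own serializers and deserializers, which lets us produce and consume other data types such as JSON, POJOs, and Avro. In this lesson, we demonstrate how to produce and consume a "User" POJO. Because Kafka itself only transports byte arrays, streaming POJOs through it requires a custom serializer and deserializer.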
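
Before bringing in Kafka, the idea can be illustrated with a minimal, dependency-free sketch of what a serializer/deserializer pair does: turn an object into bytes and back. The SimpleUser case class, the ByteRoundTrip object, and the "age|name" text encoding are hypothetical and chosen only for illustration; the rest of this lesson uses JSON via Jackson instead.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical stand-in for the User POJO used later in this lesson.
final case class SimpleUser(name: String, age: Int)

object ByteRoundTrip {
  // "Serialize": encode the object as "age|name" UTF-8 text.
  def toBytes(u: SimpleUser): Array[Byte] =
    s"${u.age}|${u.name}".getBytes(StandardCharsets.UTF_8)

  // "Deserialize": split on the first '|' only, so names may themselves contain '|'.
  def fromBytes(bytes: Array[Byte]): SimpleUser = {
    val text = new String(bytes, StandardCharsets.UTF_8)
    val Array(age, name) = text.split("\\|", 2)
    SimpleUser(name, age.toInt)
  }

  def main(args: Array[String]): Unit = {
    val original = SimpleUser("Alice", 30)
    val restored = fromBytes(toBytes(original))
    println(restored == original) // prints true
  }
}
```

A Kafka serializer plays the role of toBytes on the producer side, and a deserializer plays the role of fromBytes on the consumer side.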

1. Create a Maven project and add dependencies

First create a Maven project and add the following dependencies to the pom.xml file:

<dependency>
     <groupId>org.apache.kafka</groupId>
     <artifactId>kafka-clients</artifactId>
     <version>2.4.1</version>
</dependency>
<dependency>
     <groupId>org.codehaus.jackson</groupId>
     <artifactId>jackson-mapper-asl</artifactId>
     <version>1.9.13</version>
</dependency>
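
Because the source files in this lesson are written in Scala, the project could also be built with sbt instead of Maven. The following build.sbt fragment is a sketch of the equivalent dependency declarations. The scalaVersion shown is an assumption, not taken from the original project; it is chosen to match the Scala 2.12 build of Kafka used later in this lesson.

```scala
// build.sbt (sketch; scalaVersion is an assumption)
scalaVersion := "2.12.10"

libraryDependencies ++= Seq(
  "org.apache.kafka"     % "kafka-clients"      % "2.4.1",
  "org.codehaus.jackson" % "jackson-mapper-asl" % "1.9.13"
)
```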

2. Write the source code

2.1) First, create the User POJO class.

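package com.xueai8  // same package as the serializer classes configured below

// Simple bean that Jackson maps to and from JSON.
// Jackson calls the no-argument primary constructor when deserializing.
class User() {
  private var name: String = ""
  private var age: Int = 0

  def this(name: String, age: Int) = {
    this()
    this.name = name
    this.age = age
  }

  // JavaBean-style getters; Jackson derives the JSON properties "name" and "age" from them.
  def getName: String = this.name
  def getAge: Int = this.age
  override def toString: String = "User(" + name + ", " + age + ")"
}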

2.2) Create the User serializer class by implementing Kafka's Serializer interface.

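package com.xueai8  // must match the class name set in value.serializer

import java.util
import org.apache.kafka.common.serialization.Serializer
import org.codehaus.jackson.map.ObjectMapper

// User is assumed to live in the same package (com.xueai8).
class UserSerializer extends Serializer[User] {
  // One ObjectMapper is reused for every record instead of creating one per call.
  private val objectMapper = new ObjectMapper()

  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {}

  // Encode the User as UTF-8 JSON bytes; a null value stays null.
  override def serialize(topic: String, data: User): Array[Byte] =
    if (data == null) null
    else objectMapper.writeValueAsBytes(data)

  override def close(): Unit = {}
}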

2.3) Create the User deserializer class by implementing Kafka's Deserializer interface.

package com.xueai8  // must match the class name set in value.deserializer

import java.util
import org.apache.kafka.common.serialization.Deserializer
import org.codehaus.jackson.map.ObjectMapper

// User is assumed to live in the same package (com.xueai8).
class UserDeserializer extends Deserializer[User] {
  private val mapper = new ObjectMapper()

  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {}

  // Decode JSON bytes into a User; a null payload (for example a tombstone record) stays null.
  override def deserialize(topic: String, bytes: Array[Byte]): User =
    if (bytes == null) null
    else mapper.readValue(bytes, classOf[User])

  override def close(): Unit = {}
}
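
Both classes can be exercised without a running broker by calling them directly. The following is a minimal sketch of such a round-trip check. It assumes that User, UserSerializer, and UserDeserializer from the steps above are compiled in the com.xueai8 package; the object name SerdeRoundTripCheck is hypothetical.

```scala
package com.xueai8

import java.nio.charset.StandardCharsets

// Assumes User, UserSerializer and UserDeserializer from the steps above are in this package.
object SerdeRoundTripCheck {
  def main(args: Array[String]): Unit = {
    val serializer = new UserSerializer
    val deserializer = new UserDeserializer
    try {
      val original = new User("Alice", 30)
      // The topic name is only passed through; these implementations ignore it.
      val bytes = serializer.serialize("my-example-topic", original)
      println("JSON payload: " + new String(bytes, StandardCharsets.UTF_8))
      val restored = deserializer.deserialize("my-example-topic", bytes)
      println("Restored: " + restored)
    } finally {
      serializer.close()
      deserializer.close()
    }
  }
}
```

If the restored object prints the same name and age as the original, the pair is consistent and ready to be plugged into a producer and a consumer.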

2.4) Create a Kafka consumer program that uses UserDeserializer for the value.deserializer property.

package com.xueai8  // same package as User and UserDeserializer

import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

object KafkaConsumerWithUserObject {
  def main(args: Array[String]): Unit = {
    val prop: Properties = new Properties()
    prop.put("group.id", "test")
    prop.put("bootstrap.servers", "xueai8:9092")
    prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    // The custom deserializer turns each record value back into a User
    prop.put("value.deserializer", "com.xueai8.UserDeserializer")
    prop.put("enable.auto.commit", "true")
    prop.put("auto.commit.interval.ms", "1000")

    val consumer = new KafkaConsumer[String, User](prop)
    val topics = List("my-example-topic")

    try {
      consumer.subscribe(topics.asJava)
      while (true) {
        // poll(Duration) replaces the deprecated poll(long) overload
        val records = consumer.poll(Duration.ofMillis(100))
        for (record <- records.asScala) {
          println("Topic: " + record.topic() + ", Key: " + record.key() + ", Value: " + record.value().getName +
            ", Offset: " + record.offset() + ", Partition: " + record.partition())
        }
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      consumer.close()
    }
  }
}
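
As an alternative to naming the deserializer classes in string properties, KafkaConsumer also has a constructor that accepts deserializer instances directly, so a misspelled class name becomes a compile error rather than a runtime failure. The sketch below assumes the User and UserDeserializer classes from the earlier steps; the object name ConsumerFactorySketch and the helper createConsumer are hypothetical.

```scala
package com.xueai8

import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer

object ConsumerFactorySketch {
  // Hypothetical helper: builds a consumer without key.deserializer/value.deserializer properties.
  def createConsumer(bootstrapServers: String, groupId: String): KafkaConsumer[String, User] = {
    val props = new Properties()
    props.put("bootstrap.servers", bootstrapServers)
    props.put("group.id", groupId)
    // Deserializer instances are passed to the constructor instead of class names.
    new KafkaConsumer[String, User](props, new StringDeserializer, new UserDeserializer)
  }
}
```

It could be used as ConsumerFactorySketch.createConsumer("xueai8:9092", "test"). One design point to keep in mind: according to the KafkaConsumer Javadoc, configure() is not called on deserializers passed in this way, which does not matter here because UserDeserializer.configure does nothing.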

2.5) Create a Kafka producer that specifies UserSerializer for the value.serializer property.

package com.xueai8  // same package as User and UserSerializer

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object KafkaProducerWithUserObject {
  def main(args: Array[String]): Unit = {
    val props: Properties = new Properties()
    props.put("bootstrap.servers", "xueai8:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // The custom serializer turns each User value into bytes
    props.put("value.serializer", "com.xueai8.UserSerializer")
    props.put("acks", "all")

    val producer = new KafkaProducer[String, User](props)

    try {
      for (i <- 0 to 100) {
        val user = new User("My Name - " + i, i)
        val record = new ProducerRecord[String, User]("my-example-topic", i.toString, user)
        // send() is asynchronous; get() blocks until the broker acknowledges the record
        val metadata = producer.send(record).get()
        printf("sent record(key=%s value=%s) meta(partition=%d, offset=%d)\n",
          record.key(), record.value(), metadata.partition(), metadata.offset())
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      producer.close()
    }
  }
}
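
Waiting on the result of every send makes the producer handle one acknowledgement at a time. Where that is not needed, send also accepts a Callback that is invoked when the broker responds. The following is a sketch of that variant, assuming the same broker address, topic, and User/UserSerializer classes as the producer above; the object name AsyncProducerSketch and the choice of five records are illustrative only.

```scala
package com.xueai8

import java.util.Properties
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

object AsyncProducerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "xueai8:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "com.xueai8.UserSerializer")
    props.put("acks", "all")

    val producer = new KafkaProducer[String, User](props)
    try {
      for (i <- 0 until 5) {
        val record = new ProducerRecord[String, User](
          "my-example-topic", i.toString, new User("My Name - " + i, i))
        // The callback is invoked once the send has completed or failed.
        producer.send(record, new Callback {
          override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
            if (exception != null) exception.printStackTrace()
            else println(s"acked key=${record.key()} partition=${metadata.partition()} offset=${metadata.offset()}")
        })
      }
      // flush() blocks until all buffered records have been sent.
      producer.flush()
    } finally {
      producer.close()
    }
  }
}
```

The trade-off is that acknowledgements may be reported after the loop has moved on, so any per-record error handling has to live inside the callback.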

3. Run the test

Follow the steps below.

3.1) Start ZooKeeper

In the first terminal window, run the following commands to start ZooKeeper:

$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./bin/zookeeper-server-start.sh config/zookeeper.properties

3.2) Start Kafka

In the second terminal window, run the following commands to start Kafka:

$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./bin/kafka-server-start.sh config/server.properties

3.3) Create the topic my-example-topic

In the third terminal window, run the following commands to create a topic named my-example-topic:

$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./bin/kafka-topics.sh --create \
  --replication-factor 1 \
  --partitions 1 \
  --topic my-example-topic \
  --zookeeper localhost:2181
# List the topics to confirm that it was created
$ ./bin/kafka-topics.sh --list --zookeeper localhost:2181
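
The same topic could also be created programmatically with Kafka's AdminClient, which talks to the broker rather than to ZooKeeper. The following is a sketch under stated assumptions: the broker address localhost:9092 is assumed (the default listener port, not stated in this lesson), and the object name CreateTopicSketch is hypothetical.

```scala
import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, NewTopic}

object CreateTopicSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Assumption: the Kafka broker started above listens on localhost:9092.
    props.put("bootstrap.servers", "localhost:9092")

    val admin = AdminClient.create(props)
    try {
      // One partition and replication factor 1, matching the command-line example.
      val topic = new NewTopic("my-example-topic", 1, 1.toShort)
      // get() blocks until the broker has processed the request.
      admin.createTopics(Collections.singletonList(topic)).all().get()
      println("Existing topics: " + admin.listTopics().names().get())
    } finally {
      admin.close()
    }
  }
}
```

If the topic already exists, the get() call fails with an ExecutionException, so this sketch is meant to be run once against a fresh broker.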

3.4) Run the consumer program KafkaConsumerWithUserObject.scala in the IDE. The consumer starts polling the my-example-topic topic.

3.5) Run the Kafka producer program KafkaProducerWithUserObject.scala in the IDE. It prints the following output:

sent record(key=0 value=User(My Name - 0, 0)) meta(partition=0, offset=47)
sent record(key=1 value=User(My Name - 1, 1)) meta(partition=0, offset=48)
sent record(key=2 value=User(My Name - 2, 2)) meta(partition=0, offset=49)
sent record(key=3 value=User(My Name - 3, 3)) meta(partition=0, offset=50)
sent record(key=4 value=User(My Name - 4, 4)) meta(partition=0, offset=51)
sent record(key=5 value=User(My Name - 5, 5)) meta(partition=0, offset=52)
sent record(key=6 value=User(My Name - 6, 6)) meta(partition=0, offset=53)
sent record(key=7 value=User(My Name - 7, 7)) meta(partition=0, offset=54)
sent record(key=8 value=User(My Name - 8, 8)) meta(partition=0, offset=55)
sent record(key=9 value=User(My Name - 9, 9)) meta(partition=0, offset=56)
sent record(key=10 value=User(My Name - 10, 10)) meta(partition=0, offset=57)
sent record(key=11 value=User(My Name - 11, 11)) meta(partition=0, offset=58)
sent record(key=12 value=User(My Name - 12, 12)) meta(partition=0, offset=59)
......

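3.6) Return to the console window where the consumer program KafkaConsumerWithUserObject.scala is running. The consumer prints the messages it received from the my-example-topic topic, as shown below: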

Topic: my-example-topic, Key: 0, Value: My Name - 0, Offset: 47, Partition: 0
Topic: my-example-topic, Key: 1, Value: My Name - 1, Offset: 48, Partition: 0
Topic: my-example-topic, Key: 2, Value: My Name - 2, Offset: 49, Partition: 0
Topic: my-example-topic, Key: 3, Value: My Name - 3, Offset: 50, Partition: 0
Topic: my-example-topic, Key: 4, Value: My Name - 4, Offset: 51, Partition: 0
Topic: my-example-topic, Key: 5, Value: My Name - 5, Offset: 52, Partition: 0
Topic: my-example-topic, Key: 6, Value: My Name - 6, Offset: 53, Partition: 0
Topic: my-example-topic, Key: 7, Value: My Name - 7, Offset: 54, Partition: 0
Topic: my-example-topic, Key: 8, Value: My Name - 8, Offset: 55, Partition: 0
Topic: my-example-topic, Key: 9, Value: My Name - 9, Offset: 56, Partition: 0
Topic: my-example-topic, Key: 10, Value: My Name - 10, Offset: 57, Partition: 0
Topic: my-example-topic, Key: 11, Value: My Name - 11, Offset: 58, Partition: 0
Topic: my-example-topic, Key: 12, Value: My Name - 12, Offset: 59, Partition: 0
......
