Using a Custom Serializer
In Kafka, we can create our own serializers and deserializers so that producers and consumers can work with different data types, such as JSON, POJOs, or Avro. In this lesson we demonstrate how to produce and consume a "User" POJO object. To stream POJO objects through Kafka, we need to create a custom serializer and deserializer.
1. Create a Maven project and add the dependencies
First, create a Maven project and add the following dependencies to the pom.xml file:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
</dependency>
<dependency>
    <groupId>org.codehaus.jackson</groupId>
    <artifactId>jackson-mapper-asl</artifactId>
    <version>1.9.13</version>
</dependency>
2. Write the source code
2.1) First, create the User POJO class.
package com.xueai8

class User() {
  private var name: String = ""
  private var age: Int = 0

  def this(name: String, age: Int) {
    this()
    this.name = name
    this.age = age
  }

  def getName: String = this.name
  def getAge: Int = this.age

  // Setters are required so that Jackson can populate the private
  // fields when deserializing the JSON back into a User object.
  def setName(name: String): Unit = this.name = name
  def setAge(age: Int): Unit = this.age = age

  override def toString: String = "User(" + name + ", " + age + ")"
}
2.2) Create a User serializer class by implementing Kafka's Serializer interface.
package com.xueai8

import java.util

import org.apache.kafka.common.serialization.Serializer
import org.codehaus.jackson.map.ObjectMapper

class UserSerializer extends Serializer[User] {

  override def configure(map: util.Map[String, _], isKey: Boolean): Unit = {}

  override def serialize(topic: String, data: User): Array[Byte] = {
    if (data == null) null
    else {
      // Serialize the User to a JSON string, then to bytes
      val objectMapper = new ObjectMapper()
      objectMapper.writeValueAsString(data).getBytes
    }
  }

  override def close(): Unit = {}
}
2.3) Create a User deserializer class by implementing Kafka's Deserializer interface.
package com.xueai8

import java.util

import org.apache.kafka.common.serialization.Deserializer
import org.codehaus.jackson.map.ObjectMapper

class UserDeserializer extends Deserializer[User] {

  override def configure(map: util.Map[String, _], isKey: Boolean): Unit = {}

  override def deserialize(topic: String, bytes: Array[Byte]): User = {
    // Parse the JSON bytes back into a User object
    val mapper = new ObjectMapper()
    mapper.readValue(bytes, classOf[User])
  }

  override def close(): Unit = {}
}
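Whatever library is used, a serializer/deserializer pair must be inverses of each other: deserializing the bytes that the serializer produced has to yield an equivalent User. The sketch below illustrates that round-trip contract without Kafka or Jackson, using a hypothetical case class and hand-rolled JSON helpers (illustration only; the real classes in this lesson rely on ObjectMapper):

```scala
// Minimal round-trip sketch: User -> JSON bytes -> User.
// `User` and the helpers here are simplified stand-ins, not the classes above.
case class User(name: String, age: Int)

object RoundTrip {
  // Serialize: User -> JSON text -> UTF-8 bytes (what Kafka transports)
  def serialize(u: User): Array[Byte] =
    s"""{"name":"${u.name}","age":${u.age}}""".getBytes("UTF-8")

  // Deserialize: bytes -> JSON text -> User (naive parse of our own format)
  def deserialize(bytes: Array[Byte]): User = {
    val s    = new String(bytes, "UTF-8")
    val name = s.split("\"name\":\"")(1).split("\"")(0)
    val age  = s.split("\"age\":")(1).stripSuffix("}").toInt
    User(name, age)
  }
}
```

If `deserialize(serialize(u))` does not equal `u` for every field, consumers will silently read corrupted objects, so it is worth checking this property in a unit test.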
2.4) Create a Kafka consumer program that uses UserDeserializer for the value.deserializer property.
package com.xueai8

import java.time.Duration
import java.util.Properties

import org.apache.kafka.clients.consumer.KafkaConsumer

import scala.collection.JavaConverters._

object KafkaConsumerWithUserObject {

  def main(args: Array[String]): Unit = {
    val prop: Properties = new Properties()
    prop.put("group.id", "test")
    prop.put("bootstrap.servers", "xueai8:9092")
    prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    prop.put("value.deserializer", "com.xueai8.UserDeserializer")
    prop.put("enable.auto.commit", "true")
    prop.put("auto.commit.interval.ms", "1000")

    val consumer = new KafkaConsumer[String, User](prop)
    val topics = List("my-example-topic")
    try {
      consumer.subscribe(topics.asJava)
      while (true) {
        // poll(long) is deprecated in 2.4.x; use the Duration overload
        val records = consumer.poll(Duration.ofMillis(100))
        for (record <- records.asScala) {
          println("Topic: " + record.topic() + ", Key: " + record.key()
            + ", Value: " + record.value().getName
            + ", Offset: " + record.offset()
            + ", Partition: " + record.partition())
        }
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      consumer.close()
    }
  }
}
2.5) Create a Kafka producer that uses UserSerializer for the value.serializer property.
package com.xueai8

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object KafkaProducerWithUserObject {

  def main(args: Array[String]): Unit = {
    val props: Properties = new Properties()
    props.put("bootstrap.servers", "xueai8:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "com.xueai8.UserSerializer")
    props.put("acks", "all")

    val producer = new KafkaProducer[String, User](props)
    try {
      for (i <- 0 to 100) {
        val user = new User("My Name - " + i, i)
        val record = new ProducerRecord[String, User]("my-example-topic", i.toString, user)
        // send() is asynchronous; get() blocks until the broker acknowledges
        val metadata = producer.send(record).get()
        printf("sent record(key=%s value=%s) meta(partition=%d, offset=%d)\n",
          record.key(), record.value(), metadata.partition(), metadata.offset())
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      producer.close()
    }
  }
}
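Calling get() on the Future returned by send() blocks on every record, which keeps the example simple but throttles throughput. A non-blocking alternative is to register a Callback; the sketch below uses the same Kafka 2.4 client API, with the error handling being illustrative only:

```scala
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

// Assumes a configured `producer` and the User class from this lesson.
def sendAsync(producer: KafkaProducer[String, User], key: String, user: User): Unit = {
  val record = new ProducerRecord[String, User]("my-example-topic", key, user)
  // send() returns immediately; the callback fires when the broker responds
  producer.send(record, new Callback {
    override def onCompletion(metadata: RecordMetadata, e: Exception): Unit = {
      if (e != null) e.printStackTrace() // delivery failed
      else println("acked: partition=" + metadata.partition() + ", offset=" + metadata.offset())
    }
  })
}
```

Callbacks for records sent to the same partition are invoked in send order, so per-partition ordering of the acknowledgements is preserved.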
3. Run the test
Follow the steps below.
3.1) Start ZooKeeper
In the first terminal window, run the following commands to start ZooKeeper:
$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./bin/zookeeper-server-start.sh config/zookeeper.properties
3.2) Start Kafka
In the second terminal window, run the following commands to start Kafka:
$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./bin/kafka-server-start.sh config/server.properties
3.3) Create the topic my-example-topic
In the third terminal window, run the following commands to create a topic named my-example-topic:
$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./bin/kafka-topics.sh --create \
    --replication-factor 1 \
    --partitions 1 \
    --topic my-example-topic \
    --zookeeper localhost:2181

# List topics to verify
$ ./bin/kafka-topics.sh --list --zookeeper localhost:2181
3.4) In the IDE, run the consumer program KafkaConsumerWithUserObject.scala; it starts polling the my-example-topic topic.
3.5) In the IDE, run the producer program KafkaProducerWithUserObject.scala. The output looks like this:
sent record(key=0 value=User(My Name - 0, 0)) meta(partition=0, offset=47)
sent record(key=1 value=User(My Name - 1, 1)) meta(partition=0, offset=48)
sent record(key=2 value=User(My Name - 2, 2)) meta(partition=0, offset=49)
sent record(key=3 value=User(My Name - 3, 3)) meta(partition=0, offset=50)
sent record(key=4 value=User(My Name - 4, 4)) meta(partition=0, offset=51)
sent record(key=5 value=User(My Name - 5, 5)) meta(partition=0, offset=52)
sent record(key=6 value=User(My Name - 6, 6)) meta(partition=0, offset=53)
sent record(key=7 value=User(My Name - 7, 7)) meta(partition=0, offset=54)
sent record(key=8 value=User(My Name - 8, 8)) meta(partition=0, offset=55)
sent record(key=9 value=User(My Name - 9, 9)) meta(partition=0, offset=56)
sent record(key=10 value=User(My Name - 10, 10)) meta(partition=0, offset=57)
sent record(key=11 value=User(My Name - 11, 11)) meta(partition=0, offset=58)
sent record(key=12 value=User(My Name - 12, 12)) meta(partition=0, offset=59)
......
3.6) Switch back to the console window where the KafkaConsumerWithUserObject.scala consumer is running. You can see it print the messages received from the my-example-topic topic, as shown below:
Topic: my-example-topic, Key: 0, Value: xueai8.com 0, Offset: 31, Partition: 0
Topic: my-example-topic, Key: 0, Value: My Name - 0, Offset: 47, Partition: 0
Topic: my-example-topic, Key: 1, Value: My Name - 1, Offset: 48, Partition: 0
Topic: my-example-topic, Key: 2, Value: My Name - 2, Offset: 49, Partition: 0
Topic: my-example-topic, Key: 3, Value: My Name - 3, Offset: 50, Partition: 0
Topic: my-example-topic, Key: 4, Value: My Name - 4, Offset: 51, Partition: 0
Topic: my-example-topic, Key: 5, Value: My Name - 5, Offset: 52, Partition: 0
Topic: my-example-topic, Key: 6, Value: My Name - 6, Offset: 53, Partition: 0
Topic: my-example-topic, Key: 7, Value: My Name - 7, Offset: 54, Partition: 0
Topic: my-example-topic, Key: 8, Value: My Name - 8, Offset: 55, Partition: 0
Topic: my-example-topic, Key: 9, Value: My Name - 9, Offset: 56, Partition: 0
Topic: my-example-topic, Key: 10, Value: My Name - 10, Offset: 57, Partition: 0
Topic: my-example-topic, Key: 11, Value: My Name - 11, Offset: 58, Partition: 0
Topic: my-example-topic, Key: 12, Value: My Name - 12, Offset: 59, Partition: 0
......