Flume集成Kafka

在生产环境下,通常使用Flume采集日志数据,并将采集到的日志数据发送到Kafka上。这里Flume充当了Kafka生产者角色。而对于Kafka来说,关键是如何接收来自Flume的数据。我们需要在Kafka中创建一个主题topic,然后Flume将其采集到的日志数据发送到该topic上即可。从整体上讲,逻辑应该是比较简单的,如下图所示:

这里我们使用Kafka自带的消费者角色来读取Kafka接收到的日志数据并输出到控制台上显示。

Flume提供了将数据写入Kafka的Kafka Sink组件,因此针对上面这个架构,主要就是配置Flume的Agent,Agent负责实时采集日志文件,将采集到的数据写入Kafka中。

Kafka Sink

Kafka sink是一个Flume sink实现,用于发布数据到Kafka topic。它背后的主要目标是集成Apache Flume和Kafka。这使得基于拉的处理系统可以处理来自各种Flume源的数据。该接收器目前支持Kafka服务器0.10.1.0版本或更高版本。

Kafka Sink的一些属性如下表所示:

属性名称 默认值 描述
type - 它指定组件类型。必须是 org.apache.flume.sink.kafka.KafkaSink
kafka.bootstrap.servers - 它指定了Kafka-Sink将要连接到的broker列表,以获得topic分区的列表。这个列表可以是brokers的部分列表。使用的格式是用逗号分隔的主机名:hostname:port。
kafka.topic default-flume-topic 它指定消息将被发布到的Kafka主题。如果我们配置这个参数,那么消息将被发布到这个主题。如果事件头包含一个“topic”字段,那么该事件将被发布到该主题,覆盖这里配置的主题。
kafka.flumeBatchSize 100 它指定在一个批处理中应该处理多少条消息。大批量可以提高吞吐量,同时增加延迟。
producer都是按照batch进行发送的,因此batch大小的选择对于producer性能至关重要。producer会把发往同一分区的多条消息封装进一个batch中,当batch满了后,producer才会把消息发送出去。但是也不一定等到满了,这和另外一个参数linger.ms有关。
kafka.producer.acks 1 它指定在认为消息成功写入之前,必须有多少副本确认消息。接受的值为0,表示从不等待应答,1表示只等待leader, -1表示等待所有副本。将此设置为-1,以避免在某些leader失败的情况下数据丢失。
kafka.producer.linger.ms 0 默认是0,表示不做停留。这种情况下,可能有的batch中没有包含足够多的produce请求就被发送出去了,造成了大量的小batch,给网络IO带来的极大的压力。这个值大,则减少了网络IO,提升了整体的TPS。

Flume集成Kafka实现

下面我们将演示如何具体实现Flume与Kafka的集成,按照本节一开始描述的流程。请按下面的步骤进行操作。

1)在$FLUME_HOME/conf/目录下,创建一个配置文件file-to-kafka.conf,并编辑内容如下:

# 命名这个agent的组件
agent1.sources = r1
agent1.sinks = sk1
agent1.channels = ch1

# 描述和配置source,注意监视的路径
agent1.sources.r1.type = spooldir
agent1.sources.r1.spoolDir=/home/hduser/data/flink/logdata
# agent1.sources.r1.includePattern=log-*.txt

# 使用一个在内存中缓冲事件的channel
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 10000
agent1.channels.ch1.transactionCapacity = 100

# 描述sink
agent1.sinks.sk1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.sk1.kafka.topic = mylog
agent1.sinks.sk1.kafka.bootstrap.servers = xueai8:9092
agent1.sinks.sk1.kafka.flumeBatchSize = 20
agent1.sinks.sk1.kafka.producer.acks = 1
agent1.sinks.sk1.kafka.producer.linger.ms = 10
agent1.sinks.sk1.kafka.producer.compression.type = snappy

# 将source和sink绑定到channel
agent1.sources.r1.channels = ch1
agent1.sinks.sk1.channel = ch1

在上面的配置文件中,指定监视的文件源路径是/home/hduser/data/flink/logdata。

2)在Kafka中创建主题mylog

# 切换到Kafka的安装目录
$ cd ~/bigdata/kafka_2.12-2.4.1

# 创建一个名为mylog的topic
$ ./bin/kafka-topics.sh --create \
  --zookeeper localhost:2181 \
  --replication-factor 1 \
  --partitions 1 \
  --topic mylog

3)启动Zookeeper。打开一个终端窗口,执行如下命令:

# 切换到Kafka的安装目录
$ cd ~/bigdata/kafka_2.12-2.4.1

$ ./bin/zookeeper-server-start.sh config/zookeeper.properties

等待约30秒左右,等待ZooKeeper启动,然后保持zookeeper运行。

4)启动Kafka。打开另一个终端窗口,执行如下命令:

# 切换到Kafka的安装目录
$ cd ~/bigdata/kafka_2.12-2.4.1

$ ./bin/kafka-server-start.sh config/server.properties

等待大约30秒左右,Kafka启动。然后保持kafka运行。

5)运行Kafka自带的消费者脚本,让其监听mylog主题。打开另一个终端窗口,执行下面的命令:

# 切换到Kafka的安装目录
$ cd ~/bigdata/kafka_2.12-2.4.1

$ ./bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic my-topic \
    --from-beginning

保持运行。

6)启动Flume Agent。另打开一个终端窗口,执行如下命令:

$ ./bin/flume-ng agent \
  -n agent1 \
  -c ./conf -f ./conf/file-to-kafka.conf \
  -Dflume.root.logger=INFO,console

7)任意位置创建一个文件,例如log-01.txt,随意编辑一些内容,比如:

good good study
day day up

保存并关闭文件。然后使用如下命令将该文件拷贝或移动到Flume监视的目录位置,命令如下:

$ mv log-01.txt ~/home/hduser/data/flink/logdata/

注意观察Kafka消费者脚本运行窗口,会看到Flume已经快速而准确地把这个日志文件发送给了Kafka,并且消费者脚本程序及时地消费并输出了该日志文件的内容。如下图所示:

另一个示例:模拟日志产生的实时采集

上一个示例中,我们把整个日志文件拷贝到了Flume监视的spoolDir目录下(在file-to-kafka.conf文件中配置的)。实际生产中,日志记录是随时间一行一行产生的。下面我们就配置一个这样的Flume源,并同样发送到Kafka。

同样在$FLUME_HOME/conf/目录下,新创建一个exec-to-kafka.conf配置文件,并编辑其内容如下:

# 命名这个agent的三个组件
agent1.sources = r1
agent1.sinks = sk1
agent1.channels = ch1

# 配置source源,注意这里的类型是exec
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /home/hduser/data/flink/logdata/mylog.log

# 使用一个在内存中缓冲事件的channel
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000
agent1.channels.ch1.transactionCapacity = 100

# 配置kafka sink,这个保持不变
agent1.sinks.sk1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.sk1.kafka.topic = mylog
agent1.sinks.sk1.kafka.bootstrap.servers = xueai8:9092
agent1.sinks.sk1.kafka.flumeBatchSize = 20
agent1.sinks.sk1.kafka.producer.acks = 1
agent1.sinks.sk1.kafka.producer.linger.ms = 10
agent1.sinks.sk1.kafka.producer.compression.type = snappy

# 将source和sink绑定到channel
agent1.sources.r1.channels = ch1
agent1.sinks.sk1.channel = ch1

然后,与上一示例类似,先后启动Zookeeper、Kafka、kafka-console-consume.sh消费者脚本程序和Flume Agent。

等以上服务或程序都运行之后,接下来打开一个终端窗口,执行如下语句,将内容(模拟日志记录数据)写入到日志文件log-01.txt的文件末尾:

$ echo "good good study" > /home/hduser/data/flink/log-01.txt

此时Flume可以通过tail -F命令实时监控文件中的新增数据,发现有新数据就写入kafka,然后kafka后面的消费者脚本实时拉取数据并输出到控制台。


《Spark原理深入与编程实战》