发布日期:2022-04-12 VIP内容

赛题模拟实现-数据采集与实时计算

任务内容:

启动业务系统,按照要求使用Flume将用户操作日志采集并存入Kafka中并使用Flink、Scala消费Kafka中的数据将其进行聚合计算出商城在线人数,将结果存入Redis中,并统计该电商系统的UV与PV将结果存入MySQL中。

实现原理:

从前面的任务剖析部分可以知道,Flink采用的版本是1.10.2。整个任务实现的架构和流程如下图:

实现过程

1)在IDEA中创建一个Flink Maven项目:Flink102Example。

参考教程:使用IntelliJ IDEA+Maven开发Flink项目

2)完成第一个任务:“使用Flume采集某电商系统用户操作日志存入Kafka中”。

2.1)技术参考:

2.2)在$FLUME_HOME/conf/目录下,创建一个配置文件file-to-kafka.conf,并编辑内容如下:

# file-to-kafka.conf
# 命名这个agent的组件
agent1.sources = r1
agent1.sinks = sk1
agent1.channels = ch1

# 描述和配置source,注意监视的路径
agent1.sources.r1.type = spooldir
agent1.sources.r1.spoolDir=/home/hduser/data/flink/logdata

# 使用一个在内存中缓冲事件的channel
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 10000
agent1.channels.ch1.transactionCapacity = 100

# 描述sink
agent1.sinks.sk1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.sk1.kafka.topic = mylog
agent1.sinks.sk1.kafka.bootstrap.servers = localhost:9092
agent1.sinks.sk1.kafka.flumeBatchSize = 20
agent1.sinks.sk1.kafka.producer.acks = 1
agent1.sinks.sk1.kafka.producer.linger.ms = 10
agent1.sinks.sk1.kafka.producer.compression.type = snappy

# 将source和sink绑定到channel
agent1.sources.r1.channels = ch1
agent1.sinks.sk1.channel = ch1

2.3)启动Zookeeper。打开一个终端窗口,执行如下命令:

# 切换到Kafka的安装目录
$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./bin/zookeeper-server-start.sh config/zookeeper.properties

2.4)启动Kafka。打开另一个终端窗口,执行如下命令:

# 切换到Kafka的安装目录
$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./bin/kafka-server-start.sh config/server.properties

2.5)在Kafka中创建主题user-behavior。打开另一个终端窗口,执行如下命令:

# 切换到Kafka的安装目录
$ cd ~/bigdata/kafka_2.12-2.4.1

# 创建一个名为mylog的topic
$ ./bin/kafka-topics.sh --create \
  --zookeeper localhost:2181 \
  --replication-factor 1 \
  --partitions 1 \
  --topic user-behavior

2.6)将用户操作日志文件(例如,UserBehavior.csv)拷贝到系统的指定路径下,比如/home/hduser/data/flink/logdata/目录下。

2.7)启动Flume Agent。在终端窗口中,执行如下命令:

# 切换到Flume安装目录
$ cd ~/bigdata/flume-1.9.0

$ ./bin/flume-ng agent \
  -n agent1 \
  -c ./conf -f ./conf/file-to-kafka.conf \
  -Dflume.root.logger=INFO,console

2.8)运行Kafka自带的消费者脚本,让其监听user-behavior主题。打开另一个终端窗口,执行下面的命令:

# 切换到Kafka的安装目录
$ cd ~/bigdata/kafka_2.12-2.4.1

$ ./bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic user-behavior \
    --from-beginning

注意观察Kafka消费者脚本运行窗口,会看到Flume已经快速而准确地把这个日志文件发送给了Kafka,并且消费者脚本程序及时地消费并输出了该日志文件的内容。

3) 完成第二个任务:“使用Flink消费Kafka中的数据”。

代码实现:

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

import java.util.Properties

/**
 * 完成第二个任务:“使用Flink消费Kafka中的数据”
 */
object ConsumeKafkaDemo {
  def main(args: Array[String]) {
    // 设置流执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 设置事件时间
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // 设置并行度
    env.setParallelism(1)

    // kafka连接属性配置// kafka连接属性配置
    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("group.id", "my-group")
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    // 定义Kafka消费者
    val myConsumer = new FlinkKafkaConsumer010[String]("user-behavior",new SimpleStringSchema(),props)
    myConsumer.setStartFromEarliest()      // 尽可能从最早的记录开始
    // myConsumer.setStartFromLatest()        // 从最新的记录开始
    // myConsumer.setStartFromTimestamp(...)  // 从指定的时间戳(毫秒)开始
    // myConsumer.setStartFromGroupOffsets()  // 默认的行为

    // 读取数据源并在控制台输出内容
    env
      .addSource(myConsumer)            // 指定Kafka数据源
      .print()                          // 输出到控制台

    // 触发流程序执行
    env.execute("Consume Kafka Topic")
  }
}

观察控制台,可以看到类似如下的输出内容:

4) 完成第三个任务:“聚合计算出系统在线人数并将结果存入Redis中”。

日志数据样例:(详细数据集说明在下载的源码包中)

543462,1715,1464116,pv,1511658000
662867,2244074,1575622,pv,1511658000
561558,3611281,965809,pv,1511658000
894923,3076029,1879194,pv,1511658000
834377,4541270,3738615,pv,1511658000

字段说明:

列名称 说明
用户ID 整数类型,序列化后的用户ID
商品ID 整数类型,序列化后的商品ID
商品类目ID 整数类型,序列化后的商品所属类目ID
行为类型 字符串,枚举类型,包括('pv', 'buy', 'cart', 'fav')
时间戳 行为发生的时间戳

实现思路:每个UserId代表一个单独的用户。计算在线人数,也就是有多少个唯一ID。

实现方法:滚动窗口 + 窗口函数。

实现代码:

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper._
import org.apache.flink.util.Collector

import java.text.SimpleDateFormat
import java.util.{Date, Properties}
import scala.collection.mutable

object OnlineUsers {

  // 输入事件类型 [用户id, 1]
  case class InputEvent(userid: Int, count: Int)

  // 输出事件类型 [时间戳,在线人数]
  case class OutputEvent(winend:Long, count:Int)

  def main(args: Array[String]) {
    // 设置流执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 设置事件时间
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // 设置并行度
    env.setParallelism(1)

    // kafka连接属性配置// kafka连接属性配置
    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("group.id", "my-group")
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    // 定义Kafka消费者
    val myConsumer = new FlinkKafkaConsumer010[String]("user-behavior",new SimpleStringSchema(),props)
    myConsumer.setStartFromEarliest()      // 尽可能从最早的记录开始
    // myConsumer.setStartFromLatest()        // 从最新的记录开始
    // myConsumer.setStartFromTimestamp(...)  // 从指定的时间戳(毫秒)开始
    // myConsumer.setStartFromGroupOffsets()  // 默认的行为

    // 分配水印
    myConsumer.assignTimestampsAndWatermarks(
       new AssignerWithPunctuatedWatermarks[String] {
          // 分配水印
          override def checkAndGetNextWatermark(t: String, l: Long): Watermark = {
               if (null != t && t.contains(",")) {
                  val parts = t.split(",")
                  val watermark = new Watermark(parts(4).toLong * 1000)
                  watermark
               }else{
                  null
               }
          }

      // 分配时间戳
      override def extractTimestamp(t: String, l: Long): Long = {
        if (null != t && t.contains(",")) {
          val parts = t.split(",")
          parts(4).toLong * 1000
        }else{
          0
        }
      }
    })

    // 定义redis sink的配置 (默认端口号6379)
    val conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build()

    val online = env
      .addSource(myConsumer)            		// 指定Kafka数据源
      .map { _.split(",") }      					// 先分割每一行
      .map(arr => InputEvent(arr(0).toInt, 1))   	// 转换为InputEvent
      .windowAll(TumblingEventTimeWindows.of(Time.hours(1)))  // 1小时滚动窗口
      .process(new MyProcessWindowFunction()) 	// 窗口处理函数

    // 输出到控制台
    online.print()

    // 写出到Redis		
    online.addSink(new RedisSink[OutputEvent](conf, new RedisExampleMapper))

    // 触发流程序执行
    env.execute("count online")
  }

  // 自定义窗口处理函数
  class MyProcessWindowFunction extends ProcessAllWindowFunction[InputEvent,OutputEvent,TimeWindow]{
    override def process(context: Context,
                         elements: Iterable[InputEvent],
                         out: Collector[OutputEvent]): Unit = {
      val usersSet = mutable.Set[Int]()   // 注意,这是个可变集合
      for(e <- elements){
        usersSet.add(e.userid)         // 把用户id添加到Set集合中
      }
      // 写出(时间戳,在线人数)
      out.collect(OutputEvent(context.window.getEnd, usersSet.size))
    }
  }

  // redisMap接口,设置key和value
  class RedisExampleMapper extends RedisMapper[OutputEvent]{
    // getCommandDescription:设置数据使用的数据结构 HashSet 并设置key的名称
    override def getCommandDescription: RedisCommandDescription = {
      // RedisCommand.SET 指定存储类型
      new RedisCommandDescription(RedisCommand.SET, "online")
    }

    /**
     * 获取 value值 value的数据是键值对
     * @param data
     * @return
     */
    //指定key
    override def getKeyFromData(t: OutputEvent): String = "online"

    // 指定value
    override def getValueFromData(t: OutputEvent): String = tranTimeToString(t.winend) + ": " + t.count

    // 时间戳转日期表示格式
    def tranTimeToString(ts:Long) :String={
      val fm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      val tim = fm.format(new Date(ts))
      tim
    }
  }
}

执行步骤:

1)启动Zookeeper。打开一个终端窗口,执行如下命令:

# 切换到Kafka的安装目录
$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./bin/zookeeper-server-start.sh config/zookeeper.properties

2)启动Kafka。打开另一个终端窗口,执行如下命令:

# 切换到Kafka的安装目录
$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./bin/kafka-server-start.sh config/server.properties

3)启动Redis Server。再打开一个终端窗口,执行如下命令:

# 切换到Redis的安装目录
$ cd ~/bigdata/redis-6.2.6
$ ./bin/redis-server ./conf/redis.conf

4)运行上面编写的流执行程序(相当于Kafka的消费者程序),在控制台应当可以看到输出的在线人数统计结果。如下图所示:

5)运行Redis cli,查看写入的结果。在一个终端窗口,执行下面的命令:

# 切换到Redis的安装目录
$ cd ~/bigdata/redis-6.2.6
$ ./bin/redis-cli

然后在redis命令行下,执行查询:

redis> get online

应当可以看到写入的值。如下:

5) 完成第四个任务:“统计系统的UV与PV并存入MySQL中”。

  • PV(访问量):即Page View, 即页面浏览量或点击量,用户每次刷新即被计算一次。
  • UV(独立访客):即Unique Visitor,访问网站的一台电脑客户端为一个访客。

日志数据样例:

543462,1715,1464116,pv,1511658000
662867,2244074,1575622,pv,1511658000
561558,3611281,965809,pv,1511658000
894923,3076029,1879194,pv,1511658000
834377,4541270,3738615,pv,1511658000

实现思路:滚动窗口 + 窗口函数。其中uv要去重,pv要累加。

实现代码如下:

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.util.Collector

import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Properties
import scala.collection.mutable

object PvAndUv {

  // 输入事件类型
  case class InputEvent(userid: Int, behavior:String, count: Int)

  // 输出事件类型
  case class OutputEvent(winend:Long, uv:Int, pv:Int)

  def main(args: Array[String]) {
    // 设置流执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 设置事件时间
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // 设置并行度
    env.setParallelism(1)

    // kafka连接属性配置
    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("group.id", "my-group")
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    // 定义Kafka消费者
    val myConsumer = new FlinkKafkaConsumer010[String]("user-behavior",new SimpleStringSchema(),props)
    myConsumer.setStartFromEarliest()      // 尽可能从最早的记录开始
    // myConsumer.setStartFromLatest()        // 从最新的记录开始
    // myConsumer.setStartFromTimestamp(...)  // 从指定的时间戳(毫秒)开始
    // myConsumer.setStartFromGroupOffsets()  // 默认的行为

    // 分配水印
    myConsumer.assignTimestampsAndWatermarks(
       new AssignerWithPunctuatedWatermarks[String] {
          // 分配水印
          override def checkAndGetNextWatermark(t: String, l: Long): Watermark = {
            if (null != t && t.contains(",")) {
              val parts = t.split(",")
              val watermark = new Watermark(parts(4).toLong * 1000)
              watermark
            }else{
              null
            }
          }

          // 分配时间戳
          override def extractTimestamp(t: String, l: Long): Long = {
            if (null != t && t.contains(",")) {
              val parts = t.split(",")
              parts(4).toLong * 1000
            }else{
              0
            }
          }
    })

    val streamResult = env
      .addSource(myConsumer)               		// 指定Kafka数据源
      .map { _.split(",") }         					// 先分割每一行
      .map(arr => InputEvent(arr(0).toInt,arr(3), 1))  	// 转换为InputEvent
      .windowAll(TumblingEventTimeWindows.of(Time.hours(1)))  // 1小时滚动窗口
      .process(new MyProcessWindowFunction())

    streamResult.print()

    //  Flink-1.10及以前,没有提供JDBC Sink, 使用自定义的Sink
    streamResult.addSink(new MyJDBCSink())

    // 触发流程序执行
    env.execute("count uv and pv")
  }

  // 自定义窗口处理函数
  class MyProcessWindowFunction extends ProcessAllWindowFunction[InputEvent,OutputEvent,TimeWindow]{
    override def process(context: Context,
                         elements: Iterable[InputEvent],
                         out: Collector[OutputEvent]): Unit = {
      val usersSet = mutable.Set[Int]()   	// 注意,这是个可变集合
      var pvCount = 0                  	// 统计pv数
      for(e <- elements){
        usersSet.add(e.userid)			// uv要去重
        if(e.behavior=="pv"){
          pvCount += 1					// pv要累加
        }
      }
      // 发送到下游
      out.collect(OutputEvent(context.window.getEnd, usersSet.size, pvCount))
    }
  }

  //指定Sink的范型People
  class MyJDBCSink() extends RichSinkFunction[OutputEvent] {
    // 定义mysql数据库连接url和驱动程序及账号、密码
    val url = "jdbc:mysql://localhost:3306/test?characterEncoding=UTF-8&useSSL=false"
    val driver = "com.mysql.jdbc.Driver"
    val username = "root"
    val userpwd = "admin"

    //定义SQL的连接、预编译器,给定初始值占位符
    var conn: Connection = _
    var stmt: PreparedStatement = _

    /**
     * 重写open方法:加载驱动,建立连接,预编译SQL语句
     * 先于invoker方法用,仅执行一次
     * @param parameters
     */
    override def open(parameters: Configuration): Unit = {
      super.open(parameters)

      Class.forName(driver)   // 加载驱动程序
      conn = DriverManager.getConnection(url, username, userpwd)  // 连接数据库
      // 执行SQL语句
      val sql = "insert into uv_pv_tb (ts, uv, pv) values (?, ?, ?)"
      stmt = conn.prepareStatement(sql)
    }

    /**
     * 调用方法,调用逻辑都是在里面,数据类型、格式组合都是在里面进行
     * 编写
     * @param value
     * @param context
     */
    override def invoke(value: OutputEvent, context: SinkFunction.Context[_]): Unit = {
      //执行update SQL
      stmt.setLong(1, value.winend)
      stmt.setInt(2, value.uv)
      stmt.setInt(3, value.pv)
      stmt.executeUpdate
    }

    /**
     * close方法用来关闭资源,清理工作,类似于Finally
     */
    override def close(): Unit = {
      super.close()
      if (stmt != null) stmt.close()
      if (conn != null) conn.close()
    }
  }
}

执行步骤:

1) 在MySQL数据库中创建数据表:

   create table uv_pv_tb(
         ts bigint,
         uv int,
         pv int
    );

2)启动Zookeeper。打开一个终端窗口,执行如下命令:

# 切换到Kafka的安装目录
$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./bin/zookeeper-server-start.sh config/zookeeper.properties

3)启动Kafka。打开另一个终端窗口,执行如下命令:

# 切换到Kafka的安装目录
$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./bin/kafka-server-start.sh config/server.properties

4)运行上面编写的流执行程序(相当于Kafka的消费者程序),在控制台应当可以看到输出的uv和pv统计结果。如下图所示:

5)在MySQL中执行如下的查询语句,查看uv_pv_tb表中的内容:

mysql> select * from uv_pv_tb;

这时应当可以看到写入的uv和pv值:

至此,数据采集与实时计算任务已经全部完成了!