发布日期:2021-11-25 VIP内容

案例:运输公司车辆超速实时监测

让我们想象一个车队管理解决方案,其中车队中的车辆启用了无线网络功能。每辆车定期报告其地理位置和许多操作参数,如燃油水平、速度、加速度、轴承、发动机温度等。公司管理层希望利用这一遥测数据流来实现一系列应用程序,以帮助他们在业务的运营和财务方面提高管理效率。

使用到目前为止我们所知道的Spark结构化流特性,我们可以实现其中许多用例,比如使用事件时间窗口来监控每天行驶的公里数,或者通过应用过滤器来找到燃油不足预警的车辆。下面我们要实现的是其中一个功能,即监控运输车辆是否超速。我们将创建一个简单的近实时的Spark结构化流应用程序来计算车辆每几秒钟的平均速度。

我们使用Kafka作为流数据源,将从Kafka的“cars”主题来读取这些事件。

val df: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "cars")
      .load()

请按下面的步骤实现。我们将先分步讲解,在最后给出完整的代码实现。

1. 创建模拟数据源

为了模拟车辆向我们发送传感器数据,我们将创建一个Kafka producer,它将id、speed、acceleration 和timestamp写入Kafka的“cars”主题。

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import scala.annotation.tailrec
import scala.util.{Random => random}

object RandomCarsKafkaProducer {
  def main(args: Array[String]): Unit = {
    // kafka broker连接属性
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    // kafka生产者
    val producer = new KafkaProducer[String, String](props)
    val interval = 1000
    val topic = "cars"        // 主题
    // val numRecsToProduce: Option[Int] = None            // None = infinite
    val numRecsToProduce: Option[Int] = Option(1000)   // 连续产生1000条数据

    // 生成事件的方法
    @tailrec	// @tailrec 确保方法是尾递归。尾递归可以保持内存需求不变。
    def produceRecord(numRecToProduce: Option[Int]): Unit = {
      numRecToProduce match {
        // 如果是有限数据集
        case Some(x) if x > 0 =>     // ⇒
          // 生成一条数据,发送一条数据
          producer.send(generateCarRecord(topic))
          Thread.sleep(interval)
          produceRecord(Some(x - 1))
        // 如果是无限数据集
        case None =>
          producer.send(generateCarRecord(topic))
          Thread.sleep(interval)
          produceRecord(None)
        case _ =>
      }
    }

    produceRecord(numRecsToProduce)
  }

  // 每次调用下面这个方法,发送一条车辆行驶数据给Kafka "cars" 主题
  def generateCarRecord(topic: String): ProducerRecord[String, String] = {
    val carName = s"car${random.nextInt(10)}"    		// 车名
    val speed = random.nextInt(150)              	// 速度
    val acc = random.nextFloat * 100             		// 加速

    val value = s"$carName, $speed, $acc, ${System.currentTimeMillis()}"
    print(s"Writing $value\n")
    val d = random.nextFloat() * 100            		// 模拟随机延迟时间
    if (d < 2) {
      println("抱歉! 有一些网络延迟!")
      Thread.sleep((d*100).toLong)              		// 产生随机延迟
    }
    new ProducerRecord[String, String](topic, "key", value)    // 构造一条记录(事件)
  }
}

注意,这里的时间戳是在事件源处生成事件(消息)的时间。

2. 定义流事件数据类型

接下来,我们将原始数据解析到一个case class类中,这样我们就有了一个可以使用的结构。

case class CarEvent( carId: String, speed: Option[Int], acceleration: Option[Double], timestamp: Timestamp)

object CarEvent {
	def apply(rawStr: String): CarEvent = {
		val parts = rawStr.split(",")
		CarEvent(parts(0), 
			 Some(Integer.parseInt(parts(1))), 
			 Some(java.lang.Double.parseDouble(parts(2))), 
			 new Timestamp(parts(3).toLong))
	}
}

val cars: Dataset[CarEvent] = df.selectExpr("CAST(value AS STRING)").map(r => arEvent(r.getString(0)))

这会产生CarEvent类型的Dataset(即Dataset[CarEvent])。

3. 执行聚合,计算每辆车的平均速度

我们从求每辆车的平均速度开始。这可以通过对carId执行groupby并应用avg聚合函数来实现。

val aggregates = cars.groupBy("carId").agg("speed" -> "avg")

在结构化流程序中,可以使用触发器控制微批处理的时间间隔。在Spark中,触发器被设置为指定在检查新数据是否可用之前等待多长时间。如果没有设置触发器,一旦完成前一个微批处理执行,Spark将立即检查新数据的可用性。

这里我们需要计算车辆在过去5秒内的平均速度。为此,我们需要根据事件时间将事件分组为5秒间隔时间组。这种分组称为窗口(Windowing)。

为了实现窗口化,Spark会添加一个名为“window”的新列,并将提供的“timestamp”列分解为一个或多个行(基于它的值、窗口的大小和滑动),并在该列上执行groupBy。这将隐式地将属于一个时间间隔的所有事件拉到同一个“窗口”中。

这里我们根据“window”和carId对cars数据进行分组。注意,window()是Spark中的一个函数,它返回一个列。

// 4秒大小的滚动窗口
val aggregates = cars
	.groupBy(window($"timestamp", "4 seconds"), $"carId")
	.agg(avg("speed").alias("speed"))
	.where("speed > 70")

在Spark中,水印用于根据当前最大事件时间决定何时清除状态。基于我们所指定的延迟,水印滞后于目前所看到的最大事件时间。例如,如果dealy是3秒,当前最大事件时间是10:00:45,那么水印是在10:00:42。这意味着Spark将保持结束时间小于10:00:42的窗口的状态。

val aggregates = cars
  .withWatermark("timestamp", "3 seconds") 		// 使用timestamp字段设置水印,最大延迟为3s
  .groupBy(window($"timestamp", "4 seconds"), $"carId")
  .agg(avg("speed").alias("speed"))
  .where("speed > 70")

4. 输出我们产生的结果到一个sink—一个Kafka主题

Apache Spark提供了三种输出模式—complete、update和append。这里我们使用update模式。

val writeToKafka = aggregates
  .selectExpr("CAST(carId AS STRING) AS key", "CAST(speed AS STRING) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers","localhost:9092")
  .option("topic", "fastcars")
  .option("checkpointLocation", "/tmp/sparkcheckpoint/")
  .queryName("kafka spark streaming kafka")
  .outputMode("update")
  .start()

在使用Kafka接收器时,检查点位置是必须的,它支持故障恢复和精确地一次处理。

完整代码

下面是完整的代码实现。

import java.sql.Timestamp
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SparkSession}

object KafkaSourceStreaming {

  // 将聚合转换为类型化数据
  case class CarEvent(carId: String, speed: Option[Int], acceleration: Option[Double], timestamp: Timestamp)

  object CarEvent {
    def apply(rawStr: String): CarEvent = {
      val parts = rawStr.split(",")
      CarEvent(
        parts(0),
        Some(Integer.parseInt(parts(1))),
        Some(java.lang.Double.parseDouble(parts(2))),
        new Timestamp(parts(3).toLong)
      )
    }
  }

  def main(args: Array[String]): Unit = {

    // 创建一个spark session, 并以local mode运行
    val spark = SparkSession.builder()
      .appName("KafkaSourceStreaming")
      .master("local[*]")
      .getOrCreate()

    spark.sparkContext.setLogLevel("WARN")

    import spark.implicits._

    // 读取数据源
    // 我们不能为kafka源设置一个模式。Kafka源有一个固定的模式(key, value)
    val df: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "cars")
      .load()

    val cars = df
      .selectExpr("CAST(value AS STRING)")
      .map(r => CarEvent(r.getString(0)))

    // 带有窗口的水印的聚合
    val aggregates = cars
      .withWatermark("timestamp", "3 seconds")
      .groupBy(window($"timestamp","4 seconds"), $"carId") 	// 翻滚窗口,大小4秒(事件时间)
      .agg(avg("speed").alias("speed"))
      .where("speed > 70")

    aggregates.printSchema()

    // 将结果写出到控制台
    val writeToConsole = aggregates.writeStream
      .format("console")
      .option("truncate", "false")
      .queryName("cars streaming")
      .outputMode("update")
      .start()

    // 将结果写出到Kafka
    val writeToKafka = aggregates
      .selectExpr("CAST(carId AS STRING) AS key", "CAST(speed AS STRING) AS value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers","localhost:9092")
      .option("topic", "fastcars")
      //.option("startingOffsets", "earliest")		// earliest, latest。默认是latest
      //.option("endingOffsets", "latest") 		// 只用于批处理查询
      .option("checkpointLocation", "/tmp/carsck/") 	// 当非memory或console输出时,必须有
      .queryName("cars streaming")
      //.outputMode("complete")
      //.outputMode("append")  	        // 只有当我们设置水印时才支持。只有输出新数据
      .outputMode("update") 		// 输出新的和更新的
      .start()

    spark.streams.awaitAnyTermination() 	// 一次运行多个流
  }
}

配套视频:

Spark结构化流案例_车辆超速实时检测与告警_1

Spark结构化流案例_车辆超速实时检测与告警_2

Spark结构化流案例_车辆超速实时检测与告警_3