案例:运输公司车辆超速实时监测
让我们想象一个车队管理解决方案,其中车队中的车辆启用了无线网络功能。每辆车定期报告其地理位置和许多操作参数,如燃油水平、速度、加速度、轴承、发动机温度等。公司管理层希望利用这一遥测数据流来实现一系列应用程序,以帮助他们在业务的运营和财务方面提高管理效率。
使用到目前为止我们所知道的Spark结构化流特性,我们可以实现其中许多用例,比如使用事件时间窗口来监控每天行驶的公里数,或者通过应用过滤器来找到燃油不足预警的车辆。下面我们要实现的是其中一个功能,即监控运输车辆是否超速。我们将创建一个简单的近实时的Spark结构化流应用程序来计算车辆每几秒钟的平均速度。
我们使用Kafka作为流数据源,将从Kafka的“cars”主题来读取这些事件。
val df: DataFrame = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "cars")
.load()
请按下面的步骤实现。我们将先分步讲解,在最后给出完整的代码实现。
1. 创建模拟数据源
为了模拟车辆向我们发送传感器数据,我们将创建一个Kafka producer,它将id、speed、acceleration 和timestamp写入Kafka的“cars”主题。
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import scala.annotation.tailrec
import scala.util.{Random => random}
object RandomCarsKafkaProducer {
def main(args: Array[String]): Unit = {
// kafka broker连接属性
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
// kafka生产者
val producer = new KafkaProducer[String, String](props)
val interval = 1000
val topic = "cars" // 主题
// val numRecsToProduce: Option[Int] = None // None = infinite
val numRecsToProduce: Option[Int] = Option(1000) // 连续产生1000条数据
// 生成事件的方法
@tailrec // @tailrec 确保方法是尾递归。尾递归可以保持内存需求不变。
def produceRecord(numRecToProduce: Option[Int]): Unit = {
numRecToProduce match {
// 如果是有限数据集
case Some(x) if x > 0 => // ⇒
// 生成一条数据,发送一条数据
producer.send(generateCarRecord(topic))
Thread.sleep(interval)
produceRecord(Some(x - 1))
// 如果是无限数据集
case None =>
producer.send(generateCarRecord(topic))
Thread.sleep(interval)
produceRecord(None)
case _ =>
}
}
produceRecord(numRecsToProduce)
}
// 每次调用下面这个方法,发送一条车辆行驶数据给Kafka "cars" 主题
def generateCarRecord(topic: String): ProducerRecord[String, String] = {
val carName = s"car${random.nextInt(10)}" // 车名
val speed = random.nextInt(150) // 速度
val acc = random.nextFloat * 100 // 加速
val value = s"$carName, $speed, $acc, ${System.currentTimeMillis()}"
print(s"Writing $value\n")
val d = random.nextFloat() * 100 // 模拟随机延迟时间
if (d < 2) {
println("抱歉! 有一些网络延迟!")
Thread.sleep((d*100).toLong) // 产生随机延迟
}
new ProducerRecord[String, String](topic, "key", value) // 构造一条记录(事件)
}
}
注意,这里的时间戳是在事件源处生成事件(消息)的时间。
2. 定义流事件数据类型
接下来,我们将原始数据解析到一个case class类中,这样我们就有了一个可以使用的结构。
case class CarEvent( carId: String, speed: Option[Int], acceleration: Option[Double], timestamp: Timestamp)
object CarEvent {
def apply(rawStr: String): CarEvent = {
val parts = rawStr.split(",")
CarEvent(parts(0),
Some(Integer.parseInt(parts(1))),
Some(java.lang.Double.parseDouble(parts(2))),
new Timestamp(parts(3).toLong))
}
}
val cars: Dataset[CarEvent] = df.selectExpr("CAST(value AS STRING)").map(r => arEvent(r.getString(0)))
这会产生CarEvent类型的Dataset(即Dataset[CarEvent])。
3. 执行聚合,计算每辆车的平均速度
我们从求每辆车的平均速度开始。这可以通过对carId执行groupby并应用avg聚合函数来实现。
val aggregates = cars.groupBy("carId").agg("speed" -> "avg")
在结构化流程序中,可以使用触发器控制微批处理的时间间隔。在Spark中,触发器被设置为指定在检查新数据是否可用之前等待多长时间。如果没有设置触发器,一旦完成前一个微批处理执行,Spark将立即检查新数据的可用性。
这里我们需要计算车辆在过去5秒内的平均速度。为此,我们需要根据事件时间将事件分组为5秒间隔时间组。这种分组称为窗口(Windowing)。
为了实现窗口化,Spark会添加一个名为“window”的新列,并将提供的“timestamp”列分解为一个或多个行(基于它的值、窗口的大小和滑动),并在该列上执行groupBy。这将隐式地将属于一个时间间隔的所有事件拉到同一个“窗口”中。
这里我们根据“window”和carId对cars数据进行分组。注意,window()是Spark中的一个函数,它返回一个列。
// 4秒大小的滚动窗口
val aggregates = cars
.groupBy(window($"timestamp", "4 seconds"), $"carId")
.agg(avg("speed").alias("speed"))
.where("speed > 70")
在Spark中,水印用于根据当前最大事件时间决定何时清除状态。基于我们所指定的延迟,水印滞后于目前所看到的最大事件时间。例如,如果dealy是3秒,当前最大事件时间是10:00:45,那么水印是在10:00:42。这意味着Spark将保持结束时间小于10:00:42的窗口的状态。
val aggregates = cars
.withWatermark("timestamp", "3 seconds") // 使用timestamp字段设置水印,最大延迟为3s
.groupBy(window($"timestamp", "4 seconds"), $"carId")
.agg(avg("speed").alias("speed"))
.where("speed > 70")
4. 输出我们产生的结果到一个sink—一个Kafka主题
Apache Spark提供了三种输出模式—complete、update和append。这里我们使用update模式。
val writeToKafka = aggregates
.selectExpr("CAST(carId AS STRING) AS key", "CAST(speed AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers","localhost:9092")
.option("topic", "fastcars")
.option("checkpointLocation", "/tmp/sparkcheckpoint/")
.queryName("kafka spark streaming kafka")
.outputMode("update")
.start()
在使用Kafka接收器时,检查点位置是必须的,它支持故障恢复和精确地一次处理。
完整代码
下面是完整的代码实现。
import java.sql.Timestamp
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SparkSession}
object KafkaSourceStreaming {
// 将聚合转换为类型化数据
case class CarEvent(carId: String, speed: Option[Int], acceleration: Option[Double], timestamp: Timestamp)
object CarEvent {
def apply(rawStr: String): CarEvent = {
val parts = rawStr.split(",")
CarEvent(
parts(0),
Some(Integer.parseInt(parts(1))),
Some(java.lang.Double.parseDouble(parts(2))),
new Timestamp(parts(3).toLong)
)
}
}
def main(args: Array[String]): Unit = {
// 创建一个spark session, 并以local mode运行
val spark = SparkSession.builder()
.appName("KafkaSourceStreaming")
.master("local[*]")
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
import spark.implicits._
// 读取数据源
// 我们不能为kafka源设置一个模式。Kafka源有一个固定的模式(key, value)
val df: DataFrame = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "cars")
.load()
val cars = df
.selectExpr("CAST(value AS STRING)")
.map(r => CarEvent(r.getString(0)))
// 带有窗口的水印的聚合
val aggregates = cars
.withWatermark("timestamp", "3 seconds")
.groupBy(window($"timestamp","4 seconds"), $"carId") // 翻滚窗口,大小4秒(事件时间)
.agg(avg("speed").alias("speed"))
.where("speed > 70")
aggregates.printSchema()
// 将结果写出到控制台
val writeToConsole = aggregates.writeStream
.format("console")
.option("truncate", "false")
.queryName("cars streaming")
.outputMode("update")
.start()
// 将结果写出到Kafka
val writeToKafka = aggregates
.selectExpr("CAST(carId AS STRING) AS key", "CAST(speed AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers","localhost:9092")
.option("topic", "fastcars")
//.option("startingOffsets", "earliest") // earliest, latest。默认是latest
//.option("endingOffsets", "latest") // 只用于批处理查询
.option("checkpointLocation", "/tmp/carsck/") // 当非memory或console输出时,必须有
.queryName("cars streaming")
//.outputMode("complete")
//.outputMode("append") // 只有当我们设置水印时才支持。只有输出新数据
.outputMode("update") // 输出新的和更新的
.start()
spark.streams.awaitAnyTermination() // 一次运行多个流
}
}
配套视频: