发布日期:2021-11-24 VIP内容

案例:股票交易实时仪表盘程序

下面通过一个示例“为证券公司建立股票仪表盘程序”,来说明编写Spark结构化流应用程序的步骤。

在这个示例中,数据源是证券公司实时发布的市场交易数据,比如证券、股票等的买卖。我们现在需要构建一个实时仪表板应用程序,用来实现回答以下问题:

  • 计算每10秒钟的销售和购买订单数量;
  • 根据购买或出售的总金额来统计前5个客户;
  • 找出过去一个小时内前5个交易量最多的股票。

数据集说明

在PBLP平台的~/data/spark/stream/目录下,我们准备了一个文件orders.txt,它包含500,000行股票交易数据,每行代表一个买/卖订单。每一行包含如下用逗号分隔的元素:

  • ■ Order时间戳 — 格式为yyyy-mm-dd hh:MM:ss
  • ■ Order ID — 连续递增的整数
  • ■ Client ID — 从1到100的范围的整数
  • ■ Stock代码 — 共80个股票代码
  • ■ 股票买卖的数量 — 从1到1000
  • ■ 股票买卖的价格 — 从1到100
  • ■ 字符B 或 S — 代表一个订单的买(B)/卖(S)事件

可以使用如下命令查看文件的前5行内容:

$ head -n 5 ~/data/spark/stream/orders.txt

内容如下所示:

2016-03-22 20:25:28,1,80,EPE,710,51.00,B
2016-03-22 20:25:28,2,70,NFLX,158,8.00,B
2016-03-22 20:25:28,3,53,VALE,284,5.00,B
2016-03-22 20:25:28,4,14,SRPT,183,34.00,B
2016-03-22 20:25:28,5,62,BP,241,36.00,S

以上这些格式的数据就是我们要处理的流程序的数据。

我们使用一个shell脚本发送股票交易数据给Kafka orders主题。Spark结构化流处理程序将从这个主题读取交易数据并将计算出的结果写到另一个Kafka metrics主题。然后我们将使用Kafka的kafka-console-consumer.sh脚本来接收并显示结果。处理流程如下图所示:

发送股票交易数据的脚本streamOrders.sh也位置PBLP的~/data/spark/stream/目录下,它会调用Kafka自带的生产者脚本,依次读取orders.txt中的每条记录发送给Kafka的order主题,间隔时间为100毫秒。(暂时不需要启动该脚本,稍后我们会用到。)

1. 计算每10秒钟的销售和购买订单数量

完整的结构化流处理代码如下:

  import java.sql.Timestamp
  import java.text.SimpleDateFormat

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions._
  import org.apache.spark.sql.types._

  // case class定义schema
  case class Order(time: java.sql.Timestamp,
                    orderId:Long,
                    clientId:Long,
                    symbol:String,
                    amount:Int,
                    price:Double,
                    buy:Boolean
                  )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("orders streaming").getOrCreate()

    // 创建一个流来监听test topic的消息:
    val dataDF = spark.readStream
                      .format("kafka")
                      .option("kafka.bootstrap.servers", " localhost:9092")
                      .option("subscribe", "orders")
                      .option("startingOffsets", "earliest") 
                      .load()

    // 获得data这个DataFrame的schema
    dataDF.printSchema()

    // 将消息内容转换为String类型,再从DataFrame转为DataSet[String]
    import spark.implicits._        // 将DataFrame转为DataSet,必须引入这个包
    val ordersDF = dataDF.selectExpr("CAST(value AS STRING) as v").as[String]

    // 解析从Kafka读取到的数据
    // 数据格式:2016-03-22 20:25:28,1,80,EPE,710,51.00,B
    val orders = ordersDF.flatMap(record => {
      val dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss")
      val s = record.split(",")
      try {
        assert(s(6) == "B" || s(6) == "S")
        List(Order(new Timestamp(dateFormat.parse(s(0)).getTime),
                   s(1).toLong,
                   s(2).toLong,
                   s(3),
                   s(4).toInt,
                   s(5).toDouble,
                   s(6) == "B"))
      }catch {
        case e : Throwable => println("错误的行格式 (" + e + "): " + record)
          List()        // 如果无法正确解析,就返回空列表
      }
    })

    // 1)统计股票买/卖总数量
    val bool2str = udf((flag:Boolean) => if (flag) "buys" else "sells")
    val numPerType = orders.groupBy(window($"ts", "10 seconds"), $"buy")
                           .agg(count("*") as "total")
                           .select(bool2str($"buy") as "type", $"total")

    // 将要写入到Kafka的结果重新组织,放在名为"value"的列中(Kafka的要求)
    import org.apache.spark.sql.types._
    val numPerTypeValues = numPerType.select(concat_ws(",", col("type"),col("total")).cast(StringType).as("value"))

    // 执行
    val query = numPerType.writeStream
                          .outputMode("complete")				// 写出模式
                          .format("kafka")					// 格式
                          .option("kafka.bootstrap.servers", "localhost:9092")	// kafka服务器地址
                          .option("topic","metrics")				// 要写入的主题
                          .option("checkpointLocation", "tmp/ck")	  	// 指定检查点
                          .start()

    println("starting......")
    query.awaitTermination()
  }

2. 根据购买或出售的总金额来统计前5个客户

这是一个top N问题。修改代码如下所示:

  import java.sql.Timestamp
  import java.text.SimpleDateFormat

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions._

  // case class定义schema
  case class Order(ts: java.sql.Timestamp,
                    orderId:Long,
                    clientId:Long,
                    symbol:String,
                    amount:Int,
                    price:Double,
                    buy:Boolean
                  )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("orders streaming").getOrCreate()
    // spark.sparkContext.setLogLevel("WARN")      // 设置日志级别

    // 创建一个流来监听test topic的消息:
    val dataDF = spark.readStream
                      .format("kafka")
                      .option("kafka.bootstrap.servers", "localhost:9092")
                      .option("subscribe", "orders")
                      .option("startingOffsets", "earliest") 
                      .load()

    // 获得data这个DataFrame的schema
    dataDF.printSchema()

    // 将消息内容转换为String类型,再从DataFrame转为DataSet[String]
    import spark.implicits._        // 将DataFrame转为DataSet,必须引入这个包
    val ordersDS = dataDF.selectExpr("CAST(value AS STRING) as v").as[String]

    // 解析从Kafka读取到的数据
    // 数据格式:2016-03-22 20:25:28,1,80,EPE,710,51.00,B
    val orders = ordersDS.flatMap(record => {
      val dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss")
      val s = record.split(",")
      try {
        assert(s(6) == "B" || s(6) == "S")
        List(Order(new Timestamp(dateFormat.parse(s(0)).getTime),
                   s(1).toLong,
                   s(2).toLong,
                   s(3),
                   s(4).toInt,
                   s(5).toDouble,
                   s(6) == "B"))
      }catch {
        case e : Throwable => println("错误的行格式 (" + e + "): " + record)
          List()        // 如果无法正确解析,就返回空列表
      }
    })

    // 2)top 5客户
    val amountPerClient = orders.select($"clientId",($"amount" * $"price") as "volume")
    val top5clients = amountPerClient.groupBy(window($"ts", "10 seconds"), $"clientId")
                                     .agg(sum("volume") as "volume")
                                     .orderBy($"volume".desc)
                                     .limit(5)

    // 将要写入到Kafka的结果重新组织,放在名为”value”的列中(Kafka的要求)
    import org.apache.spark.sql.types._
    val numPerTypeValues = top5clients.select(concat_ws(",", col("clientId"), col("volume")).cast(StringType).as("value"))

    // 执行
    val query = numPerTypeValues
      .writeStream
      .outputMode("complete")					// 写出模式
      .format("kafka")						// 格式
      .option("kafka.bootstrap.servers", " localhost:9092")	// kafka服务器地址
      .option("topic","metrics")			        // 要写入的主题
      .option("checkpointLocation", "tmp/ck")	  		// 指定检查点
      .start()

    println("starting......")
    query.awaitTermination()
  }

3. 找出过去一个小时内前5个交易量最多的股票

这种类型的计算是通过“窗口操作”完成的。窗口的持续时间是一个小时。但是滑动周期与mini-batch持续时间相同(5秒),因为我们想要每5秒报告一次交易量排名前五的股票。

修改代码如下所示:

  import java.sql.Timestamp
  import java.text.SimpleDateFormat

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions._

  // case class定义schema
  case class Order(ts: java.sql.Timestamp,
                    orderId:Long,
                    clientId:Long,
                    symbol:String,
                    amount:Int,
                    price:Double,
                    buy:Boolean
                  )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("orders streaming").getOrCreate()
    // spark.sparkContext.setLogLevel("WARN")    

    // 创建一个流来监听test topic的消息:
    val dataDF = spark.readStream
                      .format("kafka")
                      .option("kafka.bootstrap.servers", "localhost:9092")
                      .option("subscribe", "orders")
                      .option("startingOffsets", "earliest") 
                      .load()

    // 获得data这个DataFrame的schema
    dataDF.printSchema()

    // 将消息内容转换为String类型,再从DataFrame转为DataSet[String]
    import spark.implicits._        // 将DataFrame转为DataSet,必须引入这个包
    val ordersDS = dataDF.selectExpr("CAST(value AS STRING) as v").as[String]

    // 解析从Kafka读取到的数据
    // 数据格式2016-03-22 20:25:28,1,80,EPE,710,51.00,B
    val orders = ordersDS.flatMap(record => {
      val dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss")
      val s = record.split(",")
      try {
        assert(s(6) == "B" || s(6) == "S")
        List(Order(new Timestamp(dateFormat.parse(s(0)).getTime),
                   s(1).toLong,
                   s(2).toLong,
                   s(3),
                   s(4).toInt,
                   s(5).toDouble,
                   s(6) == "B"))
      }catch {
        case e : Throwable => println("错误的行格式 (" + e + "): " + record)
          List()        // 如果无法正确解析,就返回空列表
      }
    })

    // 3)找出过去一个小时内前5个交易量最多的股票
    //    如果是每5秒钟报告一次过去一个小时内前5个交易量最多的股票
    val topStocks = orders.select($"ts",$"symbol",$"amount")
                          .groupBy(window($"ts", "60 minutes", "5 seconds"), $"symbol")
                          .agg(sum("amount") as "amount")
                          .orderBy($"amount".desc)
                          .outputMode("complete")
                          .format("console")
                          .option("truncate","false")
                          .start()

    // 将要写入到Kafka的结果重新组织,放在名为"value"的列中(Kafka的要求)
    import org.apache.spark.sql.types._
    val numPerTypeValues = topStocks.select(concat_ws(",", col("symbol"), col("amount")).cast(StringType).as("value"))

    // 执行
    val query = numPerTypeValues
      .writeStream
      .outputMode("complete")						// 写出模式
      .format("kafka")							// 格式
      .option("kafka.bootstrap.servers", "localhost:9092")		// kafka服务器地址
      .option("topic","metrics")			          	// 要写入的主题
      .option("checkpointLocation", "tmp/ck")	  			// 指定检查点
      .start()

    println("starting......")
    query.awaitTermination()
  }

注:该案例详细执行过程,请参阅《Spark实用教程》对应章节,里面有详细执行步骤讲解。