第四篇|Spark-Streaming编程指南(1) 对Spark Streaming执行机制、Transformations与Output Operations、Spark Streaming数据源(Sources)、Spark Streaming 数据汇(Sinks)进行了讨论。本文将延续上篇内容,主要包括以下内容:

  • 有状态的计算
  • 基于时间的窗口操作
  • 检查点Checkpoint
  • 使用DataFrames & SQL处理流数据
  • 有状态的计算

    updateStateByKey

    上一篇文章中介绍了常见的无状态的转换操作,比如在WordCount的例子中,输出的结果只与当前batch interval的数据有关,不会依赖于上一个batch interval的计算结果。spark Streaming也提供了有状态的操作: updateStateByKey ,该算子会维护一个状态,同时进行信息更新 。该操作会读取上一个batch interval的计算结果,然后将其结果作用到当前的batch interval数据统计中。其源码如下:

    def updateStateByKey[S: ClassTag](
    updateFunc: (Seq[V], Option[S]) => Option[S]
    ): DStream[(K, S)] = ssc.withScope {
    updateStateByKey(updateFunc, defaultPartitioner())
    }

    该算子只能在key–value对的DStream上使用,需要接收一个状态更新函数 updateFunc作为参数。使用案例如下:

    object StateWordCount {
    def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    .setMaster("local[2]")
    .setAppName(StateWordCount.getClass.getSimpleName)
    val ssc = new StreamingContext(conf, Seconds(5))
    // 必须开启checkpoint,否则会报错
    ssc.checkpoint("file:///e:/checkpoint")
    val lines = ssc.socketTextStream("localhost", 9999)

    // 状态更新函数
    def updateFunc(newValues: Seq[Int], stateValue: Option[Int]): Option[Int] = {

    var oldvalue = stateValue.getOrElse(0) // 获取状态值
    // 遍历当前数据,并更新状态
    for (newValue <- newValues) {
    oldvalue += newValue
    }
    // 返回最新的状态
    Option(oldvalue)
    }

    val count = lines.flatMap(_.split(" "))
    .map(w => (w, 1))
    .updateStateByKey(updateFunc)
    count.print()
    ssc.start()
    ssc.awaitTermination()
    }

    }

    尖叫提示:上面的代码必须要开启checkpoint,否则会报错:

    Exception in thread “main” java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint()

    updateStateByKey缺点

    运行上面的代码会发现一个现象:即便没有数据源输入,Spark也会为新的batch interval更新状态,即如果没有数据源输入,则会不断地输出之前的计算状态结果。

    updateStateByKey可以在指定的批次间隔内返回之前的全部历史数据,包括新增的,改变的和没有改变的。由于updateStateByKey在使用的时候一定要做checkpoint,当数据量过大的时候,checkpoint会占据庞大的数据量,会影响性能,效率不高。

    mapwithState

    mapwithState是Spark提供的另外一个有状态的算子,该操作克服了updateStateByKey的缺点,从Spark 1.5开始引入。源码如下:

    def mapWithState[StateType: ClassTag, MappedType: ClassTag](
    spec: StateSpec[K, V, StateType, MappedType]
    ): MapWithStateDStream[K, V, StateType, MappedType] = {
    new MapWithStateDStreamImpl[K, V, StateType, MappedType](
    self,
    spec.asInstanceOf[StateSpecImpl[K, V, StateType, MappedType]]
    )
    }

    mapWithState只返回发生变化的key的值,对于没有发生变化的Key,则不返回。这样做可以只关心那些已经发生的变化的key,对于没有数据输入,则不会返回那些没有变化的key 的数据。这样的话,即使数据量很大,checkpint也不会updateBykey那样,占用太多的存储,效率比较高(生产环境中建议使用)。

    object StatefulNetworkWordCount {
    def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf()
    .setAppName("StatefulNetworkWordCount")
    .setMaster("local[2]")

    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint("file:///e:/checkpoint")

    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))
    /**
    * word:当前key的值
    * one:当前key对应的value值
    * state:状态值
    */
    val mappingFunc = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => {
    val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
    println(s">>> batchTime = $batchTime")
    println(s">>> word = $word")
    println(s">>> one = $one")
    println(s">>> state = $state")
    val output = (word, sum)
    state.update(sum) //更新当前key的状态值
    Some(output) //返回结果
    }
    // 通过StateSpec.function构建StateSpec
    val spec = StateSpec.function(mappingFunc)
    val stateDstream = wordDstream.mapWithState(spec)
    stateDstream.print()
    ssc.start()
    ssc.awaitTermination()
    }
    }

    基于时间的窗口操作

    Spark Streaming提供了两种类型的窗口操作,分别是滚动窗口和滑动窗口。具体分析如下:

    滚动窗口(Tumbling Windows)

    滚动窗口的示意图如下:滚动窗口只需要传入一个固定的时间间隔,滚动窗口是不存在重叠的。

    源码如下:

    /**
    * @param windowDuration:窗口的长度; 必须是batch interval的整数倍.
    */
    def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration)

    滑动窗口(Sliding Windows)

    滑动窗口的示意图如下:滑动窗口只需要传入两个参数,一个为窗口的长度,一个是滑动时间间隔。可以看出:滑动窗口是存在重叠的。

    源码如下:

    /**
    * @param windowDuration 窗口长度;必须是batching interval的整数倍
    *
    * @param slideDuration 滑动间隔;必须是batching interval的整数倍
    */
    def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = ssc.withScope {
    new WindowedDStream(this, windowDuration, slideDuration)
    }
  • window ( windowLength , slideInterval )

    基于源DStream产生的窗口化的批数据,计算得到一个新的Dstream

    def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration)
    def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = ssc.withScope {
    new WindowedDStream(this, windowDuration, slideDuration)
    }
    /**
    * @param windowDuration window长度,必须是batch interval的倍数
    * @param slideDuration 滑动的时间间隔,必须是batch interval的倍数
    * 底层调用的是reduceByWindow
    */
    def countByWindow(
    windowDuration: Duration,
    slideDuration: Duration): DStream[Long] = ssc.withScope {
    this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)
    }

    应用到一个(K,V)键值对组成的DStream上时,会返回一个由(K,V)键值对组成的新的DStream。每一个key的值均由给定的reduce函数(func函数)进行聚合计算。注意:在默认情况下,这个算子利用了Spark默认的并发任务数去分组。可以通过numTasks参数的设置来指定不同的任务数

    def reduceByKeyAndWindow(
    reduceFunc: (V, V) => V,
    windowDuration: Duration,
    slideDuration: Duration
    ): DStream[(K, V)] = ssc.withScope {
    reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner())
    }

    更加高效的reduceByKeyAndWindow,每个窗口的reduce值,是基于先前窗口的reduce值进行增量计算得到的;它会对进入滑动窗口的新数据进行reduce操作,并对离开窗口的老数据进行 逆向reduce 操作。但是,只能用于 可逆reduce函数 ,即那些reduce函数都有一个对应的 逆向reduce函数 (以InvFunc参数传入)注意:必须开启 checkpointing

    def reduceByKeyAndWindow(
    reduceFunc: (V, V) => V,
    invReduceFunc: (V, V) => V,
    windowDuration: Duration,
    slideDuration: Duration,
    partitioner: Partitioner,
    filterFunc: ((K, V)) => Boolean
    ): DStream[(K, V)] = ssc.withScope {

    val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
    val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc)
    val cleanedFilterFunc = if (filterFunc != null) Some(ssc.sc.clean(filterFunc)) else None
    new ReducedWindowedDStream[K, V](
    self, cleanedReduceFunc, cleanedInvReduceFunc, cleanedFilterFunc,
    windowDuration, slideDuration, partitioner
    )
    }
    def countByValueAndWindow(
    windowDuration: Duration,
    slideDuration: Duration,
    numPartitions: Int = ssc.sc.defaultParallelism)
    (implicit ord: Ordering[T] = null)
    : DStream[(T, Long)] = ssc.withScope {
    this.map((_, 1L)).reduceByKeyAndWindow(
    (x: Long, y: Long) => x + y,
    (x: Long, y: Long) => x - y,
    windowDuration,
    slideDuration,
    numPartitions,
    (x: (T, Long)) => x._2 != 0L
    )
    }

    使用案例

    val lines = ssc.socketTextStream("localhost", 9999)

    val count = lines.flatMap(_.split(" "))
    .map(w => (w, 1))
    .reduceByKeyAndWindow((w1: Int, w2: Int) => w1 + w2, Seconds(30), Seconds(10))
    .print()
    //滚动窗口

    /* lines.window(Seconds(20))
    .flatMap(_.split(" "))
    .map((_, 1))
    .reduceByKey(_ + _)
    .print()*/

    持久化

    持久化是提升Spark应用性能的一种方式,在 第二篇|Spark core编程指南 一文中讲解了RDD持久化的使用方式。其实,DStream也是支持持久化的,同样是使用persist()与cache()方法,持久化通常在有状态的算子中使用,比如窗口操作,默认情况下,虽然没有显性地调用持久化方法,但是底层已经帮用户做了持久化操作,通过下面的源码可以看出。

    private[streaming]
    class WindowedDStream[T: ClassTag](
    parent: DStream[T],
    _windowDuration: Duration,
    _slideDuration: Duration)
    extends DStream[T](parent.ssc) {
    // 省略代码...
    // Persist parent level by default, as those RDDs are going to be obviously reused.
    parent.persist(StorageLevel.MEMORY_ONLY_SER)
    }

    注意 :与RDD的持久化不同,DStream的默认持久性级别将数据序列化在内存中,通过下面的源码可以看出:

    /** 给定一个持计划级别 */
    def persist(level: StorageLevel): DStream[T] = {
    if (this.isInitialized) {
    throw new UnsupportedOperationException(
    "Cannot change storage level of a DStream after streaming context has started")
    }
    this.storageLevel = level
    this
    }

    /** 默认的持久化级别为(MEMORY_ONLY_SER) */
    def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_SER)
    def cache(): DStream[T] = persist()

    从上面的源码可以看出persist()与cache()的主要区别是:

  • cache()方法底层调用的是persist()方法
  • persist()方法有两个重载的方法