DStream.window(window length,sliding interval)

batch interval:批处理时间间隔,spark streaming将消息源(Kafka)的数据,以流的方式按批处理时间间隔切片,一个批处理间隔时间对应1个切片对应生成的1个RDD

window length :窗口时间长度,每个批处理间隔将会实际处理的RDD个数(1...n)。是批处理间隔的N(N>=1)倍。

sliding interval:滑动窗口时间长度,窗口操作执行的时间间隔。如果设置为=batch interval,则每个批处理时间间隔都会执行一次窗口操作,如果设置为=N*processingInterval(N>1,N为Int),则每N个批处理时间间隔会执行一次窗口操作。

假设spark streaming 从kafka的largest 偏移量处开始消费

对于一个新的消费者:

每隔一次batch interval,会更新一次offset(拉取的数据为该batch interval内进入kafka的实时数据)

每隔一次sliding interval,会进行生成windowed DStream 操作,并执行逻辑,最后更新一次offset。其中生成的 windowed DStream的数据源为当前最后  window length对应的N个RDD的和(N>=sliding interval,且N=n*batch interval)。

对于一个旧的消费者:

每隔一次batch interval,会更新一次offset(拉取的数据为该batch interval内进入kafka的实时数据+之前保存的offset位置到当前位置的历史数据)

每隔一次sliding interval,会进行生成windowed DStream 操作,并执行逻辑,最后更新一次offset。其中生成的 windowed DStream的数据源为当前最后  window length包含的N个RDD的和(N>=sliding interval,且N=n*batch interval)。

1.如果,window length=3Min,sliding interval=1Min,batch interval=1Min,假设spark streaming 从kafka的largest 偏移量处开始消费。

上述语义为:每隔1分钟,将当前最后3分钟的数据生成一个windowed DStream(如果有多个RDD,则合并他们)

在第一个分钟里,会从kafka里面拉取新进入kafka里的第一分钟的数据并封装为RDD存储到内存,并拉取当前最后1分钟的数据生成一个windowed DStream执行print等action操作(为什么是当前最后1分钟?因为当前只有1分钟的数据)

两分钟过去后,会从kafka里面拉取新进入kafka里的第2分钟的数据并封装为RDD存储到内存,并拉取当前最后2分钟的数据生成一个windowed DStream执行print等action操作

3分钟过去后,会从kafka里面拉取新进入kafka里的第3分钟的数据并封装为RDD存储到内存,并拉取当前最后3分钟的数据生成一个windowed DStream执行print等action操作

4分钟过去后,会从kafka里面拉取新进入kafka里的第4分钟的数据并封装为RDD存储到内存,并拉取当前最后3分钟的数据生成一个windowed DStream执行print等action操作

5分钟过去后,会从kafka里面拉取新进入kafka里的第5分钟的数据并封装为RDD存储到内存,并拉取当前最后3分钟的数据生成一个windowed DStream执行print等action操作

2. 如果,window length=3Min,sliding interval=2Min,batch interval=1Min,假设spark streaming 从kafka的largest 偏移量处开始消费。

上述语义为:每隔2分钟,将当前最后3分钟的数据生成一个windowed DStream(如果有多个RDD,则合并他们)

在14个batch interval 里会执行7次窗口数据处理,除了第一个窗口长度为2个batch interval以外,其他都为3个batch interval。

在第一个分钟里,会从kafka里面拉取新进入kafka里的第一分钟的数据并封装为RDD存储到内存

两分钟过去后,会从kafka里面拉取新进入kafka里的第2分钟的数据并封装为RDD存储到内存,执行print等action操作,这次会执行2个RDD里面的数据。

3分钟过去后,会从kafka里面拉取新进入kafka里的第3分钟的数据并封装为RDD存储到内存,不会执行print等action操作

4分钟过去后,会从kafka里面拉取新进入kafka里的第4分钟的数据并封装为RDD存储到内存,并拉取当前最后3分钟的数据生成一个windowed DStream执行print等action操作

5分钟过去后,会从kafka里面拉取新进入kafka里的第5分钟的数据并封装为RDD存储到内存,不会执行print等action操作

6分钟过去后,会从kafka里面拉取新进入kafka里的第6分钟的数据并封装为RDD存储到内存,并拉取当前最后3分钟的数据生成一个windowed DStream执行print等action操作

7分钟过去后,会从kafka里面拉取新进入kafka里的第7分钟的数据并封装为RDD存储到内存,不会执行print等action操作

.....

在实际应用中: window length - sliding interval >=应用中给定的需要统计的累计最大时长 ,这样才不会因为当前窗口遗漏某些特殊时间段的数据。

如有这样一个逻辑:要求判断 连续30分钟 的数据满足条件A,则得出结果B

如果,让window length=30,sliding interval=10,batch interval=10,即window length - sliding interval < 30分钟

那么 从第5分钟开始连续,直到35分钟时结束连续的这段数据,将不能正常得到结果B

如果,让window length=40,sliding interval=10,batch interval=10,即window length - sliding interval =30分钟

那么 从第5分钟开始连续,直到35分钟时结束连续的这段数据,将可以正常得到结果B

觉得还行的话,右上角点个赞哟。

Spark 创建流式应用的本质,还是依赖了 spark 最核心的那些技术,只是在这些技术上又封装了一层流式接口。 Spark streaming 机制简单来说,就是将连续的时间序列切割成不同的离散时间段。针对某个时间段,将该时间段内的所有输入数据组成一个RDD,接下来的工作就如同一个传统的sprark应用一样,对这个RDD进行各种变换,直到最终输出数据。可以认为, Spark Streamin 以如下代码为例(SocketInputDStream): Spark Streaming 从Socket读取数据的代码是在SocketReceiver的receive方法中,撇开异常情况不谈(Receiver有重连机制,restart方法,默认情况下在Receiver挂了之后,间隔两秒钟重新建立Socket连接),读取到的数据通过调用store(textRead)方法进行存储。数据的流转... 创建 spark Streaming 环境,需要依赖 Spark core 环境 创建 spark streaming 环境,需要设置执行执行的间隔时间 * 1、读取实时的数据源 * yum install nc * 启动socket服务 * nc -lk 8888 package c... Spark DStream提供了Window操作,我们可以使用Window算子对数据进行一系列的算子运算。和Flink不同, Spark DStream提供的window操作比较简单。只能基于数据的处理时间来进行运算。 Spark 的窗口可以分为滚动窗口和 滑动窗口 两类。滚动窗口窗口的大小是固定大小的时间间隔窗口与窗口之间无交集。窗口每次滑动的步长等于窗口的大小 滑动窗口 窗口的大小是固定大小的时间间隔窗口与窗口之间有交集。窗口每次滑动的步长小于窗口的大小。 Spark Streaming 状态操作和 滑动窗口 Spark 是以采集周期处理数据,可以做到秒级。因此统计的是当前周期的数据汇总,但有些场景我们需要统计累加数据,例如:当天登录人数累加,当天某个广告的点击量累计,按小时统计首次登录人数等。这里我们需要用到以前周期的数据,因此我们需要将周期内的统计结果保存起来,用于下个周期使用。其实,我们自己也可以使用Redis缓存类工具做手动保存。这里 Spark Streaming 提供了状态维持的方法,便于操作,简单易用。 1、UpdateStateByKey 二、 滑动窗口 (Sliding Windows)与滚动窗口类似, 滑动窗口 的大小也是固定的。定义 滑动窗口 的参数有两个:除去窗口大小(window size)之外,还有一个滑动步长(window slide),代表窗口计算的频率。窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。.window(Seconds(10),Seconds(10)) 10秒的窗口大小和10秒的滑动大小,不存在重叠部分。.window(Seconds(10),Seconds(5)) 10秒的窗口大小和5秒的活动大小,存在重叠部分。 window操作就是窗口函数。 Spark Streaming 提供了 滑动窗口 操作的支持,从而让我们可以对一个 滑动窗口 内的数据执行计算操作。每次掉落在窗口内的RDD的数据,会被聚合起来执行计算操作,然后生成的RDD,会作为window DStream的一个RDD。比如下图中,就是对每三秒钟的数据执行一次 滑动窗口 计算,这3秒内的3个RDD会被聚合起来进行处理,然后过了两秒钟,又会对最近三秒内的数据执行 滑动窗口 计算。所以每个 滑动窗口 操作,都必须指定两个参数,窗口长度以及滑动间隔,而且这两个参数值都必须是 batch 间 前面有几篇关于 Spark Streaming 的博客,那会只是作为 Spark 入门,快速体验 Spark 之用,只是照着葫芦画瓢。本文结合 Spark 官网上 Spark Streaming 的编程指南对 Spark Streaming 进行介绍 Streaming Context 如同 Spark Context一样, Streaming Context也是 Spark Streaming 应用程序通往 Spark 集群的通... 1. 简介 Spark Streaming 提供了 滑动窗口 的操作的支持,从而让我们可以对一个 滑动窗口 内的数据执行计算操作。每次落在窗口里面的RDD 数据,会被集合起来,然后生成新的RDD 会作为windows DStream 的一个RDD ,例如对每三秒钟执行一次 滑动窗口 计算。所以每个 滑动窗口 的操作,都必须只当两个参数,窗口的长度,以及滑动间隔,而且这两个参数都必须是 batch 间隔的整 Spark - Streaming 之window 滑动窗口 应用, Spark Streaming 提供了 滑动窗口 操作的支持,从而让我们可以对一个 滑动窗口 内的数据执行计算操作。每次掉落在窗口内的RDD的数据,会被聚合起来执行计算操作,然后生成的RDD,会作为window DStream的一个RDD。 窗口长度10s < 滑动间隔15s:每隔15s计算最近10s的数据--会丢失数据,开发不用。窗口长度10s = 滑动间隔10s:每隔10s计算最近10s的数据--滚动窗口。窗口长度10s > 滑动间隔5s:每隔5s计算最近10s的数据-- 滑动窗口 。使用窗口计算: 每隔5s(滑动间隔)计算最近10s(窗口长度)的数据!窗口长度:要计算多久的数据。滑动间隔:每隔多久计算一次。 每隔我们设置的 batch inte rve的time,就去找ReceverTracker,将其中的,从上次划分的 batch 的时间,到目前为止的这个 batch inte rval time间隔的block,给封装成为一个 batch . 其次,就会将这个 batch 中的数据,去创建一个初始化RDD,这个RDD中有几个partition呢?这是 batch 和block之间的关系的微妙之处,事实上,是这样的,一个... ** Spark Streaming **就是将连续的数据 **持久化**,**离散化**,然后进行**批量处理**的框架; ** Spark Streaming **是**准实时**(秒级别)、**微批次**的数据处理框架;