window滑动窗口
Spark Streaming提供了滑动窗口操作的支持,从而让我们可以对一个滑动窗口内的数据执行 计算操作。每次掉落在窗口内的RDD的数据,会被聚合起来执行计算操作,然后生成的RDD,会作 为window DStream的一个RDD。比如下图中,就是对每三秒钟的数据执行一次滑动窗口计算,这3 秒内的3个RDD会被聚合起来进行处理,然后过了两秒钟,又会对最近三秒内的数据执行滑动窗口 计算。所以每个滑动窗口操作,都必须指定两个参数,窗口长度以及滑动间隔,而且这两个参数 值都必须是batch间隔的整数倍。(Spark Streaming对滑动窗口的支持,是比Storm更加完善和强 大的)
window滑动窗口操作函数
Transform
|
意义
|
window
|
对每个滑动窗口的数据执行自定义的计算
|
countByWindow
|
对每个滑动窗口的数据执行count操作
|
reduceByWindow
|
对每个滑动窗口的数据执行reduce操作
|
reduceByKeyAndWindow
|
对每个滑动窗口的数据执行reduceByKey操作
|
countByValueAndWindow
|
对每个滑动窗口的数据执行countByValue操作
|
案例:热点搜索词滑动统计,每隔10秒钟,统计最近60秒钟的搜索词的搜索频次,并打印出 排名最靠前的3个搜索词以及出现次数
执行reduceByKeyAndWindow,滑动窗口操作
第二个参数,是窗口长度,这里是60秒
第三个参数,是滑动间隔,这里是10秒
也就是说,每隔10秒钟,将最近60秒的数据,作为一个窗口,进行内部的RDD的聚合,然后统一对 一个RDD进行后续计算
Java语言实现:
package com.kfk.spark.window_hotwords_project;
import com.kfk.spark.common.CommStreamingContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import java.util.List;
* @author : 蔡政洁
* @email :[email protected]
* @date : 2020/12/18
* @time : 2:03 下午
public class WindowHotWordJava {
static JavaStreamingContext jssc = null;
public static void main(String[] args) throws InterruptedException {
jssc = CommStreamingContext.getJssc();
* 数据模型:java
* hive
* spark
* java
JavaReceiverInputDStream<String> inputDstream = jssc.socketTextStream("bigdata-pro-m04",9999);
* <java,1>
* <hive,1>
* ...
JavaPairDStream<String, Integer> pair = inputDstream.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String line) throws Exception {
return new Tuple2<>(line,1);
});
* <java,5>
* <hive,3>
* <spark,6>
* <flink,10>
JavaPairDStream<String, Integer> windowWordCount = pair.reduceByKeyAndWindow(new Function2<Integer,Integer,Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1+v2;
}, Durations.seconds(60),Durations.seconds(10));
JavaDStream<Tuple2<String, Integer>> finalStream = windowWordCount.transform(
new Function<JavaPairRDD<String, Integer>, JavaRDD<Tuple2<String, Integer>>>() {
@Override
public JavaRDD<Tuple2<String, Integer>> call(JavaPairRDD<String, Integer> line) throws Exception {
JavaPairRDD<Integer,String> beginPair = line.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
return new Tuple2<>(stringIntegerTuple2._2,stringIntegerTuple2._1);
});
JavaPairRDD<Integer,String> sortRdd = beginPair.sortByKey(false);
JavaPairRDD<String,Integer> sortPair = sortRdd.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
@Override
public Tuple2<String,Integer> call(Tuple2<Integer, String> integerStringTuple2) throws Exception {
return new Tuple2<>(integerStringTuple2._2,integerStringTuple2._1);
});
List<Tuple2<String,Integer>> wordList = sortPair.take(3);
for (Tuple2<String, Integer> stringIntegerTuple2 : wordList) {
System.out.println(stringIntegerTuple2._1 + " : " + stringIntegerTuple2._2);
return jssc.sparkContext().parallelize(wordList);
});
finalStream.print();
jssc.start();
jssc.awaitTermination();
Scala语言实现
package com.kfk.spark.window_hotwords_project
import com.kfk.spark.common.CommStreamingContextScala
import org.apache.spark.streaming.Seconds
* @author : 蔡政洁
* @email :[email protected]
* @date : 2020/12/18
* @time : 4:47 下午
object WindowHotWordScala {
def main(args: Array[String]): Unit = {
val jssc = CommStreamingContextScala.getJssc
val inputDstream = jssc.socketTextStream("bigdata-pro-m04", 9999)
* 数据模型:java
* hive
* spark
* java
val pairDStream = inputDstream.map(x => (x,1))
* <java,1>
* <hive,1>
* ...
val windowWordCount = pairDStream.reduceByKeyAndWindow((x:Int,y:Int) => x+y,Seconds(60),Seconds(10))
* 热点搜索词滑动统计,每隔10秒钟,统计最近60秒钟的搜索词的搜索频次,
* 并打印出 排名最靠前的3个搜索词以及出现次数
val finalDStream = windowWordCount.transform(x => {
val sortRDD = x.map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1))
val list = sortRDD.take(3)
jssc.sparkContext.parallelize(list)
finalDStream.print()
jssc.start()
jssc.awaitTermination()
测试数据:
运行结果:
-------------------------------------------
Time: 1608705518000 ms
-------------------------------------------
(hive,3)
(java ,2)
(java,2)
以上内容仅供参考学习,如有侵权请联系我删除!
如果这篇文章对您有帮助,左下角的大拇指就是对博主最大的鼓励。
您的鼓励就是博主最大的动力!
window滑动窗口Spark Streaming提供了滑动窗口操作的支持,从而让我们可以对一个滑动窗口内的数据执行 计算操作。每次掉落在窗口内的RDD的数据,会被聚合起来执行计算操作,然后生成的RDD,会作 为window DStream的一个RDD。比如下图中,就是对每三秒钟的数据执行一次滑动窗口计算,这3 秒内的3个RDD会被聚合起来进行处理,然后过了两秒钟,又会对最近三秒内的数据执行滑动窗口 计算。所以每个滑动窗口操作,都必须指定两个参数,窗口长度以及滑动间隔,而且这两个参数 值都必须是batch
本文介绍流处理的概念,流处理引擎环境和Apache
Spark Streaming概述。
批数据处理是指通过固定的输入数据集运行计算逻辑,并在结束时产生结果。这意味着处理将在到达数据集末尾时停止。
相比之下,流处理是关于通过无界数据集运行计算逻辑,因此处理是连续且长时间运行的。
虽然批处理数据与流数据的区别主要在于有限性,但由于流数据的无界数据性质、实时数据的传入顺序、
数据到达的不同速率以及面对机器故障时对正确性和低延迟的期望,流数据处理要比批数据处理复杂得多,也更具挑战性。
流处理的挑战
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.junit.Test
/** spark 滑动窗口测试(slide window)
* 参考 https://www.cnblogs..
Spark Steaming支持对某个时间窗口内实现对数据计算
上图描绘了是以3倍的微批次作为一个窗口长度,并且以2倍微批次作为滑动间隔。将落入到相同窗口的微批次合并成一个相对较大的微批次-窗口批次。
Spark要求所有的窗口的长度以及滑动的间隔必须是微批次的整数倍
滑动窗口:窗口长度 > 滑动间隔 窗口与窗口之间存在元素的重叠。
滚动窗口:窗口长度 = 滑动间隔 窗口...
SparkStreaming使用离散化流作为抽象表示,叫做DStream。Dstream是随时间推移而得到的数据序列。在内部,每个时间区间收到的数据都作为RDD的存在
而DStream是由这些RDD所组成的序列。DStream可以从各种输入源创建,比如Flume、KafaKa或者HDFS。创建出来的DStream支持两种操作(转换和输出)
SparkStreaming应用需要进行额外配置保证2...
目录一、Structured Streaming概述(1)Structured Streaming背景(2)Structured Streaming概念二、Structured Streaming编程模型(1)编程模型(2)EventTime(3)容错语义三、基于WordCount程序讲解Structured Streaming编程模型
一、Structured Streaming概述
(1)Structured Streaming背景
大多数的流式计算引擎(比如storm、spark streaming等
一、Spark与HBase的集成
Spark支持多种数据源,但是Spark对HBase的读写都没有相对优雅的api,但spark和HBase整合的场景又比较多,故通过spark的数据源API自己实现了一套比较方便操作HBase的API。
数据模型:
row,addres,age,username
目录一、RDD、DataFrame和DataSet的定义二、RDD、DataFrame和DataSet的比较(1)Spark版本(2)数据表示形式(3)数据格式(4)编译时类型安全(5)序列化(6)垃圾回收(7)效率/内存使用(8)编程语言支持(9)聚合操作(Aggregation)(10)结论
一、RDD、DataFrame和DataSet的定义
在开始Spark RDD与DataFrame与Dataset之间的比较之前,先让我们看一下Spark中的RDD,DataFrame和Datasets的定义: