相关文章推荐
没人理的玉米  ·  为什么提交Spark ...·  9 月前    · 
没人理的玉米  ·  spark ...·  9 月前    · 
没人理的玉米  ·  Spark Streaming + ...·  9 月前    · 
没人理的玉米  ·  Spark Streaming ...·  9 月前    · 
package com.shujia.spark.streaming
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object Demo11WordCount {
  def main(args: Array[String]): Unit = {
     * 创建spark Streaming 环境,需要依赖Spark core 环境
    val conf = new SparkConf()
    //指定两个core执行代码
    conf.setMaster("local[2]")
    conf.setAppName("wc")
    val sc = new SparkContext(conf)
     * 创建spark streaming 环境,需要设置执行执行的间隔时间
    //创建spark streaming 环境
    val ssc = new StreamingContext(sc, Durations.seconds(5))
     * 1、读取实时的数据源
     * yum install nc
     * 启动socket服务
     * nc -lk 8888
    val linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)
     * 统计单词数量
    val wordDS: DStream[String] = linesDS.flatMap(_.split(","))
    val kvDS: DStream[(String,Int)] = wordDS.map(word => (word,1))
    val countDS: DStream[(String, Int)] = kvDS.reduceByKey(_ + _)
     * 打印结果
    countDS.print()
     * 启动spark Streaming 程序
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()

统计全局单词的数量,关键词updateStateByKey

有状态算子 -- 使用当前批次的数据加上之前批次的统计结果,返回新的结果
有状态算子需要将计算的状态保存在hdfs中,所以需要设计checkpoint 的路径
 seq:当前批次 一个key所有的value
option:之前批次统计的结果(状态),如果是第一次统计这个key,option 为None
package com.shujia.spark.streaming
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Duration, Durations, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object Demo22UpdateStateByKey {
  def main(args: Array[String]): Unit = {
    //想用sparkStreaming 就必须先创建spark core 环境
    val conf = new SparkConf()
    //必须创建两个core
    conf.setMaster("local[2]")
    conf.setAppName("update")
    val sc: SparkContext = new SparkContext(conf)
    val ssc: StreamingContext = new StreamingContext(sc, Durations.seconds(5))
    ssc.checkpoint("data/checkpoint")
     * 统计全局单词的数量
    //读取socket中的数据
    val lineDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)
    val kvDS: DStream[(String, Int)] = lineDS.flatMap(_.split(",")).map(word => (word, 1))
     * 有状态算子 -- 使用当前批次的数据加上之前批次的统计结果,返回新的结果
     * 有状态算子需要将计算的状态保存在hdfs中,所以需要设计checkpoint 的路径
     * seq:当前批次 一个key所有的value
     * option:之前批次统计的结果(状态),如果是第一次统计这个key,option 为None
    val countDS: DStream[(String, Int)]= kvDS.updateStateByKey((seq: Seq[Int], option: Option[Int]) => {
      //统计当前批次的单词量
      val currCount: Int = seq.sum
      //获取之前批次统计的单词的数量
      val count: Int = option.getOrElse(0)
      //计算新的单词的数量并返回
      Option(currCount + count)
    countDS.print()
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()

窗口-滑动窗口滚动窗口

package com.shujia.spark.streaming
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import scala.language.postfixOps
object Demo33ReduceByKeyAndWindow {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[2]")
    conf.setAppName("window")
    val sc: SparkContext = new SparkContext(conf)
    val ssc: StreamingContext = new StreamingContext(sc, Durations.seconds(5))
    val linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)
    val kvDS: DStream[(String, Int)] = linesDS.flatMap(_.split('.')).map((_, 1))
     * 滑动窗口
     * 统计最近15秒单词的数据量,每间隔5秒就统计一次
     * 窗口的大小和滑动时间都必须是sparkStreaming 的整数倍
   /* val countDS: DStream[(String, Int)] = kvDS.reduceByKeyAndWindow(
      (x: Int, y:Int) => x +y, //聚合函数
      Durations.seconds(15), //窗口的大小
      Durations.seconds(5) //滑动时间
    //countDS.print()
     * 滚动窗口 --每隔15秒计算一次
     * 窗口的大小等于滑动时间
    val countDS: DStream[(String, Int)] = kvDS.reduceByKeyAndWindow(
      (x: Int, y:Int) => x +y, //聚合函数
      Durations.seconds(15), //窗口的大小
      Durations.seconds(15) //滑动时间
    countDS.print()
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()