Spark_07_spark streaming
Enoch

一、实时流处理

实时流处理,就是一种 处理连续、动态数据流的 计算技术,核心特点如下:

  • 低延迟:数据输入后能够快速相应和处理
  • 持续处理:能够连续处理无边界的数据流
  • 动态计算:实时对数据进行分析、聚合和转换等

应用场景

  • 实时推荐系统
  • 金融交易监控
  • 网络安全监控
  • 社交媒体趋势分析
  • ….

二、 Spark streaming介绍

image-20250325211945822

数据是源源不断产生的,我们通过SparkStreaming实时接收这种数据,并通过将数据进行切分的方式来处理。

1. 流处理思想

一个无边界的数据流,只要我们按照时间片段(一般是比较短的时间片段)进行切割,就可以变成无数多个 有边界的数据,这个有边界的数据在Spark中就是 RDD

image-20250325213017249

JavaStreamingContext streamingContext = new JavaStreamingContext(sc, Durations.seconds(5));

这里的第二个参数,就是控制时间片段的大小,通常是按照秒级切分

2. DStream概念

SparkStreaming中的数据抽象叫做DStream,英文全称 Discretized Stream(离散流),它代表一个持续不断的数据流。

  • 代表连续的数据流

  • 底层基于RDD实现

  • 支持RDD所支持的各种transformation和action

image-20250325213321364

3. DStream的操作

  1. 无状态转换(跟RDD基本没有区别)

    1
    2
    3
    4
    5
        JavaReceiverInputDStream<String> dStream = streamingContext.socketTextStream("localhost", 9999);
    JavaPairDStream<String, Integer> result = dStream
    .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
    .mapToPair(word -> new Tuple2<String, Integer>(word, 1))
    .reduceByKey((a, b) -> a + b);
  2. 有状态转换

    1. 无状态:只对当前窗口的数据进行处理,不会依赖任何的历史时间窗口处理过的数据,这就是无状态计算(简单,但是不够丰富)
    2. 有状态(累积结果):除了对当前窗口的数据进行处理之外,还需要依赖历史的窗口处理的数据结果,这就是有状态计算
  • updateStateByKey 和 mapWIthState(Experimental)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    // updateStateByKey
    Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction = (values, state) -> {
    Integer newSum = state.orElse(0); // 如果state存在,则取state,否则取0
    for (Integer i : values) {
    newSum = i + newSum;
    }
    return Optional.of(newSum);
    };
    // 将当前的新数据和历史的状态数据进行累加,得到的结果作为新的状态返回。 (shuffle)
    JavaPairDStream<String, Integer> newResult = result.<Integer>updateStateByKey(updateFunction);

    • 每个批次处理时,会对所有的已存在的key重新计算状态,全量更新的方式,会导致即使某些key没有新数据,也会进行处理(效率低)
    • 使用上比较简单,但是状态会无限增长(也就是key的个数会膨胀)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    // mapWithState  https://blog.yuvalitzchakov.com/exploring-stateful-streaming-with-apache-spark/
    // Function3<String, Option<Integer>,State<Integer>,Tuple2<String,Integer>>
    // (KeyType, Option[ValueType], State[StateType]) => MappedType
    StateSpec<String, Integer, Integer, Tuple2<String, Integer>> specFunction = StateSpec.function(
    (Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>)
    (word, value, state) -> {
    int newState = value.orElse(0) + (state.exists() ? state.get() : 0);
    state.update(newState);
    return new Tuple2<>(word, newState);
    });

    JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> newResult =
    result.mapWithState(specFunction);
    • 增量更新状态,只会处理当前批次有变化的key
    • 使用时需要定义StateSpec函数,泛型包含 key类型,value类型,状态类型以及 MappedType map中每条数据的类型
    • 可以配置状态的超时时间,超时后自动清除状态,防止状态无限膨胀
    • 可以控制状态类型,不一定要跟数据的value类型一致
    • 实现上比较复杂,适用于大规模的状态管理(实验性接口)

如果即想使用状态,又怕状态无限膨胀,最佳实践是使用额外存储作为状态后端。

  1. 窗口操作

    滑动窗口

    每间隔多长时间,统计多大时间窗口的数据

    比如:每5分钟 统计过去一小时的销售量/额

image-20250329104359232

  1. 累加器、广播变量、Checkpoint故障恢复

Accumulators, Broadcast Variables, and Checkpoints

  • 累加器(executor只写)

    • executor只能对累加器做累加的动作
    • 多个executor在处理数据的时候,都对累加器做了操作,但是不会产生安全问题,因为spark帮我们保证了(一致性协议)
    • 一般用于统计
  • 广播变量(executor只读)

    • 广播变量由driver创建并广播,executor只能读取值,不能修改值
    • 广播变量广播之后,会在每个executor中保存一份
    • 广播变量发送给executor只会发送一次,不会因为每个executor由多个task而发送多次
  • 从checkpoint重启spark streaming程序

    • 累加状态被重置了
      • 依赖外部存储(redis、hbase),重启的时候从外部初始化初始值
    • 假设streaming挂了,但是socket还在不断的发数据
      挂了到重启的这段时间的数据就会丢失
      • 回放数据(socket数据源不支持) –> 使用支持回放的数据源,比如kafka,通过offset机制来保证

4. 数据的输出

  1. println打印到控制台 (本地调试)

  2. 保存到文件 (streaming用的比较少,spark core用的多,尤其是保存到hdfs) saveAsTextFiles saveAsNewAPIHadoopFiles saveAsHadoopFiles

  3. 保存到外部的存储(数据库、kv存储)

    1
    2
    3
    4
    5
    6
    7
    resultDstream.foreachRDD(rdd -> {
    // 获取数据库连接(连接池中get connection
    // insert or update 到数据库
    rdd.foreach(record ->{
    saveToDb()
    })
    });

5. SQL 的方式处理DStream

工作原理:

  1. DStream是基于 rdd 的数据流,当我们使用foreachRDD的时候,就变成了 rdd

  2. rdd + schema 信息 就转成了 DataFrame,结构化,可以使用 SQL的方式处理

1
2
//    sparkSession            sparkSession            sparkContext    streamingContext
// 注册一个 words 临时表 <--- dataframe(dataset) <--- rdd + schema <--- dStream

 评论
评论插件加载失败
正在加载评论插件
由 Hexo 驱动 & 主题 Keep
访客数 访问量