一、实时流处理
实时流处理,就是一种 处理连续、动态数据流的 计算技术,核心特点如下:
- 低延迟:数据输入后能够快速相应和处理
- 持续处理:能够连续处理无边界的数据流
- 动态计算:实时对数据进行分析、聚合和转换等
应用场景
- 实时推荐系统
- 金融交易监控
- 网络安全监控
- 社交媒体趋势分析
- ….
二、 Spark streaming介绍

数据是源源不断产生的,我们通过SparkStreaming实时接收这种数据,并通过将数据进行切分的方式来处理。
1. 流处理思想
一个无边界的数据流,只要我们按照时间片段(一般是比较短的时间片段)进行切割,就可以变成无数多个 有边界的数据,这个有边界的数据在Spark中就是 RDD

JavaStreamingContext streamingContext = new JavaStreamingContext(sc, Durations.seconds(5));
这里的第二个参数,就是控制时间片段的大小,通常是按照秒级切分
2. DStream概念
SparkStreaming中的数据抽象叫做DStream,英文全称 Discretized Stream(离散流),它代表一个持续不断的数据流。
代表连续的数据流
底层基于RDD实现
支持RDD所支持的各种transformation和action

3. DStream的操作
无状态转换(跟RDD基本没有区别)
1
2
3
4
5JavaReceiverInputDStream<String> dStream = streamingContext.socketTextStream("localhost", 9999);
JavaPairDStream<String, Integer> result = dStream
.flatMap(line -> Arrays.asList(line.split(" ")).iterator())
.mapToPair(word -> new Tuple2<String, Integer>(word, 1))
.reduceByKey((a, b) -> a + b);有状态转换
- 无状态:只对当前窗口的数据进行处理,不会依赖任何的历史时间窗口处理过的数据,这就是无状态计算(简单,但是不够丰富)
- 有状态(累积结果):除了对当前窗口的数据进行处理之外,还需要依赖历史的窗口处理的数据结果,这就是有状态计算
updateStateByKey 和 mapWIthState(Experimental)
1
2
3
4
5
6
7
8
9
10
11// updateStateByKey
Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction = (values, state) -> {
Integer newSum = state.orElse(0); // 如果state存在,则取state,否则取0
for (Integer i : values) {
newSum = i + newSum;
}
return Optional.of(newSum);
};
// 将当前的新数据和历史的状态数据进行累加,得到的结果作为新的状态返回。 (shuffle)
JavaPairDStream<String, Integer> newResult = result.<Integer>updateStateByKey(updateFunction);- 每个批次处理时,会对所有的已存在的key重新计算状态,全量更新的方式,会导致即使某些key没有新数据,也会进行处理(效率低)
- 使用上比较简单,但是状态会无限增长(也就是key的个数会膨胀)
1
2
3
4
5
6
7
8
9
10
11
12
13// mapWithState https://blog.yuvalitzchakov.com/exploring-stateful-streaming-with-apache-spark/
// Function3<String, Option<Integer>,State<Integer>,Tuple2<String,Integer>>
// (KeyType, Option[ValueType], State[StateType]) => MappedType
StateSpec<String, Integer, Integer, Tuple2<String, Integer>> specFunction = StateSpec.function(
(Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>)
(word, value, state) -> {
int newState = value.orElse(0) + (state.exists() ? state.get() : 0);
state.update(newState);
return new Tuple2<>(word, newState);
});
JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> newResult =
result.mapWithState(specFunction);- 增量更新状态,只会处理当前批次有变化的key
- 使用时需要定义StateSpec函数,泛型包含 key类型,value类型,状态类型以及 MappedType map中每条数据的类型
- 可以配置状态的超时时间,超时后自动清除状态,防止状态无限膨胀
- 可以控制状态类型,不一定要跟数据的value类型一致
- 实现上比较复杂,适用于大规模的状态管理(实验性接口)
如果即想使用状态,又怕状态无限膨胀,最佳实践是使用额外存储作为状态后端。
窗口操作
滑动窗口
每间隔多长时间,统计多大时间窗口的数据
比如:每5分钟 统计过去一小时的销售量/额

- 累加器、广播变量、Checkpoint故障恢复
Accumulators, Broadcast Variables, and Checkpoints
累加器(executor只写)
- executor只能对累加器做累加的动作
- 多个executor在处理数据的时候,都对累加器做了操作,但是不会产生安全问题,因为spark帮我们保证了(一致性协议)
- 一般用于统计
广播变量(executor只读)
- 广播变量由driver创建并广播,executor只能读取值,不能修改值
- 广播变量广播之后,会在每个executor中保存一份
- 广播变量发送给executor只会发送一次,不会因为每个executor由多个task而发送多次
从checkpoint重启spark streaming程序
- 累加状态被重置了
- 依赖外部存储(redis、hbase),重启的时候从外部初始化初始值
- 假设streaming挂了,但是socket还在不断的发数据
挂了到重启的这段时间的数据就会丢失- 回放数据(socket数据源不支持) –> 使用支持回放的数据源,比如kafka,通过offset机制来保证
- 累加状态被重置了
4. 数据的输出
println打印到控制台 (本地调试)
保存到文件 (streaming用的比较少,spark core用的多,尤其是保存到hdfs) saveAsTextFiles saveAsNewAPIHadoopFiles saveAsHadoopFiles
保存到外部的存储(数据库、kv存储)
1
2
3
4
5
6
7resultDstream.foreachRDD(rdd -> {
// 获取数据库连接(连接池中get connection
// insert or update 到数据库
rdd.foreach(record ->{
saveToDb()
})
});
5. SQL 的方式处理DStream
工作原理:
DStream是基于 rdd 的数据流,当我们使用foreachRDD的时候,就变成了 rdd
rdd + schema 信息 就转成了 DataFrame,结构化,可以使用 SQL的方式处理
1 | // sparkSession sparkSession sparkContext streamingContext |