Spark_08_其他
1. spark的historyServer
1 | vim spark-defaults.conf |
创建hdfs目录: hadoop fs -mkdir hdfs://hadoop:9000/sparkHistory/
启动: sh sbin/start-history-server.sh
2. Spark的thriftserver
spark on hive : 通过spark来执行任务(spark作为sql入口),解析sql和执行sql都是由spark来完成,但是底层表的一些元数据信息由hive来提供(metastore)
跟hiveserver2一样,Spark也可以启动一个thriftserver进程,用于直接使用SparkSQL(不再需要创建项目,获取sparkSession之后再写SQL)
启动thriftServer之前需要先进行配置(已经在集成环境中配置好了)
- 将hadoop和hive的配置文件放到spark的conf目录下(如果需要使用spark连接hive做操作的话)
- 将hive-site.xml中的
hive.metastore.schema.verification设置为false - 将MySQL的驱动包放到spark的jars下
- 启动
sh sbin/start-thriftserver.sh - 通过 spark安装目录下的 bin下的beeline进行连接
bin/beeline -u jdbc:hive2://hadoop:10000
3. 项目jdk版本问题

4. checkpoint+kafka恢复任务
整体思路: 设置检查点 + 数据重放
spark streaming + kafka
任务本身开启了checkpoint(在生产环境中,checkpoint路径一般是hdfs上的,利用hdfs的分布式和副本机制)
1
2
3JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));
// spark 开启checkpoint检查点机制
ssc.checkpoint(checkpointDirectory);重启任务的时候,一定是从上次结束的地方(检查点 checkpoint)继续
1
2
3
4JavaStreamingContext ssc =
// 有就获取 —> checkpoint中有,就从checkpoint中获取
// 没有就新建
JavaStreamingContext.getOrCreate(checkpointDirectory, createContextFunc);数据消费的时候,只有消费成功的时候才提交offset信息到kafka中
1
2
3
4
5
6
7
8
9
10
11// Kafka参数配置
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "spark-streaming-group");
kafkaParams.put("auto.offset.reset", <从指定offset启动>);
kafkaParams.put("enable.auto.commit", false); // 关闭自动提交
// 中间处理数据
// 处理完数据之后再提交offset
((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
评论
评论插件加载失败
正在加载评论插件