Spark_08_其他
Enoch

1. spark的historyServer

1
2
3
4
5
6
vim spark-defaults.conf
spark.eventLog.enabled true
spark.eventLog.dir hdfs://hadoop:9000/sparkHistory

vim spark-evn.sh
SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://hadoop:9000/sparkHistory/"

创建hdfs目录: hadoop fs -mkdir hdfs://hadoop:9000/sparkHistory/

启动: sh sbin/start-history-server.sh

2. Spark的thriftserver

spark on hive : 通过spark来执行任务(spark作为sql入口),解析sql和执行sql都是由spark来完成,但是底层表的一些元数据信息由hive来提供(metastore)

跟hiveserver2一样,Spark也可以启动一个thriftserver进程,用于直接使用SparkSQL(不再需要创建项目,获取sparkSession之后再写SQL)

启动thriftServer之前需要先进行配置(已经在集成环境中配置好了)

  1. 将hadoop和hive的配置文件放到spark的conf目录下(如果需要使用spark连接hive做操作的话)
  2. 将hive-site.xml中的 hive.metastore.schema.verification设置为false
  3. 将MySQL的驱动包放到spark的jars下
  4. 启动 sh sbin/start-thriftserver.sh
  5. 通过 spark安装目录下的 bin下的beeline进行连接 bin/beeline -u jdbc:hive2://hadoop:10000

3. 项目jdk版本问题

image-20250325210340576

4. checkpoint+kafka恢复任务

整体思路: 设置检查点 + 数据重放

spark streaming + kafka

  1. 任务本身开启了checkpoint(在生产环境中,checkpoint路径一般是hdfs上的,利用hdfs的分布式和副本机制)

    1
    2
    3
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));
    // spark 开启checkpoint检查点机制
    ssc.checkpoint(checkpointDirectory);
  2. 重启任务的时候,一定是从上次结束的地方(检查点 checkpoint)继续

    1
    2
    3
    4
    JavaStreamingContext ssc =
    // 有就获取 —> checkpoint中有,就从checkpoint中获取
    // 没有就新建
    JavaStreamingContext.getOrCreate(checkpointDirectory, createContextFunc);
  3. 数据消费的时候,只有消费成功的时候才提交offset信息到kafka中

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    // Kafka参数配置
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "spark-streaming-group");
    kafkaParams.put("auto.offset.reset", <从指定offset启动>);
    kafkaParams.put("enable.auto.commit", false); // 关闭自动提交
    // 中间处理数据
    // 处理完数据之后再提交offset
    ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
 评论
评论插件加载失败
正在加载评论插件
由 Hexo 驱动 & 主题 Keep
访客数 访问量