# Spark

# 一、运行模式

# 1.1 Local 模式

  1. 进入Spark包解压缩后的路径,执行如下指令
bin/spark-shell
1
  1. 访问Web UI http://localhost:4040/jobs/
  2. 数据可以放到 spark-3.1.2-bin-hadoop3.2/data目录中
  3. 简单的word count
sc.textFile("data/word.txt").flatMap(_.split("
")).map((_,1)).reduceByKey(_+_).collect
1
2
  1. 退出本地模式
quit
1
  1. 提交应用
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[2] \
./examples/jars/spark-examples_2.12-3.1.2.jar \
10
1
2
3
4
5
  • --class 表示要执行程序的主类,此处可以更换为咱们自己写的应用程序
  • --master local[2] 部署模式,默认为本地模式,数字表示分配的虚拟 CPU 核数量
  • spark-examples_2.12-3.0.0.jar 运行的应用类所在的 jar 包,实际使用时,可以设定为咱们自己打的 jar 包
  • 数字 10 表示程序的入口参数,用于设定当前应用的任务数量

# 1.2 Standalone 模式

Spark 自身节点运行的集群模式,也就是我们所谓的 独立部署(Standalone)模式。Spark 的 Standalone 模式体现了经典的 master-slave 模式。

# 1.2.1 修改配置文件

  1. 解压Spark包
  2. 进入解压缩后路径的 conf 目录,修改 slaves.template 文件名为 slaves
mv slaves.template slaves 
1
  1. 修改 slaves 文件,添加 work 节点
linux1
linux2
linux3
1
2
3
  1. 修改 spark-env.sh.template 文件名为 spark-env.sh
mv spark-env.sh.template spark-env.sh
1
  1. 修改 spark-env.sh 文件,添加 JAVA_HOME 环境变量和集群对应的 master 节点
export JAVA_HOME=/opt/module/jdk1.8.0_144
SPARK_MASTER_HOST=linux1
SPARK_MASTER_PORT=7077
1
2
3
  1. 分发 spark-standalone 目录

  2. 启动群集

sbin/start-all.sh
1
  1. 提交应用
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://linux1:7077 \
./examples/jars/spark-examples_2.12-3.1.2.jar \
10

1) --class 表示要执行程序的主类
2) --master spark://linux1:7077 独立部署模式,连接到 Spark 集群
3) spark-examples_2.12-3.1.2.jar 运行类所在的 jar 包
4) 数字 10 表示程序的入口参数,用于设定当前应用的任务数量
1
2
3
4
5
6
7
8
9
10
  1. 查看 Master 资源监控 Web UI 界面: http://xxx:8080

# 1.2.2 配置历史服务

由于 spark-shell 停止掉后,集群监控 linux1:4040 页面就看不到历史任务的运行情况,所以 开发时都配置历史服务器记录任务运行情况。

  1. 修改 spark-defaults.conf.template 文件名为 spark-defaults.conf
mv spark-defaults.conf.template spark-defaults.conf
1
  1. 修改 spark-default.conf 文件,配置日志存储路径
spark.eventLog.enabled true
spark.eventLog.dir hdfs://xxx:8020/directory
1
2
  1. 修改 spark-env.sh 文件, 添加日志配置
export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080
-Dspark.history.fs.logDirectory=hdfs://xxx:8020/directory
-Dspark.history.retainedApplications=30"

参数1含义:WEBUI访问的端口号为18080
参数2含义:指定历史服务器日志存储路径
参数3含义:指定保存Application历史记录的个数,如果超过这个值,旧的应用程序信息将被删除,这个是内存中的应用数,而不是页面上显示的应用数。
1
2
3
4
5
6
7
8
  1. 分发配置文件
  2. 重新启动集群和历史服务
sbin/stop-all.sh
sbin/start-all.sh
sbin/start-history-server.sh
1
2
3
  1. 查看历史服务:http://xxx:18080

# 1.2.3 配置高可用(HA)

Linux1 Linux2 Linux3
Master Zookeeper Worker Master Zookeeper Worker Zookeeper Worker
  1. 停止集群
sbin/stop-all.sh
1
  1. 修改 spark-env.sh 文件添加如下配置
注释如下内容:
#SPARK_MASTER_HOST=linux1
#SPARK_MASTER_PORT=7077

添加如下内容:
#Master 监控页面默认访问端口为 8080,但是可能会和 Zookeeper 冲突,所以改成 8989,也可以自定义
SPARK_MASTER_WEBUI_PORT=8989
export SPARK_DAEMON_JAVA_OPTS="
-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=linux1,linux2,linux3
-Dspark.deploy.zookeeper.dir=/spark"
1
2
3
4
5
6
7
8
9
10
11
  1. 分发配置文件
  2. 启动群集
sbin/start-all.sh
1
  1. 启动 linux2 的单独 Master 节点,此时 linux2 节点 Master 状态处于备用状态
sbin/start-master.sh
1
  1. 提交应用到高可用集群
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://linux1:7077,linux2:7077 \
./examples/jars/spark-examples_2.12-3.1.2.jar \
10
1
2
3
4
5
  1. 停止 linux1 的 Master 资源监控进程

  2. 查看 linux2 的 Master 资源监控 Web UI,稍等一段时间后,linux2 节点的 Master 状态 提升为活动状态

# 1.3 Yarn 模式

独立部署(Standalone)模式由 Spark 自身提供计算资源,无需其他框架提供资源。这 种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但是你也要记住,Spark 主 要是计算框架,而不是资源调度框架,所以本身提供的资源调度并不是它的强项,所以还是 和其他专业的资源调度框架集成会更靠谱一些。

# 1.3.1 配置Yarn模式

  1. 修改 hadoop 配置文件/hadoop/etc/hadoop/yarn-site.xml, 并分发
<!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认 是 true -->
<property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
</property>
<!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认 是 true -->
<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
</property>
1
2
3
4
5
6
7
8
9
10
  1. 修改 conf/spark-env.sh,添加 JAVA_HOME 和 YARN_CONF_DIR 配置
mv spark-env.sh.template spark-env.sh

export JAVA_HOME=/opt/module/jdk1.8.0_144
export YARN_CONF_DIR=/opt/module/hadoop/etc/hadoop
1
2
3
4
  1. 提交应用
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
./examples/jars/spark-examples_2.12-3.1.2.jar \
10
1
2
3
4
5
6

# 1.3.2 配置历史服务器

  1. 修改 spark-defaults.conf.template 文件名为 spark-defaults.conf
mv spark-defaults.conf.template spark-defaults.conf
1
  1. 修改 spark-default.conf 文件,配置日志存储路径
spark.eventLog.enabled true
spark.eventLog.dir hdfs://xxx:8020/directory
1
2
  1. 修改 spark-env.sh 文件, 添加日志配置
export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080
-Dspark.history.fs.logDirectory=hdfs://xxx:8020/directory
-Dspark.history.retainedApplications=30"

参数1含义:WEBUI访问的端口号为18080
参数2含义:指定历史服务器日志存储路径
参数3含义:指定保存Application历史记录的个数,如果超过这个值,旧的应用程序信息将被删除,这个是内存中的应用数,而不是页面上显示的应用数。
1
2
3
4
5
6
7
8
  1. 修改 spark-defaults.conf
spark.yarn.historyServer.address=linux1:18080
spark.history.ui.port=18080
1
2
  1. 启动历史服务
sbin/start-history-server.sh
1
  1. 重新提交应用
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10
1
2
3
4
5
6

# 二、Spark 运行架构

Spark 框架的核心是一个计算引擎,整体来说,它采用了标准 master-slave 的结构。 如下图所示,它展示了一个 Spark 执行时的基本结构。图形中的 Driver 表示 master, 负责管理整个集群中的作业任务调度。图形中的 Executor 则是 slave,负责实际执行任务。

# 2.1 核心组件

# 2.1.1 Driver

Spark 驱动器节点,用于执行 Spark 任务中的 main 方法,负责实际代码的执行工作。 Driver 在 Spark 作业执行时主要负责:

  • 将用户程序转化为作业(job)
  • 在Executor之间调度任务(task)
  • 跟踪Executor的执行情况
  • 通过UI展示查询运行情况

# 2.1.2 Executor

Spark Executor 是集群中工作节点(Worker)中的一个 JVM 进程,负责在 Spark 作业中运行具体任务(Task),任务彼此之间相互独立。Spark 应用启动时,Executor 节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。如果有 Executor 节点发生了 故障或崩溃,Spark 应用也可以继续执行,会将出错节点上的任务调度到其他 Executor 节点 上继续运行。

Executor 有两个核心功能:

  • 负责运行组成Spark应用的任务,并将结果返回给驱动器进程
  • 它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式存储。RDD 是直接缓存在 Executor 进程内的,因此任务可以在运行时充分利用缓存 数据加速运算。

# 2.1.3 Master & Worker

Spark 集群的独立部署环境中,不需要依赖其他的资源调度框架,自身就实现了资源调 度的功能,所以环境中还有其他两个核心组件:Master 和 Worker,这里的 Master 是一个进 程,主要负责资源的调度和分配,并进行集群的监控等职责,类似于 Yarn 环境中的 RM, 而 Worker 呢,也是进程,一个 Worker 运行在集群中的一台服务器上,由 Master 分配资源对 数据进行并行的处理和计算,类似于 Yarn 环境中 NM。

# 2.1.4 ApplicationMaster

Hadoop 用户向 YARN 集群提交应用程序时,提交程序中应该包含 ApplicationMaster,用于向资源调度器申请执行任务的资源容器 Container,运行用户自己的程序任务 job,监控整 个任务的执行,跟踪整个任务的状态,处理任务失败等异常情况。 说的简单点就是,ResourceManager(资源)和 Driver(计算)之间的解耦合靠的就是 ApplicationMaster。

# 2.2 核心概念

# 2.2.1 有向无环图(DAG)

大数据计算引擎框架我们根据使用方式的不同一般会分为四类,其中第一类就是 Hadoop 所承载的 MapReduce,它将计算分为两个阶段,分别为 Map 阶段 和 Reduce 阶段。 对于上层应用来说,就不得不想方设法去拆分算法,甚至于不得不在上层应用实现多个 Job 的串联,以完成一个完整的算法,例如迭代计算。 由于这样的弊端,催生了支持 DAG 框 架的产生。因此,支持 DAG 的框架被划分为第二代计算引擎。如 Tez 以及更上层的 Oozie。这里我们不去细究各种 DAG 实现之间的区别,不过对于当时的 Tez 和 Oozie 来 说,大多还是批处理的任务。接下来就是以 Spark 为代表的第三代的计算引擎。第三代计 算引擎的特点主要是 Job 内部的 DAG 支持(不跨越 Job),以及实时计算。 这里所谓的有向无环图,并不是真正意义的图形,而是由 Spark 程序直接映射成的数据 流的高级抽象模型。简单理解就是将整个程序计算的执行过程用图形表示出来,这样更直观, 更便于理解,可以用于表示程序的拓扑结构。

# 2.3 提交流程

# 2.3.1 Yarn Client 模式

 ./bin/spark-shell --master yarn --deploy-mode client
1

Client 模式将用于监控和调度的 Driver 模块在客户端执行,而不是在 Yarn 中,所以一般用于测试。

  • Driver在任务提交的本地机器上运行
  • Driver启动后会和ResourceManager通讯申请启动ApplicationMaster
  • ResourceManager分配container,在合适的NodeManager上启动ApplicationMaster,负责向 ResourceManager 申请 Executor 内存
  • ResourceManager接到ApplicationMaster的资源申请后会分配container,然后ApplicationMaster 在资源分配指定的 NodeManager 上启动 Executor 进程
  • Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行 main 函数
  • 之后执行到Action算子时,触发一个Job,并根据宽依赖开始划分stage,每个stage生 成对应的 TaskSet,之后将 task 分发到各个 Executor 上执行。

# 2.3.2 Yarn Cluster 模式

$ ./bin/spark-submit --class my.main.Class \
    --master yarn \
    --deploy-mode cluster \
    --jars my-other-jar.jar,my-other-other-jar.jar \
    my-main-jar.jar \
    app_arg1 app_arg2
1
2
3
4
5
6

Cluster 模式将用于监控和调度的 Driver 模块启动在 Yarn 集群资源中执行。一般应用于实际生产环境。

  • 在YARNCluster模式下,任务提交后会和ResourceManager通讯申请启动ApplicationMaster,
  • 随后ResourceManager分配container,在合适的NodeManager上启动ApplicationMaster,此时的 ApplicationMaster 就是 Driver。
  • Driver启动后向ResourceManager申请Executor内存,ResourceManager接到ApplicationMaster 的资源申请后会分配 container,然后在合适的 NodeManager 上启动Executor 进程
  • Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行main 函数,
  • 之后执行到Action算子时,触发一个Job,并根据宽依赖开始划分stage,每个stage生成对应的 TaskSet,之后将 task 分发到各个 Executor 上执行。

# 三、Spark 核心编程

Spark 计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于 处理不同的应用场景。三大数据结构分别是:

  • RDD: 弹性分布式数据集
  • 累加器: 分布式共享只写变量
  • 广播变量: 分布式共享只读变量

# 3.1 RDD

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

  • 弹性
    • 存储的弹性:内存与磁盘的自动切换;
    • 容错的弹性:数据丢失可以自动恢复;
    • 计算的弹性:计算出错重试机制;
    • 分片的弹性:可根据需要重新分片。
  • 分布式:数据存储在大数据集群不同节点上
  • 数据集:RDD封装了计算逻辑,并不保存数据
  • 数据抽象:RDD是一个抽象类,需要子类具体实现
  • 不可变:RDD封装了计算逻辑,是不可以改变的,想要改变,只能产生新的RDD,在新的 RDD 里面封装计算逻辑
  • 可分区、并行计算

# 3.1.1 RDD的五个重要属性

org/apache/spark/rdd/RDD.scala

Internally, each RDD is characterized by five main properties:

  • A list of partitions
  • A function for computing each split
  • A list of dependencies on other RDDs
  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
  • Optionally, a list of preferred locations to compute each split on
  • (e.g. block locations for an HDFS file)
  1. 分区列表

RDD 数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性。

  /**
   * Implemented by subclasses to return the set of partitions in this RDD. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   *
   * The partitions in this array must satisfy the following property:
   *   `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
   */
  protected def getPartitions: Array[Partition]
1
2
3
4
5
6
7
8
  1. 分区计算函数

Spark 在计算时,是使用分区函数对每一个分区进行计算

  /**
   * :: DeveloperApi ::
   * Implemented by subclasses to compute a given partition.
   */
  @DeveloperApi
  def compute(split: Partition, context: TaskContext): Iterator[T]
1
2
3
4
5
6
  1. RDD之间的依赖关系

RDD 是计算模型的封装,当需求中需要将多个计算模型进行组合时,就需要将多个 RDD 建立依赖关系

  /**
   * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   */
  protected def getDependencies: Seq[Dependency[_]] = deps
1
2
3
4
5
  1. 分区器(可选)

当数据为 KV 类型数据时,可以通过设定分区器自定义数据的分区

  /** Optionally overridden by subclasses to specify how they are partitioned. */
  @transient val partitioner: Option[Partitioner] = None
1
2
  1. 首选位置(可选)

计算数据时,可以根据计算节点的状态选择不同的节点位置进行计算

  /**
   * Optionally overridden by subclasses to specify placement preferences.
   */
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil
1
2
3
4

# 3.2 RDD 算子

所谓RDD的算子就是RDD方法

# 3.2.1 转换算子

把旧的RDD包装成一个新的RDD就是转换

# 3.2.2 行动算子

触发任务的调度和作业的执行

# 四、SparkSQL

# 4.1 基础操作

## 支持的格式
spark.read.
csv   format   jdbc   json   load   option   options   orc   parquet   schema   table   text   textFile

## 读取Json
spark.read.json("input/user.json")
val df = spark.read.json("input/user.json")

## 创建临时表
df.createTempView("user")

## 如果表存在直接替换掉
df.createOrReplaceTempView("user")
spark.sql("select * from user").show

## 创建全局表
df.createGlobalTempView("people")
df.createOrReplaceGlobalTempView("people")

## 查询全局表
spark.sql("select * from global_temp.people").show

## 换一个新的session连接
spark.newSession.sql("select age from user").show
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

# 4.2 DSL 语法

# 查看DataFrame的Schema 
df.printSchema

# 只查看"username"列数据,
df.select("username").show()

# 应用数据,进行计算
df.select($"age"+1).show

# 单引号,引用数据计算
df.select('age +1).show

# 别名
df.select('username,'age+1 as "newage").show()

# 查看"age"大于"30"的数据
df.filter($"age">30).show

# 按照"age"分组,查看数据条数
df.groupBy("age").count.show

# RDD 转换为 DataFrame
# 在 IDEA 中开发程序时,如果需要 RDD 与 DF 或者 DS 之间互相操作,那么需要引入 import spark.implicits._
val idRDD = sc.textFile("data/id.txt")
idRDD.toDF("id").show

# 样例类将 RDD 转换为 DataFrame
case class User(name:String, age:Int)
sc.makeRDD(List(("zhangsan",30), ("lisi",40))).map(t=>User(t._1, t._2)).toDF.show

# DataFrame 转换为 RDD,此时得到的 RDD 存储类型为 Row
val rdd = df.rdd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

# 4.3 DataSet

DataSet 是具有强类型的数据集合,需要提供对应的类型信息。

# 创建 DataSet
case class Person(name: String, age: Long)
val caseClassDS = Seq(Person("zhangsan",2)).toDS()

# 使用基本类型的序列创建 DataSet
val ds = Seq(1,2,3,4,5).toDS

#df 转ds
case  class Emp
1
2
3
4
5
6
7
8
9
更新时间: 8/28/2021, 10:59:44 AM