# Spark

Apache Spark是一个开源的集群计算框架。它最初由加州大学伯克利分校的AMP实验室开发,后来Spark的源代码捐献给了Apache软件基金会,从此成了一个活跃的Apache项目。Spark提供了一套完整的集群编程接口,内含容错和并行数据处理能力。

Spark基本的数据结构叫做弹性分布式数据集(Resilient Distributed Datasets,简称RDD)。这是一个分布于集群节点的只读数据集合,并以容错的、并行的方式进行维护。传统的MapReduce框架强制在分布式编程中使用一种特定的线性数据流处理方式。MapReduce程序从磁盘读取输入数据,把数据分解成键/值对,经过混洗、排序、归并等数据处理后产生输出,并将最终结果保存在磁盘。Map阶段和Reduce阶段的结果均要写磁盘,这大大降低了系统性能。也是由于这个原因,MapReduce大都被用于执行批处理任务。为了解决MapReduce的性能问题,Spark使用RDD作为分布式程序的工作集合,它提供一种分布式共享内存的受限形式。在分布式共享内存系统中,应用可以向全局地址空间的任意位置进行读写操作,而RDD是只读的,对其只能进行创建、转化和求值等操作。

利用RDD可以方便地实现迭代算法,简单地说就是能够在一个循环中多次访问数据集合。RDD还适合探索式的数据分析,能够对数据重复执行类似于数据库风格的查询。相对于MapReduce的实现,Spark应用的延迟可以降低几个数量级,其中最为经典的迭代算法是用于机器学习系统的培训算法,这也是开发Spark的初衷。

Spark需要一个集群管理器和一个分布式存储系统作为支撑。对于集群管理,Spark支持独立管理(原生的Spark集群),Hadoop YARN和Apache Mesos。对于分布式存储,Spark可以与多种系统对接,包括HDFS、MapR文件系统、Cassandra、OpenStack Swift、Amazon S3、Kudu,或者一个用户自己实现的文件系统。Spark还支持伪分布的本地部署模式,但通常仅用于开发和测试目的。本地模式不需要分布式存储,而是用本地文件系统代替。在这种场景中,Spark运行在一个机器上,每个CPU核是一个执行器(executor)。

Spark框架含有Spark Core、Spark SQL、Spark Streaming、MLlib Machine Learning Library、GraphX等几个主要组件。

# Spark Core

Spark Core是所有Spark相关组件的基础。它以RDD这个抽象概念为核心,通过一组应用程序接口,提供分布式任务的分发、调度和基本的I/O功能。Spark Core的编程接口支持Java、Python、Scala和R等程序语言。这组接口使用的是函数式编程模式,即一个包含对RDD进行map、filter、reduce、join等并行操作的驱动程序,向Spark传递一个函数,然后Spark调度此函数在集群上并行执行。这些基本操作把RDD作为输入并产生新的RDD。RDD自身是一个不变的数据集,对RDD的所有转换操作都是lazy模式,即Spark不会立刻计算结果,而只是简单地记住所有对数据集的转换操作。这些转换只有遇到action操作的时候才会开始真正执行,这样的设计使Spark更加高效。容错功能是通过跟踪每个RDD的“血统”(lineage,指的是产生此RDD的一系列操作)实现的。一旦RDD的数据丢失,还可以使用血统进行重建。RDD可以由任意类型的Python、Java或Scala对象构成。除了面向函数的编程风格,Spark还有两种形式的共享变量:broadcast和accumulators。broadcast变量引用的是需要在所有节点上有效的只读数据,accumulators可以简便地对各节点返回给驱动程序的值进行聚合。

一个典型的Spark函数式编程的例子是,统计文本文件中每个单词出现的次数,也就是常说的词频统计。在下面这段Scala程序代码中,每个flatMap函数以一个空格作为分隔符,将文件分解为由单词组成的列表,map函数将每个单词列表条目转化为一个以单词为键,数字1为值的RDD对,reduceByKey函数对所有的单词进行计数。每个函数调用都将一个RDD转化为一个新的RDD。对比相同功能的Java代码,Scala语言的简洁性一目了然。

// 将一个本地文本文件读取到(文件名,文件内容)的RDD对。
val data = sc.textFile("file:///home/mysql/mysql-5.6.14/README")
// 以一个空格作为分隔符,将文件分解成一个由单词组成的列表。
val words = data.flatMap(_.split(" "))
// 为每个单词添加计数,并进行聚合计算
val wordFreq = words.map((_, 1)).reduceByKey(_ + _)
// 取得出现次数最多的10个单词
wordFreq.sortBy(s => -s._2).map(x => (x._2, x._1)).top(10)

# Spark SQL

Spark SQL是基于Spark Core之上的一个组件,它引入了名为DataFrames的数据抽象。DataFrames能够支持结构化、半结构化数据。Spark SQL提供了一种“领域特定语言”(Domain-Specific Language,简称DSL),用于在Scala、Java或Python中操纵DataFrames。同时Spark SQL也通过命令行接口或ODBC/JDBC提供对SQL语言的支持。下面是一段Scala里的Spark SQL代码。

val url = "jdbc:mysql://127.0.0.1/information_schema?user=root&password=xxxxxx"
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.format("jdbc").option("url", url).option("dbtable", "tables").load()
df.printSchema()
val countsByDatabase = df.groupBy("TABLE_SCHEMA").count().show()

这段代码用Spark SQL连接本地的MySQL数据库,屏幕打印information_schema.tables的表结构,并按table_schema字段分组,计算并显示每组的记录数。

# Spark Streaming

Spark Streaming利用Spark Core的快速调度能力执行流数据的分析。它以最小批次获取数据,并对批次上的数据执行RDD转化。这样的设计,可以让用于批处理分析的Spark应用程序代码也可以用于流数据分析,因此便于实时大数据处理架构的实现。但是这种便利性带来的问题是处理最小批次数据的延时。其他流数据处理引擎,例如Storm和Flink的streaming组件,都是以事件而不是最小批次为单位处理流数据的。Spark Streaming支持从Kafka、Flume、Twitter、ZeroMQ、Kinesis和TCP/IP sockets接收数据。

# MLlib Machine Learning Library

Spark中还包含一个机器学习程序库,叫做MLlib。MLlib提供了很多机器学习算法,包括分类、回归、聚类、协同过滤等,还支持模型评估、数据导入等额外的功能。MLlib还提供了一些更底层的机器学习原语,如一个通用的梯度下降算法等。所有这些方法都被设计为可以在集群上轻松伸缩的架构。

# GraphX

GraphX是Spark上的图(如社交网络的朋友关系图)处理框架。可以进行并行的图计算。与Spark Streaming和Spark SQL类似,GraphX也扩展了Spark的RDD API,能用来创建一个顶点和边都包含任意属性的有向图。GraphX还支持针对图的各种操作,比如进行图分割的subgraph和操作所有顶点的mapVertices,以及一些常用的图算法,如PageRank和三角计算等。由于RDD是只读的,因此GraphX不适合需要更新图的场景。

更新时间: 3/21/2020, 3:57:55 PM