Spark是基于内存计算的大数据并行计算框架.Spark基于内存计算,提高了在大数据环境下数据处理的实时性。它是MapReduce的替代方案,而且兼容HDFS、Hive等分布式存储层,可融入Hadoop的生态系统,以弥补缺失MapReduce的不足。
通过下面这张图,可以了解到Spark的主要组件及其功能。
可以看到,为了兼容Apache大数据生态,Spark在底层支持三种资源调度框架:
- Standalone: Spark原生的资源管理,由Master负责资源的分配。
- Apache Mesos: 与 Hadoop MapReduce兼容性良好的一种资源调度框架。
- Hadoop Yarn: 主要是指的Yarn中的ResourceManager。
架构
Spark内部主要涉及两个核心组件,下面是其关系图。
预习知识: 用户编写的Spark应用程序,其中包含了Driver功能的代码和分布在集群中多个节点上运行的Executor代码。
Driver
:在一个Spark集群中,有一个节点负责中央协调,调度各个分布式工作节点;这个中央协调节点被称为驱动器节点。Driver运行Application的main()函数并创建SparkContext(负责与ClusterManager通信,进行资源的申请、任务的分配和监控等)。- 把用户程序(Application)转为任务
- 为执行器调度任务
Executor
:与之对应的工作节点。- 负责运行组成Spark应用的任务,并将结果返回给Driver;
- 通过自身的块管理器(Block Manager)为用户程序中要求缓存的RDD提供内存式存储。
任务执行过程
通用流程
- 用户通过 spark-submit 脚本提交应用;
- spark-submit 脚本启动Driver,调用用户定义的 main() 方法;
- Driver与集群管理器通信,申请资源以启动Executor;
- 集群管理器为Driver启动Executor;
- Driver执行用户应用中的操作(根据程序中所定义的对RDD的转化操作和行动操作,Drvier把工作以任务的形式发送到Executor);
- 任务在Executor中进行计算并保存结果;
- 如果Driver的 main() 方法退出,或者调用了SparkContext.stop(),Driver会终止Executor,并且通过集群管理器释放资源。
Yarn-Cluster流程
- Spark Yarn Client向Yarn中提交应用程序;
- ResourceManager收到请求后,在集群中选择一个NodeManager,并为该应用程序分配一个Container,在这个 Container 中启动应用程序的ApplicationMaster,ApplicationMaster进行SparkContext等的初始化;
- ApplicationMaster向ResourceManager注册,这样用户可以直接通过ResourceManager查看应用程序的运行状态,然后它将采用轮询的方式通过RPC协议为各个任务申请资源,并监控它们的运行状态直到运行结束;
- ApplicationMaster申请到资源(也就是Container)后,便与对应的NodeManager通信,并在获得的Container中启动 CoarseGrainedExecutorBackend,启动后会向ApplicationMaster中的SparkContext注册并申请Task;
- ApplicationMaster中的SparkContext分配Task给CoarseGrainedExecutorBackend执行,CoarseGrainedExecutorBackend运行Task并向ApplicationMaster汇报运行的状态和进度,以让ApplicationMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务;
- 应用程序运行完成后,ApplicationMaster向ResourceManager申请注销并关闭自己。
有没有发现这和之前讲的Yarn调度MapReduce的流程很像,具体见Hadoop原理-任务创建流程
RDD与操作
actions
:这类算子会触发 SparkContext 提交 Job 作业,并将数据输出 Spark系统。transformations
:转换, 从现有RDD创建一个新的RDD。这种变换并不触发提交作业,只是完成作业中间过程处理。Transformation 是延迟计算的,也就是说从一个 RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算。
实践
本地docker启动spark(自带hadoop集群,使用yarn来启动spark)
1
docker run -it -p 8088:8088 -p 8042:8042 -h sandbox sequenceiq/spark:1.6.0 bash
也可通过brew来安装
1
2
3
4
5[~] brew install apache-spark
[/usr/local/Cellar/apache-spark/2.4.2/bin] ls
docker-image-tool.sh load-spark-env.sh run-example spark-class spark-sql sparkR
find-spark-home pyspark spark-beeline spark-shell spark-submit
安装包
1 | wget http://mirror.cc.columbia.edu/pub/software/apache/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz |
配置
spark-env.sh
1
2
3
4
5
6
7
8
9
10
11cd $SPARK_HOME/conf
cp spark-env.sh.template spark-env.sh
# 配置spark-env.sh
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export SPARK_MASTER_IP=localhost
export SPARK_LOCAL_DIRS=$SPARK_HOME
export SPARK_DRIVER_MEMORY=1g
# spark-history-server, 需要在hdfs上创建/spark-logs 目录
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=3 -Dspark.history.fs.logDirectory=hdfs://localhost:9000/spark-logs"slaves
1
2cd $SPARK_HOME/conf
cp slaves.template slavesspark-defaults.conf (用于启用history-server)
1
2
3
4
5
6
7
8cp spark-defaults.conf.template spark-defaults.conf
# 配置spark-defaults.conf
spark.master spark://localhost:7077
spark.eventLog.enabled true
spark.eventLog.dir hdfs://localhost:9000/spark-logs
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.driver.memory 1g
启动
启动服务
1
2
3
4
5
6
7
8
9
10
11
12$SPARK_HOME/sbin/start-all.sh
# 查看多了Master和Worker进程
[root@aliyun-vm spark-3.0.1]# jps
9633 Worker
2005 SecondaryNameNode
1654 NameNode
9576 Master
9720 Jps
1790 DataNode
2286 ResourceManager
2415 NodeManager启动history-server (可选)
1
2
3
4
5
6
7
8
9
10
11
12
13$SPARK_HOME/sbin/start-history-server.sh
# 查看多了HistoryServer进程
[root@aliyun-vm spark-3.0.1]# jps
9633 Worker
9762 HistoryServer
2005 SecondaryNameNode
1654 NameNode
9576 Master
9804 Jps
1790 DataNode
2286 ResourceManager
2415 NodeManager访问web界面
spark-master: http://localhost:8080
History-server: http://localhost:18080连接到spark
其中
--deploy-mode
参数的意思是:是否发布驱动到worker节点(cluster) 或者作为一个本地客户端 (client) (default: client)*
如果设置client模式, 驱动直接在spark-submit进程中启动,输入输出都可以显示在控制台.。所以这种模式特别适合REPL(读取-求值-输出循环), 比如Spark shell。python方式
1
2
3
4
5
6
7
8
9
10
11
12
13
14# 在yarn上调度spark任务,完成后,可以在yarn上看到任务信息。
#提前使用hdfs dfs命令上传data目录下的users.txt到hdfs的根目录,然后执行以下操作。
$SPARK_HOME/bin/pyspark \
--master yarn \
--deploy-mode client
>>> textFile = spark.read.text("/users.txt")
>>> textFile.count()
7
>>> textFile.first()
Row(value=u'1,BarackObama,Barack Obama')
>>> linesWithSpark = textFile.filter(textFile.value.contains("Obama"))
>>> textFile.filter(textFile.value.contains("Obama")).count()
1提交任务
1
2
3
4
5
6
7
8
9
10
11
12
13# 在spark-master中启动应用,完成后,可以在spark web ui中看到任务信息。
$SPARK_HOME/bin/spark-submit \
--master spark://localhost:7077 \
--deploy-mode client \
examples/src/main/python/pi.py \
10
# 在yarn中启动应用,完成后,可以在spark history web ui中看到任务信息。
$SPARK_HOME/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
hdfs://localhost:9000/pi.py \
10java方式
1
$SPARK_HOME/bin/spark-shell
Spark-SQL
将Hive元数据的配置文件放到spark/conf下:
1
2
3cd $SPARK_HOME
cd conf
cp ${HIVE_HOME}/conf/hive-site.xml . #hive-site.xml中配置了元数据的连接在Spark中加入mysql的driver驱动包(因为Hive元数据是存储在MySQL上)
cp包进$SPARK_HOME/jars注意
- Hive底层是HDFS,一定要保证HDFS进程是开启的。
- mysql的服务需要是开启的。
- 想要使用Spark整合Hive,在编译Spark的时候需要在编译指令中加入-Phive -Phive-thriftserver。
- 因为Spark配置中加入了hive-site.xml,所以默认的文件系统就是HDFS。
否则就是本地文件系统,create table的时候就会在本地创建/user/hive/warehouse目录 。