Spark介绍

Spark是基于内存计算的大数据并行计算框架.Spark基于内存计算,提高了在大数据环境下数据处理的实时性。它是MapReduce的替代方案,而且兼容HDFS、Hive等分布式存储层,可融入Hadoop的生态系统,以弥补缺失MapReduce的不足。

通过下面这张图,可以了解到Spark的主要组件及其功能。

可以看到,为了兼容Apache大数据生态,Spark在底层支持三种资源调度框架:

  1. Standalone: Spark原生的资源管理,由Master负责资源的分配。
  2. Apache Mesos: 与 Hadoop MapReduce兼容性良好的一种资源调度框架。
  3. Hadoop Yarn: 主要是指的Yarn中的ResourceManager。

架构

Spark内部主要涉及两个核心组件,下面是其关系图。

预习知识: 用户编写的Spark应用程序,其中包含了Driver功能的代码和分布在集群中多个节点上运行的Executor代码。

  • Driver:在一个Spark集群中,有一个节点负责中央协调,调度各个分布式工作节点;这个中央协调节点被称为驱动器节点。Driver运行Application的main()函数并创建SparkContext(负责与ClusterManager通信,进行资源的申请、任务的分配和监控等)。

    1. 把用户程序(Application)转为任务
    2. 为执行器调度任务
  • Executor:与之对应的工作节点。

    1. 负责运行组成Spark应用的任务,并将结果返回给Driver;
    2. 通过自身的块管理器(Block Manager)为用户程序中要求缓存的RDD提供内存式存储。

任务执行过程

通用流程

  1. 用户通过 spark-submit 脚本提交应用;
  2. spark-submit 脚本启动Driver,调用用户定义的 main() 方法;
  3. Driver与集群管理器通信,申请资源以启动Executor;
  4. 集群管理器为Driver启动Executor;
  5. Driver执行用户应用中的操作(根据程序中所定义的对RDD的转化操作和行动操作,Drvier把工作以任务的形式发送到Executor);
  6. 任务在Executor中进行计算并保存结果;
  7. 如果Driver的 main() 方法退出,或者调用了SparkContext.stop(),Driver会终止Executor,并且通过集群管理器释放资源。

Yarn-Cluster流程

  1. Spark Yarn Client向Yarn中提交应用程序;
  2. ResourceManager收到请求后,在集群中选择一个NodeManager,并为该应用程序分配一个Container,在这个 Container 中启动应用程序的ApplicationMaster,ApplicationMaster进行SparkContext等的初始化;
  3. ApplicationMaster向ResourceManager注册,这样用户可以直接通过ResourceManager查看应用程序的运行状态,然后它将采用轮询的方式通过RPC协议为各个任务申请资源,并监控它们的运行状态直到运行结束;
  4. ApplicationMaster申请到资源(也就是Container)后,便与对应的NodeManager通信,并在获得的Container中启动 CoarseGrainedExecutorBackend,启动后会向ApplicationMaster中的SparkContext注册并申请Task;
  5. ApplicationMaster中的SparkContext分配Task给CoarseGrainedExecutorBackend执行,CoarseGrainedExecutorBackend运行Task并向ApplicationMaster汇报运行的状态和进度,以让ApplicationMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务;
  6. 应用程序运行完成后,ApplicationMaster向ResourceManager申请注销并关闭自己。

有没有发现这和之前讲的Yarn调度MapReduce的流程很像,具体见Hadoop原理-任务创建流程

RDD与操作

  • actions:这类算子会触发 SparkContext 提交 Job 作业,并将数据输出 Spark系统。
  • transformations:转换, 从现有RDD创建一个新的RDD。这种变换并不触发提交作业,只是完成作业中间过程处理。Transformation 是延迟计算的,也就是说从一个 RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算。

实践

  • 本地docker启动spark(自带hadoop集群,使用yarn来启动spark)

    1
    docker run -it -p 8088:8088 -p 8042:8042 -h sandbox sequenceiq/spark:1.6.0 bash
  • 也可通过brew来安装

    1
    2
    3
    4
    5
    [~] brew install apache-spark

    [/usr/local/Cellar/apache-spark/2.4.2/bin] ls
    docker-image-tool.sh load-spark-env.sh run-example spark-class spark-sql sparkR
    find-spark-home pyspark spark-beeline spark-shell spark-submit
0%