Spark介绍

Spark是基于内存计算的大数据并行计算框架.Spark基于内存计算,提高了在大数据环境下数据处理的实时性。它是MapReduce的替代方案,而且兼容HDFS、Hive等分布式存储层,可融入Hadoop的生态系统,以弥补缺失MapReduce的不足。

通过下面这张图,可以了解到Spark的主要组件及其功能。

可以看到,为了兼容Apache大数据生态,Spark在底层支持三种资源调度框架:

  1. Standalone: Spark原生的资源管理,由Master负责资源的分配。
  2. Apache Mesos: 与 Hadoop MapReduce兼容性良好的一种资源调度框架。
  3. Hadoop Yarn: 主要是指的Yarn中的ResourceManager。

架构

Spark内部主要涉及两个核心组件,下面是其关系图。

预习知识: 用户编写的Spark应用程序,其中包含了Driver功能的代码和分布在集群中多个节点上运行的Executor代码。

  • Driver:在一个Spark集群中,有一个节点负责中央协调,调度各个分布式工作节点;这个中央协调节点被称为驱动器节点。Driver运行Application的main()函数并创建SparkContext(负责与ClusterManager通信,进行资源的申请、任务的分配和监控等)。

    1. 把用户程序(Application)转为任务
    2. 为执行器调度任务
  • Executor:与之对应的工作节点。

    1. 负责运行组成Spark应用的任务,并将结果返回给Driver;
    2. 通过自身的块管理器(Block Manager)为用户程序中要求缓存的RDD提供内存式存储。

任务执行过程

通用流程

  1. 用户通过 spark-submit 脚本提交应用;
  2. spark-submit 脚本启动Driver,调用用户定义的 main() 方法;
  3. Driver与集群管理器通信,申请资源以启动Executor;
  4. 集群管理器为Driver启动Executor;
  5. Driver执行用户应用中的操作(根据程序中所定义的对RDD的转化操作和行动操作,Drvier把工作以任务的形式发送到Executor);
  6. 任务在Executor中进行计算并保存结果;
  7. 如果Driver的 main() 方法退出,或者调用了SparkContext.stop(),Driver会终止Executor,并且通过集群管理器释放资源。

Yarn-Cluster流程

  1. Spark Yarn Client向Yarn中提交应用程序;
  2. ResourceManager收到请求后,在集群中选择一个NodeManager,并为该应用程序分配一个Container,在这个 Container 中启动应用程序的ApplicationMaster,ApplicationMaster进行SparkContext等的初始化;
  3. ApplicationMaster向ResourceManager注册,这样用户可以直接通过ResourceManager查看应用程序的运行状态,然后它将采用轮询的方式通过RPC协议为各个任务申请资源,并监控它们的运行状态直到运行结束;
  4. ApplicationMaster申请到资源(也就是Container)后,便与对应的NodeManager通信,并在获得的Container中启动 CoarseGrainedExecutorBackend,启动后会向ApplicationMaster中的SparkContext注册并申请Task;
  5. ApplicationMaster中的SparkContext分配Task给CoarseGrainedExecutorBackend执行,CoarseGrainedExecutorBackend运行Task并向ApplicationMaster汇报运行的状态和进度,以让ApplicationMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务;
  6. 应用程序运行完成后,ApplicationMaster向ResourceManager申请注销并关闭自己。

有没有发现这和之前讲的Yarn调度MapReduce的流程很像,具体见Hadoop原理-任务创建流程

RDD与操作

  • actions:这类算子会触发 SparkContext 提交 Job 作业,并将数据输出 Spark系统。
  • transformations:转换, 从现有RDD创建一个新的RDD。这种变换并不触发提交作业,只是完成作业中间过程处理。Transformation 是延迟计算的,也就是说从一个 RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算。

实践

  • 本地docker启动spark(自带hadoop集群,使用yarn来启动spark)

    1
    docker run -it -p 8088:8088 -p 8042:8042 -h sandbox sequenceiq/spark:1.6.0 bash
  • 也可通过brew来安装

    1
    2
    3
    4
    5
    [~] brew install apache-spark

    [/usr/local/Cellar/apache-spark/2.4.2/bin] ls
    docker-image-tool.sh load-spark-env.sh run-example spark-class spark-sql sparkR
    find-spark-home pyspark spark-beeline spark-shell spark-submit

安装包

1
2
3
4
5
6
wget http://mirror.cc.columbia.edu/pub/software/apache/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz

tar -xzvf spark-3.0.1-bin-hadoop3.2.tgz
mv spark-3.0.1-bin-hadoop3.2 spark-3.0.1

pip install pyspark

配置

  • spark-env.sh

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    cd $SPARK_HOME/conf
    cp spark-env.sh.template spark-env.sh

    # 配置spark-env.sh
    export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
    export SPARK_MASTER_IP=localhost
    export SPARK_LOCAL_DIRS=$SPARK_HOME
    export SPARK_DRIVER_MEMORY=1g

    # spark-history-server, 需要在hdfs上创建/spark-logs 目录
    export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=3 -Dspark.history.fs.logDirectory=hdfs://localhost:9000/spark-logs"
  • slaves

    1
    2
    cd $SPARK_HOME/conf
    cp slaves.template slaves
  • spark-defaults.conf (用于启用history-server)

    1
    2
    3
    4
    5
    6
    7
    8
    cp spark-defaults.conf.template spark-defaults.conf

    # 配置spark-defaults.conf
    spark.master spark://localhost:7077
    spark.eventLog.enabled true
    spark.eventLog.dir hdfs://localhost:9000/spark-logs
    spark.serializer org.apache.spark.serializer.KryoSerializer
    spark.driver.memory 1g

启动

  • 启动服务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    $SPARK_HOME/sbin/start-all.sh

    # 查看多了Master和Worker进程
    [root@aliyun-vm spark-3.0.1]# jps
    9633 Worker
    2005 SecondaryNameNode
    1654 NameNode
    9576 Master
    9720 Jps
    1790 DataNode
    2286 ResourceManager
    2415 NodeManager
  • 启动history-server (可选)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    $SPARK_HOME/sbin/start-history-server.sh

    # 查看多了HistoryServer进程
    [root@aliyun-vm spark-3.0.1]# jps
    9633 Worker
    9762 HistoryServer
    2005 SecondaryNameNode
    1654 NameNode
    9576 Master
    9804 Jps
    1790 DataNode
    2286 ResourceManager
    2415 NodeManager
  • 访问web界面
    spark-master: http://localhost:8080
    History-server: http://localhost:18080

  • 连接到spark

    其中--deploy-mode参数的意思是:是否发布驱动到worker节点(cluster) 或者作为一个本地客户端 (client) (default: client)*
    如果设置client模式, 驱动直接在spark-submit进程中启动,输入输出都可以显示在控制台.。所以这种模式特别适合REPL(读取-求值-输出循环), 比如Spark shell。

    python方式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    # 在yarn上调度spark任务,完成后,可以在yarn上看到任务信息。
    #提前使用hdfs dfs命令上传data目录下的users.txt到hdfs的根目录,然后执行以下操作。

    $SPARK_HOME/bin/pyspark \
    --master yarn \
    --deploy-mode client
    >>> textFile = spark.read.text("/users.txt")
    >>> textFile.count()
    7
    >>> textFile.first()
    Row(value=u'1,BarackObama,Barack Obama')
    >>> linesWithSpark = textFile.filter(textFile.value.contains("Obama"))
    >>> textFile.filter(textFile.value.contains("Obama")).count()
    1

    提交任务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    # 在spark-master中启动应用,完成后,可以在spark web ui中看到任务信息。
    $SPARK_HOME/bin/spark-submit \
    --master spark://localhost:7077 \
    --deploy-mode client \
    examples/src/main/python/pi.py \
    10

    # 在yarn中启动应用,完成后,可以在spark history web ui中看到任务信息。
    $SPARK_HOME/bin/spark-submit \
    --master yarn \
    --deploy-mode cluster \
    hdfs://localhost:9000/pi.py \
    10

    java方式

    1
    $SPARK_HOME/bin/spark-shell

Spark-SQL

  • 将Hive元数据的配置文件放到spark/conf下:

    1
    2
    3
    cd $SPARK_HOME
    cd conf
    cp ${HIVE_HOME}/conf/hive-site.xml . #hive-site.xml中配置了元数据的连接
  • 在Spark中加入mysql的driver驱动包(因为Hive元数据是存储在MySQL上)
    cp包进$SPARK_HOME/jars
    注意

    1. Hive底层是HDFS,一定要保证HDFS进程是开启的。
    2. mysql的服务需要是开启的。
    3. 想要使用Spark整合Hive,在编译Spark的时候需要在编译指令中加入-Phive -Phive-thriftserver。
    4. 因为Spark配置中加入了hive-site.xml,所以默认的文件系统就是HDFS。
      否则就是本地文件系统,create table的时候就会在本地创建/user/hive/warehouse目录 。
0%