Submitting Spark Jobs via the YARN REST API

https://community.cloudera.com/t5/Community-Articles/Starting-Spark-jobs-directly-via-YARN-REST-API/ta-p/245998 — this article targets Spark 1.6 and is outdated; it serves only as a rough reference. For the specifics, read the Spark on YARN submission code itself.

  • This article was verified against Spark 2.4.8 and Hadoop 3.2.

Spark jobs can normally only be submitted to YARN through the spark-submit command line. By dissecting the spark-submit source code, this article explores how to submit a Spark job through the YARN REST API instead (cluster mode only, because in client mode the driver runs in the client rather than in the AM).

One-sentence summary: in the end, just call spark-submit from the command line.

1. YARN REST API

1.1 Creating a YARN Application

POST http://rm-http-address:port/ws/v1/cluster/apps/new-application

Sample response body: the application-id, plus the maximum amount of each resource a single application may request

{
    "application-id": "application_1632703810135_0002",
    "maximum-resource-capability": {
        "memory": 20480,
        "vCores": 4,
        "resourceInformations": {
            "resourceInformation": [
                {
                    "maximumAllocation": 9223372036854775807,
                    "minimumAllocation": 0,
                    "name": "memory-mb",
                    "resourceType": "COUNTABLE",
                    "units": "Mi",
                    "value": 20480
                },
                {
                    "maximumAllocation": 9223372036854775807,
                    "minimumAllocation": 0,
                    "name": "vcores",
                    "resourceType": "COUNTABLE",
                    "units": "",
                    "value": 4
                },
                {
                    "maximumAllocation": 9223372036854775807,
                    "minimumAllocation": 0,
                    "name": "yarn.io/gpu",
                    "resourceType": "COUNTABLE",
                    "units": "",
                    "value": 3
                }
            ]
        }
    }
}
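
For reference, a minimal curl sketch of this call; rm-http-address:port is a placeholder, and user.name=hadoop assumes a non-secure cluster using pseudo authentication (a secure cluster needs SPNEGO or similar instead):

curl -s -X POST \
  "http://rm-http-address:port/ws/v1/cluster/apps/new-application?user.name=hadoop"
# pick the application-id out of the JSON response, e.g. with jq:
# ... | jq -r '."application-id"'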

1.2 Submitting a YARN Application

POST http://rm-http-address:port/ws/v1/cluster/apps

Sample request body; for the individual fields see https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html#Cluster_Applications_API.28Submit_Application.29

{
  "application-id": "application_1632703810135_0002",
  "application-name": "Spark_Yarn_Rest",
  "queue": "default",
  "priority": 10,
  "am-container-spec": {
    // Location of the resource to be localized
    "local-resources": {
       "entry": [
         {
           "key":"AppMaster.jar",
           "value": {
             "resource": "hdfs://hdfs-namenode:9000/user/testuser/DistributedShell/demo-app/AppMaster.jar",
             // options: "ARCHIVE", "FILE", and "PATTERN"
             "type" : "FILE",
             // options are "PUBLIC", "PRIVATE", and "APPLICATION"
             "visibility": "APPLICATION",
             // Size of the resource to be localized
             "size": 43004,
             // Timestamp of the resource to be localized
             "timestamp": 1405452071209
           }
         }
       ]
    },
    "environment": {
       "entry": [{
            "key": "DISTRIBUTEDSHELLSCRIPTTIMESTAMP",
            "value": "1405459400754"
          }, {
            "key": "CLASSPATH",
            "value": "{{CLASSPATH}}<CPS>./*<CPS>{{HADOOP_CONF_DIR}}<CPS>{{HADOOP_COMMON_HOME}}/share/hadoop/common/*<CPS>{{HADOOP_COMMON_HOME}}/share/hadoop/common/lib/*<CPS>{{HADOOP_HDFS_HOME}}/share/hadoop/hdfs/*<CPS>{{HADOOP_HDFS_HOME}}/share/hadoop/hdfs/lib/*<CPS>{{HADOOP_YARN_HOME}}/share/hadoop/yarn/*<CPS>{{HADOOP_YARN_HOME}}/share/hadoop/yarn/lib/*<CPS>./log4j.properties"
          }, {
            "key": "DISTRIBUTEDSHELLSCRIPTLEN",
            "value": "6"
          }, {
            "key": "DISTRIBUTEDSHELLSCRIPTLOCATION",
            "value": "hdfs://hdfs-namenode:9000/user/testuser/demo-app/shellCommands"
          }
        ]
    },
    "commands": {
      "command": "{{JAVA_HOME}}/bin/java -Xmx10m org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster --container_memory 10 --container_vcores 1 --num_containers 1 --priority 0 1><LOG_DIR>/AppMaster.stdout 2><LOG_DIR>/AppMaster.stderr"
    },
    "service-data": null,
    "credentials": null,
    "application-acls": null
  },
  "unmanaged-AM": false,
  "max-app-attempts": 3,
  "resource": {
     "memory":1024,
     "vCores":1,
     "yarn.io/gpu": 1
  },
  "application-type": "Spark",
  "keep-containers-across-application-attempts": true,
  "application-tags": {

  },
  "log-aggregation-context": {

  },
  "attempt-failures-validity-interval": 3600000,
  "reservation-id": null,
  "am-black-listing-requests": null  
}
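
A minimal curl sketch of the submission, assuming the request body above (with the comments removed) has been saved as submit.json:

curl -s -X POST -H "Content-Type: application/json" \
  -d @submit.json \
  "http://rm-http-address:port/ws/v1/cluster/apps?user.name=hadoop"
# a successful submission returns HTTP 202 Accepted with an empty body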

2. How Spark Submission Works

Only YARN cluster mode is discussed here; in client mode the driver runs inside the client process, which cannot be reproduced directly through the YARN REST API.

  • Client: org.apache.spark.deploy.yarn.Client

  • AppMaster: org.apache.spark.deploy.yarn.ApplicationMaster

Main goals:

  • Construct the AppMaster command line and the related configuration parameters (for a Scala jar submission; other modes may involve different steps);
  • Everything else, such as Python submission and Kerberos configuration, mostly amounts to extra local-resource settings; the principle is largely the same;

2.1 The spark-submit.sh Entry Point

  • Sets ENV SPARK_HOME
  • Sets ENV PYTHONHASHSEED=0

  • Then executes a command similar to:

exec /home/hadoop/workspace/spark-2.4.8-bin-hadoop2.7/bin/spark-class org.apache.spark.deploy.SparkSubmit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster --supervise \
--executor-memory 2G --total-executor-cores 3 \
examples/jars/spark-examples_2.11-2.4.8.jar 1000
  • Inside spark-class, the following happens:
    • Sets ENV SPARK_CONF_DIR;
    • Exports every variable set in spark-env.sh;
    • Sets ENV SPARK_SCALA_VERSION;
    • Runs the following command to obtain the actual launch command:
/opt/tmp_workspace/jdk1.8.0_201/bin/java -Xmx128m -cp \
'/home/hadoop/workspace/spark-2.4.8-bin-hadoop2.7/jars/*' \
org.apache.spark.launcher.Main org.apache.spark.deploy.SparkSubmit \
--class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster \
--supervise --executor-memory 2G --total-executor-cores 3 \
examples/jars/spark-examples_2.11-2.4.8.jar 1000
  • In org.apache.spark.launcher.Main, for SparkSubmit, the steps are:
    • Builds the CLASSPATH, adding spark.driver.extraClassPath, $SPARK_CONF_DIR, $SPARK_HOME/jars, $HADOOP_CONF_DIR and $YARN_CONF_DIR;
    • The command it produces looks like:
exec /opt/tmp_workspace/jdk1.8.0_201/bin/java -cp '/home/hadoop/workspace/spark-2.4.8-bin-hadoop2.7/conf/:/home/hadoop/workspace/spark-2.4.8-bin-hadoop2.7/jars/*:/home/hadoop/workspace/hadoop-2.7.2/etc/hadoop/' \
org.apache.spark.deploy.SparkSubmit --master yarn --deploy-mode cluster \
--class org.apache.spark.examples.SparkPi --supervise --executor-memory 2G \
--total-executor-cores 3 examples/jars/spark-examples_2.11-2.4.8.jar 1000

2.2 SparkSubmit

The logic it executes:

  • Performs the following under the proxyUser identity;
  • YarnClusterApplication::start() calls Client::run()

2.3 Client

  • Creates a directory on HDFS under /user/${user}/.sparkStaging/${appId}, used for YARN local resources, as shown below;
  • Uploads the Spark configuration __spark_conf__.zip and the application jar into that HDFS path;
  • Registers both files as local resources of the YARN AM;
$ hdfs dfs -ls /user/hadoop/.sparkStaging/application_1631928041971_0067
-rw-r--r--   2 hadoop supergroup     215195 2021-09-28 11:21 /user/hadoop/.sparkStaging/application_1631928041971_0067/__spark_conf__.zip
-rw-r--r--   2 hadoop supergroup    2017859 2021-09-28 11:21 /user/hadoop/.sparkStaging/application_1631928041971_0067/spark-examples_2.11-2.4.8.jar
  • The contents of __spark_conf__.zip are organized as follows:

  • __hadoop_conf__ contains all files under HADOOP_CONF_DIR and YARN_CONF_DIR,
    • plus the xml files under SPARK_CONF_DIR;
  • the log4j.properties and metrics.properties files found on the CLASSPATH are included;
  • the Hadoop conf is written to __spark_hadoop_conf__.xml;
  • the Spark conf is written to __spark_conf__.properties;

drwx------. 2 hadoop hadoop  4096 Sep 26 15:32 __hadoop_conf__
-r-x------. 1 hadoop hadoop 33785 Sep 26 15:32 __spark_conf__.properties
-r-x------. 1 hadoop hadoop 96226 Sep 26 15:32 __spark_hadoop_conf__.xml
-r-x------. 1 hadoop hadoop  1523 Sep 26 15:32 log4j.properties
-r-x------. 1 hadoop hadoop   523 Sep 26 15:32 metrics.properties
  • Extra Spark properties that get added:
    • spark.yarn.cache.confArchive: the path of __spark_conf__.zip (used as configuration for the other containers);
    • spark.yarn.cache.filenames, spark.yarn.cache.timestamps, spark.yarn.cache.visibilities, spark.yarn.cache.types, spark.yarn.cache.sizes: Spark's own jars, each one a localResource of the YARN app.

2.4 ApplicationMaster

The AM launch command looks similar to the following:

  • {{PWD}} is the reference form of a Hadoop built-in variable;
  • <LOG_DIR> is an application-specific value that Hadoop substitutes automatically;

  • --properties-file is the most important parameter; Section 2.3 describes where it comes from;

{{JAVA_HOME}}/bin/java -server -Xmx4096m -Djava.io.tmpdir={{PWD}}/tmp \
-Dspark.yarn.app.container.log.dir=<LOG_DIR> org.apache.spark.deploy.yarn.ApplicationMaster \
--class 'org.apache.spark.examples.SparkPi' --jar file:/home/hadoop/workspace/spark-2.4.8-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.4.8.jar --arg 1000 \
--properties-file {{PWD}}/__spark_conf__/__spark_conf__.properties 1> <LOG_DIR>/stdout 2> <LOG_DIR>/stderr

--properties-file may be omitted; the SparkConf settings are then read from ENV or Java system properties, so the Spark settings in the file can instead be passed as -D Java options on the command line (this may make the command line extremely long; see issue [SPARK-14602] and the fix https://github.com/apache/spark/pull/12487).

At the same time, spark.yarn.cache.confArchive is registered as a local resource of the executors (this can be omitted).

3. Example

3.1 Create an Application and Obtain the Application ID

POST http://rm-http-address:port/ws/v1/cluster/apps/new-application

3.2 Run the Application

Using the Spark example Pi job as the demo:

3.2.1 Setting Up Spark's Own Jars

Two options for the AM:

  • Via YARN local resources: upload the jars to HDFS and list each jar as a local resource in the HTTP request body (roughly 226 entries), or first merge them into a single archive and register that (only one copy on HDFS);
  • Directly via CLASSPATH: install the Spark jars at the same path on every YARN node, then point the CLASSPATH in the HTTP request body at that path (one copy on every YARN NodeManager node);

Two options for the executors:

  • Via YARN local resources: Spark (spark.yarn.jars) first records its own jars as spark.yarn.cache.filenames | timestamps | visibilities | sizes | types in __spark_conf__.properties, then parses those entries and registers them as executor local resources (see the bundling sketch after this list);
  • Directly via CLASSPATH: set spark.executor.extraClassPath in __spark_conf__.properties to the path of the Spark jars (one copy on every YARN NodeManager node)
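
For the local-resource option on either side, a sketch of bundling the jars, assuming $SPARK_HOME points at the Spark installation and /spark/spark-libs.zip is an arbitrary HDFS target path:

# bundle the ~226 jars under $SPARK_HOME/jars into a single archive
(cd "$SPARK_HOME/jars" && zip -q -r /tmp/spark-libs.zip .)
# upload once; register it as an ARCHIVE-type local resource for the AM,
# and/or list it in spark.yarn.cache.* (type ARCHIVE) for the executors
hdfs dfs -put /tmp/spark-libs.zip /spark/spark-libs.zip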

3.2.2 Setting Up the Application Jar

Because the application's own jar cannot be pre-staged on the cluster, and pushing it to every node is impractical (unless something like NFS is available), it has to be passed as a YARN local resource; the configuration again splits into an AM part and an executor part.

Preparation:

  • Upload the application jar to HDFS and record the HDFS path, size, timestamp, etc. of the file, as shown below;
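
A sketch of this preparation, reusing the staging path from Section 2.3; hdfs dfs -stat reports the size in bytes (%b) and the modification timestamp in milliseconds (%Y) that the local-resource entry below requires:

hdfs dfs -put spark-examples_2.11-2.4.8.jar \
  /user/hadoop/.sparkStaging/application_1631928041971_0087/
hdfs dfs -stat "size=%b timestamp=%Y" \
  /user/hadoop/.sparkStaging/application_1631928041971_0087/spark-examples_2.11-2.4.8.jar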

AM:

  • Add an entry of the following form to the HTTP request body, replacing the resource, size and timestamp fields with the real values:
{
  "key":"__app__.jar",
  "value": {
    "resource": "hdfs://node131:9000/user/hadoop/.sparkStaging/application_1631928041971_0087/spark-examples_2.11-2.4.8.jar",
    "type" : "FILE",
    "visibility": "PRIVATE",
    "size": 2017859,
    "timestamp": 1632823976707
  }
}

Executor:

  • In __spark_conf__.properties, set spark.yarn.cache.filenames | timestamps | visibilities | sizes | types, as shown below:
spark.yarn.cache.filenames=hdfs://node131:9000/user/hadoop/.sparkStaging/application_1631928041971_0087/spark-examples_2.11-2.4.8.jar
spark.yarn.cache.timestamps=1632823976707
spark.yarn.cache.visibilities=PRIVATE
spark.yarn.cache.sizes=2017859
spark.yarn.cache.types=FILE

3.2.3 Spark Configuration

1) Building __spark_conf__.zip

From Section 2.3, the layout of the zip is:

# Hadoop configuration directory
- __hadoop_conf__
  - hdfs-site.xml
  - ...

# Spark configuration, the core file
- __spark_conf__.properties

# Hadoop configuration, takes precedence over __hadoop_conf__
- __spark_hadoop_conf__.xml
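
A sketch of assembling and uploading the archive; $HADOOP_CONF_DIR is assumed to hold the cluster configuration, and the staging directory name is arbitrary. The entries must sit at the top level of the zip, because YARN extracts the archive into a directory named after the resource key:

mkdir -p staging/__hadoop_conf__
cp "$HADOOP_CONF_DIR"/* staging/__hadoop_conf__/
cp __spark_conf__.properties __spark_hadoop_conf__.xml staging/
(cd staging && zip -q -r ../__spark_conf__.zip .)
hdfs dfs -put __spark_conf__.zip /user/hadoop/.sparkStaging/<application-id>/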

__spark_conf__.zip is registered as a YARN local resource of the AM; in the HTTP request it takes a form like:

{
  "key":"__spark_conf__",
  "value": {
    // resource, size, timestamp need to be updated (note the ARCHIVE type)
    "resource": "hdfs://node131:9000/user/hadoop/.sparkStaging/application_1631928041971_0087/__spark_conf__.zip",
    "type" : "ARCHIVE",
    "visibility": "APPLICATION",
    "size": 18023,
    "timestamp": 1632825115975
  }
}

2) Or: Pass the Configuration as Command-Line System Properties

The settings in __spark_conf__.properties can instead be passed inside the command of the HTTP request body as Java system properties (in the -Dspark.a.b.c=de form), as sketched below.
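
A sketch of the resulting AM command, with --properties-file dropped and a few of the settings from __spark_conf__.properties inlined (the full set would be passed the same way):

{{JAVA_HOME}}/bin/java -server -Xmx4096m -Djava.io.tmpdir={{PWD}}/tmp \
-Dspark.yarn.app.container.log.dir=<LOG_DIR> \
-Dspark.master=yarn -Dspark.submit.deployMode=cluster \
-Dspark.executor.memory=2G -Dspark.executor.cores=1 \
org.apache.spark.deploy.yarn.ApplicationMaster \
--class 'org.apache.spark.examples.SparkPi' \
--jar file:/home/hadoop/workspace/spark-2.4.8-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.4.8.jar \
--arg 1000 1> <LOG_DIR>/stdout 2> <LOG_DIR>/stderr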

3.2.4 Calling the REST API

Notes:

  • Spark's own jars are deployed at the same path on every node;
  • the application's own jar is passed as a YARN local resource;

YARN RM endpoint: POST http://rm-http-address:port/ws/v1/cluster/apps

1) Using Only __spark_conf__.properties

Note: if only the __spark_conf__.properties file is needed, you can register just this file as a local resource and upload only it to HDFS, without building the whole __spark_conf__.zip.

Sample __spark_conf__.properties:

#Spark configuration.
#Tue Sep 28 17:09:43 CST 2021
spark.executor.memory=2G
spark.cores.max=3
# needs to be updated
spark.yarn.cache.confArchive=hdfs\://node131\:9000/tmp/__app__.properties
spark.eventLog.compress=true
spark.submit.deployMode=cluster
spark.ui.killEnabled=false
spark.eventLog.enabled=true
spark.yarn.jars=
spark.yarn.historyServer.address=172.16.2.131\:18080
spark.master=yarn
spark.executor.cores=1
spark.app.name=org.apache.spark.examples.SparkPi
spark.port.maxRetries=100
spark.eventLog.dir=hdfs\://node131\:9000/history
spark.driver.extraClassPath=/extraSparkJars
spark.executor.extraClassPath=/home/hadoop/workspace/spark-2.4.8-bin-hadoop2.7/jars/*
# filenames, timestamps, sizes need to be updated
spark.yarn.cache.filenames=hdfs\://node131\:9000/user/hadoop/.sparkStaging/application_1631928041971_0087/spark-examples_2.11-2.4.8.jar#__app__.jar
spark.yarn.cache.timestamps=1632823976707
spark.yarn.cache.visibilities=PRIVATE
spark.yarn.cache.sizes=2017859
spark.yarn.cache.types=FILE

Sample request body for the YARN REST endpoint:

{
  // update the application id
  "application-id": "application_1631928041971_0085",
  "application-name": "Spark_Yarn_Rest",
  "queue": "default",
  "priority": 10,
  "am-container-spec": {
    "local-resources": {
       "entry": [
         {
           "key":"__app__.properties",
           "value": {
             // resource, size, timestamp need to be updated
             "resource": "hdfs://node131:9000/tmp/__spark_conf__.properties",
             "type" : "FILE",
             "visibility": "APPLICATION",
             "size": 990,
             "timestamp": 1632879896059
           }
         }, {
           "key":"__app__.jar",
           "value": {
             // resource, size, timestamp need to be updated
             "resource": "hdfs://node131:9000/tmp/spark-examples_2.11-2.4.8.jar",
             "type" : "FILE",
             "visibility": "PRIVATE",
             "size": 2017859,
             "timestamp": 1632822059125
           }
         }
       ]
    },
    "environment": {
       "entry": [{
            "key": "CLASSPATH",
            "value": "{{PWD}}<CPS>{{PWD}}/__spark_conf__<CPS>/home/hadoop/workspace/spark-2.4.8-bin-hadoop2.7/jars/*<CPS>$HADOOP_CONF_DIR<CPS>$HADOOP_COMMON_HOME/share/hadoop/common/*<CPS>$HADOOP_COMMON_HOME/share/hadoop/common/lib/*<CPS>$HADOOP_HDFS_HOME/share/hadoop/hdfs/*<CPS>$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*<CPS>$HADOOP_YARN_HOME/share/hadoop/yarn/*<CPS>$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*<CPS>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*<CPS>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*<CPS>{{PWD}}/__spark_conf__/__hadoop_conf__<CPS>"
          },
          {
            "key": "SPARK_USER",
            "value": "hadoop"
          },
          {
            "key": "PYTHONHASHSEED",
            "value": "0"
          }
        ]
    },
    "commands": {
      "command": "{{JAVA_HOME}}/bin/java -server -Xmx4096m -Djava.io.tmpdir={{PWD}}/tmp -Dspark.yarn.app.container.log.dir=<LOG_DIR> org.apache.spark.deploy.yarn.ApplicationMaster --class 'org.apache.spark.examples.SparkPi' --jar file:/home/hadoop/workspace/spark-2.4.8-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.4.8.jar --arg 1000 --properties-file {{PWD}}/__app__.properties 1> <LOG_DIR>/stdout 2> <LOG_DIR>/stderr"
    },
    "service-data": null,
    "credentials": null,
    "application-acls": null
  },
  "unmanaged-AM": false,
  "max-app-attempts": 2,
  "resource": {
     "memory":1024,
     "vCores":1
  },
  "application-type": "Spark",
  "keep-containers-across-application-attempts": true,
  "application-tags": {

  },
  "log-aggregation-context": {

  },
  "attempt-failures-validity-interval": 3600000,
  "reservation-id": null,
  "am-black-listing-requests": null  
}

2) Passing the Configuration via __spark_conf__.zip

The commented fields must be updated, and the comments removed, before making the call:

{
  // update the application id
  "application-id": "application_1631928041971_0085",
  "application-name": "Spark_Yarn_Rest",
  "queue": "default",
  "priority": 10,
  "am-container-spec": {
    "local-resources": {
       "entry": [
         {
           "key":"__spark_conf__",
           "value": {
             // resource, size, timestamp need to be updated
             "resource": "hdfs://node131:9000/user/hadoop/.sparkStaging/application_1631928041971_0087/__spark_conf__.zip",
             "type" : "ARCHIVE",
             "visibility": "APPLICATION",
             "size": 18023,
             "timestamp": 1632825115975
           }
         }, {
           "key":"__app__.jar",
           "value": {
             // resource, size, timestamp need to be updated
             "resource": "hdfs://node131:9000/user/hadoop/.sparkStaging/application_1631928041971_0087/spark-examples_2.11-2.4.8.jar",
             "type" : "FILE",
             "visibility": "PRIVATE",
             "size": 2017859,
             "timestamp": 1632823976707
           }
         }
       ]
    },
    "environment": {
       "entry": [{
            "key": "CLASSPATH",
             // update the Spark jars path to match the installation on the nodes
            "value": "/home/hadoop/workspace/spark-2.4.8-bin-hadoop2.7/jars/*<CPS>{{PWD}}<CPS>{{PWD}}/__spark_conf__<CPS>$HADOOP_CONF_DIR<CPS>$HADOOP_COMMON_HOME/share/hadoop/common/*<CPS>$HADOOP_COMMON_HOME/share/hadoop/common/lib/*<CPS>$HADOOP_HDFS_HOME/share/hadoop/hdfs/*<CPS>$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*<CPS>$HADOOP_YARN_HOME/share/hadoop/yarn/*<CPS>$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*<CPS>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*<CPS>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*<CPS>{{PWD}}/__spark_conf__/__hadoop_conf__<CPS>"
          },
          {
            "key": "SPARK_USER",
            "value": "hadoop"
          },
          {
            "key": "PYTHONHASHSEED",
            "value": "0"
          }
        ]
    },
    "commands": {
      "command": "{{JAVA_HOME}}/bin/java -server -Xmx4096m -Djava.io.tmpdir={{PWD}}/tmp -Dspark.yarn.app.container.log.dir=<LOG_DIR> org.apache.spark.deploy.yarn.ApplicationMaster --class 'org.apache.spark.examples.SparkPi' --jar file:/home/hadoop/workspace/spark-2.4.8-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.4.8.jar --arg 1000 --properties-file {{PWD}}/__spark_conf__/__spark_conf__.properties 1> <LOG_DIR>/stdout 2> <LOG_DIR>/stderr"
    },
    "service-data": null,
    "credentials": null,
    "application-acls": null
  },
  "unmanaged-AM": false,
  "max-app-attempts": 2,
  "resource": {
     "memory":1024,
     "vCores":1
  },
  "application-type": "Spark",
  "keep-containers-across-application-attempts": true,
  "application-tags": {

  },
  "log-aggregation-context": {

  },
  "attempt-failures-validity-interval": 3600000,
  "reservation-id": null,
  "am-black-listing-requests": null  
}
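
Once the POST returns 202 Accepted, the application can be tracked through the application state endpoint (a sketch; <application-id> is the id obtained in 3.1):

curl -s "http://rm-http-address:port/ws/v1/cluster/apps/<application-id>/state"
# e.g. {"state":"RUNNING"}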