大数据计算框架Spark之使用Hadoop YARN

概述

确保 HADOOP_CONF_DIR 或者 YARN_CONF_DIR 指向包含 Hadoop(客户端)配置文件的目录。这些配置文件用于写HDFS和连接YARN ResourceManager。这个目录下包含的配置文件被分发给YARN集群,这样application所使用的容器都使用同样的配置。如果配置引用了Java系统属性或非YARN管理的环境变量,他们也应该被配置在Spark application的配置中(driver,executors,client 模式下的AM)。

有两种YARN上的部署模式。cluster模式,Spark driver运行在由YARN管理的一个application master进程内。启动这个application后,客户端可以撤离。client模式,driver运行在客户端进程内,application master仅被用于向YARN请求资源。

和Spark standalone,Mesos不同,部署在YARN上,master的地址由--master指定,ResourceManager的地址从Hadoop配置中获取。这样--master配置为”yarn”。

以cluster模式在YARN上运行application

$ ./bin/spark-submit --class path.to.your.Class --master yarn --deploy-mode cluster [options] [app options]

例如:

$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \\

--master yarn \\

--deploy-mode cluster \\

--driver-memory 4g \\

--executor-memory 2g \\

--executor-cores 1 \\

--queue thequeue \\

lib/spark-examples*.jar \\

10

上面启动一个 YARN 客户端程序,这个客户端程序启动了默认的 Application Master。SparkPi将作为Application Master的子线程运行。客户端定期地轮询Application Master获得状态的更新并将在控制台展示出来。一旦Application 运行完成客户端就退出。

添加其他的jar

在cluster模式下,driver运行在与客户端不同的机器上,因此SparkContext.addJar不能使用位于客户端本地的文件。为了使SparkContext.addJar可以利用客户端本地文件,通过--jars指定这些文件(jar)。

$ ./bin/spark-submit --class my.main.Class \\

--master yarn \\

--deploy-mode cluster \\

--jars my-other-jar.jar,my-other-other-jar.jar \\

my-main-jar.jar \\

app_arg1 app_arg2

准备工作

在YARN上运行Spark需要YARN 支持的binary distribution of Spark(Spark的二进制分布式文件),可以下载他:

http://spark.apache.org/downloads.html

为了能够在YARN上使用Spark运行时jar(Spark runtime jars),需要设置spark.yarn.archive 或spark.yarn.jars,如果没有指定其中之一,那么Spark会将$SPARK_HOME/jars下的文件打包为zip文件并上传到分布式缓存。

为了省去这个过程,将jar包放到HDFS上,并在程序中设置这个参数:

sparkConf.set("spark.yarn.jar", "hdfs://192.168.106.210:8020/sparkjar/spark-assembly-1.6.0-hadoop2.6.0.jar");

调试应用

在YARN术语中,executors和application masters运行在“containers”中。YARN有两种处理应用完成后的container日志的模式。如果启用日志聚合(配置yarn.log-aggregation-enable),container日志会被拷贝到HDFS并将本机日志删除。在集群的任何一台机器上都可以使用yarn logs这个命令查看日志。

yarn logs -applicationId

打印指定application的全部容器的所有日志的全部内容。也可以使用HDFS Shell或API查看日志。通过查看yarn.nodemanager.remote-app-log-dir 和yarn.nodemanager.remote-app-log-dir-suffix可以找到日志文件的目录。也可以在Spark Web UI中找到这些日志,前提是Spark history server和MapReduce history server必须运行,并且配置了yarn-site.xml的yarn.log.server.url属性。Spark history server的日志URL会重定向到MapReduce history server以展示聚合信息。

日志聚合没有开启的话,每台机器生成的日志仍然保存在本地,存储路径通过YARN_APP_LOGS_DIR配置指定,常被指定为/tmp/logs 或$HADOOP_HOME/logs/userlogs,这取决于Hadoop版本和具体的安装情况。这时查看container日志要到包含那个container的主机去查。

要想回顾每个container的运行环境,增大yarn.nodemanager.delete.debug-delay-sec的值(如36000),并且通过设置yarn.nodemanager.local-dirs来使用每个在container启动的节点上

的application缓存。这个目录包含了启动脚本,jar和所有用来启动container的环境变量。这个过程有助于调试路径问题。注意,启用此功能需要集群设置的管理员权限并且还要重启所有node manager,因此这不适用于托管集群。

application master或executor要使用自定义的log4j配置,一些选项如下:

1)使用spark-submit命令,将log4j.properties文件路径加入--files文件列表,实现上传log4j.properties。

2)设置spark.driver.extraJavaOptions(对于驱动)或spark.executor.extraJavaOptions(对于executors)为

-Dlog4j.configuration=<location>。如果使用文件,应明确指出file:协议并且这个文件在每个节点都应存在。/<location>

3)更新$SPARK_CONF_DIR/log4j.properties并且这个配置文件会和其他配置一起上传到集群。如果指定了多个选项,那么上两个选项的优先权比此项高。

注意,对于第一个选项,所有的driver和executor将共享相同的log4j配置文件,如果这些driver和executor都运行在一个节点上就会有问题(日志都写到一个文件里)。

如果你需要一个引用,指向YARN上的正确位置来放置日志文件,以便YARN能够正确地展示和聚合他们,那么在log4j.properties中配置spark.yarn.app.container.log.dir。例如log4j.appender.file_appender.File=${spark.yarn.app.container.log.dir}/spark.log。对于streaming application,配置RollingFileAppender并且将文件路径设置为YARN的日志目录,这样可以避免大日志文件引起磁盘溢出,并且YARN日志工具可以访问这些日志文件。

为了使用application master和executor的自定义度量属性,更新$SPARK_CONF_DIR/metrics.properties文件。这个文件连同其他配置一起被上传,这样你就不用使用--files来指定他。

Spark Properties(Spark属性)

注释

1)调度描述中的核心请求是否会执行取决于使用的调度器(scheduler )和如何配置他。

2)在cluster模式在,Spark executors和Spark driver使用的本地目录

是YARN上配置的本地目录(Hadoop YARN 的配yarn.nodemanager.local-dirs)

如果用户指定了spark.local.dir,他会被忽略。在client模式中,Spark executors会使用YARN上配置的本地目录,然而Spark driver会使用spark.local.dir。这是因为client模式下Spark driver不会在YARN集群上运行,只有Spark executors在YARN集群上运行。

3)--files和--archives选项支持以#指定的文件名,这与hadoop类似。例如可以指定:--files localtest.txt#appSees.txt,会将本地名为localtest.txt的文件上传到HDFS上,但会使用appSees.txt来连接,当应用运行在YARN上时,要使用appSees.txt来引用这个文件。

4)如果模式为cluster,--jars选项使用本地文件,那么他使SparkContext.addJar这个方法起作用。如果使用HDFS, HTTP, HTTPS, 或FTP,那么不必使用--jars。

在一个安全的集群上运行

在安全的Hadoop集群上使用Kerberos来认证与服务端和客户端相关联的主体。这样客户端就能向已认证的服务端发起请求。服务端为已认证的主体赋予权限。

Hadoop服务发行hadoop tokens来授予访问服务和获得数据的权限。客户端必须先获得与要访问服务相关的tokens,然后连同application一起发生给YARN集群。

Spark application与Hadoop文件系统(HDFS,webhdfs),HBase,Hive等交互,他必须获得相关的tokens,这个tokens使用提交application的用户的Kerberos凭据,这样这个身份的主体就成为了提交Spark application的主体。

一般在提交应用的时候,Spark自动从集群的默认hadoop文件系统获得一个token,有可能是HBase或Hive。

如果HBase在classpath中,一个HBase token将被获得。HBase配置声明了application是安全的(例如hbase-site.xml设置hbase.security.authentication为kerberos),并且spark.yarn.security.credentials.hbase.enabled设置为true。

相似地,如果Hive在classpath中,Hive token将被获得。他的配置包含了存储元数据的URI(hive.metastore.uris),并且spark.yarn.security.credentials.hive.enabled设置为true。

如果application需要和其他的安全的hadoop文件系统交互,那么要使用这些token访问集群,就必须在调用时明确地指出,可通过spark.yarn.access.hadoopFileSystems这个属性列出来集群

spark.yarn.access.hadoopFileSystems hdfs://ireland.example.org:8020/,webhdfs://frankfurt.example.org:50070/

Spark支持通过Java Services机制与安全感知系统交互(参见java.util.ServiceLoader)。为了实现这一功能,org.apache.spark.deploy.yarn.security.ServiceCredentialProvider的实现可用于Spark,通过在jar的META-INF/services路径中的相应的文件列出他们的名字,可以配置spark.yarn.security.credentials.{service}.enabled为false来禁用这个插件,{service}是凭据提供者的名字。

配置外部的Shuffle服务

在每个NodeManager上启动Spark Shuffle Service,请遵照以下指令:

1)构建Spark,如果使用构建好的,跳过此步骤。

2)找到spark-<version>-yarn-shuffle.jar。如果自己构建Spark,那么应该放在$SPARK_HOME/common/network-yarn/target/scala-<version>目录下,如果使用分发版本,应该放在YARN下。/<version>/<version>

3)把这个jar放到集群的所有的NodeManager下。

4)在每个节点的yarn-site.xml文件中,添加spark_shuffle到yarn.nodemanager.aux-services,设置yarn.nodemanager.aux-services.spark_shuffle.class为org.apache.spark.network.yarn.YarnShuffleService

5)在etc/hadoop/yarn-env.sh中,通过设置YARN_HEAPSIZE 增大NodeManager的堆内存,可避免Shuffle过程的垃圾收集问题。

6)重启所有NodeManager。

shuffle service运行在YARN上时,额外的参数是:

spark.yarn.shuffle.stopOnFailure默认是false,这个参数的含义是当Spark Shuffle Service初始化失败时,是否停止NodeManager。当Spark Shuffle Service没有在NodeManager上运行,而NodeManager的container正在运转,这样的错误可以通过设置上面的参数来阻止。

Kerberos排错

记录针对Kerberos操作的额外日志,设置HADOOP_JAAS_DEBUG这个环境变量

bash export HADOOP_JAAS_DEBUG=true

通过系统属性sun.security.krb5.debug和sun.security.spnego.debug=true来配置JDK类,即可开启针对Kerberos和SPNEGO/REST认证的日志。

-Dsun.security.krb5.debug=true -Dsun.security.spnego.debug=true

在Application Master中,可以启用上面的所有选项

spark.yarn.appMasterEnv.HADOOP_JAAS_DEBUG true spark.yarn.am.extraJavaOptions -Dsun.security.krb5.debug=true -Dsun.security.spnego.debug=true

如果日志级别org.apache.spark.deploy.yarn.Client被设置为DEBUG,日志会包含获得的所有token,以及这些token到期的详细信息。

使用Spark History Server

应用端设置Spark的配置为spark.yarn.historyServer.allowTracking=true

在Spark History Server,添加org.apache.spark.deploy.yarn.YarnProxyRedirectFilter到spark.ui.filters配置列表。

实践

在集群中选择一台服务器,输入命令:spark-submit 回车,得到如下命令模式及参数注释:

注意:

1)在YARN上运行,那么master一定是 yarn。

2)deploy-mode值有两个,一个是cluster,一个是client。在程序中无法指定deploy-mode,只能由spark-submit命令指定。如果在命令中不指定deploy-mode参数,那么运行方式为client(即默认)。

3)可以通过命令设定master的值,也可以在程序中设置。

4)指定运行应用的用户账号。运行spark-submit命令默认以root身份执行命令(即登录Linux服务器的身份),但是Spark访问HDFS会遇到权限问题:

18/10/25 08:57:26 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: User class threw exception: org.apache.hadoop.security.AccessControlException: Permission denied: user=root, access=EXECUTE, inode="/home":hdfs:supergroup:d-wx------
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:400)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkTraverse(FSPermissionChecker.java:316)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:243)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:194)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1846)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1830)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkOwner(FSDirectory.java:1775)
at org.apache.hadoop.hdfs.server.namenode.FSDirAttrOp.setPermission(FSDirAttrOp.java:64)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.setPermission(FSNamesystem.java:1886)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.setPermission(NameNodeRpcServer.java:849)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.setPermission(ClientNamenodeProtocolServerSideTranslatorPB.java:509)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:869)

这一句:

yarn.ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: User class threw exception: org.apache.hadoop.security.AccessControlException: Permission denied: user=root, access=EXECUTE, inode="/home":hdfs:supergroup:d-wx------

充分显示了问题,即以root用户访问/home文件,然而其属于hdfs这个用户,那么要以hdfs用户身份执行spark-submit命令,使用:

sudo -u hdfs spark-submit --class path.to.your.Class --master yarn --deploy-mode cluster [options] [app options]

有博文说添加:System.setProperty("HADOOP_USER_NAME", "hdfs");然而只能解决以client模式运行的应用,对于cluster模式这种方式不起作用,所以还是指定执行脚本的用户这种方式最可靠。

例:不指定master和deploy-mode

spark-submit --class net.cnki.xtsj.a_statistical_information.WordRetrievedStatistics \\

/home/a-statistical-information-0.0.1-SNAPSHOT-jar-with-dependencies.jar

在程序中指定master为yarn:

SparkConf sparkConf = new SparkConf().setMaster("yarn").setAppName("write words to mongodb");