大数据计算框架Spark之使用Hadoop YARN

大数据计算框架Spark之使用Hadoop YARN

概述

确保 HADOOP_CONF_DIR 或者 YARN_CONF_DIR 指向包含 Hadoop(客户端)配置文件的目录。这些配置文件用于写HDFS和连接YARN ResourceManager。这个目录下包含的配置文件被分发给YARN集群,这样application所使用的容器都使用同样的配置。如果配置引用了Java系统属性或非YARN管理的环境变量,他们也应该被配置在Spark application的配置中(driver,executors,client 模式下的AM)。

有两种YARN上的部署模式。cluster模式,Spark driver运行在由YARN管理的一个application master进程内。启动这个application后,客户端可以撤离。client模式,driver运行在客户端进程内,application master仅被用于向YARN请求资源。

和Spark standalone,Mesos不同,部署在YARN上,master的地址由--master指定,ResourceManager的地址从Hadoop配置中获取。这样--master配置为”yarn”。

以cluster模式在YARN上运行application

$ ./bin/spark-submit --class path.to.your.Class --master yarn --deploy-mode cluster [options] [app options]

例如:

$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \\

--master yarn \\

--deploy-mode cluster \\

--driver-memory 4g \\

--executor-memory 2g \\

--executor-cores 1 \\

--queue thequeue \\

lib/spark-examples*.jar \\

10

上面启动一个 YARN 客户端程序,这个客户端程序启动了默认的 Application Master。SparkPi将作为Application Master的子线程运行。客户端定期地轮询Application Master获得状态的更新并将在控制台展示出来。一旦Application 运行完成客户端就退出。

添加其他的jar

在cluster模式下,driver运行在与客户端不同的机器上,因此SparkContext.addJar不能使用位于客户端本地的文件。为了使SparkContext.addJar可以利用客户端本地文件,通过--jars指定这些文件(jar)。

$ ./bin/spark-submit --class my.main.Class \\

--master yarn \\

--deploy-mode cluster \\

--jars my-other-jar.jar,my-other-other-jar.jar \\

my-main-jar.jar \\

app_arg1 app_arg2

准备工作

在YARN上运行Spark需要YARN 支持的binary distribution of Spark(Spark的二进制分布式文件),可以下载他:

http://spark.apache.org/downloads.html

为了能够在YARN上使用Spark运行时jar(Spark runtime jars),需要设置spark.yarn.archive 或spark.yarn.jars,如果没有指定其中之一,那么Spark会将$SPARK_HOME/jars下的文件打包为zip文件并上传到分布式缓存。

为了省去这个过程,将jar包放到HDFS上,并在程序中设置这个参数:

sparkConf.set("spark.yarn.jar", "hdfs://192.168.106.210:8020/sparkjar/spark-assembly-1.6.0-hadoop2.6.0.jar");

调试应用

在YARN术语中,executors和application masters运行在“containers”中。YARN有两种处理应用完成后的container日志的模式。如果启用日志聚合(配置yarn.log-aggregation-enable),container日志会被拷贝到HDFS并将本机日志删除。在集群的任何一台机器上都可以使用yarn logs这个命令查看日志。

yarn logs -applicationId

打印指定application的全部容器的所有日志的全部内容。也可以使用HDFS Shell或API查看日志。通过查看yarn.nodemanager.remote-app-log-dir 和yarn.nodemanager.remote-app-log-dir-suffix可以找到日志文件的目录。也可以在Spark Web UI中找到这些日志,前提是Spark history server和MapReduce history server必须运行,并且配置了yarn-site.xml的yarn.log.server.url属性。Spark history server的日志URL会重定向到MapReduce history server以展示聚合信息。

日志聚合没有开启的话,每台机器生成的日志仍然保存在本地,存储路径通过YARN_APP_LOGS_DIR配置指定,常被指定为/tmp/logs 或$HADOOP_HOME/logs/userlogs,这取决于Hadoop版本和具体的安装情况。这时查看container日志要到包含那个container的主机去查。

要想回顾每个container的运行环境,增大yarn.nodemanager.delete.debug-delay-sec的值(如36000),并且通过设置yarn.nodemanager.local-dirs来使用每个在container启动的节点上

的application缓存。这个目录包含了启动脚本,jar和所有用来启动container的环境变量。这个过程有助于调试路径问题。注意,启用此功能需要集群设置的管理员权限并且还要重启所有node manager,因此这不适用于托管集群。

application master或executor要使用自定义的log4j配置,一些选项如下:

1)使用spark-submit命令,将log4j.properties文件路径加入--files文件列表,实现上传log4j.properties。

2)设置spark.driver.extraJavaOptions(对于驱动)或spark.executor.extraJavaOptions(对于executors)为

-Dlog4j.configuration=<location>。如果使用文件,应明确指出file:协议并且这个文件在每个节点都应存在。/<location>

3)更新$SPARK_CONF_DIR/log4j.properties并且这个配置文件会和其他配置一起上传到集群。如果指定了多个选项,那么上两个选项的优先权比此项高。

注意,对于第一个选项,所有的driver和executor将共享相同的log4j配置文件,如果这些driver和executor都运行在一个节点上就会有问题(日志都写到一个文件里)。

如果你需要一个引用,指向YARN上的正确位置来放置日志文件,以便YARN能够正确地展示和聚合他们,那么在log4j.properties中配置spark.yarn.app.container.log.dir。例如log4j.appender.file_appender.File=${spark.yarn.app.container.log.dir}/spark.log。对于streaming application,配置RollingFileAppender并且将文件路径设置为YARN的日志目录,这样可以避免大日志文件引起磁盘溢出,并且YARN日志工具可以访问这些日志文件。

为了使用application master和executor的自定义度量属性,更新$SPARK_CONF_DIR/metrics.properties文件。这个文件连同其他配置一起被上传,这样你就不用使用--files来指定他。

Spark Properties(Spark属性)

大数据计算框架Spark之使用Hadoop YARN

大数据计算框架Spark之使用Hadoop YARN

大数据计算框架Spark之使用Hadoop YARN

大数据计算框架Spark之使用Hadoop YARN

大数据计算框架Spark之使用Hadoop YARN

大数据计算框架Spark之使用Hadoop YARN

大数据计算框架Spark之使用Hadoop YARN

大数据计算框架Spark之使用Hadoop YARN

大数据计算框架Spark之使用Hadoop YARN

大数据计算框架Spark之使用Hadoop YARN

大数据计算框架Spark之使用Hadoop YARN

大数据计算框架Spark之使用Hadoop YARN

大数据计算框架Spark之使用Hadoop YARN

注释

1)调度描述中的核心请求是否会执行取决于使用的调度器(scheduler )和如何配置他。

2)在cluster模式在,Spark executors和Spark driver使用的本地目录

是YARN上配置的本地目录(Hadoop YARN 的配yarn.nodemanager.local-dirs)

如果用户指定了spark.local.dir,他会被忽略。在client模式中,Spark executors会使用YARN上配置的本地目录,然而Spark driver会使用spark.local.dir。这是因为client模式下Spark driver不会在YARN集群上运行,只有Spark executors在YARN集群上运行。

3)--files和--archives选项支持以#指定的文件名,这与hadoop类似。例如可以指定:--files localtest.txt#appSees.txt,会将本地名为localtest.txt的文件上传到HDFS上,但会使用appSees.txt来连接,当应用运行在YARN上时,要使用appSees.txt来引用这个文件。

4)如果模式为cluster,--jars选项使用本地文件,那么他使SparkContext.addJar这个方法起作用。如果使用HDFS, HTTP, HTTPS, 或FTP,那么不必使用--jars。

在一个安全的集群上运行

在安全的Hadoop集群上使用Kerberos来认证与服务端和客户端相关联的主体。这样客户端就能向已认证的服务端发起请求。服务端为已认证的主体赋予权限。

Hadoop服务发行hadoop tokens来授予访问服务和获得数据的权限。客户端必须先获得与要访问服务相关的tokens,然后连同application一起发生给YARN集群。

Spark application与Hadoop文件系统(HDFS,webhdfs),HBase,Hive等交互,他必须获得相关的tokens,这个tokens使用提交application的用户的Kerberos凭据,这样这个身份的主体就成为了提交Spark application的主体。

一般在提交应用的时候,Spark自动从集群的默认hadoop文件系统获得一个token,有可能是HBase或Hive。

如果HBase在classpath中,一个HBase token将被获得。HBase配置声明了application是安全的(例如hbase-site.xml设置hbase.security.authentication为kerberos),并且spark.yarn.security.credentials.hbase.enabled设置为true。

相似地,如果Hive在classpath中,Hive token将被获得。他的配置包含了存储元数据的URI(hive.metastore.uris),并且spark.yarn.security.credentials.hive.enabled设置为true。

如果application需要和其他的安全的hadoop文件系统交互,那么要使用这些token访问集群,就必须在调用时明确地指出,可通过spark.yarn.access.hadoopFileSystems这个属性列出来集群

spark.yarn.access.hadoopFileSystems hdfs://ireland.example.org:8020/,webhdfs://frankfurt.example.org:50070/

Spark支持通过Java Services机制与安全感知系统交互(参见java.util.ServiceLoader)。为了实现这一功能,org.apache.spark.deploy.yarn.security.ServiceCredentialProvider的实现可用于Spark,通过在jar的META-INF/services路径中的相应的文件列出他们的名字,可以配置spark.yarn.security.credentials.{service}.enabled为false来禁用这个插件,{service}是凭据提供者的名字。

配置外部的Shuffle服务

在每个NodeManager上启动Spark Shuffle Service,请遵照以下指令:

1)构建Spark,如果使用构建好的,跳过此步骤。

2)找到spark-<version>-yarn-shuffle.jar。如果自己构建Spark,那么应该放在$SPARK_HOME/common/network-yarn/target/scala-<version>目录下,如果使用分发版本,应该放在YARN下。/<version>/<version>

3)把这个jar放到集群的所有的NodeManager下。

4)在每个节点的yarn-site.xml文件中,添加spark_shuffle到yarn.nodemanager.aux-services,设置yarn.nodemanager.aux-services.spark_shuffle.class为org.apache.spark.network.yarn.YarnShuffleService

5)在etc/hadoop/yarn-env.sh中,通过设置YARN_HEAPSIZE 增大NodeManager的堆内存,可避免Shuffle过程的垃圾收集问题。

6)重启所有NodeManager。

shuffle service运行在YARN上时,额外的参数是:

spark.yarn.shuffle.stopOnFailure默认是false,这个参数的含义是当Spark Shuffle Service初始化失败时,是否停止NodeManager。当Spark Shuffle Service没有在NodeManager上运行,而NodeManager的container正在运转,这样的错误可以通过设置上面的参数来阻止。

Kerberos排错

记录针对Kerberos操作的额外日志,设置HADOOP_JAAS_DEBUG这个环境变量

bash export HADOOP_JAAS_DEBUG=true

通过系统属性sun.security.krb5.debug和sun.security.spnego.debug=true来配置JDK类,即可开启针对Kerberos和SPNEGO/REST认证的日志。

-Dsun.security.krb5.debug=true -Dsun.security.spnego.debug=true

在Application Master中,可以启用上面的所有选项

spark.yarn.appMasterEnv.HADOOP_JAAS_DEBUG true spark.yarn.am.extraJavaOptions -Dsun.security.krb5.debug=true -Dsun.security.spnego.debug=true

如果日志级别org.apache.spark.deploy.yarn.Client被设置为DEBUG,日志会包含获得的所有token,以及这些token到期的详细信息。

使用Spark History Server

应用端设置Spark的配置为spark.yarn.historyServer.allowTracking=true

在Spark History Server,添加org.apache.spark.deploy.yarn.YarnProxyRedirectFilter到spark.ui.filters配置列表。

实践

在集群中选择一台服务器,输入命令:spark-submit 回车,得到如下命令模式及参数注释:

大数据计算框架Spark之使用Hadoop YARN

注意:

1)在YARN上运行,那么master一定是 yarn。

2)deploy-mode值有两个,一个是cluster,一个是client。在程序中无法指定deploy-mode,只能由spark-submit命令指定。如果在命令中不指定deploy-mode参数,那么运行方式为client(即默认)。

3)可以通过命令设定master的值,也可以在程序中设置。

4)指定运行应用的用户账号。运行spark-submit命令默认以root身份执行命令(即登录Linux服务器的身份),但是Spark访问HDFS会遇到权限问题:

18/10/25 08:57:26 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: User class threw exception: org.apache.hadoop.security.AccessControlException: Permission denied: user=root, access=EXECUTE, inode="/home":hdfs:supergroup:d-wx------
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:400)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkTraverse(FSPermissionChecker.java:316)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:243)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:194)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1846)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1830)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkOwner(FSDirectory.java:1775)
at org.apache.hadoop.hdfs.server.namenode.FSDirAttrOp.setPermission(FSDirAttrOp.java:64)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.setPermission(FSNamesystem.java:1886)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.setPermission(NameNodeRpcServer.java:849)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.setPermission(ClientNamenodeProtocolServerSideTranslatorPB.java:509)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:869)

这一句:

yarn.ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: User class threw exception: org.apache.hadoop.security.AccessControlException: Permission denied: user=root, access=EXECUTE, inode="/home":hdfs:supergroup:d-wx------

充分显示了问题,即以root用户访问/home文件,然而其属于hdfs这个用户,那么要以hdfs用户身份执行spark-submit命令,使用:

sudo -u hdfs spark-submit --class path.to.your.Class --master yarn --deploy-mode cluster [options] [app options]

有博文说添加:System.setProperty("HADOOP_USER_NAME", "hdfs");然而只能解决以client模式运行的应用,对于cluster模式这种方式不起作用,所以还是指定执行脚本的用户这种方式最可靠。

例:不指定master和deploy-mode

spark-submit --class net.cnki.xtsj.a_statistical_information.WordRetrievedStatistics \\

/home/a-statistical-information-0.0.1-SNAPSHOT-jar-with-dependencies.jar

在程序中指定master为yarn:

SparkConf sparkConf = new SparkConf().setMaster("yarn").setAppName("write words to mongodb");


分享到:


相關文章: