搜狗实战案例:基于Alluxio优化Spark Shuffle性能

应用背景

随着集群业务的不断扩展和新技术的涌现,现在很多业务由采用传统的MapReduce框架转型使用性能更有优势的Spark计算引擎。在实际的大规模使用过程中,我们和很多Spark用户一样,发现Spark在Hadoop集群上的稳定性逊色于MapReduce。事实上,对于很多线上生产任务,稳定性的要求一定程度上是优先于性能的,这限制了Spark在更大范围的业务线上的推广落地。

在实际使用过程中,我们的大数据集群基本采用多组件混部方式。Spark一般与Yarn一起部署,通过设置不可抢占队列以及世袭血缘关系,Spark作业运行的稳定性也逐步提高。此外,Spark在Spark 2.2版本之后引入的BlackList机制进一步提升了系统稳定性。然而,虽然上述工作使得Spark的基本稳定性有所提高,但是系统依然存在一个问题严重影响任务成功的与外部环境相关性较大的因素:shuffle fetchfailure。这个问题出现主要有以下原因:

  1. NodeManager OOM
  2. 机器宕机
  3. 磁盘故障

虽然,通过Spark的RDD世袭血缘关系,我们能通过重算再次获取数据。但是,当重算失败次数过多时,整个任务还是会执行失败。并且,如果整个世袭关系冗长(如下图1所示),那么重算代价太大,在重算过程中也有极大可能导致整个作业运行失败。为了解决这个问题,我们引入了Alluxio分布式存储以提升Spark Shuffle的稳定性。

搜狗实战案例:基于Alluxio优化Spark Shuffle性能

图1. 长世袭关系

系统架构设计

Spark Shuffle 在Map之后将数据写入节点机器本地磁盘,在Reduce阶段获取 shuffle数据时,都是到对应节点读取数据。这造成了shuffle数据存储存在单点故障的风险。为了解决这个问题,我们考虑使用分布式存储系统来替代shuffle数据的本地存储方案。我们首先评估了 HDFS,HDFS 的特点是稳定可靠的存储,但是由于采用磁盘IO,因此性能比较慢并且在大规模场景下不稳定。我们期望采用提供内存级访问速度的文件系统缓存shuffle数据,经过多方面考量,我们选择了Alluxio,如图2所示。

搜狗实战案例:基于Alluxio优化Spark Shuffle性能

图2 Alluxio作为中间层存储Shuffle数据

引入Alluxio之后的系统部署总体架构如下图3所示,计算和存储在同一集群上采用Spark on Yarn的部署方式,Alluxio的master和worker也部署在相同的集群。Spark Executor通过Alluxio client与Alluxio master以及Alluxio worker进行通信和数据传输。

搜狗实战案例:基于Alluxio优化Spark Shuffle性能

图3 系统系统部署架构图

系统实现

Spark 2.0以后的shuffle机制摈弃了hash shuffle。现有的shuffle writer将shuffle数据partition成n个分区文件,然后将这n个分区文件整合成一个shuffledataFile,并且生成相应的indexFile来标记每个分区数据的位置信息。在引入Alluxio之后,shuffle数据读写的流程如图4所示。MapTask将dataFile和indexFile写入本地的同时也写入 Alluxio。这意味着在 Alluxio 中存储了shuffle数据的一个副本,当ReduceTask从MapTask获取数据失败时,可以从Alluxio中获取相应的dataFile和indexFile,从而避免出现shuffle fetch failure问题。

搜狗实战案例:基于Alluxio优化Spark Shuffle性能

图4 引入Alluxio之后的Shuffle数据读写流程

​由于Alluxio 1.x社区版暂时没有写多备份(Alluxio2.0已支持),所以 Alluxio Worker 崩溃也会导致获取数据失败(目前概率较小)。在这种情况下,我们目前会根据用户对于任务的优先级,来使用 Shuffle on Alluxio:

  • HIGH
  • Local + Alluxio (CACHETHROUGH: HDFS,ASYNCTHROUGH) 3 副本
  • Middle
  • Local + Alluxio (MUST_CACHE) 2 副本
  • Low
  • Local 1 副本

在最近的工作中,我们已经将Local层去掉,因为同时要写Local和Alluxio对作业运行的整体效率有一定的影响,改进后的架构如下:

搜狗实战案例:基于Alluxio优化Spark Shuffle性能

MapTask 将 dataFile 和 indexFile 写入 Alluxio,显著加快了Spark Shuffle的IO速度。同时,Alluxio的多备份功能,保证了shuffle数据的安全性,从而避免出现shuffle fetch failure。同时因为内存容量受限,我们关闭了Alluxio的自动刷写功能,由一个服务来控制Alluxio数据的生命周期,即:当内存满时,不刷写SSD,而是直接写SSD,并且当任务运行结束,自动删除所有的中间数据。

以下是我们环境中Alluxio的一些重要的配置说明:

alluxio.worker.tieredstore.level0.watermark.high.ratio=2

此参数设置的值要大于1,避免数据的层次化刷写(需要修改源码中必须小于1的限制)

alluxio.worker.tieredstore.level0.dirs.path=mem,ssd
alluxio.worker.allocator.class=alluxio.worker.block.allocator.GreedyAllocator

存储级别只设置一级,从上往下依次是MEM和SSD,同时为了让数据优先写内存,需要设置存储模式选择为贪婪模式

alluxio.worker.file.buffer.size=100MB

由于Alluxio在worker上为block选择存储介质的规则是按照存储介质是否大于buffer进行选择,有的worker存储介质的剩余空间大于buffer,但小于block的大小,会导致block写失败,所以worker端buffer的值必须大于单个block的大小。

性能对比分析

下面我们进行原生的Spark Shuffle与SparkShuffle on Alluxio的性能对比分析。如下图5所示,以WordCount为例,在reduce阶段出现NodeManager Crash错误时,原生的Spark会报 shuffle fetch failure从而导致出现重算。即使重算成功,也会严重影响任务的执行效率。而采用Shuffle on Alluxio,ReduceTask可以直接从Alluxio获取所需shuffle数据,无需重算,从而保证任务能够快速正常运行成功。

搜狗实战案例:基于Alluxio优化Spark Shuffle性能

图5 两种方案在workcount用例中的性能对比分析

图6是基于TPC-H benchmark(scaleFactor = 2000)对Shuffle on Alluxio的性能对比部分结果图,其中shuffle数据直接写入Alluxio。整体上,使用Alluxio性能能够得到了比较大的提升。对于shuffle 数据量较大的 Query-23,Query-24等,使用 Shuffle on Alluxio的性能提升非常明显。

搜狗实战案例:基于Alluxio优化Spark Shuffle性能

图 6 Shuffle on Alluxio在TPC-H benchmark (scale

Alluxio调优技巧

Alluxio官方文档给出了很多参数调优建议(https://docs.alluxio.io/os/user/stable/en/advanced/Performance-Tuning.html),这里选出了几条与本场景相关的调优技巧进行介绍。

  • Master与Client之间的连接池大小
  • Application shuffle过程中的每个executor都相当于一个AlluxioClient,当并发过大时,需要调大Master的thread pool(alluxio.master.worker.threads.max)来处理Client的请求,并且需要相应调大机器的文件描述符限制。同时,减少alluxio.user.file.master.client.threads和alluxio.user.file.master.client.threads的值。
  • Worker并发度
  • 调整alluxio.worker.block.threads.max,使Worker能够处理更多RPC请求
  • Netty超时值
  • Client与Alluxio Worker之间出现timeout时,调整alluxio.user.network.netty.timeout。
  • Journal日志存储
  • Alluxio Master采用HA模式,Journal日志存于HDFS中,当Leading Master崩溃时,而Backup Master启动较慢时,可以通过减小alluxio.master.journal.checkpoint.period.entries的值,更频繁地做Journalcheckpoints从而加速Alluxio startup。

总 结

本文基于Alluxio优化了Spark Shuffle的性能,解决了Spark运行过程中Shuffle的稳定性问题。进一步地从性能和稳定性方面综合考虑,本文提出了分优先级使用Alluxio的策略,这使得我们总体Spark的稳定性得到了很大的提升。随着Alluxio自身的不断完善与改进,我们期待通过使用不同场景下的策略变换在保证Spark稳定性的同时,探索进一步提升Shuffle性能的可能。


分享到:


相關文章: