Flink中parallelism并行度和slot槽位的理解

Flink应用程序在一个分布式环境中并行执行。当一个数据流程序提交到作业管理器(JobManager)执行时,系统将会创建一个数据流图,然后准备执行需要的操作符。每一个操作符将会并行化到一个或者多个任务中去。每个算子的并行任务都会处理这个算子的输入流中的一份子集。一个算子并行任务的个数叫做算子的并行度。它决定了算子执行的并行化程度,以及这个算子能处理多少数据量。


算子的并行度可以在执行环境这个层级来控制,也可以针对每个不同的算子设置不同的并行度。默认情况下,应用程序中所有算子的并行度都将设置为执行环境的并行度。执行环境的并行度(也就是所有算子的默认并行度)将在程序开始运行时自动初始化。如果应用程序在本地执行环境中运行,并行度将被设置为CPU的核数。当我们把应用程序提交到一个处于运行中的Flink集群时,执行环境的并行度将被设置为集群默认的并行度,除非我们在客户端提交应用程序时显式的设置好并行度。


parallelism指的是并行度的意思。在 Flink 框架中代表每个任务的并行度,适当的提高并行度可以大大提高 job 的执行效率。

slot指的是任务槽位的意思,flink中任务的并行性由每个 Task Manager 上可用的 slot 决定。

并行度是动态概念,任务槽数量是静态概念。并行度<=任务槽数量。一个任务槽最多运行一个并行度。


一、如何设置flink job的parallelism

(1)在flink的配置文件中flink-conf.yaml,默认的并行度为1;

parallelism.default: 1


(2)在通过shell方式提交flink job的时候,可以使用-p指定程序的并行度;

./bin/flink run -p 10 ../word-count.jar


(3)在flink job程序内设置并行度;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(10);


(4)每个算子指定并行度;

Flink中parallelism并行度和slot槽位的理解

data.keyBy(new xxxKey())

.flatMap(new xxxFlatMapFunction()).setParallelism(2)

.map(new xxxMapFunction).setParallelism(2)

.addSink(new xxxSink()).setParallelism(1)

上面每个算子设置的并行度优先级要高于前面 env设置的并行度,然后才是配置文件中默认并行度。


二、如何理解flink中的slot?

slot 是指 taskmanager 的并发执行能力。

flink-conf.yaml中默认taskmanager.numberOfTaskSlots=1


下面,我们设置taskmanager.numberOfTaskSlots=3;即每一个 taskmanager 中分配 3 个 TaskSlot, 3 个 taskmanager 一共有 9 个 TaskSlot。

Flink中parallelism并行度和slot槽位的理解


Flink中parallelism并行度和slot槽位的理解


三、调整parallelism并行度设置

parallelism 是指 taskmanager 实际使用的并发能力。

parallelism.default=1;即运行程序默认的并行度为 1,9 个 TaskSlot 只用了 1 个,有 8 个空闲。设置合适的并行度才能提高效率。

Flink中parallelism并行度和slot槽位的理解

parallelism 是可配置;下面我们调整parallelism 并行度。

图中 example2 每个算子设置的并行度是 2, example3 每个算子设置的并行度是 9,所有slot槽位都占用了。

Flink中parallelism并行度和slot槽位的理解

example4 除了 sink 是设置的并行度为 1,其他算子设置的并行度都是 9。数据从其他TaskManagers上的所有槽位流到这个Sink接收器上。

Flink中parallelism并行度和slot槽位的理解


Flink中parallelism并行度和slot槽位的理解


分享到:


相關文章: