学习Flink的ProcessFunction过程中,官方文档中涉及状态处理的时候,不止一次提到只适用于keyed stream的元素,如下图红框所示:
之前写过一些flink应用,keyed stream常用但不是必须用的,所以产生了疑问:
为何只有keyed stream的元素能读写状态?每个key对应的状态是如何操作的?Flink的"状态"
先去回顾Flink"状态"的知识点;官方文档说就两种状态:keyed state和operator state:如上图,keyed stream的元素是具有key的特征,与ProcessFunction的操作状态时要求匹配,其他steam的元素由于没有key的特征,所以也就没有"状态"一说了;另一种状态是Operator State,如下图,这是和多并行度计算时的算子实例绑定的,例如当前算子消费kafka的某个分区的最新offset,而ProcessFunction是用来处理stream元素的,不会涉及到Operator State:
官方demo
为了学习ProcessFunction就去看官方demo,地址是:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html ,简单说说这个demo的功能:
数据源在不间断的产生单词,每个单词对应一个Tuple2<string>的实例;/<string>数据源被keyBy方法转成KeyedStream,key是Tuple2实例的f0字段;一个KeyedProcessFunction的子类CountWithTimeoutFunction,被用来处理KeyedStream的每个元素,处理的逻辑:为每个key维护一个状态,状态的内容是这个key的出现次数和最后一次出现时间;如果那个key连续一分钟没有出现,KeyedProcessFunction就向下游发送这个元素;以上就是官方demo的功能,本来是想通过demo来加深认识,结果看完不但没有明白,反而更晕了,下图是我对demo代码的疑惑:
从上图可见我的疑惑,这里再复述一下:
入参value是Tuple2类型,假设其f0字段等于aaa,那么processElement方法的作用,就是取出aaa的状态,更新后保存;从代码上看,state.value()返回了aaa的状态,这个value方法并没有将aaa作为入参,那怎么做到返回aaa的状态呢?如果下一个入参value的f0字段等于bbb了,这个state.value()能返回bbb的状态吗?对更新状态的代码state.update(current)也是同样的疑惑;然后又产生了新的疑惑:成员变量state难道是一直在变?每执行一次processElement,都会变成该key对应的state实例?先反思为何会有上述疑惑
上述疑惑产生的原因,应该是受到平时使用HashMap的影响,HashMap获取值就是在调用get方法时指定key,设置值也是在put时指定key,所以看到state.value()方法没有用key做入参就不习惯了要消除这种不适应,要做的第一件事就是提醒自己:processElement是在框架内运行的,很多数据在之前已经由框架准备好了;接下来要做的,就是把框架准备数据跟踪源码
如下图,让我们从一个断点的堆栈开始吧,这是在执行上面demo中的processElement方法之前的一个断点,可见根源是个线程的run方法,也就是KeyedProcessFunction对应的算子执行任务的线程:得益于Flink代码自身规范、清晰的设计和实现,再加上IDEA强大的debug功能,整个阅读和分析过程十分顺利,这其中的收获会逐渐在今后深入学习DataStreamAPI的过程中见效;
最后,根据上面的分析过程绘制了一幅简陋的流程图,希望能帮助您加快理解: