网易云 戴丹:《实时流计算在业务中的应用》

网易云 戴丹:《实时流计算在业务中的应用》


9月15日FMI-2018人工智能与大数据高峰论坛圆满落幕,网易云戴丹老师就实时流计算在业务中的应用进行了深入的分享。

飞马网将其内容整理如下:

各位早上好,我在这边跟大家分享一些流式计算技术相关的内容。首先介绍一下网易对大数据这块数据平台基本的情况,首先是数据业务架构,网易经过近20多年的发展,已经沉淀了非常丰富的业务数据,本身的业务当前在跑的可能有一两百个,整个数据业务架构有三层结构,最底下是业务数据的采集,包括日志数据,数据库数据,这部分数据可以时时导入,离线导入形成一个离线数据仓,现在也有时时数仓,第二层是数据平台,相当于管理所有业务数据,在网易有两个平台,一个是网易猛犸,是开放给大家使用的运营开发平台,还有网易有数,是可视化的,这是偏业务层的跟技术稍微远一点。数据采集有存储计算,还有离线计算,时时计算,最上面是数据应用这一层,对应到非常具体的业务,网易新闻,云音乐,考拉,云课堂之类,以新闻为例,在实时计算这一块,他在新闻推荐这一块用的比较多一点,比如用网易新闻客户端,每天给你推送一些消息,这个也是有实时计算的应用在里面。每个人的兴趣点所处的地理位置,年龄段性别都不一样,会做一个兴趣的推荐,为什么还需要实时计算,因为这个推荐的效果跟时间和本人的状态非常相关,跟当前的热点也有关系,实时计算是用来真实反馈推荐效果的。比如在网易考拉做大促,运营的一些活动促销做推送的时候也是用到了实时计算,比如考拉618,当前上午给你推送了一波消息,这波消息的推送是通过实时计算它的转化率,计算推荐效果是否OK,运营再根据这个结果做策略的调整。

接下来是纯技术相关的,这是网易大数据的架构,这个各大公司都差不多,这是数据层和管理层具体的展示,底下数据源这一层还是结构化,半结构化,非结构化的数据,数据集成,在网易分为两类,一类是全量的非实时的,还有增量实时的,采集系统和数据库增量系统,还有存储,实时计算有离线计算和在线计算并列的。再往上是平台层,是给业务开发的,旁边还有一些辅助的功能,作业流和相关的特性。

下面是今天我要跟大家分享的重点,网易流式计算的平台,我们这个流式计算平台叫Sloth,项目立项在2017年初,2016年有一部电影《疯狂动物城》,里面有一只树懒叫闪电,英文名是Sloth,用这个树懒类比,是非常便捷非常快的,可能动作很慢,达到的效果是可以做很多的工作,但是表达能力非常强,可以把实时计算的场景跑起来,就像疯狂动物城里的情景,树懒跑的很快。还有网易梦犸在《冰河世纪》里跟树懒是三剑客,所以这是简单高效的平台。

下面分两部分介绍一下流式计算的经验,一部分是产品特性,第二部分是具体关键实现的细节。首先是Sloth的特点,首先用SQL开发的流计算任务,比较重要的点是完全兼容离线SQL,因为流式SQL是去年火起来的,我们做这个是去年年初做成第一个版本,流和离线的表达能力比较类似,原来用Having解决大数据计算问题的人,继续用写SQL的方式做,但是从原来天级别的计算时效性提升到秒级。

第二个特点,支持UDF,Having、Join、subquery,兼容MySQL,同时也兼容UDF,用户可以自己提交UDF,实现个性化跟业务场景需求比较相关的自己实现的函数,写完了之后可以在SQL里直接使用。

第三,支持DDL、SQL级联和维表关联,DDL本身并不参与计算逻辑定义,但是他是计算输入输出定义的关键因素,大家知道不管是离线计算还是实时计算,计算三要素无非是输入输出,DDL是定义输出输入的,适配各种样的存储邮件,输入主要是像MQ之类的数据队列,输出比较丰富多样,这个业务场景可能是Kbuka、数据库之类的。SQL级别,我们在项目之初,绝大部分项目没有SQL的支持,用户相当于脱离不了计算引擎的东西,肯定要写一些代码,SQL的支持是为了让业务方写纯SQL的方式解决流式计算的问题,他不需要了解这个代码怎么写。还有一个是维表关联,这是业务里用的比较多的,相当于是一个流和表结合计算的场景,可能大家业务中涉及的比较多的是用流式计算,结合静态的或者变化不频繁的K1的配置查询,或者是字段补全,用ID查用户名之类的,这也是流式计算生态里比较关键的点。

第四,扩展了Flink做计算引擎,Flink已经是不错的了,而且状态越来越好,但是在项目之初包括现在还是有很多不足,我们的实时计算平台而言,我们对Flink做了很多改进,比如支持数据落期,支持数据撤销,还有Join的性能优化,做了很多工作,为此才足以让流式计算平台跑起来。

第五点,我们用了一个技术方案实现SQL到Flink的转化,因为流式计算本身是一个实时性要求比较高的场景,对运行式程序效率要求比较高一点,用这个技术相当于可以大幅度的提高运行效率,相关技术在后面的PPT展示。倒数第二点是增量计算模型,从我去年做分享的反馈来看,了解和理解的人并不是特别多,今年可能会好一点,等一下会做一个解释,为什么需要有增量计算,为什么有撤销,大家在什么情况下需要注意撤销这件事。

最后一个是流式计算平台本身的数据特性的保证,保证了exactlyonce和at least Once,恰好一次处理和至少一次处理,这是得益于Flink框架的能力才做到这个事情,所有的这些特点构成了Sloth这个平台所必须的能力和先进的点。

给大家看一下内部流式计算平台的产品,这个上线一年多了,产品形态比较简单,左边这个地方是三个栏目,任务开发、任务运维、邮件管理。任务开发相当于给业务方的产品写SQL的地方,SQL是S-SQL,允许用户创建UDF,是一个函数名,有一个类名,在UDF里自由发挥,包括了检查和交付式的调试,这个截图是真实的线上应用,是一个广告的业务,有十来个任务,任务开发完了之后开始上线运营,这里主要表达的是流式计算SQL可以解决80%左右的场景,但是用SQL的好处是非常快,可能算个点击率转化率,用SQL写,从开发到上线半天时间就够了,用平台不用运维,而且还有一个实时监控,这是平台里面集成了单个任务计算的延迟,因为本身对延迟是非常敏感的,所以首先用户要能够看到延迟有多少,如果延迟比较高,还是要监控告警之类的。

下面给大家讲一个具体的流式Sloth的应用,这是广告业务的实时报表,广告业务在各公司都是比较重要的,因为它直接相关营收,这里举的例子是报表,这是流式Sloth最典型的应用,最成熟,没有任何问题。广告主通过实时报表来观测广告流量的强占情况,根据这个广告的PV数和广告投放的好坏,根据投放之后实时的点击率等等,广告主会看广告的点击情况进行投放策略的变更,如果转化率比较好,他会继续加大投入,如果这个转化率很低的话,他会及时撤下来,减少没必要的投入,整个场景很明确,我们流式怎么来表达,刚才我提到,计算的话三个点,输入、计算、输出,计算的输入和输出先通过TCR表示,这个里面有两个数据源,一个是广告的点击数据,第二是广告PV,假设Kafka是原始数据类型,我们并不能直接从里面提取出A字段B字段C字段,也不需要填写数据源相关的属性,比如这个Topic属于哪一个Topic,在哪一个broker上面,消费的ID是什么,看哪个数据,本身这个数据是实时更新的,比如现在是11点多,Kafka的数据是11点,并且不断追加的。

第二部分,定义输出,就看业务方需要计算哪些东西,在这个场景里假设用户,广告主维度的,包括数据时间,比如十点到十一点,我投的一个广告,他的转化率怎么样,这里有一个广告主ID点击数,PV、收入,这个输出表也有相关的属性,输入是Kafka,输出是数据库,必须支持输出存储,他还支持临时表,临时表没有物理的影射,就是一个动态表的概念,所以它的的类型是type,因为我们的输出场景会计算很多个,广告主维度,创意维度,计划维度非常多,一个实时流可以用多次计算,这个实时流可以被用很多次,你的Sql会很复杂,因为源头计算四个输出,源头要重复计算四次,有了这个Type表之后,相当于只需要把宽表输出一次就可以了,想算多少维度就算多少。

第三部分,计算,这里面截了一部分,这个SQL本身也很简单,首先需要把点击数据和PV数据汇合到同一个流里,这里的关键字是SUM,点击数据和PV数据刚才定义的是原始数据,需要解析,平台不知道你的数据长什么样子,这个时候通过用户写自定义函数的方式把数据解析出来,点击数据解析是从原始数据写一个Click,这是一个函数,用Java实现,来写这个,点击ID,这是时间,这是事件的小时,广告ID,创业ID之类的,点击收录一些字段。然后PV数据也是一样的,只不过中间有一些字段是为零的,因为PV和Click要统一的,没有的就为零。

输入的两个流,作为一个预计算,点击PV收入相加,相当于得到最细力度空间层的宽表,放在这个临时表里面,下一步是根据中间管表计算具体的报表,在右边小的SQL里是从广告维度的实时报表,他是一张输出表,这个对应的是数据库,有时间、广告主、日志日期,相当于每一个小时做一个实时的聚合,聚合的内容也是点击PV的融合,这样相当于SQL跑起来之后,广告主维度对应的物理存储,比如MySQL的物理表不断更新,每秒更新一次,当前时间的转化率可以实时的看到,如果作为一个页面呈现,这个页面是实时动态的。

这个场景计算这部分已经讲完了,大家看到的直观感受是什么,其实中间没有提到实时计算相关的字眼,这就是SQL带来的好处,用了SQL大家可以用解决离线计算的思维解决实时计算的问题,因为SQL是一样的,无非是数据的输入是持续输入的,可以把数据流理解成动态的表,输出也理解成动态的表,整个实时计算的SQL是很顺畅的。

这是总结的第二个为什么用SQL的好处,开发很便捷,完整的场景有四个维度的实时报表,一共包含两个UDF,7DDL,5DML,一个DML是算中间宽表,4个DML是四个实时报表维度,300行的SQL,这个开发效率比直接用框架提升了四倍以上。

特别是在网易的业务场景下,这个优势更明显,因为网易至少有一百多个业务,直接拿优势平台就可以了,不需要每个业务团队重新做框架的运维业务的开发。

首先分享一下流式计算平台实现的原理,Sloth是一个SQLFlink的东西,最核心在于SQL怎么变成Flink代码,Flink的开源项目里早就有了,2017年1.1版本就开始有了,现在相对比较成熟了。Sloth原理跟Flink是类似的。整个过程是这样的,SQL到Flink的代码分为两个阶段,第一个阶段是SQL到执行计划,第二,执行计划到真实的Flink代码,这张图是SQL到执行计划的阶段,Sloth,SQL是用开源的Parse做的,我们支持了表定义,DML是原先要支持的,SQL解析分成这几个步骤,第一,把所有的DDL、DML解析成SQL,相当于SQL到内部数据结构,第二个阶段,根据信息和语法本身做校验。第三,语法到语法数的转化,生成的是逻辑执行计划数,这是单句SQL生成的,有多少DML就有多少狼藉执行计划数,一个业务的任务可能包含非常多的DML,操作非常多实时流,输出到非常多的实时流,他有很多颗。

第三,做Optimise,这是比较难的一点,最后一部是做数的合并和翻译,根据表的原信息,DML的解析结果,根据你的输入输出表,临时表,把多棵的执行计划树串联成一个完整的很大的执行计划树,整个是有效环图,我们业务里没有知识环,但实际上这个环也可以。

上一步得到了逻辑,执行计划树,下一步是做一个Flink代码的翻译,我们翻译到了Flink的邮箱,这一块是Flink最稳定的特性,举个简单的例子,怎么做Select,从这张表里查询一千个记录,这里是一棵逻辑执行计划树,最下面是一个扫描,然后是filter,过滤,再往上是做投影,把你要算的东西投影出来,最后一部分是做聚合,在这里面做一个创新,这加一的操作。

CodeGen怎么解决计划到代码生成的问题,早些用的是火山模型,在左边这块,相当于这个图里每一层执行计划的每一个节点就是一个函数,多层之间是多层的函数调用,这是过滤的节点翻译出来的代码,它是不断的去判断这个Next是不是需要到上一个节点,这火山模型,代码比较多,函数调用比较多,直观上讲,函数调用多现在开销大一点,但是实测下来也还好,我们还有更好的选择,Whole-stage CodeGen这是在现在的Flink里都有的技术,过不错,而且生成的代码跟人写的差不多,一个计划只有一个函数,它的逻辑很直观,是这个表里所有的数据,第二层做一个过滤,第三步是聚合,所有满足的条件都加一,算完了之后直接返回,这就是Whole-stageCodeGen,从执行计划书的森林到完整的代码有很多工作要做,比如这个中间表怎么连接,这个表有多个输入,也可能有多个输出,怎么翻译代码,怎么做新的优化,怎么把数据实践,任务相关的特性,SQL以外的东西也要融合到里面。

下面讲一下UDF,UDF的语法跟Hive差不多,兼容Hive的UDF,这个里面语法上是兼容的,但是Hive代码不能那来直接用,UDTF可以拿来用,UDF不行,在聚合函数里,我们在Hive原来的基础上撤销。再跟大家介绍一下什么是增量计算,或者什么是流式计算的数据撤销,这是一个概念性的东西,相信大家一定能听懂,我举了一个实际场景的例子,如果在流式计算里不考虑撤销,你有可能得到的结果是错的。

我们需要做的计算是这样的,电商平台要对所有商家的销售额度分类统计,销售额0-100之间归为一类,100-200之间归为一类,这是一个实时计算,这个结果不断动态更新,这个场景用SQL很容易表达,我们要处理的数据是商家的ID和每笔订单的金额,然后整个计算用SQL来表达,两次查询,第一步是对商家的ID做一个分组,算这个商家的销售总额,第二步是外部查询,对商家做分统,这个商家销售金额总额是在哪个区间里,最终回得到每个区间的值和这个区间里面所有商家的数量。大家一步一步来看这个计算怎么做,假设输入数据有四个,包含三个商家,一号商家和二号商家分别有30块和50块钱的订单,三号商家两笔订单,用离线计算的模型分两步,第一步SUM,先按商家的订单总金额,第一层得到1号30,2号10块,3号是130,第二步是分统,得到的结果是0-120,第二个区间一位商家,这个就很直白,没有任何悬念,也没有任何有问题的地方。

下一个是流式计算的场景,做流式计算的人肯定知道流式计算的数据特点是无界的,你的计算程序并不知道数据流什么时候结束,有可能出现乱序,A订单比B订单先完成,但是计算系统有可能是B订单先完成,这跟采集传输计算模型都有关系,还有一个是乱序、延迟,可能这笔订单现在生成,但是要过一个半小时两个小时,因为某些网络的原因,系统的原因,延迟到了计算引擎,这只是一个延迟的Case。流式计算里因为你的输出是不断更新的,假设三号商家50个订单延迟了,第一次输出之前没有到,第一次输出,左边这个部分他的过程跟离线的一样,第一层做SUM,第二层做分统,之前三个订单分别是三个商家分别少于一百块,得到结果0-100区间三个商家,系统算完了这个之后,三号商家第二笔订单进来了,我们在流式计算场景里做一个叠加的计算,我们肯定在之前计算结果上做叠加计算,对这条新流入的数据重复应用两层计算,第一层是计算SUM,三号商家订单总金额80块,再加上新的订单,总金额是130块,第二步做分统,100到200,里面刚好有三号商家。这个时候整个系统输出的结果,第一区间三位商家,第二区间一位商家,显然跟刚才离线计算的结果不一样,离线计算结果第一区间只有两位专家,在这个时代迭代计算的方式得到的结果不一样,这个问题也很好理解,0-130这个结果是过期的计算结果,这个结果在第一次输出的时候对的,因为数据不断更新,这个结果随着时间推移就失效了,不代表当前时刻数据的状态,计算引擎怎么修整这个问题,这就是增量计算或者系统实现里实现的数据撤销,数据的输入在不变的情况下增加撤销的逻辑。

左边部分我不再赘述,计算结果,第一区间三位商家,对第四条数据做增量计算的时候,我们需要注意一个问题,在第一层计算的时候,三号商家聚合节点,它的缓存结构三号商家80块,流入一个订单50块,计算结果更新了,得到三号商家订单总金额130块,他其实需要做更多事情告诉下一个节点,我现在有新的结果给你,老的结果应该作废,所谓的作废是撤销数据,他应该把什么数据作废,就是之前缓存的计算结果作废,这个地方我用了一个-号表示是特殊的撤销消息,并不是计算结果,这个时候整个流式计算系统需要对撤销消息做特殊的处理,来看这个撤销消息,撤销消息的分统工作跟正常的保持一致,他也应该被分到0-100区间,在第二层收到撤销消息的时候需要对应逆操作,撤销了上游的3、80的结果,第二层是+1的,他应该-1,-1之后更新计算结果,从130更新成120,同时需要继续输出一条额外的消息,把130这个消息给撤销掉,对应到实际的运行态里面,有可能从数据库里把130这条记录删掉,或者直接改成120,这两个操作合并成一个。

最右边的3-100计算不受影响,仍然放在100-200区间,最终去掉过时阶段,得到最终结果是120-210,这个跟离线计算的结果是一致的,这就是为什么流式计算里面需要做增量计算,需要撤销这个概念。总结一下,什么场景下需要做撤销,撤销消息是聚合节点产生的,如果你的系统只有一层计算节点,可能并不一定需要做撤销,跟现在的计算结果直接在输出系统里,直接做覆盖就可以了,新结果把老结果覆盖,你看到的一定是最新的有效数据结果,或者你并不Care计算结果是不是精确,差一点就差一点,数据延迟非常小,非常偶然,及时发生也不需要做到完全正确,你也可以不做撤销。还有一个,你希望结果精确,但是又不会做撤销,或者不知道怎么做,或者觉得这个代价太高了,那要牺牲数据的实时性,相当于在这个时刻就不输出了,那你就不需要做撤销,牺牲实时性,比如一个小时才输出一次,等第二次数据进来之后再输出。所以那些对业务有实时性要求比较高,又希望计算结果精确,这个情况需要撤销这件事情的。其实最新的版本里,这一块已经比较完善了,配合API可以解决这个问题,我们Sloth在Flink社区没有这个东西的时候已经做了。

最后讲一下我们成果的总结,因为Sloth整个对Flink的依赖非常重,我们跟社区也有一些互动,为什么不直接使用FlinkSQL,用FlinkSQL在我们项目之初是不现实的,因为它不支持Join,整个SQL是一个雏形,第二,他并不是一个相当的引擎,如果有比Flink更先进的引擎出来,他只需要把引擎换掉就可以了,用户并不需要关注引擎是什么。我们跟社区也有互动,在今年培养了一位Flinkcommiter。以上就是今天分享的内容,谢谢大家。

想要获取大会PPT的朋友可以关注公众号【FMI飞马网】—底部导航栏回复关键词"ppt"进行查阅哦!


分享到:


相關文章: