全文共8336字,预计学习时长
24分钟Apache Kafka和机器学习的关系很微妙。
本文旨在讨论建立机器学习框架的一个特定部分:在Kafka应用程序中部署一个分析模型来进行实时预测。
模式训练和模型部署可以是两个独立的过程。但是相同的步骤也可应用于数据集成和数据预处理,因为模型训练和模型推导需要呈现同样的数据集成、过滤、充实和聚合。
本文将讨论和比较两种模型部署的不同选择:有RPC的服务端模型(RPCs)和本地嵌入Kafka客户端应用的模型。本文的例子特地使用了TensorFlow,但是相关原则对其他机器学习/深度学习框架或者产品同样适用。这些框架和产品包括H2O.ai,Deeplearning4j,谷歌云端机器学习引擎和统计分析系统(SAS)。
TensorFlow — 机器学习/深度学习的开源软件库
Tensorflow是一个为高效计算打造的开源软件库.它灵活的架构让多个平台(cpu、gpu、TPUs等)间的计算部署变得更加容易,应用范围从桌面到服务器集群再到移动和边缘设备。该软件由谷歌人工智能组织的研发团队研究员和工程师开发,作为机器学习和深度学习的强力支持,Tensorflow应用于多个领域,是一个完整的生态系统而不是一个孤立的元件。
鉴于本文聚焦于模型服务,主要对保存和加载模型感兴趣。保存和加载模型就是存储训练模型,并将Tensorflow作为模型服务器。
存储模型本质上是一个二进制文件,使用协议缓冲区(Protobuf)序列化。接着模型在C,Python,Java等软件中分类数据、加载数据、存储和处理数据。文件格式是可读的文本格式(.pbtxt)或压缩的二进制协议缓冲区(.pb)。图表对象是在TensorFlow中进行计算的基础。权重保存在单独的TensorFlow检查点文件中。
由于本文关注的是TensorFlow的模型部署,因此如何预训练模型并不重要。可以利用云端服务和像云端机器学习引擎和其谷歌云端平台(GCP)的集成管线,或者建立自己的模型训练途径。Kafka不但在模型部署方面很重要,在数据集成、数据预处理和数据监控方面也扮演重要角色。
使用模型服务器和RPC进行流处理
模型服务器可以自我管理,也可以由分析或者云端供应商托管。模型服务器并不仅为模型推导部署和存储模型,而且还提供诸如版本控制或A/B测试之类的附加功能。从应用程序到模型服务器的交流经常通过请求-响应协议(HTTP)或者谷歌RPC(gRPC)等RPC框架来完成。每次项目运行,都会发生这种介于Kafka应用和模型服务器之间的请求-响应式交流,
有很多模型服务器可供选择。可以从像Seldon Server,PredictionIO,Hydrosphere.io等开源模型服务器中选择或者从H2O.ai,DataRobot,国际商业机器公司(IBM),统计分析系统(SAS)等分析供应商中利用模型服务器。
本文使用TensorFlow提供的服务,即来自TensorFlow的模型服务器。该模型服务器可以实现自我管理,也可以使用云端机器学习引擎服务。TensorFlow服务拥有以下特征。
·包含谷歌RPC(gRPC)和请求-响应协议终端(HTTP)
·呈现模型版本,无需改变客户端代码
·将单个模型推导请求分组,以便联合执行请求
·优化模型推导时间以便最小化延迟
·支持许多可服务项(可服务项是一个模型或者是一个和模型一起提供数据的任务)
o TensorFlow 模型
o 嵌入函数
o 词汇查找表格
o 特征转换
o 不基于TensorFlow的模型
·有能力开展金丝雀发布及A/B测试
下图是Kafka应用和模型服务器的交流过程
执行Kafka应用的过程是直接的。下面是Kafka应用数据流的代码片段以及TensorFlow服务端的RPC。
1.输入Kafka以及TensorFlow服务API
- import org.apache.kafka.common.serialization.Serdes;
- import org.apache.kafka.streams.KafkaStreams;
- import org.apache.kafka.streams.StreamsBuilder;
- import org.apache.kafka.streams.StreamsConfig;
- import org.apache.kafka.streams.kstream.KStream;
- import com.github.megachucky.kafka.streams.machinelearning.TensorflowObjectRecogniser;
2.配置Kafka数据流应用
- // Configure Kafka Streams Application
- finalString bootstrapServers = args.length > 0 ? args[0] : "localhost:9092";
- final Properties streamsConfiguration = new Properties();
- // Give the Streams application a unique name. The name must be unique
- // in the Kafka cluster against which the application is run.
- streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-tensorflow-serving-gRPC-example");
- // Where to find Kafka broker(s).
- streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
3.向TensorFlow服务端呈现RPC(如果RPC失败,则报备异常情况)
- KStream<string> transformedMessage = imageInputLines.mapValues(value -> {/<string>
- System.out.println("Image path: " + value);
- imagePath = value;
- TensorflowObjectRecogniser recogniser = new TensorflowObjectRecogniser(server, port);
- System.out.println("Image = " + imagePath);
- InputStream jpegStream;
- try {
- jpegStream = new FileInputStream(imagePath);
- // Prediction of the TensorFlow Image Recognition model:
- List<map.entry>> list = recogniser.recognise(jpegStream);/<map.entry>
- String prediction = list.toString();
- System.out.println("Prediction: " + prediction);
- recogniser.close();
- jpegStream.close();
- return prediction;
- } catch (Exception e) {
- e.printStackTrace();
- return Collections.emptyList().toString();
- }
- });
4.启动Kafka应用
- // Start Kafka Streams Application to process new incoming images from the Input Topic
- final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
- streams.start();
嵌入式模型的流处理
可以不用模型服务器和RPC交流,直接将模型嵌入Kafka应用。嵌入模型可以通过Kafka本地处理的数据流应用,以Kafka数据流为杠杆。该模型还可以通过KSQL(一种SQL方言)或者Java、Scala、Python、Go.等Kafka客户端应用程序接口。
在这种情况下,Kafka应用无法依赖外部模型服务器。该模型在Kafka应用内加载,例如在Kafka数据流应用内使用TensorFlow的JavaAPI。
同样,执行Kafka应用很简单。这里是在Kafka数据流应用里嵌入TensorFlow模型的代码片段,作为实时预测:
1.输入Kafka和TensorFlowAPI
- import org.apache.kafka.streams.KafkaStreams;
- import org.apache.kafka.streams.KeyValue;
- import org.apache.kafka.streams.StreamsBuilder;
- import org.apache.kafka.streams.StreamsConfig;
- import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
- import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
- import org.apache.kafka.streams.kstream.KStream;
- import org.deeplearning4j.nn.modelimport.keras.KerasModelImport;
- import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
2.从数据存储(例如亚马逊S3链接)或者数据记忆(例如接受一个Kafkatopic级别参数)中加载TensorFlow模型。
- // Step 1: Load Keras TensorFlow Model using DeepLearning4J API
- String simpleMlp = new ClassPathResource("generatedModels/Keras/simple_mlp.h5").getFile().getPath();
- System.out.println(simpleMlp.toString());
- MultiLayerNetwork model = KerasModelImport.importKerasSequentialModelAndWeights(simpleMlp);
3.配置Kafka数据流应用
- // Configure Kafka Streams Application
- Properties streamsConfiguration = new Properties();
- streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-tensorflow-keras-integration-test");
- streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
- // Specify default (de)serializers for record keys and for record values
- streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
4.在数据流中应用TensorFlow模型
- final KStream<string> inputEvents = builder.stream(inputTopic);/<string>
- inputEvents.foreach((key, value) -> {
- // Transform input values (list of Strings) to expected DL4J parameters (two Integer values):
- String[] valuesAsArray = value.split(",");
- INDArray input = Nd4j.create(Integer.parseInt(valuesAsArray[0]), Integer.parseInt(valuesAsArray[1]));
- // Model inference in real time:
- output = model.output(input);
- prediction = output.toString();
- });
5.启动Kafka应用
- final KafkaStreams streams = new TestKafkaStreams(builder.build(), streamsConfiguration);
- streams.cleanUp();
- streams.start();
其他有关TensorFlow,H2O和深度学习4j在Kafka数据流中的应用实例详见GitHub。
甚至可以通过使用知名的测试库编写单元测试框架(unit test),如JUnit和Kafka数据流测试库使用的单元测试框架。
以下是KSQL自定义函数使用的模型部署示例:
需要做的是执行KSQL 自定义函数的Java界面,并且将自定义函数部署到KSQL服务端。过去的博客文章有详细介绍如何建立自己的KSQL自定义函数。通过这种方法,终端用户编写SQL语言查询用来实时应用分析模型。
在应用中直接嵌入何种模型?
并不是所有模型都是嵌入应用的理想模型。考虑模型是否应该嵌入时需要包含以下几点:
·模型性能:越快越好
·模型二进制格式:最好是Java字节码
·模型大小:字节少,存储少为佳
·模型服务器特征:加载即用vs.自己安装vs.不必要
写在Python中的代码之所以运行缓慢是因为动态语言在运行时需要翻译很多变量和请求。
H2O Java分类(例如决策树)可以运行很快。运行时间以微秒计算。
一个只有几兆字节,内存少的TensorFlow Protobuf协议神经网络可以加载得很快。
一个庞大的TensorFlow Protobuf协议神经网络(约100兆字节)需要很大内存,运行相对缓慢。
基于标准的模型(例如 XML/JSON格式数据就是基于预测模型标记语言或者开放神经网络交换)包括除了模型处理(例如数据预处理)的其他步骤。该模型呈现了使用这些标准的组织挑战和技术局限,性能远次于像TensorFlow的Saved Model模型等本地加载模型
最终,不管模型是否直接嵌入应用,这取决于模型本身、硬件设施和项目要求。
在 Kafka应用中重建模型服务器的特征并不难
在应用中嵌入模型并不意味着能即刻使用模型特征。用户必须亲自执行模型。先问自己第一个问题:我需要模型服务器的特征吗?我需要动态更新模型吗?模型版本?A/B测试?金丝雀测试?
好消息是执行模型特征并不困难。这取决于用户的要求和工具配置,你可以:
·启动新版本应用(例如Kubernetes pod容器)
·通过Kafka主题发送并使用模型或权重
·利用服务网络(比如像Envoy,Linkerd, 或者 Istio服务器)
下面来评估权衡各种在Kafka应用里利用分析模型的方法
权衡---模型服务器vs嵌入模型
可以在模型服务器部署一个分析模型并且使用RPC进行交流。或者可以直接在应用里嵌入模型。这里没有最佳选项因为这取决于用户的设施、要求和能力。
为什么和事件流应用一起使用模型服务器和RPC?
·如果你对事件流一无所知,该模型便于理解
·可以让之后迁移到实时流变得可能
·将不同模型,版本和A/B测试放到内置模型管理
·内置监控
为什么在事件流应用中嵌入模型?
·借助本地推理实现更好的延迟,而无需进行远程呼叫
·脱机推断(设备,边缘处理等)
·Kafka Streams应用程序的可用性,可伸缩性和延迟/吞吐量与RPC接口的SLA之间没有耦合
·无副作用(比如失败的风险)- Kafka应用处理包含了各种因素(例如具体一次)
两个选项各有利弊,根据不同场合推荐使用。
Kubernetes(K8s)云原生模型部署
在云原生框架中,两种方法都可以获得好处。即使其他的云原生技术有相似的特征,下面仍用Kubernetes作为云原生环境。
将模型嵌入到Kafka应用中,可以获得独立pod数据结构的所有优势。独立的pod数据结构是流式数据处理和模型推导的容器,不依赖外部的模型服务器。
在以下示例中,可以独立测量嵌入模型的Kafka数据流应用,启动新版本,加入A/B测试或者其他路径,用像Envoy或者Linkerd的云原生代理服务器处理异常。
如果仍然想获取模型服务器的优点及特性,可以使用边车设计模式。Kubernetes支持将具有特定任务的其他容器添加到Pod中。在以下示例中,将Kafka Streams应用程序部署在一个容器中,而模型服务器作为边车部署在同一pod内的另一个容器中。
这样就可以利用模型服务器的功能以及单个容器的坚固性和可扩展性。它仍然具有在每个容器之间使用RPC的缺点。通过将两个容器部署在同一容器中,可以最大程度地减少等待时间和潜在错误。
边缘模型部署
模型也不是经常部署在云端或者是数据中心。在某些情况下,模型可以部署在边缘,边缘部署意味着:
·边缘数据中心或者边缘设备/机器
·边缘有一个Kafka应用集群,一个中介和一个Kafka应用客户端。
·一个强大的客户端(比如KSQL或者Java)或者一个轻量级的客户端(比如C或者JavaScript)
·一个嵌入模型或者RPC模型推导
·本地或者远程训练
·对法律和法规的影响
对于某些电信提供商来说,边缘计算的定义是超低延迟,端与端之间的通信时间不到100毫秒。这是通过诸如开源云基础架构软件堆栈StarlingX之类的框架实现的,该框架需要完整的OpenStack和Kubernetes集群以及对象存储。对于其他对象来说,“边缘”意味着移动设备、轻量级板或传感器,可以在其中部署非常小的轻量级C应用程序和模型的移动设备。
从Kafka的角度来看,有很多选择。可以使用librdkafka(本机Kafka C / C ++客户端库)完全构建轻量级的边缘应用程序,该库由Confluent完全支持。还可以使用JavaScript并利用REST代理或WebSocket集成进行 Kafka通信,将模型嵌入移动应用程序中。
Kafka的独立技术模型部署
模型部署在过程和技术上可以与模型训练完全分开。部署基础架构可以处理不同的模型,甚至可以使用不同的机器学习框架训练模型。Kafka还为构建机器学习监控提供了良好的基础,包括基础设施的技术监控和特定于模型的监控,例如性能或模型准确性。
无论是否要使用Kafka实施所有功能,包括数据集成,预处理,模型部署和监视,或者是否仅使用Kafka客户将模型嵌入到实时的Kafka客户端,Kafka都是适用于机器学习基础设施的补充工具(与数据预处理和模型训练完全分开)Kafka。
对于模型部署,有两种选择:模型服务器(RPC)和嵌入式模型。了解每种方法的利弊将有助于为项目做出正确决定。实际上,将分析模型嵌入到Kafka应用程序中很简单,而且非常实用。
我们一起分享AI学习与发展的干货
閱讀更多 讀芯術 的文章