使用 tf.Transform 對 TensorFlow 管道模式進行預處理

文 / Matthias Feys,ML6 首席技術官

機器學習模型需要數據來訓練,但是通常需要對這些數據進行預處理,以便在訓練模型時發揮作用。這種預處理(通常稱為 “特徵工程”)採用多種形式,例如:規範化和縮放數據,將分類值編碼為數值,形成詞彙表,以及連續數值的分級。

在生產過程中利用機器學習時,為了確保在模型的離線培訓期間應用的特徵工程步驟與使用模型用於預測時應用的特徵工程步驟保持相同,這往往就成為一項極具挑戰性的任務。此外,放眼當今世界,機器學習模型會在超大型的數據集上進行訓練,因此在訓練期間應用的預處理步驟將會在大規模分佈式計算框架(例如 Google Cloud Dataflow 或 Apache Spark)上實現。由於訓練環境通常與服務環境大相徑庭,在訓練和服務期間執行的特徵工程之間可能會產生不一致的情況。

幸運的是,我們現在有了 tf.Transform,這是一個 TensorFlow 庫,它提供了一個優雅的解決方案,以確保在訓練和服務期間特徵工程步驟的一致性。 在這篇文章中,我們將提供在 Google Cloud Dataflow 上使用 tf.Transform,以及在 Cloud ML Engine 上進行模型訓練和服務的具體示例。

注:tf.Transform 鏈接

https://github.com/tensorflow/transform

應用於機器模擬上的變換用例

ecc.ai 是一個有助於優化機器配置的平臺。 我們模擬物理機器(例如瓶灌裝機或餅乾機)以便找到更優化的參數設置。 由於每個模擬的物理機器的目標是具有與實際機器相同的輸入/輸出特性,我們稱之為 “數字孿生”。

這篇文章將展示這個 “數字孿生” 的設計和實現過程。 在最後一段中,您可以找到有關我們之後如何使用這些數字孿生來優化機器配置的更多信息。

注:ecc.ai 鏈接

https://ecc.ai/

tf.Transform 釋義

tf.Transform 是 TensorFlow 的一個庫,它允許用戶定義預處理管道模式並使用大規模數據處理框架運行這些管道模式,同時還以可以作為 TensorFlow 圖形的一部分運行的方式導出管道。 用戶通過組合模塊化 Python 函數來定義管道,然後 tf.Transform 隨著 Apache Beam 一起運行。 tf.Transform 導出的 TensorFlow 圖形可以在使用訓練模型進行預測時複製預處理步驟,比如在使用 TensorFlow Serving 服務模型時。

注:Apache Beam 鏈接

https://beam.apache.org/

TensorFlow Serving 鏈接

https://ai.googleblog.com/2016/02/running-your-models-in-production-with.html

使用 tf.Transform 對 TensorFlow 管道模式進行預處理

tf.Transform 允許用戶定義預處理管道。 用戶可以實現預處理數據以用於 TensorFlow 訓練,還可以將轉換編碼為 TensorFlow 圖形後導出。然後將該變換圖形結合到用於推斷的模型圖中

tf.Transform 建立數字孿生

數字雙模型的目標是能夠根據其輸入預測機器的所有輸出參數。 為了訓練這個模型,我們分析了包含這種關係的觀察記錄歷史的日誌數據。 由於日誌的數據量可能會相當廣泛,理想的情況是應該以分佈式方式運行此步驟。 此外,必須在訓練和服務的時間之間使用相同的概念和代碼,這樣對預處理代碼的改動最小。

開發伊始,我們在任何現有的開源項目中都找不到此功能。 因此,我們開始構建用於 Apache Beam 預處理的自定義工具,這使我們能夠分配我們的工作負載並輕鬆地在多臺機器之間切換。 但是不太幸運的是,這種方法不允許我們在服務時(即在生產環境中使用訓練模型時)重複使用相同的代碼作為 TensorFlow 圖形的一部分運行。

在實踐中,我們必須在 Apache Beam 中編寫自定義分析步驟,計算並保存每個變量所需的元數據,以便在後續步驟中進行實際的預處理。 我們在訓練期間使用 Apache Beam 執行後續預處理步驟,並在服務期間作為 API 的一部分執行。 不幸的是,由於它不是 TensorFlow 圖形的一部分,我們不能簡單地使用 ML Engine 將我們的模型部署為 API,而我們的 API 總是由預處理部分和模型部分組成,這使得統一升級變得更加困難。而且,對於所有想要使用的那些已有的和全新的轉換,我們需要為此實施和維護分析並轉換步驟。

TensorFlow Transform 解決了這些問題。 自發布以來,我們將其直接整合為我們完整管道模式的主要構建塊。

簡化數字孿生示例流程

我們現在將專注於構建和使用特定機器的數字孿生。 舉個例子,我們假設有一個布朗尼麵糰機器。 這臺機器對不同的原料進行加熱、攪拌,直到麵糰產生完美的質地。 我們將從批次問題開始,這意味著數據在完整的生產批次中進行彙總,而不是在連續不斷的生產線上進行彙總。

數據

我們有兩種類型的數據:

輸入數據:原料描述(綠色)和布朗尼麵糰機(藍色)的設置。 您可以在下面找到列名稱和 3 個示例行。

使用 tf.Transform 對 TensorFlow 管道模式進行預處理

輸出數據:帶有這些原料的機器設置結果:消耗的能量,輸出的質量度量和輸出量。 您可以在下面找到列名稱和 3 個示例行。

使用 tf.Transform 對 TensorFlow 管道模式進行預處理

製作數字孿生

使用 tf.Transform 對 TensorFlow 管道模式進行預處理

在這裡,我們在雲存儲中根據兩種不同類型文件的歷史日誌數據來訓練系統的數字孿生。 該數字孿生能夠基於輸入數據預測輸出數據。上圖顯示我們在此流程中使用的 Google 服務。

預處理

使用 tf.Transform 函數,Apache Beam 將完成預處理(製作訓練示例)。

預處理階段包括 4 個步驟,代碼如下:

1. 組合輸入/輸出數據,並製作原始數據 PCollection。

1 raw_data_input = ( 
2 p
3 | 'ReadInputData' >> textio.ReadFromText(train_data_file)
4 | 'ParseInputCSV'>> beam.Map(converter_input.decode)
5 | 'ExtractBatchKeyIn'>> beam.Map(extract_batchkey))
6
7 raw_data_output = (
8 p
9 | 'ReadOutputData' >> textio.ReadFromText(train_data_file)
10 | 'ParseOutputCSV'>> beam.Map(converter_output.decode)
11 | 'ExtractBatchKeyOut'>> beam.Map(extract_batchkey))
12
13 raw_data = (
14 (raw_data_input, raw_data_output)
15 | 'JoinData' >> CoGroupByKey()
16 | 'RemoveKeys'>> beam.Map(remove_keys))

2. 定義將預處理原始數據的預處理功能。 此函數將組合多個 TF-Transform 函數,以生成 TensorFlow Estimators 的示例。

Language: Python

1 def preprocessing_fn(inputs): 
2 """Preprocess input columns into transformed columns."""
3 outputs = {}
4 # Encode categorical column:
5 outputs['Mixing Speed'] = tft.string_to_int(inputs['Mixing Speed'])
6 # Calculate Derived Features:
7 outputs['Total Mass'] = inputs['Butter Mass'] + inputs['Sugar Mass'] + inputs['Flour Mass']
8 for ingredient in ['Butter', 'Sugar', 'Flour']:
9 ingredient_percentage = inputs['{} Mass'.format(ingredient)] / outputs['Total Mass']
10 outputs['Norm {} perc'.format(ingredient)] = tft.scale_to_z_score(ingredient_percentage)
11 # Keep absolute numeric columns
12 for key in ['Total Volume', 'Energy']:
13 outputs[key]=inputs[key]
14 # Normalize other numeric columns
15 for key in [
16 'Butter Temperature',
17 'Sugar Humidity',
18 'Flour Humidity'
19 'Heating Time',
20 'Mixing Time',
21 'Density',
22 'Temperature',
23 'Humidity',
24 ]:
25 outputs[key] = tft.scale_to_z_score(inputs[key]) 26 # Extract Specific Problems
27 chunks_detected_str = tf.regex_replace(
28 inputs['Problems'],
29 '.*chunk.*'
30 'chunk',
31 name='Detect Chunk')
32 outputs['Chunks']=tf.equal(chunks_detected_str,'chunk')
33 return outputs

3. 使用預處理功能分析和轉換整個數據集。這部分代碼將採用預處理功能,首先分析數據集,即完整傳遞數據集以計算分類列的詞彙表,然後計算平均值和標準化列的標準偏差。 接下來,Analyze 步驟的輸出用於轉換整個數據集。

1 transform_fn = raw_data | AnalyzeDataset(preprocessing_fn)
2 transformed_data = (raw_data, transform_fn) | TransformDataset()
4. 保存數據並將 TransformFn 和元數據文件序列化。

1 transformed_data | "WriteTrainData" >> tfrecordio.WriteToTFRecord(
2 transformed_eval_data_base,
3 coder=example_proto_coder.ExampleProtoCoder(transformed_metadata))
4
5 _ = (
6 transform_fn
7 | "WriteTransformFn" >>
8 transform_fn_io.WriteTransformFn(working_dir))
9
10
11 transformed_metadata | 'WriteMetadata' >> beam_metadata_io.WriteMetadata(
12 transformed_metadata_file, pipeline=p)

訓練

使用預處理數據作為 `TFRecords`,我們現在可以使用 Estimators 輕鬆訓練帶有標準 TensorFlow 代碼的 TensorFlow 模型。

導出訓練的模型

在分析數據集的結構化方法旁邊,tf.Transform 的實際功能在於可以導出預處理圖。 您可以導出 TensorFlow 模型,該模型包含與訓練數據完全相同的預處理步驟。

為此,我們只需要使用 tf.Transform 輸入函數導出訓練模型:

1 tf_transform_output = tft.TFTransformOutput(working_dir)
2 serving_input_fn = _make_serving_input_fn(tf_transform_output)
3 exported_model_dir = os.path.join(working_dir, EXPORTED_MODEL_DIR)
4 estimator.export_savedmodel(exported_model_dir, serving_input_fn)

_make_serving_input_fn 函數是一個非常通用的函數,不管項目的邏輯如何,您都可以簡單地在不同項目之間重用:

Language: Python

1 def _make_serving_input_fn(tf_transform_output): 
2 raw_feature_spec = RAW_DATA_METADATA.schema.as_feature_spec()
3 raw_feature_spec.pop(LABEL_KEY)
4
5 def serving_input_fn():
6 raw_input_fn = input_fn_utils.build_parsing_serving_input_fn(
7 raw_feature_spec)
8 raw_features, _, default_inputs = raw_input_fn()
9 transformed_features = tf_transform_output.transform_raw_features(
10 raw_features)
11 return input_fn_utils.InputFnOps(transformed_features, None, default_inputs)
12
13 return serving_input_fn

使用數字孿生

使用 tf.Transform 對 TensorFlow 管道模式進行預處理

數字孿生示例流程的最後一部分使用保存的模型根據輸入預測系統的輸出。 這是我們可以充分利用 tf.Transform 的地方,因為這使得在 Cloud ML Engine 上部署 “TrainedModel”(包括預處理)變得非常容易。

要部署訓練模型,您只需運行 2 個命令:

1 gcloud ml-engine models create MODEL_NAME
2 gcloud ml-engine versions create VERSION --model=MODEL_NAME --origin=ORIGIN

現在,我們可以使用以下代碼輕鬆地與我們的數字孿生進行交互

1 def get_predictions(project, model, instances, version=None): 
2 service = discovery.build('ml', 'v1')
3 name = 'projects/{}/models/{}'.format(project, model)
4
5 if version is not None:
6 name += '/versions/{}'.format(version)
7
8 response = service.projects().predict(
9 name=name,
10 body={'instances': instances}
11 ).execute()
12
13 if 'error' in response:
14 raise RuntimeError(response['error'])
15
16 return response['predictions']
17
18
19 if __name__ == "__main__":
20 predictions = get_predictions(
21 project="",
22 model="",
23 instances=[
24 {
25 "Butter Mass": 121,

26 "Butter Temperature": 20,
27 "Sugar Mass": 200,
28 "Sugar Humidity": 0.22,
29 "Flour Mass ": 50,
30 "Flour Humidity": 0.23,
31 "Heating Time": 50,
32 "Mixing Speed": "Max Speed",
33 "Mixing Time": 200
34 }]
35 )

在 ecc.ai,我們使用數字孿生來優化物理機器的參數。

簡而言之,我們的方法包括 3 個步驟(如下圖 1 所示):

  • 使用歷史機器數據創建模擬環境。機器的這種 “數字孿生” 則將作為能夠允許增強代理來學習最佳控制策略的環境。
  • 利用數字孿生使用我們的強化學習(RL)代理查找(新的)最佳參數設置。
  • 使用 RL 代理配置真實機器的參數。
使用 tf.Transform 對 TensorFlow 管道模式進行預處理

總結

通過 tf.Transform,我們現在已將我們的模型部署在 ML Engine 上作為一個 API,成為特定布朗尼麵糰機的數字孿生:它採用原始輸入功能(成分描述和機器設置),並將反饋機器的預測輸出。

好處是我們不需要維護 API 並且包含所有內容 - 因為預處理是服務圖形的一部分。 如果我們需要更新 API,只需要使用最新的版本來刷新模型,所有相關的預處理步驟將會自動為您更新。

此外,如果我們需要為另一個布朗尼麵糰機器(使用相同數據格式的機器)製作數字孿生模型,但是是在不同的工廠或設置中運行,我們也可以輕鬆地重新運行相同的代碼,無需手動調整預處理代碼或執行自定義分析步驟。

您可以在 GitHub 上找到這篇文章的代碼。

注:GitHub 鏈接

https://github.com/Fematich/tftransform-demo


分享到:


相關文章: