通過像Parquet這樣的列數據格式進行有效的數據探索

通過Lambda層,Step函數以及通過AWS Athena進行進一步的數據分析,從Elasticsearch到S3上的Apache Parquet文件的無服務器大規模數據提取

特徵提取和降維是機器學習數據準備中常見的基本任務。 在我們的大多數(Kubernetes)項目中,我們使用通用的EFK堆棧(Elasticsearch + Fluentd + Kibana),該堆棧可用於臨時故障排除,但缺乏大規模的數據分析功能。 為了深入瞭解更大的數據集,需要使用不同的方法。 在這裡,我探索一種方法,其中將提取的Elasticsearch數據(25m條或更多記錄)作為列數據文件存儲在S3中,並使用AWS Athena執行以更快的響應時間進行切片和切塊。

列數據格式是BI和數據分析工作負載的關鍵。 它們有助於極大地降低總體磁盤I / O要求,並減少要從磁盤加載的數據量,因此有助於優化分析查詢性能。 最常用的格式是Apache Parquet或Apache ORC。 在本文中,我研究瞭如何大規模提取Elasticsearch數據以進行進一步的分析和提取。

通過像Parquet這樣的列數據格式進行有效的數據探索

目錄

· 通過Lambda層,Step函數以及通過AWS Athena進行進一步的數據分析,從Elasticsearch到S3上的Apache Parquet文件的無服務器大規模數據提取

· 通過滾動Elasticsearch批量導出

· 作為步進功能執行

· 數據存儲為列式文件

· 將所有依賴項打包為Lambda層

· 通過AWS Athena進行數據訪問

· 結論

· 附錄

· Snappy Lambda運行時問題

· Fastparquet安裝問題

通過滾動Elasticsearch批量導出

Elasticsearch具有一個基於http的API,可以從搜索請求中檢索大量結果,其方式與在傳統數據庫中使用遊標的方式幾乎相同。 初始調用在查詢字符串中包含滾動參數,該參數告訴Elasticsearch應該保持"搜索上下文"有效的時間。

<code>from elasticsearch import Elasticsearch
es = Elasticsearch("http://myserver:13000/elasticsearch")
page = es.search(
    index = myindex,
    scroll = '2m',
    size = 10000,
    body = myquery)
sid = page['_scroll_id']
scroll_size = page['hits']['total']/<code>

scroll參數(傳遞給搜索請求,然後傳遞給每個滾動請求)告訴Elasticsearch它應該保持搜索上下文存活多長時間,並且需要足夠長的時間才能在下一個滾動請求之前處理這批結果。 現在可以迭代和檢索與查詢關聯的數據:

<code>while (scroll_size > 0):
    print("Scrolling...", i)
    page = es.scroll(scroll_id = sid, scroll = '2m')
    # Update the scroll ID
    sid = page['_scroll_id']
    # Get the number of results that we returned in the last scroll
    scroll_size = len(page['hits']['hits'])
    print("scroll size: " + str(scroll_size))
    # Convert the page into a Pandas dataframe
    dfES = Select.from_dict(page).to_pandas()
    dfES.drop(columns=['_id', '_index', '_score', '_type'],          inplace=True)/<code>

應當注意,大小的最大值(每批結果將返回的命中數)是10000。在我的情況下,這是一個25m記錄的數據集,因此需要進行2500次滾動提取批次。 為了減少迭代次數,在Lambda函數中,一次調用期間執行15個滾動事件,這將產生150,000條記錄。 連同對結果的進一步處理,每次Lambda調用大約需要60秒,因此整個處理時間大約為3小時。 (如果數據集更大,則可以在Elasticsearch集群的不同節點上並行執行切片滾動)。 整體執行作業作為步進功能運行,每個提取批處理執行一次Lambda執行。

作為步進功能執行

要自動執行滾動Lambda直到提取出最後一頁,Step Function是最佳選擇。

通過像Parquet這樣的列數據格式進行有效的數據探索

在此檢查Lambda的輸出,如果它指示仍然有其他頁面可用,則再次執行Lambda函數。 可以通過Cloudwatch事件觸發Step功能,方法是在給定時間指示"狀態":"開始",並組織所有數據的提取,兩次運行之間傳遞的數據/事件如下:

{
"state": "Scroll",
"scrollId": "DnF1ZXJ5VGhlbkZldGNoBQA2VjRfUUJRAAAAALckuFkY5bS1IbDhzUk5LUFlXMjZWNF9RQlEAAAAAAC3JLxZGOW0tSGw4c1JOS1BZVzI2VjRfUUJRAAAAAAAAApAWbGtqLUVOQWVSZm03NmdxTllqNFhZZwAAAAAALckxFkY5bS1IbDhzUk5LUFlXMjZWNF9RQlEAAAAAAC3JMBZGOW0tSGw4c1JOS1BZVzI",
"index": 12879232
}

可能的狀態為開始,滾動和完成。 狀態函數定義如下:

<code>{
  "StartAt": "ScrollAgain?",
  "States": {
    "ScrollAgain?": {
      "Type" : "Choice",
      "Choices": [
        {
          "Variable": "$.state",
          "StringEquals": "Done",
          "Next": "FinalState"
        }
      ],
      "Default": "Scrolling"
    },"Scrolling": {
      "Type" : "Task",
      "Resource": "arn:aws:lambda:eu-west-1:xxxxxxxxx:function:MyParquetfunction",
      "Next": "ScrollAgain?"
    },"FinalState": {
      "Type": "Succeed"
    }
  }
}/<code>

數據存儲為列式文件

在許多分析用例中,提取的數據可以存儲為CSV文件,以進行進一步處理。 但是,由於數據集很大,因此這種方法實際上不是可行的選擇。

將數據提取存儲在多個數據文件中,特別是作為列存儲文件存儲,具有多個優點,尤其是在將數據與Athena一起使用時。 與基於行的文件(如CSV)相比,Apache Parquet面向列,旨在提供高效的列式數據存儲。 它具有字典編碼,位打包和遊程長度編碼(RLE)等功能來壓縮數據。 Twitter工程團隊的博客文章"使用Parquet使Dremel變得簡單"給出了更詳細的概述。

可以對鑲木地板文件應用其他壓縮,甚至可以對每列應用不同的壓縮算法。

Snappy是一種輕量級且快速壓縮的編解碼器,不需要太多的CPU使用率,但壓縮效果不如gzip或bzip2那樣好。 在這種情況下,應用快速壓縮會將文件大小減少5倍,並且比相同的文件gzip壓縮大2倍。

在Python中,編寫鑲木地板文件並將上傳內容集成到S3中非常容易:

<code>import s3fs
import fastparquet as fp
import pandas as pd
import numpy as np
s3 = s3fs.S3FileSystem()
myopen = s3.open
s3bucket = 'mydata-aws-bucket/'
# random dataframe for demo
df = pd.DataFrame(np.random.randint(0,100,size=(100, 4)), columns=list('ABCD'))
parqKey = s3bucket + "datafile"  + ".parq.snappy"
fp.write(parqKey, df ,compression='SNAPPY', open_with=myopen)/<code>

Pandas和Numpy應該被稱為標準Python數據科學庫。 Fastparquet是Parquet文件格式的接口,該文件格式使用Numba Python到LLVM編譯器來提高速度。 它是Dask項目的一個分叉,來自Joe Crobak的python-parquet的原始實現。 S3Fs是S3的Pythonic文件接口,它建立在boto3之上。

將所有依賴項打包為Lambda層

通過壓縮包打包了我上一篇博客文章"使用AWS Lambda和Layers進行純無服務器機器推理的純文本",將要上傳的依賴關係打包。 參見下文,必須相應地設置LD_LIBRARY_PATH,以使Lambda函數訪問該層中已編譯的Snappy庫。 同樣使用Pip和附加目標值進行構建會導致fastparquet安裝出現一些問題,下面提供了更多詳細信息。

rm -rf python && mkdir -p python
docker run --rm -v $(pwd):/foo -w /foo lambci/lambda:build-python3.7 /foo/build.sh

清除"外部"目標目錄,然後在容器內執行與AWS運行時匹配的構建。 build.sh如下所示:

#!/usr/bin/env bash
export PYTHONPATH=$PYTHONPATH:/foo/python
yum install snappy-devel -y
pip install --upgrade pip
pip install -r requirements.txt --no-binary --no-dependencies --no-cache-dir -t python
pip install fastparquet==0.2.1 --no-binary --no-dependencies --no-cache-dir -t python
cp /usr/lib64/libsnappy* /foo/lib64/

如前所述,由於使用了-t python target參數,因此必須從requirements.txt文件中手動刪除fastparquet模塊並進行第二步安裝。 然後可以像下面這樣部署該層:

zip -r MyParquet_layer.zip python lib64

aws s3 cp MyParquet_layer.zip s3://mybucket/layers/

aws lambda publish-layer-version --layer-name MyParquet_layer --content S3Bucket=mybucket,S3Key=layers/MyParquet_layer.zip --compatible-runtimes python3.7

通過AWS Athena進行數據訪問

為了進一步處理數據並提取一些子集,AWS Athena是完美的工具。 AWS Athena是一種無服務器交互式查詢服務,可輕鬆使用標準SQL在S3中分析大量數據。 它開箱即用地處理諸如Parquet或ORC的列狀和壓縮數據。

可以通過以下命令輕鬆創建S3中數據的Athena表:

CREATE EXTERNAL TABLE IF NOT EXISTS myanalytics_parquet (
`column1` string,
`column2` int,
`column3` DOUBLE,
`column4` int,
`column5` string
)


STORED AS PARQUET
LOCATION 's3://mybucket/parquetdata/'
tblproperties ("parquet.compress"="SNAPPY")

此後,數據已經可用,並且可以以驚人的速度和響應時間進行查詢。

<code>SELECT column5, count(*) AS cnt 
FROM myanalytics_parquet
GROUP BY column5
ORDER BY cnt DESC;/<code>

這樣的查詢只需要幾秒鐘即可對完整的數據集執行。 結果數據在分配給Athena實例的默認S3存儲桶中可用,並且可以提取為CSV文件。

結論

從Elasticsearch中批量提取數據並將其轉換為壓縮的Parquet文件(存儲在S3中)是處理大型數據集的有效方法。 將AWS Athena與這些文件一起使用,可以輕鬆快速,高效地對數據集進行切片和切塊,並允許取出定義的數據子集(降維)以進行進一步處理或特徵提取。 與使用Elasticsearch中的原始數據集相比,這無疑是一種更快,更有效的方法。 可以通過滾動功能來完成從Elasticsearch中獲取數據的操作,並使用step函數和lambda運行它是一種有效的無服務器解決方案。

附錄

將snappy和fastparquet安裝到lambda函數中並不是那麼容易,因為它們都使用C擴展名和庫。

Snappy Lambda運行時問題

關於快速安裝,我遇到了以下問題:

ModuleNotFoundError: No module named 'snappy._snappy_cffi'

不幸的是,這是一個掩蓋的錯誤,最初的異常是由於找不到libsnappy.so庫而發生的,然後又發生了另一個異常,如上所示,這最初使我走錯了路。 這裡的問題是Lambda圖層都映射在/ opt目錄下。 在
aws-lambda-container-image-converter存儲庫中未檢測到/ opt下的此問題報告庫#12,該線索表明必須擴展LD_LIBRARY_PATH並將其設置為Lambda函數,例如:

$LAMBDA_TASK_ROOT/lib:$LAMBDA_TASK_ROOT/lib64:$LAMBDA_RUNTIME_DIR:$LAMBDA_RUNTIME_DIR/lib:$LAMBDA_TASK_ROOT:/opt/lib:/opt/lib64:/lib64:/usr/lib64

然後將共享庫放在可以找到共享庫的lib64文件夾中。

Fastparquet安裝問題

快速鑲木地板的安裝也造成了一些麻煩。 如果使用目標參數執行Pip命令,則fastparquet構建無法找到必要的numpy頭文件。 解決此問題的一種方法是,從require.txt文件中刪除fastparquet並將其安裝為第二步。 如果正確設置了PYTHONPATH,則可以找到numpy標頭。

(本文翻譯自Klaus Seiler的文章《Effective data exploration via columnar data formats like Parquet》,參考:
https://medium.com/merapar/effective-data-exploration-via-columnar-data-formats-like-parquet-652466676188)


分享到:


相關文章: