【Greenplum ETL】Greenplum-Spark Connector 介紹

1. 前序

Greenplum 是一款優秀的 mpp 數據庫產品,官方推薦了幾種將外部數據寫入 Greenplum 方式,包含:通用的 Jdbc,gpload 以及 Pivotal Greenplum-Spark Connector 等。

  • Jdbc:Jdbc 方式,寫大數據量會很慢。
  • gpload:適合寫大數據量數據,能並行寫入。但其缺點是需要安裝客戶端,包括 gpfdist 等依賴,安裝起來很麻煩。需要了解可以參考 gpload。
  • Greenplum-Spark Connector:基於 Spark 並行處理,並行寫入 Greenplum,並提供了並行讀取的接口。也是接下來該文重點介紹的部分。

2. Greenplum-Spark Connector 讀數據架構

一個 Spark application,是由 Driver 和 Executor 節點構成。當 Spark Application 使用 Greenplum-Spark Connector 加載 Greenplum 數據時,其 Driver 端會通過 JDBC 的方式請求 Greenplum 的 master 節點獲取相關的元數據信息。Connector 將會根據這些元數據信息去決定 Spark 的 Executor 去怎樣去並行的讀取該表的數據。

Greenplum 數據庫存儲數據是按 segment 組織的,Greenplum-Spark Connector 在加載 Greenplum 數據時,需要指定 Greenplum 表的一個字段作為 Spark 的 partition 字段,Connector 會使用這個字段的值來計算,該 Greenplum 表的某個 segment 該被哪一個或多個 Spark partition 讀取。

其讀取過程如下:

  1. Spark Driver 通過 Jdbc 的方式連接 Greenplum master,並讀取指定表的相關元數據信息。然後根據指定的分區字段以及分區個數去決定 segment 怎麼分配。
  2. Spark Executor 端會通過 Jdbc 的方式連接 Greenplum master,創建 Greenplum 外部表。
  3. 然後 Spark Executor 通過 Http 方式連接 Greenplum 的數據節點,獲取指定的 segment 的數據。該獲取數據的操作在 Spark Executor 並行執行。
【Greenplum ETL】Greenplum-Spark Connector 介紹

3. Greenplum-Spark Connector 寫數據流程

1.GSC 在 Spark Executor 端通過 Jetty 啟動一個 Http 服務,將該服務封裝為支持 Greenplum 的 gpfdist 協議。

2.GSC 在 Spark Executor 端通過 Jdbc 方式連接 Greenplum master,創建 Greenplum 外部表,該外部表文件地址指向該 Executor 所啟動的 gpfdist 協議地址。SQL示例如下:

<code>CREATE READABLE EXTERNAL TABLE
"public"."spark_9dc823a6fa48df60_3d9d854163f8f07a_1_42" (LIKE "public"."rank_a1")
LOCATION ('gpfdist://10.0.8.145:44772/spark_9dc823a6fa48df60_3d9d854163f8f07a_1_42')
FORMAT 'CSV'
(DELIMITER AS '|'
NULL AS '')
ENCODING 'UTF-8'/<code>

3.GSC 在 Spark Executor 端通過 Jdbc 方式連接 Greenplum master,然後執行 insert 語句至真實的表中,數據來源於這張外部表。SQL 示例如下:

<code>INSERT INTO "public"."rank_a1"
SELECT *
FROM "public"."spark_9dc823a6fa48df60_3d9d854163f8f07a_1_42"/<code>

至於這張外部表的數據,是否落地當前 Executor 服務器,不清楚。猜測不會落地,而是直接通過 Http 直接傳遞給了 Greenplum 對應的 Segment。

4.GSC監聽 onApplicationEnd 事件,在 Spark application 結束後,刪除創建的外部表。

4. Greenplum-Spark Connector 使用

1.下載 GSC Jar 包。 下載地址:https://network.pivotal.io/products/pivotal-gpdb/releases。 可直接下載最新版本的 GSC,即1.6.2,支持 Greenplum5.0 之後的版本。如:

<code>greenplum-spark_2.11-1.6.2.jar/<code>

2.Maven 中引入:

<code>        <dependency>
<groupid>io.pivotal.greenplum.spark/<groupid>
<artifactid>greenplum-spark_2.11/<artifactid>
<version>1.6.2/<version>
/<dependency>/<code>

3. Spark 提交引入:

  • spark-shell 或 spark-submit 時候,通過-jars加入 greenplum-spark_2.11-1.6.2.jar。
  • 將 greenplum-spark_2.11-1.6.2.jar 與 Spark application 包打成 uber jar 提交。

5. Greenplum-Spark Connector 參數

【Greenplum ETL】Greenplum-Spark Connector 介紹

【Greenplum ETL】Greenplum-Spark Connector 介紹

6. 從 Greenplum 讀取數據

1.DataFrameReader.load() 方式:

<code>val gscReadOptionMap = Map(
"url" -> "jdbc:postgresql://gpdb-master:5432/testdb",
"user" -> "bill",
"password" -> "changeme",
"dbschema" -> "myschema",
"dbtable" -> "table1",
"partitionColumn" -> "id"
)

val gpdf = spark.read.format("greenplum")
.options(gscReadOptionMap)
.load()/<code>

2.spark.read.greenplum() 方式:

<code>val url = "jdbc:postgresql://gpmaster.domain:15432/tutorial"
val tblname = "avgdelay"
val jprops = new Properties()
jprops.put("user", "user2")
jprops.put("password", "changeme")
jprops.put("partitionColumn", "airlineid")
val gpdf = spark.read.greenplum(url, tblname, jprops)/<code>

然而,這種方式必然需要引入一個隱式轉換,官網也沒介紹。

7. 寫數據至 Greenplum

1.寫數據示例:

<code>val gscWriteOptionMap = Map(
"url" -> "jdbc:postgresql://gpdb-master:5432/testdb",
"user" -> "bill",
"password" -> "changeme",
"dbschema" -> "myschema",
"dbtable" -> "table2",
)

dfToWrite.write.format("greenplum")
.options(gscWriteOptionMap)
.save()/<code>

在通過 GSC 寫到 Greenplum 表時,如果表已經存在或表中已經存在數據,可通過 DataFrameWriter.mode(SaveMode savemode) 方式指定其輸出模式。相關模式行為如下:

【Greenplum ETL】Greenplum-Spark Connector 介紹

2.GSC 自動建表

2.1 創建的 Greenplum 表將不會有 distribution 列,如下為 GSC 生成的建表語句:

<code>CREATE TABLE "public"."rank_a1" 
("id" INTEGER NOT NULL, "rank" TEXT, "year" INTEGER NOT NULL, "gender" INTEGER NOT NULL, "count" INTEGER NOT NULL);/<code>

2.2 創建的 Greenplum 表的字段名將會使用 Spark DataFrame 中的字段名。

2.3 在 GSC 自動建表時,將會為字段名加上雙引號,這將使 Greenplum 區分大小寫。

2.4 當 Spark DataFrame 的字段不為 nullable 時,GSC 自動建表的字段將是 NOT NULL。

2.5 將會對應的 Spark DataFrame 字段類型映射為 Greenplum 的字段類型。參考,字段類型映射表。

3.提前手動建表

3.1 將 Spark DataFrame 的字段名的數據寫至 Greenplum 表的對應的字段中。值得注意的是,GSC在做映射的時候,是嚴格區分大小寫的。

3.2 寫至 Greenplum 的字段的數據類型,與對應的 Spark DataFrame 一致,具體參見字段類型映射。

3.3 如果 Spark 數據中某列包含空數據,需確保對應的 Greenplum 表的列沒有被指定為 NOT NULL。

3.4 Greenplum 表中建表時其字段順序可以與 Spark DataFrame 中不一致。但 Greenplum 表中不能出現不存在在 Spark DataFrame 中的字段。如下例子:

<code>// Greenplum 中的字段
CREATE TABLE public.rank_a1 (
\tid int4 NOT NULL,
\t"rank" text NULL,
\t"year" int4 NOT NULL,
\tgender int4 NOT NULL,
\tcount int4 NOT NULL
)
DISTRIBUTED BY (id);

// Spark DataFrame中的字段
var df = Seq((2, "a|b", 2, 2, 2),(3, "a|b", 3, 3, 3)).toDF("id", "rank", "year", "gender")

// 在寫數據至public.rank_a1表時,將會報錯如下
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: The number of columns doesn't match.
Old column names (5): _1, _2, _3, _4, _5
New column names (4): id, rank, year, gender
\tat scala.Predef$.require(Predef.scala:224)
\tat org.apache.spark.sql.Dataset.toDF(Dataset.scala:435)
\tat org.apache.spark.sql.DatasetHolder.toDF(DatasetHolder.scala:44)
\tat com.lt.spark.greenplum.GreenplumWrite$.main(GreenplumWrite.scala:14)
\tat com.lt.spark.greenplum.GreenplumWrite.main(GreenplumWrite.scala)/<code>

3.5 確保指定的用戶對於該表有讀寫的權限,自動建表,需要有建表的權限。

8. Troubleshooting

1. 端口相關問題

【Greenplum ETL】Greenplum-Spark Connector 介紹

2. Greenplum 連接數問題

當連接 Greenplum 的連接數接近 Greenplum 數據庫配置的最大連接數(max_connections)時。Spark application 將會拋出 connection limit exceeded 錯誤。

排查過程:

2.1 查詢 Greenplum 數據的最大連接數:

<code>postgres=# show max_connections;
max_connections
-----------------
250
(1 row)/<code>

2.2 查詢當前連接Greenplum數據庫的連接數:

<code>postgres=# SELECT count(*) FROM pg_stat_activity;/<code>

2.3 查詢指定的用戶連接 Greenplum 數據的連接數:

<code>postgres=# SELECT count(*) FROM pg_stat_activity WHERE datname='tutorial';
postgres=# SELECT count(*) FROM pg_stat_activity WHERE usename='user1';/<code>

2.4 查詢 Greenplum 數據庫空閒和活動的連接數:

<code>postgres=# SELECT count(*) FROM pg_stat_activity WHERE current_query='<idle>';
postgres=# SELECT count(*) FROM pg_stat_activity WHERE current_query!='<idle>';/<idle>/<idle>/<code>

2.5 查詢連接 Greenplum 數據庫名,用戶名,客戶端地址,客戶端ip,當前查詢語句:

<code>postgres=# SELECT datname, usename, client_addr, client_port, current_query FROM pg_stat_activity;/<code>

如果確認是 Spark application 使用連接數過多,則配置 JDBC Connection Pooling 相關參數,減少連接數。

3. Greenplum Database Data Length Errors

在使用 Greenplum 4.x 或 5.x 的時候,可能會報出“data line too long”錯誤。這是因為在 Greenplum 數據庫中參數項“gp_max_csv_line_length”默認值是1M。需要登陸 Greenplum master 修改這個參數值。示例如下,通過 gpconfig 修改該參數的值為5M:

<code>gpadmin@gpmaster$ gpconfig -c gp_max_csv_line_length -v 5242880
gpadmin@gpmaster$ gpstop -u/<code>

9. 類型映射表

1. Greenplum to Spark

【Greenplum ETL】Greenplum-Spark Connector 介紹

2. Spark to Greenplum

【Greenplum ETL】Greenplum-Spark Connector 介紹

10. 參考

  1. Greenplum-Spark Connector 官方文檔:https://greenplum-spark.docs.pivotal.io/1-6/overview.html
  2. Greenplum 建表語句文檔:https://gpdb.docs.pivotal.io/510/ref_guide/sql_commands/CREATE_EXTERNAL_TABLE.html#topic1
  3. Greenplum 參數配置官方文檔:https://gpdb.docs.pivotal.io/5250/ref_guide/config_params/guc-list.html#gp_max_csv_line_length




秣碼一盞

專注大數據、微服務領域,大數據平臺建設。


分享到:


相關文章: