比pgload更快更方便寫入大數據量至Greenplum的Spark Connector

前序

Greenplum是目前比較優秀的mpp數據庫,其官方推薦了幾種將外部數據寫入Greenplum方式,包含:通用的Jdbc,pgcopy和pgload以及Pivotal Greenplum-Spark Connector等。

  • Jdbc:Jdbc方式,寫大數據量會很慢。
  • pgcopy:其中pgcopy是及其不推薦的一種,因為其寫數據必須經過Greenplum的master,因此也只建議小數據量使用。
  • pgload:適合寫大數據量數據,能並行寫入。但其缺點是需要安裝客戶端,包括gpfdist等依賴,安裝起來很麻煩。需要了解可以參考pgload。
  • Greenplum-Spark Connector:基於Spark並行處理,並行寫入Greenplum,並提供了並行讀取的接口。也是接下來該文重點介紹的部分。

1. Greenplum-Spark Connector是啥

Greenplum-Spark Connector(GSC)是Pivotal提供的一個高性能並行的讀寫Greenplum的工具。不需要去安裝麻煩的Greenplum loader客戶端,也不用去實現繁瑣的copy代碼。

2. Greenplum-Spark Connector讀數據架構

一個Spark application,是由Driver和Executor節點構成。當Spark application使用Greenplum-Spark Connector加載Greenplum數據時,其Driver端會通過JDBC的方式請求Greenplum的master節點獲取相關的元數據信息。Connector將會根據這些元數據信息去決定Spark的Executor去怎樣去並行的讀取該表的數據。

Greenplum數據庫存儲數據是按segment組織的,Greenplum-Spark Connector在加載Greenplum數據時,需要指定Greenplum表的一個字段作為Spark的partition字段,Connector會使用這個字段的值來計算,該Greenplum表的某個segment該被哪一個或多個Spark partition讀取。

其讀取過程如下:

  1. Spark Driver通過Jdbc的方式連接Greenplum master,並讀取指定表的相關元數據信息。然後根據指定的分區字段以及分區個數去決定segment怎麼分配。
  2. Spark Executor端會通過Jdbc的方式連接Greenplum master,創建Greenplum外部表。
  3. 然後Spark Executor通過Http方式連接Greenplum的數據節點,獲取指定的segment的數據。該獲取數據的操作在Spark Executor並行執行。

其示意流程圖如下:

比pgload更快更方便寫入大數據量至Greenplum的Spark Connector

3. Greenplum-Spark Connector寫數據流程

  1. GSC在Spark Executor端通過Jetty啟動一個Http服務,將該服務封裝為支持Greenplum的gpfdist協議。
  2. GSC在Spark Executor端通過Jdbc方式連接Greenplum master,創建Greenplum外部表,該外部表文件地址指向該Executor所啟動的gpfdist協議地址。SQL示例如下:
<code>CREATE READABLE EXTERNAL TABLE"public"."spark_9dc823a6fa48df60_3d9d854163f8f07a_1_42" (LIKE "public"."rank_a1")LOCATION ('gpfdist://10.0.8.145:44772/spark_9dc823a6fa48df60_3d9d854163f8f07a_1_42')FORMAT 'CSV'(DELIMITER AS '|' NULL AS '')ENCODING 'UTF-8'/<code>
  1. GSC在Spark Executor端通過Jdbc方式連接Greenplum master,然後執行insert語句至真實的表中,數據來源於這張外部表。SQL示例如下:
<code>INSERT INTO "public"."rank_a1"SELECT *FROM "public"."spark_9dc823a6fa48df60_3d9d854163f8f07a_1_42"/<code>

至於這張外部表的數據,是否落地當前Executor服務器,不清楚。猜測不會落地,而是直接通過Http直接傳遞給了Greenplum對應的Segment。

  1. GSC監聽onApplicationEnd事件,在Spark application結束後,刪除創建的外部表。

4. Greenplum-Spark Connector使用

  1. 下載GSC Jar包。 下載地址:Pivotal Network。 可直接下載最新版本的GSC即1.6.2,支持Greenplum5.0之後的版本。greenplum-spark_-.jar,如:
<code>greenplum-spark_2.11-1.6.2.jar/<code>
  1. maven中引入:
<code>        <dependency>            
<groupid>io.pivotal.greenplum.spark/<groupid>
\t\t\t\t\t<artifactid>greenplum-spark_2.11/<artifactid>
\t\t\t\t\t<version>1.6.2/<version>
\t\t\t\t/<dependency>/<code>
  1. spark提交引入:
  • spark-shell或spark-submit時候,通過--jars加入greenplum-spark_2.11-1.6.2.jar。
  • 將greenplum-spark_2.11-1.6.2.jar與Spark application包打成 uber jar 提交。

5. Greenplum-Spark Connector參數

參數名:url

參數描述:Jdbc連接的url。

作用域:讀,寫


參數名:dbschema

參數描述:Greenplum數據庫的schema,GSC創建的臨時外部表也在該schema下,默認值為public。

作用域:讀,寫


參數名:dbtable

參數描述:Greenplum數據庫的表名,GSC在讀取時,會讀取dbschema下的表。GSC在寫數據時,如果該表不存在會自動創建。

作用域:讀,寫


參數名:driver

參數描述:Jdbc driver全類名,非必填,在GSC Jar包中已經包含了driver包。

作用域:讀,寫


參數名:user

參數描述:用戶名

作用域:讀,寫


參數名:password

參數描述:密碼

作用域:讀,寫


參數名:partitionColumn

參數描述:Greenplum數據表的字段,該字段將作為Spark分區的字段,支持integer, bigint, serial, bigserial4中類型,該字段名需小寫。該字段為必填,且必須是Greenplum表建表時 DISTRIBUTED BY ()語句中的字段。

作用域:讀


參數名:partitions

參數描述:Spark分區數,非必填,其默認值為Greenplum的primary segments數量。

作用域:讀


參數名:truncate

參數描述:當在Spark中指定了輸出模式為SaveMode.Overwrite時候,寫的目標表存在的時候的策略,非必填。默認為false,即GSC將會先刪除然後重新創建目標表,然後在寫數據。當為true時,GSC將會先truncates目標表,然後在寫入數據。

作用域:寫


參數名:iteratorOptimization

參數描述:指定寫數據時內存模式,非必填。默認指為true,GSC將會使用 Iterator 方式。當為false時,GSC將會在寫數據時將數據存儲在內存中。

作用域:寫


參數名:server.port

參數描述:指定在Spark Worker端啟動gpfdist服務的端口號,非必填。默認情況下會使用隨機的端口號。

作用域:讀,寫


參數名:server.useHostname

參數描述:指定是否使用Spark Worker節點的host name為gpfdis服務的地址,非必填。默認為false。

作用域:讀,寫


參數名:pool.maxSize

參數描述:GSC連接Greenplum的連接池的最大連接數,默認為64。

作用域:讀,寫


參數名:pool.timeoutMs

參數描述:非活動連接被認為是空閒連接的時間,毫秒值。默認為10000(10秒)。

作用域:讀,寫


參數名:pool.minIdle

參數描述:GSC連接Greenplum的連接池的最小空閒連接數,默認為0。

作用域:讀,寫


6. 從Greenplum讀取數據

  1. DataFrameReader.load()方式:
<code>val gscReadOptionMap = Map(      "url" -> "jdbc:postgresql://gpdb-master:5432/testdb",      "user" -> "bill",      "password" -> "changeme",      "dbschema" -> "myschema",      "dbtable" -> "table1",      "partitionColumn" -> "id")val gpdf = spark.read.format("greenplum")      .options(gscReadOptionMap)      .load()/<code>
  1. spark.read.greenplum()方式:
<code>val url = "jdbc:postgresql://gpmaster.domain:15432/tutorial"val tblname = "avgdelay"val jprops = new Properties()jprops.put("user", "user2")jprops.put("password", "changeme")jprops.put("partitionColumn", "airlineid")val gpdf = spark.read.greenplum(url, tblname, jprops)/<code>

然鵝,這種方式必然需要引入一個隱式轉換,官網也沒介紹。

7. 寫數據至Greenplum

7.1. 寫數據示例:

<code>val gscWriteOptionMap = Map("url" -> "jdbc:postgresql://gpdb-master:5432/testdb",      "user" -> "bill",      "password" -> "changeme",      "dbschema" -> "myschema",      "dbtable" -> "table2",)dfToWrite.write.format("greenplum")      .options(gscWriteOptionMap)      .save()/<code>

在通過GSC寫到Greenplum表時,如果表已經存在或表中已經存在數據,可通過DataFrameWriter.mode(SaveMode savemode)方式指定其輸出模式。相關模式行為如下:

SaveMode:ErrorIfExists

行為:如果Greenplum數據表已經存在則GSC直接返回錯誤,該策略為默認策略。


SaveMode:Append

行為:直接將Spark中數據追加至表中。


SaveMode:Ignore

行為:如果Greenplum數據表已經存在,GSC將不會寫數據至表中也不會去修改已經存在的數據。


SaveMode:Overwrite

行為:如果Greenplum數據表已經存在,則truncate參數將會生效。默認為false,即GSC將會先刪除然後重新創建目標表,然後在寫數據。當為true時,GSC將會先truncates目標表,然後在寫入數據。

7.2. GSC自動建表:

  1. 創建的Greenplum表將不會有distribution列,如下為GSC生成的建表語句:
<code>CREATE TABLE "public"."rank_a1" ("id" INTEGER NOT NULL, "rank" TEXT, "year" INTEGER NOT NULL, "gender" INTEGER NOT NULL, "count" INTEGER NOT NULL);/<code>
  1. 創建的Greenplum表的字段名將會使用Spark DataFrame中的字段名。
  2. 在GSC自動建表時,將會為字段名加上雙引號,這將使Greenplum區分大小寫。
  3. 當Spark DataFrame的字段不為nullable時,GSC自動建表的字段將是 NOT NULL。
  4. 將會對應的Spark DataFrame字段類型映射為Greenplum的字段類型。參考,字段類型映射表。

7.3. 提前手動建表:

  1. 將Spark DataFrame的字段名的數據寫至Greenplum表的對應的字段中。值得注意的是,GSC在做映射的時候,是嚴格區分大小寫的。
  2. 寫至Greenplum的字段的數據類型,與對應的Spark DataFrame一致,具體參見字段類型映射。
  3. 如果Spark數據中某列包含空數據,需確保對應的Greenplum表的列沒有被指定為NOT NULL。
  4. Greenplum表中建表時其字段順序可以與Spark DataFrame中不一致。但Greenplum表中不能出現不存在在Spark DataFrame中的字段。如下例子:
<code>// Greenplum 中的字段CREATE TABLE public.rank_a1 (    id int4 NOT NULL,    "rank" text NULL,    "year" int4 NOT NULL,    gender int4 NOT NULL,    count int4 NOT NULL)DISTRIBUTED BY (id);// Spark DataFrame中的字段var df = Seq((2, "a|b", 2, 2, 2),(3, "a|b", 3, 3, 3)).toDF("id", "rank", "year", "gender")// 在寫數據至public.rank_a1表時,將會報錯如下Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: The number of columns doesn't match.Old column names (5): _1, _2, _3, _4, _5New column names (4): id, rank, year, gender    at scala.Predef$.require(Predef.scala:224)    at org.apache.spark.sql.Dataset.toDF(Dataset.scala:435)    at org.apache.spark.sql.DatasetHolder.toDF(DatasetHolder.scala:44)    at com.lt.spark.greenplum.GreenplumWrite$.main(GreenplumWrite.scala:14)    at com.lt.spark.greenplum.GreenplumWrite.main(GreenplumWrite.scala)/<code>
  1. 確保指定的用戶對於該表有讀寫的權限,自動建表,需要有建表的權限。

8. Troubleshooting

8.1. 端口相關問題

原因解決辦法

錯誤信息:java.lang.RuntimeException: is not a valid port number.

解決辦法:通過server.port所指定的端口無效,比如1024以內,為系統使用端口指定端口在[1024-65535]之間


錯誤信息:java.lang.RuntimeException:Unable to start GpfdistService on any of ports=

解決辦法:通過server.port指定的端口已經被佔用從新指定一個未被佔用的端口,或不指定該參數

8.2. Greenplum連接數問題

當連接Greenplum的連接數接近Greenplum數據庫配置的最大連接數(max_connections)時。Spark application將會拋出 connection limit exceeded 錯誤。

排查過程:

  1. 查詢Greenplum數據的最大連接數:
<code>postgres=# show max_connections; max_connections----------------- 250(1 row)/<code>
  1. 查詢當前連接Greenplum數據庫的連接數:
<code>postgres=# SELECT count(*) FROM pg_stat_activity;/<code>
  1. 查詢指定的用戶連接Greenplum數據的連接數:
<code>postgres=# SELECT count(*) FROM pg_stat_activity WHERE datname='tutorial';postgres=# SELECT count(*) FROM pg_stat_activity WHERE usename='user1';/<code>
  1. 查詢Greenplum數據庫空閒和活動的連接數:
<code>postgres=# SELECT count(*) FROM pg_stat_activity WHERE current_query='<idle>';postgres=# SELECT count(*) FROM pg_stat_activity WHERE current_query!='<idle>';/<idle>/<idle>/<code>
  1. 查詢連接Greenplum數據庫名,用戶名,客戶端地址,客戶端ip,當前查詢語句:
<code>postgres=# SELECT datname, usename, client_addr, client_port, current_query FROM pg_stat_activity;/<code>

如果確認是Spark application使用連接數過多,則配置JDBC Connection Pooling相關參數,減少連接數。

8.3. Greenplum Database Data Length Errors

在使用Greenplum 4.x或5.x的時候,可能會報出“data line too long”錯誤。這是因為在Greenplum數據庫中參數項“gpmaxcsvlinelength”默認值是1M。需要登陸Greenplum master修改這個參數值,示例如下,通過gpconfig修改該參數的值為5M:

<code>gpadmin@gpmaster$ gpconfig -c gp_max_csv_line_length -v 5242880gpadmin@gpmaster$ gpstop -u/<code>

9. 參考

  1. Greenplum-Spark Connector官方文檔
  2. Greenplum建表語句文檔
  3. Greenplum參數配置官方文檔


分享到:


相關文章: