Spark 2.0系列-SparkSession詳解

Spark2.0中引入了SparkSession的概念,它為用戶提供了一個統一的切入點來使用Spark的各項功能。用戶不但可以使用DataFrame和Dataset的各種API,學習Spark2.0的難度也會大大降低。

在Spark的早期版本,SparkContext是進入Spark的切入點。我們都知道RDD是Spark中重要的API,然而它的創建和操作得使用SparkContext提供的API;對於RDD之外的其他東西,我們需要使用其他的Context。比如流處理我們得使用StreamingContext;對於SQL得使用SQLContext;而對於Hive得使用HiveContext。然而Dataset和DataFrame提供的API逐漸成為新的標準API,我們需要一個切入點來構建它們,所以在Spark 2.0中我們引入一個新的切入點(Entry Point):SparkSession。

Spark 2.0系列-SparkSession詳解


創建SparkSession

SparkSession 是 Spark程序的入口。在2.0版本之前,使用Spark必須先創建SparkConf和SparkContext,不過在Spark2.0中只要創建一個SparkSession就夠了,SparkConf、SparkContext和SQLContext都已經被封裝在SparkSession當中。使用 Dataset 或者 Datafram 編寫 Spark SQL 應用的時候,第一個要創建的對象就是 SparkSession。

Builder 是 SparkSession 的構造器。 通過 Builder, 可以添加各種配置。

Builder 的方法如下:

MethodDescriptiongetOrCreate獲取或者新建一個 sparkSessionenableHiveSupport增加支持 hive SupportappName設置 application 的名字config設置各種配置

SparkSession的設計遵循了工廠設計模式(factory design pattern),下面代碼片段介紹如何創建SparkSession。

/**
* 初始化spark配置容器
*
* @param task
* @return
*/
public static SparkSession buildSparkSession(Task task) {
//配置容器
SparkConf conf = new SparkConf();
//to avoid exception: org.apache.spark.structure.catalyst.
//expressions.GeneratedClass$SpecificOrdering" grows beyond 64 KB
//spark jira:https://issues.apache.org/jira/browse/SPARK-16845
conf.set("spark.structure.codegen.wholeStage", "false");
//web端顯示應用名
conf.setAppName(task.getTaskCode());

//setMaster主要是連接主節點,如果參數是"local",
//則在本地用單線程運行spark,如果是 local[4],
// 則在本地用4核運行,如果設置為spark://master:7077,
//就是作為單節點運行
conf.setMaster(task.getMaster());
SparkSession sparkSession = SparkSession.builder()
.config(conf).getOrCreate();
sparkSession.sparkContext().setLogLevel("WARN");
return sparkSession;
}

"spark.structure.codegen.wholeStage"SparkConf 配置如果數據集是一個寬表超過64KB時會報錯,參見對應jira。

SparkSession和SparkContext

下圖說明了SparkContext在Spark中的主要功能。

Spark 2.0系列-SparkSession詳解

從圖中可以看到SparkContext起到的是一箇中介的作用,通過它來使用Spark其他的功能。每一個JVM都有一個對應的SparkContext,driver program通過SparkContext連接到集群管理器來實現對集群中任務的控制。Spark配置參數的設置以及對SQLContext、HiveContext和StreamingContext的控制也要通過SparkContext進行。

不過在Spark2.0中上述的一切功能都是通過SparkSession來完成的,同時SparkSession也簡化了DataFrame/Dataset API的使用和對數據的操作。

SparkSession的API

SparkSession是一個比較重要的類,它的功能的實現,肯定包含比較多的函數,這裡介紹下它包含哪些函數。

builder函數

public static SparkSession.Builder builder()

創建 SparkSession.Builder,初始化SparkSession.

setActiveSession函數

public static void setActiveSession(SparkSession session)

當SparkSession.GetOrCreate()被調用,SparkSession發生變化,將會返回一個線程和它的子線程。這將會確定給定的線程接受帶有隔離會話的SparkSession,而不是全局的context。

clearActiveSession函數

public static void clearActiveSession()

清除當前線程的Active SparkSession。然後調用GetOrCreate將會返回第一次創建的context代替本地線程重寫

setDefaultSession函數

public static void setDefaultSession(SparkSession session)

設置默認的SparkSession,返回builder

clearDefaultSession函數

public static void clearDefaultSession()

清除默認的SparkSession返回的builder

getActiveSession函數

public static scala.Option<sparksession> getActiveSession()/<sparksession>

由builder,返回當前線程的Active SparkSession

getDefaultSession函數

public static scala.Option<sparksession> getDefaultSession()/<sparksession>

由builder,返回默認的SparkSession

sparkContext函數

public SparkContext sparkContext()

version函數

public String version()

返回運行應用程序的spark版本

sharedState函數

public org.apache.spark.sql.internal.SharedState sharedState()

通過sessions共享狀態,包括SparkContext, cached 數據, listener, 和catalog.

這是內部spark,接口穩定性沒有保證

sessionState函數

public org.apache.spark.sql.internal.SessionState sessionState()

通過session隔離狀態,包括:SQL 配置, 臨時表, registered 功能, 和 其它可接受的 SQLConf.

這是內部spark,接口穩定性沒有保證

sqlContext函數

public SQLContext sqlContext()

session封裝以 SQLContext的形式,為了向後兼容。

conf函數

public RuntimeConfig conf()

Spark 2.0系列-SparkSession詳解

運行spark 配置接口

通過這個接口用戶可以設置和獲取與spark sql相關的所有Spark 和Hadoop配置.當獲取config值,

listenerManager函數

public ExecutionListenerManager listenerManager()

用於註冊自定義QueryExecutionListeners的接口,用於偵聽執行指標。

experimental函數

public ExperimentalMethods experimental()

collection函數,被認為是experimental,可以用於查詢高級功能的查詢計劃程序。

udf函數

public UDFRegistration udf()

collection 函數,用於用戶自定義函數

streams函數

public StreamingQueryManager streams()

返回StreamingQueryManager ,允許管理所有的StreamingQuerys

newSession函數

public SparkSession newSession()

啟動一個獨立的 SQL 配置, temporary 表, registered 功能新的session,但共享底層的SparkContext 和緩存數據.

emptyDataFrame函數

public Dataset emptyDataFrame()

返回一個空沒有行和列的DataFrame

emptyDataset函數

public Dataset emptyDataset(Encoder evidence$1)

創建一個T類型的空的Dataset

createDataFrame函數

public

從rdd創建DateFrame

public Dataset createDataFrame(RDD rowRDD, StructType schema)

從RDD包含的行給定的schema,創建DataFrame。需要確保每行的RDD結構匹配提供的schema,否則將會運行異常。

public Dataset createDataFrame(JavaRDD rowRDD,StructType schema)

創建DataFrame從包含schema的行的RDD。確保RDD提供的每行結構匹配提供的schema,否則運行異常

public Dataset createDataFrame(java.util.List rows,StructType schema)

創建DataFrame從包含行的schema的java.util.List

public Dataset createDataFrame(RDD> rdd,Class> beanClass)

應用schema到Java Beans的RDD

警告:由於Java Bean中的字段沒有保證的順序,因此SELECT *查詢將以未定義的順序返回列。

public Dataset createDataFrame(JavaRDD> rdd, Class> beanClass)

應用schema到Java Beans的RDD

警告:由於Java Bean中的字段沒有保證的順序,因此SELECT *查詢將以未定義的順序返回列。

public Dataset createDataFrame(java.util.List> data,Class> beanClass)

應用schema到Java Bean list

警告:由於Java Bean中的字段沒有保證的順序,因此SELECT *查詢將以未定義的順序返回列。

baseRelationToDataFrame函數

public Dataset baseRelationToDataFrame(BaseRelation baseRelation)

轉換創建的BaseRelation,為外部數據源到DataFrame

createDataset函數

public Dataset createDataset(scala.collection.Seq data,Encoder evidence$4)

從本地給定類型的數據Seq創建DataSet。這個方法需要encoder (將T類型的JVM對象轉換為內部Spark SQL表示形式)。這通常是通過從sparksession implicits自動創建。或則可以通過調用 Encoders上的靜態方法來顯式創建。

public Dataset createDataset(RDD data,Encoder evidence$5)

創建DataSet從給定類型的RDD。這個方法需要encoder (將T類型的JVM對象轉換為內部Spark SQL表示形式)。通常自動創建通過SparkSession的implicits 或則可以通過調用 Encoders上的靜態方法來顯式創建。

public Dataset createDataset(java.util.List data,Encoder evidence$6)

創建 Dataset,對於T類型的java.util.List。這個方法需要encoder (將T類型的JVM對象轉換為內部Spark SQL表示形式), 或則可以通過調用 Encoders上的靜態方法來顯式創建。

range函數

public Dataset<long> range(long end)/<long>使用名為id的單個LongType列創建一個Dataset,包含元素的範圍從0到結束(不包括),步長值為1。

public Dataset<long> range(long start,long end)/<long>

使用名為id的單個LongType列創建一個Dataset,包含元素的範圍從start到結束(不包括),步長值為1。

public Dataset<long> range(long start, long end, long step)/<long>

使用名為id的單個LongType列創建一個Dataset,包含元素的範圍從start到結束(不包括),步長值為step。

public Dataset<long> range(long start,long end,long step,int numPartitions)/<long>

使用名為id的單個LongType列創建一個Dataset,包含元素的範圍從start到結束(不包括),步長值為step,指定partition 的數目

catalog函數

public Catalog catalog()

用戶可以通過它 create, drop, alter 或則query 底層數據庫, 表, 函數等.

table函數

public Dataset table(String tableName)返回指定的table/view作為DataFrame

tableName是可以合格或則不合格的名稱。如果在數據庫中指定,它在數據庫中會識別。否則它會嘗試找到一個臨時view ,匹配到當前數據庫的table/view,全局的臨時的數據庫view也是有效的。

sql函數

public Dataset sql(String sqlText)

使用spark執行sql查詢,作為DataFrame返回結果。用來sql parsing,可以用spark.sql.dialect來配置

read函數

public DataFrameReader read()

返回一個DataFrameReader,可以用來讀取非流數據作為一個DataFrame

readStream函數

public DataStreamReader readStream()

返回一個DataFrameReader,可以用來讀取流數據作為一個DataFrame

time函數

public T time(scala.Function0 f)

執行一些代碼塊並打印輸出執行該塊所花費的時間。 這僅在Scala中可用,主要用於交互式測試和調試。

implicits函數

public SparkSession.implicits$ implicits()

嵌套Scala對象訪問

stop函數

public void stop()

停止SparkContext

close函數

public void close()

與stop類似

Spark 2.0系列-SparkSession詳解


分享到:


相關文章: