日誌服務(SLS)集成 Spark 流計算實戰

前言

日誌服務作為一站式的日誌的採集與分析平臺,提供了各種用戶場景的日誌採集能力,通過日誌服務提供的各種與·與SDK,採集客戶端(Logtail),Producer,用戶可以非常容易的把各種數據源中的數據採集到日誌服務的Logstore中。同時為了便於用戶對日誌進行處理,提供了各種支持流式消費的SDK,如各種語言的消費組,與 Spark,Flink,Storm 等各種流計算技術無縫對接的Connector,以便於用戶根據自己的業務場景非常便捷的處理海量日誌。

從最早的Spark Streaming到最新的Stuctured Streaming,Spark 一直是最流行的流計算框架之一。使用日誌服務的Spark SDK,可以非常方便的在Spark 中消費日誌服務中的數據,同時也支持將 Spark 的計算結果寫入日誌服務。


日誌服務基礎概念

日誌服務的存儲層是一個類似Kafka的Append only的FIFO消息隊列,包含如下基本概念:

  • 日誌(Log):由時間、及一組不定個數的Key-Value對組成。
  • 日誌組(LogGroup):一組日誌的集合,包含相同Meta信息如Topic,Source,Tags等。是讀寫的基本單位。
日誌服務(SLS)集成 Spark 流計算實戰

圖-1 Log與LogGroup的關係

  • Shard:分區,LogGroup讀寫基本單元,對應於Kafka的partition。
  • Logstore:日誌庫,用於存放同一類日誌數據。Logstore會包含1個或多個Shard。
  • Project:Logstore存放容器,包含一個或者多個Logstore。


準備工作

1)添加Maven依賴:

<code><dependency>
<groupid>com.aliyun.emr/<groupid>
<artifactid>emr-logservice_2.11/<artifactid>
<version>1.9.0/<version>
/<dependency>/<code>

Github源碼下載。2)計劃消費的日誌服務project,logstore以及對應的endpoint。3)用於訪問日誌服務Open API的Access Key。


對 Spark Streaming 的支持

Spark Streaming是Spark最早推出的流計算技術,現在已經進入維護狀態,不再會增加新的功能。但是考慮到Spark Streaming 的使用仍然非常廣泛,我們先從Spark Streaming開始介紹。Spark Streaming 提供了一個DStream 的數據模型抽象,本質是把無界數據集拆分成一個一個的RDD,轉化為有界數據集的流式計算。每個批次處理的數據就是這段時間內從日誌服務消費到的數據。

日誌服務(SLS)集成 Spark 流計算實戰

圖-2 DStream

Spark Streaming 從日誌服務消費支持 Receiver 和 Direct 兩種消費方式。


Receiver模式

Receivers的實現內部實現基於日誌服務的消費組(Consumer Library)。數據拉取與處理完全分離。消費組自動均勻分配Logstore內的所有shard到所有的Receiver,並且自動提交checkpoint到SLS。這就意味著Logstore內的shard個數與Spark 實際的併發沒有對應關係。對於所有的Receiver,接收到的數據默認會保存在Spark Executors中,所以Failover的時候有可能造成數據丟失,這個時候就需要開啟WAL日誌,Failover的時候可以從WAL中恢復,防止丟失數據。

SDK將SLS中的每行日誌解析為JSON字符串形式,Receiver使用示例如下所示:

<code>object SLSReceiverSample {
def main(args: Array[String]): Unit = {
val project = "your project"
val logstore = "your logstore"
val consumerGroup = "consumer group"
val endpoint = "your endpoint"
val accessKeyId = "access key id"
val accessKeySecret = "access key secret"
val batchInterval = Milliseconds(5 * 1000)

val conf = new SparkConf().setAppName("Test SLS Loghub")
val ssc = new StreamingContext(conf, batchInterval)
val stream = LoghubUtils.createStream(
ssc,
project,
logstore,
consumerGroup,

endpoint,
accessKeyId,
accessKeySecret,
StorageLevel.MEMORY_AND_DISK,
LogHubCursorPosition.END_CURSOR)

stream.checkpoint(batchInterval * 2).foreachRDD(rdd =>
rdd.map(bytes => new String(bytes)).top(10).foreach(println)
)
ssc.checkpoint("hdfs:///tmp/spark/streaming")
ssc.start()
ssc.awaitTermination()
}
}/<code>

除Project,Logstore,Access Key 這些基礎配置外,還可以指定StorageLevel,消費開始位置等。


Direct模式

Direct模式不再需要Receiver,也不依賴於消費組,而是使用日誌服務的低級API,在每個批次內直接從服務端拉取數據處理。對於Logstore中的每個Shard來說,每個批次都會讀取指定位置範圍內的數據。為了保證一致性,只有在每個批次確認正常結束之後才能把每個Shard的消費結束位置(checkpoint)保存到服務端。

為了實現Direct模式,SDK依賴一個本地的ZooKeeper,每個shard的checkpoint會臨時保存到本地的ZooKeeper,等用戶手動提交checkpoint時,再從ZooKeeper中同步到服務端。Failover時也是先從本地ZooKeeper中嘗試讀上一次的checkpoint,如果沒有讀到再從服務端獲取。

<code>object SLSDirectSample {
def main(args: Array[String]): Unit = {

val project = "your project"
val logstore = "your logstore"
val consumerGroup = "consumerGroup"
val endpoint = "endpoint"
val accessKeyId = "access key id"
val accessKeySecret = "access key secret"
val batchInterval = Milliseconds(5 * 1000)
val zkAddress = "localhost:2181"
val conf = new SparkConf().setAppName("Test Direct SLS Loghub")
val ssc = new StreamingContext(conf, batchInterval)
val zkParas = Map("zookeeper.connect" -> zkAddress)
val loghubStream = LoghubUtils.createDirectStream(
ssc,
project,
logstore,
consumerGroup,
accessKeyId,
accessKeySecret,
endpoint,
zkParas,
LogHubCursorPosition.END_CURSOR)

loghubStream.checkpoint(batchInterval).foreachRDD(rdd => {
println(s"count by key: ${rdd.map(s => {
s.sorted
(s.length, s)
}).countByKey().size}")
// 手動更新checkpoint
loghubStream.asInstanceOf[CanCommitOffsets].commitAsync()
})
ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory
ssc.start()
ssc.awaitTermination()
}
}/<code>

Direct模式示例


如何限速

在Receiver中,如果需要限制消費速度,我們只需要調整 Consumer Library 本身的參數即可。而Direct方式是在每個批次開始時從SLS拉取數據,這就涉及到一個問題:一個批次內拉取多少數據才合適。如果太多,一個批次內處理不完,造成處理延時。如果太少會導worker空閒,工作不飽和,消費延時。這個時候我們就需要合理配置拉取的速度和行數,實現一個批次儘可能多處理又能及時完成的目標。理想狀態下Spark 消費的整體速率應該與SLS採集速率一致,才能實現真正的實時處理。

由於SLS的數據模型是以LogGroup作為讀寫的基本單位,而一個LogGroup中可能包含上萬行日誌,這就意味著Spark中直接限制每個批次的行數難以實現。因此,Direct限流涉及到兩個配置參數:

參數說明默認值spark.streaming.loghub.maxRatePerShard每個批次每個Shard讀取行數,決定了限流的下限10000spark.loghub.batchGet.step每次請求讀取LogGroup個數,決定了限流的粒度100

可以通過適當縮小spark.loghub.batchGet.step來控制限流的精度,但是即便如此,在某些情況下還是會存在較大誤差,如一個LogGroup中存在10000行日誌,spark.streaming.loghub.maxRatePerShard設置為100,spark.loghub.batchGet.step設置為1,那一個批次內該shard還是會拉取10000行日誌。


兩種模式的對比

和Receiver相比,Direct有如下的優勢:

  1. 降低資源消耗,不需要佔用Executor資源來作為Receiver的角色。
  2. 魯棒性更好,在計算的時候才會從服務端真正消費數據,降低內存使用,不再需要WAL,Failover 直接在讀一次就行了,更容易實現exactly once語義。
  3. 簡化並行。Spark partition 與 Logstore 的 shard 個數對應,增加shard個數就能提高Spark任務處理併發上限。

但是也存在一些缺點:

  1. 在SLS場景下,需要依賴本地的 ZooKeeper 來保存臨時 checkpoint,當調用 commitAsync 時從 ZooKeeper同步到日誌服務服務端。所以當需要重置 checkpoint 時,也需要先刪除本地 ZooKeeper 中的 checkpoint 才能生效。
  2. 上一個批次保存 checkpoint 之前,下一個批次無法真正開始,否則 ZooKeeper 中的 checkpoint 可能會被更新成一箇中間狀態。目前SDK在每個批次會檢查是否上一個批次的 checkpoint 還沒有提交,如果沒有提交則生成一個空批次,而不是繼續從服務端消費。
  3. 在 SLS 場景下,限流方式不夠精確。


Spark Streaming結果寫入SLS

與消費SLS相反,Spark Streaming的處理結果也可以直接寫入SLS。使用示例:

<code>...
val lines = loghubStream.map(x => x)

// 轉換函數把結果中每條記錄轉為一行日誌
def transformFunc(x: String): LogItem = {
val r = new LogItem()
r.PushBack("key", x)
r
}

val callback = new Callback with Serializable {
override def onCompletion(result: Result): Unit = {
println(s"Send result ${result.isSuccessful}")
}
}
// SLS producer config
val producerConfig = Map(
"sls.project" -> loghubProject,
"sls.logstore" -> targetLogstore,
"access.key.id" -> accessKeyId,
"access.key.secret" -> accessKeySecret,
"sls.endpoint" -> endpoint,
"sls.ioThreadCount" -> "2"
)
lines.writeToLoghub(
producerConfig,
"topic",
"streaming",
transformFunc, Option.apply(callback))

ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory
ssc.start()
ssc.awaitTermination()/<code>


對Structured Streaming的支持

Structured Streaming 並不是最近才出現的技術,而是早在16年就已經出現,但是直到 Spark 2.2.0 才正式推出。其數據模型是基於無界表的概念,流數據相當於往一個表上不斷追加行。

日誌服務(SLS)集成 Spark 流計算實戰

圖-3 無界表模型

與Spark Streaming相比,Structured Streaming主要有如下特點:

  1. 底層實現基於Spark SQL引擎,可以使用大多數Spark SQL的函數。和Spark SQL共用大部分API,如果對Spark SQL熟悉的用戶,非常容易上手。複用Spark SQL的執行引用,性能更佳。
  2. 支持 Process time 和 Event time,而Spark Streaming只支持 Process Time。
  3. 批流同一的API。Structured Streaming 複用Spark SQL的 DataSet/DataFrame模型,和 RDD/DStream相比更High level,易用性更好。
  4. 實時性更好,默認基於micro-batch模式。在 Spark 2.3 中,還增加了連續處理模型,號稱可以做到毫秒級延遲。
  5. API 對用戶更友好,只保留了SparkSession一個入口,不需要創建各種Context對象,使用起來更簡單。


SDK使用示例

<code>import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object StructuredStreamingDemo {
def main(args: Array[String]) {

val spark = SparkSession
.builder
.appName("StructuredLoghubWordCount")

.master("local")
.getOrCreate()

import spark.implicits._
val schema = new StructType(
Array(StructField("content", StringType)))
val lines = spark
.readStream
.format("loghub")
.schema(schema)
.option("sls.project", "your project")
.option("sls.store", "your logstore")
.option("access.key.id", "your access key id")
.option("access.key.secret", "your access key secret")
.option("endpoint", "your endpoint")
.option("startingoffsets", "latest")
.load()
.select("content")
.as[String]

val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

val query = wordCounts.writeStream
.outputMode("complete")
.format("loghub")
.option("sls.project", "sink project")
.option("sls.store", "sink logstore")
.option("access.key.id", "your access key id")
.option("access.key.secret", "your access key secret")
.option("endpoint", "your endpoint")
.option("checkpointLocation", "your checkpoint dir")
.start()

query.awaitTermination()
}
}/<code>

代碼解釋:1)schema 聲明瞭我們需要的字段,除了日誌中的字段外,還有如下的內部字段:

<code>__logProject__
__logStore__
__shard__
__time__
__topic__
__source__
__sequence_number__ // 每行日誌唯一id/<code>

如果沒有指定schema,SDK默認提供一個__value__字段,其內容為由所有字段組成的一個JSON字符串。

2)lines 定義了一個流。startingoffsets:開始位置,支持:

  • latest :日誌服務最新寫入位置。強烈建議從latest開始,從其他位置開始意味著需要先處理歷史數據,可能需要等待較長時間才能結束。
  • earliest:日誌服務中最早的日誌對應的位置。
  • 或者為每個shard指定一個開始時間,以JSON形式指定。

maxOffsetsPerTrigger:批次讀取行數,SDK中默認是64*1024 。

3)結果寫入到日誌服務format 指定為Loghub即可。


不足之處

  1. 不支持手動提交checkpoint,SDK內部自動保存checkpoint到checkpointLocation中。
  2. 不再需要提供consumerGroup名稱,也就是說checkpoint沒有保存到SLS服務端,無法在日誌服務裡面監控消費延遲,只能通過Spark 任務日誌觀察消費進度。

查看更多:https://yqh.aliyun.com/detail/6698?utm_content=g_1000107572

上雲就看雲棲號:更多雲資訊,上雲案例,最佳實踐,產品入門,訪問:https://yqh.aliyun.com/


分享到:


相關文章: