1. spark 介紹
由加州大學伯克利分校AMP實驗室 開發的大數據計算引擎,以RDD (彈性分佈式數據集)為最小數據單,基於內存的列式數據結構DataFrame,計算速度較快。spark 專注於大數據一棧式服務生態,包含sparkCore/sparkSql/sparkStreaming/sparkMllib 等。
2. 測試環境架構
Linux 7.0 + zk3.4 集群 + kafka 2.11 集群 + scala 2.12 + jdk 1.8 + spark 2.1.0 集群
3. 環境搭建步驟
特別說明:
· scala2.12 版本需要jdk 1.8 環境支持:
· spark2.0 及以上版本默認配置scala2.1l
· Springboot 1.5 基於的SpringFrameWork4.3 不支持jdk8 ,因此只能用kafka 0.11 版本,即Spring-kafka 最高為1.37版
· SpringBoot 2.0 基於的SpringFrameWorld5.0 支持jdk8, 因此可以使用kafka 2.0 及以上版本
首先修改host:
1) 安裝Scala(與jdk 安裝方式一樣),以192.168.52.40 服務器為例。
a. 上傳安裝包到服務器路徑下解壓,例如 /opt
b.設置環境變量 SCALA_HOME, vim /etc/profile
使配置生效 source /etc/profile
測試安裝結果:
分別在192.168.52.35 和192.168.52.41 服務器按照相同的方法安裝scala
2) 安裝spark (前提是需要先安裝hadoop、jdk、scala)
(Hadoop 集群之前已經發出過安裝說明)
根據架構配置,40 服務器為master,35 和41 服務器都當做worker計算節點使用,以40服務器安裝為例。
A.傳壓縮包到 服務器路徑下並且解壓:
B. 修改環境變量
source /etc/profile
C. 修改spark配置環境
cd /opt/spark/conf
//複製一份配置模板
cp spark-env.sh.template spark-env.sh
//做如下修改,vim spark-env.sh
SCALA_HOME
JAVA_HOME
HADOOP_CONF_DIR//hadoop 配置文件路徑
SPARK_MASTER_IP//master 地址
SPARK_WORKER_MEMORY//每個work節點所能夠分配到的內存
D.配置slave(worker)
cp slaves.template slaves
vim slaves //此處嚴格地講是不應該將master 也當做worker的,40 服務器已經裝太多東東了
E.在35 和41 服務器以相同的方式安裝spark
(或者 將scp /opt/spark 到35 和41 服務器,但是確保scala、jdk和hadoop的安裝路徑相同,否則需要單獨修改)
F. 啟動spark 集群
cd /opt/spark/sbin
./start-all.sh
寫個測試用例試試:
Spark 集群環境搭建完成。
3) 安裝ZK集群,以40服務器安裝為例
上傳安裝包並且解壓
修改配置文件
cd ./conf
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg
在dataDir 文件路徑下創建myid文件
touch myid
vim myid
為了在集群環境下,各機器之間的識別標識,唯一
將40服務器的ZK 分發到其他服務器,分別修改myid 文件為2/3,然後分別啟動ZKServer
./zkServer.sh
// 查看節點狀態
./zkServer.sh status
可以看出41 的ZK是leader,另兩臺是follower
4) 安裝kafka集群
上傳安裝包並且進行解壓
修改配置文件
cd ./config
vim server.properties
配置完成後,分發到35 和41 服務器,並且分別修改broker.id為1 和2 ,並且將listeners 改為對應服務器地址即可。
以守護進程方式 啟動三臺服務器的kafka
./kafka-server-start.sh -daemon ../config/server.properties
測試:
測試成功,kafka 集群安裝完成。
至此環境已經搭建完成,接下來從開發層面demo展示
4. Spark 項目Demo開發
本地開發調試也需要安裝scala、jdk環境,方式同上
應用場景
A.讀取靜態數據
通過SparkSql 從靜態數據源(RDDs、csv、json、hive、jdbc 數據源等)讀取數據進行處理。
創建測試文件sparkSql.txt(json格式)
{"chuangwei":"H44ddddddddddddddddd01","pv":13}
{"chuangwei":"H4402","pv":11}
{"chuangwei":"H4401","pv":12}
{"chuangwei":"H4403","pv":10}
{"chuangwei":"H4405","pv":13}
{"chuangwei":"H4401","pv":140}
SparkSQL 分為SparkContext 和 HiveContext(繼承自SparkContext),可以通過SparkSession創建
B.流式數據處理
從kafka、flume等數據源讀取數據進行實時分析
· Kafka與SpringBoot整合
由於Springboot不同的版本支持不同的jdk,因此需要不同版本的kafka 支持
Springboot 1.5 最高只能使用1.37 版本的Spring-kafka
Springboot2.0 可以使用2.0 以上版本的Spring-kafka
在 SpringbootApplication.yml 主配置文件關於kafak 配置如下
引入依賴包
模擬生產者發出消息(Spring 容器啟動後,注入Bean成功後將每個1 s 發出消息)
模擬消費者接受消息
· 通過SparkStreaming 實時讀取流數據
由於 idea 編輯器對scala 函數式——面向對象編程有更加友好的支持,因此測試過程中spark 項目都是idea 編輯
關於idea 的使用不做贅述,針對spark 項目開發只需要安裝插件scala 即可
添加依賴:
2.3.2 2.11 org.apache.spark spark-core_2.11 2.3.2 org.apache.spark spark-streaming_${scala.version} ${spark.version} org.apache.spark spark-sql_${scala.version} ${spark.version} org.apache.spark spark-hive_${scala.version} ${spark.version} org.apache.spark spark-mllib_${scala.version} ${spark.version} org.apache.spark spark-streaming-kafka-0-10_2.11 2.3.0
測試案例
package com.fengmang.statimport org.apache.kafka.common.serialization.StringDeserializerimport org.apache.spark.SparkConfimport org.apache.spark.sql.SparkSessionimport org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}import org.apache.spark.streaming.{Seconds, StreamingContext}import org.joda.time.DateTimeobject StatKafkaStreaming { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("statKafkaStreaming").setMaster("local[2]"); val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate(); val sc = sparkSession.sparkContext; sc.setLogLevel("WARN") //設置日誌級別 val kafkaParams = Map( "bootstrap.servers" -> "192.168.52.40:9092,192.168.52.35:9092,192.168.52.41:9092", //brokers "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "spark_group" //kafka 消費者group.id ) val streamingContext = new StreamingContext(sc,Seconds(10)); //實時流數據對象,每10s 從kafka 讀取一次數據 val topic = Array("stat") //主題 val inputDStream = KafkaUtils.createDirectStream[String, String]( //讀取數據流 streamingContext, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topic, kafkaParams) ) val date = new DateTime()// inputDStream.foreachRDD(_.foreach(println(_))) //輸出數據 inputDStream.foreachRDD(_.foreach(reccord => println( date + ":" + reccord.value()))) //輸出數據,與上面函數表達式功能相同 streamingContext.start() //開啟實時數據流任務 streamingContext.awaitTermination() }}
同時開啟上面springboot 項目的消息生產者和StatKafkaStreaming 流數據讀取任務
5. 問題點
1) 異常問題一:調試Spark遇到的問題
spark 程序項目打包執行時,出現scala 版本與spark 不匹配所致
Exception in thread "main" java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: scala/runtime/java8/JFunction1$mcII$sp
查看當前運行版本spark 所支持的scala 版本,在spark 安裝目錄下查看jar 包中的scala 包版本,然後替換成相對應的版本
2) 異常問題二
component.AbstractLifeCycle: FAILED ServerConnector@4a9cc6cb{HTTP/1.1}{0.0.0.0:4040}: java.net.BindException: Address already in use
java.net.BindException: Address already in use
每啟動一個SparkContext 時sparkUI 就會默認使用4040 端口,當其被佔用(即已經開啟了SaprkCotext),新開啟的SparkContext 會嘗試連接4041端口
3) SparkContext 衝突的問題
一個JVM 中默認只運行一個SparkContext,SparkContext 可以通過new StreamingContext(sc,Seconds(10)) 獲取
但是通過new StreamingContext(sparkConf.Seconds(10)) 就會衝突
該報錯是因為創建了多個sparkContext, 一般是因為在創建StreamingContext的時候使用了SparkContext而非SparkConf,如下:
val sc = new SparkContext(new SparkConf()) val ssc = new StreamingContext(sc,Minutes(5))
Exception in thread "main" org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:
4) DataFrame 操作時內存溢出
dataFrame.take()
dataFrame.takeAsList(n) //獲取前n 行並且以list 的方式展示
take 和 takeAsList 會將獲取的數據返回到driver端,因此需要特別注意使用這兩個方法時,返回的數據量,避免OOM 內存溢出
5) 引入SparkSession 下的隱式函數失敗
val sparkSession= SparkSession.builder.appName("SparkSqlTest").master("local").getOrCreate()
//需要引入當前sparkSession 下的隱式函數,否則 $ 將被當做不可識別符號
import sparkSession.implicits._
dataFrame.select($"weight" + 10)
6) 遠程服務無法訪問producer和consumer,異常報連接超時和
Marking the coordinator dead for group(Kafka)
問題在於服務器沒有配置kafka 服務器需要監聽的端口號,如果服務是在本地運行可不用配置,會使用localhost默認地址。
如是遠程訪問得必須配置 listeners=PLAINTEXT:// 192.168.1.1:9092
7) Kafka 與springboot 版本兼容問題(上文已經提到)
。