親手搭建個spark 環境試試


1. spark 介紹

由加州大學伯克利分校AMP實驗室 開發的大數據計算引擎,以RDD (彈性分佈式數據集)為最小數據單,基於內存的列式數據結構DataFrame,計算速度較快。spark 專注於大數據一棧式服務生態,包含sparkCore/sparkSql/sparkStreaming/sparkMllib 等。

2. 測試環境架構

Linux 7.0 + zk3.4 集群 + kafka 2.11 集群 + scala 2.12 + jdk 1.8 + spark 2.1.0 集群

3. 環境搭建步驟

特別說明:

· scala2.12 版本需要jdk 1.8 環境支持:

· spark2.0 及以上版本默認配置scala2.1l

· Springboot 1.5 基於的SpringFrameWork4.3 不支持jdk8 ,因此只能用kafka 0.11 版本,即Spring-kafka 最高為1.37版

· SpringBoot 2.0 基於的SpringFrameWorld5.0 支持jdk8, 因此可以使用kafka 2.0 及以上版本

首先修改host:

1) 安裝Scala(與jdk 安裝方式一樣),以192.168.52.40 服務器為例。

a. 上傳安裝包到服務器路徑下解壓,例如 /opt

b.設置環境變量 SCALA_HOME, vim /etc/profile

使配置生效 source /etc/profile

測試安裝結果:

分別在192.168.52.35 和192.168.52.41 服務器按照相同的方法安裝scala

2) 安裝spark (前提是需要先安裝hadoop、jdk、scala)

(Hadoop 集群之前已經發出過安裝說明)

根據架構配置,40 服務器為master,35 和41 服務器都當做worker計算節點使用,以40服務器安裝為例。

A.傳壓縮包到 服務器路徑下並且解壓:

B. 修改環境變量

source /etc/profile

C. 修改spark配置環境

cd /opt/spark/conf

//複製一份配置模板

cp spark-env.sh.template spark-env.sh

//做如下修改,vim spark-env.sh

SCALA_HOME

JAVA_HOME

HADOOP_CONF_DIR//hadoop 配置文件路徑

SPARK_MASTER_IP//master 地址

SPARK_WORKER_MEMORY//每個work節點所能夠分配到的內存

D.配置slave(worker)

cp slaves.template slaves

vim slaves //此處嚴格地講是不應該將master 也當做worker的,40 服務器已經裝太多東東了

E.在35 和41 服務器以相同的方式安裝spark

(或者 將scp /opt/spark 到35 和41 服務器,但是確保scala、jdk和hadoop的安裝路徑相同,否則需要單獨修改)

F. 啟動spark 集群

cd /opt/spark/sbin

./start-all.sh

寫個測試用例試試:

Spark 集群環境搭建完成。

3) 安裝ZK集群,以40服務器安裝為例

上傳安裝包並且解壓

修改配置文件

cd ./conf

cp zoo_sample.cfg zoo.cfg

vim zoo.cfg

在dataDir 文件路徑下創建myid文件

touch myid

vim myid

為了在集群環境下,各機器之間的識別標識,唯一

將40服務器的ZK 分發到其他服務器,分別修改myid 文件為2/3,然後分別啟動ZKServer

./zkServer.sh

// 查看節點狀態

./zkServer.sh status

可以看出41 的ZK是leader,另兩臺是follower

4) 安裝kafka集群

上傳安裝包並且進行解壓

修改配置文件

cd ./config

vim server.properties

配置完成後,分發到35 和41 服務器,並且分別修改broker.id為1 和2 ,並且將listeners 改為對應服務器地址即可。

以守護進程方式 啟動三臺服務器的kafka

./kafka-server-start.sh -daemon ../config/server.properties

測試:

測試成功,kafka 集群安裝完成。

至此環境已經搭建完成,接下來從開發層面demo展示

4. Spark 項目Demo開發

本地開發調試也需要安裝scala、jdk環境,方式同上

應用場景

A.讀取靜態數據

通過SparkSql 從靜態數據源(RDDs、csv、json、hive、jdbc 數據源等)讀取數據進行處理。

創建測試文件sparkSql.txt(json格式)

{"chuangwei":"H44ddddddddddddddddd01","pv":13}

{"chuangwei":"H4402","pv":11}

{"chuangwei":"H4401","pv":12}

{"chuangwei":"H4403","pv":10}

{"chuangwei":"H4405","pv":13}

{"chuangwei":"H4401","pv":140}

SparkSQL 分為SparkContext 和 HiveContext(繼承自SparkContext),可以通過SparkSession創建

B.流式數據處理

從kafka、flume等數據源讀取數據進行實時分析

· Kafka與SpringBoot整合

由於Springboot不同的版本支持不同的jdk,因此需要不同版本的kafka 支持

Springboot 1.5 最高只能使用1.37 版本的Spring-kafka

Springboot2.0 可以使用2.0 以上版本的Spring-kafka

在 SpringbootApplication.yml 主配置文件關於kafak 配置如下

引入依賴包

模擬生產者發出消息(Spring 容器啟動後,注入Bean成功後將每個1 s 發出消息)

模擬消費者接受消息

· 通過SparkStreaming 實時讀取流數據

由於 idea 編輯器對scala 函數式——面向對象編程有更加友好的支持,因此測試過程中spark 項目都是idea 編輯

關於idea 的使用不做贅述,針對spark 項目開發只需要安裝插件scala 即可

添加依賴:

 2.3.2 2.11  org.apache.spark spark-core_2.11 2.3.2   org.apache.spark spark-streaming_${scala.version} ${spark.version}   org.apache.spark spark-sql_${scala.version} ${spark.version}   org.apache.spark spark-hive_${scala.version} ${spark.version}   org.apache.spark spark-mllib_${scala.version} ${spark.version}   org.apache.spark spark-streaming-kafka-0-10_2.11 2.3.0 

測試案例

package com.fengmang.statimport org.apache.kafka.common.serialization.StringDeserializerimport org.apache.spark.SparkConfimport org.apache.spark.sql.SparkSessionimport org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}import org.apache.spark.streaming.{Seconds, StreamingContext}import org.joda.time.DateTimeobject StatKafkaStreaming { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("statKafkaStreaming").setMaster("local[2]"); val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate(); val sc = sparkSession.sparkContext; sc.setLogLevel("WARN") //設置日誌級別 val kafkaParams = Map( "bootstrap.servers" -> "192.168.52.40:9092,192.168.52.35:9092,192.168.52.41:9092", //brokers "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "spark_group" //kafka 消費者group.id ) val streamingContext = new StreamingContext(sc,Seconds(10)); 
	//實時流數據對象,每10s 從kafka 讀取一次數據 val topic = Array("stat") //主題 val inputDStream = KafkaUtils.createDirectStream[String, String]( 
	//讀取數據流 streamingContext, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topic, kafkaParams) ) val date = new DateTime()// inputDStream.foreachRDD(_.foreach(println(_))) //輸出數據 inputDStream.foreachRDD(_.foreach(reccord => println( date + ":" + reccord.value()))) //輸出數據,與上面函數表達式功能相同 streamingContext.start() //開啟實時數據流任務 streamingContext.awaitTermination() }}

同時開啟上面springboot 項目的消息生產者和StatKafkaStreaming 流數據讀取任務

5. 問題點

1) 異常問題一:調試Spark遇到的問題

spark 程序項目打包執行時,出現scala 版本與spark 不匹配所致

Exception in thread "main" java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: scala/runtime/java8/JFunction1$mcII$sp

查看當前運行版本spark 所支持的scala 版本,在spark 安裝目錄下查看jar 包中的scala 包版本,然後替換成相對應的版本

2) 異常問題二

component.AbstractLifeCycle: FAILED ServerConnector@4a9cc6cb{HTTP/1.1}{0.0.0.0:4040}: java.net.BindException: Address already in use

java.net.BindException: Address already in use

每啟動一個SparkContext 時sparkUI 就會默認使用4040 端口,當其被佔用(即已經開啟了SaprkCotext),新開啟的SparkContext 會嘗試連接4041端口

3) SparkContext 衝突的問題

一個JVM 中默認只運行一個SparkContext,SparkContext 可以通過new StreamingContext(sc,Seconds(10)) 獲取

但是通過new StreamingContext(sparkConf.Seconds(10)) 就會衝突

該報錯是因為創建了多個sparkContext, 一般是因為在創建StreamingContext的時候使用了SparkContext而非SparkConf,如下:

val sc = new SparkContext(new SparkConf()) val ssc = new StreamingContext(sc,Minutes(5))

Exception in thread "main" org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:

4) DataFrame 操作時內存溢出

dataFrame.take()

dataFrame.takeAsList(n) //獲取前n 行並且以list 的方式展示

take 和 takeAsList 會將獲取的數據返回到driver端,因此需要特別注意使用這兩個方法時,返回的數據量,避免OOM 內存溢出

5) 引入SparkSession 下的隱式函數失敗

val sparkSession= SparkSession.builder.appName("SparkSqlTest").master("local").getOrCreate()

//需要引入當前sparkSession 下的隱式函數,否則 $ 將被當做不可識別符號

import sparkSession.implicits._

dataFrame.select($"weight" + 10)

6) 遠程服務無法訪問producer和consumer,異常報連接超時和

Marking the coordinator dead for group(Kafka)

問題在於服務器沒有配置kafka 服務器需要監聽的端口號,如果服務是在本地運行可不用配置,會使用localhost默認地址。

如是遠程訪問得必須配置 listeners=PLAINTEXT:// 192.168.1.1:9092

7) Kafka 與springboot 版本兼容問題(上文已經提到)


分享到:


相關文章: