01.07 flume部署安裝以及案例運行

基本認識:

大數據階段數據的收集來源, flume的收集數據一般是日誌,比如:網站日誌

flume是一個分佈式的,可靠的,可用的

flume可以做離線也可以做實時分析

collecting --》source --》數據採集來源

aggregating --》channel --》數據臨時緩存(只要數據被move了,那就不在存儲了)

moving --》sink --》數據的轉移

1、agent :source、channel、sink

(1)source:用於採集數據,將產生的數據流傳輸到Channel

(2)channel:連接 sources 和 sinks ,臨時緩存數據

(3)sink:從Channel收集數據,將數據寫到目標源

2、Events:

(1)是Flume數據傳輸的基本單元

(2)由header和載有數據的一個byte array構成,byte array字節數組:存儲真實的數據

(3)每一個事件的大小:deserializer.maxLineLength2048字節,編碼格式:UTF-8

一個source,可以綁定多個channel

一個sink,只能綁定一個channel


flume安裝:

準備安裝包

apache-flume-1.7.0-bin.tar.gz

flume部署安裝以及案例運行

解壓縮

tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /opt/bigdata/

flume部署安裝以及案例運行

配置文件:flume-env.sh

flume部署安裝以及案例運行

mv flume-env.sh.template flume-env.sh

flume部署安裝以及案例運行

配置jdk

export JAVA_HOME=/opt/bigdata/jdk1.8

flume部署安裝以及案例運行

測試是否成功

bin/flume-ng version

flume部署安裝以及案例運行

flume的flume-ng命令

Usage: bin/flume-ng [options]...

flume部署安裝以及案例運行

例如一些提交任務的命令(熟悉下格式):

bin/flume-ng agent --conf conf --name agent --conf-file conf/test.properties

bin/flume-ng agent -c conf -n agent -f conf/test.properties

bin/flume-ng avro-client --conf conf --host host --port 8080


配置情況選擇:

1、flume安裝在hadoop集群中:

(1)配置JAVA_HOME:

export JAVA_HOME= /opt/bigdata/jdk1.8

2、flume安裝在hadoop集群中,而且還配置了HA:

(1)HDFS訪問入口變化

(2)配置JAVA_HOME:

export JAVA_HOME= /opt/bigdata/jdk1.8

(3)還需要添加hadoop的core-site.xml和hdfs-site.xml拷貝到flume的conf目錄

3、flume不在hadoop集群裡:

(1)配置JAVA_HOME:

export JAVA_HOME= /opt/bigdata/jdk1.8

(2)還需要添加hadoop的core-site.xml和hdfs-site.xml拷貝到flume的conf目錄

(3)將hadoop的一些jar包添加到flume的lib目錄下(用的是什麼版本拷貝什麼版本)


運行官網案例:

編輯配置文件flume-test.properties(創建一個)

flume部署安裝以及案例運行

準備配置信息

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# Describe/configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = masterhbase

a1.sources.r1.port = 44444


# Describe the sink

a1.sinks.k1.type = logger


# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100


# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

flume部署安裝以及案例運行

查看系統有沒有安裝telnet:rpm -qa | grep telnet

flume部署安裝以及案例運行

沒有的就安裝: yum -y install telnet或yum -y install nc

flume部署安裝以及案例運行

文件log4j.properties顯示的是日誌信息配置

flume部署安裝以及案例運行

運行:

bin/flume-ng agent --conf conf --conf-file conf/flume-test.properties --name a1 -Dflume.root.logger=INFO,console

flume部署安裝以及案例運行

開啟一個窗口:telnet連接端口號

telnet masterhbase 44444 (卡在那是正常的,你可以隨意輸入信息)

flume部署安裝以及案例運行

輸入hello world

flume部署安裝以及案例運行

在flume中就可以看到數據

flume部署安裝以及案例運行

退出telnet:輸入ctrl + ] 然後輸入quit

flume部署安裝以及案例運行

運行實例一

需求:監控apache服務器的日誌,利用flume監控某一個文件

安裝httpd服務

yum -y install httpd

flume部署安裝以及案例運行

安裝完成之後,會有個目錄生成 /var/www/html

到/var/www/html這個目錄下 vim index.html [隨意輸入內容]

flume部署安裝以及案例運行

啟動服務: service httpd start

flume部署安裝以及案例運行

瀏覽網頁:輸入主機名[hostname]

flume部署安裝以及案例運行

日誌產生的路徑:/var/log/httpd/access_log

flume部署安裝以及案例運行

配置flume agent

source: exec

channel:memory

sink:hdfs

我們複製配置文件

flume部署安裝以及案例運行

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# Describe/configure the source

a1.sources.r1.type = exec

a1.sources.r1.command = tail -f /var/log/httpd/access_log


# Describe the sink

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = hdfs://masterhbase:8082/flume/webdata/roll/%y%m%d/%H

a1.sinks.k1.hdfs.rollInterval = 600

a1.sinks.k1.hdfs.rollSize = 1048576

a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k1.hdfs.minBlockReplicas = 1

a1.sinks.k1.hdfs.useLocalTimeStamp = true

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundValue= 1

a1.sinks.k1.hdfs.roundUnit = hour


# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100


# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

flume部署安裝以及案例運行

一些配置說明


問題:hdfs上的文件一般數據文件大小要大,而且文件數量是要少


hdfs.rollInterval = 600 (這個地方最好還是設置一個時間)

hdfs.rollSize = 1048576 (1M,134217728-》128M)

hdfs.rollCount = 0

hdfs.minBlockReplicas = 1 (這個不設置的話,上面的參數有可能不會生效)


在hdfs文件上設置時間格式分層 年月日/時 每小時生成一個文件

hdfs.useLocalTimeStamp = true

hdfs.round = true

hdfs.roundValue= 1

hdfs.roundUnit = hour

將準備好的jar上傳到flume/lib中

flume部署安裝以及案例運行

flume部署安裝以及案例運行

運行

flume部署安裝以及案例運行

查看hdfs上,不斷刷新會有新的文件

flume部署安裝以及案例運行

查看下進程

flume部署安裝以及案例運行

運行實例二

利用flume監控某一個文件目錄,將目錄下滾動好的文件實時抽取到HDFS上

類型選擇

source:spooldir

channel:file

sink:hdfs

創建配置文件flume-spooldir.properties

flume部署安裝以及案例運行

編寫信息

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# Describe/configure the source

a1.sources.r1.type = spooldir

a1.sources.r1.spoolDir = /data/logs

a1.sources.r1.recursiveDirectorySearch = true


# Describe the sink

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = hdfs://masterhbase:8082/flume/webdata/spooldir/%y%m%d/%H

a1.sinks.k1.hdfs.rollInterval = 600

a1.sinks.k1.hdfs.rollSize = 1048576

a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k1.hdfs.minBlockReplicas = 1

a1.sinks.k1.hdfs.useLocalTimeStamp = true

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundValue= 1

a1.sinks.k1.hdfs.roundUnit = hour

a1.sinks.k1.hdfs.fileType = DataStream

a1.sinks.k1.hdfs.writeFormat = Text


# Describe the channel

a1.channels.c1.type = file

a1.channels.c1.checkpointDir = /opt/bigdata/apache-flume-1.7.0-bin/checkpointDir

a1.channels.c1.dataDirs = /opt/bigdata/apache-flume-1.7.0-bin/dataDirs


# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

flume部署安裝以及案例運行

配置信息概念補充

1.source:spooldir(已經生成好的最終的數據文件)

(1)recursiveDirectorySearch 是否監視子目錄以查找要讀取的新文件

(2)includePattern 正則表達式,指定要包含的文件 (只.csv數據文件,是正則匹配)

(3)ignorePattern 正則表達式,指定要忽略的文件 (不抽取.csv數據文件,是正則匹配)

(4)缺點:不能對目錄文件進行修改,如果有追加內容的文本文件,是不允許的(有可能不會被抽取,有可能會有錯誤)


2.flume監控目錄,支持文件修改,並記錄文件狀態

(1)source:taildir (類似exec + spooldir的組合)

(2)filegroups :設置source組 可設置多個 filegroups = f1

(3)filegroups.:設置組員的監控目錄和監控文件類型,使用正則表示,只能監控文件

(4)positionFile:設置定位文件的位置,以JSON格式寫入給定位置文件上每個文件的最後讀取位置

3.Memory Channel是一個不穩定的channel,它在內存中存儲所有事件,

如果進程異常停止,內存中的數據將不能讓恢復,而且受內存大小的限制。

4.flie channel:是一個持久化的channel,數據安全並且只要磁盤空間足夠,它就可以將數據存儲到磁盤上

5.checkpointDir:檢查數據完整性,存放檢查點目錄,可以檢測出哪些數據已被抽取,哪些還沒有

6.dataDirs:存放數據的目錄,dataDirs可以是多個目錄,以逗號隔開,用獨立的多個磁盤上的多個目錄可以提高file channel的性能。

7.hdfs上數據默認是二進制的文件類型:bin/hdfs dfs -text /

8.可以修改hdfs.fileType 改為DataStream(數據流)hdfs.writeFormat = Text 改為文本格式

9.當使用DataStream時候,文件不會被壓縮,不需要設置hdfs.codeC;當使用CompressedStream時候,必須設置一個正確的hdfs.codeC值;hdfs.codeC壓縮編碼解碼器 --》snappy壓縮

10.batchSize默認值:100 每個批次刷新到HDFS上的events數量;


創建目錄

mkdir –p /data/logs

flume部署安裝以及案例運行

模擬數據

cp -r /opt/bigdata/hadoop-2.7.3/logs/* /data/logs/

flume部署安裝以及案例運行

查看數據

flume部署安裝以及案例運行

運行

bin/flume-ng agent --conf conf --name a1 --conf-file conf/flume-spooldir.properties

flume部署安裝以及案例運行

查看下HDFS

flume部署安裝以及案例運行

運行實例三

將hive的一些jar拷貝過來 flume的lib目錄下

flume部署安裝以及案例運行

flume部署安裝以及案例運行

配置flume agent

source:netcat

channel:Memory

sink:hive

啟動hive的元數據服務:

/opt/bigdata/apache-hive-1.2.1-bin/bin/hive --service metastore &

flume部署安裝以及案例運行

創建庫和表 (表必須是CLUSTERED BY ,INTO BUCKETS)

create database flume_test;

use flume_test;

create table flume_user(

user_id int,

user_name string,

user_age int

)CLUSTERED BY (user_id) INTO 2 BUCKETS

row format delimited fields terminated by '\\t'

stored as orc;

flume部署安裝以及案例運行

準備配置文件flume-sink-hive.properties

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# Describe/configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = masterhbase

a1.sources.r1.port = 44444


# Describe the sink

a1.sinks.k1.type = hive

a1.sinks.k1.hive.metastore = thrift://masterhbase:9083

a1.sinks.k1.hive.database = flume_test

a1.sinks.k1.hive.table = flume_user

a1.sinks.k1.serializer = DELIMITED

a1.sinks.k1.serializer.delimiter = "\\t"

a1.sinks.k1.serializer.fieldnames = user_id,user_name,user_age

a1.sinks.k1.serializer.serdeSeparator = '\\t'


# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100


# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

flume部署安裝以及案例運行

配置概念補充

1.serializer: 負責解析事件中的字段並將它們映射到hive表中的列

(2)DELIMITED 普通文本

(2)json json文件 (不需要配置,JSON中的對象名稱直接映射到Hive表中具有相同名稱的列, 內部使用

org.apache.hive.hcatalog.data.JsonSerDe)


2.DELIMITED:

serializer.delimiter:傳入數據中的字段分隔符,用雙引號括起來,例如"\\t"

serializer.fieldnames:從輸入字段到hive表中的列的映射,指定為hive表列名稱的逗號分隔列表

serializer.serdeSeparator :輸出字段分隔符,單引號括起來,例如'\\t'

hive參數設置vim hive-site.xml:


<code><property>/<code>
<code> <name>hive.metastore.uris/<name>/<code>
<code> <value>thrift://masterhbase:9083/<value>/<code>
<code>
<code><property>/<code>
<code> <name>hive.txn.manager/<name>/<code>
<code> <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager/<value>/<code>
<code>
<code><property>/<code>
<code> <name>hive.compactor.initiator.on/<name>/<code>
<code> <value>true/<value>/<code>
<code>
<code><property>/<code>
<code> <name>hive.compactor.worker.threads/<name>/<code>
<code> <value>1/<value>/<code>
<code>
<code><property>/<code>
<code> <name>hive.support.concurrency/<name>/<code>
<code> <value>true/<value>/<code>
<code>
<code><property>/<code>
<code> <name>hive.enforce.bucketing/<name>/<code>
<code> <value>true/<value>/<code>
<code>
<code><property>/<code>
<code> <name> hive.exec.dynamic.partition.mode/<name>/<code>
<code> <value>nonstrict/<value>/<code>
<code>
<code><property>/<code>
<code> <name>hive.in.test/<name>/<code>
<code> <value>true/<value>/<code>
<code>


flume部署安裝以及案例運行

解決報錯問題

(1)報錯:

Caused by: org.apache.thrift.TApplicationException: Internal error processing open_txns

-》hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;

打開一部分事務支持

-》協同配置

hive.compactor.initiator.on=true; -》運行啟動程序和清除線程,用於打開所需參數的完整列表事務

hive.compactor.worker.threads=1; -》增加工作線程的數量將減少花費的時間

hive.support.concurrency=true; -》是否支持併發,默認是false

hive.enforce.bucketing=true; -》是否啟用bucketing,寫入table數據時會啟動分桶

hive.exec.dynamic.partition.mode=nonstrict; -》設置非嚴格模式

(2)啟動metastore時報錯:

Table 'metastore.COMPACTION_QUEUE' doesn't exist

配置以下屬性:這個是用來創建COMPACTION_QUEUE這張表的


hive.in.test

true


(3)再啟動metastore時報錯:

Error rolling back: Can't call rollback when autocommit=true

去掉以下屬性:


hive.in.test

true

之前沒有安裝,先安裝

flume部署安裝以及案例運行

啟動flume agent

bin/flume-ng agent --conf conf --name a1 --conf-file conf/flume-sink-hive.properties

flume部署安裝以及案例運行

使用nc去連接,然後輸入數據,數據以製表符分割

flume部署安裝以及案例運行

Hive中可以看到數據

flume部署安裝以及案例運行

運行實例四(hive)

創建表

create table emp(

empno int,

ename string,

job string,

mgr int,

hiredate string,

sal double,

comm double,

deptno int

)CLUSTERED BY (empno) INTO 2 BUCKETS

row format delimited fields terminated by '\\t'

stored as orc;

flume部署安裝以及案例運行

準備配置信息flume-sink-hive2.properties

flume部署安裝以及案例運行

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# Describe/configure the source

a1.sources.r1.type = exec

a1.sources.r1.command = cat /data/emp.txt


# Describe the sink

a1.sinks.k1.type = hive

a1.sinks.k1.hive.metastore = thrift://masterhbase:9083

a1.sinks.k1.hive.database = flume_test

a1.sinks.k1.hive.table = emp

a1.sinks.k1.serializer = DELIMITED

a1.sinks.k1.serializer.delimiter = "\\t"

a1.sinks.k1.serializer.fieldnames = empno,ename,job,mgr,hiredate,sal,comm,deptno

a1.sinks.k1.serializer.serdeSeparator = '\\t'


# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100


# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

flume部署安裝以及案例運行

運行flume

bin/flume-ng agent --conf conf --name a1 --conf-file conf/flume-sink-hive2.properties

flume部署安裝以及案例運行

查看數據

flume部署安裝以及案例運行



分享到:


相關文章: