01.07 四個flume運行實例

基本認識:

大數據階段數據的收集來源, flume的收集數據一般是日誌,比如:網站日誌

flume是一個分佈式的,可靠的,可用的

flume可以做離線也可以做實時分析

collecting --》source --》數據採集來源

aggregating --》channel --》數據臨時緩存(只要數據被move了,那就不在存儲了)

moving --》sink --》數據的轉移

1、agent :source、channel、sink

(1)source:用於採集數據,將產生的數據流傳輸到Channel

(2)channel:連接 sources 和 sinks ,臨時緩存數據

(3)sink:從Channel收集數據,將數據寫到目標源

2、Events:

(1)是Flume數據傳輸的基本單元

(2)由header和載有數據的一個byte array構成,byte array字節數組:存儲真實的數據

(3)每一個事件的大小:deserializer.maxLineLength2048字節,編碼格式:UTF-8

一個source,可以綁定多個channel

一個sink,只能綁定一個channel


flume安裝:

準備安裝包

apache-flume-1.7.0-bin.tar.gz

四個flume運行實例

解壓縮

tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /opt/bigdata/

四個flume運行實例

配置文件:flume-env.sh

四個flume運行實例

mv flume-env.sh.template flume-env.sh

四個flume運行實例

配置jdk

export JAVA_HOME=/opt/bigdata/jdk1.8

四個flume運行實例

測試是否成功

bin/flume-ng version

四個flume運行實例

flume的flume-ng命令

Usage: bin/flume-ng [options]...

四個flume運行實例

例如一些提交任務的命令(熟悉下格式):

bin/flume-ng agent --conf conf --name agent --conf-file conf/test.properties

bin/flume-ng agent -c conf -n agent -f conf/test.properties

bin/flume-ng avro-client --conf conf --host host --port 8080


配置情況選擇:

1、flume安裝在hadoop集群中:

(1)配置JAVA_HOME:

export JAVA_HOME= /opt/bigdata/jdk1.8

2、flume安裝在hadoop集群中,而且還配置了HA:

(1)HDFS訪問入口變化

(2)配置JAVA_HOME:

export JAVA_HOME= /opt/bigdata/jdk1.8

(3)還需要添加hadoop的core-site.xml和hdfs-site.xml拷貝到flume的conf目錄

3、flume不在hadoop集群裡:

(1)配置JAVA_HOME:

export JAVA_HOME= /opt/bigdata/jdk1.8

(2)還需要添加hadoop的core-site.xml和hdfs-site.xml拷貝到flume的conf目錄

(3)將hadoop的一些jar包添加到flume的lib目錄下(用的是什麼版本拷貝什麼版本)


運行官網案例:

編輯配置文件flume-test.properties(創建一個)

四個flume運行實例

準備配置信息

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# Describe/configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = masterhbase

a1.sources.r1.port = 44444


# Describe the sink

a1.sinks.k1.type = logger


# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100


# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

四個flume運行實例

查看系統有沒有安裝telnet:rpm -qa | grep telnet

四個flume運行實例

沒有的就安裝: yum -y install telnet或yum -y install nc

四個flume運行實例

文件log4j.properties顯示的是日誌信息配置

四個flume運行實例

運行:

bin/flume-ng agent --conf conf --conf-file conf/flume-test.properties --name a1 -Dflume.root.logger=INFO,console

四個flume運行實例

開啟一個窗口:telnet連接端口號

telnet masterhbase 44444 (卡在那是正常的,你可以隨意輸入信息)

四個flume運行實例

輸入hello world

四個flume運行實例

在flume中就可以看到數據

四個flume運行實例

退出telnet:輸入ctrl + ] 然後輸入quit

四個flume運行實例

運行實例一

需求:監控apache服務器的日誌,利用flume監控某一個文件

安裝httpd服務

yum -y install httpd

四個flume運行實例

安裝完成之後,會有個目錄生成 /var/www/html

到/var/www/html這個目錄下 vim index.html [隨意輸入內容]

四個flume運行實例

啟動服務: service httpd start

四個flume運行實例

瀏覽網頁:輸入主機名[hostname]

四個flume運行實例

日誌產生的路徑:/var/log/httpd/access_log

四個flume運行實例

配置flume agent

source: exec

channel:memory

sink:hdfs

我們複製配置文件

四個flume運行實例

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# Describe/configure the source

a1.sources.r1.type = exec

a1.sources.r1.command = tail -f /var/log/httpd/access_log


# Describe the sink

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = hdfs://masterhbase:8082/flume/webdata/roll/%y%m%d/%H

a1.sinks.k1.hdfs.rollInterval = 600

a1.sinks.k1.hdfs.rollSize = 1048576

a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k1.hdfs.minBlockReplicas = 1

a1.sinks.k1.hdfs.useLocalTimeStamp = true

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundValue= 1

a1.sinks.k1.hdfs.roundUnit = hour


# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100


# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

四個flume運行實例

一些配置說明


問題:hdfs上的文件一般數據文件大小要大,而且文件數量是要少


hdfs.rollInterval = 600 (這個地方最好還是設置一個時間)

hdfs.rollSize = 1048576 (1M,134217728-》128M)

hdfs.rollCount = 0

hdfs.minBlockReplicas = 1 (這個不設置的話,上面的參數有可能不會生效)


在hdfs文件上設置時間格式分層 年月日/時 每小時生成一個文件

hdfs.useLocalTimeStamp = true

hdfs.round = true

hdfs.roundValue= 1

hdfs.roundUnit = hour

將準備好的jar上傳到flume/lib中

四個flume運行實例

四個flume運行實例

運行

四個flume運行實例

查看hdfs上,不斷刷新會有新的文件

四個flume運行實例

查看下進程

四個flume運行實例

運行實例二

利用flume監控某一個文件目錄,將目錄下滾動好的文件實時抽取到HDFS上

類型選擇

source:spooldir

channel:file

sink:hdfs

創建配置文件flume-spooldir.properties

四個flume運行實例

編寫信息

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# Describe/configure the source

a1.sources.r1.type = spooldir

a1.sources.r1.spoolDir = /data/logs

a1.sources.r1.recursiveDirectorySearch = true


# Describe the sink

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = hdfs://masterhbase:8082/flume/webdata/spooldir/%y%m%d/%H

a1.sinks.k1.hdfs.rollInterval = 600

a1.sinks.k1.hdfs.rollSize = 1048576

a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k1.hdfs.minBlockReplicas = 1

a1.sinks.k1.hdfs.useLocalTimeStamp = true

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundValue= 1

a1.sinks.k1.hdfs.roundUnit = hour

a1.sinks.k1.hdfs.fileType = DataStream

a1.sinks.k1.hdfs.writeFormat = Text


# Describe the channel

a1.channels.c1.type = file

a1.channels.c1.checkpointDir = /opt/bigdata/apache-flume-1.7.0-bin/checkpointDir

a1.channels.c1.dataDirs = /opt/bigdata/apache-flume-1.7.0-bin/dataDirs


# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

四個flume運行實例

配置信息概念補充

1.source:spooldir(已經生成好的最終的數據文件)

(1)recursiveDirectorySearch 是否監視子目錄以查找要讀取的新文件

(2)includePattern 正則表達式,指定要包含的文件 (只.csv數據文件,是正則匹配)

(3)ignorePattern 正則表達式,指定要忽略的文件 (不抽取.csv數據文件,是正則匹配)

(4)缺點:不能對目錄文件進行修改,如果有追加內容的文本文件,是不允許的(有可能不會被抽取,有可能會有錯誤)


2.flume監控目錄,支持文件修改,並記錄文件狀態

(1)source:taildir (類似exec + spooldir的組合)

(2)filegroups :設置source組 可設置多個 filegroups = f1

(3)filegroups.:設置組員的監控目錄和監控文件類型,使用正則表示,只能監控文件

(4)positionFile:設置定位文件的位置,以JSON格式寫入給定位置文件上每個文件的最後讀取位置

3.Memory Channel是一個不穩定的channel,它在內存中存儲所有事件,

如果進程異常停止,內存中的數據將不能讓恢復,而且受內存大小的限制。

4.flie channel:是一個持久化的channel,數據安全並且只要磁盤空間足夠,它就可以將數據存儲到磁盤上

5.checkpointDir:檢查數據完整性,存放檢查點目錄,可以檢測出哪些數據已被抽取,哪些還沒有

6.dataDirs:存放數據的目錄,dataDirs可以是多個目錄,以逗號隔開,用獨立的多個磁盤上的多個目錄可以提高file channel的性能。

7.hdfs上數據默認是二進制的文件類型:bin/hdfs dfs -text /

8.可以修改hdfs.fileType 改為DataStream(數據流)hdfs.writeFormat = Text 改為文本格式

9.當使用DataStream時候,文件不會被壓縮,不需要設置hdfs.codeC;當使用CompressedStream時候,必須設置一個正確的hdfs.codeC值;hdfs.codeC壓縮編碼解碼器 --》snappy壓縮

10.batchSize默認值:100 每個批次刷新到HDFS上的events數量;


創建目錄

mkdir –p /data/logs

四個flume運行實例

模擬數據

cp -r /opt/bigdata/hadoop-2.7.3/logs/* /data/logs/

四個flume運行實例

查看數據

四個flume運行實例

運行

bin/flume-ng agent --conf conf --name a1 --conf-file conf/flume-spooldir.properties

四個flume運行實例

查看下HDFS

四個flume運行實例

運行實例三

將hive的一些jar拷貝過來 flume的lib目錄下

四個flume運行實例

四個flume運行實例

配置flume agent

source:netcat

channel:Memory

sink:hive

啟動hive的元數據服務:

/opt/bigdata/apache-hive-1.2.1-bin/bin/hive --service metastore &

四個flume運行實例

創建庫和表 (表必須是CLUSTERED BY ,INTO BUCKETS)

create database flume_test;

use flume_test;

create table flume_user(

user_id int,

user_name string,

user_age int

)CLUSTERED BY (user_id) INTO 2 BUCKETS

row format delimited fields terminated by '\\t'

stored as orc;

四個flume運行實例

準備配置文件flume-sink-hive.properties

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# Describe/configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = masterhbase

a1.sources.r1.port = 44444


# Describe the sink

a1.sinks.k1.type = hive

a1.sinks.k1.hive.metastore = thrift://masterhbase:9083

a1.sinks.k1.hive.database = flume_test

a1.sinks.k1.hive.table = flume_user

a1.sinks.k1.serializer = DELIMITED

a1.sinks.k1.serializer.delimiter = "\\t"

a1.sinks.k1.serializer.fieldnames = user_id,user_name,user_age

a1.sinks.k1.serializer.serdeSeparator = '\\t'


# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100


# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

四個flume運行實例

配置概念補充

1.serializer: 負責解析事件中的字段並將它們映射到hive表中的列

(2)DELIMITED 普通文本

(2)json json文件 (不需要配置,JSON中的對象名稱直接映射到Hive表中具有相同名稱的列, 內部使用

org.apache.hive.hcatalog.data.JsonSerDe)


2.DELIMITED:

serializer.delimiter:傳入數據中的字段分隔符,用雙引號括起來,例如"\\t"

serializer.fieldnames:從輸入字段到hive表中的列的映射,指定為hive表列名稱的逗號分隔列表

serializer.serdeSeparator :輸出字段分隔符,單引號括起來,例如'\\t'

hive參數設置vim hive-site.xml:


<code><property>/<code>
<code> <name>hive.metastore.uris/<name>/<code>
<code> <value>thrift://masterhbase:9083/<value>/<code>
<code>
<code><property>/<code>
<code> <name>hive.txn.manager/<name>/<code>
<code> <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager/<value>/<code>
<code>
<code><property>/<code>
<code> <name>hive.compactor.initiator.on/<name>/<code>
<code> <value>true/<value>/<code>
<code>
<code><property>/<code>
<code> <name>hive.compactor.worker.threads/<name>/<code>
<code> <value>1/<value>/<code>
<code>
<code><property>/<code>
<code> <name>hive.support.concurrency/<name>/<code>
<code> <value>true/<value>/<code>
<code>
<code><property>/<code>
<code> <name>hive.enforce.bucketing/<name>/<code>
<code> <value>true/<value>/<code>
<code>
<code><property>/<code>
<code> <name> hive.exec.dynamic.partition.mode/<name>/<code>
<code> <value>nonstrict/<value>/<code>
<code>
<code><property>/<code>
<code> <name>hive.in.test/<name>/<code>
<code> <value>true/<value>/<code>
<code>


四個flume運行實例

解決報錯問題

(1)報錯:

Caused by: org.apache.thrift.TApplicationException: Internal error processing open_txns

-》hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;

打開一部分事務支持

-》協同配置

hive.compactor.initiator.on=true; -》運行啟動程序和清除線程,用於打開所需參數的完整列表事務

hive.compactor.worker.threads=1; -》增加工作線程的數量將減少花費的時間

hive.support.concurrency=true; -》是否支持併發,默認是false

hive.enforce.bucketing=true; -》是否啟用bucketing,寫入table數據時會啟動分桶

hive.exec.dynamic.partition.mode=nonstrict; -》設置非嚴格模式

(2)啟動metastore時報錯:

Table 'metastore.COMPACTION_QUEUE' doesn't exist

配置以下屬性:這個是用來創建COMPACTION_QUEUE這張表的


hive.in.test

true


(3)再啟動metastore時報錯:

Error rolling back: Can't call rollback when autocommit=true

去掉以下屬性:


hive.in.test

true

之前沒有安裝,先安裝

四個flume運行實例

啟動flume agent

bin/flume-ng agent --conf conf --name a1 --conf-file conf/flume-sink-hive.properties

四個flume運行實例

使用nc去連接,然後輸入數據,數據以製表符分割

四個flume運行實例

Hive中可以看到數據

四個flume運行實例

運行實例四(hive)

創建表

create table emp(

empno int,

ename string,

job string,

mgr int,

hiredate string,

sal double,

comm double,

deptno int

)CLUSTERED BY (empno) INTO 2 BUCKETS

row format delimited fields terminated by '\\t'

stored as orc;

四個flume運行實例

準備配置信息flume-sink-hive2.properties

四個flume運行實例

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# Describe/configure the source

a1.sources.r1.type = exec

a1.sources.r1.command = cat /data/emp.txt


# Describe the sink

a1.sinks.k1.type = hive

a1.sinks.k1.hive.metastore = thrift://masterhbase:9083

a1.sinks.k1.hive.database = flume_test

a1.sinks.k1.hive.table = emp

a1.sinks.k1.serializer = DELIMITED

a1.sinks.k1.serializer.delimiter = "\\t"

a1.sinks.k1.serializer.fieldnames = empno,ename,job,mgr,hiredate,sal,comm,deptno

a1.sinks.k1.serializer.serdeSeparator = '\\t'


# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100


# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

四個flume運行實例

運行flume

bin/flume-ng agent --conf conf --name a1 --conf-file conf/flume-sink-hive2.properties

四個flume運行實例

查看數據

四個flume運行實例



分享到:


相關文章: