基本認識:
大數據階段數據的收集來源, flume的收集數據一般是日誌,比如:網站日誌
flume是一個分佈式的,可靠的,可用的
flume可以做離線也可以做實時分析
collecting --》source --》數據採集來源
aggregating --》channel --》數據臨時緩存(只要數據被move了,那就不在存儲了)
moving --》sink --》數據的轉移
1、agent :source、channel、sink
(1)source:用於採集數據,將產生的數據流傳輸到Channel
(2)channel:連接 sources 和 sinks ,臨時緩存數據
(3)sink:從Channel收集數據,將數據寫到目標源
2、Events:
(1)是Flume數據傳輸的基本單元
(2)由header和載有數據的一個byte array構成,byte array字節數組:存儲真實的數據
(3)每一個事件的大小:deserializer.maxLineLength2048字節,編碼格式:UTF-8
一個source,可以綁定多個channel
一個sink,只能綁定一個channel
flume安裝:
準備安裝包
apache-flume-1.7.0-bin.tar.gz
解壓縮
tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /opt/bigdata/
配置文件:flume-env.sh
mv flume-env.sh.template flume-env.sh
配置jdk
export JAVA_HOME=/opt/bigdata/jdk1.8
測試是否成功
bin/flume-ng version
flume的flume-ng命令
Usage: bin/flume-ng [options]...
例如一些提交任務的命令(熟悉下格式):
bin/flume-ng agent --conf conf --name agent --conf-file conf/test.properties
bin/flume-ng agent -c conf -n agent -f conf/test.properties
bin/flume-ng avro-client --conf conf --host host --port 8080
配置情況選擇:
1、flume安裝在hadoop集群中:
(1)配置JAVA_HOME:
export JAVA_HOME= /opt/bigdata/jdk1.8
2、flume安裝在hadoop集群中,而且還配置了HA:
(1)HDFS訪問入口變化
(2)配置JAVA_HOME:
export JAVA_HOME= /opt/bigdata/jdk1.8
(3)還需要添加hadoop的core-site.xml和hdfs-site.xml拷貝到flume的conf目錄
3、flume不在hadoop集群裡:
(1)配置JAVA_HOME:
export JAVA_HOME= /opt/bigdata/jdk1.8
(2)還需要添加hadoop的core-site.xml和hdfs-site.xml拷貝到flume的conf目錄
(3)將hadoop的一些jar包添加到flume的lib目錄下(用的是什麼版本拷貝什麼版本)
運行官網案例:
編輯配置文件flume-test.properties(創建一個)
準備配置信息
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = masterhbase
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
查看系統有沒有安裝telnet:rpm -qa | grep telnet
沒有的就安裝: yum -y install telnet或yum -y install nc
文件log4j.properties顯示的是日誌信息配置
運行:
bin/flume-ng agent --conf conf --conf-file conf/flume-test.properties --name a1 -Dflume.root.logger=INFO,console
開啟一個窗口:telnet連接端口號
telnet masterhbase 44444 (卡在那是正常的,你可以隨意輸入信息)
輸入hello world
在flume中就可以看到數據
退出telnet:輸入ctrl + ] 然後輸入quit
運行實例一
需求:監控apache服務器的日誌,利用flume監控某一個文件
安裝httpd服務
yum -y install httpd
安裝完成之後,會有個目錄生成 /var/www/html
到/var/www/html這個目錄下 vim index.html [隨意輸入內容]
啟動服務: service httpd start
瀏覽網頁:輸入主機名[hostname]
日誌產生的路徑:/var/log/httpd/access_log
配置flume agent
source: exec
channel:memory
sink:hdfs
我們複製配置文件
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /var/log/httpd/access_log
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://masterhbase:8082/flume/webdata/roll/%y%m%d/%H
a1.sinks.k1.hdfs.rollInterval = 600
a1.sinks.k1.hdfs.rollSize = 1048576
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue= 1
a1.sinks.k1.hdfs.roundUnit = hour
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
一些配置說明
問題:hdfs上的文件一般數據文件大小要大,而且文件數量是要少
hdfs.rollInterval = 600 (這個地方最好還是設置一個時間)
hdfs.rollSize = 1048576 (1M,134217728-》128M)
hdfs.rollCount = 0
hdfs.minBlockReplicas = 1 (這個不設置的話,上面的參數有可能不會生效)
在hdfs文件上設置時間格式分層 年月日/時 每小時生成一個文件
hdfs.useLocalTimeStamp = true
hdfs.round = true
hdfs.roundValue= 1
hdfs.roundUnit = hour
將準備好的jar上傳到flume/lib中
運行
查看hdfs上,不斷刷新會有新的文件
查看下進程
運行實例二
利用flume監控某一個文件目錄,將目錄下滾動好的文件實時抽取到HDFS上
類型選擇
source:spooldir
channel:file
sink:hdfs
創建配置文件flume-spooldir.properties
編寫信息
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /data/logs
a1.sources.r1.recursiveDirectorySearch = true
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://masterhbase:8082/flume/webdata/spooldir/%y%m%d/%H
a1.sinks.k1.hdfs.rollInterval = 600
a1.sinks.k1.hdfs.rollSize = 1048576
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue= 1
a1.sinks.k1.hdfs.roundUnit = hour
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
# Describe the channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/bigdata/apache-flume-1.7.0-bin/checkpointDir
a1.channels.c1.dataDirs = /opt/bigdata/apache-flume-1.7.0-bin/dataDirs
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
配置信息概念補充
1.source:spooldir(已經生成好的最終的數據文件)
(1)recursiveDirectorySearch 是否監視子目錄以查找要讀取的新文件
(2)includePattern 正則表達式,指定要包含的文件 (只.csv數據文件,是正則匹配)
(3)ignorePattern 正則表達式,指定要忽略的文件 (不抽取.csv數據文件,是正則匹配)
(4)缺點:不能對目錄文件進行修改,如果有追加內容的文本文件,是不允許的(有可能不會被抽取,有可能會有錯誤)
2.flume監控目錄,支持文件修改,並記錄文件狀態
(1)source:taildir (類似exec + spooldir的組合)
(2)filegroups :設置source組 可設置多個 filegroups = f1
(3)filegroups.:設置組員的監控目錄和監控文件類型,使用正則表示,只能監控文件
(4)positionFile:設置定位文件的位置,以JSON格式寫入給定位置文件上每個文件的最後讀取位置
3.Memory Channel是一個不穩定的channel,它在內存中存儲所有事件,
如果進程異常停止,內存中的數據將不能讓恢復,而且受內存大小的限制。
4.flie channel:是一個持久化的channel,數據安全並且只要磁盤空間足夠,它就可以將數據存儲到磁盤上
5.checkpointDir:檢查數據完整性,存放檢查點目錄,可以檢測出哪些數據已被抽取,哪些還沒有
6.dataDirs:存放數據的目錄,dataDirs可以是多個目錄,以逗號隔開,用獨立的多個磁盤上的多個目錄可以提高file channel的性能。
7.hdfs上數據默認是二進制的文件類型:bin/hdfs dfs -text /
8.可以修改hdfs.fileType 改為DataStream(數據流)hdfs.writeFormat = Text 改為文本格式
9.當使用DataStream時候,文件不會被壓縮,不需要設置hdfs.codeC;當使用CompressedStream時候,必須設置一個正確的hdfs.codeC值;hdfs.codeC壓縮編碼解碼器 --》snappy壓縮
10.batchSize默認值:100 每個批次刷新到HDFS上的events數量;
創建目錄
mkdir –p /data/logs
模擬數據
cp -r /opt/bigdata/hadoop-2.7.3/logs/* /data/logs/
查看數據
運行
bin/flume-ng agent --conf conf --name a1 --conf-file conf/flume-spooldir.properties
查看下HDFS
運行實例三
將hive的一些jar拷貝過來 flume的lib目錄下
配置flume agent
source:netcat
channel:Memory
sink:hive
啟動hive的元數據服務:
/opt/bigdata/apache-hive-1.2.1-bin/bin/hive --service metastore &
創建庫和表 (表必須是CLUSTERED BY ,INTO BUCKETS)
create database flume_test;
use flume_test;
create table flume_user(
user_id int,
user_name string,
user_age int
)CLUSTERED BY (user_id) INTO 2 BUCKETS
row format delimited fields terminated by '\\t'
stored as orc;
準備配置文件flume-sink-hive.properties
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = masterhbase
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = hive
a1.sinks.k1.hive.metastore = thrift://masterhbase:9083
a1.sinks.k1.hive.database = flume_test
a1.sinks.k1.hive.table = flume_user
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = "\\t"
a1.sinks.k1.serializer.fieldnames = user_id,user_name,user_age
a1.sinks.k1.serializer.serdeSeparator = '\\t'
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
配置概念補充
1.serializer: 負責解析事件中的字段並將它們映射到hive表中的列
(2)DELIMITED 普通文本
(2)json json文件 (不需要配置,JSON中的對象名稱直接映射到Hive表中具有相同名稱的列, 內部使用
org.apache.hive.hcatalog.data.JsonSerDe)
2.DELIMITED:
serializer.delimiter:傳入數據中的字段分隔符,用雙引號括起來,例如"\\t"
serializer.fieldnames:從輸入字段到hive表中的列的映射,指定為hive表列名稱的逗號分隔列表
serializer.serdeSeparator :輸出字段分隔符,單引號括起來,例如'\\t'
hive參數設置vim hive-site.xml:
<code><property>/<code>
<code> <name>hive.metastore.uris/<name>/<code>
<code> <value>thrift://masterhbase:9083/<value>/<code>
<code>
<code><property>/<code>
<code> <name>hive.txn.manager/<name>/<code>
<code> <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager/<value>/<code>
<code>
<code><property>/<code>
<code> <name>hive.compactor.initiator.on/<name>/<code>
<code> <value>true/<value>/<code>
<code>
<code><property>/<code>
<code> <name>hive.compactor.worker.threads/<name>/<code>
<code> <value>1/<value>/<code>
<code>
<code><property>/<code>
<code> <name>hive.support.concurrency/<name>/<code>
<code> <value>true/<value>/<code>
<code>
<code><property>/<code>
<code> <name>hive.enforce.bucketing/<name>/<code>
<code> <value>true/<value>/<code>
<code>
<code><property>/<code>
<code> <name> hive.exec.dynamic.partition.mode/<name>/<code>
<code> <value>nonstrict/<value>/<code>
<code>
<code><property>/<code>
<code> <name>hive.in.test/<name>/<code>
<code> <value>true/<value>/<code>
<code>
解決報錯問題
(1)報錯:
Caused by: org.apache.thrift.TApplicationException: Internal error processing open_txns
-》hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
打開一部分事務支持
-》協同配置
hive.compactor.initiator.on=true; -》運行啟動程序和清除線程,用於打開所需參數的完整列表事務
hive.compactor.worker.threads=1; -》增加工作線程的數量將減少花費的時間
hive.support.concurrency=true; -》是否支持併發,默認是false
hive.enforce.bucketing=true; -》是否啟用bucketing,寫入table數據時會啟動分桶
hive.exec.dynamic.partition.mode=nonstrict; -》設置非嚴格模式
(2)啟動metastore時報錯:
Table 'metastore.COMPACTION_QUEUE' doesn't exist
配置以下屬性:這個是用來創建COMPACTION_QUEUE這張表的
hive.in.test
true
(3)再啟動metastore時報錯:
Error rolling back: Can't call rollback when autocommit=true
去掉以下屬性:
hive.in.test
true
之前沒有安裝,先安裝
啟動flume agent
bin/flume-ng agent --conf conf --name a1 --conf-file conf/flume-sink-hive.properties
使用nc去連接,然後輸入數據,數據以製表符分割
Hive中可以看到數據
運行實例四(hive)
創建表
create table emp(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int
)CLUSTERED BY (empno) INTO 2 BUCKETS
row format delimited fields terminated by '\\t'
stored as orc;
準備配置信息flume-sink-hive2.properties
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = cat /data/emp.txt
# Describe the sink
a1.sinks.k1.type = hive
a1.sinks.k1.hive.metastore = thrift://masterhbase:9083
a1.sinks.k1.hive.database = flume_test
a1.sinks.k1.hive.table = emp
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = "\\t"
a1.sinks.k1.serializer.fieldnames = empno,ename,job,mgr,hiredate,sal,comm,deptno
a1.sinks.k1.serializer.serdeSeparator = '\\t'
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
運行flume
bin/flume-ng agent --conf conf --name a1 --conf-file conf/flume-sink-hive2.properties
查看數據
閱讀更多 分享電腦學習 的文章