##########################################################################################################
##########################################################################################################flume安装,解压后修改flume_env.sh配置文件,指定java_home即可。
cp hdfs jar包到flume lib目录下(否则无法抽取数据到hdfs上)
flume常见命令选项:
[hadoop@db01 flume-1.5.0]$ bin/flume-ng
commands:
agent run a Flume agentglobal options:
--conf,-c <conf> use configs in <conf> directory -Dproperty=value sets a Java system property valueagent options:
--name,-n <name> the name of this agent (required) --conf-file,-f <file> specify a config file (required if -z missing)eg:
bin/flume-ng agent --conf /opt/cdh-5.3.6/flume-1.5.0/conf --name agent-test --conf-file test.conf
bin/flume-ng agent -c /opt/cdh-5.3.6/flume-1.5.0/conf -n agent-test -f test.conf********************************************************************************************************
flume第一个案例:
定义配置文件/opt/cdh-5.3.6/flume-1.5.0/conf/a1.conf:
# The configuration file needs to define the sources,
# the channels and the sinks.###################################
a1.sources = r1a1.channels = c1a1.sinks = k1############define source#######################################
a1.sources.r1.type = netcata1.sources.r1.bind = db01a1.sources.r1.port = 55555#############define channel###################################
a1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100##########define sinks#########################
a1.sinks.k1.type = loggera1.sinks.k1.maxBytesToLog = 1024#######bind###############################
a1.sources.r1.channels=c1a1.sinks.k1.channel = c1安装telnet:
[root@db01 softwares]# rpm -ivh telnet-*
Preparing... ########################################### [100%] 1:telnet-server ########################################### [ 50%] 2:telnet ########################################### [100%][root@db01 softwares]# [root@db01 softwares]# [root@db01 softwares]# rpm -ivh xinetd-2.3.14-39.el6_4.x86_64.rpm Preparing... ########################################### [100%] package xinetd-2:2.3.14-39.el6_4.x86_64 is already installed[root@db01 softwares]# [root@db01 softwares]# [root@db01 softwares]# [root@db01 softwares]# /etc/rc.d/init.d/xinetd restartStopping xinetd: [ OK ]Starting xinetd: [ OK ]启动flume:
bin/flume-ng agent \
--conf /opt/cdh-5.3.6/flume-1.5.0/conf \--name a1 \--conf-file /opt/cdh-5.3.6/flume-1.5.0/conf/a1.conf \-Dflume.root.logger=DEBUG,console登录telnet 测试:
[root@db01 ~]# telnet db01 55555
Trying 192.168.100.231...Connected to db01.Escape character is '^]'.hello flumeOKchavin king OK ------------ 日志输出如下 -------------2017-03-23 16:48:31,285 (netcat-handler-0) [DEBUG - org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:318)] Chars read = 13
2017-03-23 16:48:31,290 (netcat-handler-0) [DEBUG - org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:322)] Events processed = 12017-03-23 16:48:33,234 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 68 65 6C 6C 6F 20 66 6C 75 6D 65 0D hello flume. }2017-03-23 16:48:39,224 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:/opt/cdh-5.3.6/flume-1.5.0/conf/a1.conf for changes2017-03-23 16:48:47,031 (netcat-handler-0) [DEBUG - org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:318)] Chars read = 132017-03-23 16:48:47,032 (netcat-handler-0) [DEBUG - org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:322)] Events processed = 12017-03-23 16:48:48,235 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 63 68 61 76 69 6E 20 6B 69 6E 67 0D chavin king. }2017-03-23 16:49:09,225 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:/opt/cdh-5.3.6/flume-1.5.0/conf/a1.conf for changes***************************************************************************
flume第二个案例:收集hive log
/user/hadoop/flume/hive-logs/
[hadoop@db01 hadoop-2.5.0]$ bin/hdfs dfs -mkdir -p /user/hadoop/flume/hive-logs/
a2.conf文件:
# The configuration file needs to define the sources,
# the channels and the sinks.###################################
a2.sources = r2a2.channels = c2a2.sinks = k2############define source#######################################
a2.sources.r2.type = execa2.sources.r2.command = tail -f /opt/cdh-5.3.6/hive-0.13.1/data/logs/hive.loga2.sources.r2.shell = /bin/bash -c#############define channel###################################
a2.channels.c2.type = memorya2.channels.c2.capacity = 1000a2.channels.c2.transactionCapacity = 100##########define sinks#########################
a2.sinks.k2.type = hdfs#a2.sinks.k2.hdfs.path = hdfs://db02:8020/user/hadoop/flume/hive-logs/
#hadoop ha 配置方法,cp hadoop的配置文件到flume的conf目录下:#cp /opt/cdh-5.3.6/hadoop-2.5.0/etc/hadoop/core-site.xml /opt/cdh-5.3.6/hadoop-2.5.0/etc/hadoop/hdfs-site.xml /opt/cdh-5.3.6/flume-1.5.0/conf/a2.sinks.k2.hdfs.path = hdfs://ns1/user/hadoop/flume/hive-logs/a2.sinks.k2.hdfs.fileType = DataStream
a2.sinks.k2.hdfs.writeFormat = Texta2.sinks.k2.hdfs.batchSize = 10#######bind###############################
a2.sources.r2.channels=c2a2.sinks.k2.channel = c2测试:
bin/flume-ng agent \--conf /opt/cdh-5.3.6/flume-1.5.0/conf \--name a2 \--conf-file /opt/cdh-5.3.6/flume-1.5.0/conf/a2.conf \-Dflume.root.logger=DEBUG,console******************************************************************************
flume第三个案例:编辑a3.conf文件:
# The configuration file needs to define the sources,
# the channels and the sinks.######define agent#############################
a3.sources = r3a3.channels = c3a3.sinks = k3############define source#######################################
a3.sources.r3.type = spooldira3.sources.r3.spoolDir = /opt/cdh-5.3.6/flume-1.5.0/spoolinglogsa3.sources.r3.ignorePattern = ^(.)*\\.log$a3.sources.r3.fileSuffix = .delete#############define channel###################################
a3.channels.c3.type = filea3.channels.c3.checkpointDir = /opt/cdh-5.3.6/flume-1.5.0/filechannel/checkpointa3.channels.c3.dataDirs = /opt/cdh-5.3.6/flume-1.5.0/filechannel/data##########define sinks#########################
a3.sinks.k3.type = hdfs#a3.sinks.k3.hdfs.path = hdfs://db02:8020/user/hadoop/flume/hive-logs/
a3.sinks.k3.hdfs.path = hdfs://ns1/user/hadoop/flume/splogs/%Y%m%da3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.writeFormat = Texta3.sinks.k3.hdfs.batchSize = 10a3.sinks.k3.hdfs.useLocalTimeStamp = true#######bind###############################a3.sources.r3.channels=c3a3.sinks.k3.channel = c3 测试:bin/flume-ng agent \--conf /opt/cdh-5.3.6/flume-1.5.0/conf \--name a3 \--conf-file /opt/cdh-5.3.6/flume-1.5.0/conf/a3.conf \-Dflume.root.logger=DEBUG,console