
Flume in Practice

2024-07-13 01:05 | Source: compiled from the web

Please credit the source when reposting: http://blog.csdn.net/dongdong9223/article/details/81482722 This article is from 【我是干勾鱼的博客】.

Ingredients:

Java: Java SE Development Kit 8u162 (Oracle Java Archive); see "Installing the JDK on Linux and setting environment variables"

Flume: apache-flume-1.8.0-bin.tar.gz (Apache Flume archive repository)

1 Introduction to Flume

1.1 What is Flume?

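Flume is a real-time log collection system originally developed by Cloudera, and it has been widely recognized and adopted in the industry. Its initial releases are collectively called "Flume OG" (original generation) and belonged to Cloudera. As Flume's functionality grew, the weaknesses of Flume OG became apparent: a bloated code base, poorly designed core components, and non-standard core configuration. Log transport was especially unstable in 0.94.0, the last Flume OG release. To solve these problems, Cloudera completed FLUME-728 on October 22, 2011, a milestone change that refactored the core components, the core configuration, and the code architecture; the refactored versions are collectively called "Flume NG" (next generation). Another reason for the change was bringing Flume under the Apache umbrella, at which point "Cloudera Flume" was renamed "Apache Flume".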

1.2 Features of Flume

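Flume is a distributed, reliable, and highly available system for collecting, aggregating, and transporting large volumes of log data. It lets you plug custom data senders into a logging system to collect data, and it can apply simple processing to the data before writing it to various receivers (such as text files, HDFS, and HBase).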

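Flume's data flow is built around the Event from start to finish. An Event is Flume's basic unit of data: it carries the log data as a byte array, together with a set of headers. Events are produced by a Source, which receives data from outside the Agent; after capturing an event, the Source formats it as needed and pushes it into one or more Channels. A Channel can be thought of as a buffer that holds an event until a Sink has finished processing it. The Sink is responsible for persisting the logs, for example to text files, HDFS, or HBase. This is the basic data flow model, as shown in the figure: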

[Figure: Flume data flow model]
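To make the Source, Channel, and Sink roles concrete, here is a toy Python model of the flow described above. It is only a sketch, not Flume's API: Flume is written in Java and its channels are transactional. The names `Event`, `BoundedChannel`, `source_receive`, and `sink_drain` are hypothetical, and `capacity` only loosely mirrors the memory channel's `capacity` setting used later in this article.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class Event:
    """Toy event: a byte-array body plus string headers."""
    body: bytes
    headers: Dict[str, str] = field(default_factory=dict)


class BoundedChannel:
    """Toy channel: buffers events until a sink takes them (not transactional)."""

    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        self._queue: deque = deque()

    def put(self, event: Event) -> None:
        # Refuse new events when full, so the caller (the source) has to retry
        # instead of the channel silently dropping data.
        if len(self._queue) >= self.capacity:
            raise OverflowError("channel is full")
        self._queue.append(event)

    def take(self) -> Optional[Event]:
        # Oldest event first; None when the buffer is empty.
        return self._queue.popleft() if self._queue else None


def source_receive(channel: BoundedChannel, raw_lines: List[str]) -> None:
    """Source: wrap each incoming line in an Event and push it into the channel."""
    for line in raw_lines:
        channel.put(Event(body=line.encode("utf-8")))


def sink_drain(channel: BoundedChannel, store: List[bytes]) -> None:
    """Sink: take events until the channel is empty and 'persist' their bodies."""
    event = channel.take()
    while event is not None:
        store.append(event.body)
        event = channel.take()


channel = BoundedChannel(capacity=1000)
source_receive(channel, ["Hello", "world"])
stored: List[bytes] = []
sink_drain(channel, stored)
print(stored)  # [b'Hello', b'world']
```

The channel decouples the two sides: the source can keep accepting input while the sink is slow, up to the channel's capacity.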

Flume also supports multiplexing the flow, that is, sending data on to multiple destinations, one of which can be the Source of another agent, as shown in the figure:

[Figure: multiplexing the flow]
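Which channel(s) an event enters is decided by the source's channel selector. The default replicating selector copies every event to all of the source's channels, while the multiplexing selector routes on the value of one header; in the Flume User Guide this is configured with keys such as `selector.type = multiplexing`, `selector.header`, `selector.mapping.<value>`, and `selector.default`. The Python sketch below imitates only that routing decision; the function names are hypothetical and this is not Flume code.

```python
from typing import Dict, Iterable, List


def replicating_select(channels: Iterable[str]) -> List[str]:
    """Replicating selector (Flume's default): every event goes to every channel."""
    return list(channels)


def multiplexing_select(headers: Dict[str, str], header_name: str,
                        mapping: Dict[str, List[str]], default: List[str]) -> List[str]:
    """Multiplexing selector: choose channels by the value of one event header.

    mapping maps a header value to a list of channel names; events whose header
    is missing or has an unmapped value go to the default channels.
    """
    return mapping.get(headers.get(header_name), default)


mapping = {"CZ": ["c1"], "US": ["c2", "c3"]}
print(multiplexing_select({"state": "US"}, "state", mapping, ["c4"]))  # ['c2', 'c3']
```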

2 Download and Installation

2.1 Downloading the JDK and configuring the environment

Download Java SE Development Kit 8u162 and configure the JDK environment as described in "Installing the JDK on Linux and setting environment variables".

2.2 Downloading and installing Flume

Download apache-flume-1.8.0-bin.tar.gz, put it in the "/opt/flume/" directory, and extract it. The full path after extraction is:

/opt/flume/apache-flume-1.8.0-bin

To check that Flume is installed correctly, change to the installation directory:

cd /opt/flume/apache-flume-1.8.0-bin

Run the following command to show the installed version:

./bin/flume-ng version

Flume 1.8.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 99f591994468633fc6f8701c5fc53e0214b6da4f
Compiled by denes on Fri Sep 15 14:58:00 CEST 2017
From source with checksum fbb44c8c8fb63a49be0a59e27316833d

3 Test Examples

3.1 Example 1: Using telnet to feed the source

This example follows the example in the Setup section of the official Flume 1.8.0 User Guide to test Flume.

3.1.1 Create an agent configuration file

Create a file named "flume-conf.properties.example" in the "conf" directory and edit it:

vi conf/flume-conf.properties.example

Add the following content:

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3.1.2 Start an agent:

./bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties.example --name a1 -Dflume.root.logger=INFO,console

The output is as follows:

./bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties.example --name a1 -Dflume.root.logger=INFO,console
Info: Sourcing environment configuration script /opt/flume/apache-flume-1.8.0-bin/conf/flume-env.sh
Info: Including Hive libraries found via () for Hive access
+ exec /opt/java/jdk1.8.0_162/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp '/opt/flume/apache-flume-1.8.0-bin/conf:/opt/flume/apache-flume-1.8.0-bin/lib/*:/lib/*' -Djava.library.path= org.apache.flume.node.Application --conf-file conf/flume-conf.properties.example --name a1
2018-08-07 16:56:15,021 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:62)] Configuration provider starting
2018-08-07 16:56:15,024 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:134)] Reloading configuration file:conf/flume-conf.properties.example
2018-08-07 16:56:15,036 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:930)] Added sinks: k1 Agent: a1
2018-08-07 16:56:15,038 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k1
2018-08-07 16:56:15,038 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k1
2018-08-07 16:56:15,053 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:140)] Post-validation flume configuration contains configuration for agents: [a1]
2018-08-07 16:56:15,053 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:147)] Creating channels
2018-08-07 16:56:15,059 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)] Creating instance of channel c1 type memory
2018-08-07 16:56:15,063 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:201)] Created channel c1
2018-08-07 16:56:15,063 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:41)] Creating instance of source r1, type netcat
2018-08-07 16:56:15,084 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)] Creating instance of sink: k1, type: logger
2018-08-07 16:56:15,087 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:116)] Channel c1 connected to [r1, k1]
2018-08-07 16:56:15,096 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:137)] Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@5966e66c counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
2018-08-07 16:56:15,113 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:144)] Starting Channel c1
2018-08-07 16:56:15,209 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
2018-08-07 16:56:15,209 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
2018-08-07 16:56:15,215 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:171)] Starting Sink k1
2018-08-07 16:56:15,216 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:182)] Starting Source r1
2018-08-07 16:56:15,217 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:155)] Source starting
2018-08-07 16:56:15,228 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:166)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]

At this point the command does not return: the agent keeps running in the foreground of this terminal.
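The startup log reports `Channel c1 connected to [r1, k1]`, a wiring that follows from the `sources.r1.channels` and `sinks.k1.channel` keys in the configuration file. The minimal sketch below reproduces that wiring from the configuration text. It is a hypothetical helper, not part of Flume, and it assumes simple `key = value` lines only (Java properties files also allow `:` separators and line continuations, which it ignores).

```python
from typing import Dict, List


def parse_agent_config(text: str, agent: str) -> Dict[str, str]:
    """Collect simple 'agent.key = value' lines for one agent, skipping comments."""
    props: Dict[str, str] = {}
    prefix = agent + "."
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = (part.strip() for part in line.split("=", 1))
        if key.startswith(prefix):
            # A repeated key simply overwrites the earlier value.
            props[key[len(prefix):]] = value
    return props


def channel_wiring(props: Dict[str, str]) -> Dict[str, List[str]]:
    """Map each declared channel to the sources and sinks bound to it."""
    wiring: Dict[str, List[str]] = {ch: [] for ch in props.get("channels", "").split()}
    for src in props.get("sources", "").split():
        # A source may write to several space-separated channels.
        for ch in props.get(f"sources.{src}.channels", "").split():
            if ch not in wiring:
                raise ValueError(f"source {src} uses undeclared channel {ch}")
            wiring[ch].append(src)
    for sink in props.get("sinks", "").split():
        # A sink reads from exactly one channel.
        ch = props.get(f"sinks.{sink}.channel")
        if ch not in wiring:
            raise ValueError(f"sink {sink} is not bound to a declared channel")
        wiring[ch].append(sink)
    return wiring
```

Applied to the text of conf/flume-conf.properties.example with agent name `a1`, `channel_wiring(parse_agent_config(text, "a1"))` returns `{'c1': ['r1', 'k1']}`, matching the log line.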

For details of each option in the command, run:

./bin/flume-ng help

The options used here are:

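-n (--name): the agent name
-c (--conf): the directory containing the configuration files
-f (--conf-file): the configuration file

3.1.3 Sending an event to Flume with telnet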

Open a new terminal and run:

telnet localhost 44444

Note that "44444" is the port number set in the configuration file flume-conf.properties.example.

The terminal responds with:

telnet localhost 44444
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Hello world!
OK

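That is: after you run "telnet localhost 44444", the connection is established. You then type the string "Hello world!" and press Enter; the terminal shows "OK", and at the same time the sink prints the string, as described below.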
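Any TCP client can stand in for telnet here, since the netcat source reads newline-terminated lines and, with its default settings, acknowledges each one with `OK`. The following is a minimal sketch; `send_line` is a hypothetical helper name. Unlike telnet, which ends each typed line with CR LF, it sends only `\n`, so the resulting event body should not end in `0D`.

```python
import socket


def send_line(host: str, port: int, text: str, timeout: float = 5.0) -> str:
    """Send one line to a netcat-style source and return its reply, whitespace stripped."""
    with socket.create_connection((host, port), timeout=timeout) as sock:
        # The netcat source splits input on '\n'; no '\r' is sent, unlike telnet.
        sock.sendall(text.encode("utf-8") + b"\n")
        reply = b""
        # Read until the acknowledgement line (e.g. "OK") is complete or the peer closes.
        while not reply.endswith(b"\n"):
            chunk = sock.recv(1024)
            if not chunk:
                break
            reply += chunk
    return reply.decode("utf-8").strip()
```

With the agent from 3.1.2 running, `send_line("localhost", 44444, "Hello world!")` should return `"OK"`.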

3.1.4 Sink output

The terminal where the agent was started now prints the corresponding "Hello world!" string:

2018-08-07 17:01:25,244 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D Hello world!. }

This shows that we have successfully configured and started an agent.
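The logger sink prints the event body as hexadecimal bytes followed by a text rendering in which, judging by the log line above, non-printable bytes appear as `.`. Decoding the hex explains the trailing `.`: `0D` is a carriage return, because telnet ends the typed line with CR LF and the netcat source splits on LF, leaving the CR in the body. The sketch below decodes the dump; the function names are hypothetical, and `printable` only approximates the sink's text column.

```python
def decode_logger_body(hex_dump: str) -> bytes:
    """Convert the space-separated hex bytes of a logger 'body:' field to raw bytes."""
    # bytes.fromhex ignores the whitespace between byte pairs.
    return bytes.fromhex(hex_dump)


def printable(body: bytes) -> str:
    """Approximate the dump's text column: non-printable bytes become '.'."""
    return "".join(chr(b) if 32 <= b < 127 else "." for b in body)


body = decode_logger_body("48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D")
print(body)             # b'Hello world!\r'
print(printable(body))  # Hello world!.
```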

3.2 Example 2: Using Avro to feed the source

Here we write some content to a text file and then use the Avro client to send it to an Avro source.
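It helps to know what the client will send. As we understand it, `flume-ng avro-client -F` reads the file line by line and sends each line as one event with the line terminator removed; the body shown in 3.2.5, which has no trailing `0A`, is consistent with this. The sketch below models only that assumed splitting, not the Avro RPC itself; `read_line_events` is a hypothetical name.

```python
from pathlib import Path
from typing import List


def read_line_events(path: str) -> List[bytes]:
    """Return one event body per line of the file, terminators stripped.

    Models the assumed avro-client -F behaviour; no Avro RPC is performed.
    """
    data = Path(path).read_bytes()
    # bytes.splitlines() breaks on \n, \r\n and \r (like Java's BufferedReader.readLine)
    # and drops the terminators, so b"Hello world2!\n" yields [b"Hello world2!"].
    return data.splitlines()
```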

3.2.1 Create the agent configuration file

Create the agent configuration file:

conf/avro.conf

with the following content:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3.2.2 Create a text file and write content to it

In the Flume installation directory:

/opt/flume/apache-flume-1.8.0-bin

create the file "log.00" containing "Hello world2!":

echo 'Hello world2!' > ./log.00

3.2.3 Start the agent

./bin/flume-ng agent -c conf -f ./conf/avro.conf -n a1 -Dflume.root.logger=INFO,console

3.2.4 Send the file with avro-client

In another terminal, run:

./bin/flume-ng avro-client -H localhost -p 4141 -F ./log.00

3.2.5 Sink output

The terminal where the agent was started now prints the corresponding "Hello world2!" string:

2018-08-07 17:55:29,586 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 32 21 Hello world2! }

4 References

Apache Flume

Flume 1.8.0 User Guide

Flume1.5.0入门:安装、部署、及flume的案例 (Getting started with Flume 1.5.0: installation, deployment, and examples)
