
Data Collection Tools: Common Flume Collection Methods with Usage Examples


Contents:

- Flume: overview, architecture, core components, common Channel/Sink/Source types, architecture patterns
- Installing Flume
- Basic usage of Flume: write the configuration file, start the agent instance, test
- Collecting directory data into HDFS
- Collecting file data into HDFS
- Collecting file data into Kafka
- Multi-agent cascade
- Flume high availability: failover test
- Flume load balancing
- Flume interceptors
- Flume monitoring: install Ganglia, configure Flume, run the test

Flume

Flume Overview

Official site: https://flume.apache.org/

Documentation: https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html

Apache Flume is a top-level project of the Apache Software Foundation. The initial releases of Flume, developed at Cloudera, are collectively known as Flume OG (original generation). The core components, configuration, and code architecture were later redesigned; the refactored versions are collectively known as Flume NG (next generation).

Apache Flume, originally provided by Cloudera, is a highly available, highly reliable, distributed system for collecting, aggregating, and transporting massive volumes of log data. Flume supports customizable data senders for collecting data from log systems, and it can perform simple processing on the data and write it to a variety of (customizable) data receivers.

Apache Flume is not limited to log aggregation. Because data sources are customizable, Flume can transport large volumes of event data, including but not limited to network traffic data, social-media-generated data, email messages, and nearly any other possible data source.

Flume Architecture

An external data source sends events to Flume in a specific format. When a source receives events, it stores them in one or more channels; a channel holds the events until they are consumed by a sink. The sink's main job is to read events from the channel and write them to an external storage system or forward them to the source of the next agent; only after success are the events removed from the channel.


The core role in a distributed Flume system is the agent; a Flume collection system is formed by connecting individual agents together.

Core Components

The core of a running Flume deployment is the agent, Flume's smallest independent unit of execution; one agent is one JVM. An agent acts as a data carrier: an independent Flume process and a complete data collection tool containing three core components, source, channel, and sink. Through these components, events can flow from one place to another.

Client

Client: produces data; runs in an independent thread.

Event

Event: the basic unit of data transfer, consisting of a header and a body; the former is a key/value map, the latter an arbitrary byte array.

Agent

Agent: acts as a data carrier; an independent (JVM) process containing the Source, Channel, and Sink components.

Source

The source is the data collection component. It collects data, formats it as required, wraps it in events, and pushes the events into the channel.

For details see: https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#flume-sources

Channel

Channel: the transport component, a temporary staging area for events; it can be viewed as a data buffer (queue) that holds the events handed over by the source. It can be backed by memory or a persistent file system:

- Memory Channel: backed by memory; fast, but data can be lost (e.g., on a sudden crash)
- File Channel: backed by a persistent file system; guarantees no data loss, but is slower
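For reference, a minimal File Channel configuration might look like the following sketch; the checkpoint and data directories here are assumptions, so choose paths that suit your environment:

# File Channel: events are persisted to disk (a sketch; the paths are assumptions)
a1.channels.c1.type = file
# Directory where the channel keeps its checkpoint
a1.channels.c1.checkpointDir = /usr/local/program/flume/checkpoint
# Comma-separated list of directories for the event data files
a1.channels.c1.dataDirs = /usr/local/program/flume/data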

For details see: https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#flume-channels

Sink

Sink: the delivery component. It reads events from the channel, removes them, and passes them to the next agent in the flow pipeline or to the final storage system.

For details see: https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#flume-sinks

Interceptor

When data needs to be filtered, instead of modifying code in the source, channel, or sink, Flume provides interceptors, which are organized as a chain. An interceptor sits between the source and the channel: once a source is configured with an interceptor, the interceptor receives each event and can decide, according to your requirements, whether to keep or discard it; discarded events never enter the channel.
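For example, the built-in regex-filtering interceptor can discard events whose body matches a pattern; the sketch below drops DEBUG lines (the regex itself is an assumption for illustration):

# Attach a regex-filtering interceptor to source r1 (a sketch)
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_filter
# Events whose body matches this pattern are affected; the pattern is an assumption
a1.sources.r1.interceptors.i1.regex = ^DEBUG.*
# true: matching events are discarded and never reach the channel
a1.sources.r1.interceptors.i1.excludeEvents = true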


Common Channel, Sink, and Source Types

[Figure: supported Channel types]
[Figure: supported Sink types]
[Figure: supported Source types]

Flume Architecture Patterns

Pattern 1:

Flume supports passing data across multiple agents. This requires that the upstream agent's sink and the downstream agent's source both be of type Avro, with the sink pointing at the hostname (or IP address) and port of the source.

Pattern 2:

Flume supports collecting logs with multiple agents and then aggregating them through one or more agents before writing to the file system.

Pattern 3:

Flume supports fanning out from one source to multiple channels, and therefore to multiple sinks. By default, events are replicated to all channels, i.e., every channel receives the same data. Flume also supports configuring a multiplexing selector on the source to implement custom routing rules.
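As a sketch of such a multiplexing selector (the header name and mapping values are assumptions), events can be routed to different channels by a header value:

# Route events by the value of the "state" header (a sketch; names are assumptions)
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
# Events with state=US go to c1, events with state=CN go to c2
a1.sources.r1.selector.mapping.US = c1
a1.sources.r1.selector.mapping.CN = c2
# Everything else goes to c2
a1.sources.r1.selector.default = c2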


Installing Flume

Download: http://archive.apache.org/dist/flume/

wget http://archive.apache.org/dist/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
tar -zxvf apache-flume-1.9.0-bin.tar.gz
mv apache-flume-1.9.0-bin flume

Find the JDK installation path:

find / -name java

Edit the configuration file:

cd flume/conf
cp flume-env.sh.template flume-env.sh
vim flume-env.sh

# export JAVA_HOME=/usr/lib/jvm/java-8-oracle
export JAVA_HOME=/usr/local/jdk1.8/
# Recommended: when the channel is memory-backed, give Flume more memory
# export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"

Update the environment variables:

vim /etc/profile

export FLUME_HOME=/usr/local/program/flume
export PATH=$FLUME_HOME/bin:$PATH

Reload the profile:

source /etc/profile

Verify:

flume-ng version

Flume 1.9.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: d4fcab4f501d41597bc616921329a4339f73585e
Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
From source with checksum 35db629a3bda49d23e9b3690c80737f9

Basic Usage of Flume

For monitoring port data, see: https://flume.apache.org/FlumeUserGuide.html#a-simple-example

Write the Configuration File

Describe the collection plan in a configuration file according to your data collection requirements.

Create a new configuration file in Flume's conf directory:

vim /usr/local/program/flume/conf/example.conf

Name the components of the agent instance:

# a1: name of the agent instance
# Name of the sources
a1.sources = r1
# Name of the sinks
a1.sinks = k1
# Name of the channels
a1.channels = c1

Configure the source:

# Source type: receive data from a network socket/port
a1.sources.r1.type = netcat
# Listen address
a1.sources.r1.bind = node001
# Listen port
a1.sources.r1.port = 44444

Configure the channel:

# Where data is staged: in memory
a1.channels.c1.type = memory
# Maximum number of events the channel can hold
a1.channels.c1.capacity = 1000
# Maximum number of events taken from the source, or given to the sink, per transaction
a1.channels.c1.transactionCapacity = 100

Configure the sink:

# Where data goes: the logging system
a1.sinks.k1.type = logger

Bind the source and sink to the channel:

# Channel used by the source
a1.sources.r1.channels = c1
# Channel used by the sink
a1.sinks.k1.channel = c1

Start the Agent Instance

Point the Flume agent at the collection-plan configuration file and start it on the appropriate node:

# -c conf: directory containing Flume's own configuration files
# -f conf/example.conf: the collection plan to run
# -n a1: name of the agent
bin/flume-ng agent -c conf -f conf/example.conf -n a1 -Dflume.root.logger=INFO,console
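The examples in this article run the agent in the foreground; in production you would typically leave it running in the background, roughly like this sketch (the log file path is an assumption):

# Run the agent in the background and capture its output (a sketch)
nohup bin/flume-ng agent -c conf -f conf/example.conf -n a1 > flume-a1.log 2>&1 &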

Startup log:

2022-03-08 20:43:58,557 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k1
2022-03-08 20:43:58,558 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r1
2022-03-08 20:43:58,560 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:155)] Source starting
2022-03-08 20:43:58,581 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:166)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/172.22.4.21:44444]

Test

Install telnet:

yum -y install telnet

Send test data:

[root@administrator ~]# telnet node001 44444
Trying 172.22.4.21...
Connected to node01.
Escape character is '^]'.
Hello World
OK

Data received:

2022-03-08 20:44:21,702 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 48 65 6C 6C 6F 20 57 6F 72 6C 64 0D Hello World. }

Collecting Directory Data into HDFS

Directory data is collected with the Spooling Directory Source; see: https://flume.apache.org/FlumeUserGuide.html#spooling-directory-source

Write the Configuration File

vim /usr/local/program/flume/conf/spooldir.conf

1. Name the components of the agent instance

a1.sources = r1
a1.sinks = k1
a1.channels = c1

2. Configure the source

# Source type: watch a directory for file changes
a1.sources.r1.type = spooldir
# Path of the directory to watch
a1.sources.r1.spoolDir = /usr/local/program/flume/tempData
# Whether to add a header with the file information
a1.sources.r1.fileHeader = true

3. Configure the channel

# Stage data in memory
a1.channels.c1.type = memory
# Capacity in events (one event per record/line)
a1.channels.c1.capacity = 1000
# Number of events handed to the sink per transaction; events taken from the channel are queued and sent to the sink when the batch is full
a1.channels.c1.transactionCapacity = 100

4. Configure the sink

HDFS sink: https://flume.apache.org/FlumeUserGuide.html#hdfs-sink

# Sink type: hdfs
a1.sinks.k1.type = hdfs
# Where the data lands in HDFS
a1.sinks.k1.hdfs.path = hdfs://node01:9000/flume/spooldir/files/%y-%m-%d/%H%M/
# File prefix
a1.sinks.k1.hdfs.filePrefix = events-
# Whether to round down the timestamp used in the path (rolls to new directories)
a1.sinks.k1.hdfs.round = true
# Directory rolling period: a new directory every 10 minutes
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# Roll to a new file every 3 seconds
a1.sinks.k1.hdfs.rollInterval = 3
# Roll to a new file once the (temporary) HDFS file reaches 20 bytes
a1.sinks.k1.hdfs.rollSize = 20
# Roll to a new file once the (temporary) HDFS file holds 5 events
a1.sinks.k1.hdfs.rollCount = 5
# Events written to HDFS per batch; batchSize < transactionCapacity < capacity
a1.sinks.k1.hdfs.batchSize = 1
# Use the local timestamp instead of one from the event header
a1.sinks.k1.hdfs.useLocalTimeStamp = true

5. Bind the source and sink to the channel

# Channel for the source
a1.sources.r1.channels = c1
# Channel for the sink
a1.sinks.k1.channel = c1

Start the Agent Instance

bin/flume-ng agent -c ./conf -f ./conf/spooldir.conf -n a1 -Dflume.root.logger=INFO,console

Startup log:

2022-03-08 22:20:02,045 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
2022-03-08 22:20:02,200 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k1
2022-03-08 22:20:02,203 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r1
2022-03-08 22:20:02,205 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.SpoolDirectorySource.start(SpoolDirectorySource.java:85)] SpoolDirectorySource source starting with directory: /usr/local/program/flume/tempData
2022-03-08 22:20:02,206 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
2022-03-08 22:20:02,209 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: k1 started
2022-03-08 22:20:02,254 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2022-03-08 22:20:02,255 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: r1 started

Test

Create a file in the watched directory:

cd /usr/local/program/flume/tempData
echo "hello world" >> spooldir.txt

Collection log:

2022-03-08 22:22:37,622 (pool-5-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:384)] Last read took us just up to a file boundary. Rolling to the next file, if there is one.
2022-03-08 22:22:37,624 (pool-5-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:497)] Preparing to move file /usr/local/program/flume/tempData/spooldir.txt to /usr/local/program/flume/tempData/spooldir.txt.COMPLETED
2022-03-08 22:22:40,262 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.HDFSSequenceFile.configure(HDFSSequenceFile.java:63)] writeFormat = Writable, UseRawLocalFileSystem = false
2022-03-08 22:22:40,573 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:246)] Creating hdfs://node01:9000/flume/spooldir/files/22-03-08/2220//events-.1646749360257.tmp
2022-03-08 22:22:46,605 (hdfs-k1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink$1.run(HDFSEventSink.java:393)] Writer callback called.
2022-03-08 22:22:46,605 (hdfs-k1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:438)] Closing hdfs://node01:9000/flume/spooldir/files/22-03-08/2220//events-.1646749360257.tmp
2022-03-08 22:22:46,664 (hdfs-k1-call-runner-4) [INFO - org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:681)] Renaming hdfs://node01:9000/flume/spooldir/files/22-03-08/2220/events-.1646749360257.tmp to hdfs://node01:9000/flume/spooldir/files/22-03-08/2220/events-.1646749360257

Check whether the file was collected.

Once a file has been collected, the agent automatically appends the suffix COMPLETED:

ls /usr/local/program/flume/tempData

spooldir.txt.COMPLETED

View the result in HDFS.
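The screenshot is omitted here; the same check can be done from the command line, as in this sketch (the path follows the sink configuration above, and the rolled file name will differ on your system):

# Recursively list the files created by the HDFS sink (a sketch)
hdfs dfs -ls -R /flume/spooldir/files
# Print the contents of a rolled file; substitute the actual file name
hdfs dfs -cat /flume/spooldir/files/22-03-08/2220/events-.1646749360257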

Collecting File Data into HDFS

File data is collected with the Exec Source; see: https://flume.apache.org/FlumeUserGuide.html#exec-source

Write the Configuration File

vim /usr/local/program/flume/conf/exec.conf

agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Watch a file for appended content
agent1.sources.source1.type = exec
# Which file to watch
agent1.sources.source1.command = tail -F /usr/local/program/flume/tempData/test.txt

# Sink type: hdfs
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://node01:9000/flume/exec/files/%y-%m-%d/%H%M/
agent1.sinks.sink1.hdfs.filePrefix = access_log
# Maximum number of open files
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize = 100
# File type: plain data stream
agent1.sinks.sink1.hdfs.fileType = DataStream
# Write format
agent1.sinks.sink1.hdfs.writeFormat = Text
agent1.sinks.sink1.hdfs.round = true
agent1.sinks.sink1.hdfs.roundValue = 10
agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true

agent1.channels.channel1.type = memory
# How long adding an event to the channel, or removing one, may block before timing out
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 5000
agent1.channels.channel1.transactionCapacity = 600

agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1

Start the Agent Instance

bin/flume-ng agent -c conf -f conf/exec.conf -n agent1 -Dflume.root.logger=INFO,console

Startup log:

2022-03-08 22:41:39,484 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink sink1
2022-03-08 22:41:39,485 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source source1
2022-03-08 22:41:39,487 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.source.ExecSource.start(ExecSource.java:170)] Exec source starting with command: tail -F /usr/local/program/flume/tempData/test.txt
2022-03-08 22:41:39,488 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: sink1: Successfully registered new MBean.
2022-03-08 22:41:39,488 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: sink1 started
2022-03-08 22:41:39,489 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: source1: Successfully registered new MBean.
2022-03-08 22:41:39,489 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: source1 started

Write data to the watched file:

echo "hello world" >> /usr/local/program/flume/tempData/test.txt

Check the agent log:

2022-03-08 22:41:42,555 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.HDFSDataStream.configure(HDFSDataStream.java:57)] Serializer = TEXT, UseRawLocalFileSystem = false
2022-03-08 22:41:42,975 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:246)] Creating hdfs://node01:9000/flume/exec/files/22-03-08/2240//access_log.1646750502556.tmp
2022-03-08 22:42:16,054 (hdfs-sink1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink$1.run(HDFSEventSink.java:393)] Writer callback called.
2022-03-08 22:42:16,455 (hdfs-sink1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:438)] Closing hdfs://node01:9000/flume/exec/files/22-03-08/2240//access_log.1646750502556.tmp
2022-03-08 22:42:16,497 (hdfs-sink1-call-runner-4) [INFO - org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:681)] Renaming hdfs://node01:9000/flume/exec/files/22-03-08/2240/access_log.1646750502556.tmp to hdfs://node01:9000/flume/exec/files/22-03-08/2240/access_log.1646750502556

View the result in HDFS.

Collecting File Data into Kafka

Write the Configuration File

vim /usr/local/program/flume/conf/exec2kafka.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/program/flume/tempData/test.txt

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = flumeTopic
a1.sinks.k1.brokerList = node001:9092,node002:9092,node003:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 10

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 10000

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Create the Topic

kafka-topics.sh --zookeeper node001:2181 --create --replication-factor 3 --partitions 3 --topic flumeTopic

Start a Consumer

kafka-console-consumer.sh --zookeeper node001:2181 --from-beginning --topic flumeTopic

Start Flume

flume-ng agent -n a1 -f /usr/local/program/flume/conf/exec2kafka.conf -Dflume.root.logger=INFO,console
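Note that topic, brokerList, requiredAcks, and batchSize are legacy property names. In Flume 1.9 the same sink can also be configured with the current property names, roughly as in this sketch:

# Kafka sink using the Flume 1.9 property names (a sketch)
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = node001:9092,node002:9092,node003:9092
a1.sinks.k1.kafka.topic = flumeTopic
a1.sinks.k1.flumeBatchSize = 10
a1.sinks.k1.kafka.producer.acks = 1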

Multi-Agent Cascade

The agent cascade works as follows:

1. The first agent (server 1) collects data and sends it over the network to the second agent (server 2).
2. The second agent (server 2) receives the data from the first agent (server 1) and saves it to HDFS.

Configure Agent1

Configuration for the first agent instance:

agent1.sources = r1
agent1.sinks = k1
agent1.channels = c1

agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /usr/local/program/flume/tempData/test.txt

# An avro sink acts as a data sender
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = node01
agent1.sinks.k1.port = 4141
agent1.sinks.k1.batch-size = 10

agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100

agent1.sources.r1.channels = c1
agent1.sinks.k1.channel = c1

Configure Agent2

Configuration for the second agent instance. Because everything runs on a single server here, the second instance is named agent2; its three component names could in principle be the same as Agent1's (distinct names are used below for clarity).

agent2.sources = r2
agent2.sinks = k2
agent2.channels = c2

# An avro source acts as a receiving service
agent2.sources.r2.type = avro
agent2.sources.r2.bind = node01
agent2.sources.r2.port = 4141

# Describe the sink
agent2.sinks.k2.type = hdfs
agent2.sinks.k2.hdfs.path = hdfs://node01:9000/flume/avro/files/%y-%m-%d/%H%M/
agent2.sinks.k2.hdfs.filePrefix = events-
agent2.sinks.k2.hdfs.round = true
agent2.sinks.k2.hdfs.roundValue = 10
agent2.sinks.k2.hdfs.roundUnit = minute
agent2.sinks.k2.hdfs.rollInterval = 3
agent2.sinks.k2.hdfs.rollSize = 20
agent2.sinks.k2.hdfs.rollCount = 5
agent2.sinks.k2.hdfs.batchSize = 1
agent2.sinks.k2.hdfs.useLocalTimeStamp = true
agent2.sinks.k2.hdfs.fileType = DataStream

agent2.channels.c2.type = memory
agent2.channels.c2.capacity = 1000
agent2.channels.c2.transactionCapacity = 100

agent2.sources.r2.channels = c2
agent2.sinks.k2.channel = c2

Start the Agent Instances

Start the second agent instance first, then the first one.

bin/flume-ng agent -c conf -f conf/avro2.conf -n agent2 -Dflume.root.logger=INFO,console

2022-03-11 21:32:36,296 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:169)] Starting Channel c2
2022-03-11 21:32:36,551 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c2: Successfully registered new MBean.
2022-03-11 21:32:36,552 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c2 started
2022-03-11 21:32:36,552 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k2
2022-03-11 21:32:36,555 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: k2: Successfully registered new MBean.
2022-03-11 21:32:36,555 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: k2 started
2022-03-11 21:32:36,560 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r2
2022-03-11 21:32:36,562 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:193)] Starting Avro source r2: { bindAddress: 172.22.4.21, port: 4141 }...
2022-03-11 21:32:37,051 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: r2: Successfully registered new MBean.
2022-03-11 21:32:37,051 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: r2 started
2022-03-11 21:32:37,056 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:219)] Avro source r2 started.

bin/flume-ng agent -c conf -f conf/avro1.conf -n agent1 -Dflume.root.logger=INFO,console

2022-03-11 21:33:42,883 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:169)] Starting Channel c1
2022-03-11 21:33:43,056 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
2022-03-11 21:33:43,057 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
2022-03-11 21:33:43,058 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k1
2022-03-11 21:33:43,062 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.sink.AbstractRpcSink.start(AbstractRpcSink.java:294)] Starting RpcSink k1 { host: 172.22.4.21, port: 4141 }...
2022-03-11 21:33:43,063 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r1
2022-03-11 21:33:43,063 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
2022-03-11 21:33:43,064 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: k1 started
2022-03-11 21:33:43,064 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.sink.AbstractRpcSink.createConnection(AbstractRpcSink.java:212)] Rpc sink k1: Building RpcClient with hostname: 172.22.4.21, port: 4141
2022-03-11 21:33:43,064 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.ExecSource.start(ExecSource.java:170)] Exec source starting with command: tail -F /usr/local/program/flume/tempData/test.txt
2022-03-11 21:33:43,066 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2022-03-11 21:33:43,066 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: r1 started
2022-03-11 21:33:43,065 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.sink.AvroSink.initializeRpcClient(AvroSink.java:113)] Attempting to create Avro Rpc client.
2022-03-11 21:33:43,090 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:594)] Using default maxIOWorkers
2022-03-11 21:33:43,578 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.sink.AbstractRpcSink.start(AbstractRpcSink.java:308)] Rpc sink k1 started.

Run the Test

Write data to /usr/local/program/flume/tempData/test.txt:

echo "Hello World" >> test.txt

Log on the second agent instance:

2022-03-11 21:38:26,609 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.HDFSDataStream.configure(HDFSDataStream.java:57)] Serializer = TEXT, UseRawLocalFileSystem = false
2022-03-11 21:38:26,865 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:246)] Creating hdfs://node01:9000/flume/avro/files/22-03-11/2130//events-.1647005906610.tmp
2022-03-11 21:38:32,515 (hdfs-k2-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink$1.run(HDFSEventSink.java:393)] Writer callback called.
2022-03-11 21:38:32,515 (hdfs-k2-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:438)] Closing hdfs://node01:9000/flume/avro/files/22-03-11/2130//events-.1647005906610.tmp
2022-03-11 21:38:32,551 (hdfs-k2-call-runner-4) [INFO - org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:681)] Renaming hdfs://node01:9000/flume/avro/files/22-03-11/2130/events-.1647005906610.tmp to hdfs://node01:9000/flume/avro/files/22-03-11/2130/events-.1647005906610

View the result in HDFS.

Flume High Availability

The high-availability setup works as follows:

Agent1 (server 1) collects the data, while Agent2 (server 2) and Agent3 (server 3) receive what Agent1 sends and write it to HDFS.

Configure Agent1

Configuration for the first server, Agent1:

agent1.channels = c1
agent1.sources = r1
# Two sinks
agent1.sinks = k1 k2
# The two sinks share one sink group
agent1.sinkgroups = g1

agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /usr/local/program/flume/tempData/test.txt

agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100

# Configure sink1: send data to the first receiving server
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = node02
agent1.sinks.k1.port = 4141

# Configure sink2: send data to the second receiving server (its avro source listens on 5151, as configured below)
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = node03
agent1.sinks.k2.port = 5151

# Sinks in the group
agent1.sinkgroups.g1.sinks = k1 k2
# Sink group processor; failover: enable failover
agent1.sinkgroups.g1.processor.type = failover
# Sink priorities; the sink with the higher priority receives data first
agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 1
agent1.sinkgroups.g1.processor.maxpenalty = 10000

agent1.sources.r1.channels = c1
agent1.sinks.k1.channel = c1
agent1.sinks.k2.channel = c1

Configure Agent2 and Agent3

The configurations of Agent2 and Agent3 are almost identical:

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.type = avro
a1.sources.r1.bind = node02
a1.sources.r1.port = 4141
a1.sources.r1.channels = c1

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://node01:9000/flume/cluster/files/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = TEXT
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d

a2.sources = r1
a2.channels = c1
a2.sinks = k1

a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

a2.sources.r1.type = avro
a2.sources.r1.bind = node03
a2.sources.r1.port = 5151
a2.sources.r1.channels = c1

a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://node01:9000/flume/cluster/files/%y-%m-%d/%H%M/
a2.sinks.k1.hdfs.fileType = DataStream
a2.sinks.k1.hdfs.writeFormat = TEXT
a2.sinks.k1.hdfs.rollInterval = 10
a2.sinks.k1.channel = c1
a2.sinks.k1.hdfs.filePrefix = %Y-%m-%d

Start the Agents

Mind the startup order:

Start the Agent2 and Agent3 instances first, then Agent1.

An error appears after startup:

org.apache.flume.EventDeliveryException: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null

Writing to HDFS uses a timestamp to partition the directory structure, but the events received by Agent2 and Agent3 carry no timestamp in their headers. Configure useLocalTimeStamp on both agents' HDFS sinks (named k1 in the configurations above): a1.sinks.k1.hdfs.useLocalTimeStamp = true and a2.sinks.k1.hdfs.useLocalTimeStamp = true
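Alternatively, a timestamp interceptor on the receiving agents' avro sources would also populate the header, as in this sketch (component names follow the configurations above):

# Stamp each incoming event's header with a timestamp (a sketch)
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
a2.sources.r1.interceptors = i1
a2.sources.r1.interceptors.i1.type = timestamp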

Agent2

2022-03-11 22:48:39,568 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:169)] Starting Channel c1
2022-03-11 22:48:39,572 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Waiting for channel: c1 to start. Sleeping for 500 ms
2022-03-11 22:48:39,814 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
2022-03-11 22:48:39,815 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
2022-03-11 22:48:40,073 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k1
2022-03-11 22:48:40,076 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
2022-03-11 22:48:40,076 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: k1 started
2022-03-11 22:48:40,077 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r1
2022-03-11 22:48:40,082 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:193)] Starting Avro source r1: { bindAddress: 172.22.4.21, port: 4141 }...
2022-03-11 22:48:40,588 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2022-03-11 22:48:40,588 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: r1 started
# After Agent1 starts
2022-03-11 22:48:40,614 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:219)] Avro source r1 started.
2022-03-11 22:48:48,804 (New I/O server boss #5) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x512e205c, /172.22.4.21:35448 => /172.22.4.21:4141] OPEN
2022-03-11 22:48:48,808 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x512e205c, /172.22.4.21:35448 => /172.22.4.21:4141] BOUND: /172.22.4.21:4141
2022-03-11 22:48:48,809 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x512e205c, /172.22.4.21:35448 => /172.22.4.21:4141] CONNECTED: /172.22.4.21:35448

Agent3

2022-03-11 22:46:08,010 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k2
2022-03-11 22:46:08,012 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r2
2022-03-11 22:46:08,014 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:193)] Starting Avro source r2: { bindAddress: 172.22.4.21, port: 5151 }...
2022-03-11 22:46:08,018 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: k2: Successfully registered new MBean.
2022-03-11 22:46:08,018 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: k2 started
2022-03-11 22:46:08,689 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: r2: Successfully registered new MBean.
2022-03-11 22:46:08,689 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: r2 started
2022-03-11 22:46:08,699 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:219)] Avro source r2 started.
# After Agent1 starts
2022-03-11 22:48:49,210 (New I/O server boss #5) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x05a177f3, /172.22.4.21:55952 => /172.22.4.21:5151] OPEN
2022-03-11 22:48:49,215 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x05a177f3, /172.22.4.21:55952 => /172.22.4.21:5151] BOUND: /172.22.4.21:5151
2022-03-11 22:48:49,215 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x05a177f3, /172.22.4.21:55952 => /172.22.4.21:5151] CONNECTED: /172.22.4.21:55952

Failover Test

- Because node02 has a higher priority than node03, node02 receives the collected data and uploads it to HDFS first.
- When node02 goes down, node03 takes over receiving the collected data and uploading it to HDFS.
- When node02 recovers, it regains priority and resumes receiving the data and uploading it to HDFS.
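One way to simulate the failover is to kill the higher-priority receiving agent and then append to the watched file, as in this sketch (the PID is a placeholder to fill in):

# Find the Flume process serving the higher-priority sink (a sketch)
ps -ef | grep flume
# Kill it to simulate a node02 outage (substitute the real PID)
kill -9 <pid>
# Append data; new HDFS files should now arrive via node03's agent
echo "after failover" >> /usr/local/program/flume/tempData/test.txt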

Flume Load Balancing

Flume load balancing means that a routing agent distributes the events staged in its channel evenly across multiple sink components, each of which connects to a separate downstream agent.

Enable load balancing by adapting the HA Agent1 configuration above:

# Replace the failover sink-group settings:
# agent1.sinkgroups.g1.processor.type = failover
# agent1.sinkgroups.g1.processor.priority.k1 = 10
# agent1.sinkgroups.g1.processor.priority.k2 = 1
# agent1.sinkgroups.g1.processor.maxpenalty = 10000

# Load balancing
agent1.sinkgroups.g1.processor.type = load_balance
agent1.sinkgroups.g1.processor.backoff = true
# Round-robin selection
agent1.sinkgroups.g1.processor.selector = round_robin
agent1.sinkgroups.g1.processor.selector.maxTimeOut = 10000
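Besides round_robin, the load-balancing processor also supports a random selector; as a sketch, only the selector line changes:

# Pick a sink at random for each batch instead of rotating (a sketch)
agent1.sinkgroups.g1.processor.selector = random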

Agent2 and Agent3 are configured as in the HA setup above. With this in place, when Agent1 collects data, Agent2 and Agent3 receive it in turn.

Flume Interceptors

Configure the Collecting Agent1

vim interceptors1.conf

# Three sources
a1.sources = r1 r2 r3
a1.sinks = k1
a1.channels = c1

# First source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/program/flume/tempData/test1.txt
# Interceptor name
a1.sources.r1.interceptors = i1
# Interceptor type
a1.sources.r1.interceptors.i1.type = static
# The static interceptor inserts a user-defined key-value pair into the header of each collected event
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = test1

a1.sources.r2.type = exec
a1.sources.r2.command = tail -F /usr/local/program/flume/tempData/test2.txt
a1.sources.r2.interceptors = i2
a1.sources.r2.interceptors.i2.type = static
a1.sources.r2.interceptors.i2.key = type
a1.sources.r2.interceptors.i2.value = test2

a1.sources.r3.type = exec
a1.sources.r3.command = tail -F /usr/local/program/flume/tempData/test3.txt
a1.sources.r3.interceptors = i3
a1.sources.r3.interceptors.i3.type = static
a1.sources.r3.interceptors.i3.key = type
a1.sources.r3.interceptors.i3.value = test3

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node01
a1.sinks.k1.port = 41414

a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000

a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sources.r3.channels = c1
a1.sinks.k1.channel = c1

Configure the Receiving Agent2

vim interceptors2.conf

a2.sources = r1
a2.sinks = k1
a2.channels = c1

a2.sources.r1.type = avro
a2.sources.r1.bind = node01
a2.sources.r1.port = 41414
# Add a timestamp interceptor
a2.sources.r1.interceptors = i1
a2.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

a2.channels.c1.type = memory
a2.channels.c1.capacity = 20000
a2.channels.c1.transactionCapacity = 10000

a2.sinks.k1.type = hdfs
# %{type} reads the value of the key-value pair configured on Agent1
a2.sinks.k1.hdfs.path = hdfs://node001:9000/flume/interceptors/files/%{type}/%y-%m-%d/%H%M/
a2.sinks.k1.hdfs.filePrefix = events
a2.sinks.k1.hdfs.fileType = DataStream
a2.sinks.k1.hdfs.writeFormat = Text
# Use the local timestamp
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# Do not roll files by event count
a2.sinks.k1.hdfs.rollCount = 0
# Roll files by time
a2.sinks.k1.hdfs.rollInterval = 30
# Roll files by size
a2.sinks.k1.hdfs.rollSize = 10485760
# Number of events written to HDFS per batch
a2.sinks.k1.hdfs.batchSize = 10000
# Number of threads Flume uses for HDFS operations (create, write, etc.)
a2.sinks.k1.hdfs.threadsPoolSize = 10
# Timeout for HDFS operations
a2.sinks.k1.hdfs.callTimeout = 30000

a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

Start the Agents

bin/flume-ng agent -c conf -f conf/interceptors2.conf -n a2 -Dflume.root.logger=INFO,console
bin/flume-ng agent -c conf -f conf/interceptors1.conf -n a1 -Dflume.root.logger=INFO,console

Run the Test

[root@administrator tempData]# echo "Hello world" >> test1.txt
[root@administrator tempData]# echo "Hello world" >> test2.txt
[root@administrator tempData]# echo "Hello world" >> test3.txt


Flume Monitoring

Ganglia is monitoring software for large-scale clusters, well suited to monitoring distributed or parallel clusters.

Official site: http://ganglia.info/

Install Ganglia

To monitor Flume with Ganglia, either create a container from a Ganglia image (as below) or install the Ganglia packages directly.

Pull a Ganglia image:

docker pull jackchencoding/ganglia

Create the Ganglia container:

docker run -itd --name ganglia -p 8080:80 -p 8081:8649 --privileged=true jackchencoding/ganglia:latest

Configure Flume

Edit flume-env.sh in /usr/local/program/flume/conf and add the following:

JAVA_OPTS="-Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=IP:8649 -Xms100m -Xmx200m"
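The same monitoring settings can also be passed as system properties on the command line instead of through flume-env.sh, as in this sketch (replace IP with the Ganglia host):

# Enable Ganglia reporting from the command line (a sketch)
bin/flume-ng agent -c conf -f conf/example.conf -n a1 -Dflume.root.logger=INFO,console -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=IP:8649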

Start the Flume Job

bin/flume-ng agent -c conf -f conf/example.conf -n a1 -Dflume.root.logger=INFO,console

Run the Test

Visit IP:8080.

Visit IP:8080/ganglia, then send test data to Flume and observe the monitoring:

[root@administrator ~]# telnet node001 44444
Trying 172.22.4.21...
Connected to node01.
Escape character is '^]'.
Hello World
OK



