Flume笔记 | 您所在的位置:网站首页 › flume读取完文件删除 › Flume笔记 |
目录
概念与官方文档监听文件目录数据变更,输出到控制台拦截器
source监听文件目录,sink输出到hdfs将hbase日志信息写入hdfssource监听netcat端口,sink写入控制台
概念与官方文档
flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。 Flume系统中核心的角色是agent,agent本身是一个Java进程,一般运行在日志收集节点。 一个agent内部有三个组件: Source:采集源,用于跟数据源对接,以获取数据;Channel:agent内部的数据传输通道,用于从source将数据传递到sink;Sink:下沉地,采集数据的传送目的,用于往下一级agent传递数据或者往最终存储系统传递数据;在整个数据的传输的过程中,流动的是event,它是Flume内部数据传输的最基本单元。 event将传输的数据进行封装。如果是文本文件,通常是一行记录,event也是事务的基本单位。event从source,流向channel,再到sink,本身为一个字节数组,并可携带headers(头信息)信息。event代表着一个数据的最小完整单元,从外部数据源来,向外部的目的地去。 event可以是日志记录、 avro 对象等 flume传输数据时会使用事务,source推送数据到channel以及sink从channel拉取数据时都是以事务方式进行的。就比如sink输出数据完毕后应该将channel中缓存的数据删除,sink输出数据和channel删除数据是一个事务里面的,要么都成功要么都失败
至于Sink的类型,写入到hdfs,hive,hbase都是可以的 下面来说一下怎么使用flume 要想实现flume,需要编写conf文件,conf文件里面应该包含source,channel,sink的相关配置信息 这些信息可以在flume官网看到 flume官网文档,里面有source,channel,sink的相关配置信息 官网的文档内容是纯英文的,如果感觉阅读吃力,可以去已经翻译成中文的这个网站 Flume 1.9用户手册中文版 下面来举几个例子 监听文件目录数据变更,输出到控制台 # 首先先给agent起一个名字 叫a1 # 分别给source channel sink取名字 a1.sources = r1 a1.channels = c1 a1.sinks = k1 # 分别对source、channel、sink进行配置 # 配置source a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /usr/local/data/test a1.sources.r1.fileSuffix = .ok a1.sources.r1.fileHeader = true # 配置sink # 使用logger作为sink组件,可以将收集到数据直接打印到控制台 a1.sinks.k1.type = logger # 配置channel # 将channel的类型设置为memory,表示将event缓存在内存中 a1.channels.c1.type = memory # 组装 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1首先给source,channel,sink起一个别名,然后分别对其进行相关参数的配置 具体应该配置哪些参数,这些参数的含义是什么,可以从官方文档上获取 这里其实推荐使用中文文档,感谢翻译成中文的大佬 编写完相应的conf文件后,保存退出,开始执行flume flume-ng agent -n a1 -f spoolTolog.conf-n 的含义是agent的名字,与上面conf文件里面的agent名字保持一致 -f 的含义是conf文件位置,这里的spoolTolog.conf 就是我上面编写的conf文件名称 这里是在conf文件夹下面执行,如果在其他的文件夹下面执行吗,就需要 flume-ng agent -n a1 -f /usr/local/soft/flume-1.9.0/data/spoolTolog.conf
回到a.txt 文件所在的目录下面 另外在上面图中,打印监听文件目录的信息到控制台时, Event: { headers:{file=/usr/local/data/test/a.txt} body: 31 32 33 123 }这是sink输出在控制台的信息,可以发现是以 event 的形式传输的数据。 event里面分为两个部分,分别是 header 和 body header信息,event的头信息,这里显示的内容是file路径,是因为conf文件里面加上了a1.sources.r1.fileHeader = true参数 body信息,body信息又分为两个部分,后半部分的123是 a.txt 文件里面的内容,而前面的31,32,33则分别是 1,2,3的十六进制ASCII码值 拦截器flume监听指定目录下的文件数据变更信息,对于这些信息可以全盘保留,然后以指定的sink方式输出。但是也可以通过拦截器的方式,根据需要过滤或者保留一些信息,然后只会把拦截器拦截过后的这一部分信息传输到sink # 首先先给agent起一个名字 叫a1 # 分别给source channel sink取名字 a1.sources = r1 a1.channels = c1 a1.sinks = k1 # 分别对source、channel、sink进行配置 # 配置source a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /usr/local/data/test a1.sources.r1.fileSuffix = .ok a1.sources.r1.fileHeader = true # 给r1这个souces配置一个拦截器并取名为 i1 a1.sources.r1.interceptors = i1 # 将拦截器i1的类型设置为regex_filter 会根据正则表达式过滤数据 a1.sources.r1.interceptors.i1.type = regex_filter # 配置正则表达式 a1.sources.r1.interceptors.i1.regex = \\d{3,6} # excludeEvents = true 表示将匹配到的过滤,未匹配到的放行 # excludeEvents = false 表示只传输匹配成功的 a1.sources.r1.interceptors.i1.excludeEvents = true # 配置sink # 使用logger作为sink组件,可以将收集到数据直接打印到控制台 a1.sinks.k1.type = logger # 配置channel a1.channels.c1.type = memory # 组装 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1同样开始flume监听,然后到监听目录下面创建b.txt文件,文件内容为 12 123 123456 12345678 ab abcde 123abcflume输出在控制台的信息是 这里的12345678,123abc同样被拦截了,但正则表达式的要求是匹配3到6个数字,这些也匹配成功了,是因为一行数据内容只要包含能与正则表达式匹配成功的部分,即会满足正则表达式拦截器的条件。 source监听文件目录,sink输出到hdfs #给agent,source,channel,sink取名 a.sources = r1 a.sinks = k1 a.channels = c1 #配置source a.sources.r1.type = spooldir a.sources.r1.spoolDir = /usr/local/data/test a.sources.r1.fileHeader = true #配置拦截器 a.sources.r1.interceptors = i1 a.sources.r1.interceptors.i1.type = timestamp #配置sink a.sinks.k1.type = hdfs a.sinks.k1.hdfs.path = /flume/data/dir1 # 设置sink写入文件的要求 # 指定达到多少数据量写一次文件 单位:bytes a.sinks.k1.hdfs.rollSize = 102400 # 指定多少条写一次文件 a.sinks.k1.hdfs.rollCount = 1000 # 指定文件类型为 流 来什么输出什么 a.sinks.k1.hdfs.fileType = DataStream # 指定文件输出格式 为text a.sinks.k1.hdfs.writeFormat = text # 指定文件名后缀 a.sinks.k1.hdfs.fileSuffix = .txt #指定channel a.channels.c1.type = memory #capacity,默认值为100,是内存中存储 Event 的最大数 a.channels.c1.capacity = 1000 # 表示sink每次会从channel里取多少数据 a.channels.c1.transactionCapacity = 100 # 组装 a.sources.r1.channels = c1 a.sinks.k1.channel = c1注意这里的sink修改为写入到hdfs后,需要加上一个配置信息 a.sinks.k1.hdfs.rollSize = 102400 a.sinks.k1.hdfs.rollCount = 1000 这两个参数都是用来限制hdfs中写入的文件大小和数目的 每一次roll滚动都会生成一个新的文件,假设文件一共有一千行,设置rollCount = 10,每十行滚动一次,那么每十行就会在指定目录/flume/data/dir1下生成一个新的hdfs文件,总共100个文件,导致小文件太多。 除了rollIsize和rollcount之外还可以通过 hdfs.rollInterval,当前文件写入达到该值时间后触发滚动创建新文件,单位:秒,来设置划分文件的时间 下面这两个参数用来设置最终hdfs文件的格式,如果不指定,默认以Writable形式写入,以SequenceFile格式存储 a.sinks.k1.hdfs.fileType = DataStream a.sinks.k1.hdfs.writeFormat = text 在/usr/local/data/test目录下创建文件,并且随便输入些内容 然后在hdfs里面创建目录/flume/data/dir1 运行flume,然后查看hdfs里面的文件 在另一台终端启动hbase,然后来到flume终端查看信息
先启动flume,然后通过telnet在8888端口发送信息 |
CopyRight 2018-2019 实验室设备网 版权所有 |