[从源码学设计] Flume 之 memory channel 您所在的位置:网站首页 channel数据 [从源码学设计] Flume 之 memory channel

[从源码学设计] Flume 之 memory channel

2023-03-15 22:31| 来源: 网络整理| 查看: 265

[从源码学设计] Flume 之 memory channel

目录[从源码学设计] Flume 之 memory channel0x00 摘要0x01 业务范畴1.1 用途和特点1.2 Channel1.3 研究重点1.4 实际能够学到什么1.5 总述0x02 定义2.1 接口2.2 配置参数2.2.1 channel属性2.4 Semaphore和Queue2.5 MemoryTransaction0x03 使用3.1 channel如何使用3.2 source往channel放数据3.3 sink从channel取数据0x04 实现事务4.1 put事务4.2 take事务4.3 提交事务4.4 回滚事务0x05 动态扩容0x06 丢失数据的可能6.1 错误6.1.1 异常原因6.1.2 失败处理6.1.3 解决方案6.2 丢失数据的可能6.2.1 事务保证6.2.2 管道容量6.2.3 MemoryChannel6.2.4 数据重复0xFF 参考

0x00 摘要

在使用Flume时,有时遇到如下错误信息:Space for commit to queue couldn't be acquired。

究其原因,是在memory channel的使用中出现了问题。

本文就以此为切入点,带大家一起剖析下 Flume 中 MemoryChannel 的实现

0x01 业务范畴 1.1 用途和特点

Flume的用途:高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。

这里我们介绍与本文相关的特点:

Flume的管道是基于事务,保证了数据在传送和接收时的一致性. Flume是可靠的,容错性高的,可升级的,易管理的,并且可定制的。 当收集数据的速度超过将写入数据的时候,也就是当收集信息遇到峰值时,这时候收集的信息非常大,甚至超过了系统的写入数据能力,这时候,Flume会在数据生产者和数据收容器间做出调整,保证其能够在两者之间提供平稳的数据. 1.2 Channel

这里就要介绍channel的概念。channel是一种短暂的存储容器,它将从source处接收到的event格式的数据缓存起来,直到它们被sinks消费掉,它在source和sink间起着桥梁的作用,channel是一个完整的事务,这一点保证了数据在收发的时候的一致性。并且它可以和任意数量的source和sink链接。

支持的类型主要有: JDBC channel , File System channel , Memory channel等,大致区别如下:

Memory Channel:events存储在Java Heap,即内存队列中(内存的大小是可以指定的)。对于流量较高和由于agent故障而准备丢失数据的流程来说,这是一个理想的选择; File Channel:event保存在本地文件中,可靠性高,但吞吐量低于Memory Channel; JDBC Channel :event存储在持久化存储库中(其背后是一个数据库),JDBC channel目前支持嵌入式Derby。这是一个持续的channel,对于可恢复性非常重要的流程来说是理想的选择; Kafka Channel:events存储在Kafka集群中。Kafka提供高可用性和高可靠性,所以当agent或者kafka broker 崩溃时,events能马上被其他sinks可用。

本文主要涉及Memory Channel,所以看看其特性。

好处:速度快,吞吐量大; 坏处:根据计算机工作的原理就可以得知,凡是在内存中计算的数据,只要电脑出现故障导致停机,那么内存中数据是不会进行保存的; 所适用的场景:高吞吐量,允许数据丢失的业务中; 1.3 研究重点

由此,我们可以总结出来 Flume 的一些重点功能:

可靠的,容错性高的; 实现事务; 速度快,吞吐量大; 可以调节收集的速度以解决生产者消费者不一致; 可升级的,易管理,可定制的;

因为MemoryChannel属于Flume的重要模块,所以,我们本文就看看是MemoryChannel是如何确保Flume以上特点的,这也是本文的学习思路。

1.4 实际能够学到什么

如何回滚,使用锁,信号量 ,动态扩容,如何解决生产者消费者不一致问题。

1.5 总述

MemoryChannel还是比较简单的,主要是通过MemoryTransaction中的putList、takeList与MemoryChannel中的queue进行数据流转和事务控制,这里的queue相当于持久化层,只不过放到了内存中,如果是FileChannel的话,会把这个queue放到本地文件中。

MemoryChannel受内存空间的影响,如果数据产生的过快,同时获取信号量超时容易造成数据的丢失。而且Flume进程挂掉,数据也会丢失。

具体是:

维持一个队列,队列的两端分别是source和sink。 source使用doPut方法往putList插入Event sink使用doTake方法从queue中获取event放入takeList,并且提供rollback方法,用于回滚。 commit方法作用是把putList中的event一次性写到queue;

下面表示了Event在一个使用了MemoryChannel的agent中数据流向:

source ---> putList ---> queue ---> takeList ---> sink

为了大家更好的理解,我们提前把最终图例发到这里。

具体如下图:

+----------+ +-------+ | Source | +----------------------------------------------------------------+ | Sink | +-----+----+ | [MemoryChannel] | +---+---+ | | +--------------------------------------------------------+ | ^ | | | [MemoryTransaction] | | | | | | | | | | | | | | | | | | channelCounter | | | | | | | | | | | | putByteCounter takeByteCounter | | | | | | | | | | | | +-----------+ +------------+ | |doTake | +----------------> | putList | | takeList +----------------+ doPut | | +----+--+---+ +----+---+---+ | | | | | ^ | ^ | | | | | | | | | | | +--------------------------------------------------------+ | | | | | | poll | | | | | | | | | | rollback rollback | | | | | +--------------+ +-------------+ | | | | | | | | | | | v | | | | doCommit +--+--+---+ doCommit | | | +------------> | queue | +-----------+ | | +---------+ | +----------------------------------------------------------------+

手机上如图:

img

0x02 定义

我们要看看MemoryChannel重要变量的定义,这里我们没有按照代码顺序来,而是重新整理。

2.1 接口

MemorChannel中最重要的部分主要是Channel、Transaction 和Configurable三个接口。

Channel接口 主要声明了Channel中的三个方法,就是队列基本功能:

public void put(Event event) throws ChannelException; //从指定的Source中获得Event放入指定的Channel中 public Event take() throws ChannelException; //从Channel中取出event放入Sink中 public Transaction getTransaction(); //获得当前Channel的事务实例

Transaction接口 主要声明了flume中事务机制的四个方法,就是事务功能:

enum TransactionState { Started, Committed, RolledBack, Closed } //枚举类型,指定了事务的四种状态,事务开始、提交、失败回滚、关闭 void begin(); void commit(); void rollback(); void close();

Configurable接口 主要是和flume配置组件相关的,需要从flume配置系统获取配置信息的任何组件,都必须实现该接口。该接口中只声明了一个context方法,用于获取配置信息。

大体逻辑如下:

+-----------+ +--------------+ +---------------+ | | | | | | | Channel | | Transaction | | Configurable | | | | | | | +-----------+ +--------------+ +---------------+ ^ ^ ^ | | | | | | | | | | +-------------+--------------+ | | | | | | | MemorChannel +---------+ +-------+ | | | | | | | | | | | | | | +----------------------------+

下面我们具体讲讲成员变量。

2.2 配置参数

首先是一系列业务配置参数。

//定义队列中一次允许的事件总数 private static final Integer defaultCapacity = 100; //定义一个事务中允许的事件总数 private static final Integer defaultTransCapacity = 100; //将物理内存转换成槽(slot)数,默认是100 private static final double byteCapacitySlotSize = 100; //定义队列中事件所使用空间的最大字节数(默认是JVM最大可用内存的0.8) private static final Long defaultByteCapacity = (long)(Runtime.getRuntime().maxMemory() * .80); //定义byteCapacity和预估Event大小之间的缓冲区百分比: private static final Integer defaultByteCapacityBufferPercentage = 20; //添加或者删除一个event的超时时间,单位秒: private static final Integer defaultKeepAlive = 3; // maximum items in a transaction queue private volatile Integer transCapacity; private volatile int keepAlive; private volatile int byteCapacity; private volatile int lastByteCapacity; private volatile int byteCapacityBufferPercentage; private ChannelCounter channelCounter;

这些参数基本都在configure(Context context)中设置,基本逻辑如下:

设置 capacity:MemroyChannel的容量,默认是100。

设置 transCapacity:每个事务最大的容量,也就是每个事务能够获取的最大Event数量。默认也是100。事务容量必须小于等于Channel Queue容量。

设置 byteCapacityBufferPercentage:用来确定byteCapacity的一个百分比参数,即我们定义的字节容量和实际事件容量的百分比,因为我们定义的字节容量主要考虑Event body,而忽略Event header,因此需要减去Event header部分的内存占用,可以认为该参数定义了Event header占了实际字节容量的百分比,默认20%;

设置 byteCapacity:byteCapacity等于设置的byteCapacity值或堆的80%乘以1减去byteCapacityBufferPercentage的百分比,然后除以100。具体是首先读取配置文件定义的byteCapacity,如果没有定义,则使用默认defaultByteCapacity,而defaultByteCapacity默认是JVM物理内存的80%(Runtime.getRuntime().maxMemory() * .80);那么实际byteCapacity=定义的byteCapacity * (1- Event header百分比)/ byteCapacitySlotSize;byteCapacitySlotSize默认100,即计算百分比的一个系数。

设置 keep-alive:增加和删除一个Event的超时时间(单位:秒)。

设置初始化 LinkedBlockingDeque对象,大小为capacity。以及各种信号量对象。

最后初始化计数器。

配置代码摘要如下:

public void configure(Context context) { capacity = context.getInteger("capacity", defaultCapacity); transCapacity = context.getInteger("transactionCapacity", defaultTransCapacity); byteCapacityBufferPercentage = context.getInteger("byteCapacityBufferPercentage", defaultByteCapacityBufferPercentage); byteCapacity = (int) ((context.getLong("byteCapacity", defaultByteCapacity).longValue() *(1 - byteCapacityBufferPercentage * .01)) / byteCapacitySlotSize); if (byteCapacity < 1) { byteCapacity = Integer.MAX_VALUE; } keepAlive = context.getInteger("keep-alive", defaultKeepAlive); resizeQueue(capacity); if (channelCounter == null) { channelCounter = new ChannelCounter(getName()); } } 2.2.1 channel属性

ChannelCounter 需要单独说一下。其就是把channel的一些属性封装了一下,初始化了一个ChannelCounter,是一个计数器,记录如当前队列放入Event数、取出Event数、成功数等。

private ChannelCounter channelCounter;

定义如下:

public class ChannelCounter extends MonitoredCounterGroup implements ChannelCounterMBean { private static final String COUNTER_CHANNEL_SIZE = "channel.current.size"; private static final String COUNTER_EVENT_PUT_ATTEMPT ="channel.event.put.attempt"; private static final String COUNTER_EVENT_TAKE_ATTEMPT = "channel.event.take.attempt"; private static final String COUNTER_EVENT_PUT_SUCCESS = "channel.event.put.success"; private static final String COUNTER_EVENT_TAKE_SUCCESS = "channel.event.take.success"; private static final String COUNTER_CHANNEL_CAPACITY = "channel.capacity"; } 2.4 Semaphore和Queue

其次是Semaphore和Queue。主要就是用来协助制事务。

MemoryChannel有三个信号量用来控制事务,防止容量越界:queueStored,queueRemaining,bytesRemaining。

queueLock:创建一个Object当做队列锁,操作队列的时候保证数据的一致性; queue:使用LinkedBlockingDeque queue维持一个队列,队列的两端分别是source和sink; queueStored:来保存queue中当前的保存的event的数目,即已经存储的容量大小,后面tryAcquire方法可以判断是否可以take到一个event; queueRemaining:来保存queue中当前可用的容量,即空闲的容量大小,可以用来判断当前是否有可以提交一定数量的event到queue中; bytesRemaining : 表示可以使用的内存大小。该大小就是计算后的byteCapacity值。 private Object queueLock = new Object(); @GuardedBy(value = "queueLock") private LinkedBlockingDeque queue; private Semaphore queueRemaining; private Semaphore queueStored; private Semaphore bytesRemaining;// 表示可以使用的内存大小。该大小就是计算后的byteCapacity值。 2.5 MemoryTransaction

内部类MemoryTransaction是整个事务保证最重要的类。

MemoryTransaction用来接收数据和事务控制。该类继承BasicTransactionSemantics类。

MemoryTransaction维护了两个队列,一个用于Source的put,一个用于Sink的take,容量大小为事务的容量(transCapacity)。

takeList:take事务用到的队列;阻塞双端队列,从channel中取event先放入takeList,输送到sink,commit成功,从channel queue中删除; putList:put事务用到的队列;从source 会先放至putList,然后commit传送到channel queue队列; channelCounter:channel属性;ChannelCounter类定义了监控指标数据的一些属性方法; putByteCounter:put字节数计数器; takeByteCounter:take字节计数器; private class MemoryTransaction extends BasicTransactionSemantics { private LinkedBlockingDeque takeList; private LinkedBlockingDeque putList; private final ChannelCounter channelCounter; private int putByteCounter = 0; private int takeByteCounter = 0; }

无论是Sink,还是Source都会调用getTransaction()方法,获取当前Channel的事务实例。

接口与成员变量大致逻辑可以理解如下,其中 Channel 的 API 表示这里是 MemorChannel 的对外 API:

+-----------+ +--------------+ +---------------+ | | | | | | | Channel | | Transaction | | Configurable | | | | | | | +---+-------+ +--------------+ +---------------+ ^ | ^ ^ | | | | | | | +--------------------------------------------------------+ | | | | | | | | MemoryChannel | | | | | + | | | | | | | | MemoryTransaction | | | | | | | | Semaphore / Queue | | | | | | +--------+ | | API | | | | | | | Config Parameters +------------+ | | | | +--------------------------------------------------------+ 0x03 使用

看了上面讲的,估计大家还是会晕,因为成员变量和概念实在是太多了,所以我们从使用入手分析。

前面提到,memory channel内部有三个队列,分别是putList,queue,takeList。其中putList,takeList在MemoryTransaction之中。

3.1 channel如何使用

channel之上有一把锁,当source主动向channel放数据或者sink主动从channel取数据时,会抢锁,谁取到锁,谁就可以操作channel。

每次使用时会首先调用tx.begin()开始事务,也就是获取锁。然后调用tx.commit()提交数据或者调用tx.rollback()取消操作。

这里需要注意的是:Source, Sink 都是死循环,抢同一个锁。所以就会有消费者,生产者速度不一致的情况,所以就需要有 一个内部的 buffer,就是我们的Queue。

3.2 source往channel放数据

这是一个死循环,source一直试图获取channel锁,然后从kafka获取数据,放入channel中,那每次放入多少个数据呢?在KafkaSource.java中,代码是这样的:

while (eventList.size() < batchUpperLimit && System.currentTimeMillis() < maxBatchEndTime) { }

含义就是:每次最多放batchUpperLimit或最多等待maxBatchEndTime的时间,就结束向channel放数据。

当获取了足够的数据,首先放入putList中,然后就会调用tx.commit()将putList的全部数据放入queue中。

3.3 sink从channel取数据

也是一个死循环,sink一直试图获取channel锁,然后从channel取一批数据,放入sink和takeList(仅仅用于回滚,在调用rollback时takeList的数据会回滚到queue中)。每次取多少个event呢?以HDFSEventSink为例,代码如下:

for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) { Event event = channel.take(); if (event == null) break; }

batchSize的大小默认是100,由hdfs.batchSize控制。

具体如下:

+---------------> ^ | | | while(1) | v +-----------+ | +----+----+ | Source | | take | Sink | | | | | | +-----+-----+ | +---------+ | | | +-------------+--+ | | Channel | | | | While(1) | | | | | buffer | | +----------------+ | | ^ | | | | put v ----------------^ 0x04 实现事务

此处回答了前面提到的两个重点:

可靠的,容错性高的; 实现事务;

其实就是用事务保证整个流程的高可靠,其核心就在从source抽取数据到channel,从channel抽取到sink,当sink被消费后channel数据删除的这三个环节。而这些环节在flume中被统一的用事务管理起来。可以说,这是flume高可靠的关键一点。

具体涉及到的几个点如下:

MemoryTransaction是实现事务的核心。每次使用时会首先调用tx.begin()开始事务,也就是获取锁。然后调用tx.commit()提交数据或者调用tx.rollback()取消操作。 MemoryChannel时设计时考虑了两个容量:Channel Queue容量和事务容量,而这两个容量涉及到了数量容量和字节数容量。 MemoryChannel 会根据事务容量 transCapacity 创建两个阻塞双端队列putList和takeList,这两个队列(相当于两个临时缓冲队列)主要就是用于事务处理的。即,每个事务都有一个Take List和Put List分别用于存储事务相关的取数据和放数据,等事务提交时才完全同步到Channel Queue,或者失败把取数据回滚到Channel Queue。 首先由一个Channel Queue用于存储整个Channel的Event数据; 当从Source往 Channel中放事件event 时,会先将event放入 putList 队列,然后将putList队列中的event 放入 MemoryChannel的queue中。 当从 Channel 中将数据传送给 Sink 时,则会将event先放入 takeList 队列中,然后从takeList队列中将event送入Sink,不论是 put 还是 take 发生异常,都会调用 rollback 方法回滚事务。 回滚时,会先给 Channel 加锁防止回滚时有其他线程访问,若takeList 不为空, 就将写入 takeList中的event再次放入 Channel 中,然后移除 putList 中的所有event(即就是丢弃写入putList临时队列的 event)。 因为多个事务要操作Channel Queue,还要考虑Channel Queue的动态扩容问题,因此MemoryChannel使用了锁来实现;而容量问题则使用了信号量来实现。

我们下面具体走一下这个流程。

4.1 put事务

此事务发生在在Source到Channel之间,是从指定的Source中获得Event放入指定的Channel中,具体包括:

doPut:将批数据先写入临时缓冲区 putList; doCommit:检查 channel 内存队列是否足够合并; doRollback:channel 内存队列空间不足,回滚数据;

如下调用。

try { tx.begin(); //底层就是调用的doPut方法 // Source写事件调用put方法 reqChannel.put(event); tx.commit(); } catch (Throwable t) { // 发生异常则回滚事务 tx.rollback(); if (t instanceof Error) { throw (Error) t; } else if (t instanceof ChannelException) { throw (ChannelException) t; } else { throw new ChannelException("Unable to put event on required " + "channel: " + reqChannel, t); } } finally { if (tx != null) { tx.close(); } }

下面分析doPut方法。

doPut逻辑如下:

计算event大概占用的slot数; offer方法往putList中添加event,等事务提交时转移到Channel Queue,如果满了则直接抛异常回滚事务; 累加这一条event所占用的slot空间,以便之后做字节容量限制。

具体代码如下:

protected void doPut(Event event) throws InterruptedException { //增加放入事件计数器 channelCounter.incrementEventPutAttemptCount(); //estimateEventSize计算当前Event body大小 int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize); /* * offer若立即可行且不违反容量限制,则将指定的元素插入putList阻塞双端队列中(队尾), * 并在成功时返回,如果当前没有空间可用,则抛异常回滚事务 * */ if (!putList.offer(event)) { throw new ChannelException( "Put queue for MemoryTransaction of capacity " + putList.size() + " full, consider committing more frequently, " + "increasing capacity or increasing thread count"); } //记录Event的byte值 putByteCounter += eventByteSize; }

具体如下图,我们暂时忽略commit与rollback:

+----------+ | Source | +---------------------------+ +-----+----+ | [MemoryChannel] | | | +---------------------+ | | | | [MemoryTransaction] | | | | | | | | | | | | | | | channelCounter | | | | | | | | | | putByteCounter | | | | | | | | | | +-----------+ | | +----------------> | putList | | | doPut | | +-----------+ | | | +---------------------+ | +---------------------------+ 4.2 take事务

此事务发生在Channel到Sink之间,主要是从Channel中取出event放入Sink中,具体包括。

doTake:将数据取到临时缓冲区 takeList,并将数据发送到 HDFS; doCommit:如果数据全部发送成功,则清除临时缓冲区 takeList; doRollback:数据发送过程中如果出现异常,rollback 将临时缓冲区 takeList 中的数据归还给 channel 内存队列;

如下调用:

transaction = channel.getTransaction(); transaction.begin(); ...... event = channel.take(); ...... transaction.commit();

逻辑如下:

判断takeList中是否还有空间,如果没有空间则抛出异常; 判断当前MemoryChannel中的queue中是否还有空间,这里通过信号量来判断; 从queue头部弹出一条消息,放入takeList中; 估算这条Event所占空间(slot数),累加takeList中的字节数; 将取出来的这条Event返回;

doTake具体代码如下:

protected Event doTake() throws InterruptedException { channelCounter.incrementEventTakeAttemptCount();//将正在从channel中取出的event计数器原子的加一,即增加取出事件计数器 //如果takeList队列没有剩余容量,即当前事务已经消费了最大容量的Event,抛异常 if (takeList.remainingCapacity() == 0) {//takeList队列剩余容量为0 throw new ChannelException("Take list for MemoryTransaction, capacity " + takeList.size() + " full, consider committing more frequently, " + "increasing capacity, or increasing thread count"); } //尝试获取一个信号量获取许可,如果可以获取到许可的话,证明queue队列有空间,超时直接返回null if (!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) { return null; } Event event; synchronized (queueLock) { event = queue.poll(); //获取并移除MemoryChannel双端队列表示的队列的头部(也就是队列的第一个元素),队列为空返回null,同一时间只能有一个线程访问,加锁同步 } //因为信号量的保证,Channel Queue不应该返回null,出现了就不正常了 Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " + "signalling existence of entry"); takeList.put(event); //将取出的event暂存到事务的takeList队列 //计算当前Event body大小并增加取出队列字节数计数器 /* 计算event的byte大小 */ int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize); //更新takeByteCounter大小 takeByteCounter += eventByteSize; return event; }

于是我们把take事务加入,我们暂时忽略commit与rollback。具体如下图,目前两个事务是没有联系的:

+----------+ +-------+ | Source | +---------------------------------------------------------+ | Sink | +-----+----+ | [MemoryChannel] | +---+---+ | | +--------------------------------------------------+ | ^ | | | [MemoryTransaction] | | | | | | | | | | | | | | | | | | channelCounter | | | | | | | | | | | | putByteCounter takeByteCounter | | | | | | | | | | | | +-----------+ +------------+ | | doTake | +----------------> | putList | | takeList +----------------+ doPut | | +-----------+ +------+-----+ | | | | ^ | | | | | | | | +--------------------------------------------------+ | | | | | | | | | | | +---------+ poll | | | | queue | +---------+ | | +---------+ | +---------------------------------------------------------+ 4.3 提交事务

commit阶段主要做的事情是提交事务,此代码繁杂在于其包括了两个方面的操作:

从putList拿数据到Queue; 处理 takelist后续操作,就是根据此时具体情况调整各种数值;

commit其逻辑如下:

计算takeList中Event数与putList中的Event差值;int remainingChange = takeList.size() - putList.size(); 差值小于0,说明takeList小,也就是向该MemoryChannel放的数据比取的数据要多,所以需要判断该MemoryChannel是否有空间来放; 首先通过信号量来判断是否还有剩余空间;这一步tryAcquire方法会将bytesRemaining的值减去putByteCounter的值,如果bytesRemaining原来的值大于putByteCounter则返回true; 然后判断,在给定的keepAlive时间内,能否获取到充足的queue空间; 如果上面的两个判断都过了,那么把putList中的Event放到该MemoryChannel中的queue中; 将putList中的Event循环放入queue中; 面的工作完成后,清空putList和takeList,一次事务完成; 然后将两个计数器置零; 将queueStored的值加上puts的值,更新信号量; 如果takeList比putList大,说明该MemoryChannel中queue的数量应该是减少了,所以把(takeList-putList)的差值加到信号量queueRemaining; 更新channelCounter中的三个变量;

具体如下:

protected void doCommit() throws InterruptedException { //计算改变的Event数量,即取出数量-放入数量;如果放入的多,那么改变的Event数量将是负数 //如果takeList更小,说明该MemoryChannel放的数据比取的数据要多,所以需要判断该MemoryChannel是否有空间来放 int remainingChange = takeList.size() - putList.size(); //takeList.size()可以看成source,putList.size()看成sink //如果remainingChange小于0,则需要获取Channel Queue剩余容量的信号量 if (remainingChange < 0) { //sink的消费速度慢于source的产生速度 //利用bytesRemaining信号量判断是否有足够空间接收putList中的events所占的空间 //putByteCounter是需要推到channel中的数据大小,bytesRemainingchannel是容量剩余 //获取putByteCounter个字节容量信号量,如果失败说明超过字节容量限制了,回滚事务 if (!bytesRemaining.tryAcquire(putByteCounter, keepAlive, TimeUnit.SECONDS)) { //channel 数据大小容量不足,事物不能提交 throw new ChannelException("Cannot commit transaction. Byte capacity " + "allocated to store event body " + byteCapacity * byteCapacitySlotSize + "reached. Please increase heap space/byte capacity allocated to " + "the channel as the sinks may not be keeping up with the sources"); } //获取Channel Queue的-remainingChange个信号量用于放入-remainingChange个Event,如果获取不到,则释放putByteCounter个字节容量信号量,并抛出异常回滚事务 //因为source速度快于sink速度,需判断queue是否还有空间接收event if (!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) { //remainingChange如果是负数的话,说明source的生产速度,大于sink的消费速度,且这个速度大于channel所能承载的值 bytesRemaining.release(putByteCounter); throw new ChannelFullException("Space for commit to queue couldn't be acquired." + " Sinks are likely not keeping up with sources, or the buffer size is too tight"); } } int puts = putList.size(); //事务期间生产的event int takes = takeList.size(); //事务期间等待消费的event //如果上述两个信号量都有空间的话,那么把putList中的Event放到该MemoryChannel中的queue中。 //锁住队列开始,进行数据的流转 synchronized (queueLock) {//操作Channel Queue时一定要锁定queueLock if (puts > 0) { while (!putList.isEmpty()) { //如果有Event,则循环放入Channel Queue if (!queue.offer(putList.removeFirst())) { //如果放入Channel Queue失败了,说明信号量控制出问题了,这种情况不应该发生 throw new RuntimeException("Queue add failed, this shouldn't be able to happen"); } } } //以上步骤执行成功,清空事务的putList和takeList putList.clear(); takeList.clear(); } //更新queue大小控制的信号量bytesRemaining //释放takeByteCounter个字节容量信号量 bytesRemaining.release(takeByteCounter); //重置字节计数器 takeByteCounter = 0; putByteCounter = 0; //释放puts个queueStored信号量,这样doTake方法就可以获取数据了 queueStored.release(puts); //从queueStored释放puts个信号量 //释放remainingChange个queueRemaining信号量 if (remainingChange > 0) { queueRemaining.release(remainingChange); } //ChannelCounter一些数据计数 if (puts > 0) { //更新成功放入Channel中的events监控指标数据 channelCounter.addToEventPutSuccessCount(puts); } if (takes > 0) { //更新成功从Channel中取出的events的数量 channelCounter.addToEventTakeSuccessCount(takes); } channelCounter.setChannelSize(queue.size()); }

此处涉及到两个信号量:

queueStored表示Channel Queue已存储事件容量(已存储的事件数量),队列取出事件时-1,放入事件成功时+N,取出失败时-N,即Channel Queue存储了多少事件。

queueStored信号量默认为0。 当doTake取出Event时减少一个queueStored信号量。 当doCommit提交事务时需要增加putList 队列大小的queueStored信号量。 当doRollback回滚事务时需要减少takeList队列大小的queueStored信号量。

queueRemaining表示Channel Queue可存储事件容量(可存储的事件数量),取出事件成功时+N,放入事件成功时-N。

queueRemaining信号量默认为Channel Queue容量。其在提交事务时首先通过remainingChange = takeList.size() - putList.size()计算获得需要增加多少变更事件; 如果小于0表示放入的事件比取出的多,表示有 remainingChange个事件放入,此时应该减少queueRemaining信号量; 而如果大于0,则表示取出的事件比放入的多,表示有queueRemaining个事件取出,此时应该增加queueRemaining信号量;即消费事件时减少信号量,生产事件时增加信号量。

而bytesRemaining是字节容量信号量,超出容量则回滚事务。

具体如下图,现在整体业务已经走通:

+----------+ +-------+ | Source | +---------------------------------------------------------------+ | Sink | +-----+----+ | [MemoryChannel] | +---+---+ | | +--------------------------------------------------------+ | ^ | | | [MemoryTransaction] | | | | | | | | | | | | | | | | | | channelCounter | | | | | | | | | | | | putByteCounter takeByteCounter | | | | | | | | | | | | +-----------+ +------------+ | | doTake | +----------------> | putList | | takeList +----------------+ doPut | | +----+------+ +------+-----+ | | | | | ^ | | | | | | | | | +--------------------------------------------------------+ | | | | poll | | | | | | | | | | | doCommit +---------+ doCommit | | | +------------> | queue | +---------+ | | +---------+ | +---------------------------------------------------------------+

手机如下图:

img

4.4 回滚事务

当一个事务失败时,会进行回滚,即调用本方法。在回滚时,需要把takeList中暂存的事件回滚到Channel Queue,并回滚queueStored信号量。具体逻辑如下:

得到takeList中的Event数量 int takes = takeList.size(); 首先把takeList中的Event放回到MemoryChannel中的queue中; 先判断queue中能否有足够的空间将takeList的Events放回去; 从takeList的尾部依次取出Event,放入queue的头部; 然后清空putList; 因为清空了putList,所以需要把putList所占用的空间大小添加到bytesRemaining中;

具体代码如下:

protected void doRollback() { //获取takeList的大小,然后bytesRemaining中释放 int takes = takeList.size(); //将takeList中的Event重新放回到queue队列中。 synchronized (queueLock) { //操作Channel Queue时一定锁住queueLock //前置条件判断,检查是否有足够容量回滚事务 Preconditions.checkState(queue.remainingCapacity() >= takeList.size(), "Not enough space in memory channel " + "queue to rollback takes. This should never happen, please report"); //回滚事务的takeList队列到Channel Queue while (!takeList.isEmpty()) { //takeList不为空,将其events全部放回queue //removeLast()获取并移除此双端队列的最后一个元素 queue.addFirst(takeList.removeLast()); } //最后清空putList putList.clear(); } //清空了putList,所以需要把putList占用的空间添加到bytesRemaining中 //即,释放putByteCounter个bytesRemaining信号量 bytesRemaining.release(putByteCounter); //计数器重置 putByteCounter = 0; takeByteCounter = 0; //释放takeList队列大小个已存储事件容量 queueStored.release(takes); channelCounter.setChannelSize(queue.size()); }

具体如下图:

+----------+ +-------+ | Source | +----------------------------------------------------------------+ | Sink | +-----+----+ | [MemoryChannel] | +---+---+ | | +--------------------------------------------------------+ | ^ | | | [MemoryTransaction] | | | | | | | | | | | | | | | | | | channelCounter | | | | | | | | | | | | putByteCounter takeByteCounter | | | | | | | | | | | | +-----------+ +------------+ | |doTake | +----------------> | putList | | takeList +----------------+ doPut | | +----+--+---+ +----+---+---+ | | | | | ^ | ^ | | | | | | | | | | | +--------------------------------------------------------+ | | | | | | poll | | | | | | | | | | rollback rollback | | | | | +--------------+ +-------------+ | | | | | | | | | | | v | | | | doCommit +--+--+---+ doCommit | | | +------------> | queue | +-----------+ | | +---------+ | +----------------------------------------------------------------+

手机上如图:

img

0x05 动态扩容

此小节回答了如下问题:

可升级的,易管理,可定制的;

MemoryChannel 中使用锁配合信号实现动态增减容量。

MemoryChannel会通过configure方法获取配置文件系统,初始化MemoryChannel,其中对于配置信息的读取有两种方法,只在启动时读取一次或者动态的加载配置文件,动态读取配置文件时若修改了Channel 的容量大小,则会调用 resizeQueue 方法进行调整,如下:

if (queue != null) { //queue不为null,则为动态修改配置文件时,重新指定了capacity try { resizeQueue(capacity); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } else { //初始化queue,根据指定的capacity申请双向阻塞队列,并初始化信号量 synchronized (queueLock) { queue = new LinkedBlockingDeque(capacity); queueRemaining = new Semaphore(capacity); queueStored = new Semaphore(0); } }

动态调整 Channel 容量主要分为三种情况:

新老容量相同,则直接返回;

老容量大于新容量,缩容,需先给未被占用的空间加锁,防止在缩容时有线程再往其写数据,然后创建新容量的队列,将原本队列加入中所有的 event 添加至新队列中;

老容量小于新容量,扩容,然后创建新容量的队列,将原本队列加入中所有的 event 添加至新队列中。

具体代码如下:

private void resizeQueue(int capacity) throws InterruptedException { int oldCapacity; //首先计算扩容前的Channel Queue的容量 //计算原本的Channel Queue的容量 synchronized (queueLock) { //老的容量=队列现有余额+在事务被处理了但是是未被提交的容量 oldCapacity = queue.size() + queue.remainingCapacity(); } //新容量和老容量相等,不需要调整返回 if (oldCapacity == capacity) {//如果老容量大于新容量,缩容 return; } else if (oldCapacity > capacity) { //缩容 //首先要预占老容量-新容量的大小,以便缩容容量 //首先要预占用未被占用的容量,防止其他线程进行操作 //尝试占用即将缩减的空间,以防被他人占用 if (!queueRemaining.tryAcquire(oldCapacity - capacity, keepAlive, TimeUnit.SECONDS)) { //如果获取失败,默认是记录日志然后忽略 LOGGER.warn("Couldn't acquire permits to downsize the queue, resizing has been aborted"); } else { //直接缩容量 //锁定queueLock进行缩容,先创建新capacity的双端阻塞队列,然后复制老Queue数据。线程安全 //否则,直接缩容,然后复制老Queue的数据,缩容时需要锁定queueLock,因为这一系列操作要线程安全 synchronized (queueLock) { LinkedBlockingDeque newQueue = new LinkedBlockingDeque(capacity); newQueue.addAll(queue); queue = newQueue; } } } else { //扩容,加锁,创建新newQueue,复制老queue数据 //扩容 synchronized (queueLock) { LinkedBlockingDeque newQueue = new LinkedBlockingDeque(capacity); newQueue.addAll(queue); queue = newQueue; } //增加/减少Channel Queue的新的容量 //释放capacity - oldCapacity个许可,即就是增加这么多可用许可 queueRemaining.release(capacity - oldCapacity); } } 0x06 丢失数据的可能

回到本文最初的错误信息:Space for commit to queue couldn't be acquired。

这说明Flume是会出现数据相关问题的。我们首先分析此问题。

6.1 错误 6.1.1 异常原因

因为“source往putList放数据,然后提交到queue中”与“sink从channel中取数据到sink和takeList,然后再从putList取数据到queue中”这两部分是分开来,任他们自由抢锁,所以,当前者多次抢到锁,后者没有抢到锁,同时queue的大小又太小,撑不住多次往里放数据,就会导致触发这个异常。

6.1.2 失败处理

正常情况下,如果遇到此问题,flume会暂停source向channel放数据,等待几秒钟,这期间sink应该会消费channel中的数据,当source再次开始想channel放数据时channel就有足够的空间了。

但是如果一直出现异常,就需要启用解决方案。

6.1.3 解决方案

解决这个问题最直接的办法就是增大queue的大小,增大capacity和transacCapacity之间的差距,queue能撑住多次往里面放数据即可。

6.2 丢失数据的可能

下面我们看看Flume使用中,丢失数据的可能。

6.2.1 事务保证

根据Flume的架构原理,采用FileChannel的Flume是不可能丢失数据的,因为其内部有完善的事务机制(ACID)。

Source到Channel是事务性的, Channel到Sink也是事务性的,

这两个环节都不可能丢失数据。

6.2.2 管道容量

一旦管道中所有Flume Agent的容量之和被使用完,Flume 将不再接受来自客户端的数据。此时,客户端需要缓冲数据,否则数据可能会丢失。因此,配置管道能够处理最大预期的停机时间是非常重要的。

6.2.3 MemoryChannel

Channel采用MemoryChannel时候,会出现丢失。

MemoryChannel受内存空间的影响,如果数据产生的过快,同时获取信号量超时容易造成数据的丢失。此时Source不再写入数据,造成未写入的数据丢失;就是本文的情况; Flume进程挂掉,数据也会丢失,因为之前数据在内存中;

所以如果想要不丢失数据,需要采用File channel。

Memory Channel 是一个内存缓冲区,因此如果Java23 虚拟机(JVM)或机器重新启动,任何缓冲区中的数据将丢失。另一方面,File Channel是在磁盘上的。即使JVM 或机器重新启动,File Channel 也不丢失数据,只要磁盘上存储的数据仍然是起作用的和可访问的。机器和Agent 一旦开始运行,任何存储在FileChannel 中的数据将最终被访问。

6.2.4 数据重复

在Channel发送到Sink这阶段,容易出现数据重复问题。

比如:如果flush到HDFS的时候,数据flush了一半之后出问题了,这意味着已经有一半的数据已经发送到HDFS上面了,现在出了问题,同样需要调用doRollback方法来进行回滚。

回滚并没有“一半”之说,它只会把整个takeList中的数据返回给channel,然后继续进行数据的读写。这样开启下一个事务的时候就容易造成数据重复的问题。

所以,在某种程度上,flume对数据进行采集传输的时候,它有可能会造成数据的重复,但是其数据不丢失。

Flume 保证事件至少一次被送到它们的目的地,只有一次倾力写数据,且不存在任何类型的故障事件只被写一次。但是像网络超时或部分写入存储系统的错误,可能导致事件不止被写一次,因为Flume 将重试写操作直到它们完全成功。网络超时可能表示写操作的失败,或者只是机器运行缓慢。如果是机器运行缓慢,当Flume 重试这将导致重复。因此,确保每个事件都有某种形式的唯一标识符通常是一个好主意,如果需要,最终可以用来删除事件数据。

0xFF 参考

基于Flume的美团日志收集系统(一)架构和设计

基于Flume的美团日志收集系统(二)改进和优化

事件序列化器 Flume 的无数据丢失保证,Channel 和事务

flume MemoryChannel分析

Flume 1.7 源码分析(一)源码编译 Flume 1.7 源码分析(二)整体架构 Flume 1.7 源码分析(三)程序入口 Flume 1.7 源码分析(四)从Source写数据到Channel Flume 1.7 源码分析(五)从Channel获取数据写入Sink

Flume - MemoryChannel源码解析

flume到底会丢数据吗?其可靠性如何?——轻松搞懂Flume事务机制

Flume会不会丢失数据?

flume MemoryChannel分析

Flume架构与源码分析-MemoryChannel事务实现

flume“Space for commit to queue couldn't be acquired”异常产生分析

源码趣事-flume-队列动态扩容及容量使用

并发性标注 @GuardedBy @NotThreadSafe @ThreadSafe

秒懂,Java 注解 (Annotation)你可以这样学

Flume之MemoryChannel源码解读

Flume MemoryChannel源码分析

搞懂分布式技术17,18:分布式事务总结



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有