由于域名解析异常导致的Kafka消息发送失败但无异常抛出 您所在的位置:网站首页 kafka发送消息失败 由于域名解析异常导致的Kafka消息发送失败但无异常抛出

由于域名解析异常导致的Kafka消息发送失败但无异常抛出

#由于域名解析异常导致的Kafka消息发送失败但无异常抛出| 来源: 网络整理| 查看: 265

背景

由于业务需要,最近部署了一个跨网段的服务,由一个网段中的应用服务器向另一个网段中的Kafka集群写入消息,应用服务器与Kafka集群之间已经开通网络,telnet结果显示相应端口之间连接正常。

初次上线之后,发现如下方法执行正常,未出现超时阻塞运行现象,但Kafka消费者无法接收发送的消息,怀疑消息并未真正写入Kafka集群中。

producer.send(record)

源码追踪

实际上,我们追踪源码可以发现,Kafka客户端发送时,为了提高吞吐量采用了batch异步发送机制,在真实发送消息时遵循如下的流程:

1.将待发消息写入本进程内存中,类似WAL的形式

2.在待发消息达到一定数量或者超过linger.ms之后,通过Sender调用RecordBatch进行批量发送,同时记录异常信息

由于发送过程是异步的,所以在发送时不会抛出异常,误以为消息已经正常发送。

事实上,作为异步发送方式的配套,Kafka提供了回调接口以供客户端查看消息发送状态,函数原型如下:

@Override public Future send(ProducerRecord record, Callback callback) { // intercept the record, which can be potentially modified; this method does not throw exceptions ProducerRecord interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record); return doSend(interceptedRecord, callback); }

再来看Callback接口

public interface Callback { /** * A callback method the user can implement to provide asynchronous handling of request completion. This method will * be called when the record sent to the server has been acknowledged. Exactly one of the arguments will be * non-null. * @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error * occurred. * @param exception The exception thrown during processing of this record. Null if no error occurred. * Possible thrown exceptions include: * * Non-Retriable exceptions (fatal, the message will never be sent): * * InvalidTopicException * OffsetMetadataTooLargeException * RecordBatchTooLargeException * RecordTooLargeException * UnknownServerException * * Retriable exceptions (transient, may be covered by increasing #.retries): * * CorruptRecordException * InvalidMetadataException * NotEnoughReplicasAfterAppendException * NotEnoughReplicasException * OffsetOutOfRangeException * TimeoutException * UnknownTopicOrPartitionException */ public void onCompletion(RecordMetadata metadata, Exception exception); }

调试异常

抛开冗长的注释,我们可以看到,如果发送过程中存在异常,可以通过回调接口获取异常。

producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { logger.error("Caught exception ", exception); } } });

将应用再次上线后,可以发现,应用大量抛出异常。异常内容为“Expiring xxx records for xxxx due to x ms has passed since batch creation plus linger time”。

由此可见,消息并未被真实发送至Kafka broker,而是在后台被静默抛弃了。在网络已经开通的情况下却无法发送消息,需要更进一步挖掘原因。

深入挖掘

我们将应用的日志调至DEBUG,观察从应用启动开始的日志,可以发现,KafkaProducer在初始化时,尝试从指定的brokers中获取meta信息,其中包括:topic信息、partition信息、broker节点信息、topic/partition与节点的对应关系。

观察返回的meta信息可以发现,broker节点信息的返回值为主机域名,这就是问题所在。

由此,我们知道了原因所在:

1.KafkaProducer初始化时尝试从指定的ip获取meta information

2.依据获取到的meta info向指定broker发送消息

由于两个网段间的DNS信息不同步,导致应用所在网段无法解析broker的域名,从而发送消息失败,在DNS无法修改的情况下,在应用节点的/etc/hosts文件中加入broker节点的域名和ip映射,问题解决。

后续

Kafka对外提供的meta info中的节点地址信息是直接读取zookeeper的注册信息,该注册信息是在集群启动时写入的,信息的最终来源为server.properties。依据Kafka官方文档http://kafka.apache.org/0101/documentation.html#brokerconfigs的描述,配置的优先级如下:

1.首先读取advertised.listeners的值

2.若1中值为空,使用advertised.host.name & advertised.port注册到zookeeper

KafkaProducer的发送模型在0.9版本之后默认为异步发送模式(提高吞吐量),如果想要使用同步模式,可以在获取到send(ProducerRecord producerRecord)方法的future返回之后,使用future.get()方法阻塞等待,如下:

Future future = producer.send(record); future.get();

对于中间件的运行机制和细节需要更深入的了解,才能快速定位问题。

 



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有