
Custom Partitioned Writes with Flume's KafkaSink


Scenario

Kafka receives MySQL binlog records, and records for the same primary key of the same table must be consumed in order.

If a row's actual history is a create followed by a delete, consumers must see those events in exactly that order.

However, Kafka only guarantees ordering within a single partition.

So all records for a given primary key must be routed to the same Kafka partition.

Using tableName.primaryKeyValue as the Kafka partition key accomplishes this.
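As a minimal sketch of the idea (the topic binlog_topic and table orders below are hypothetical, not part of this setup), a producer that sets tableName.primaryKeyValue as the record key pins every event for that row to one partition:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KeyedProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "bigdata-001:9092"); // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        // Hypothetical table "orders", primary key 12345: both events carry the
        // key "orders.12345", so they land in the same partition, in order.
        producer.send(new ProducerRecord<String, String>("binlog_topic", "orders.12345", "create"));
        producer.send(new ProducerRecord<String, String>("binlog_topic", "orders.12345", "delete"));
        producer.close();
    }
}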

Below, Flume is used to simulate sending such data to Kafka.

Flume Source Configuration

kafka_key.sources = sources1
kafka_key.channels = channel1
kafka_key.sinks = sink1

## source configuration
kafka_key.sources.sources1.type = TAILDIR
kafka_key.sources.sources1.positionFile = /var/log/flume/taildir_position.json
kafka_key.sources.sources1.filegroups = f1
kafka_key.sources.sources1.filegroups.f1 = /home/data/kafkaKey/test.log
kafka_key.sources.sources1.batchSize = 100
kafka_key.sources.sources1.backoffSleepIncrement = 1000
kafka_key.sources.sources1.maxBackoffSleep = 5000
kafka_key.sources.sources1.channels = channel1

Flume Source Interceptor Configuration

# source interceptor
kafka_key.sources.sources1.interceptors = i1
kafka_key.sources.sources1.interceptors.i1.type = regex_extractor
kafka_key.sources.sources1.interceptors.i1.regex = .*?\\|(.*?)\\|.*
kafka_key.sources.sources1.interceptors.i1.serializers = s1
kafka_key.sources.sources1.interceptors.i1.serializers.s1.name = key

This Regex Extractor Interceptor extracts the key value from each raw log line and stores it in the event header under the name key.
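As a standalone sanity check (not part of the Flume setup), the same pattern can be run against a sample log line to confirm that the middle field is what ends up in the header:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexExtractorDemo {
    public static void main(String[] args) {
        // Same regex as in the interceptor config; the doubled backslashes
        // there are properties-file escaping for a literal "|".
        Pattern pattern = Pattern.compile(".*?\\|(.*?)\\|.*");
        for (String line : new String[]{"time1|key1|ip1", "time8|key2|ip8"}) {
            Matcher m = pattern.matcher(line);
            if (m.matches()) {
                // group(1) becomes the "key" header, and later the Kafka message key
                System.out.println(line + " -> key=" + m.group(1));
            }
        }
    }
}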

Flume Channel Configuration

kafka_key.channels.channel1.type = memory
kafka_key.channels.channel1.capacity = 1000
kafka_key.channels.channel1.transactionCapacity = 100

Flume Kafka Sink Configuration

# sink 1 configuration
kafka_key.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
kafka_key.sinks.sink1.brokerList = bigdata-001:9092,bigdata-002:9092,bigdata-003:9092
kafka_key.sinks.sink1.topic = test_key
kafka_key.sinks.sink1.channel = channel1
kafka_key.sinks.sink1.batch-size = 100
kafka_key.sinks.sink1.requiredAcks = -1
# kafka_key.sinks.sink1.kafka.partitioner.class = com.lxw1234.flume17.SimplePartitioner

Sample Data

time1|key1|ip1
time2|key2|ip2
time3|key3|ip3
time4|key1|ip4
time5|key3|ip5
time6|key3|ip6
time7|key1|ip7
time8|key2|ip8
time9|key2|ip9
time10|key2|ip10
time11|key1|ip11
time12|key3|ip12

Java Consumer Code

package com.woods.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class MyConsumer {

    public static void main(String[] args) {
        String topic = "test_key";
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> mam = it.next();
            String msg = new String(mam.message());
            // The original filter on msg.length() was truncated in the source;
            // printing the partition next to the payload is enough to verify
            // that records sharing a key always come from the same partition.
            System.out.println("partition[" + mam.partition() + "], message[" + msg + "]");
        }
    }

    // Restored helper: the original definition was lost in the source. These
    // are typical settings for the old high-level consumer; the ZooKeeper
    // address is an assumption and must match your cluster.
    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "bigdata-001:2181,bigdata-002:2181,bigdata-003:2181");
        props.put("group.id", "group_test_key");
        props.put("auto.offset.reset", "smallest");
        return new ConsumerConfig(props);
    }
}

Custom Partitioner Code

package com.lxw1234.flume17;

import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

// The class declaration and imports were lost in the source; they are
// restored here to match the commented-out partitioner.class in the sink
// configuration above. The logic mirrors Kafka's DefaultPartitioner.
public class SimplePartitioner implements Partitioner {

    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<String, AtomicInteger>();

    public void configure(Map<String, ?> configs) {}

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    // Restored round-robin counter for records without a key; this method is
    // referenced above but its body was lost in the source. It follows
    // Kafka's DefaultPartitioner.
    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.get(topic);
        if (counter == null) {
            counter = new AtomicInteger(new Random().nextInt());
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement();
    }

    public void close() {}
}
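The keyed branch of the partitioner is what preserves ordering: murmur2 is deterministic, so a given key always hashes to the same partition. A minimal sketch, assuming a topic with 3 partitions:

import org.apache.kafka.common.utils.Utils;

public class PartitionMappingDemo {
    public static void main(String[] args) {
        int numPartitions = 3; // assumed partition count for the test_key topic
        for (String key : new String[]{"key1", "key2", "key3", "key1"}) {
            // Same computation as the keyed branch of SimplePartitioner
            int partition = Utils.toPositive(Utils.murmur2(key.getBytes())) % numPartitions;
            System.out.println(key + " -> partition " + partition);
        }
    }
}

Note that this guarantee only holds while the partition count stays fixed: adding partitions to the topic remaps keys and can break per-key ordering across the change.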

You are welcome to visit my personal blog!


