【愚公系列】2023年03月 MES生产制造执行系统 | 您所在的位置:网站首页 › kafkamanager安装 › 【愚公系列】2023年03月 MES生产制造执行系统 |
文章目录
前言一、Kafka的使用1.安装包2.注入3.封装3.1 IKafkaConsumer和IKafkaProducer3.2 KafkaConsumer和KafkaProducer3.3 KafkaConfig配置类3.4 KafkaHelper帮助类
4.使用
前言
Kafka是一个分布式流处理平台,主要用于处理实时数据流。它可以用于日志收集、数据流处理、消息队列等场景。在大数据处理、实时数据分析等领域,Kafka被广泛应用。 Kafka的主要功能包括消息发布和订阅、消息存储和消息处理。 Kafka的概念包括生产者、消费者、主题、分区、偏移量等。生产者负责向Kafka发送消息,消费者负责从Kafka接收消息,主题是消息的分类,分区是主题的分片,偏移量是消息在分区中的位置。 Kafka有四个核心的API: The Producer API 允许一个应用程序发布一串流式的数据到一个或者多个Kafka topic。The Consumer API 允许一个应用程序订阅一个或多个 topic ,并且对发布给他们的流式数据进行处理。The Streams API 允许一个应用程序作为一个流处理器,消费一个或者多个topic产生的输入流,然后生产一个输出流到一个或多个topic中去,在输入输出流中进行有效的转换。The Connector API 允许构建并运行可重用的生产者或者消费者,将Kafka topics连接到已存在的应用程序或者数据系统。比如,连接到一个关系型数据库,捕捉表(table)的所有变更内容。Kafka官网:https://kafka.apache.org/ 1、IKafkaConsumer public interface IKafkaConsumer : IDisposable { /// /// 订阅回调模式-消费(持续订阅) /// /// 回调函数,若配置为非自动提交(默认为否),则通过回调函数的返回值判断是否提交 /// 主题 void Consume(Func Func, string Topic); /// /// 批量订阅回调模式-消费(持续订阅) /// /// 回调函数,若配置为非自动提交(默认为否),则通过回调函数的返回值判断是否提交 /// 主题集合 void ConsumeBatch(Func Func, List Topics); /// /// 批量消费模式-单次消费(消费出当前Kafka缓存的所有数据,并持续监听 300ms,如无新数据生产,则返回(最多一次消费 100条) /// /// 主题 /// 持续监听时间,单位ms 默认值:300ms /// 最多单次消费行数 默认值:100行 /// 待消费数据 List ConsumeOnce(string Topic, int TimeOut = 300, int MaxRow = 100); /// /// 单笔消费模式-单行消费 /// /// 主题 /// 持续监听时间,单位ms 默认值:300ms /// 待消费数据 ConsumeResult ConsumeOneRow(string Topic, int TimeOut = 300); }
1、KafkaConsumer /// /// 消费者 (Message.Key的数据类型为string、Message.Value的数据类型为string) /// 消费者实现三种消费方式:1.订阅回调模式 2.批量消费模式 3.单笔消费模式 /// /// Message.Key 的数据类型 /// Message.Value 的数据类型 public class KafkaConsumer : KafkaConfig, IKafkaConsumer { /// /// Kafka地址(包含端口号) /// public string Servers { get { return ConsumerConfig.BootstrapServers; } set { ConsumerConfig.BootstrapServers = value; } } /// /// 消费者群组 /// public string GroupId { get { return ConsumerConfig.GroupId; } set { ConsumerConfig.GroupId = value; } } /// /// 自动提交 默认为 false /// public bool EnableAutoCommit { get { return ConsumerConfig.EnableAutoCommit ?? false; } set { ConsumerConfig.EnableAutoCommit = value; } } /// /// 订阅回调模式-消费(持续订阅) /// /// 回调函数,若配置为非自动提交(默认为否),则通过回调函数的返回值判断是否提交 /// 主题 public void Consume(Func Func, string Topic) { Task.Factory.StartNew(() => { var builder = new ConsumerBuilder(ConsumerConfig); //设置反序列化方式 builder.SetValueDeserializer(new KafkaDConverter()); builder.SetErrorHandler((_, e) => { Logger.Error(LoggerType.KafkaException, null, null, $"Error:{e.Reason}"); }).SetStatisticsHandler((_, json) => { Console.WriteLine($"-{DateTime.Now:yyyy-MM-dd HH:mm:ss} > 消息监听中.."); }).SetPartitionsAssignedHandler((c, partitions) => { string partitionsStr = string.Join(", ", partitions); Console.WriteLine($"-分配的kafka分区:{partitionsStr}"); }).SetPartitionsRevokedHandler((c, partitions) => { string partitionsStr = string.Join(", ", partitions); Console.WriteLine($"-回收了kafka的分区:{partitionsStr}"); }); using var consumer = builder.Build(); consumer.Subscribe(Topic); while (AppSetting.Kafka.IsConsumerSubscribe) //true { ConsumeResult result = null; try { result = consumer.Consume(); if (result.IsPartitionEOF) continue; if (Func(result)) { if (!(bool)ConsumerConfig.EnableAutoCommit) { //手动提交,如果上面的EnableAutoCommit=true表示自动提交,则无需调用Commit方法 consumer.Commit(result); } } } catch (ConsumeException ex) { Logger.Error(LoggerType.KafkaException, $"Topic:{Topic},{ex.Error.Reason}", null, ex.Message + ex.StackTrace); } catch (Exception ex) { Logger.Error(LoggerType.KafkaException, $"Topic:{result.Topic}", null, ex.Message + ex.StackTrace); } } }); } /// /// 批量订阅回调模式-消费(持续订阅) /// /// 回调函数,若配置为非自动提交(默认为否),则通过回调函数的返回值判断是否提交 /// 主题 public void ConsumeBatch(Func Func, List Topics) { Task.Factory.StartNew(() => { var builder = new ConsumerBuilder(ConsumerConfig); //设置反序列化方式 builder.SetValueDeserializer(new KafkaDConverter()); builder.SetErrorHandler((_, e) => { Logger.Error(LoggerType.KafkaException, null, null, $"Error:{e.Reason}"); }).SetStatisticsHandler((_, json) => { Console.WriteLine($"-{DateTime.Now:yyyy-MM-dd HH:mm:ss} > 消息监听中.."); }).SetPartitionsAssignedHandler((c, partitions) => { string partitionsStr = string.Join(", ", partitions); Console.WriteLine($"-分配的kafka分区:{partitionsStr}"); }).SetPartitionsRevokedHandler((c, partitions) => { string partitionsStr = string.Join(", ", partitions); Console.WriteLine($"-回收了kafka的分区:{partitionsStr}"); }); using var consumer = builder.Build(); consumer.Subscribe(Topics); while (AppSetting.Kafka.IsConsumerSubscribe) //true { ConsumeResult result = null; try { result = consumer.Consume(); if (result.IsPartitionEOF) continue; if (Func(result)) { if (!(bool)ConsumerConfig.EnableAutoCommit) { //手动提交,如果上面的EnableAutoCommit=true表示自动提交,则无需调用Commit方法 consumer.Commit(result); } } } catch (ConsumeException ex) { Logger.Error(LoggerType.KafkaException, $"Topic:{Topics.ToArray()},{ex.Error.Reason}", null, ex.Message + ex.StackTrace); } catch (Exception ex) { Logger.Error(LoggerType.KafkaException, $"Topic:{result.Topic}", null, ex.Message + ex.StackTrace); } } }); } /// /// 批量消费模式-单次消费(消费出当前Kafka缓存的所有数据,并持续监听 300ms,如无新数据生产,则返回(最多一次消费 100条) /// /// 主题 /// 持续监听时间,单位ms 默认值:300ms /// 最多单次消费行数 默认值:100行 /// 待消费数据 public List ConsumeOnce(string Topic, int TimeOut = 300, int MaxRow = 100) { var builder = new ConsumerBuilder(ConsumerConfig); //设置反序列化方式 builder.SetValueDeserializer(new KafkaDConverter()); using var consumer = builder.Build(); consumer.Subscribe(Topic); List Res = new List(); while (true) { try { var result = consumer.Consume(TimeSpan.FromMilliseconds(TimeOut)); if (result == null) break; else { Res.Add(result); //手动提交,如果上面的EnableAutoCommit=true表示自动提交,则无需调用Commit方法 consumer.Commit(); } if (Res.Count > MaxRow) break; } catch (Exception ex) { Logger.Error(LoggerType.KafkaException, $"Topic:{Topic}", null, ex.Message + ex.StackTrace); return null; } } return Res; } /// /// 单笔消费模式-单行消费 /// /// 主题 /// 持续监听时间,单位ms 默认值:300ms /// 待消费数据 public ConsumeResult ConsumeOneRow(string Topic, int TimeOut = 300) { var builder = new ConsumerBuilder(ConsumerConfig); //设置反序列化方式 builder.SetValueDeserializer(new KafkaDConverter()); using var consumer = builder.Build(); consumer.Subscribe(Topic); try { var result = consumer.Consume(TimeSpan.FromMilliseconds(TimeOut)); if (result != null) { //手动提交,如果上面的EnableAutoCommit=true表示自动提交,则无需调用Commit方法 consumer.Commit(); } return result; } catch (Exception ex) { Logger.Error(LoggerType.KafkaException, $"Topic:{Topic}", null, ex.Message + ex.StackTrace); return null; } } public void Dispose() { //if (_cache != null) // _cache.Dispose(); GC.SuppressFinalize(this); } }2、KafkaProducer /// /// 生产者 控制器或Service里面构造函数注入即可调用 /// Message.Key的数据类型为string、Message.Value的数据类型为string /// /// Message.Key 的数据类型 /// Message.Value 的数据类型 public class KafkaProducer : KafkaConfig, IKafkaProducer { /// /// 构造生产者 /// public KafkaProducer() { } /// /// Kafka地址(包含端口号) /// public string Servers { get { return ProducerConfig.BootstrapServers; } set { ProducerConfig.BootstrapServers = value; } } /// /// 生产 /// /// Message.Key 做消息指定分区投放有用的 /// Message.Value /// 主题 public void Produce(TKey Key, TValue Value, string Topic) { var producerBuilder = new ProducerBuilder(ProducerConfig); producerBuilder.SetValueSerializer(new KafkaConverter());//设置序列化方式 using var producer = producerBuilder.Build(); try { producer.Produce(Topic, new Message { Key = Key, Value = Value }, (result) => { if (result.Error.IsError) Logger.Error(LoggerType.KafkaException, $"Topic:{Topic},ServerIp:{KafkaHelper.GetServerIp()},ServerName:{KafkaHelper.GetServerName()}", null, $"Delivery Error:{result.Error.Reason}"); });//Value = JsonConvert.SerializeObject(value) } catch (ProduceException ex) { Logger.Error(LoggerType.KafkaException, $"Topic:{Topic},Delivery failed: {ex.Error.Reason}", null, ex.Message + ex.StackTrace); } } /// /// 生产异步 /// /// Message.Key /// Message.Value /// 主题 /// public async Task ProduceAsync(TKey Key, TValue Value, string Topic) { var producerBuilder = new ProducerBuilder(ProducerConfig); producerBuilder.SetValueSerializer(new KafkaConverter()); using var producer = producerBuilder.Build(); try { var dr = await producer.ProduceAsync(Topic, new Message { Key = Key, Value = Value }); //Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'"); } catch (ProduceException ex) { Logger.Error(LoggerType.KafkaException, $"Topic:{Topic},ServerIp:{KafkaHelper.GetServerIp()},ServerName:{KafkaHelper.GetServerName()},Delivery failed: {ex.Error.Reason}", null, ex.Message + ex.StackTrace); } } } |
CopyRight 2018-2019 实验室设备网 版权所有 |