【愚公系列】2023年03月 MES生产制造执行系统 您所在的位置:网站首页 kafkamanager安装 【愚公系列】2023年03月 MES生产制造执行系统

【愚公系列】2023年03月 MES生产制造执行系统

#【愚公系列】2023年03月 MES生产制造执行系统| 来源: 网络整理| 查看: 265

文章目录 前言一、Kafka的使用1.安装包2.注入3.封装3.1 IKafkaConsumer和IKafkaProducer3.2 KafkaConsumer和KafkaProducer3.3 KafkaConfig配置类3.4 KafkaHelper帮助类 4.使用

前言

Kafka是一个分布式流处理平台,主要用于处理实时数据流。它可以用于日志收集、数据流处理、消息队列等场景。在大数据处理、实时数据分析等领域,Kafka被广泛应用。

Kafka的主要功能包括消息发布和订阅、消息存储和消息处理。

Kafka的概念包括生产者、消费者、主题、分区、偏移量等。生产者负责向Kafka发送消息,消费者负责从Kafka接收消息,主题是消息的分类,分区是主题的分片,偏移量是消息在分区中的位置。

Kafka有四个核心的API:

The Producer API 允许一个应用程序发布一串流式的数据到一个或者多个Kafka topic。The Consumer API 允许一个应用程序订阅一个或多个 topic ,并且对发布给他们的流式数据进行处理。The Streams API 允许一个应用程序作为一个流处理器,消费一个或者多个topic产生的输入流,然后生产一个输出流到一个或多个topic中去,在输入输出流中进行有效的转换。The Connector API 允许构建并运行可重用的生产者或者消费者,将Kafka topics连接到已存在的应用程序或者数据系统。比如,连接到一个关系型数据库,捕捉表(table)的所有变更内容。

Kafka官网:https://kafka.apache.org/ 在这里插入图片描述 Kafka中文文档:https://kafka.apachecn.org/ 在这里插入图片描述

一、Kafka的使用 1.安装包 Confluent.Kafka

在这里插入图片描述

2.注入 if (AppSetting.Kafka.UseConsumer) builder.RegisterType().As().SingleInstance(); if (AppSetting.Kafka.UseProducer) builder.RegisterType().As().SingleInstance();

在这里插入图片描述

3.封装 3.1 IKafkaConsumer和IKafkaProducer

1、IKafkaConsumer

public interface IKafkaConsumer : IDisposable { /// /// 订阅回调模式-消费(持续订阅) /// /// 回调函数,若配置为非自动提交(默认为否),则通过回调函数的返回值判断是否提交 /// 主题 void Consume(Func Func, string Topic); /// /// 批量订阅回调模式-消费(持续订阅) /// /// 回调函数,若配置为非自动提交(默认为否),则通过回调函数的返回值判断是否提交 /// 主题集合 void ConsumeBatch(Func Func, List Topics); /// /// 批量消费模式-单次消费(消费出当前Kafka缓存的所有数据,并持续监听 300ms,如无新数据生产,则返回(最多一次消费 100条) /// /// 主题 /// 持续监听时间,单位ms 默认值:300ms /// 最多单次消费行数 默认值:100行 /// 待消费数据 List ConsumeOnce(string Topic, int TimeOut = 300, int MaxRow = 100); /// /// 单笔消费模式-单行消费 /// /// 主题 /// 持续监听时间,单位ms 默认值:300ms /// 待消费数据 ConsumeResult ConsumeOneRow(string Topic, int TimeOut = 300); }

在这里插入图片描述 2、IKafkaProducer

public interface IKafkaProducer { /// /// 生产 /// /// /// /// void Produce(TKey Key, TValue Value, string Topic); /// /// 生产 异步 /// /// /// /// /// Task ProduceAsync(TKey Key, TValue Value, string Topic); }

在这里插入图片描述

3.2 KafkaConsumer和KafkaProducer

1、KafkaConsumer

/// /// 消费者 (Message.Key的数据类型为string、Message.Value的数据类型为string) /// 消费者实现三种消费方式:1.订阅回调模式 2.批量消费模式 3.单笔消费模式 /// /// Message.Key 的数据类型 /// Message.Value 的数据类型 public class KafkaConsumer : KafkaConfig, IKafkaConsumer { /// /// Kafka地址(包含端口号) /// public string Servers { get { return ConsumerConfig.BootstrapServers; } set { ConsumerConfig.BootstrapServers = value; } } /// /// 消费者群组 /// public string GroupId { get { return ConsumerConfig.GroupId; } set { ConsumerConfig.GroupId = value; } } /// /// 自动提交 默认为 false /// public bool EnableAutoCommit { get { return ConsumerConfig.EnableAutoCommit ?? false; } set { ConsumerConfig.EnableAutoCommit = value; } } /// /// 订阅回调模式-消费(持续订阅) /// /// 回调函数,若配置为非自动提交(默认为否),则通过回调函数的返回值判断是否提交 /// 主题 public void Consume(Func Func, string Topic) { Task.Factory.StartNew(() => { var builder = new ConsumerBuilder(ConsumerConfig); //设置反序列化方式 builder.SetValueDeserializer(new KafkaDConverter()); builder.SetErrorHandler((_, e) => { Logger.Error(LoggerType.KafkaException, null, null, $"Error:{e.Reason}"); }).SetStatisticsHandler((_, json) => { Console.WriteLine($"-{DateTime.Now:yyyy-MM-dd HH:mm:ss} > 消息监听中.."); }).SetPartitionsAssignedHandler((c, partitions) => { string partitionsStr = string.Join(", ", partitions); Console.WriteLine($"-分配的kafka分区:{partitionsStr}"); }).SetPartitionsRevokedHandler((c, partitions) => { string partitionsStr = string.Join(", ", partitions); Console.WriteLine($"-回收了kafka的分区:{partitionsStr}"); }); using var consumer = builder.Build(); consumer.Subscribe(Topic); while (AppSetting.Kafka.IsConsumerSubscribe) //true { ConsumeResult result = null; try { result = consumer.Consume(); if (result.IsPartitionEOF) continue; if (Func(result)) { if (!(bool)ConsumerConfig.EnableAutoCommit) { //手动提交,如果上面的EnableAutoCommit=true表示自动提交,则无需调用Commit方法 consumer.Commit(result); } } } catch (ConsumeException ex) { Logger.Error(LoggerType.KafkaException, $"Topic:{Topic},{ex.Error.Reason}", null, ex.Message + ex.StackTrace); } catch (Exception ex) { Logger.Error(LoggerType.KafkaException, $"Topic:{result.Topic}", null, ex.Message + ex.StackTrace); } } }); } /// /// 批量订阅回调模式-消费(持续订阅) /// /// 回调函数,若配置为非自动提交(默认为否),则通过回调函数的返回值判断是否提交 /// 主题 public void ConsumeBatch(Func Func, List Topics) { Task.Factory.StartNew(() => { var builder = new ConsumerBuilder(ConsumerConfig); //设置反序列化方式 builder.SetValueDeserializer(new KafkaDConverter()); builder.SetErrorHandler((_, e) => { Logger.Error(LoggerType.KafkaException, null, null, $"Error:{e.Reason}"); }).SetStatisticsHandler((_, json) => { Console.WriteLine($"-{DateTime.Now:yyyy-MM-dd HH:mm:ss} > 消息监听中.."); }).SetPartitionsAssignedHandler((c, partitions) => { string partitionsStr = string.Join(", ", partitions); Console.WriteLine($"-分配的kafka分区:{partitionsStr}"); }).SetPartitionsRevokedHandler((c, partitions) => { string partitionsStr = string.Join(", ", partitions); Console.WriteLine($"-回收了kafka的分区:{partitionsStr}"); }); using var consumer = builder.Build(); consumer.Subscribe(Topics); while (AppSetting.Kafka.IsConsumerSubscribe) //true { ConsumeResult result = null; try { result = consumer.Consume(); if (result.IsPartitionEOF) continue; if (Func(result)) { if (!(bool)ConsumerConfig.EnableAutoCommit) { //手动提交,如果上面的EnableAutoCommit=true表示自动提交,则无需调用Commit方法 consumer.Commit(result); } } } catch (ConsumeException ex) { Logger.Error(LoggerType.KafkaException, $"Topic:{Topics.ToArray()},{ex.Error.Reason}", null, ex.Message + ex.StackTrace); } catch (Exception ex) { Logger.Error(LoggerType.KafkaException, $"Topic:{result.Topic}", null, ex.Message + ex.StackTrace); } } }); } /// /// 批量消费模式-单次消费(消费出当前Kafka缓存的所有数据,并持续监听 300ms,如无新数据生产,则返回(最多一次消费 100条) /// /// 主题 /// 持续监听时间,单位ms 默认值:300ms /// 最多单次消费行数 默认值:100行 /// 待消费数据 public List ConsumeOnce(string Topic, int TimeOut = 300, int MaxRow = 100) { var builder = new ConsumerBuilder(ConsumerConfig); //设置反序列化方式 builder.SetValueDeserializer(new KafkaDConverter()); using var consumer = builder.Build(); consumer.Subscribe(Topic); List Res = new List(); while (true) { try { var result = consumer.Consume(TimeSpan.FromMilliseconds(TimeOut)); if (result == null) break; else { Res.Add(result); //手动提交,如果上面的EnableAutoCommit=true表示自动提交,则无需调用Commit方法 consumer.Commit(); } if (Res.Count > MaxRow) break; } catch (Exception ex) { Logger.Error(LoggerType.KafkaException, $"Topic:{Topic}", null, ex.Message + ex.StackTrace); return null; } } return Res; } /// /// 单笔消费模式-单行消费 /// /// 主题 /// 持续监听时间,单位ms 默认值:300ms /// 待消费数据 public ConsumeResult ConsumeOneRow(string Topic, int TimeOut = 300) { var builder = new ConsumerBuilder(ConsumerConfig); //设置反序列化方式 builder.SetValueDeserializer(new KafkaDConverter()); using var consumer = builder.Build(); consumer.Subscribe(Topic); try { var result = consumer.Consume(TimeSpan.FromMilliseconds(TimeOut)); if (result != null) { //手动提交,如果上面的EnableAutoCommit=true表示自动提交,则无需调用Commit方法 consumer.Commit(); } return result; } catch (Exception ex) { Logger.Error(LoggerType.KafkaException, $"Topic:{Topic}", null, ex.Message + ex.StackTrace); return null; } } public void Dispose() { //if (_cache != null) // _cache.Dispose(); GC.SuppressFinalize(this); } }

在这里插入图片描述

2、KafkaProducer

/// /// 生产者 控制器或Service里面构造函数注入即可调用 /// Message.Key的数据类型为string、Message.Value的数据类型为string /// /// Message.Key 的数据类型 /// Message.Value 的数据类型 public class KafkaProducer : KafkaConfig, IKafkaProducer { /// /// 构造生产者 /// public KafkaProducer() { } /// /// Kafka地址(包含端口号) /// public string Servers { get { return ProducerConfig.BootstrapServers; } set { ProducerConfig.BootstrapServers = value; } } /// /// 生产 /// /// Message.Key 做消息指定分区投放有用的 /// Message.Value /// 主题 public void Produce(TKey Key, TValue Value, string Topic) { var producerBuilder = new ProducerBuilder(ProducerConfig); producerBuilder.SetValueSerializer(new KafkaConverter());//设置序列化方式 using var producer = producerBuilder.Build(); try { producer.Produce(Topic, new Message { Key = Key, Value = Value }, (result) => { if (result.Error.IsError) Logger.Error(LoggerType.KafkaException, $"Topic:{Topic},ServerIp:{KafkaHelper.GetServerIp()},ServerName:{KafkaHelper.GetServerName()}", null, $"Delivery Error:{result.Error.Reason}"); });//Value = JsonConvert.SerializeObject(value) } catch (ProduceException ex) { Logger.Error(LoggerType.KafkaException, $"Topic:{Topic},Delivery failed: {ex.Error.Reason}", null, ex.Message + ex.StackTrace); } } /// /// 生产异步 /// /// Message.Key /// Message.Value /// 主题 /// public async Task ProduceAsync(TKey Key, TValue Value, string Topic) { var producerBuilder = new ProducerBuilder(ProducerConfig); producerBuilder.SetValueSerializer(new KafkaConverter()); using var producer = producerBuilder.Build(); try { var dr = await producer.ProduceAsync(Topic, new Message { Key = Key, Value = Value }); //Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'"); } catch (ProduceException ex) { Logger.Error(LoggerType.KafkaException, $"Topic:{Topic},ServerIp:{KafkaHelper.GetServerIp()},ServerName:{KafkaHelper.GetServerName()},Delivery failed: {ex.Error.Reason}", null, ex.Message + ex.StackTrace); } } }

在这里插入图片描述

3.3 KafkaConfig配置类 /// /// 配置类 /// public class KafkaConfig { /// /// 构造配置类 /// protected KafkaConfig() { ProducerConfig = new ProducerConfig() { BootstrapServers = AppSetting.Kafka.ProducerSettings.BootstrapServers,// "192.168.20.241:9092", }; ConsumerConfig = new ConsumerConfig() { BootstrapServers = AppSetting.Kafka.ConsumerSettings.BootstrapServers, GroupId = AppSetting.Kafka.ConsumerSettings.GroupId, AutoOffsetReset = AutoOffsetReset.Earliest,//当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 EnableAutoCommit = false, //Kafka配置安全认证 //SecurityProtocol = SecurityProtocol.SaslPlaintext, //SaslMechanism = SaslMechanism.Plain, //SaslUsername = AppSetting.Kafka.ConsumerSettings.SaslUsername, //SaslPassword = AppSetting.Kafka.ConsumerSettings.SaslPassword, }; } /// /// 消费者配置文件 /// public ConsumerConfig ConsumerConfig; /// /// 生产者配置文件 /// public ProducerConfig ProducerConfig; }

在这里插入图片描述

3.4 KafkaHelper帮助类 namespace KafkaManager { /// /// 辅助类 /// public class KafkaHelper { /// /// 获取当前应用程式名称(仅控制台应用程序和Windows应用程序可用) /// /// public static string GetApplicationName() { try { return Assembly.GetEntryAssembly().GetName().Name; } catch { return "Kafka_Test"; } } /// /// 获取服务器名称 /// /// public static string GetServerName() { return Dns.GetHostName(); } /// /// 获取服务器IP /// /// public static string GetServerIp() { IPHostEntry ips = Dns.GetHostEntry(Dns.GetHostName()); foreach (var ip in ips.AddressList) { if (Regex.IsMatch(ip.ToString(), @"^10\.((25[0-5]|2[0-4]\d|1\d{2}|\d?\d)\.){2}(25[0-5]|2[0-4]\d|1\d{2}|\d?\d)$")) { return ip.ToString(); }; } return "127.0.0.1"; } /// /// 将c# DateTime时间格式转换为Unix时间戳格式(毫秒级) /// /// long public static long GetTimeStamp() { DateTime time = DateTime.Now; long t = (time.Ticks - 621356256000000000) / 10000; return t; } } #region 实现消息序列化和反序列化 public class KafkaConverter : ISerializer { /// /// 序列化数据成字节 /// /// /// /// public byte[] Serialize(T data, SerializationContext context) { var json = JsonConvert.SerializeObject(data); return Encoding.UTF8.GetBytes(json); } } public class KafkaDConverter : IDeserializer { /// /// 反序列化字节数据成实体数据 /// /// /// /// /// public T Deserialize(ReadOnlySpan data, bool isNull, SerializationContext context) { if (isNull) return default(T); var json = Encoding.UTF8.GetString(data.ToArray()); try { return JsonConvert.DeserializeObject(json); } catch { return default(T); } } } #endregion #region 日志类 /// /// 默认日志类 可自行构造使用 /// public class KafkaLogModel { /// /// 构造默认日志类(设置默认值 ServerIp,ServerName,TimeStamp,ApplicationVersion) /// public KafkaLogModel() { ServerIp = KafkaHelper.GetServerIp(); ServerName = KafkaHelper.GetServerName(); TimeStamp = DateTime.Now; ApplicationName = KafkaHelper.GetApplicationName(); ApplicationVersion = "V1.0.0"; } /// /// 程式名称(默认获取当前程式名称,Web应用 默认为 ISD_Kafka) /// public string ApplicationName { get; set; } /// /// 程式版本(默认为V1.0.0) /// public string ApplicationVersion { get; set; } /// /// 发生时间(默认为当前时间) /// public DateTime TimeStamp { get; set; } /// /// 开始时间 /// public DateTime BeginDate { get; set; } /// /// 结束时间 /// public DateTime EndDate { get; set; } /// /// 服务器IP(默认抓取当前服务器IP) /// public string ServerIp { get; set; } /// /// 服务器名称(默认抓取当前服务器名称) /// public string ServerName { get; set; } /// /// 客户端IP /// public string ClientIp { get; set; } /// /// 模块(页面路径) /// public string Module { get; set; } /// /// 操作人 /// public string Operator { get; set; } /// /// 操作类型 如:Query,Add,Update,Delete,Export等,可自定义 /// public string OperationType { get; set; } /// /// 操作状态 如:http请求使用200,404,503等,其他操作 1:成功,0失败等 可自定义 /// public string Status { get; set; } /// /// 其他信息 /// public string Message { get; set; } } #endregion }

在这里插入图片描述

4.使用 #region kafka使用 if (AppSetting.Kafka.UseConsumer) { using var scope = host.Services.CreateScope(); var testConsumer = scope.ServiceProvider.GetService(); testConsumer.Consume(res => { Console.WriteLine($"recieve:{DateTime.Now.ToLongTimeString()} value:{res.Message.Value}"); bool bl = DataHandle.AlarmData(res.Message.Value); return bl; }, AppSetting.Kafka.Topics.TestTopic); } #endregion


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有