尚硅谷Kafka 您所在的位置:网站首页 尚硅谷课程时间 尚硅谷Kafka

尚硅谷Kafka

2023-06-09 23:38| 来源: 网络整理| 查看: 265

目录

一、Kafka 概述

1.1 消息队列

1.1.1 传统消息队列的应用场景

1.1.2 消息队列的两种模式

2.1 Kafka 基础架构

二、Kafka 操作

2.1 Kafka 命令行操作

2.1.1 主题命令行操作

2.2.1 生产者命令行操作

2.3.1 消费者者命令行操作

三、Kafka API

3.1 生产者消息发送流程

3.1.1 发送原理

3.2 异步发送API

3.2.1 异步发送

 3.2.2 带回调函数的异步发送

3.3 同步发送API

一、Kafka 概述

Kafka是最初由Linkedin公司开发,是⼀个分布式、⽀持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最⼤的特性就是可以实时的处理⼤量数据以满⾜各种需求场景:⽐如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎,web/nginx⽇志、访问⽇志,消息服务等等,⽤scala语⾔编写,Linkedin于2010年贡献给了Apache基⾦会并成为顶级开源项⽬。

Kafka传统定义:Kafka是一个分布式的基于发布/订阅模式的消息队列(MessageQueue),主要应用于大数据实时处理领域. 发布/订阅:消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息。 Kafka最新定义 : Kafka是 一个开源的分布式事件流平台 (Event Streaming Platform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。

什么是流式平台呢? 流式平台有以下几种特性:

1、可以发布或订阅流式记录,类似MQ或消息系统。

2、可以存储流式记录,并有较好的容错性。

3、可以实时处理流式记录。

什么又是流式数据?

1、流数据是指由数千个数据源持续生成的数据,通常也同时以数据记录的形式发送,规模较小(约几千字节)。

2、流数据(或数据流)是指在时间分布和数量上无限的一系列动态数据集合体,数据的价值随着时间的流逝而降低,因此必须实时计算给出秒级响应。  

1.1 消息队列 1.1.1 传统消息队列的应用场景 传统的消息队列的主要应用场景包括: 缓存 / 消峰 、 解耦 和 异步通信。

异步通信:允许用户把一个消息放入队列,但并不立即处理它,然后在需要的时候再去处理它们。

 

1.1.2 消息队列的两种模式

2.1 Kafka 基础架构

 Producer:消息生产者,就是向 Kafka broker 发消息的客户端。 Consumer:消息消费者,向 Kafka broker 取消息的客户端。 Consumer Group(CG):消费者组,由多个 consumer 组成。消费者组内每个消 费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不 影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。 Broker:一台 Kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker 可以容纳多个 topic。 Topic:可以理解为一个队列,生产者和消费者面向的都是一个 topic。 Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服 务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。 Replica:副本。一个 topic 的每个分区都有若干个副本,一个 Leader 和若干个 Follower。 Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数 据的对象都是 Leader。 Follower:每个分区多个副本中的“从”,实时从 Leader 中同步数据,保持和 Leader 数据的同步。Leader 发生故障时,某个 Follower 会成为新的 Leader。

二、Kafka 操作 2.1 Kafka 命令行操作

2.1.1 主题命令行操作 1 )查看操作主题命令参数 [hsw@hadoop102 kafka]$ bin/kafka-topics.sh

 查看所有主题:

创建主题:

查看主题详细描述:

2.2.1 生产者命令行操作 1 )查看操作生产者命令参数 [hsw@hadoop102 kafka]$ bin/kafka-console-producer.sh

 发送消息:

2.3.1 消费者者命令行操作 1 )查看操作消费者命令参数 [hsw@hadoop102 kafka]$ bin/kafka-console-consumer.sh

消费 first 主题中的数据:

把主题中所有的数据都读取出来(包括历史数据):

三、Kafka API 3.1 生产者消息发送流程 3.1.1 发送原理 在消息发送的过程中,涉及到了 两个线程—— main 线程和 Sender 线程 。在 main 线程 中创建了 一个双端队列 RecordAccumulator 。 main 线程将消息发送给 RecordAccumulator , Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker 。

3.2 异步发送API 3.2.1 异步发送

代码编写:

public class CustomProducer { public static void main(String[] args) { // 1. 创建 kafka 生产者的配置对象 Properties properties = new Properties(); // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092"); // key,value 序列化(必须):key.serializer,value.serializer properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); // 3. 创建 kafka 生产者对象 KafkaProducer kafkaProducer = new KafkaProducer(properties); // 4. 调用 send 方法,发送消息 for (int i = 0; i < 5; i++) { kafkaProducer.send(new ProducerRecord("first","wyq " + i)); } // 5. 关闭资源 kafkaProducer.close(); } }

 3.2.2 带回调函数的异步发送 回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元 数据信息( RecordMetadata )和异常信息( Exception ),如果 Exception 为 null ,说明消息发 送成功,如果 Exception 不为 null ,说明消息发送失败。

代码编写:

public class CustomProducer { public static void main(String[] args) { // 1. 创建 kafka 生产者的配置对象 Properties properties = new Properties(); // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092"); // key,value 序列化(必须):key.serializer,value.serializer properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); // 3. 创建 kafka 生产者对象 KafkaProducer kafkaProducer = new KafkaProducer(properties); // 4. 调用 send 方法,发送消息 for (int i = 0; i < 5; i++) { kafkaProducer.send(new ProducerRecord("first", "wyq " + i), new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if(exception==null){ System.out.println("主题:"+metadata.topic()+" 分区: "+metadata.partition()); } } }); } // 5. 关闭资源 kafkaProducer.close(); } }

3.3 同步发送API

public class CustomProducerSync { public static void main(String[] args) throws ExecutionException, InterruptedException { // 1. 创建 kafka 生产者的配置对象 Properties properties = new Properties(); // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092"); // key,value 序列化(必须):key.serializer,value.serializer properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 3. 创建 kafka 生产者对象 KafkaProducer kafkaProducer = new KafkaProducer(properties); // 4. 调用 send 方法,发送消息 for (int i = 0; i < 5; i++) { kafkaProducer.send(new ProducerRecord("first", "wyq " + i)).get(); } // 5. 关闭资源 kafkaProducer.close(); } }



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有