Kafka Configuration in CDH

2024-07-12 02:12 | Source: compiled from the web

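Contents
    Importing the offline packages
    Starting the installation
    Consuming Kafka data into HDFS with Flume

Importing the offline packages

On the master node, create the /opt/cloudera/csd directory: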

mkdir -p /opt/cloudera/csd

Upload KAFKA-1.2.0.jar to the /opt/cloudera/csd directory, then change its owner and group:

[root@hadoop001 csd]# chown cloudera-scm:cloudera-scm /opt/cloudera/csd/ -R

Upload KAFKA-4.0.0-1.4.0.0.p0.1-el6.parcel and KAFKA-4.0.0-1.4.0.0.p0.1-el6.parcel.sha1 to the /opt/cloudera/parcel-repo directory, then rename KAFKA-4.0.0-1.4.0.0.p0.1-el6.parcel.sha1 to KAFKA-4.0.0-1.4.0.0.p0.1-el6.parcel.sha:

[root@hadoop001 parcel-repo]# mv /opt/cloudera/parcel-repo/KAFKA-4.0.0-1.4.0.0.p0.1-el6.parcel.sha1 /opt/cloudera/parcel-repo/KAFKA-4.0.0-1.4.0.0.p0.1-el6.parcel.sha
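The rename above can be scripted when several parcels are uploaded at once. The following is a minimal sketch, not part of the original procedure: the function name prepare_parcel_sha is hypothetical, and it assumes a Linux host where sha1sum is available and that the first field of the .sha1 file is the SHA-1 hash of the parcel. It renames each *.parcel.sha1 to *.parcel.sha and compares the recorded hash with the parcel next to it.

```shell
# Hypothetical helper: rename *.parcel.sha1 to *.parcel.sha inside a directory
# and check that the recorded hash matches the parcel file beside it.
prepare_parcel_sha() {
  local repo_dir="$1" sha1_file parcel expected actual
  for sha1_file in "$repo_dir"/*.parcel.sha1; do
    [ -e "$sha1_file" ] || continue      # nothing matched the glob
    parcel="${sha1_file%.sha1}"          # strip the trailing .sha1
    mv "$sha1_file" "$parcel.sha"
    expected="$(awk '{print $1; exit}' "$parcel.sha")"
    actual="$(sha1sum "$parcel" | awk '{print $1}')"
    if [ "$expected" = "$actual" ]; then
      echo "OK  $(basename "$parcel")"
    else
      echo "BAD $(basename "$parcel")" >&2
      return 1
    fi
  done
}
```

It would be called as prepare_parcel_sha /opt/cloudera/parcel-repo. A BAD line usually points to an incomplete upload, so it is worth re-uploading the parcel before distributing it.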

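In Cloudera Manager, choose Parcel -> Check for New Parcels, then for Kafka click Distribute -> Activate.

Starting the installation

Click CM -> Add Service -> select Kafka -> select all hosts.

Do not rush ahead when you reach this page.

Go back to CM, where Kafka is now listed, and change the Kafka heap size to 256M. The default of 50M is not enough, so change every heap setting to 256M.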

List the Kafka topics

[root@hadoop001 parcel-repo]# /opt/cloudera/parcels/KAFKA-4.0.0-1.4.0.0.p0.1/bin/kafka-topics --zookeeper hadoop001:2181 --list

Change into the /opt/cloudera/parcels/KAFKA-4.0.0-1.4.0.0.p0.1 directory and create two topics: one for startup logs and one for event logs.

Create the startup-log topic

[root@hadoop001 KAFKA-4.0.0-1.4.0.0.p0.1]# bin/kafka-topics --zookeeper hadoop001:2181,hadoop002:2181,hadoop003:2181 --create --replication-factor 1 --partitions 1 --topic topic_start
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "topic_start".

Create the event-log topic

[root@hadoop001 KAFKA-4.0.0-1.4.0.0.p0.1]# bin/kafka-topics --zookeeper hadoop001:2181,hadoop002:2181,hadoop003:2181 --create --replication-factor 1 --partitions 1 --topic topic_event
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "topic_event"
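The WARNING printed by kafka-topics is harmless here, because both topic names use only underscores. As the message suggests, two names can collide when they differ only in '.' versus '_'. The sketch below illustrates that check under the assumption that collision means "equal after replacing every period with an underscore". The helper names normalize_topic and topics_collide are hypothetical and are not part of the Kafka CLI.

```shell
# Hypothetical helpers illustrating the collision rule hinted at by the warning.
normalize_topic() {
  # Replace every '.' with '_' so both spellings map to the same string.
  printf '%s' "$1" | tr '.' '_'
}

topics_collide() {
  # True when the names are different but identical after normalization.
  [ "$1" != "$2" ] && [ "$(normalize_topic "$1")" = "$(normalize_topic "$2")" ]
}

if topics_collide topic_start topic.start; then
  echo "topic_start and topic.start would collide"
fi
if ! topics_collide topic_start topic_event; then
  echo "topic_start and topic_event do not collide"
fi
```

Sticking to one separator for every topic in the cluster, as this article does with underscores, avoids the problem entirely.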

View the details of a topic

[root@hadoop001 KAFKA-4.0.0-1.4.0.0.p0.1]# bin/kafka-topics --zookeeper hadoop001:2181 --describe --topic topic_event
Topic:topic_event PartitionCount:1 ReplicationFactor:1 Configs:
    Topic: topic_event Partition: 0 Leader: 46 Replicas: 46 Isr: 46

Delete the topics that were just created

[root@hadoop001 KAFKA-4.0.0-1.4.0.0.p0.1]# bin/kafka-topics --delete --zookeeper hadoop001:2181,hadoop002:2181,hadoop003:2181 --topic topic_start

[root@hadoop001 KAFKA-4.0.0-1.4.0.0.p0.1]# bin/kafka-topics --delete --zookeeper hadoop001:2181,hadoop002:2181,hadoop003:2181 --topic topic_event
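The create, describe, and delete commands above differ only in the action and the topic name, so they can be generated from one place. This is a rough sketch rather than the author's tooling: topic_cmd is a hypothetical helper, KAFKA_HOME and ZK_QUORUM are assumed variable names whose defaults are copied from the commands in this article, and only flags that already appear above are used. The helper prints the command instead of running it, so the output can be reviewed first.

```shell
# Assumed defaults, taken from the paths and hosts used in this article.
KAFKA_HOME="${KAFKA_HOME:-/opt/cloudera/parcels/KAFKA-4.0.0-1.4.0.0.p0.1}"
ZK_QUORUM="${ZK_QUORUM:-hadoop001:2181,hadoop002:2181,hadoop003:2181}"

# Hypothetical helper: print (not run) the kafka-topics command for an action.
topic_cmd() {
  local action="$1" topic="$2"
  local base="$KAFKA_HOME/bin/kafka-topics --zookeeper $ZK_QUORUM"
  case "$action" in
    create)   echo "$base --create --replication-factor 1 --partitions 1 --topic $topic" ;;
    describe) echo "$base --describe --topic $topic" ;;
    delete)   echo "$base --delete --topic $topic" ;;
    *)        echo "unknown action: $action" >&2; return 1 ;;
  esac
}

# Print the create command for both topics used in this article.
for topic in topic_start topic_event; do
  topic_cmd create "$topic"
done
```

Once the printed commands look right, they can be executed by piping the output to sh, for example topic_cmd create topic_start | sh.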

Produce messages

[root@hadoop001 KAFKA-4.0.0-1.4.0.0.p0.1]# bin/kafka-console-producer --broker-list hadoop001:9092 --topic topic_start

hello nihao

Consume messages

[root@hadoop002 KAFKA-4.0.0-1.4.0.0.p0.1]# bin/kafka-console-consumer --bootstrap-server hadoop001:9092 --from-beginning --topic topic_start
[2019-07-21 18:07:45,771] WARN [Consumer clientId=consumer-1, groupId=console-consumer-39633] Error while fetching metadata with correlation id 2 : {topic_start=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
hello nihao

Test connectivity with Flume

[root@hadoop001 KAFKA-4.0.0-1.4.0.0.p0.1]# mkdir -p /opt/module/flume
[root@hadoop001 KAFKA-4.0.0-1.4.0.0.p0.1]# cd /opt/module/flume/
[root@hadoop001 flume]# ls
[root@hadoop001 flume]# touch log_position.json
[root@hadoop001 flume]# chmod 777 log_position.json
[root@hadoop001 flume]# cd ..
[root@hadoop001 module]# xsync flume/

Upload log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar to the /opt/module directory on hadoop001.

Distribute log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar

[root@hadoop001 module]# xsync log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar
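The xsync command is not a standard tool and this article does not define it. It is presumably a local script that copies a file or directory to the same path on the other nodes. The sketch below shows one way such a script could look. Everything in it is an assumption: the function name xsync_sketch, the XSYNC_HOSTS and DRY_RUN variables, the host list (hadoop002 and hadoop003, taken from the hostnames used above), and the use of rsync over passwordless SSH. By default it only prints the rsync commands it would run.

```shell
# Assumed list of target hosts; override with XSYNC_HOSTS="hostA hostB".
XSYNC_HOSTS="${XSYNC_HOSTS:-hadoop002 hadoop003}"

# Hypothetical stand-in for xsync: copy a file or directory to the same
# absolute parent directory on every host in XSYNC_HOSTS.
xsync_sketch() {
  local target="$1" parent name host
  parent="$(cd "$(dirname "$target")" && pwd)" || return 1   # absolute parent dir
  name="$(basename "$target")"
  for host in $XSYNC_HOSTS; do
    if [ "${DRY_RUN:-1}" = "1" ]; then
      # Dry run (the default): only show what would be executed.
      echo "rsync -av $parent/$name $host:$parent/"
    else
      rsync -av "$parent/$name" "$host:$parent/"
    fi
  done
}
```

Run from /opt/module, DRY_RUN=0 xsync_sketch flume/ would copy the flume directory to /opt/module on each listed host, provided rsync is installed on both ends.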

Start a consumer in one terminal window

[root@hadoop001 KAFKA-4.0.0-1.4.0.0.p0.1]# bin/kafka-console-consumer --bootstrap-server hadoop001:9092 --from-beginning --topic topic_start

Consuming Kafka data into HDFS with Flume

In the CM admin page, open the Flume configuration for hadoop003, find the Agent Name setting, and change it to a1. The configuration file is:

## Components
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2

## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop001:9092,hadoop002:9092,hadoop003:9092
a1.sources.r1.kafka.topics=topic_start

## source2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = hadoop001:9092,hadoop002:9092,hadoop003:9092
a1.sources.r2.kafka.topics=topic_event

## channel1
a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=10000

## channel2
a1.channels.c2.type=memory
a1.channels.c2.capacity=100000
a1.channels.c2.transactionCapacity=10000

## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second

## sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
a1.sinks.k2.hdfs.round = true
a1.sinks.k2.hdfs.roundValue = 10
a1.sinks.k2.hdfs.roundUnit = second

## Avoid producing a large number of small files
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k2.hdfs.rollInterval = 10
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0

## Control the output file type
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop

## Wiring
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2
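A common mistake in a configuration like this is binding a source or sink to a channel name that was never declared on the a1.channels line. The sketch below is one possible sanity check and not something Flume or CM provides: check_flume_wiring is a hypothetical helper, it assumes the agent is named a1 and that the configuration has been saved to a file with one property per line, and it only inspects the channel wiring, nothing else.

```shell
# Hypothetical check: every a1 source/sink must reference a channel that is
# listed on the "a1.channels" line. Exits non-zero if any reference is unknown.
check_flume_wiring() {
  awk -F'=' '
    /^[ \t]*#/ { next }                        # skip comment lines
    {
      key = $1; val = $2
      gsub(/^[ \t]+|[ \t]+$/, "", key)         # trim spaces around key
      gsub(/^[ \t]+|[ \t]+$/, "", val)         # trim spaces around value
    }
    key == "a1.channels" {
      n = split(val, names, " ")
      for (i = 1; i <= n; i++) declared[names[i]] = 1
    }
    key ~ /^a1\.sources\.[^.]+\.channels$/ || key ~ /^a1\.sinks\.[^.]+\.channel$/ {
      used[key] = val
    }
    END {
      bad = 0
      for (k in used) {
        m = split(used[k], parts, " ")
        for (j = 1; j <= m; j++) {
          if (!(parts[j] in declared)) {
            print "undeclared channel " parts[j] " in " k
            bad = 1
          }
        }
      }
      exit bad
    }
  ' "$1"
}
```

Assuming the configuration was saved as flume-a1.conf (a placeholder file name), check_flume_wiring flume-a1.conf prints nothing and returns 0 when r1/k1 are bound to c1 and r2/k2 to c2, as in the file above.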

