CDH中kafka配置 | 您所在的位置:网站首页 › kafka的配置文件在哪 › CDH中kafka配置 |
文章目录
导入离线包开始安装flume消费kafka数据到hdfs上
导入离线包
主节点创建/opt/cloudera/csd目录 mkdir -p /opt/cloudera/csd 上传KAFKA-1.2.0.jar到/opt/cloudera/csd目录,并修改所有者和所有者的组 [root@hadoop001 csd]# chown cloudera-scm:cloudera-scm /opt/cloudera/csd/ -R 上传KAFKA-4.0.0-1.4.0.0.p0.1-el6.parcel、KAFKA-4.0.0-1.4.0.0.p0.1-el6.parcel.sha1到/opt/cloudera/parcel-repo目录,并修改KAFKA-4.0.0-1.4.0.0.p0.1-el6.parcel.sha1名称为KAFKA-4.0.0-1.4.0.0.p0.1-el6.parcel.sha [root@hadoop001 parcel-repo]# mv /opt/cloudera/parcel-repo/KAFKA-4.0.0-1.4.0.0.p0.1-el6.parcel.sha1 /opt/cloudera/parcel-repo/KAFKA-4.0.0-1.4.0.0.p0.1-el6.parcel.sha ClouderManager中选择Parcel->检查Parcel->Kafka点击分配->激活 开始安装点击CM–> 添加服务 --> 选择kafka–>主机全选 进入这个页面的时候不要着急 点击CM,这时看到kafka,修改Kafka的堆大小为256M,不然不够用,默认是50M,修改全部为256M 查看kafka的topic [root@hadoop001 parcel-repo]# /opt/cloudera/parcels/KAFKA-4.0.0-1.4.0.0.p0.1/bin/kafka-topics --zookeeper hadoop001:2181 --list 进入到/opt/cloudera/parcels/KAFKA-4.0.0-1.4.0.0.p0.1目录下分别创建:启动日志主题、事件日志主题 创建启动日志主题 [root@hadoop001 KAFKA-4.0.0-1.4.0.0.p0.1]# bin/kafka-topics --zookeeper hadoop001:2181,hadoop002:2181,hadoop003:2181 --create --replication-factor 1 --partitions 1 --topic topic_start WARNING: Due to limitations in metric names, topics with a period (’.’) or underscore (’_’) could collide. To avoid issues it is best to use either, but not both. Created topic “topic_start”. 创建事件日志主题 [root@hadoop001 KAFKA-4.0.0-1.4.0.0.p0.1]# bin/kafka-topics --zookeeper hadoop001:2181,hadoop002:2181,hadoop003:2181 --create --replication-factor 1 --partitions 1 --topic topic_event WARNING: Due to limitations in metric names, topics with a period (’.’) or underscore (’_’) could collide. To avoid issues it is best to use either, but not both. Created topic “topic_event” 查看某个Topic的详情 [root@hadoop001 KAFKA-4.0.0-1.4.0.0.p0.1]# bin/kafka-topics --zookeeper hadoop001:2181 --describe --topic topic_event Topic:topic_event PartitionCount:1 ReplicationFactor:1 Configs: Topic: topic_event Partition: 0 Leader: 46 Replicas: 46 Isr: 46 删除刚刚创建的topic [root@hadoop001 KAFKA-4.0.0-1.4.0.0.p0.1]# bin/kafka-topics --delete --zookeeper hadoop001:2181,hadoop002:2181,hadoop003:2181 --topic topic_start [root@hadoop001 KAFKA-4.0.0-1.4.0.0.p0.1]# bin/kafka-topics --delete --zookeeper hadoop001:2181,hadoop002:2181,hadoop003:2181 --topic topic_event 生产消息 [root@hadoop001 KAFKA-4.0.0-1.4.0.0.p0.1]# bin/kafka-console-producer --broker-list hadoop001:9092 --topic topic_start hello nihao 消费消息 [root@hadoop002 KAFKA-4.0.0-1.4.0.0.p0.1]# bin/kafka-console-consumer --bootstrap-server hadoop001:9092 --from-beginning --topic topic_start [2019-07-21 18:07:45,771] WARN [Consumer clientId=consumer-1, groupId=console-consumer-39633] Error while fetching metadata with correlation id 2 : {topic_start=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) hello nihao 测试与flume是否连通 [root@hadoop001 KAFKA-4.0.0-1.4.0.0.p0.1]# mkdir -p /opt/module/flume [root@hadoop001 KAFKA-4.0.0-1.4.0.0.p0.1]# cd /opt/module/flume/ [root@hadoop001 flume]# ls [root@hadoop001 flume]# touch log_position.json [root@hadoop001 flume]# chmod 777 log_position.json [root@hadoop001 flume]# cd … [root@hadoop001 module]# xsync flume/ 将log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar上传都hadoop001的/opt/module目录下 分发log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar [root@hadoop001 module]# xsync log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar 选择一个窗口启动消费者 [root@hadoop001 KAFKA-4.0.0-1.4.0.0.p0.1]# bin/kafka-console-consumer --bootstrap-server hadoop001:9092 --from-beginning --topic topic_start flume消费kafka数据到hdfs上在CM管理页面hadoop003上Flume的配置中找到代理名称,修改hadoop003,为a1,配置文件为 ## 组件 a1.sources=r1 r2 a1.channels=c1 c2 a1.sinks=k1 k2 ## source1 a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource a1.sources.r1.batchSize = 5000 a1.sources.r1.batchDurationMillis = 2000 a1.sources.r1.kafka.bootstrap.servers = hadoop001:9092,hadoop002:9092,hadoop003:9092 a1.sources.r1.kafka.topics=topic_start ## source2 a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource a1.sources.r2.batchSize = 5000 a1.sources.r2.batchDurationMillis = 2000 a1.sources.r2.kafka.bootstrap.servers = hadoop001:9092,hadoop002:9092,hadoop003:9092 a1.sources.r2.kafka.topics=topic_event ## channel1 a1.channels.c1.type=memory a1.channels.c1.capacity=100000 a1.channels.c1.transactionCapacity=10000 ## channel2 a1.channels.c2.type=memory a1.channels.c2.capacity=100000 a1.channels.c2.transactionCapacity=10000 ## sink1 a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d a1.sinks.k1.hdfs.filePrefix = logstart- a1.sinks.k1.hdfs.round = true a1.sinks.k1.hdfs.roundValue = 10 a1.sinks.k1.hdfs.roundUnit = second ##sink2 a1.sinks.k2.type = hdfs a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d a1.sinks.k2.hdfs.filePrefix = logevent- a1.sinks.k2.hdfs.round = true a1.sinks.k2.hdfs.roundValue = 10 a1.sinks.k2.hdfs.roundUnit = second ## 不要产生大量小文件 a1.sinks.k1.hdfs.rollInterval = 10 a1.sinks.k1.hdfs.rollSize = 134217728 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k2.hdfs.rollInterval = 10 a1.sinks.k2.hdfs.rollSize = 134217728 a1.sinks.k2.hdfs.rollCount = 0 ## 控制输出文件是原生文件。 a1.sinks.k1.hdfs.fileType = CompressedStream a1.sinks.k2.hdfs.fileType = CompressedStream a1.sinks.k1.hdfs.codeC = lzop a1.sinks.k2.hdfs.codeC = lzop ## 拼装 a1.sources.r1.channels = c1 a1.sinks.k1.channel= c1 a1.sources.r2.channels = c2 a1.sinks.k2.channel= c2 |
CopyRight 2018-2019 实验室设备网 版权所有 |