Elasticsearch:使用 Logstash 构建从 Kafka 到 Elasticsearch 的管道 您所在的位置:网站首页 logstash停止读取文件 Elasticsearch:使用 Logstash 构建从 Kafka 到 Elasticsearch 的管道

Elasticsearch:使用 Logstash 构建从 Kafka 到 Elasticsearch 的管道

2023-12-02 14:55| 来源: 网络整理| 查看: 265

在我之前的文章 “Elastic:使用 Kafka 部署 Elastic Stack”,我构建了从 Beats => Kafka => Logstash => Elasticsearch 的管道。在今天的文章中,我将描述从 Nodejs => Kafka => Logstash => Elasticsearch 这样的一个数据流。在之前的文章 “Elastic:Data pipeline:使用 Kafka => Logstash => Elasticsearch” 中,我也展示了使用 Python 的方法。我的配置如下:

在上面的架构中,有几个重要的组件:

Kafka Server:这就是数据首先发布的地方。 Producer:扮演将数据发布到 Kafka topic 的角色。 在现实世界中,你可以具有任何可以为 kafka 主题生成数据的实体。 在我们的示例中,我们将生成伪造的用户注册数据。 Elasticsearch:这将充当将用户注册数据存储到其自身的数据库,并提供搜索及分析。 Logstash:Logstash 将扮演中间人的角色,在这里我们将从 Kafka topic 中读取数据,然后将其插入到 Elasticsearch 中。 Kibana:Kibana 将扮演图形用户界面的角色,它将以可读或图形格式显示数据。

为了演示的方便,你可以在地址下载演示文件 GitHub - liu-xiao-guo/data-pipeline8。我的文件目录是这样的:

1. $ pwd 2. /Users/liuxg/data/data-pipeline8 3. $ tree -L 3 4. . 5. ├── README.md 6. ├── docker-elk 7. │   ├── docker-compose.yml 8. │   └── logstash_pipeline 9. │   └── kafka-elastic.conf 10. ├── docker-kafka 11. │   └── kafka-docker-compose.yml 12. └── kafka_producer.js 1. $ pwd 2. /Users/liuxg/data/data-pipeline8/docker-elk 3. $ ls -al 4. total 16 5. drwxr-xr-x 5 liuxg staff 160 May 14 2021 . 6. drwxr-xr-x 8 liuxg staff 256 Mar 5 07:36 .. 7. -rw-r--r-- 1 liuxg staff 29 May 7 2021 .env 8. -rw-r--r-- 1 liuxg staff 1064 May 13 2021 docker-compose.yml 9. drwxr-xr-x 3 liuxg staff 96 May 13 2021 logstash_pipeline 10. $ vi .env 11. $ cat .env 12. ELASTIC_STACK_VERSION=8.6.2

上面的其它文件将在我下面的章节中介绍。如果你自己想通过手动的方式部署 Kafka 请参阅我的另外一篇文章 “使用 Kafka 部署 Elastic Stack”。

安装 Kafka,Zookeeper 及 Kafka Manager

我将使用 docker-compose 来进行安装。一旦安装好,我们可以看到:

Kafka 在 PORT 9092 侦听 Zookeeper 在 PORT 2181 侦听 Kafka Manager 侦听 PORT 9000 侦听

kafka-docker-compose.yml

1. version: "3" 2. services: 3. zookeeper: 4. image: zookeeper 5. restart: always 6. container_name: zookeeper 7. hostname: zookeeper 8. ports: 9. - 2181:2181 10. environment: 11. ZOO_MY_ID: 1 12. kafka: 13. image: wurstmeister/kafka 14. container_name: kafka 15. ports: 16. - 9092:9092 17. environment: 18. KAFKA_ADVERTISED_HOST_NAME: 192.168.0.3 19. KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 20. kafka_manager: 21. image: hlebalbau/kafka-manager:stable 22. container_name: kakfa-manager 23. restart: always 24. ports: 25. - "9000:9000" 26. environment: 27. ZK_HOSTS: "zookeeper:2181" 28. APPLICATION_SECRET: "random-secret" 29. command: -Dpidfile.path=/dev/null

我们可以使用如下的命令来进行启动(在 Docker 运行的前提下):

docker-compose -f kafka-docker-compose.yml up

 一旦运行起来后,我们可以使用如下的命令来进行查看:

docker ps 1. $ docker ps 2. CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES 3. a4acc0730467 zookeeper "/docker-entrypoint.…" About a minute ago Up About a minute 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, 8080/tcp zookeeper 4. 02ec8e8a1e30 hlebalbau/kafka-manager:stable "/kafka-manager/bin/…" About a minute ago Up About a minute 0.0.0.0:9000->9000/tcp kakfa-manager 5. a85c32c0c08e wurstmeister/kafka "start-kafka.sh" About a minute ago Up About a minute 0.0.0.0:9092->9092/tcp kafka

我们发现 Kafka Manager 运行于 9000 端口。我们打开本地电脑的 9000 端口:

在上面它显示了一个默认的 topic,虽然不是我们想要的。

 

这样,我们就把 Kafka 上的 kafka_logstash topic 创建好了。

我们可以登录 kafka 容器来验证我们已经创建的 topic。我们使用如下的命令来找到 kafka 容器的名称:

docker ps -s 1. $ docker ps -s 2. CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES SIZE 3. de7453250529 hlebalbau/kafka-manager:stable "/kafka-manager/bin/…" 9 minutes ago Up 9 minutes 0.0.0.0:9000->9000/tcp kakfa-manager 117kB (virtual 427MB) 4. 65eba68350f1 zookeeper "/docker-entrypoint.…" 9 minutes ago Up 9 minutes 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, 8080/tcp zookeeper 33kB (virtual 288MB) 5. 3394868b23e9 wurstmeister/kafka "start-kafka.sh" 9 minutes ago Up 9 minutes 0.0.0.0:9092->9092/tcp kafka 210kB (virtual 457MB)

上面显示 kafka 的容器名称为 wurstmeister/kafka。我们使用如下的命令来进行登录:

docker exec -it wurstmeister/kafka /bin/bash

然后我们在容器里 打入如下的命令:

1. $ docker exec -it kafka /bin/bash 2. root@3394868b23e9:/# kafka-topics.sh --list -zookeeper zookeeper:2181 3. __consumer_offsets 4. kafka_logstash

上面的命令显示已经存在的被创建的 kafka_logstash topic。我们可以使用如下的命令来向这个被创建的 topic 来发送数据:

kafka-console-consumer.sh --bootstrap-server 192.168.0.3:9092 --topic kafka_logstash --from-beginning root@3394868b23e9:/# kafka-console-consumer.sh --bootstrap-server 192.168.0.3:9092 --topic kafka_logstash --from-beginning Elastic Stack 安装

 我们接下来安装 Elastic Stack。同样地,我使用 docker-compose 来部署 Elasticsearch, Logstash 及 Kibana。你们可以参考我之前的文章 “Logstash:在 Docker 中部署 Logstash”。为了能够把数据传入到 Elasticsearch 中,我们需要在 Logstash 中配置一个叫做 kafka-elastic.conf 的配置文件:

kafka-elastic.conf

1. input { 2. kafka { 3. bootstrap_servers => "192.168.0.3:9092" 4. topics => ["kafka_logstash"] 5. } 6. } 8. output { 9. elasticsearch { 10. hosts => ["elasticsearch:9200"] 11. index => "kafka_logstash" 12. workers => 1 13. } 14. }

请注意:在上面的 192.168.0.3 为我自己电脑的本地 IP 地址。为了说明问题的方便,我们没有对来自 kafka 里的 registered_user 这个 topic 做任何的数据处理,而直接发送到 Elasticsearch 中。

我们的 docker-compose.yml 配置文件如下:

docker-compose.yml

1. version: "3.9" 2. services: 3. elasticsearch: 4. image: elasticsearch:${ELASTIC_STACK_VERSION} 5. container_name: elasticsearch 6. environment: 7. - discovery.type=single-node 8. - ES_JAVA_OPTS=-Xms1g -Xmx1g 9. - xpack.security.enabled=false 10. volumes: 11. - type: volume 12. source: es_data 13. target: /usr/share/elasticsearch/data 14. ports: 15. - target: 9200 16. published: 9200 17. networks: 18. - elastic 20. kibana: 21. image: kibana:${ELASTIC_STACK_VERSION} 22. container_name: kibana 23. ports: 24. - target: 5601 25. published: 5601 26. depends_on: 27. - elasticsearch 28. networks: 29. - elastic 31. logstash: 32. image: logstash:${ELASTIC_STACK_VERSION} 33. container_name: logstash 34. ports: 35. - 5200:5200 36. volumes: 37. - type: bind 38. source: ./logstash_pipeline/ 39. target: /usr/share/logstash/pipeline 40. read_only: true 41. networks: 42. - elastic 44. volumes: 45. es_data: 46. driver: local 48. networks: 49. elastic: 50. name: elastic 51. driver: bridge

为方便起见,在我的安装中,我没有配置安全。如果你需要为 Elasticsearch 设置安全的话,请参考我之前的文章 “Elasticsearch:使用 Docker compose 来一键部署 Elastic Stack 8.x”。

我们使用如下的命令来启动 Elastic Stack。在 docker-compose.yml 所在的目录中打入如下的命令:

1. $ pwd 2. /Users/liuxg/data/data-pipeline8/docker-elk 3. $ ls 4. docker-compose.yml logstash_pipeline 5. $ docker-compose up

 等所有的 Elastic Stack 运行起来后,我们再次通过如下的命令来进行查看:

docker ps 1. $ docker ps 2. CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES 3. 3db5e4e6e23e kibana:8.6.2 "/bin/tini -- /usr/l…" About a minute ago Up About a minute 0.0.0.0:5601->5601/tcp kibana 4. 210b673dd89a logstash:8.6.2 "/usr/local/bin/dock…" About a minute ago Up About a minute 5044/tcp, 9600/tcp, 0.0.0.0:5200->5200/tcp logstash 5. 05c434edd823 elasticsearch:8.6.2 "/bin/tini -- /usr/l…" About a minute ago Up About a minute 0.0.0.0:9200->9200/tcp, 9300/tcp elasticsearch 6. de7453250529 hlebalbau/kafka-manager:stable "/kafka-manager/bin/…" 51 minutes ago Up 51 minutes 0.0.0.0:9000->9000/tcp kakfa-manager 7. 65eba68350f1 zookeeper "/docker-entrypoint.…" 51 minutes ago Up 51 minutes 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, 8080/tcp zookeeper 8. 3394868b23e9 wurstmeister/kafka "start-kafka.sh" 51 minutes ago Up 51 minutes 0.0.0.0:9092->9092/tcp kafka

我们可以看到 Elasticsearch 运用于 9000 端口,Kibana 运行于 5601 端口,而 Logstash 运行 5000 端口。 我们可以访问 Kibana 的端口地址 5601: 

运行 Nodejs 应用导入模拟数据

我们接下来建立一个 Nodejs 的应用来模拟一些数据。首先,我们需要安装如下的包:

npm install kafkajs uuid randomstring random-mobile

我们在根目录下打入如下的命令:

npm init -y 1. $ npm init -y 2. Wrote to /Users/liuxg/data/data-pipeline8/package.json: 4. { 5. "dependencies": { 6. "kafkajs": "^2.2.4" 7. }, 8. "name": "data-pipeline8", 9. "description": "This is a sample code showing how to realize the following data pipeline:", 10. "version": "1.0.0", 11. "main": "kafka_producer.js", 12. "devDependencies": {}, 13. "scripts": { 14. "test": "echo \"Error: no test specified\" && exit 1" 15. }, 16. "repository": { 17. "type": "git", 18. "url": "git+https://github.com/liu-xiao-guo/data-pipeline8.git" 19. }, 20. "keywords": [], 21. "author": "", 22. "license": "ISC", 23. "bugs": { 24. "url": "https://github.com/liu-xiao-guo/data-pipeline8/issues" 25. }, 26. "homepage": "https://github.com/liu-xiao-guo/data-pipeline8#readme" 27. }

上述命令生成一个叫做 package.json 的文件。在以后安装的 packages,它也会自动添加到这个文件中。默认的设置显然不是我们想要的。我们需要对它做一些修改。

kafka_producer.js

1. // import { Kafka, logLevel } from "kafkajs"; 2. const { Kafka } = require('kafkajs'); 3. const logLevel = require("kafkajs"); 5. // import { v4 as uuidv4 } from "uuid"; 6. const { v4: uuidv4 } = require('uuid'); 8. console.log(uuidv4()); 10. const kafka = new Kafka({ 11. clientId: "random-producer", 12. brokers: ["localhost:9092"], 13. connectionTimeout: 3000, 14. }); 16. var randomstring = require("randomstring"); 17. var randomMobile = require("random-mobile"); 18. const producer = kafka.producer({}); 19. const topic = "kafka_logstash"; 21. const produce = async () => { 22. await producer.connect(); 23. let i = 0; 25. setInterval(async () => { 26. var event = {}; 27. try { 28. event = { 29. globalId: uuidv4(), 30. event: "USER-CREATED", 31. data: { 32. id: uuidv4(), 33. firstName: randomstring.generate(8), 34. lastName: randomstring.generate(6), 35. country: "China", 36. email: randomstring.generate(10) + "@gmail.com", 37. phoneNumber: randomMobile(), 38. city: "Hyderabad", 39. createdAt: new Date(), 40. }, 41. }; 43. await producer.send({ 44. topic, 45. acks: 1, 46. messages: [ 47. { 48. value: JSON.stringify(event), 49. }, 50. ], 51. }); 53. // if the message is written successfully, log it and increment `i` 54. console.log("writes: ", event); 55. i++; 57. } catch (err) { 58. console.error("could not write message " + err); 59. } 60. }, 5000); 61. }; 63. produce().catch(console.log)

我们运行上面的 Nodejs 代码:

npm start

 我们接下来在 Kibana 中来查看索引 kafka_logstash:

GET kafka_logstash/_count 1. { 2. "count": 103, 3. "_shards": { 4. "total": 1, 5. "successful": 1, 6. "skipped": 0, 7. "failed": 0 8. } 9. }

我们可以看到文档的数值在不断地增加。我们可以查看文档:

很显然我们收到了数据。从上面的结果中,我们可以看出来是一些非结构化的数据。我们可以针对 Logstash 的 pipeline 进行修改:

kafka-elastic.conf

1. input { 2. kafka { 3. bootstrap_servers => "192.168.0.3:9092" 4. topics => ["kafka_logstash"] 5. } 6. } 8. filter { 9. json { 10. source => "message" 11. } 13. mutate { 14. add_field => { 15. "id" => "%{[data][id]}" 16. } 17. add_field => { 18. "firstName" => "%{[data][firstName]}" 19. } 20. add_field => { 21. "lastName" => "%{[data][lastName]}" 22. } 23. add_field => { 24. "city" => "%{[data][city]}" 25. } 26. add_field => { 27. "country" => "%{[data][country]}" 28. } 29. add_field => { 30. "email" => "%{[data][email]}" 31. } 32. add_field => { 33. "phoneNumber" => "%{[data][phoneNumber]}" 34. } 35. add_field => { 36. "createdAt" => "%{[data][createdAt]}" 37. } 38. remove_field => ["data", "@version", "@timestamp", "message", "event", "globalId"] 39. } 40. } 42. output { 43. elasticsearch { 44. hosts => ["elasticsearch:9200"] 45. index => "kafka_logstash" 46. workers => 1 47. } 48. }

我们在 Kibana 中删除 kafka_logstash:

DELETE kafka_logstash

我们停止运行 Nodejs 应用。我们把运行 Elastic Stack 的 docker-compose 关掉,并再次重新启动它:

1. docker-compose down 2. docker-compose up

我们再次运行 Nodejs 应用:

 我们再次到 Kibana 中进行查看:

很显然,这次,我们看到结构化的输出文件。



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有