在Windows10中使用Kafka采集数据保存到Redis数据库中 您所在的位置:网站首页 kafkaconsumer 在Windows10中使用Kafka采集数据保存到Redis数据库中

在Windows10中使用Kafka采集数据保存到Redis数据库中

2023-03-15 19:57| 来源: 网络整理| 查看: 265

大数据学习路线图

作者:厦门大学计算机系林子雨副教授 说明:本博客是与林子雨编著《数据采集与预处理》教材配套的教学资料。 版权声明:本博客内容,未经同意,禁止转载。 操作系统:Windows10 Redis:7.0.8

一、Redis简介

Redis是一个键值(key-value)存储系统,即键值对非关系型数据库,和Memcached类似,目前正在被越来越多的互联网公司采用。Redis作为一个高性能的键值数据库,不仅在很大程度上弥补了memcached这类键值存储的不足,而且在部分场合下可以对关系数据库起到很好的补充作用。Redis提供了Python、Ruby、Erlang、PHP客户端,使用很方便。   Redis支持存储的值(value)类型包括string(字符串)、list(链表)、set(集合)和zset(有序集合)。这些数据类型都支持push/pop、add/remove以及取交集、并集和差集等丰富的操作,而且这些操作都是原子性的。在此基础上,Redis支持各种不同方式的排序。与memcached一样,为了保证效率,Redis中的数据都是缓存在内存中的,它会周期性地把更新的数据写入磁盘,或者把修改操作写入追加的记录文件;此外,Redis还实现了主从(master-slave)同步。

二、安装Redis

到github网站下载Redis for Windows安装文件Redis-7.0.8-Windows-x64.tar.gz(下载地址:https://github.com/redis-windows/redis-windows/releases/tag/7.0.8 ),解压缩到指定目录,比如解压缩到C盘根目录下(如图1所示)。 图1 Redis安装目录 打开一个cmd窗口,输入如下命令:

> cd C:\Redis-7.0.8-Windows-x64 > redis-server redis.conf

执行上述命令以后,会出现如图1所示结果。这个cmd窗口不能关闭,如果关闭,Redis服务就停止了。 图2 Redis启动界面 新建一个cmd窗口,输入如下命令启动Redis客户端:

> cd C:\Redis-7.0.8-Windows-x64 > redis-cli.exe -h 127.0.0.1 -p 6379

启动以后的效果如图3所示。客户端连上服务器之后,会显示“127.0.0.1:6379>”的命令提示符信息,表示服务器的IP地址为127.0.0.1,端口为6379。 图3 Redis客户端启动后的效果

三、Redis操作实例

假设有三个表,即Student、Course和SC,三个表的字段(列)和数据如图4所示。 (a)Student表 (b)Course表 (c)SC表 图4 三个表的字段和数据 Redis数据库是以的形式存储数据,把三个表的数据存入Redis数据库时,key和value的确定方法如下:

key=表名:主键值:列名 value=列值

例如,把每个表的第一行记录保存到Redis数据中,需要执行的命令和执行结果如图5所示。 图5 向Redis中插入数据 可以执行类似的命令,把三个表所有数据都插入到Redis数据库中,完整命令如下:

set Student:95001:Sname 李勇 set Student:95001:Ssex 男 set Student:95001:Sage 22 set Student:95001:Sdept CS set Student:95002:Sname 刘晨 set Student:95002:Ssex 女 set Student:95002:Sage 19 set Student:95002:Sdept IS set Student:95003:Sname 王敏 set Student:95003:Ssex 女 set Student:95003:Sage 18 set Student:95003:Sdept MA set Student:95004:Sname 张立 set Student:95004:Ssex 男 set Student:95004:Sage 19 set Student:95004:Sdept IS set Course:1:Cname 数据库 set Course:1:Credit 4 set Course:2:Cname 数学 set Course:2:Credit 2 set Course:3:Cname 信息系统 set Course:3:Credit 4 set Course:4:Cname 操作系统 set Course:4:Credit 3 set Course:5:Cname 数据结构 set Course:5:Credit 4 set Course:6:Cname 数据处理 set Course:6:Credit 2 set Course:7:Cname PASCAL语言 set Course:7:Credit 4 set SC:95001:1:Grade 92 set SC:95001:2:Grade 85 set SC:95001:3:Grade 88 set SC:95002:2:Grade 90 set SC:95002:3:Grade 80

然后,针对这些已经录入的数据,下面将简单演示如何进行增删改查操作。Redis支持5种数据类型,不同数据类型,增删改查可能不同,这里用最简单的数据类型字符串作为演示。

1. 插入数据

向Redis插入一条数据,只需要先设计好key和value,然后用set命令插入数据即可。例如,在Course表中插入一门新的课程“算法”,4学分,操作命令和结果如图6所示。 图6 插入数据

2. 修改数据

Redis并没有修改数据的命令,所以,如果在Redis中修改一条数据,只能采用一种变通的方式,即在使用set命令时,使用同样的key,然后用新的value值来覆盖旧的数据。例如,把刚才新添加的“算法”课程名称修改为“编译原理”,操作命令和结果如图7所示。 图7 修改数据

从图7可以看出,当存入中文时,查询结果是一个编码,并不是中文。此时, 新建一个cmd窗口,使用如下命令把值存储到一个文本文件1.txt中,则在1.txt中又可以看到中文的“编译原理”:

> cd C:\Redis-7.0.8-Windows-x64 > redis-cli get Course:8:Cname > 1.txt 3. 删除数据

Redis有专门删除数据的命令——del命令,命令格式为“del 键”。所以,如果要删除之前新增的课程“编译原理”,只需输入命令“del Course:8:Cname” ,如图8所示,当输入“del Course:8:Cname” 时,返回“1”,说明成功删除一条数据,当再次输入get命令时,输出为空,说明删除成功。 图8 删除数据

4. 查询数据

Redis最简单的查询方式是使用get命令,上面几个操作中都已经使用过get命令,这里不再赘述。

四、使用Python操作Redis

要使用python 操作Redis ,需要先安装python的redis组件,安装命令如下:

> pip install redis

打开IDLE,在命令提示符后面执行如下语句操作Redis:

>>> import redis >>> r = redis.Redis(host='localhost', port=6379, db=0) >>> r.set('foo', 'bar') True >>> r.get('foo') b'bar' >>>

也可以编写代码文件操作Redis,具体代码如下:

import redis r = redis.Redis(host='localhost', port=6379, db=0) r.set('university', 'XMU') print(str(r.get('university'),"utf-8")) 五、使用Kafka采集数据保存到Redis数据库中 (一)任务描述

在“C:\Python38\mycode”目录下有一个student.txt文件,里面包含如下两行内容:

95009,Xiaoming,Male,21,CS 95010,Xiaowang,Female,22,MA

现在需要编写程序,使用Kafka采集student.txt中的数据,并对数据进行解析,然后保存到Redis数据库中。

(二)实现代码

要使用python 操作Redis ,需要先安装python的redis组件,安装命令如下:

> pip install redis

在“C:\Python38\mycode”目录下新建一个代码文件kafka-redis-producer.py,其内容如下:

from kafka import KafkaProducer import json print("this is producer") producer = KafkaProducer(bootstrap_servers=['localhost:9092']) txtFilePath = 'student.txt' data = [] with open(txtFilePath, "rb") as txtfile: data = txtfile.readlines() for rec in data: # Topic为'redisTopic' 消息内容为读取的student.txt文件的一行 list_data = str(rec,"utf-8").replace('\\n','').replace('\n','').replace('\r','').split(',') dict_data = {"sno":list_data[0],"sname":list_data[1],"ssex":list_data[2],"sage":list_data[3],"sdept":list_data[4]} print(dict_data) value=json.dumps(dict_data).encode(encoding="utf-8") producer.send('redisTopic', key=b'123', value=value)

在“C:\Python38\mycode”目录下新建一个代码文件Kafka-redis-consumer.py,其内容如下:

import json import redis from kafka import KafkaConsumer pool=redis.ConnectionPool(host='localhost',port=6379,db=0) r=redis.Redis(connection_pool=pool, charset='UTF-8', encoding='UTF-8') #写入Redis def dict2redis(d): print("store to redis") r.set("Student:"+d["sno"]+":Sname",d["sname"]) r.set("Student:"+d["sno"]+":Ssex",d["ssex"]) r.set("Student:"+d["sno"]+":Sage",d["sage"]) r.set("Student:"+d["sno"]+":Sdept",d["sdept"]) consumer = KafkaConsumer('redisTopic',bootstrap_servers=['localhost:9092']) for message in consumer: dict=json.loads(str(message.value.decode('utf-8'))) print(dict) dict2redis(dict) (三)执行过程

首先要确保Redis数据库服务已经启动。 在Windows系统中打开第1个cmd窗口,执行如下命令启动Zookeeper服务:

> cd c:\kafka_2.12-2.4.0 > .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.Properties

打开第2个cmd窗口,然后执行下面命令启动Kafka服务:

> cd c:\kafka_2.12-2.4.0 > .\bin\windows\kafka-server-start.bat .\config\server.properties

打开第3个cmd窗口,执行如下命令运行kafka-redis-consumer.py:

> cd C:\Python38\mycode > python kafka-redis-consumer.py

打开IDLE,执行代码文件kafka-redis-producer.py,让生产者产生数据(如图9所示)。需要注意的是,运行kafka-redis-producer.py时,会自动创建redisTopic,不需要手动创建。这时,在消费者窗口,会打印出接收到的数据和程序执行情况(如图10所示)。 图9 生成数据 图10 消费者窗口 打开第4个cmd窗口,输入如下命令启动Redis客户端:

> cd C:\Redis-7.0.8-Windows-x64 > redis-cli.exe -h 127.0.0.1 -p 6379

在Redis数据库中执行get命令查询数据,如图11所示,数据已经成功更新到Redis数据库中。 图11 查询Redis数据库中的数据



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有