在Windows10中使用Kafka采集数据保存到Redis数据库中 | 您所在的位置:网站首页 › kafkaconsumer › 在Windows10中使用Kafka采集数据保存到Redis数据库中 |
![]() 作者:厦门大学计算机系林子雨副教授 说明:本博客是与林子雨编著《数据采集与预处理》教材配套的教学资料。 版权声明:本博客内容,未经同意,禁止转载。 操作系统:Windows10 Redis:7.0.8 一、Redis简介Redis是一个键值(key-value)存储系统,即键值对非关系型数据库,和Memcached类似,目前正在被越来越多的互联网公司采用。Redis作为一个高性能的键值数据库,不仅在很大程度上弥补了memcached这类键值存储的不足,而且在部分场合下可以对关系数据库起到很好的补充作用。Redis提供了Python、Ruby、Erlang、PHP客户端,使用很方便。 Redis支持存储的值(value)类型包括string(字符串)、list(链表)、set(集合)和zset(有序集合)。这些数据类型都支持push/pop、add/remove以及取交集、并集和差集等丰富的操作,而且这些操作都是原子性的。在此基础上,Redis支持各种不同方式的排序。与memcached一样,为了保证效率,Redis中的数据都是缓存在内存中的,它会周期性地把更新的数据写入磁盘,或者把修改操作写入追加的记录文件;此外,Redis还实现了主从(master-slave)同步。 二、安装Redis到github网站下载Redis for Windows安装文件Redis-7.0.8-Windows-x64.tar.gz(下载地址:https://github.com/redis-windows/redis-windows/releases/tag/7.0.8 ),解压缩到指定目录,比如解压缩到C盘根目录下(如图1所示)。
执行上述命令以后,会出现如图1所示结果。这个cmd窗口不能关闭,如果关闭,Redis服务就停止了。
启动以后的效果如图3所示。客户端连上服务器之后,会显示“127.0.0.1:6379>”的命令提示符信息,表示服务器的IP地址为127.0.0.1,端口为6379。
假设有三个表,即Student、Course和SC,三个表的字段(列)和数据如图4所示。
例如,把每个表的第一行记录保存到Redis数据中,需要执行的命令和执行结果如图5所示。
然后,针对这些已经录入的数据,下面将简单演示如何进行增删改查操作。Redis支持5种数据类型,不同数据类型,增删改查可能不同,这里用最简单的数据类型字符串作为演示。 1. 插入数据向Redis插入一条数据,只需要先设计好key和value,然后用set命令插入数据即可。例如,在Course表中插入一门新的课程“算法”,4学分,操作命令和结果如图6所示。
Redis并没有修改数据的命令,所以,如果在Redis中修改一条数据,只能采用一种变通的方式,即在使用set命令时,使用同样的key,然后用新的value值来覆盖旧的数据。例如,把刚才新添加的“算法”课程名称修改为“编译原理”,操作命令和结果如图7所示。
从图7可以看出,当存入中文时,查询结果是一个编码,并不是中文。此时, 新建一个cmd窗口,使用如下命令把值存储到一个文本文件1.txt中,则在1.txt中又可以看到中文的“编译原理”: > cd C:\Redis-7.0.8-Windows-x64 > redis-cli get Course:8:Cname > 1.txt 3. 删除数据Redis有专门删除数据的命令——del命令,命令格式为“del 键”。所以,如果要删除之前新增的课程“编译原理”,只需输入命令“del Course:8:Cname” ,如图8所示,当输入“del Course:8:Cname” 时,返回“1”,说明成功删除一条数据,当再次输入get命令时,输出为空,说明删除成功。
Redis最简单的查询方式是使用get命令,上面几个操作中都已经使用过get命令,这里不再赘述。 四、使用Python操作Redis要使用python 操作Redis ,需要先安装python的redis组件,安装命令如下: > pip install redis打开IDLE,在命令提示符后面执行如下语句操作Redis: >>> import redis >>> r = redis.Redis(host='localhost', port=6379, db=0) >>> r.set('foo', 'bar') True >>> r.get('foo') b'bar' >>>也可以编写代码文件操作Redis,具体代码如下: import redis r = redis.Redis(host='localhost', port=6379, db=0) r.set('university', 'XMU') print(str(r.get('university'),"utf-8")) 五、使用Kafka采集数据保存到Redis数据库中 (一)任务描述在“C:\Python38\mycode”目录下有一个student.txt文件,里面包含如下两行内容: 95009,Xiaoming,Male,21,CS 95010,Xiaowang,Female,22,MA现在需要编写程序,使用Kafka采集student.txt中的数据,并对数据进行解析,然后保存到Redis数据库中。 (二)实现代码要使用python 操作Redis ,需要先安装python的redis组件,安装命令如下: > pip install redis在“C:\Python38\mycode”目录下新建一个代码文件kafka-redis-producer.py,其内容如下: from kafka import KafkaProducer import json print("this is producer") producer = KafkaProducer(bootstrap_servers=['localhost:9092']) txtFilePath = 'student.txt' data = [] with open(txtFilePath, "rb") as txtfile: data = txtfile.readlines() for rec in data: # Topic为'redisTopic' 消息内容为读取的student.txt文件的一行 list_data = str(rec,"utf-8").replace('\\n','').replace('\n','').replace('\r','').split(',') dict_data = {"sno":list_data[0],"sname":list_data[1],"ssex":list_data[2],"sage":list_data[3],"sdept":list_data[4]} print(dict_data) value=json.dumps(dict_data).encode(encoding="utf-8") producer.send('redisTopic', key=b'123', value=value)在“C:\Python38\mycode”目录下新建一个代码文件Kafka-redis-consumer.py,其内容如下: import json import redis from kafka import KafkaConsumer pool=redis.ConnectionPool(host='localhost',port=6379,db=0) r=redis.Redis(connection_pool=pool, charset='UTF-8', encoding='UTF-8') #写入Redis def dict2redis(d): print("store to redis") r.set("Student:"+d["sno"]+":Sname",d["sname"]) r.set("Student:"+d["sno"]+":Ssex",d["ssex"]) r.set("Student:"+d["sno"]+":Sage",d["sage"]) r.set("Student:"+d["sno"]+":Sdept",d["sdept"]) consumer = KafkaConsumer('redisTopic',bootstrap_servers=['localhost:9092']) for message in consumer: dict=json.loads(str(message.value.decode('utf-8'))) print(dict) dict2redis(dict) (三)执行过程首先要确保Redis数据库服务已经启动。 在Windows系统中打开第1个cmd窗口,执行如下命令启动Zookeeper服务: > cd c:\kafka_2.12-2.4.0 > .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.Properties打开第2个cmd窗口,然后执行下面命令启动Kafka服务: > cd c:\kafka_2.12-2.4.0 > .\bin\windows\kafka-server-start.bat .\config\server.properties打开第3个cmd窗口,执行如下命令运行kafka-redis-consumer.py: > cd C:\Python38\mycode > python kafka-redis-consumer.py打开IDLE,执行代码文件kafka-redis-producer.py,让生产者产生数据(如图9所示)。需要注意的是,运行kafka-redis-producer.py时,会自动创建redisTopic,不需要手动创建。这时,在消费者窗口,会打印出接收到的数据和程序执行情况(如图10所示)。
在Redis数据库中执行get命令查询数据,如图11所示,数据已经成功更新到Redis数据库中。
|
CopyRight 2018-2019 实验室设备网 版权所有 |