【SpringBoot+HBase 】快速入门 您所在的位置:网站首页 HBASE介绍入门 【SpringBoot+HBase 】快速入门

【SpringBoot+HBase 】快速入门

2024-07-15 11:04| 来源: 网络整理| 查看: 265

SpringBoot+HBase 快速入门文档 前言一、HBase常用shell命令1.1 基础命令1.2 表的基本操作1.3 表的数据增删改查指令1.3.1 HBase数据模型1.3.2 表的增删改查指令1.3.3 过滤器条件查询 二、JavaAPI操作Hbase2.1 Maven依赖2.2 复制HBase和Hadoop配置文件2.3 创建Hbase连接以及admin管理对象2.4 使用JavaAPI创建一个表2.5 插入一条数据到HBase表中 三. 千万级数据hbase查询demo3.1 案例背景3.1 设计rowkey规则3.2 预分区3.3 模拟数据存储3.4 创建hbase表数据映射对象3.5 通过rowkey查询某一行数据3.6 获取表中所有数据3.7 过滤器组合查询3.7.1 模糊查询3.7.2 可降采样的查询指定传感器某一时间范围内的所有数据3.7.3 可降采样查询某一时间范围内多个传感器的值 四、其他小知识后记

前言

本文是在已经搭建好的hbase数据库基础上进行的JavaAPI开发。HBASE搭建可以参考下面这篇文章,如果在自己电脑上搭建,配置不够用,只搭建一个单节点的hbase也行。 WMware上搭建基于Ubuntu18.04的Zookeeper+Hadoop+HBase集群 本文是一个单节点的hbase,搭建在2核4G的Ubuntu18.04系统的云服务器上。如下面进程所示,Zookeeper+Hadoop+HBase都是搭建在该服务器上。

root@whut:~/bin# jps 1232 jar 3457 NameNode 4065 SecondaryNameNode 5251 JobHistoryServer 4452 ResourceManager 5892 HRegionServer 2967 QuorumPeerMain 4697 NodeManager 5626 HMaster 7340 Jps 27213 jar 3693 DataNode

通过http://ip:16010查看hbase的状态,可以发现该hbase只有一个节点。如果是云服务器搭建,记得开放相应16010端口。 在这里插入图片描述 题外话:hbase和hadoop的运行机制和原理比较复杂,但是作为入门来说,只需要把hbase当做一个像MySql一样的数据库,会使用即可,等入门以后在了解原理也不迟。

一、HBase常用shell命令

本章是通过hbase自带的shell命令,对数据表完成常见的增删改查操作,同时有助于后文对Hbase的JavaAPI的理解。

进入hbase命令行通过以下命令 root@whut:~/bin# hbase shell

输入指令后,进入如下所示界面 在这里插入图片描述

1.1 基础命令 查看hbase版本 hbase(main):001:0> version 1.3.1, r930b9a55528fe45d8edce7af42fef2d35e77677a, Thu Apr 6 19:36:54 PDT 2017 查看服务器状态 hbase(main):002:0> status 1 active master, 0 backup masters, 1 servers, 1.2 表的基本操作 查看所有表 hbase(main):001:0> list TABLE WD_TABLE 1 row(s) in 0.2690 seconds 创建表

命令行格式:create tablename 列族1,列族2,… 例如:创建表名:数据表(DATA_TABLE),温度数据(WD_TABLE),应变数据(YB_DATA)

hbase(main):001:0> create 'DATA_TABLE','WD_DATA','YB_DATA' 0 row(s) in 2.5610 seconds => Hbase::Table - DATA_TABLE 查看表的基本信息 hbase(main):003:0> describe 'DATA_TABLE' Table DATA_TABLE is ENABLED DATA_TABLE COLUMN FAMILIES DESCRIPTION {NAME => 'WD_DATA', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_S COPE => '0'} {NAME => 'YB_DATA', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_S COPE => '0'} 2 row(s) in 0.1220 seconds

可以看到两个列族,‘WD_DATA’,‘YB_DATA’

检查某个表是否存在 hbase(main):005:0> exists 'DATA_TABLE' Table DATA_TABLE does exist 0 row(s) in 0.0140 seconds 禁用/启用表 禁用表 disable ‘DATA_TABLE ’ 检查表是否被禁用 is_disabled ‘DATA_TABLE ’ 启用表 enable ‘DATA_TABLE ’ 检查表是否被启用 is_enabled ‘DATA_TABLE ’ 表的删除

删除表之前,需要先禁用表,否则会报error

hbase(main):008:0> drop 'DATA_TABLE' ERROR: Table DATA_TABLE is enabled. Disable it first. hbase(main):010:0> disable 'DATA_TABLE' 0 row(s) in 2.2430 seconds hbase(main):011:0> drop 'DATA_TABLE' 0 row(s) in 1.2460 seconds hbase(main):017:0> list TABLE 0 row(s) in 0.0140 seconds 清空表的数据

实际是做了以下3件事:禁用表-删除表-创建表(因此不会删除表结构,如列簇)

语法格式:truncate 表名

truncate 'DATA_TABLE' 1.3 表的数据增删改查指令

在进行增删改查之前,我们首先要简单了解一下hbase的数据表结构,hbase是非关系型数据库(NoSQL),数据存储以键值对形式。 这里以一个公司员工表为案例来讲解,此表中包含员工基本信息(员工姓名、年龄),员工详细信息(工资、角色),以及时间戳。整体表结构如下 在这里插入图片描述

如上,每一行有一个RowKey用于唯一地标识和定位行,各行数据按RowKey的字典序排列。其中ImployeeBasicInfoCLF和DetailInfoCLF是两个列族,列族下又有多个具体列。(员工基本信息列族:姓名、年龄。详细信息列族:薪水、角色)

1.3.1 HBase数据模型

命名空间 命名空间是对表的逻辑分组,不同的命名空间类似于关系型数据库中的不同的Database数据库。利用命名空间,在多租户场景下可做到更好的资源和数据隔离。

表 对应于关系型数据库中的一张张表,HBase以“表”为单位组织数据,表由多行组成。

行 行由一个RowKey和多个列族组成,一个行有一个RowKey,用来唯一标示。

列族 每一行由若干列族组成,每个列族下可包含多个列,如上ImployeeBasicInfoCLF和DetailInfoCLF即是两个列族。列族是列共性的一些体现。注意:物理上,同一列族的数据存储在一起的。

列限定符 列由列族和列限定符唯一指定,像如上的name、age即是ImployeeBasicInfoCLF列族的列限定符。

单元格 单元格由RowKey、列族、列限定符唯一定位,单元格之中存放一个值(Value)和一个版本号。

时间戳 单元格内不同版本的值按时间倒序排列,最新的数据排在最前面

1.3.2 表的增删改查指令 增加列簇

命令行格式:alter tablename 列族1,列族2,… 在DATA_TABLE原有的WD_DATA,YB_DATA的基础上再增加一个列簇ZD_DATA

alter 'DATA_TABLE','ZD_TABLE' 删除列簇

命令行格式:alter tablename { NAME => '列族',METHOD => 'delete ' }

alter 'DATA_TABLE',{ NAME =>'YB_DATA', METHOD => 'delete' } 插入数据

命令行格式:put ‘表名’,行键名’,‘列族:列名’,‘值’ 注:如果待插入数据的表名、行键值、列族名、列名与原有数据相同,则是更新操作

put 'DATA_TABLE', 'rowkey_one','WD_DATA:value','22.3' put 'DATA_TABLE', 'rowkey_one','WD_DATA:time','2021-01-01' put 'DATA_TABLE', 'rowkey_one','YB_DATA:value','25.6' put 'DATA_TABLE', 'rowkey_one','YB_DATA:time','2021-01-02' put 'DATA_TABLE', 'rowkey_two','WD_DATA:value','25.6' put 'DATA_TABLE', 'rowkey_two','WD_DATA:time','2021-01-11' put 'DATA_TABLE', 'rowkey_two','YB_DATA:value','14.5' put 'DATA_TABLE', 'rowkey_two','YB_DATA:localtion','2021-02-02' 查看表中所有数据

插入完数据后,使用scan 表名查看数据。

hbase(main):017:0> scan 'DATA_TABLE' ROW COLUMN+CELL rowkey_one column=WD_DATA:time, timestamp=1635040680226, value=2021-01-01 rowkey_one column=WD_DATA:value, timestamp=1635040670088, value=22.3 rowkey_one column=YB_DATA:time, timestamp=1635040790205, value=2021-01-02 rowkey_one column=YB_DATA:value, timestamp=1635040789577, value=25.6 rowkey_two column=WD_DATA:time, timestamp=1635040680522, value=2021-01-11 rowkey_two column=WD_DATA:value, timestamp=1635040680494, value=25.6 rowkey_two column=YB_DATA:localtion, timestamp=1635040794691, value=2021-02-02 rowkey_two column=YB_DATA:value, timestamp=1635040793862, value=14.5

scan是扫描整个表(包括行键rowkey和所有的列簇以及里面的key-value数据),当数据量比较大时候,scan效率较低,想要统计表中的数据行数,可以用count 表名来统计。

hbase(main):030:0> count 'DATA_TABLE' 2 row(s) in 0.0220 seconds

或者可以使用LIMIT限制扫描条数

scan 'DATA_TABLE',{ LIMIT => 1} 查询一行记录 获得指定行的数据

命令行格式:get 表名, 行键

hbase(main):036:0> get 'DATA_TABLE','rowkey_one' COLUMN CELL WD_DATA:time timestamp=1635040680226, value=2021-01-01 WD_DATA:value timestamp=1635040670088, value=22.3 YB_DATA:time timestamp=1635040790205, value=2021-01-02 YB_DATA:value timestamp=1635040789577, value=25.6 获得指定行和列簇的数据

命令行格式:get 表名, 行键,列簇

hbase(main):002:0> get 'DATA_TABLE','rowkey_one','WD_DATA' COLUMN CELL WD_DATA:time timestamp=1635040680226, value=2021-01-01 WD_DATA:value timestamp=1635040670088, value=22.3 1 row(s) in 0.0130 seconds 获得指定列簇的数据

命令行格式:get 表名, { COLUMN => 列簇}

hbase(main):003:0> scan 'DATA_TABLE',{ COLUMN => 'WD_DATA' } ROW COLUMN+CELL rowkey_one column=WD_DATA:time, timestamp=1635040680226, value=2021-01-01 rowkey_one column=WD_DATA:value, timestamp=1635040670088, value=22.3 rowkey_two column=WD_DATA:time, timestamp=1635040680522, value=2021-01-11 rowkey_two column=WD_DATA:value, timestamp=1635040680494, value=25.6 1.3.3 过滤器条件查询

hbase的过滤器种类非常多,将不同的过滤器组合使用可以实现非常丰富的功能,这里只通过hbase shell指令实现几个简单的过滤器,以便更好的理解后文中JavaAPI操作hbase。下图是hbase常见的过滤器 在这里插入图片描述

过滤器语法: scan '表名', { Filter => "过滤器(比较运算符, '比较器表达式')” } 上一小节使用的get指令只能获得一行数据,如果想要获得多行数据,需要用scan命令来对表进行扫描。 在这里插入图片描述

值过滤器 hbase(main):004:0> scan 'DATA_TABLE',FILTER=>"ValueFilter(=,'binary:22.3')" ROW COLUMN+CELL rowkey_one column=WD_DATA:value, timestamp=1635040670088, value=22.3 查询列名前缀模糊过滤 hbase(main):006:0> scan 'DATA_TABLE',FILTER=>"ColumnPrefixFilter('tim')" ROW COLUMN+CELL rowkey_one column=WD_DATA:time, timestamp=1635040680226, value=2021-01-01 rowkey_one column=YB_DATA:time, timestamp=1635040790205, value=2021-01-02 rowkey_two column=WD_DATA:time, timestamp=1635040680522, value=2021-01-11 2 row(s) in 0.0250 seconds FILTER中支持多个过滤条件通过括号、AND和OR进行组合: hbase(main):002:0> scan 'DATA_TABLE',FILTER=>"ColumnPrefixFilter('valu') AND ValueFilter(=,'binary:22.3')" ROW COLUMN+CELL rowkey_one column=WD_DATA:value, timestamp=1635040670088, value=22.3

更多过滤器知识,可以查看以下博客: Hbase过滤器 HBase过滤器笔记

二、JavaAPI操作Hbase 2.1 Maven依赖

其中Hbase的Maven依赖最好和其版本号一致

org.springframework.boot spring-boot-starter-web org.projectlombok lombok true org.springframework.boot spring-boot-starter-test test org.apache.hbase hbase-server 2.3.6 org.apache.hbase hbase-client 2.3.6 2.2 复制HBase和Hadoop配置文件

使用finalShell或者其他远程工具,将以下三个配置文件复制到Springboot项目下的resource目录中,这3个文件在hadoop的安装目录下的 $HADOOP_HOME/etc/hbase/conf中。

hbase-site.xmlcore-site.xmllog4j.properties root@whut:/opt/module/hbase/conf# sz core-site.xml hbase-site.xml log4j.properties

注意:请确认配置文件中的服务器节点hostname/ip地址配置正确。如下图使用了whut代替ip,要在修改本机的hosts文件中做好映射。 hosts文件在:C:\Windows\System32\drivers\etc目录下。 在这里插入图片描述

2.3 创建Hbase连接以及admin管理对象

实现步骤:

使用HbaseConfiguration.create()创建Hbase配置使用ConnectionFactory.createConnection()创建Hbase连接要创建表,需要基于Hbase连接获取admin管理对象使用admin.close、connection.close关闭连接

使用单例模式,创建一个HBaseConfig类管理admin()对象和Table对象。

@Slf4j public class HBaseConfig { public static Configuration configuration =null; public static Connection connection =null; public static Admin admin=null; /* 这里使用单例模式 * 因为配置只需要读取一次就够了,使用static代码块,读取一次, * HBaseConfiguration.create() 该方法会自动读取 resources:下的hbase-site.xml文件 * */ static { try { configuration=HBaseConfiguration.create(); connection = ConnectionFactory.createConnection(configuration); admin=connection.getAdmin(); } catch (IOException e) { e.printStackTrace(); log.warn("HBase 连接失败"); } } public static Admin getAdmin() { return admin; } public static Connection getConnection() { return connection; } public static Configuration getConfiguration() { return configuration; } /* * 关闭资源 * */ public static void close(){ if(admin!=null) { try { admin.close(); } catch (IOException e) { e.printStackTrace(); } } if(connection != null) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }

Connection是连接对象,可以获得HBase的管理对象Admin,以及’Table’表管理对象 Admin操作对象是数据库表。用于对表的创建、修改、删除。Admin对象可以获得表数据管理对象Table Table操作对象是表里的数据。用于对表的数据进行增删改查。

2.4 使用JavaAPI创建一个表 首先创建一个工具类,该工具类包含3个方法: 判断表是否存在创建一个表向指定表的指定列簇中添加一行数据 /** * @author WZH * @create 2021-10-24 18:17 * @desc hbase工具类 **/ @Slf4j public class HBaseUtils { private static Admin admin =HBaseConfig.getAdmin(); private static Connection connection =HBaseConfig.getConnection(); /** * 判断表是否存在 * * @return {{@link null}} * @author WZH * @date 2021/9/14 11:00 */ public static boolean isTableExist(String tableName){ try { return admin.tableExists(TableName.valueOf(tableName)); } catch (IOException e) { e.printStackTrace(); log.warn(tableName+"不存在"); return false; } } /** * * @param tableName * @param columnFamily * @return {} * @author WZH * @date 2021/9/14 11:05 */ public static void createTable(String tableName, String... columnFamily) throws MasterNotRunningException, ZooKeeperConnectionException, IOException{ //判断表是否存在 if(isTableExist(tableName)){ System.out.println("表" + tableName + "已存在"); }else{ //创建表属性对象,表名需要转字节 HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName)); //创建多个列族 for(String cf : columnFamily){ descriptor.addFamily(new HColumnDescriptor(cf)); } //根据对表的配置,创建表 admin.createTable(descriptor); System.out.println("表" + tableName + "创建成功!"); } } /** * 插入一行数据 * @param tableName 表名 * @param rowKey 行键 * @param columnFamily 列簇 * @param column 列名 * @param value 值 * @return {} * @author WZH * @date 2021/9/14 11:14 */ public static void addRowData(String tableName, String rowKey, String columnFamily, String column, String value) throws IOException{ //创建Table对象 Table table = connection.getTable(TableName.valueOf(tableName)); //向表中插入数据 Put put = new Put(Bytes.toBytes(rowKey)); //向Put对象中组装数据 put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value)); // 添加数据到table table.put(put); table.close(); } } 在main()方法,测试创建一个包含一个info列簇的Student表 public static void main(String[] args) { System.setProperty("hadoop.home.dir", "E:\\test"); try { HBaseUtils.createTable("Students","info"); } catch (IOException e) { e.printStackTrace(); } }

然后在hbase shell脚本中,通过list语句,查看表Student是否创建成功。

hbase(main):004:0> list TABLE DATA_TABLE Students 2 row(s) in 0.0140 seconds => ["DATA_TABLE", "Students"]

通过运行结果发现,表格已经成功创建。 注:程序运行可能会报HADOOP_HOME没有配置的error,这是因为HADOOP不是在win10本地,如果不影响表创建,可以不去管它。

2.5 插入一条数据到HBase表中 public static void main(String[] args) { try { HBaseUtils.addRowData("DATA_TABLE", "WD001", "WD_DATA", "value", "20"); } catch (IOException e) { e.printStackTrace(); } }

执行完程序后,在hbase shell中输入 scan 'DATA_TABLE'检查是否有插入的数据。

hbase(main):005:0> scan 'DATA_TABLE' ROW COLUMN+CELL WD001 column=WD_DATA:value, timestamp=1635077761411, value=20 三. 千万级数据hbase查询demo

为了更快上手hbase的操作,本文通过一个千万级数据查询demo来讲解。虽然可能并没有存千万级的数据,但是本文的demo是通过一个小项目提取出来的,该项目是达到了亿级的数据量,因此对于千万级的数据查询,下面的方法也是适用的。 demo代码通过以下地址可以获取: https://gitee.com/whutwzh/hbase-getting-started.git

3.1 案例背景

现有A厂有5种温度传感器,传感器编号分别是:WD000,WD001,WD002,WD003,WD004。 现在需要对传感器数据进行存储,并暴露查询接口,给前端进行数据展示。 传感器采集的每一条数据包含:温度的时间戳timeStamp,温度的值value,以及传感器IDsensorId

3.1 设计rowkey规则

一条数据的唯一标识就是 RowKey,那么这条数据存储于哪个分区,取决于 RowKey 处于哪个一个预分区的区间内,设计 RowKey 的主要目的 ,就是让数据均匀的分布于所有的region 中。 当数据量到达一定程度时候,如一千万条数据,通过key-value来查询,hbase会扫描整个表的key-value,这是一个非常耗时间的操作,可能会达到30秒甚至更多。一条数据的唯一标识就是 RowKey,那么这条数据存储于哪个分区,取决于 RowKey 处于哪个一个预分区的区间内,设计 RowKey 的主要目的 ,就是让数据均匀的分布于所有的region 中。 而合理设计一个rowkey,将要查询的’key-value’信息存入到rowkey中(比如将传感器的ID存在rowkey中),然后仅仅扫描rowkey这一列,效率会非常高,仅需0.几秒就可以检索到满足条件的数据。

本文rowkey=传感器IDsensorID+时间time

3.2 预分区 什么是分区? 当一个HBase表的存储region过大(达到默认10GB)时,表将会进行split,分裂为2个分区。表在进行split的时候,会耗费大量的资源,频繁的分区对HBase的性能有巨大的影响。什么是预分区? 简单来说,就是在hbase数据表,刚建立的时候就进行分区。预分区原理 hbase存储数据是按照rowkey的字典顺序存储,每一个region维护着startRow与endRowKey,如果加入的数据符合某个region维护的rowKey范围,则该数据交给这个region维护为什么要预分区? 防止数据热写 集中写到某一台或者几台机器上,给服务器造成太大压力以及更严重后果减少hbase表的split带来的资源消耗增加数据读写效率负载均衡,防止数据倾斜方便集群容灾调度region优化Map数量

hbase表预分区shell指令语法: create '表名','列簇1','列簇2'...,SPLITS => ['rowkey1','rowkey2','rowkey3'...]

# 通过rowket的前缀,分为5个范围 WD000-WD001 WD001-WD002 WD002-WD003 WD004-WD005 create 'WD_TABLE','WD_DATA',SPLITS => ['WD000','WD001','WD002','WD003','WD004']

创建完后,可以通过http://hbase所在的ip:16010查看创建的hbase表。 在这里插入图片描述 点击WD_DATA,查看表的分区信息,可以发现表已经按照rowkey分成了几个Region了。 在这里插入图片描述

由于本文是一个节点,所以所有RegionServer都在whut这个节点上,当hbase是集群搭建时候,就会把不同的RegionServer分布在不同的节点上。比如下图所示3节点的hbase集群,10个RegionServer被自动分布在3个节点。 在这里插入图片描述

3.3 模拟数据存储

首先模拟一个生产者,向hbase数据表中存入数据,通过main()方法调用一下类

/** * 生产HBase需要的数据 * @author WZH * @create 2021-10-24 20:58 * @desc **/ public class DataProducer { public static final String TABLE_NAME = "WD_TABLE"; public static final String Column_FAMILY = "WD_DATA"; private SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmssSSS"); public void dataProducer() { // 5种温度传感器 WD000 WD001 WD002 WD003 WD004 int SensorType =5; int count=1000; for (int j = 0; j String timeStamp = simpleDateFormat.format(new Date()); String sensorId ="WD00"+j; String rowkey = sensorId + timeStamp; try { HBaseUtils.addRowData(TABLE_NAME,rowkey,Column_FAMILY,"sensorId",sensorId); HBaseUtils.addRowData(TABLE_NAME,rowkey,Column_FAMILY,"time",timeStamp); HBaseUtils.addRowData(TABLE_NAME,rowkey,Column_FAMILY,"value",String.valueOf(20+Math.random()*3)); } catch (IOException e) { e.printStackTrace(); } } } } public static void main(String[] args) { DataProducer dataProducer = new DataProducer(); dataProducer.dataProducer(); } }

执行上诉的main()方法后,在hbase shell中通过count命令查看数据表中的数据

hbase(main):062:0> count 'WD_TABLE' Current count: 1000, row: WD00020211025111818932 Current count: 2000, row: WD00120211025111906576 Current count: 3000, row: WD00220211025111953843 Current count: 4000, row: WD00320211025112041904 Current count: 5000, row: WD00420211025112130291 5000 row(s) in 0.3090 seconds => 5000 3.4 创建hbase表数据映射对象

该类用于hbase查询时候封装数据。

/** * 传感器数据封装对象 * @author WZH * @create 2021-10-25 11:20 * @desc **/ @Data public class DataPojo { private String SensorId; private String value; private String time; } 3.5 通过rowkey查询某一行数据

首先创建一个HBaseDao类,用于存放相关的增删改查方法。后续方法如果没有特殊说明,都是在这个类中。

/** * HBaseDao 存放各种查询方法 * @author WZH * @create 2021-10-24 20:53 * @desc **/ @Slf4j @Repository public class HBaseDao { private Connection connection=HBaseConfig.getConnection(); /** * 通过rowke查询某一行的数据 * @param * @return {} * @author WZH */ public void queryByRowKey(String tableName,String rowKey) throws IOException { // 1. 获取操作表的table对象 Table table = connection.getTable(TableName.valueOf(tableName)); // 2. 创建get对象 Get get = new Get(Bytes.toBytes(rowKey)); // 3. 执行get请求 Result result = table.get(get); // 4. 查看result List cells = result.listCells(); for (Cell cell : cells) { // 打印列蔟名 System.out.print("行键:"+ Bytes.toString(CellUtil.cloneRow(cell))+" "); System.out.print("列簇:"+Bytes.toString(CellUtil.cloneFamily(cell))+" "); System.out.print("列:"+ Bytes.toString(CellUtil.cloneQualifier(cell))+" "); System.out.println("值:"+ Bytes.toString(CellUtil.cloneValue(cell))+" "); } table.close(); } }

然后在main()函数中测试

public static void main(String[] args) { HBaseDao hBaseDao =new HBaseDao(); try { hBaseDao.queryByRowKey("WD_TABLE","WD00020211025111818932"); } catch (IOException e) { e.printStackTrace(); } }

输出结果如下:

行键:WD00020211025111818932 列簇:WD_DATA 列:time 值:20211025111818932 行键:WD00020211025111818932 列簇:WD_DATA 列:value 值:21.913362703717333 3.6 获取表中所有数据 /** * 获取所有数据 * @param tableName * @return {} * @author WZH * @date 2021/9/14 12:40 */ public List getAllRows(String tableName) throws IOException{ Table hTable = connection.getTable(TableName.valueOf(tableName)); //得到用于扫描region的对象 Scan scan = new Scan(); //使用HTable得到resultcanner实现类的对象 ResultScanner resultScanner = hTable.getScanner(scan); List dataList = DataProcessUtils.getDataList(resultScanner); return dataList; }

这里为了简化代码,将数据封装的工作,通过一个DataProcessUtils类进行封装。

/** * @author WZH * @create 2021-09-19 14:12 * @desc HBase数据处理、封装工具类 **/ public class DataProcessUtils { public static List getDataList(ResultScanner scanner) { //创建一个List用于保存数据 List dataList = new ArrayList(); for (Result result : scanner) { List cellList = result.listCells(); DataPojo dataPojo = new DataPojo(); // 迭代单元格列表 for (Cell cell : cellList) { // 打印列蔟名 String cell_key = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); String cell_value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); if ("sensorId".equals(cell_key)) { dataPojo.setSensorId(cell_value); } if ("time".equals(cell_key)) { dataPojo.setTime(cell_value); } if ("value".equals(cell_key)) { dataPojo.setValue(String.format("%.3f", Float.valueOf(cell_value))); } } dataList.add(dataPojo); } return dataList; } }

在main()方法中进行测试

public static void main(String[] args) { HBaseDao hBaseDao =new HBaseDao(); try { List dataList = hBaseDao.getAllRows("WD_TABLE"); System.out.println(dataList.get(0)); } catch (IOException e) { e.printStackTrace(); } } }

这里我们只打印数组长度和第一个元素,打印结果如下:

一共查询到了:5000个数据 DataPojo(sensorId=WD000, value=20.282, time=20211025150229253) 3.7 过滤器组合查询 3.7.1 模糊查询

通过RowFileter和正则匹配过滤器,实现模糊查询符合条件的数据。

/** * 通过rowkey 模糊查询某个时间的某个表格的数据 * @param time :时间字符串 形式:yyyyMMddHH... 2021091913 * @return {{@link List}} * @author WZH * @date 2021/9/19 14:08 */ public List fuzzyQueryByRowKey(String time,String tableName) throws IOException { Table table = connection.getTable(TableName.valueOf(tableName)); Scan scan = new Scan(); // 通过正则匹配 rowkey RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(".*" + time + ".*")); scan.setFilter(rowFilter); ResultScanner scanner = table.getScanner(scan); //通过工具类来封装hbase查询到的数据 List dataList = DataProcessUtils.getDataList(scanner); return dataList; }

main()方法中测试:

public static void main(String[] args) { HBaseDao hBaseDao =new HBaseDao(); try { //查询2021-10-25 15:03:46的数据 List dataList = hBaseDao.fuzzyQueryByRowKey("20211025150346", "WD_TABLE"); System.out.println("一共查询到了:"+dataList.size()+"个数据"); System.out.println(dataList.get(0)); } catch (IOException e) { e.printStackTrace(); } }

查询结果:

一共查询到了:14个数据 DataPojo(sensorId=WD000, value=20.709, time=20211025150346028) 3.7.2 可降采样的查询指定传感器某一时间范围内的所有数据 /** * 通过rowkey 查询某一时间段的数据 * @return {{@link List}} * @author WZH * @date 2021/9/22 15:21 */ public List randomDownSampleQuery(String startTime, String endTime, String tableName, String sensorId, float SampleRate) throws IOException { Table table = connection.getTable(TableName.valueOf(tableName)); Scan scan = new Scan(); //设置查询范围 scan.withStartRow(Bytes.toBytes(sensorId+startTime)); scan.withStopRow(Bytes.toBytes(sensorId+endTime)); // 随机过滤,实现降采样 需要传入一个 0-1之间的float型数字. RandomRowFilter randomRowFilter = new RandomRowFilter(new Float(SampleRate)); scan.setFilter(randomRowFilter); ResultScanner scanner = table.getScanner(scan); List dataList = DataProcessUtils.getDataList(scanner); // 关闭资源 scanner.close(); table.close(); log.info("一共查询了"+dataList.size()+"记录"); return dataList; }

这里的降采样是通过一个随机过滤器RandomRowFilter,通过阅读源码可知,该过滤器需要传入一个0-1之间的float型数字,这个数字就是筛选的比例。

当小于0就是不过滤,直接全部返回当大于1,所有的都pass掉当在0-1之间,它回通过random产生一个随机数,通过对比随机数和传入的数字大小,来决定是否过滤。

在main()方法中测试,查询202110251503-202110251504时间范围内WD001传感器的数据:

public static void main(String[] args) { HBaseDao hBaseDao =new HBaseDao(); try { // hBaseDao.queryByRowKey("WD_TABLE","WD00020211025111818932"); // List dataList = hBaseDao.getAllRows("WD_TABLE"); //查询2021-10-25 15:03:46的数据 // List dataList = hBaseDao.fuzzyQueryByRowKey("20211025150346", "WD_TABLE"); List dataList = hBaseDao.randomDownSampleQuery("202110251503" , "202110251504" , "WD_TABLE" , "WD001" , 0.5f); } catch (IOException e) { e.printStackTrace(); } }

执行结果:

15:48:51.379 [main] INFO com.example.demo.dao.HBaseDao - 本次降查询一共耗时2秒 15:48:51.379 [main] INFO com.example.demo.dao.HBaseDao - 一共查询了100记录 3.7.3 可降采样查询某一时间范围内多个传感器的值 /** * 可降采样查询某一时间范围内多个传感器的值 * @param startTime 开始时间 * @param endTime 结束时间 * @param tableName HBASE表名称 * @param siteList 存放带查询的测点的集合 * @param sampleRate 降采样频率,在 0-1 之间,若 < 0,则不进行降采样 * @return {{@link List}} * @author WZH * @date 2021/10/20 20:49 */ public List multiSiteQuery( String startTime, String endTime,String tableName , List siteList , float sampleRate ) throws IOException { Table table = connection.getTable(TableName.valueOf(tableName)); Scan scan = new Scan(); long time1 =new Date().getTime(); //创建一个 RowRange 的 List 实现多值过滤 ArrayList rowRanges = new ArrayList(); for (String site : siteList) { MultiRowRangeFilter.RowRange rowRange = new MultiRowRangeFilter.RowRange( site+startTime, true, site+endTime, true); rowRanges.add(rowRange); } MultiRowRangeFilter multiRowRangeFilter = new MultiRowRangeFilter(rowRanges); // 过滤器组合列表 所有过滤器,与操作 FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL,multiRowRangeFilter); 随机过滤,实现降采样 if (sampleRate 0f) { RandomRowFilter randomRowFilter = new RandomRowFilter(sampleRate); filterList.addFilter(randomRowFilter); } scan.setFilter(filterList); ResultScanner scanner = table.getScanner(scan); List dataList = DataProcessUtils.getDataList(scanner); // 关闭资源 scanner.close(); table.close(); long time2 = new Date().getTime(); long l = (time2 - time1) / 1000L; log.info("本次降查询一共耗时"+l+"秒"); log.info("一共查询了"+dataList.size()+"记录"); log.info("本次查询使用了"+filterList.size()+"个过滤器"); return dataList; }

在main()方法中测试,查询202110251503-202110251504时间范围内,WD000 WD002 WD0033个传感器的数据。

public static void main(String[] args) { HBaseDao hBaseDao =new HBaseDao(); try { ArrayList siteList = new ArrayList(); siteList.add("WD000"); siteList.add("WD002"); siteList.add("WD003"); List dataList = hBaseDao.multiSiteQuery("202110251503" , "202110251504" , "WD_TABLE" , siteList, 0.01f ); for (DataPojo dataPojo : dataList) { System.out.println(dataPojo); } } catch (IOException e) { e.printStackTrace(); } } 16:00:39.047 [main] INFO com.example.demo.dao.HBaseDao - 本次降查询一共耗时1秒 16:00:39.047 [main] INFO com.example.demo.dao.HBaseDao - 一共查询了9记录 16:00:39.047 [main] INFO com.example.demo.dao.HBaseDao - 本次查询使用了2个过滤器 DataPojo(sensorId=WD000, value=20.591, time=20211025150303725) DataPojo(sensorId=WD000, value=20.174, time=20211025150304734) DataPojo(sensorId=WD000, value=21.907, time=20211025150308801) DataPojo(sensorId=WD000, value=21.004, time=20211025150309758) DataPojo(sensorId=WD000, value=22.650, time=20211025150314625) DataPojo(sensorId=WD000, value=20.768, time=20211025150326350) DataPojo(sensorId=WD000, value=21.067, time=20211025150330299) DataPojo(sensorId=WD000, value=21.403, time=20211025150331220) DataPojo(sensorId=WD000, value=20.037, time=20211025150336603)

更多过滤器请查考一下文章: https://www.jianshu.com/p/bcc54f63abe4

四、其他小知识 比count更快的统计行数的指令。

在 hbase安装目录下的$HBASE_HOME/bin 命令执行:

hbase org.apache.hadoop.hbase.mapreduce.RowCounter '表名' 后记

本文仅仅对hbase的入门知识作了一个简单介绍,在完成这个入门案例后,在去学习一些更深入的habse知识,比如和hive结合等,都会更加轻松。



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有