Spark操作HBase的数据,实现列值的计算 您所在的位置:网站首页 hbase创建表需要创建什么 Spark操作HBase的数据,实现列值的计算

Spark操作HBase的数据,实现列值的计算

2023-06-24 12:16| 来源: 网络整理| 查看: 265

本文将介绍如何使用Spark操作HBase的数据,实现列之间的计算,以特征值计算为例。特征值是指从原始数据中提取出来的具有代表性或判别性的数值,可以用于数据分析或机器学习等领域。本文将使用hbase-spark连接器,通过Spark RDD的方式,读取和写入HBase的表,实现对Sentinel-2卫星影像数据的特征值计算。

主要内容如下:

创建SparkSession和HBaseConfiguration对象。读取HBase表的数据,并转化成RDD。进行列式计算,得到特征值,并转化成RDD。写入HBase表的数据。验证HBase表的数据。

目录

一、环境准备

二、创建SparkSession和HBaseConfiguration对象

三、读取HBase表的数据,并转化成RDD

四、计算特征值,并转化成RDD

五、写入HBase表的数据

六、关闭SparkSession

七、验证HBase表的数据

 一、环境准备 安装Spark环境,HBase环境等,配置好集群。本文所用环境具体配置情况如下

分布式存储与并行处理环境配置:Hadoop、HBase和Spark等_runepic的博客-CSDN博客icon-default.png?t=N5F7https://blog.csdn.net/weixin_40694662/article/details/130030611

准备HBase的表,并导入一些测试数据。本文使用的是t3表和t3index表,分别存储了Sentinel-2卫星影像数据和时间索引。可以使用以下命令: create 't3', 'f1' create 't3index', 'f1' put 't3', '1000', 'f1:2019-01-01B2', '100' put 't3', '1000', 'f1:2019-01-01B3', '200' put 't3', '1000', 'f1:2019-01-01B4', '300' put 't3', '1000', 'f1:2019-01-01B8', '400' put 't3', '1000', 'f1:2019-01-01B11', '500' put 't3', '1000', 'f1:2019-01-01B12', '600' put 't3index', '0001', 'f1:td', ':2019-01-01B:'

 

二、创建SparkSession和HBaseConfiguration对象 导入需要的模块,包括org.apache.hadoop.hbase、org.apache.hadoop.hbase.mapreduce、org.apache.hadoop.io等。例如: import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.{Put, Result} import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat} import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.io.NullWritable import org.apache.spark.sql.SparkSession 创建一个SparkSession对象,并设置应用名和运行模式。打包运行时需要注释.master("local") 例如: val spark = SparkSession.builder().appName("SparkHBaseRDDfeature") .master("local") .getOrCreate() 创建一个HBaseConfiguration对象,并设置Zookeeper的地址和端口。例如: val hbaseConf = HBaseConfiguration.create() hbaseConf.set("hbase.zookeeper.quorum", "hadoop100:2181,hadoop200:2181,hadoop201:2181") hbaseConf.set("hbase.zookeeper.property.clientPort", "2181") 三、读取HBase表的数据,并转化成RDD 使用newAPIHadoopRDD方法,根据表名和列族名获取一个RDD。该RDD的元素类型为(ImmutableBytesWritable, Result),其中ImmutableBytesWritable是行键的封装,Result是行数据的封装。例如: val tablename = "t3" hbaseConf.set(TableInputFormat.INPUT_TABLE, tablename) val hBaseRDD = spark.sparkContext.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result]) 使用map方法,将RDD中的每个元素转换为一个元组,其中包含行键和列值。例如: val hBaseRDD2 = hBaseRDD.map{case (k,v) => val rowkey = Bytes.toString(k.get()) val b2 = Bytes.toString(v.getValue("f1".getBytes, "2019-01-01B2".getBytes)).toDouble val b3 = Bytes.toString(v.getValue("f1".getBytes, "2019-01-01B3".getBytes)).toDouble val b4 = Bytes.toString(v.getValue("f1".getBytes, "2019-01-01B4".getBytes)).toDouble val b8 = Bytes.toString(v.getValue("f1".getBytes, "2019-01-01B8".getBytes)).toDouble val b11 = Bytes.toString(v.getValue("f1".getBytes, "2019-01-01B11".getBytes)).toDouble val b12 = Bytes.toString(v.getValue("f1".getBytes, "2019-01-01B12".getBytes)).toDouble (rowkey, b2, b3, b4, b8, b11, b12) } 四、计算特征值,并转化成RDD 定义一个函数,用于计算特征值。该函数接受六个参数,分别是Blue、Green、Red、NIR、SWIR_1和SWIR_2,分别对应Sentinel-2卫星影像的六个波段。该函数返回一个元组,包含三个特征值,分别是DVI、RVI和NDVI。例如: def calculateFeatures(Blue: Double, Green: Double, Red: Double, NIR: Double, SWIR_1: Double, SWIR_2: Double): (Double, Double, Double) = { val DVI = NIR - Red val RVI = NIR / Red val NDVI = (NIR - Red) / (NIR + Red) (DVI, RVI, NDVI) } 使用map方法,将RDD中的每个元素转换为一个(NullWritable, Put)类型的元组,用于写入HBase表。其中NullWritable是空键的封装,Put是写入操作的封装。例如: val hBaseRDDre = hBaseRDD2.map{case (rowkey, b2, b3, b4, b8, b11, b12) => // 创建一个Put对象,并设置行键 val put: Put = new Put(rowkey.getBytes) // 调用calculateFeatures函数,计算特征值 val (DVI, RVI, NDVI) = calculateFeatures(b2, b3, b4, b8, b11, b12) // 将特征值作为列值,添加到Put对象中 put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("DVI"), Bytes.toBytes(DVI.toString)) put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("RVI"), Bytes.toBytes(RVI.toString)) put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("NDVI"), Bytes.toBytes(NDVI.toString)) // 返回(NullWritable.get(), put)类型的元组 (NullWritable.get(), put) } 五、写入HBase表的数据 设置写入HBase表的表名。例如: hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "t3feature") 借助于mapreduce的Job对象添加参数配置。例如: val job: Job = Job.getInstance(hbaseConf) job.setOutputFormatClass(classOf[TableOutputFormat[NullWritable]]) job.setOutputKeyClass(classOf[NullWritable]) job.setOutputValueClass(classOf[Put]) 使用saveAsNewAPIHadoopDataset方法,将RDD写入HBase表。例如: hBaseRDDre.saveAsNewAPIHadoopDataset(job.getConfiguration)

 

六、关闭SparkSession 使用stop方法,关闭SparkSession。这样可以释放SparkContext占用的资源,避免资源浪费或冲突。例如: spark.stop() 七、验证HBase表的数据 使用exit命令,退出spark-shell。例如: exit 使用hbase shell命令,进入HBase交互式shell。例如: hbase shell 使用scan命令,扫描HBase表的数据,并查看特征值。例如: scan 't3feature' 使用exit命令,退出HBase shell。例如: exit

 



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有