MySQL同步数据到HBase 您所在的位置:网站首页 mysql数据迁移到hbase MySQL同步数据到HBase

MySQL同步数据到HBase

2023-05-13 12:34| 来源: 网络整理| 查看: 265

前言

这个问题在网上已经一搜一大把了,为什么要自己亲手总结一下仅仅是因为最近自己公司新上了HBase然后使用Spark去读取MySQL的数据写入HBase的时候遇到了一些问题,也困扰了挺久

现在就详细描述一下我去编写这个程序的流程,代码是如何去变化的

一、开发过程记录

我们现在就需要做两件事情,一个是MySQL中的表需要迁移过来HBase,这部分是全量同步,还有就是做数据的增量同步,这个现在不列入我们的需求之中

1.1 Spark SQL读取MySQL中的数据

我下面的代码是scala代码,非常简单,首先和MySQL取得连接,然后通过一个DataFrame去接收它就好了

val url = "jdbc:mysql://xxx:xxx/xxx?characterEncoding=utf-8&useSSL=false" val connectProperties = new Properties() connectProperties.setProperty("user","xxx") connectProperties.setProperty("password","xxx") connectProperties.setProperty("driver","com.mysql.jdbc.Driver") connectProperties.setProperty("partitionColumn","xxx") val columnName = "Id" val lowerBound = 1 val upperBound = 30 val numPartitions = 2 val tableName = "xxx" def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("JuejinDemo").setMaster("local[2]") val spark = SparkSession.builder().config(sparkConf).getOrCreate() val jdbcDF = spark.read.jdbc(url, tableName,columnName,lowerBound,upperBound,numPartitions,connectProperties) //展示表结构 jdbcDF.printSchema() //展示数据 jdbcDF.show() } 复制代码

几行代码的事,简单说明一下参数

// MySQL的URL var url = "jdbc:mysql://xxx:xxx/xxx?characterEncoding=utf-8&useSSL=false" // MySQL表名 val tableName = "xxx" // 连接配置 var connectProperties = new Properties() // MySQL用户名 connectProperties.setProperty("user","xxx") // MySQL密码 connectProperties.setProperty("password","xxx") // 驱动 connectProperties.setProperty("driver","com.mysql.jdbc.Driver") val columnName = "Id" // 从id为1开始读 val lowerBound = 1 // 下方解释 val upperBound = 100 val numPartitions = 2 复制代码

打印的结果:

现在有的数据:

注意:upperBound和numPartitions两个参数是有关联的,upperBound / numPartitions = 每个分区需要写入多少条数据,所以最好就是搞清楚数据总量是多少,因为笔者就遇到这么一个问题,本来总量是4000W条数据,笔者设置upperBound = 3千万,numPartitions = 300,那么每个分区就需要写入10W条数据。

而这个分区的规则是,前面299的分区都写入10W条数据,但是最后第300个分区就会写入10W+4000W-3000W = 1010W条数据,导致程序OOM了好几回而且找不出原因,设置executor-memory = 3G都不够吃,所以一定要注意

当然此时你也可以用SQL的方式去查询MySQL,然后把查询出来的结果当做你要写入的数据,因为使用jdbc方法的时候,是固定要把MySQL的整张表给读完的,所以会不可控,代码如下

// 连接MySQL读取数据 val jdbcDF = spark.read.format("jdbc") .option("url", "jdbc:mysql://" +host+ ":xxx/" +dbName+ "?characterEncoding=utf-8&useSSL=false") .option("driver", "com.mysql.jdbc.Driver") .option("user", username) .option("password", password) .option("query","select * from " +tableName).load() 复制代码

这里是使用了format方法,此时你需要提供的参数也很简单,和上面的jdbc是一样的

1.2 对数据的处理部分

这个时候我们就已经成功能读到数据了,和数据源的整合就是我们开发的第一步,之后就是对数据进行处理和发送到对应的下游,而对应的下游其实很多都是由API去提供支持,我们只需要把数据转成这个整合的API需要的格式即可

这里我使用了dtype属性,这里.var直接出来的是一个Array(String,String)

从输出的结果可以看到,我们这个type数组是记录了这张MySQL表中的字段和字段类型,所以这个时候我就可以用循环去遍历它并且对每个字段的数据进行处理了,让我们先拿到数据

这时我把DataFrame转换成了RDD进行处理,我先使用foreach输出一下,之后我会用map代替

现在拿到数据了,我们需要和HBase进行映射,那HBase的存储刚刚也说过了是列式存储,就是一个rowkey,对应多个列族的多个字段。这里我假设只有1个列族info。现在我们需要的条件就是,rowkey,columnFamily = info,字段,字段值即可

这里我简单把MySQL的Id作为rowkey的值,而且定义了一个getString方法,就是按照这条数据row的不同字段value_type去取得这个字段所对应的值,因为字段存在多个,所以它们是作为一个数组存在的,i就是这个数组的下标

/** * 根据每个字段的字段类型调用不同的函数,将字段值转换为HBase可读取的字节形式 * 解决数字导入到HBase中变成乱码的问题 * * @param value_type * @param row * @param i * @return */ def getString(value_type: String, row: Row, i: Int): String = { if (row != null && row.length != 0) { var str = "" if ("IntegerType" == value_type) { str = row.getInt(i).toString } else if ("StringType" == value_type) { str = row.getString(i) } else if ("FloatType" == value_type) { str = row.getFloat(i).toString } else if ("DoubleType" == value_type) { str = row.getDouble(i).toString } else if ("TimestampType" == value_type) { str = row.getTimestamp(i).toString } str } else "" } 复制代码

然后我们再在刚刚的rdd中去调用上面的方法,把Id取出来赋值给rowkey

// 遍历所有的字段 for (j { (array) }}) //--------------------处理逻辑完成------------------------ //----------------------写入HDFS-------------------------- val hBaseContext = new HBaseContext(sc, hbaseConf) hBaseContext.bulkLoad(data.map(record => { val put = new Put(record._1) record._2.foreach(putValue => put.addColumn(putValue._1, putValue._2, putValue._3)) put }), TableName.valueOf(tableName), (t : Put) => putForLoad(t), path) //----------------------写入HDFS-------------------------- //----------------------写入HBase---------------------------- val conn = ConnectionFactory.createConnection(hbaseConf) val hbTableName = TableName.valueOf(hbaseTableName.getBytes()) val regionLocator = new HRegionLocator(hbTableName, classOf[ClusterConnection].cast(conn)) val realTable = conn.getTable(hbTableName) HFileOutputFormat2.configureIncrementalLoad(Job.getInstance(), realTable, regionLocator) // bulk load start val loader = new LoadIncrementalHFiles(hbaseConf) val admin = conn.getAdmin() loader.doBulkLoad(new Path(path),admin,realTable,regionLocator) sc.stop() //----------------------写入HBase---------------------------- } /** * 获取 HBase相关参数 * * @param hadoopConf * @return */ def getHBaseConf(hadoopConf: Configuration,runType:String): Configuration = { val hbaseConf = HBaseConfiguration.create(hadoopConf) if (runType == "dev"){ hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, JobProperties.DEV_ZOOKEEPER_QUORUM) hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, JobProperties.ZOOKEEPER_CLIENT_PORT) } else { hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, JobProperties.PRO_ZOOKEEPER_QUORUM) hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, JobProperties.ZOOKEEPER_CLIENT_PORT) } hbaseConf } /** * 创建HBase表 * @param tableName 表名 */ def createHTable(tableName: String, hBaseConf : Configuration) = { val connection = ConnectionFactory.createConnection(hBaseConf) val hBaseTableName = TableName.valueOf(tableName) val admin = connection.getAdmin if (!admin.tableExists(hBaseTableName)) { val tableDesc = new HTableDescriptor(hBaseTableName) tableDesc.addFamily(new HColumnDescriptor("info".getBytes)) admin.createTable(tableDesc) } connection.close() } /** * 根据每个字段的字段类型调用不同的函数,将字段值转换为HBase可读取的字节形式 * 解决数字导入到HBase中变成乱码的问题 * * @param value_type * @param row * @param i * @return */ def getString(value_type: String, row: Row, i: Int): String = { if (row != null && row.length != 0) { var str = "" if ("IntegerType" == value_type) { str = row.getInt(i).toString } else if ("StringType" == value_type) { str = row.getString(i) } else if ("FloatType" == value_type) { str = row.getFloat(i).toString } else if ("DoubleType" == value_type) { str = row.getDouble(i).toString } else if ("TimestampType" == value_type) { str = row.getTimestamp(i).toString } str } else "" } /** * Prepare the Put object for bulkload function. * @param put The put object. * @throws java.io.IOException * @throws java.lang.InterruptedException * @return Tuple of (KeyFamilyQualifier, bytes of cell value) */ @throws(classOf[IOException]) @throws(classOf[InterruptedException]) def putForLoad(put: Put): Iterator[(KeyFamilyQualifier, Array[Byte])] = { val ret: mutable.MutableList[(KeyFamilyQualifier, Array[Byte])] = mutable.MutableList() import scala.collection.JavaConversions._ for (cells


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有