【Spark分布式内存计算框架 您所在的位置:网站首页 内存计算定义 【Spark分布式内存计算框架

【Spark分布式内存计算框架

2023-03-06 13:23| 来源: 网络整理| 查看: 265

3.3 Row

DataFrame中每条数据封装在Row中,Row表示每行数据,具体哪些字段位置,获取DataFrame中第一条数据。 在这里插入图片描述 如何构建Row对象:要么是传递value,要么传递Seq,官方实例代码:

import org.apache.spark.sql._ // Create a Row from values. Row(value1, value2, value3, ...) // Create a Row from a Seq of values. Row.fromSeq(Seq(value1, value2, ...))

如何获取Row中每个字段的值呢????

方式一:下标获取,从0开始,类似数组下标获取 在这里插入图片描述

方式二:指定下标,知道类型 在这里插入图片描述

方式三:通过As转换类型, 此种方式开发中使用最多 在这里插入图片描述

3.4 RDD转换DataFrame

实际项目开发中,往往需要将RDD数据集转换为DataFrame,本质上就是给RDD加上Schema信息,官方提供两种方式:类型推断和自定义Schema。 官方文档:http://spark.apache.org/docs/2.4.5/sql-getting-started.html#interoperating-with-rdds

在这里插入图片描述 范例演示说明:使用经典数据集【电影评分数据u.data】,先读取为RDD,再转换为DataFrame。 在这里插入图片描述 字段信息:user id 、 item id、 rating 、 timestamp。

反射类型推断 当RDD中数据类型CaseClass样例类时,通过反射Reflecttion获取属性名称和类型,构建Schema,应用到RDD数据集,将其转换为DataFrame。

第一步、定义CaseClass样例类,封装电影评分数据

/** * 封装电影评分数据 * * @param userId 用户ID * @param itemId 电影ID * @param rating 用户对电影评分 * @param timestamp 评分时间戳 */ case class MovieRating( userId: String, itemId: String, rating: Double, timestamp: Long )

第二步、SparkContext读取电影评分数据封装到RDD中,转换数据类型

import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SparkSession} /** * 采用反射的方式将RDD转换为DataFrame和Dataset */ object SparkRDDInferring { def main(args: Array[String]): Unit = { // 构建SparkSession实例对象 val spark: SparkSession = SparkSession .builder() // 使用建造者模式构建对象 .appName(this.getClass.getSimpleName.stripSuffix("$")) .master("local[3]") .getOrCreate() import spark.implicits._ // 读取电影评分数据u.data, 每行数据有四个字段,使用制表符分割 // user id | item id | rating | timestamp. val rawRatingsRDD: RDD[String] = spark.sparkContext .textFile("datas/ml-100k/u.data", minPartitions = 2) // 转换数据 val ratingsRDD: RDD[MovieRating] = rawRatingsRDD .filter(line => null != line && line.trim.split("\t").length == 4) .mapPartitions{iter => iter.map{line => // 拆箱操作, Python中常用 val Array(userId, itemId, rating, timestamp) = line.trim.split("\t") // 返回MovieRating实例对象 MovieRating(userId, itemId, rating.toDouble, timestamp.toLong) } } // 将RDD转换为DataFrame和Dataset val ratingsDF: DataFrame = ratingsRDD.toDF() /* root |-- userId: string (nullable = true) |-- itemId: string (nullable = true) |-- rating: double (nullable = false) |-- timestamp: long (nullable = false) */ ratingsDF.printSchema() ratingsDF.show(10) // 应用结束,关闭资源 spark.stop() } }

此种方式要求RDD数据类型必须为CaseClass,转换的DataFrame中字段名称就是CaseClass中属性名称。

自定义Schema 依据RDD中数据自定义Schema,类型为StructType,每个字段的约束使用StructField定义,具体步骤如下:

第一步、RDD中数据类型为Row:RDD[Row]; 第二步、针对Row中数据定义Schema:StructType; 第三步、使用SparkSession中方法将定义的Schema应用到RDD[Row]上;

范例演示代码:

import org.apache.spark.rdd.RDD import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} /** * 自定义Schema方式转换RDD为DataFrame */ object SparkRDDSchema { def main(args: Array[String]): Unit = { // 构建SparkSession实例对象 val spark: SparkSession = SparkSession .builder() // 使用建造者模式构建对象 .appName(this.getClass.getSimpleName.stripSuffix("$")) .master("local[3]") .getOrCreate() import spark.implicits._ // 读取电影评分数据u.data, 每行数据有四个字段,使用制表符分割 // user id | item id | rating | timestamp. val ratingsRDD: RDD[String] = spark .sparkContext.textFile("datas/ml-100k/u.data", minPartitions = 2) // a. RDD[Row] val rowsRDD: RDD[Row] = ratingsRDD.mapPartitions{ iter => iter.map{line => // 拆箱操作, Python中常用 val Array(userId, itemId, rating, timestamp) = line.trim.split("\t") // 返回Row实例对象 Row(userId, itemId, rating.toDouble, timestamp.toLong) } } // b. schema val rowSchema: StructType = StructType( Array( StructField("userId", StringType, nullable = true), StructField("itemId", StringType, nullable = true), StructField("rating", DoubleType, nullable = true), StructField("timestamp", LongType, nullable = true) ) ) // c. 应用函数createDataFrame val ratingDF: DataFrame = spark.createDataFrame(rowsRDD, rowSchema) ratingDF.printSchema() ratingDF.show(10, truncate = false) // 应用结束,关闭资源 spark.stop() } }

此种方式可以更加体会到DataFrame = RDD[Row] + Schema组成,在实际项目开发中灵活的选择方式将RDD转换为DataFrame

3.5 toDF函数

除了上述两种方式将RDD转换为DataFrame以外,SparkSQL中提供一个函数:toDF,通过指定列名称,将数据类型为元组的RDD或Seq转换为DataFrame,实际开发中也常常使用。 在这里插入图片描述 范例演示:将数据类型为元组的RDD或Seq直接转换为DataFrame。

import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SparkSession} /** * 隐式调用toDF函数,将数据类型为元组的Seq和RDD集合转换为DataFrame */ object SparkSQLToDF { def main(args: Array[String]): Unit = { // 构建SparkSession实例对象,通过建造者模式创建 val spark: SparkSession = SparkSession .builder() .appName(this.getClass.getSimpleName.stripSuffix("$")) .master("local[3]") .getOrCreate() import spark.implicits._ // TODO: 1、构建RDD,数据类型为三元组形式 val usersRDD: RDD[(Int, String, Int)] = spark.sparkContext.parallelize( Seq( (10001, "zhangsan", 23), (10002, "lisi", 22), (10003, "wangwu", 23), (10004, "zhaoliu", 24) ) ) // 将RDD转换为DataFrame val usersDF: DataFrame = usersRDD.toDF("id", "name", "age") usersDF.printSchema() usersDF.show(10, truncate = false) println("========================================================") val df: DataFrame = Seq( (10001, "zhangsan", 23), (10002, "lisi", 22), (10003, "wangwu", 23), (10004, "zhaoliu", 24) ).toDF("id", "name", "age") df.printSchema() df.show(10, truncate = false) // TODO: 应用结束,关闭资源 spark.stop() } }

运行程序结果如下截图: 在这里插入图片描述



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有