RDD转换为DataFrame的两种方式详解 您所在的位置:网站首页 中国菜命名的方法有哪两种 RDD转换为DataFrame的两种方式详解

RDD转换为DataFrame的两种方式详解

2023-08-21 01:56| 来源: 网络整理| 查看: 265

Spark支持两种方法将存在的RDD转换为DataFrame(SchemaRDD),后面附完整样例代码

元数据:person.txt

1 zhangsan 20 2 lisi 29 3 wangwu 25 4 zhaoliu 30 5 tianqi 35 6 jerry 40

1.使用反射来推断包含特定对象类型的RDD的模式(schema)

在你写spark程序的同时,当你已经知道了模式,这种基于反射的 方法可以使代码更简洁并且程序工作得更好.

Spark SQL的Scala接口支持将包含样本类的RDD自动转换SchemaRDD。这个样本类定义了表的模式。给样本类的参数名字通过反射来读取,然后作为列的名字。样本类可以嵌套或者包含复杂的类型如序列或者数组。这个RDD可以隐式转化为一个SchemaRDD,然后注册为一个表,表可以在后续的 sql语句中使用。

关键代码: 在这里插入图片描述

2.通过编程接口

这个接口允许你构造一个模式,然后在存在的RDDs上使用它。虽然这种方法更冗长,但是它允许你在运行期之前不知道列以及列 的类型的情况下构造SchemaRDD。

当样本类不能提前确定(例如,记录的结构是经过编码的字符串,或者一个文本集合将会被解析,不同的字段投影给不同的用户),一个SchemaRDD可以通过三步来创建。

从原来的RDD创建一个行的RDD创建由一个StructType表示的模式与第一步创建的RDD的行结构相匹配在行RDD上通过createDataFrame方法应用模式 关键代码: 在这里插入图片描述 两种模式完整样例代码:

利用反射推断模式:

import org.apache.spark.sql.SparkSession /** * @Author 海龟 * @Date 2021/4/7 8:39 * @Desc RDD->DataFrame (case类能定义)通过反射机制推断包含特定类型对象的Schema信息 */ //样例类定义字段跟属性 case class Person(id:Int, name:String,age:Int) object CaseClassSchema{ def main(args: Array[String]): Unit ={ //1.构建SparkSession val spark = SparkSession.builder().appName("RDDToFDataFrame.CaseClassSchema") .master("local[*]").getOrCreate() //2.获取SparkContext val sc = spark.sparkContext //设置日志打印级别 sc.setLogLevel("WARN") //3.读取文件 val data = sc.textFile("D://Spark_IN&OUT/person.txt").map(_.split(" ")) //4,将RDD与样例类关联 val personRDD = data.map(x => Person(x(0).toInt, x(1), x(2).toInt)) //5.获取DataFrame //手动导入隐式转化 import spark.implicits._ val personDF = personRDD.toDF println("-"*20 + "DSL风格操作开始") //1.显示DataFrame(personDF)的数据,默认显示20行 personDF.show() //2.显示DataFrame(personDF)的Schema信息 personDF.printSchema() //3.统计DataFrame(personDF)的年龄大于30岁的人数 val result3 = personDF.filter(personDF("age") >= 30).count() println(result3) println("-"*20 + "DSL风格操作结束") println("-"*20 + "SQL风格操作开始") //1,将DataFrame注册成表 personDF.createOrReplaceTempView("t_person") //2.显示表所有信息 spark.sql("select * from (t_person)").show() //3.显示姓名为zhangsan的信息 spark.sql("select * from (t_person) where name ='zhangsan'").show() println("-"*20 + "SQL风格操作结束") //关闭资源 sc.stop spark.stop() } }

编程方式:

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.sql.{Row, SparkSession} /** * @Author 海龟 * @Date 2021/4/7 9:32 * @Desc 编程方式定义Schema */ object SpaekSQLSchema { def main(args: Array[String]): Unit = { //1.构建SparkSession val spark = SparkSession.builder().appName("RDDToFDataFrame.CaseClassSchema") .master("local[*]").getOrCreate() //2.获取SparkContext val sc = spark.sparkContext //设置日志打印级别 sc.setLogLevel("WARN") //3.读取文件 val data = sc.textFile("D://Spark_IN&OUT/person.txt").map(_.split(" ")) //4,将RDD与样例类关联 val personRDD = data.map(x => Row(x(0).toInt, x(1), x(2).toInt)) //5.创建Schema val schema = StructType(Seq( StructField("id", IntegerType, false), StructField("name", StringType, false), StructField("age", IntegerType, false) )) //6.利用personRDD与Schema创建DataFrame val personDF = spark.createDataFrame(personRDD, schema) println("-" * 20 + "DSL风格操作开始") //1.显示DataFrame(personDF)的数据,默认显示20行 personDF.show() //2.显示DataFrame(personDF)的Schema信息 personDF.printSchema() //3.统计DataFrame(personDF)的年龄大于30岁的人数 val result3 = personDF.filter(personDF("age") >= 30).count() println(result3) println("-" * 20 + "DSL风格操作结束") println("-" * 20 + "SQL风格操作开始") //1,将DataFrame注册成表 personDF.createOrReplaceTempView("t_person") //2.显示表所有信息 spark.sql("select * from (t_person)").show() //3.显示姓名为zhangsan的信息 spark.sql("select * from (t_person) where name ='zhangsan'").show() println("-" * 20 + "SQL风格操作结束") //关闭资源 sc.stop spark.stop() } }

感谢大家观看!!



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有