Spark核心组件、运行架构及RDD创建 | 您所在的位置:网站首页 › ajax程序执行的核心流程有哪些内容 › Spark核心组件、运行架构及RDD创建 |
目录
Spark核心组件Spark运行架构RDDDAG:有向无环图RDD创建RDD创建方式一:parallelizeRDD创建方式二:makeRDD分区设置textFile创建RDDlinux中创建RDD
Spark核心组件
在解释Spark架构之前,我们先来了解一下Spark的几个核心组件,弄清楚它们的作用分别是什么。 1、Application:Spark应用程序 建立在Spark上的用户程序,包括Driver代码和运行在集群各节点Executor中的代码。它由一个或多个作业Job组成。 如图: 5、Executor:执行器 Application运行在Worker上的一个进程。该进程负责Task的运行,并负责将数据存在内存或者磁盘上。每个Application都有各自独立的一批Executor。 6、Task: 被送到某个Executor进程上的工作单元; 7、Job 包含多个Task组成的并行计算,往往由Spark Action触发生成,一个Application中往往会产生多个Job。 8、Stage 每个Job会被拆分成多组Task,作为一个TaskSet,即Stage 了解完Spark的核心组件后,我们再来看下Spark的运行架构。如图所示: RDD(Resillient Distributed Dataset):弹性分布式数据集。是Spark的核心,主要数据抽象。是Spark的基本计算单元。 简单的解释: RDD是将数据项拆分为多个分区的集合,存储在集群的工作节点上的内存中,并执行正确的操作。 复杂的解释: RDD是用于数据转换的接口; RDD指向了存储在HDFS、Cassandra、HBase等、或缓存(内存、内存+磁盘、仅磁盘等),或在故障或缓存收回时重新计算其他RDD分区中的数据。 分布式数据集: RDD是只读的、分区记录的集合,每个分区分布在集群的不同节点上; RDD并不存储真正的数据,只是对数据和操作的描述; 弹性: RDD默认存放在内存中,当内存不足时,Spark自动将RDD写入磁盘; 容错性: 根据数据血统,可以自动从节点失败中恢复分区。 RDD的特性 一系列的分区(分片)信息,每个任务处理一个分区;每个分区上都有compute函数,计算该分区中的数据;RDD之间有一系列的依赖(宽依赖和窄依赖);分区函数决定数据(Key-Value)分配到哪个分区;最佳位置列表,将计算人物分派到其所在处理数据块的存储位置。 DAG:有向无环图它和RDD一样,都是Spark提供的核心抽象,反应了RDD之间的依赖关系: 我们通过Idea的Maven来创建一个Spark工程。 Maven的创建可以参考我前面的一篇文章: Hadoop与Java的交互 1、下载依赖包: org.scala-lang scala-library 2.11.8 org.apache.spark spark-core_2.11 2.1.1 log4j log4j 1.2.17 org.slf4j slf4j-api 1.7.21 org.apache.spark spark-sql_2.11 2.1.12、安装完成后,在main文件夹下新建一个文件scala
1、建立一个wordCount对象,演示单词统计。 import org.apache.spark.{SparkConf, SparkContext} object wordCount { def main(args: Array[String]): Unit = { //local[2]是线程数(分区数),setAppName("worldCount")给Application命名 val conf = new SparkConf().setMaster("local[2]").setAppName("wordCount") val sc = SparkContext.getOrCreate(conf) //创建一个rdd val rdd1 = sc.parallelize(List("hello world","hello scala","hello spark","scala")) //实现单词统计 rdd1.flatMap(_.split("\t")).map(x=>(x,1)).reduceByKey(_+_).collect.foreach(println) } }此时结果显示会产生很多的提示信息,影响结果的查看。我们可以通过log4j来去除这些提示。
把log4j文件复制到创建的资源文件夹中去,修改下名称: 上面我们采用的是parallelize的方式来创建的,还有一种方式是使用makeRDD。在创建之前,我们看下这两个方法的源码: def parallelize[T](seq : scala.Seq[T], numSlices : scala.Int = { /* compiled code */ }) (implicit evidence$1 : scala.reflect.ClassTag[T]) : org.apache.spark.rdd.RDD[T] = { /* compiled code */ } def makeRDD[T](seq : scala.Seq[T], numSlices : scala.Int = { /* compiled code */ })(implicit evidence$2 : scala.reflect.ClassTag[T]) : org.apache.spark.rdd.RDD[T] = { /* compiled code */ }这两个方法的代码其实是一样的,只是方法名不一样,这里就不再演示。 分区设置除了在setMaster()中指定分区外,我们也可以在创建RDD时设置分区 此时分区数是2: val conf = new SparkConf().setMaster("local[2]").setAppName("wordCount") val sc = SparkContext.getOrCreate(conf) val rdd2 = sc.makeRDD(List("hello world","hello scala","hello spark","scala")) rdd2.flatMap(_.split("\t")).map(x=>(x,1)).reduceByKey(_+_).collect.foreach(println) //分区 val partition = rdd2.partitions println("分区数是:"+partition.length)
此时分区数变为了3。 本地创建 我们也可以通过加载文件的方式创建一个RDD。 文件内容: hello world hello scala hello spark scala 文件放到idea上。 import org.apache.spark.{SparkConf, SparkContext} object wordCount { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName("wordCount") val sc = SparkContext.getOrCreate(conf) //相对路径 val words = sc.textFile("data/word.txt") //绝对路径 //val words = sc.textFile("C:/data/java/Scala/SparkDemo/data/word.txt") words.collect.foreach(println) } }
两种创建的方式其实都一样的,只是路径不同。主机名hadoop01可以换成ip地址,当我们上面配置的hosts文件中有其他同名主机名时可以通过ip地址寻址来完成查询。 也可以通过模式匹配的方式,来读取想要获得的文件内容: val wordsHdfs = sc.textFile("hdfs://hadoop01:9000/spark/test/*.txt") linux中创建RDD在linux中创建RDD比在idea上要简化很多。进入到spark的shell界面,直接输入创建语句即可: val rdd=sc.textFile("/data/hello/hello1.txt") //file可以省略 val rdd2=sc.textFile("file:///data/hello/hello1.txt") |
CopyRight 2018-2019 实验室设备网 版权所有 |