Spark核心组件、运行架构及RDD创建 您所在的位置:网站首页 ajax程序执行的核心流程有哪些内容 Spark核心组件、运行架构及RDD创建

Spark核心组件、运行架构及RDD创建

2024-07-10 05:47| 来源: 网络整理| 查看: 265

目录 Spark核心组件Spark运行架构RDDDAG:有向无环图RDD创建RDD创建方式一:parallelizeRDD创建方式二:makeRDD分区设置textFile创建RDDlinux中创建RDD

Spark核心组件

在解释Spark架构之前,我们先来了解一下Spark的几个核心组件,弄清楚它们的作用分别是什么。 1、Application:Spark应用程序 建立在Spark上的用户程序,包括Driver代码和运行在集群各节点Executor中的代码。它由一个或多个作业Job组成。 如图: 在这里插入图片描述 2、Driver program:驱动程序 Application中的main()函数并创建SparkContext。创建SparkContext是为了准备Spark的运行环境。在Spark中由SparkContext负责和ClusterManager建立通信,申请资源、任务的分配和监控等。当Executor运行结束后,Driver再将SparkContext关闭。往往SparkContext就代表了Driver。 在这里插入图片描述 3、Cluster Manager:集群管理(资源管理) 负责在集群上获取外部资源。常用的资源管理器有: Standalone:Spark原生的资源管理器,由Master负责; Mesos:由Mesos Master负责资源管理; YARN:Hadoop中的YARN,由ResourceManager负责 4、Worker Node:工作节点(计算节点) 集群中任何可以运行Application代码的节点。我们可以把每个Worker Node看成一台独立的电脑。与YARN中的NodeManager节点类似。在我们启动Spark的Standalone模式或者YARN模式的时候,就需要启动这一节点。在我们安装Spark配置文件时,在Slave文件中配置各个节点的主机名。当我们在YARN模式运行时,这一节点指的就是NodeManager节点。在Messos模式中指的是Messos Slave节点。 在这里插入图片描述

5、Executor:执行器 Application运行在Worker上的一个进程。该进程负责Task的运行,并负责将数据存在内存或者磁盘上。每个Application都有各自独立的一批Executor。 6、Task: 被送到某个Executor进程上的工作单元; 7、Job 包含多个Task组成的并行计算,往往由Spark Action触发生成,一个Application中往往会产生多个Job。 8、Stage 每个Job会被拆分成多组Task,作为一个TaskSet,即Stage 在这里插入图片描述 也可以这样理解: 在这里插入图片描述

Spark运行架构

了解完Spark的核心组件后,我们再来看下Spark的运行架构。如图所示: 在这里插入图片描述 1、Driver Program创建SparkContext与ClusterManager建立通信,向ClusterManager申请资源,获取任务的分配和监控; 2、Cluster Manager负责申请和管理在Worker Node上运行所需要的资源。我们在前面介绍核心组件时提到过。其所使用的资源调度器有三种: Spark原生的:Cluster Manager; YARN:ResourceManager Mesos:基于Master。 3、Cluster Manager把得到的Job分成几个Stage分发到不同的Worker Node上。通过Executor进行计算处理; 4、每个Worker Node上的Executor服务于不同的Application,它们之间是不可共享数据的。 5、Spark的采用的是移动计算而非移动数据。当一个Job分成不同的Task之后,Driver会把我们所要执行的计算移动到各个Task所在的节点上,同时进行运算,加快了效率。

RDD

RDD(Resillient Distributed Dataset):弹性分布式数据集。是Spark的核心,主要数据抽象。是Spark的基本计算单元。 简单的解释: RDD是将数据项拆分为多个分区的集合,存储在集群的工作节点上的内存中,并执行正确的操作。 复杂的解释: RDD是用于数据转换的接口; RDD指向了存储在HDFS、Cassandra、HBase等、或缓存(内存、内存+磁盘、仅磁盘等),或在故障或缓存收回时重新计算其他RDD分区中的数据。 分布式数据集: RDD是只读的、分区记录的集合,每个分区分布在集群的不同节点上; RDD并不存储真正的数据,只是对数据和操作的描述; 弹性: RDD默认存放在内存中,当内存不足时,Spark自动将RDD写入磁盘; 容错性: 根据数据血统,可以自动从节点失败中恢复分区。 RDD的特性

一系列的分区(分片)信息,每个任务处理一个分区;每个分区上都有compute函数,计算该分区中的数据;RDD之间有一系列的依赖(宽依赖和窄依赖);分区函数决定数据(Key-Value)分配到哪个分区;最佳位置列表,将计算人物分派到其所在处理数据块的存储位置。 DAG:有向无环图

它和RDD一样,都是Spark提供的核心抽象,反应了RDD之间的依赖关系: 在这里插入图片描述

RDD创建

我们通过Idea的Maven来创建一个Spark工程。 Maven的创建可以参考我前面的一篇文章: Hadoop与Java的交互 1、下载依赖包:

org.scala-lang scala-library 2.11.8 org.apache.spark spark-core_2.11 2.1.1 log4j log4j 1.2.17 org.slf4j slf4j-api 1.7.21 org.apache.spark spark-sql_2.11 2.1.1

2、安装完成后,在main文件夹下新建一个文件scala

在这里插入图片描述 3、把scala文件设为Source Root 在这里插入图片描述 4、设置Scala SDK 在这里插入图片描述 在这里插入图片描述 在这里插入图片描述 5、完成后,我们即可在创建的scala文件中创建scala: 在这里插入图片描述

RDD创建方式一:parallelize

1、建立一个wordCount对象,演示单词统计。

import org.apache.spark.{SparkConf, SparkContext} object wordCount { def main(args: Array[String]): Unit = { //local[2]是线程数(分区数),setAppName("worldCount")给Application命名 val conf = new SparkConf().setMaster("local[2]").setAppName("wordCount") val sc = SparkContext.getOrCreate(conf) //创建一个rdd val rdd1 = sc.parallelize(List("hello world","hello scala","hello spark","scala")) //实现单词统计 rdd1.flatMap(_.split("\t")).map(x=>(x,1)).reduceByKey(_+_).collect.foreach(println) } }

此时结果显示会产生很多的提示信息,影响结果的查看。我们可以通过log4j来去除这些提示。

在这里插入图片描述 2、在工程根目录下建立一个资源文件: 在这里插入图片描述 在External Libraries的下列路径找到log4j文件 在这里插入图片描述 在这里插入图片描述

把log4j文件复制到创建的资源文件夹中去,修改下名称: 在这里插入图片描述 将rootCategory=INFO改成ERROR 在这里插入图片描述 在这里插入图片描述 此时再运行,会发现提示信息没了: 在这里插入图片描述

RDD创建方式二:makeRDD

上面我们采用的是parallelize的方式来创建的,还有一种方式是使用makeRDD。在创建之前,我们看下这两个方法的源码:

def parallelize[T](seq : scala.Seq[T], numSlices : scala.Int = { /* compiled code */ }) (implicit evidence$1 : scala.reflect.ClassTag[T]) : org.apache.spark.rdd.RDD[T] = { /* compiled code */ } def makeRDD[T](seq : scala.Seq[T], numSlices : scala.Int = { /* compiled code */ })(implicit evidence$2 : scala.reflect.ClassTag[T]) : org.apache.spark.rdd.RDD[T] = { /* compiled code */ }

这两个方法的代码其实是一样的,只是方法名不一样,这里就不再演示。

分区设置

除了在setMaster()中指定分区外,我们也可以在创建RDD时设置分区 此时分区数是2:

val conf = new SparkConf().setMaster("local[2]").setAppName("wordCount") val sc = SparkContext.getOrCreate(conf) val rdd2 = sc.makeRDD(List("hello world","hello scala","hello spark","scala")) rdd2.flatMap(_.split("\t")).map(x=>(x,1)).reduceByKey(_+_).collect.foreach(println) //分区 val partition = rdd2.partitions println("分区数是:"+partition.length)

在这里插入图片描述 在创建RDD时,在语句后面加个3,分区数设为3.

val conf = new SparkConf().setMaster("local[2]").setAppName("wordCount") val sc = SparkContext.getOrCreate(conf) val rdd2 = sc.makeRDD(List("hello world","hello scala","hello spark","scala"),3) rdd2.flatMap(_.split("\t")).map(x=>(x,1)).reduceByKey(_+_).collect.foreach(println) //分区 val partition = rdd2.partitions println("分区数是:"+partition.length)

此时分区数变为了3。

在这里插入图片描述

textFile创建RDD

本地创建 我们也可以通过加载文件的方式创建一个RDD。 文件内容:

hello world hello scala hello spark scala

文件放到idea上。

import org.apache.spark.{SparkConf, SparkContext} object wordCount { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName("wordCount") val sc = SparkContext.getOrCreate(conf) //相对路径 val words = sc.textFile("data/word.txt") //绝对路径 //val words = sc.textFile("C:/data/java/Scala/SparkDemo/data/word.txt") words.collect.foreach(println) } }

在这里插入图片描述 hdfs上传 在创建RDD之前,我们首先要进行一下文件的配置: 在C:\Windows\System32\drivers\etc路径下,打开hosts文件。在文件中添加主机ip地址和主机名: 在这里插入图片描述 保存退出后打开cmd,输入ping hadoop01。(hdfs要启动) 在这里插入图片描述 成功后,即可通过HDFS上传文件,创建RDD。如果该文件中已经配置有其他主机,则可以通过ping主机ip地址的方式检查是否成功。 还有一个方法是在虚拟机中配置:

vi /etc/hosts

在这里插入图片描述 创建RDD: 在hdfs上找到要运行的文件路径。

import org.apache.spark.{SparkConf, SparkContext} object wordCount { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName("wordCount") val sc = SparkContext.getOrCreate(conf) //hdfs上文件的路径。 val wordsHdfs = sc.textFile("hdfs://hadoop01:9000/spark/test/word.txt") wordsHdfs.collect.foreach(println) } }

两种创建的方式其实都一样的,只是路径不同。主机名hadoop01可以换成ip地址,当我们上面配置的hosts文件中有其他同名主机名时可以通过ip地址寻址来完成查询。 在这里插入图片描述 当我们加载文件时,可以选择其所在的父目录,这样会把父目录下的文件全部读出:

val wordsHdfs = sc.textFile("hdfs://hadoop01:9000/spark/test/")

也可以通过模式匹配的方式,来读取想要获得的文件内容:

val wordsHdfs = sc.textFile("hdfs://hadoop01:9000/spark/test/*.txt") linux中创建RDD

在linux中创建RDD比在idea上要简化很多。进入到spark的shell界面,直接输入创建语句即可:

val rdd=sc.textFile("/data/hello/hello1.txt") //file可以省略 val rdd2=sc.textFile("file:///data/hello/hello1.txt")


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有