详解RDD基本概念、RDD五大属性 您所在的位置:网站首页 软件产品的两大主要属性 详解RDD基本概念、RDD五大属性

详解RDD基本概念、RDD五大属性

2024-07-07 09:50| 来源: 网络整理| 查看: 265

一、RDD是什么

        RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD是spark core的底层核心。

Dataset:

RDD 可以不保存具体数据, 只保留创建自己的必备信息, 例如依赖和计算函数;

RDD 也可以缓存起来, 相当于存储具体数据。

Distributed:

        RDD 支持分区, 可以运行在集群中。

Resilient:

RDD 支持高效的容错;

RDD 中的数据即可以缓存在内存中, 也可以缓存在磁盘中, 也可以缓存在外部存储中。

1.RDD的特点: 弹性 容错的弹性:数据丢失可以自动恢复;存储的弹性:内存与磁盘的自动切换;计算的弹性:计算出错重试机制;分片的弹性:可根据需要重新分片。分布式:数据存储在集群不同节点上/计算分布式。数据集: RDD封装了计算逻辑,并不保存数据。数据抽象: RDD是一个抽象类,需要子类具体实现。不可变: RDD封装了计算逻辑,是不可以改变的,想要改变,只能产生新的RDD,在新的RDD里面封装计算逻辑。可分区、并行计算。

二、RDD 为什么会出现

在 RDD 出现之前, 当时 MapReduce 是比较主流的, 而 MapReduce 如何执行迭代计算的任务呢?

多个 MapReduce 任务之间没有基于内存的数据共享方式, 只能通过磁盘来进行共享,这种方式明显比较低效。

RDD 如何解决迭代计算非常低效的问题呢?

在 Spark 中, 其实最终 Job3 从逻辑上的计算过程是: Job3 = (Job1.map).filter, 整个过程是共享内存的, 而不需要将中间结果存放在可靠的分布式文件系统中。 

这种方式可以在保证容错的前提下, 提供更多的灵活, 更快的执行速度, RDD 在执行迭代型任务时候的表现可以通过下面代码体现:

// 线性回归 val points = sc.textFile(...) .map(...) .persist(...) val w = randomValue for (i p.x * (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y) .reduce(_ + _) w -= gradient }

在这个例子中, 进行了大致 10000 次迭代, 如果在 MapReduce 中实现, 可能需要运行很多 Job, 每个 Job 之间都要通过 HDFS 共享结果, 谁快谁慢一窥便知。

三、结合案例深入了解RDD

需求:

给定一个网站的访问记录, 俗称 Access log

计算其中出现的独立 IP, 以及其访问的次数

val config = new SparkConf().setAppName("ip_ana").setMaster("local[6]") val sc = new SparkContext(config) val result = sc.textFile("dataset/access_log_sample.txt") .map(item => (item.split(" ")(0), 1)) .filter(item => StringUtils.isNotBlank(item._1)) .reduceByKey((curr, agg) => curr + agg) .sortBy(item => item._2, false) .take(10) result.foreach(item => println(item))

针对这个小案例, 我们问出互相关联但是又方向不同的六个问题:

1.假设要针对整个网站的历史数据进行处理, 量有 1T, 如何处理?

放在集群中, 利用集群多台计算机来并行处理。

2.如何放在集群中运行?

简单来讲, 并行计算就是同时使用多个计算资源解决一个问题, 有如下四个要点

要解决的问题必须可以分解为多个可以并发计算的部分;

每个部分要可以在不同处理器上被同时执行;

需要一个共享内存的机制;

需要一个总体上的协作机制来进行调度。

 3.如果放在集群中的话, 可能要对整个计算任务进行分解, 如何分解?

概述:

对于 HDFS 中的文件, 是分为不同的 Block 的;

在进行计算的时候, 就可以按照 Block 来划分, 每一个 Block 对应一个不同的计算单元。

扩展:

RDD 并没有真实的存放数据, 数据是从 HDFS 中读取的, 在计算的过程中读取即可;

RDD 至少是需要可以 分片 的, 因为HDFS中的文件就是分片的, RDD 分片的意义在于表示对源数据集每个分片的计算, RDD 可以分片也意味着 可以并行计算。

4.移动数据不如移动计算是一个基础的优化, 如何做到?

每一个计算单元需要记录其存储单元的位置, 尽量调度过去。

5.在集群中运行, 需要很多节点之间配合, 出错的概率也更高, 出错了怎么办?

 

RDD1 → RDD2 → RDD3 这个过程中, RDD2 出错了, 有两种办法可以解决:

缓存 RDD2 的数据, 直接恢复 RDD2, 类似 HDFS 的备份机制;

记录 RDD2 的依赖关系, 通过其父级的 RDD 来恢复 RDD2, 这种方式会少很多数据的交互和保存。

如何通过父级 RDD 来恢复?

记录 RDD2 的父亲是 RDD1;

记录 RDD2 的计算函数, 例如记录 RDD2 = RDD1.map(…​), map(…​) 就是计算函数;

当 RDD2 计算出错的时候, 可以通过父级 RDD 和计算函数来恢复 RDD2。

6.假如任务特别复杂, 流程特别长, 有很多 RDD 之间有依赖关系, 如何优化?

上面提到了可以使用依赖关系来进行容错, 但是如果依赖关系特别长的时候, 这种方式其实也比较低效, 这个时候就应该使用另外一种方式, 也就是记录数据集的状态。

在 Spark 中有两个手段可以做到:

缓存

Checkpoint

 

四、RDD的五大属性 /** * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, * partitioned collection of elements that can be operated on in parallel. This class contains the * basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition, * [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value * pairs, such as `groupByKey` and `join`; * [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of * Doubles; and * [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that * can be saved as SequenceFiles. * All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]) * through implicit. * * Internally, each RDD is characterized by five main properties: * * - A list of partitions * - A function for computing each split * - A list of dependencies on other RDDs * - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) * - Optionally, a list of preferred locations to compute each split on (e.g. block locations for * an HDFS file) * * All of the scheduling and execution in Spark is done based on these methods, allowing each RDD * to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for * reading data from a new storage system) by overriding these functions. Please refer to the * Spark paper * for more details on RDD internals. */

从上面源码中,可以得到RDD的五大属性:

1.分区列表( a list of partitions)

        Spark RDD是被分区的,每一个分区都会被一个计算任务(Task)处理,分区数决定了并行计算的数量,RDD的并行度默认从父RDD传给子RDD。默认情况下,一个HDFS上的数据分片就是一个 partiton,RDD分片数决定了并行计算的力度,可以在创建RDD时指定RDD分片个数(分区)。

        如果不指定分区数量,当RDD从集合创建时,则默认分区数量为该程序所分配到的资源的CPU核数(每个Core可以承载2~4个 partition),如果是从HDFS文件创建,默认为文件的 Block数。

/** * Implemented by subclasses to return the set of partitions in this RDD. This method will only * be called once, so it is safe to implement a time-consuming computation in it. * * The partitions in this array must satisfy the following property: * `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }` */ protected def getPartitions: Array[Partition] 2.每一个分区都有一个计算函数( a function for computing each split)

        每个分区都会有计算函数, Spark的RDD的计算函数是以分片为基本单位的,每个RDD都会实现 compute函数,对具体的分片进行计算,不需要保存每次计算的结果。RDD中的分片是并行的,所以是分布式并行计算。

        有一点非常重要,就是由于RDD有前后依赖关系,遇到宽依赖关系,如reduceByKey等这些操作时划分成 Stage, Stage内部的操作都是通过 Pipeline进行的,在具体处理数据时它会通过 Blockmanager来获取相关的数据,因为具体的 split要从外界读数据,也要把具体的计算结果写入外界,所以用了一个管理器,具体的 split都会映射成 BlockManager的Block,而具体的split会被函数处理,函数处理的具体形式是以任务的形式进行的。

/** * :: DeveloperApi :: * Implemented by subclasses to compute a given partition. */ @DeveloperApi def compute(split: Partition, context: TaskContext): Iterator[T] 3.依赖关系( a list of dependencies on other RDDS)

由于RDD每次转换都会生成新的RDD,所以RDD会形成类似流水线一样的前后依赖关系,当然宽依赖就不类似于流水线了,宽依赖后面的RDD具体的数据分片会依赖前面所有的RDD的所有数据分片,这个时候数据分片就不进行内存中的 Pipeline,一般都是跨机器的,因为有前后的依赖关系,所以当有分区的数据丢失时, Spark会通过依赖关系进行重新计算,从而计算出丢失的数据,而不是对RDD所有的分区进行重新计算。

RDD之间的依赖有两种:窄依赖( Narrow Dependency)和宽依赖( Wide Dependency)。

窄依赖(Narrow Dependency)

窄依赖是指每个父RDD的一个Partition最多被子RDD的一个Partition所使用,例如map、filter、union等操作都会产生窄依赖;

对于窄依赖,由于partition依赖关系的确定性,partition的转换处理就可以在同一个线程里完成,这种转换不会引起shuffle操作,速度快!

宽依赖(Wide Dependency)

宽依赖是指一个父RDD的Partition会被多个子RDD的Partition所使用,例如groupByKey、reduceByKey、sortByKey等操作都会产生宽依赖;

这种转换会引起shuffle操作,速度慢!

Shuffle是MapReduce框架中的一个特定的phase,介于Map phase和Reduce phase之间,当Map的输出结果要被Reduce使用时,输出结果需要按key哈希,并且分发到每一个Reducer上去,这个过程就是shuffle。由于shuffle涉及到了磁盘的读写和网络的传输,因此shuffle性能的高低直接影响到了整个程序的运行效率。

/** * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only * be called once, so it is safe to implement a time-consuming computation in it. */ protected def getDependencies: Seq[Dependency[_]] = deps 4.key- value数据类型的RDD分区器( a Partitioner for key- alue RDDS)

一个Partitioner,即RDD的分区函数(可选项),Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。

当前Spark中实现了两种类型的分区函数,

基于哈希的HashPartitioner,(key.hashcode % 分区数= 分区号)。它是默认值基于范围的RangePartitioner。

什么会有Partitioner?

只有对于key-value的RDD(RDD[(String, Int)]),并且产生shuffle,才会有Partitioner;非key-value的RDD(RDD[String])的Parititioner的值是None。

Option类型:可以表示有值或者没有值,它有2个子类:

Some:表示封装了值None:表示没有值 /** Optionally overridden by subclasses to specify how they are partitioned. */ @transient val partitioner: Option[Partitioner] = None 5每个分区都有一个优先位置列表,即首选位置( a list of preferred locations to compute each split on)

存储每个切片优先(preferred location)位置的列表。 比如对于一个 HDFS 文件来说, 这个列表保存的就是每个 Partition 所在文件块的位置.。按照“移动数据不如移动计算”的理念, Spark 在进行任务调度的时候, 会尽可能地将计算任务分配到其所要处理数据块的存储位置。

/** * Optionally overridden by subclasses to specify placement preferences. */ protected def getPreferredLocations(split: Partition): Seq[String] = Nil


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有