Spark 基本RDD操作 您所在的位置:网站首页 spark的转换操作包括 Spark 基本RDD操作

Spark 基本RDD操作

#Spark 基本RDD操作| 来源: 网络整理| 查看: 265

存在一些转化操作和行动操作,受任意数据类型的RDD支持,即基本RDD操作

——针对各个元素的转化操作

最常用的转化操作map() 和 filter() 转化操作map()接收一个函数,把这个函数用于RDD的每一个元素,将函数的返回结果作为结果设置为RDD中对应元素的值,而转化操作filter()则接收一个函数,并将RDD中满足该函数的元素放入新的RDD中

map()操作可以用于做各种事情,可以把URL集合中的每个URL对应的主机名提取出来,也可以简单的只对各个数字求平方,map()的返回值类型不需要和输入类型一样,这样如果存在一个字符串RDD 并且map()函数是用于来把字符串解析并返回长度,那么输入类型即为RDD 而输出类型为 RDD

使用map()转化RDD中每一个元素

JavaRDD rdd=sc.parallelize(Arrays.aslist(1,2,3,4)); JavaRDD result=rdd.map(new Function() { public Integer call(Integer x){return x*x}; } );

若希望每个输入元素生成多个输出元素,可采用flatmap() 和map()类似 flatmap()函数被应用到RDD的每一个元素上,不过不再是返回一个元素,而是一个返回值序列的迭代器,输出的RDD不是由迭代器元素组成,得到是由迭代器访问到的所有元素组合而成的RDD

使用flatmap()将每个RDD元素(字符串)切分成多个输出RDD元素(单词)

JavaRDD lines=sc.parallelize(Arrays.aslist("hello world","hi")); JavaRDD words=lines.flatmap(new FlatMapFunction() { public Iterablecall(String line) { return Arrays.aslist(line.split(" ")); } } ); words.first();//将输出 hello

——伪集合操作 

RDD本身不是严格意义上的集合,但它支持数学的集合操作,如合并和相交操作,这些操作建立在RDD是相同数据类型

RDD与集合区别之一是RDD允许重复的元素,可以采用RDD.distinct()操作来生成一个包含不同元素的RDD ,但这个操作开销很大,因为需要借助网络对数据进行混洗,以确保每个元素只有一份

对数据{1,2,3} 和 {3,4,5} 两个RDD进行转化操作

操作含义例子结果union()生成一个包含2个RDD所有运算的RDDrdd.union(other){1,2,3,3,4,5}intersection()求2个RDD中相同的元素rdd.intersection(other){3}subtract()移除一个RDD中的内容rdd.subtract(other){1,2}cartesian()2个RDD的笛卡儿积rdd.cartesian(other){(1,3),(1,4)....}

——行动操作

最基本的RDD行动操作reduce() 它接收一个函数为参数,这个函数要操作2个相同元素类型的RDD数据并返回一个同样类型的新元素,如最简单的函数+ 可以用于对RDD进行累加,使用reduce() 可以方便的计算出RDD所有元素的和 元素的个数等

Integer sun=rdd.reduce(new Function() { public Integer call(Integer x,Integer y) { return x+y; } } );

aggregate()函数 聚集 需要提供期待返回类型的初始值,然后通过一个函数把RDD的元素放入累加器,因为每个节点都是本地进行累加,最终还需要提供一个函数将累加器两两合并

class AvgCount implements Serializable { public AvgCount(int total,int num) { this.total=total; this.num=num; } public int total; public int num; public double avg() { return total/(double)num; } } //单个节点的统计 Function2 addAndCount= new Function2() { public AvgCount call(AvgCount a,Integer x) { a.total+=x; a.num+=1; } }; //不同节点 结果合并 Function2 combine= new Function2() { public AvgCount call(AvgCount a,AvgCount b) { a.total+=b.total; a.num+=b.num; return a; } }; AvgCount init=new AvgCount(0,0); //调用aggregate聚集 AvgCount result=rdd.aggregate(init,addAndCount,combine);

RDD行动操作会以普通集合或值得形式将RDD的部分或全部数据返回给驱动器程序

最简单的返回操作是coolect()  它将整个RDD内容返回,但是要求结果能装入驱动器程序机器的内存中

take(n)  top(n)操作可以返回指定部分RDD元素

但是,对于部分行动操作,可能不需要将结果返回给驱动器,如用JSON数据发送到一个服务器中,或者把数据存入数据库

此时可以采用foreach()行动操作对每个元素进行操作,而不是将RDD返回到本地

 

——对一个{1,2,3,4} RDD进行基本RDD行动操作

操作含义例子结果collect()返回RDD中所有元素rdd.collect(){1,2,3,4}count()返回RDD中元素个数rdd.count()4countByValue()各个元素出现次数rdd.countByValue(){(1,1),(2,1)....}take(num)从RDD中返回num个元素rdd.take(2){1,3}top(num)从RDD中返回前num个元素rdd.top(2){1,2}reduce(func)并行整合RDD中所有数据rdd.recuce((x,y)=>x+y)9foreach(func)对RDD中每个元素进行特定逻辑rdd.foreach(myFunc)将每个元素存入redis

 

 

 

 

 

 

 



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有