pyspark 您所在的位置:网站首页 sortby怎么读 pyspark

pyspark

2024-01-25 15:39| 来源: 网络整理| 查看: 265

❝ spark中的RDD是一个核心概念,RDD是一种弹性分布式数据集,spark计算操作都是基于RDD进行的,本文介绍RDD的基本操作。 ❞Spark 初始化

Spark初始化主要是要创建一个SprakContext实例,该实例表示与spark集群的连接。可以通过多种方式创建。

SparkContext

直接使用SparkContext类创建一个spark上下文,主要参数是指定master和appName。

from pyspark import SparkContext sc = SprakContext(master = 'local[*]',appName='test') SprakContext的属性# spark版本 sc.version '2.4.5' # python版本 sc.pythonVer '3.7' # master地址 sc.master 'local[*]' # 应用名字 sc.appName 'test' # 应用id sc.applicationId 'local-1596522649115' SparkConf

还可以通过调用SparkConf配置类来生成spark上下文。

from pyspark import SparkConf, SprakContext conf = SparkConf().setMaster('local').setAppName('test') sc = SparkContext(conf=conf) 创建RDD

RDD是spark中的主要数据格式,名称为弹性分布式数据集,可以序列化python对象来得到RDD,或者读取文件。

序列化# parallelize方法序列化python对象为RDD rdd = sc.parallelize([('a', 7), ('a', 2), ('b', 2)]) rdd1 = sc.parallelize([2,5,1,8]) rdd2 = sc.parallelize([('a', 2), ('d', 1), ('b', 1)]) rdd3 = sc.parallelize(range(100)) rdd4 = sc.parallelize([('a', ['x', 'y', 'z']), ('b', ['p', 'r'])]) 读取文件# 读取本地json文件,返回RDD text_file = sc.textFile("e:/a.json") 获取RDD信息基本信息# 获取rdd的分区数 rdd.getNumPartitions() 12 # 获取rdd的key rdd.keys().collect() ['a', 'a', 'b'] # 获取rdd的value rdd.values().collect() [7, 2, 2] # 判断rdd是否为空 rdd.isEmpty() False sc.parallelize([]).isEmpty() True 统计信息

统计信息包含了基本的统计计算值,如最大值、最小值、平均数、描述统计等。

# 求和 rdd3.sum() 4950 # 最大值 rdd3.max() 99 # 最小值 rdd3.min() 0 # 均值 rdd3.mean() 49.5 # 标准差 rdd3.stdev() 28.86607004772212 # 方差 rdd3.variance() 833.25 # 分区间计数 rdd3.histogram(3) ([0, 33, 66, 99], [33, 33, 34]) # 描述统计 rdd3.stats() (count: 100, mean: 49.5, stdev: 28.86607004772212, max: 99.0, min: 0.0) 处理RDD切片/collect# 获取rdd里的所有元素,返回list rdd.collect() [('a', 7), ('a', 2), ('b', 2)] # 获取rdd里的元素,返回字典 rdd.collectAsMap() {'a': 2, 'd': 1, 'b': 1} # 获取开始的2个元素 rdd.take(2) [('a', 7), ('a', 2)] # 获取第一个位置的元素 rdd.first() ('a', 7) # 获取降序排序的前3个元素 rdd3.top(3) [99, 98, 97] 计数/count# 统计rdd里的元素个数 rdd.count() 3 # 按key统计rdd里的元素个数 rdd.countByKey() defaultdict(, {'a': 2, 'b': 1}) # 按value统计rdd里的元素个数 rdd.countByValue() defaultdict(, {('a', 7): 1, ('a', 2): 1, ('b', 2): 1}) 重采样/sample# 对rdd进行重采样 rdd3.sample(False,0.1,81).collect() [4, 27, 28, 41, 49, 53, 58, 85, 93] 过滤/filter# 根据key过滤 rdd.filter(lambda x:'a' in x).collect() [('a', 7), ('a', 2)] 去重/distinct# 对rdd元素去重 rdd5.distinct().collect() ['a', 7, 2, 'b'] 排序/sortBy# 升序排序(默认) rdd1.sortBy(lambda x:x).collect() [1, 2, 5, 8] # 降序排序 rdd1.sortBy(lambda x:x,ascending=False).collect() [8, 5, 2, 1] # 对键值对rdd按照key排序 rdd2.sortByKey().collect() [('a', 2), ('b', 1), ('d', 1)] 映射/map# map方法对每个元素应用函数 rdd.map(lambda x: x+(x[0],x[1])).collect() [('a', 7, 'a', 7), ('a', 2, 'a', 2), ('b', 2, 'b', 2)] # flatMap方法,返回的结果会扁平化 rdd5 = rdd.flatMap(lambda x: x+(x[0],x[1])) rdd5.collect() ['a', 7, 'a', 7, 'a', 2, 'a', 2, 'b', 2, 'b', 2] # flatMapValues方法 rdd4.flatMapValues(lambda x:x).collect() [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')] 迭代/foreachdef g(x):print(x) # foreach方法对所有元素应用函数 rdd.foreach(x) ('a', 7) ('a', 2) ('b', 2) 简化/reduce# reduce方法对rdd进行合并 rdd.reduce(lambda x,y:x+y) ('a', 7, 'a', 2, 'b', 2) # reduceByKey方法根据key对value进行合并 rdd.reduceByKey(lambda v1,v2:v1+v2).collect() [('a', 9), ('b', 2)] 分组/groupBy# groupBy方法对rdd的元素分组 rdd1.groupBy(lambda x:x%2).mapValues(list).collect() [(0, [2, 8]), (1, [5, 1])] # groupByKey方法对rdd的元素根据key分组 rdd.groupByKey().mapValues(list).collect() [('a', [7, 2]), ('b', [2])] 聚合/aggregate# 定义两个聚合函数 seq_op=lambda x,y:(x[0]+y,x[1]+1) comb_op=lambda x,y:(x[0]+y[0],x[1]+y[1]) # aggregate方法聚合rdd rdd1.aggregate((0,0),seq_op,comb_op) (16, 4) # aggregateByKey方法根据key聚合rdd rdd.aggregateByKey((0,0),seq_op,comb_op).collect() [('a', (9, 2)), ('b', (2, 1))] # fold方法聚合rdd rdd1.fold(0,lambda x,y:x+y) 16 # foldByKey方法根据key聚合rdd rdd.foldByKey(0,lambda x,y:x+y).collect() [('a', 9), ('b', 2)] 合并/union# 调用sc的union方法按顺序合并多个rdd sc.union([rdd,rdd2]).collect() [('a', 7), ('a', 2), ('b', 2), ('a', 2), ('d', 1), ('b', 1)] 集合/intersection,union,subtract# 两个rdd的交集 rdd.intersection(rdd2).collect() [('a', 2)] # 两个rdd的并集(包含重复元素) rdd.union(rdd2).collect() [('a', 7), ('a', 2), ('b', 2), ('a', 2), ('d', 1), ('b', 1)] # rdd对rdd2的补集 rdd.subtract(rdd2).collect() [('a', 7), ('b', 2)] # 根据key求rdd2对rdd的补集) rdd2.subtractByKey(rdd).collect() [('d', 1)] # 两个rdd计算笛卡尔积 rdd1.cartesian(rdd1).collect() [(2, 2), (2, 5), (2, 1), (2, 8), (5, 2), (5, 5), (5, 1), (5, 8), (1, 2), (1, 5), (1, 1), (1, 8), (8, 2), (8, 5), (8, 1), (8, 8)] 保存RDD# 保存rdd到本地 rdd.saveAsTextFile('rdd.txt') 关闭spark# 使用stop方法关闭spark context实例 sc.stop() 运行

进入spark安装目录下,通过sprak-submit命令运行py文件。

./bin/spark-submit example/src/main/python/pi.py

另外,本地开发,可直接通过pyCharm运行。



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

      专题文章
        CopyRight 2018-2019 实验室设备网 版权所有