基于 Spark RDD 的电影点评数据分析 您所在的位置:网站首页 评分评分最高的电影 基于 Spark RDD 的电影点评数据分析

基于 Spark RDD 的电影点评数据分析

2024-07-16 13:53| 来源: 网络整理| 查看: 265

基于 Spark RDD 的电影点评数据分析¶

学习 Spark 的基本用法,利用 Spark 的 map、sort、join、reduce 等功能,对电影点评数据进行简单的分析。

初始化 Spark Context¶ Pythonimport findspark # 初始化 spark 环境,系统全局环境变量已经设置 SPARK_HOME, PYSPARK_PYTHON 则不需要给此函数传入环境变量。 findspark.init() from pyspark.conf import SparkConf from pyspark.sql import SparkSession appname = "rdd movies - 1. top 10 - user_47" # 任务名称(将 user_xx 替换成你的号码) master = "yarn" # spark 集群地址 # spark 资源配置 conf = SparkConf().setAppName(appname).setMaster(master) spark = SparkSession.builder.config(conf=conf).getOrCreate() # 获取作业上下文 sc = spark.sparkContext 准备数据¶ Python# 读取 HDFS 文件 movies = sc.textFile("hdfs://node1:8020/text/ml_1m/movies.dat") ratings = sc.textFile("hdfs://node1:8020/text/ml_1m/ratings.dat") users = sc.textFile("hdfs://node1:8020/text/ml_1m/users.dat") Python# 过滤异常数据 movies = movies.filter( lambda movie: (len(movie.strip()) > 0) and (len(movie.split("::")) == 3) ) movies.cache() ratings = ratings.filter( lambda rating: (len(rating.strip()) > 0) and (len(rating.split("::")) == 4) ) ratings.cache() users = users.filter( lambda user: (len(user.strip()) > 0) and (len(user.split("::")) == 5) ) users.cache() 输出所有电影中评分最高的前 10 个电影名称和平均评分¶ Python# 定义函数,输入【电影 ID 和评分】、【电影 ID 和电影名称】和【评分前 n 位】,输出【评分最高的电影名称和平均评分】 def top(ratings_data, movies_data, n): # 从 ratings_data 中提取出【电影 ID】和【评分】,并在 value 中补充“1”作为【评分数量】 movieID_rating = ratings_data.map( lambda x: (int(x.split("::")[1]), (int(x.split("::")[2]), 1)) ) # 对同一部电影,加总【评分】和【评分数量】,得到【评分和】和【评分数量和】 movieID_rating_reduced = movieID_rating.reduceByKey( lambda x, y: (x[0] + y[0], x[1] + y[1]) ) # 对同一部电影,用【评分和】除以【评分数量和】,得到【平均评分】 movieID_rating_average = movieID_rating_reduced.mapValues(lambda x: x[0] / x[1]) # 根据每一部电影的【平均评分】从大到小排序 movieID_rating_descending = movieID_rating_average.sortBy(lambda x: x[1], False) # 提取【平均评分】最高的 n 部电影的【电影 ID】和【平均评分】,并存入一个 RDD movieID_rating_top_n = sc.parallelize(movieID_rating_descending.take(n)) # 从 movies_data 中提取出【电影 ID】和【电影名称】 movieID_movieName = movies_data.map( lambda x: (int(x.split("::")[0]), x.split("::")[1]) ) # 将【电影名称】合并到【平均评分】最高的 n 部电影,并提取【电影名称】和【平均评分】 movieName_rating_top_n = movieID_rating_top_n.join(movieID_movieName).map( lambda x: (x[1][1], x[1][0]) ) return movieName_rating_top_n Pythontop10 = top(ratings, movies, 10).collect() Pythonfor line in top10: print(line[0], ">>>rating=", line[1], sep="")

输出所有电影中最受男性喜爱的电影 Top 10 及其平均评分¶ Python# 从 users 中提取出【用户 ID】和【用户性别】 userID_gender = users.map(lambda x: (int(x.split("::")[0]), x.split("::")[1])) Python# 将 ratings 和 userID_gender 合并后,过滤得到男性的评分数据 male_ratings = ( ratings.map( lambda x: ( int(x.split("::")[0]), (int(x.split("::")[1]), int(x.split("::")[2])), ) ) .join(userID_gender) .filter(lambda x: x[1][1] == "M") .map(lambda x: "{}::{}::{}".format(x[0], x[1][0][0], x[1][0][1])) ) Pythontop10_male = top(male_ratings, movies, 10).collect() Pythonfor line in top10_male: print(line[0], ">>>rating=", line[1], sep="")

输出所有电影中最受中年人(45


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有