Spark大数据 您所在的位置:网站首页 基于python电影票房分析 Spark大数据

Spark大数据

2024-07-05 05:20| 来源: 网络整理| 查看: 265

TMDB电影数据分析(spark-scala版,pyspark版本)

基于kaggle的TMDB电影数据集的数据分析,该数据集包含大约 5000 部电影的相关数据,TMDB数据下载。分析电影关键词的分布、电影投入以及收益评价等之间的关系,并使用Python web进行可视化。

一.环境要求(仅供参考)

centos7、hadoop、spark、bottle(一种基于Python的web框架)。

二.数据预处理

tmdb_5000_movies.csv 数据包含以下字段: 字段名称 解释 例子 budget 预算 10000000 genres 体裁 “[{“”id””: 18, “”name””: “”Drama””}]” homepage 主页 “” id id 268238 keywords 关键词 “[{“”id””: 14636, “”name””: “”india””}]” original_language 原始语言 en original_title 原标题 The Second Best Exotic Marigold Hotel overview 概览 As the Best Exotic Marigold Hotel … popularity 流行度 17.592299 production_companies 生产公司 “[{“”name””: “”Fox Searchlight Pictures””, “”id””: 43}, …]” production_countries 生产国家 “[{“”iso31661″”: “”GB””, “”name””: “”United Kingdom””}, …]” release_date 发行日期 2015-02-26 revenue 盈收 85978266 runtime 片长 122 spoken_languages 语言 “[{“”iso6391″”: “”en””, “”name””: “”English””}]” status 状态 Released tagline 宣传语 “” title 标题 The Second Best Exotic Marigold Hotel vote_average 平均分 6.3 vote_count 投票人数 272

对数据去除标题行。数据中某些字段包含 json 数据,直接使用 DataFrame 进行读取会出现分割错误,如果要创建 DataFrame,先读取文件生成 RDD,再将 RDD 转为 DataFrame。为了便于处理上传数据至hdfs文件系统: hdfs dfs -put tmdb_5000_movies.csv 三、使用 Spark 将数据转为 DataFrame

为了创建 DataFrame,首先需要将 HDFS 上的数据加载成 RDD,再将 RDD 转化为 DataFrame。下面代码段完成从文件到 RDD 再到 DataFrame 的转化:

// 创建sparksession import org.apache.spark.sql.SparkSession val spark=SparkSession.builder().getOrCreate() import spark.implicits._ // 使用编程方式定义RDD模式 import org.apache.spark.sql.types._ import org.apache.spark.sql.Row // 定义一个模式字符串 val schemaString="budget,genres,homepage,id,keywords,original_language,original_title,overview,popularity,production_companies,production_countries,release_date,revenue,runtime,spoken_languages,status,tagline,title,vote_average,vote_count" // 根据字符串生成模式 val fields=schemaString.split(",").map(fieldName => StructField(fieldName,StringType,nullable=true)) val schema=StructType(fields) val path = "hdfs://192.168.1.30:9000/user/root/tmdb_5000_movies.csv" // 由于tmdb的csv数据某些字段中包含 val mdf=spark.read.format("com.databricks.spark.csv") .schema(schema) .option("inferSchema", value = false) .option("header", value = true) .option("nullValue", "\\N") .option("escape", "\"") // 设置用于在已引用的值内转义引号的单个字符。详情见 spark 读取 csv 官网介绍 https://spark.apache.org/docs/2.0.2/api/java/org/apache/spark/sql/DataFrameReader.html#option(java.lang.String,%20boolean) .option("quoteAll","true") .option("sep", ",") .csv(path) mdf.select("genres").show(2,false) 四、使用 Spark 进行数据分析

Spark 处理得到的 DataFrame mdf 进行数据分析,首先对数据中的主要字段单独进行分析(概览小节),然后再分析不同字段间的关系(关系小节)。为了方便进行数据可视化,每个不同的分析,都将分析结果导出为 json 文件存储到static目录下,由 web 页面读取并进行可视化。

1.概览

这个部分对数据进行整体的分析。

1.1.TMDb 电影中的体裁分布

从上面的数据字典描述可以看出,电影的体裁字段是一个 json 格式的数据,因此,为了统计不同体裁的电影的数量,需要首先解析 json 数据,从中取出每个电影对应的体裁数组,然后使用词频统计的方法统计不同体裁出现的频率,即可得到电影的体裁分布。 首先实现一个函数 countByJson(field) ,该函数实现解析 json 格式字段从中提取出 name 并进行词频统计的功能:

// (1)TMDb 电影中的体裁分布 import org.apache.spark.sql.functions._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.DataFrame import org.apache.spark.sql.Row import org.json4s._ import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ import java.io.PrintWriter //这行是Scala解释器执行上面语句后返回的结果 // // 寻找tmdb中最常见的10中预算数,对电影预算频率进行统计 // // 使用sql语句分析 // 或者使用spark的转换函数进行分析 def countByJson(field:String):org.apache.spark.rdd.RDD[(String,Int)] ={ val jsonSchema =ArrayType(new StructType().add("id", IntegerType).add("name",StringType)) mdf.select(mdf.col(field)) .filter(mdf.col(field).isNotNull) // 此处是单条中包含多个数据的json,按照jsonSchema的格式进行解析,并生成多条单个数据,explode是将数组组生成为列。 .select(explode(from_json(mdf.col(field), jsonSchema)).as(field)) // 解决$"genres.name"的变量问题 .select(field.concat(".name")) .rdd .map(name=>(name.toString(),1)) .repartition(1) .reduceByKey((x,y) => x + y) }

该函数返回一个 RDD,整个过程如下图所示。 在这里插入图片描述 基于这个函数实现 countByGenres 用来生成不同体裁的电影数统计结果,接着,使用下面代码进行数据导出至 genres.json 方便之后进行可视化:

def countByGenres():String={ val genresRDD=countByJson("genres") val jsonString =genresRDD.collect().toList.map { case(genre,count) => (("genre" ->genre.replace("[","").replace("]",""))~("count" ->count)) } val mdfJson=compact(render(jsonString)) mdfJson } def save(str:String,path:String,hdfspath:String):Unit={ // 写入本地文件 val out = new PrintWriter(path) out.println(str) out.close() // 写入hdfs文件系统 val str_rdd=spark.sparkContext.parallelize(List(str)) str_rdd.saveAsTextFile(hdfspath) } val str=countByGenres() println(str) val path="/home/chenbengang/ziyu_bigdata/quick_learn_spark/movie_genres_word_count.txt" val hdfspath="hdfs://192.168.1.30:9000/user/root/movie_genres_word_count.txt" save(str,path,hdfspath) 1.2. 前 100 个常见关键词

该项分析电影关键词中出现频率最高的前一百个。由于关键词字段也是 json 格式数据,因此调用 countByJson 进行频率统计,同时对于统计结果进行降序排序并取前 100 项即可:

// 2. 前 100 个常见关键词 def countByKeywords():String={ // 对rdd排序 val keywordsRDD=countByJson("keywords").sortBy(x=>(-x._2)) val jsonString =keywordsRDD.take(100).toList.map { case(keywords,count) => (("keywords" ->keywords.replace("[","").replace("]",""))~("count" ->count)) } val mdfJson=compact(render(jsonString)) mdfJson } val str=countByKeywords() println(str) val path="/home/chenbengang/ziyu_bigdata/quick_learn_spark/movie_keywords_word_count.txt" val hdfspath="hdfs://192.168.1.30:9000/user/root/movie_keywords_word_count.txt" save(str,path,hdfspath) 1.3. TMDb 中最常见的 10 种预算数

这一项探究电影常见的预算数是多少,因此需要对电影预算进行频率统计。首先,需要对预算字段进行过滤,去除预算为 0 的项目,然后根据预算聚合并计数,接着根据计数进行排序,并将结果导出为 json 字符串,为了统一输出,这里将 json 字符串转为 python 对象,最后取前 10 项作为最终的结果。

// 3. TMDb 中最常见的 10 种预算数 def countByBudget(order:String,ascending:Boolean):Array[String]={ if (ascending){ mdf.filter(!$"budget".equalTo(0)).groupBy("budget").count().orderBy(order).toJSON.take(10) }else{ mdf.filter(!$"budget".equalTo(0)).groupBy("budget").count().orderBy(desc(order)).toJSON.take(10) } } val budgetTop10Arr=countByBudget("count",false) val movie_budgetTop10 = new StringBuilder movie_budgetTop10 ++= "[" for (v // 后一个filter之前是dataset有两列一列runtime,一列为count mdf.filter(!$"runtime".equalTo(0)).groupBy("runtime").count().filter("count>=100").toJSON.collect() } val runtimeOfCountOver100Arr=distrbutionOfRuntime("count",false) val movie_runtimeOfCountOver100 = new StringBuilder movie_runtimeOfCountOver100 ++= "[" for (v val production_companiesRDD=countByJson("production_companies").sortBy(x=>(-x._2)) val jsonString =production_companiesRDD.take(10).toList.map { case(company,count) => (("company" ->company.replace("[","").replace("]",""))~("count" ->count)) } val mdfJson=compact(render(jsonString)) mdfJson } val movie_countByCompanies=countByCompanies() println(movie_countByCompanies) val path="/home/chenbengang/ziyu_bigdata/quick_learn_spark/movie_countByCompanies.txt" val hdfspath="hdfs://192.168.1.30:9000/user/root/movie_countByCompanies.txt" save(str,path,hdfspath) 1.6. TMDb 中的 10 大电影语言

该项统计 TMDb 中出现最多的语言,与前面类似,该字段也是 JSON 数据,因此首先对每个项目进行词频统计,然后过滤掉语言为空的项目,最后排序取前十即可。

// 6. TMDb 中的 10 大电影语言 def countByLanguageRDD():String={ val countByLanguageRDD=countByJson("spoken_languages").sortBy(x=>(-x._2)) val jsonString =countByLanguageRDD.take(10).toList.map { case(language,count) => (("language" ->language.replace("[","").replace("]",""))~("count" ->count)) } val mdfJson=compact(render(jsonString)) mdfJson } val movie_countByLanguage=countByLanguageRDD() println(movie_countByLanguage) val path="/home/chenbengang/ziyu_bigdata/quick_learn_spark/movie_countByLanguage.txt" val hdfspath="hdfs://192.168.1.30:9000/user/root/movie_countByLanguage.txt" save(movie_countByLanguage,path,hdfspath) 2.关系

这个部分考虑数据之间的关系。

2.1.预算与评价的关系

这部分考虑预算与评价之间的关系,因此对于每个电影,需要导出如下的数据:

[电影标题,预算,评价] // 2.关系 // 这个部分考虑数据之间的关系。 // 1. 预算与评价的关系 def budgetVote():Array[String]={ mdf.select($"title",$"budget",$"vote_average").filter(!$"budget".equalTo(0)).filter(mdf.col("vote_count")>100).toJSON.collect() } val budgetVoteArr=budgetVote() // println(budgetVoteArr.length) val budgetVoteSB = new StringBuilder budgetVoteSB ++= "[" for (v mdf.select($"release_date",$"vote_average",$"title") .filter(mdf.col("release_date").isNotNull).filter(mdf.col("vote_count")>100).toJSON.collect() } val dateVoteArr=dateVote() // println(budgetVoteArr.length) val dateVoteSB = new StringBuilder dateVoteSB ++= "[" for (v mdf.select($"title",$"popularity",$"vote_average").filter(!$"popularity".equalTo(0)).filter(mdf.col("vote_count")>100).toJSON.collect() } val popVoteArr=popVote() println(budgetVoteArr.length) val popVoteSB = new StringBuilder popVoteSB ++= "[" for (v val jsonSchema =ArrayType(new StructType().add("id", IntegerType).add("name",StringType)) val jsonString=mdf.filter(mdf.col("production_companies").isNotNull) .filter(mdf.col("vote_count")>100) .select(explode(from_json(mdf.col("production_companies"), jsonSchema)).as("production_companies"),$"vote_average") .select($"production_companies.name",$"vote_average") .rdd .map(v => (v(0).toString,(v(1).toString.toFloat,1))) .repartition(1) .reduceByKey((x,y) => (x._1+y._1 , x._2+y._2)) .mapValues(x => (x._1/x._2,x._2)) .collect() .toList.map { case(company,(average,count)) => (("company" ->company.replace("[","").replace("]","")) ~("average" ->average) ~("count" ->count) )} val mdfJson=compact(render(jsonString)) mdfJson } val str=moviesVote() println(str) val path="/home/chenbengang/ziyu_bigdata/quick_learn_spark/movie_moviesVote.txt" val hdfspath="hdfs://192.168.1.30:9000/user/root/movie_moviesVote.txt" save(str,path,hdfspath) 2.5. 电影预算和营收的关系

这部分考虑电影的营收情况,因此对于每个电影,需要导出如下的数据:

[电影标题,预算,收入] 基于 DataFrame 对数据进行字段过滤即可,过滤掉预算,收入为 0 的数据。: // 5. 电影预算和营收的关系 // 3. 流行度和评价的关系 def budgetRevenue():Array[String]={ mdf.select($"title",$"budget",$"revenue").filter(!$"budget".equalTo(0)).filter(!$"revenue".equalTo(0)).toJSON.collect() } val budgetRevenueArr=budgetRevenue() println(budgetVoteArr.length) val budgetRevenueSB = new StringBuilder budgetRevenueSB ++= "[" for (v


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有