
Running SQL from spark-shell: the spark-sql command line


spark-sql practice

I. Install Anaconda
II. Configure Jupyter Notebook
III. Case Study
    1. Format conversion
    2. Read the file and create a DataFrame
    3. Data analysis
        (1) Compute the daily cumulative confirmed cases and deaths
        (2) Compute the daily new confirmed cases and deaths compared with the previous day
        (3) Cumulative confirmed cases and deaths for each US state as of May 19
        (4) Top 10 US states by confirmed cases
        (5) Top 10 US states by deaths
        (6) The 10 US states with the fewest confirmed cases
        (7) The 10 US states with the fewest deaths
        (8) Case-fatality rate for the whole US and for each state as of May 19
    4. Data visualization
        (1) Daily cumulative confirmed cases and deaths -> double bar chart
        (2) Daily new confirmed cases and deaths -> line chart
        (3) Per-state cumulative confirmed cases, deaths, and case-fatality rate as of May 19 -> table
        (4) Top 10 US states by confirmed cases -> word cloud
        (5) Top 10 US states by deaths -> pictorial bar chart
        (6) The 10 US states with the fewest confirmed cases -> word cloud
        (7) The 10 US states with the fewest deaths -> funnel chart
        (8) US case-fatality rate -> pie chart
IV. Problems encountered
    1. Spark not found
    2. Python not found

I. Install Anaconda

Anaconda installer download (Baidu netdisk): https://pan.baidu.com/s/1dvNVT4VW34SW4EVoZRqNgA (extraction code: batk)

Run the installer with the bash command.


Keep pressing Enter through the prompts; when asked to make a choice, answer yes.


Anaconda is now installed.


Configure the environment variable:

export PATH=$PATH:/root/anaconda3/bin
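To make the setting permanent, the export line can be appended to ~/.bashrc and reloaded; a minimal sketch, assuming Anaconda was installed to /root/anaconda3 as above:

echo 'export PATH=$PATH:/root/anaconda3/bin' >> ~/.bashrc
source ~/.bashrc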


The command output shows that the installation succeeded.


II. Configure Jupyter Notebook

After configuring the environment variable, generate the Jupyter Notebook configuration file with the following command:

jupyter notebook --generate-config


Use the command below to set a Jupyter password and note the sha1 hash it prints; it is needed in the configuration step that follows.

python -c "import IPython; print(IPython.lib.passwd())"


Add the following lines to the configuration file that was just generated (by default ~/.jupyter/jupyter_notebook_config.py):

# Allow connections from any IP address
c.NotebookApp.ip = '*'
# Use the sha1 hash generated above
c.NotebookApp.password = 'sha1:679a04c48eec:050346283252410f864ddfbf397a5aa64dd2ae09'
# Do not open a browser automatically
c.NotebookApp.open_browser = False
# Allow running as the root user
c.NotebookApp.allow_root = True
# Serve Jupyter Notebook on port 4040
c.NotebookApp.port = 4040
c.ContentsManager.root_dir = '/usr/jupyter'
c.NotebookApp.notebook_dir = '/usr/jupyter'


Start Jupyter Notebook:

jupyter notebook


Enter the password set earlier to log in.


III. Case Study

Code download (Baidu netdisk): https://pan.baidu.com/s/1Zjb-prt6v2Nbhhy6M1ZoUQ (extraction code: cgnc)

Data download (Baidu netdisk): https://pan.baidu.com/s/1UgkbxCDS_ne2zFqHxFAn8w (extraction code: qskr)

The dataset used in this case study is the US COVID-19 dataset from the data site Kaggle. It is organized as the table us-counties.csv and covers the period from the first confirmed US case through 2020-05-19.


1. Format conversion

The raw dataset is a .csv file. To make it easier for Spark to read it into an RDD or DataFrame, first convert us-counties.csv into a tab-separated text file, us-counties.txt:

import pandas as pd

# .csv -> .txt
data = pd.read_csv('us-counties.csv')
with open('us-counties.txt', 'a+', encoding='utf-8') as f:
    for line in data.values:
        f.write((str(line[0]) + '\t' + str(line[1]) + '\t'
                 + str(line[2]) + '\t' + str(line[3]) + '\t' + str(line[4]) + '\n'))
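The same conversion can also be done in a single call; a minimal sketch, assuming no header row or index column is wanted in the output file:

data.to_csv('us-counties.txt', sep='\t', header=False, index=False)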

Then upload the data to HDFS:

hdfs dfs -put us-counties.txt /test4
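Optionally, verify the upload before moving on; the head pipe only previews the first few lines:

hdfs dfs -ls /test4
hdfs dfs -cat /test4/us-counties.txt | head -n 3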


2. Read the file and create a DataFrame

The paths read here are all HDFS paths.

import findspark
findspark.init()

from pyspark import SparkConf, SparkContext
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from datetime import datetime
import pyspark.sql.functions as func

def toDate(inputStr):
    # Pad single-digit month/day (e.g. 2020-1-1 -> 2020-01-01) and parse to a datetime
    newStr = ""
    if len(inputStr) == 8:
        s1 = inputStr[0:4]
        s2 = inputStr[5:6]
        s3 = inputStr[7]
        newStr = s1 + "-" + "0" + s2 + "-" + "0" + s3
    else:
        s1 = inputStr[0:4]
        s2 = inputStr[5:6]
        s3 = inputStr[7:]
        newStr = s1 + "-" + "0" + s2 + "-" + s3
    date = datetime.strptime(newStr, "%Y-%m-%d")
    return date

spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()

fields = [StructField("date", DateType(), False), StructField("county", StringType(), False),
          StructField("state", StringType(), False), StructField("cases", IntegerType(), False),
          StructField("deaths", IntegerType(), False)]
schema = StructType(fields)

rdd0 = spark.sparkContext.textFile("/test4/us-counties.txt")
rdd1 = rdd0.map(lambda x: x.split("\t")).map(lambda p: Row(toDate(p[0]), p[1], p[2], int(p[3]), int(p[4])))

shemaUsInfo = spark.createDataFrame(rdd1, schema)
shemaUsInfo.createOrReplaceTempView("usInfo")
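As a side note, Spark can also read the CSV file directly, which would make the .txt conversion and the hand-written toDate parser unnecessary; a minimal sketch, assuming us-counties.csv (with its header row) has also been uploaded to a hypothetical HDFS path /test4/us-counties.csv:

us = (spark.read
      .option("header", True)       # the Kaggle CSV has a header row
      .option("inferSchema", True)  # infer the integer columns
      .csv("/test4/us-counties.csv")
      .withColumn("date", func.to_date("date")))  # make sure the date column is DateType
us.createOrReplaceTempView("usInfo")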

3. Data analysis

The paths written to here are all HDFS paths.

(1) Compute the daily cumulative confirmed cases and deaths

df = shemaUsInfo.groupBy("date").agg(func.sum("cases"), func.sum("deaths")).sort(shemaUsInfo["date"].asc())

# Rename the aggregate columns
df1 = df.withColumnRenamed("sum(cases)", "cases").withColumnRenamed("sum(deaths)", "deaths")
df1.repartition(1).write.json("/test4/result1")

# Register as a temporary view for the next step
df1.createOrReplaceTempView("ustotal")
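Since usInfo is registered as a temporary view, the same aggregation can equivalently be written in SQL; a minimal sketch (not used by the original code) producing the same columns as df1:

df1_sql = spark.sql(
    "select date, sum(cases) as cases, sum(deaths) as deaths "
    "from usInfo group by date order by date")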


(2) Compute the daily new confirmed cases and deaths compared with the previous day

df2 = spark.sql("select t1.date,t1.cases-t2.cases as caseIncrease,t1.deaths-t2.deaths as deathIncrease from ustotal t1,ustotal t2 where t1.date = date_add(t2.date,1)")
df2.sort(df2["date"].asc()).repartition(1).write.json("/test4/result2")
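The same day-over-day increase can also be computed with a window function instead of the self-join; a minimal alternative sketch (not from the original code), using lag() over a date-ordered window:

from pyspark.sql import Window

# A single un-partitioned window is acceptable here: ustotal has one row per date
w = Window.orderBy("date")
df2_alt = (df1
           .withColumn("caseIncrease", func.col("cases") - func.lag("cases", 1).over(w))
           .withColumn("deathIncrease", func.col("deaths") - func.lag("deaths", 1).over(w))
           .select("date", "caseIncrease", "deathIncrease")
           .na.drop())  # drop the first date, which has no previous day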


(3) Cumulative confirmed cases and deaths for each US state as of May 19

df3 = spark.sql("select date,state,sum(cases) as totalCases,sum(deaths) as totalDeaths,round(sum(deaths)/sum(cases),4) as deathRate from usInfo where date = to_date('2020-05-19','yyyy-MM-dd') group by date,state")
df3.sort(df3["totalCases"].desc()).repartition(1).write.json("/test4/result3")  # write to HDFS
df3.createOrReplaceTempView("eachStateInfo")


(4) Top 10 US states by confirmed cases

df4 = spark.sql("select date,state,totalCases from eachStateInfo order by totalCases desc limit 10")
df4.repartition(1).write.json("/test4/result4")


(5) Top 10 US states by deaths

df5 = spark.sql("select date,state,totalDeaths from eachStateInfo order by totalDeaths desc limit 10")
df5.repartition(1).write.json("/test4/result5")


(6) The 10 US states with the fewest confirmed cases

df6 = spark.sql("select date,state,totalCases from eachStateInfo order by totalCases asc limit 10")
df6.repartition(1).write.json("/test4/result6")


(7) The 10 US states with the fewest deaths

df7 = spark.sql("select date,state,totalDeaths from eachStateInfo order by totalDeaths asc limit 10")
df7.repartition(1).write.json("/test4/result7")


(8) Case-fatality rate for the whole US and for each state as of May 19

df8 = spark.sql("select 1 as sign,date,'USA' as state,round(sum(totalDeaths)/sum(totalCases),4) as deathRate from eachStateInfo group by date union select 2 as sign,date,state,deathRate from eachStateInfo").cache()
df8.sort(df8["sign"].asc(), df8["deathRate"].desc()).repartition(1).write.json("/test4/result8")


4. Data visualization

Import the required libraries:

from pyecharts import options as opts
from pyecharts.charts import Bar
from pyecharts.charts import Line
from pyecharts.components import Table
from pyecharts.charts import WordCloud
from pyecharts.charts import Pie
from pyecharts.charts import Funnel
from pyecharts.charts import Scatter
from pyecharts.charts import PictorialBar
from pyecharts.options import ComponentTitleOpts
from pyecharts.globals import SymbolType
import json

Copy the results generated on HDFS to the local filesystem; the visualization step does not need the cluster, so all paths used below are local paths.
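A minimal sketch of pulling one result directory down from HDFS (the local test4/ directory mirroring the HDFS layout is an assumption about the notebook's working directory; repeat for result2 through result8):

hdfs dfs -get /test4/result1 test4/result1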


(1) Daily cumulative confirmed cases and deaths -> double bar chart

root = "test4/result1/part-00000-35b0ecb6-8abe-4342-90ce-bd9b86acc054-c000.json"
date = []
cases = []
deaths = []
with open(root, 'r') as f:
    while True:
        line = f.readline()
        if not line:  # readline returns an empty string at EOF, so stop
            break
        js = json.loads(line)
        date.append(str(js['date']))
        cases.append(int(js['cases']))
        deaths.append(int(js['deaths']))

d = (
    Bar()
    .add_xaxis(date)
    .add_yaxis("累计确诊人数", cases, stack="stack1")
    .add_yaxis("累计死亡人数", deaths, stack="stack1")
    .set_series_opts(label_opts=opts.LabelOpts(is_show=False))
    .set_global_opts(title_opts=opts.TitleOpts(title="美国每日累计确诊和死亡人数"))
)
d.load_javascript()
d.render_notebook()
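The read-one-JSON-object-per-line loop above recurs in every chart cell below; as a small refactoring sketch (not part of the original code), a helper like this could replace it:

def read_json_lines(path):
    # Spark writes its JSON output as one JSON object per line
    rows = []
    with open(path, 'r') as f:
        for line in f:
            if line.strip():
                rows.append(json.loads(line))
    return rows

# Usage for the chart above, for example:
# rows = read_json_lines(root)
# date = [str(r['date']) for r in rows]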


(2) Daily new confirmed cases and deaths -> line chart

root = "test4/result2/part-00000-6a74a9a3-dc2c-4d6b-997c-a74762a27bd0-c000.json"
date = []
cases = []
deaths = []
with open(root, 'r') as f:
    while True:
        line = f.readline()
        if not line:  # readline returns an empty string at EOF, so stop
            break
        js = json.loads(line)
        date.append(str(js['date']))
        cases.append(int(js['caseIncrease']))
        deaths.append(int(js['deathIncrease']))

L1 = (
    Line(init_opts=opts.InitOpts(width="1600px", height="800px"))
    .add_xaxis(xaxis_data=date)
    .add_yaxis(
        series_name="新增确诊",
        y_axis=cases,
        markpoint_opts=opts.MarkPointOpts(
            data=[opts.MarkPointItem(type_="max", name="最大值")]
        ),
        markline_opts=opts.MarkLineOpts(
            data=[opts.MarkLineItem(type_="average", name="平均值")]
        ),
    )
    .set_global_opts(
        title_opts=opts.TitleOpts(title="美国每日新增确诊折线图", subtitle=""),
        tooltip_opts=opts.TooltipOpts(trigger="axis"),
        toolbox_opts=opts.ToolboxOpts(is_show=True),
        xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),
    )
)
L1.load_javascript()
L1.render_notebook()


L2 = (
    Line(init_opts=opts.InitOpts(width="1600px", height="800px"))
    .add_xaxis(xaxis_data=date)
    .add_yaxis(
        series_name="新增死亡",
        y_axis=deaths,
        markpoint_opts=opts.MarkPointOpts(
            data=[opts.MarkPointItem(type_="max", name="最大值")]
        ),
        markline_opts=opts.MarkLineOpts(
            data=[
                opts.MarkLineItem(type_="average", name="平均值"),
                opts.MarkLineItem(symbol="none", x="90%", y="max"),
                opts.MarkLineItem(symbol="circle", type_="max", name="最高点"),
            ]
        ),
    )
    .set_global_opts(
        title_opts=opts.TitleOpts(title="美国每日新增死亡折线图", subtitle=""),
        tooltip_opts=opts.TooltipOpts(trigger="axis"),
        toolbox_opts=opts.ToolboxOpts(is_show=True),
        xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),
    )
)
L2.load_javascript()
L2.render_notebook()


(3) Per-state cumulative confirmed cases, deaths, and case-fatality rate as of May 19 -> table

root = "test4/result3/part-00000-253c81bd-4448-4823-954f-e7e9934605c9-c000.json"
allState = []
with open(root, 'r') as f:
    while True:
        line = f.readline()
        if not line:  # readline returns an empty string at EOF, so stop
            break
        js = json.loads(line)
        row = []
        row.append(str(js['state']))
        row.append(int(js['totalCases']))
        row.append(int(js['totalDeaths']))
        row.append(float(js['deathRate']))
        allState.append(row)

table = Table()
headers = ["State name", "Total cases", "Total deaths", "Death rate"]
rows = allState
table.add(headers, rows)
table.set_global_opts(
    title_opts=ComponentTitleOpts(title="美国各州疫情一览", subtitle="")
)
table.load_javascript()
table.render_notebook()


(4) Top 10 US states by confirmed cases -> word cloud

root = "test4/result4/part-00000-9dc04a1e-7763-4429-93fc-23b2f3d45512-c000.json"
data = []
with open(root, 'r') as f:
    while True:
        line = f.readline()
        if not line:  # readline returns an empty string at EOF, so stop
            break
        js = json.loads(line)
        row = (str(js['state']), int(js['totalCases']))
        data.append(row)

c = (
    WordCloud()
    .add("", data, word_size_range=[20, 100], shape=SymbolType.DIAMOND)
    .set_global_opts(title_opts=opts.TitleOpts(title="美国各州确诊Top10"))
)
c.load_javascript()
c.render_notebook()


(5) Top 10 US states by deaths -> pictorial bar chart

root = "test4/result5/part-00000-a8169860-0a64-4c5c-b740-fcdafc74505e-c000.json"
state = []
totalDeath = []
with open(root, 'r') as f:
    while True:
        line = f.readline()
        if not line:  # readline returns an empty string at EOF, so stop
            break
        js = json.loads(line)
        state.insert(0, str(js['state']))
        totalDeath.insert(0, int(js['totalDeaths']))

c = (
    PictorialBar()
    .add_xaxis(state)
    .add_yaxis(
        "",
        totalDeath,
        label_opts=opts.LabelOpts(is_show=False),
        symbol_size=18,
        symbol_repeat="fixed",
        symbol_offset=[0, 0],
        is_symbol_clip=True,
        symbol=SymbolType.ROUND_RECT,
    )
    .reversal_axis()
    .set_global_opts(
        title_opts=opts.TitleOpts(title="PictorialBar-美国各州死亡人数Top10"),
        xaxis_opts=opts.AxisOpts(is_show=False),
        yaxis_opts=opts.AxisOpts(
            axistick_opts=opts.AxisTickOpts(is_show=False),
            axisline_opts=opts.AxisLineOpts(
                linestyle_opts=opts.LineStyleOpts(opacity=0)
            ),
        ),
    )
)
c.load_javascript()
c.render_notebook()


(6) The 10 US states with the fewest confirmed cases -> word cloud

root = "test4/result6/part-00000-9dc41291-7691-4ab3-8a09-2e4fb32bbd02-c000.json"
data = []
with open(root, 'r') as f:
    while True:
        line = f.readline()
        if not line:  # readline returns an empty string at EOF, so stop
            break
        js = json.loads(line)
        row = (str(js['state']), int(js['totalCases']))
        data.append(row)

c = (
    WordCloud()
    .add("", data, word_size_range=[100, 20], shape=SymbolType.DIAMOND)
)
c.load_javascript()
c.render_notebook()


(7) The 10 US states with the fewest deaths -> funnel chart

root = "test4/result7/part-00000-0891d181-56a9-4d70-a94c-259bda524607-c000.json"
data = []
with open(root, 'r') as f:
    while True:
        line = f.readline()
        if not line:  # readline returns an empty string at EOF, so stop
            break
        js = json.loads(line)
        data.insert(0, [str(js['state']), int(js['totalDeaths'])])

c = (
    Funnel()
    .add(
        "State",
        data,
        sort_="ascending",
        label_opts=opts.LabelOpts(position="inside"),
    )
    .set_global_opts(title_opts=opts.TitleOpts(title=""))
)
c.load_javascript()
c.render_notebook()


(8) US case-fatality rate -> pie chart

root = "test4/result8/part-00000-47009151-50c4-4bb2-acb1-ddc2e101f6e2-c000.json"
values = []
with open(root, 'r') as f:
    while True:
        line = f.readline()
        if not line:  # readline returns an empty string at EOF, so stop
            break
        js = json.loads(line)
        if str(js['state']) == "USA":
            values.append(["Death(%)", round(float(js['deathRate']) * 100, 2)])
            values.append(["No-Death(%)", 100 - round(float(js['deathRate']) * 100, 2)])

c = (
    Pie()
    .add("", values)
    .set_colors(["black", "orange"])
    .set_global_opts(title_opts=opts.TitleOpts(title="全美的病死率"))
    .set_series_opts(label_opts=opts.LabelOpts(formatter="{b}: {c}"))
)
c.load_javascript()
c.render_notebook()


IV. Problems encountered

1. Spark not found


Adding the following two lines at the top of the notebook fixes it:

import findspark
findspark.init()
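findspark locates Spark through the SPARK_HOME environment variable; if that variable is not set, the installation directory can be passed explicitly. A hedged example, where /usr/local/spark is only an assumed install path:

import findspark
findspark.init("/usr/local/spark")  # assumed path; findspark.init() is enough when SPARK_HOME is set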

2. Python not found

Inspecting the logs showed that it was not the master that could not find Python but the slaves: Python (Anaconda) was not installed on them. Installing Anaconda on both slave nodes, as in step I, resolves the problem.
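Alternatively, Spark can be told explicitly which interpreter to use on every node through the PYSPARK_PYTHON environment variable; a hedged example using the Anaconda path from step I (for instance in conf/spark-env.sh on each node):

export PYSPARK_PYTHON=/root/anaconda3/bin/python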



