请选择 进入手机版 | 继续访问电脑版

spark-sql实践

[复制链接]
茂忠想你 发表于 2020-12-31 20:23:19 | 显示全部楼层 |阅读模式 打印 上一主题 下一主题
spark-sql实践



一、安装anaconda

安装包链接
链接:https://pan.baidu.com/s/1dvNVT4VW34SW4EVoZRqNgA
提取码:batk
  使用bash下令运行安装包

  一直回车,遇到选择选yes即可


  安装乐成

  配置情况变量
  1. export PATH=$PATH:/root/anaconda3/bin
复制代码

  可以看出安装乐成

二、配置jupyter notebook

配置情况变量后使用下面下令生成jupyter notebook配置文件
  1. jupyter notebook --generate-config
复制代码

  使用下面下令设置jupyter密码并记着sha1值,反面配置要用
  1. python -c "import IPython; print(IPython.lib.passwd())"
复制代码

在刚刚生成的配置文件中添加下面语句
  1. # 允许所有IP登录c.NotebookApp.ip = '*'# 使用刚刚生成的sha1值c.NotebookApp.password = 'sha1:679a04c48eec:050346283252410f864ddfbf397a5aa64dd2ae09'# 是否自动打开欣赏器c.NotebookApp.open_browser = False# 允许使用root用户登录c.NotebookApp.allow_root =True# 设置访问jupyter notebook的端口为4040c.NotebookApp.port = 4040c.ContentsManager.root_dir = '/usr/jupyter'c.NotebookApp.notebook_dir = '/usr/jupyter'
复制代码

  启动jupyter notebook
  1. jupyter notebook
复制代码

  输入密码乐成登录


三、案例分析

代码下载链接:
链接:https://pan.baidu.com/s/1Zjb-prt6v2Nbhhy6M1ZoUQ
提取码:cgnc
  数据下载链接:
链接:https://pan.baidu.com/s/1UgkbxCDS_ne2zFqHxFAn8w
提取码:qskr
  本案例使用的数据集来自数据网站Kaggle的美国新冠肺炎疫情数据集,该数据集以数据表us-counties.csv组织,其中包含了美国发现首例新冠肺炎确诊病例至2020-05-19的相关数据

1.格式转换

原始数据集是以.csv文件组织的,为了方便spark读取生成RDD或者DataFrame,首先将us-counties.csv转换为.txt格式文件us-counties.txt
  1. import pandas as pd #.csv->.txtdata = pd.read_csv('us-counties.csv')with open('us-counties.txt','a+',encoding='utf-8') as f:    for line in data.values:        f.write((str(line[0])+'\t'+str(line[1])+'\t'                +str(line[2])+'\t'+str(line[3])+'\t'+str(line[4])+'\n'))
复制代码
然后将数据上传到hdfs上
  1. hdfs dfs -put us-counties.txt /test4
复制代码

2.读取文件生成DataFrame

这里读取的路径都是hdfs路径
  1. import findsparkfindspark.init()
复制代码
  1. from pyspark import SparkConf,SparkContextfrom pyspark.sql import Rowfrom pyspark.sql.types import *from pyspark.sql import SparkSessionfrom datetime import datetimeimport pyspark.sql.functions as func
复制代码
  1. def toDate(inputStr):    newStr = ""    if len(inputStr) == 8:        s1 = inputStr[0:4]        s2 = inputStr[5:6]        s3 = inputStr[7]        newStr = s1+"-"+"0"+s2+"-"+"0"+s3    else:        s1 = inputStr[0:4]        s2 = inputStr[5:6]        s3 = inputStr[7:]        newStr = s1+"-"+"0"+s2+"-"+s3    date = datetime.strptime(newStr, "%Y-%m-%d")    return date
复制代码
  1. spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate() fields = [StructField("date", DateType(),False),StructField("county", StringType(),False),StructField("state", StringType(),False),                    StructField("cases", IntegerType(),False),StructField("deaths", IntegerType(),False),]schema = StructType(fields) rdd0 = spark.sparkContext.textFile("/test4/us-counties.txt")rdd1 = rdd0.map(lambda x:x.split("\t")).map(lambda p: Row(toDate(p[0]),p[1],p[2],int(p[3]),int(p[4]))) shemaUsInfo = spark.createDataFrame(rdd1,schema)shemaUsInfo.createOrReplaceTempView("usInfo")
复制代码
3.举行数据分析

这里存储的路径都是hdfs路径
(1)盘算逐日的累计确诊病例数和死亡数

  1. df = shemaUsInfo.groupBy("date").agg(func.sum("cases"),func.sum("deaths")).sort(shemaUsInfo["date"].asc()) #列重命名df1 = df.withColumnRenamed("sum(cases)","cases").withColumnRenamed("sum(deaths)","deaths")df1.repartition(1).write.json("/test4/result1")  #注册为暂时表供下一步使用df1.createOrReplaceTempView("ustotal")
复制代码

(2)盘算逐日较昨日的新增确诊病例数和死亡病例数

  1. df2 = spark.sql("select t1.date,t1.cases-t2.cases as caseIncrease,t1.deaths-t2.deaths as deathIncrease from ustotal t1,ustotal t2 where t1.date = date_add(t2.date,1)") df2.sort(df2["date"].asc()).repartition(1).write.json("/test4/result2")
复制代码

(3)统计截止5.19日 美国各州的累计确诊人数和死亡人数

  1. df3 = spark.sql("select date,state,sum(cases) as totalCases,sum(deaths) as totalDeaths,round(sum(deaths)/sum(cases),4) as deathRate from usInfo  where date = to_date('2020-05-19','yyyy-MM-dd') group by date,state") df3.sort(df3["totalCases"].desc()).repartition(1).write.json("/test4/result3") #写入hdfs df3.createOrReplaceTempView("eachStateInfo")
复制代码

(4)找出美国确诊最多的10个州

  1. df4 = spark.sql("select date,state,totalCases from eachStateInfo  order by totalCases desc limit 10")df4.repartition(1).write.json("/test4/result4")
复制代码

(5)找出美国死亡最多的10个州

  1. df5 = spark.sql("select date,state,totalDeaths from eachStateInfo  order by totalDeaths desc limit 10")df5.repartition(1).write.json("/test4/result5")
复制代码

(6)找出美国确诊最少的10个州

  1. df6 = spark.sql("select date,state,totalCases from eachStateInfo  order by totalCases asc limit 10")df6.repartition(1).write.json("/test4/result6")
复制代码

(7)找出美国死亡最少的10个州

  1. df7 = spark.sql("select date,state,totalDeaths from eachStateInfo  order by totalDeaths asc limit 10")df7.repartition(1).write.json("/test4/result7")
复制代码

(8)统计截止5.19全美和各州的病死率

  1. df8 = spark.sql("select 1 as sign,date,'USA' as state,round(sum(totalDeaths)/sum(totalCases),4) as deathRate from eachStateInfo group by date union select 2 as sign,date,state,deathRate from eachStateInfo").cache()df8.sort(df8["sign"].asc(),df8["deathRate"].desc()).repartition(1).write.json("/test4/result8")
复制代码

4.数据可视化

导入所需库
  1. from pyecharts import options as optsfrom pyecharts.charts import Barfrom pyecharts.charts import Linefrom pyecharts.components import Tablefrom pyecharts.charts import WordCloudfrom pyecharts.charts import Piefrom pyecharts.charts import Funnelfrom pyecharts.charts import Scatterfrom pyecharts.charts import PictorialBarfrom pyecharts.options import ComponentTitleOptsfrom pyecharts.globals import SymbolTypeimport json
复制代码
将hdfs生成效果放在当地,因为可视化部门不需要使用集群,下面使用的路径均为当地路径

(1)画出逐日的累计确诊病例数和死亡数——>双柱状图

  1. root = "test4/result1/part-00000-35b0ecb6-8abe-4342-90ce-bd9b86acc054-c000.json"date = []cases = []deaths = []with open(root, 'r') as f:    while True:        line = f.readline()        if not line:                            # 到 EOF,返回空字符串,则终止循环            break        js = json.loads(line)        date.append(str(js['date']))        cases.append(int(js['cases']))        deaths.append(int(js['deaths']))d = (Bar().add_xaxis(date).add_yaxis("累计确诊人数", cases, stack="stack1").add_yaxis("累计死亡人数", deaths, stack="stack1").set_series_opts(label_opts=opts.LabelOpts(is_show=False)).set_global_opts(title_opts=opts.TitleOpts(title="美国逐日累计确诊和死亡人数")))d.load_javascript()d.render_notebook()
复制代码

(2)画出逐日的新增确诊病例数和死亡数——>折线图

  1. root = "test4/result2/part-00000-6a74a9a3-dc2c-4d6b-997c-a74762a27bd0-c000.json"date = []cases = []deaths = []with open(root, 'r') as f:    while True:        line = f.readline()        if not line:                            # 到 EOF,返回空字符串,则终止循环            break        js = json.loads(line)        date.append(str(js['date']))        cases.append(int(js['caseIncrease']))        deaths.append(int(js['deathIncrease']))L1 = (Line(init_opts=opts.InitOpts(width="1600px", height="800px")).add_xaxis(xaxis_data=date).add_yaxis(    series_name="新增确诊",    y_axis=cases,    markpoint_opts=opts.MarkPointOpts(        data=[            opts.MarkPointItem(type_="max", name="最大值")        ]    ),    markline_opts=opts.MarkLineOpts(        data=[opts.MarkLineItem(type_="average", name="匀称值")]    ),).set_global_opts(    title_opts=opts.TitleOpts(title="美国逐日新增确诊折线图", subtitle=""),    tooltip_opts=opts.TooltipOpts(trigger="axis"),    toolbox_opts=opts.ToolboxOpts(is_show=True),    xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),))L1.load_javascript()L1.render_notebook()
复制代码

  1. L2 = (Line(init_opts=opts.InitOpts(width="1600px", height="800px")).add_xaxis(xaxis_data=date).add_yaxis(    series_name="新增死亡",    y_axis=deaths,    markpoint_opts=opts.MarkPointOpts(        data=[opts.MarkPointItem(type_="max", name="最大值")]    ),    markline_opts=opts.MarkLineOpts(        data=[            opts.MarkLineItem(type_="average", name="匀称值"),            opts.MarkLineItem(symbol="none", x="90%", y="max"),            opts.MarkLineItem(symbol="circle", type_="max", name="最高点"),        ]    ),).set_global_opts(    title_opts=opts.TitleOpts(title="美国逐日新增死亡折线图", subtitle=""),    tooltip_opts=opts.TooltipOpts(trigger="axis"),    toolbox_opts=opts.ToolboxOpts(is_show=True),    xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),))L2.load_javascript()L2.render_notebook()
复制代码

(3)画出截止5.19,美国各州累计确诊、死亡人数和病死率—>表格

  1. root = "test4/result3/part-00000-253c81bd-4448-4823-954f-e7e9934605c9-c000.json"allState = []with open(root, 'r') as f:    while True:        line = f.readline()        if not line:                            # 到 EOF,返回空字符串,则终止循环            break        js = json.loads(line)        row = []        row.append(str(js['state']))        row.append(int(js['totalCases']))        row.append(int(js['totalDeaths']))        row.append(float(js['deathRate']))        allState.append(row)table = Table()headers = ["State name", "Total cases", "Total deaths", "Death rate"]rows = allStatetable.add(headers, rows)table.set_global_opts(    title_opts=ComponentTitleOpts(title="美国各州疫情一览", subtitle=""))table.load_javascript()table.render_notebook()
复制代码

(4)画出美国确诊最多的10个州——>词云图

  1. root = "test4/result4/part-00000-9dc04a1e-7763-4429-93fc-23b2f3d45512-c000.json"data = []with open(root, 'r') as f:    while True:        line = f.readline()        if not line:                            # 到 EOF,返回空字符串,则终止循环            break        js = json.loads(line)        row=(str(js['state']),int(js['totalCases']))        data.append(row)c = (WordCloud().add("", data, word_size_range=[20, 100], shape=SymbolType.DIAMOND).set_global_opts(title_opts=opts.TitleOpts(title="美国各州确诊Top10")))c.load_javascript()c.render_notebook()
复制代码

(5)画出美国死亡最多的10个州——>象柱状图

  1. root = "test4/result5/part-00000-a8169860-0a64-4c5c-b740-fcdafc74505e-c000.json"state = []totalDeath = []with open(root, 'r') as f:    while True:        line = f.readline()        if not line:                            # 到 EOF,返回空字符串,则终止循环            break        js = json.loads(line)        state.insert(0,str(js['state']))        totalDeath.insert(0,int(js['totalDeaths']))c = (PictorialBar().add_xaxis(state).add_yaxis(    "",    totalDeath,    label_opts=opts.LabelOpts(is_show=False),    symbol_size=18,    symbol_repeat="fixed",    symbol_offset=[0, 0],    is_symbol_clip=True,    symbol=SymbolType.ROUND_RECT,).reversal_axis().set_global_opts(    title_opts=opts.TitleOpts(title="PictorialBar-美国各州死亡人数Top10"),    xaxis_opts=opts.AxisOpts(is_show=False),    yaxis_opts=opts.AxisOpts(        axistick_opts=opts.AxisTickOpts(is_show=False),        axisline_opts=opts.AxisLineOpts(            linestyle_opts=opts.LineStyleOpts(opacity=0)        ),    ),))c.load_javascript()c.render_notebook()
复制代码

(6)找出美国确诊最少的10个州——>词云图

  1. root = "test4/result6/part-00000-9dc41291-7691-4ab3-8a09-2e4fb32bbd02-c000.json"data = []with open(root, 'r') as f:    while True:        line = f.readline()        if not line:                            # 到 EOF,返回空字符串,则终止循环            break        js = json.loads(line)        row=(str(js['state']),int(js['totalCases']))        data.append(row)c = (WordCloud().add("", data, word_size_range=[100, 20], shape=SymbolType.DIAMOND))c.load_javascript()c.render_notebook()
复制代码

(7)找出美国死亡最少的10个州——>漏斗图

  1. root = "test4/result7/part-00000-0891d181-56a9-4d70-a94c-259bda524607-c000.json"data = []with open(root, 'r') as f:    while True:        line = f.readline()        if not line:                            # 到 EOF,返回空字符串,则终止循环            break        js = json.loads(line)        data.insert(0,[str(js['state']),int(js['totalDeaths'])])c = (Funnel().add(    "State",    data,    sort_="ascending",    label_opts=opts.LabelOpts(position="inside"),).set_global_opts(title_opts=opts.TitleOpts(title="")))c.load_javascript()c.render_notebook()
复制代码

(8)美国的病死率—>饼状图

  1. root = "test4/result8/part-00000-47009151-50c4-4bb2-acb1-ddc2e101f6e2-c000.json"values = []with open(root, 'r') as f:    while True:        line = f.readline()        if not line:                            # 到 EOF,返回空字符串,则终止循环            break        js = json.loads(line)        if str(js['state'])=="USA":            values.append(["Death(%)",round(float(js['deathRate'])*100,2)])            values.append(["No-Death(%)",100-round(float(js['deathRate'])*100,2)])c = (Pie().add("", values).set_colors(["blcak","orange"]).set_global_opts(title_opts=opts.TitleOpts(title="全美的病死率")).set_series_opts(label_opts=opts.LabelOpts(formatter="{b}: {c}")))c.load_javascript()c.render_notebook()
复制代码

四、遇到的问题

1.找不到spark


在开头加上下面两行代码即可
  1. import findsparkfindspark.init()
复制代码
2.找不到python

检察日志发现不是master中找不到python,而是slave中没找到,然后发现slave中没有安装python,在两个slave中按照第一步安装anaconda即可


来源:https://blog.csdn.net/weixin_43622131/article/details/111940873
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

发布主题

专注素材教程免费分享
全国免费热线电话

18768367769

周一至周日9:00-23:00

反馈建议

27428564@qq.com 在线QQ咨询

扫描二维码关注我们

Powered by Discuz! X3.4© 2001-2013 Comsenz Inc.( 蜀ICP备2021001884号-1 )