请选择 进入手机版 | 继续访问电脑版

案例 统计用户上网流量,如果两次上网的时间小于10分钟,合并到一起 --spar

[复制链接]
余峻 发表于 2021-1-2 17:41:02 | 显示全部楼层 |阅读模式 打印 上一主题 下一主题
  1. /** * 数据分析: * uid,startTime, endTime, downFlow,  lag() over  ,  flag  ,  sum_over * 1,2020-02-18 14:20:30,2020-02-18 14:46:30,20, 2020-02-18 14:20:30  0  0 * 1,2020-02-18 14:47:20,2020-02-18 15:20:30,30, 2020-02-18 14:46:30  0  0 * 1,2020-02-18 15:37:23,2020-02-18 16:05:26,40, 2020-02-18 15:20:30  1  1 * 1,2020-02-18 16:06:27,2020-02-18 17:20:49,50, 2020-02-18 16:05:26  0  1 * 1,2020-02-18 17:21:50,2020-02-18 18:03:27,60, 2020-02-18 17:20:49  0  1 * * 根据uid分组,每个用户的数据按开始时间排序, * 判断下一行的开始时间减去上一行的竣事时间大于10分钟,开一个窗口flag 设为1,否则为0 * 再开一个窗口sum_over,将flag累加  根据相同的uid,sum_over 聚合流量 */object FlowCountTest {  def main(args: Array[String]): Unit = {    val sc = SparkUtils.createContext(true)    val lines: RDD[String] = sc.textFile("data/flow.txt")    //按uid分组    val groupd: RDD[(String, Iterable[(Long, Long, Long)])] = lines.mapPartitions(it => {      val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")      it.map(e => {        val fields = e.split(",")        val uid = fields(0)        //将日期转换为时间戳        val startTime = dateFormat.parse(fields(1)).getTime        val endTime = dateFormat.parse(fields(2)).getTime        val downFlow = fields(3).toLong        //uid作为key,方便排序        (uid, (startTime, endTime, downFlow))      })    }).groupByKey()    //按开始时间排序    val sumOver: RDD[(String, (Long, Long, Long, Int))] = groupd.flatMapValues(it => {      val sorted: List[(Long, Long, Long)] = it.toList.sortBy(_._1)      //界说一个中间变量,吸收上一行的竣事时间,相当于sql的lag      var temp = 0L      var flag = 0  // 0 或 1      var sum = 0   // 累加flag   0 0 一组  1  1  1 一组      sorted.map(e => {        val startTime = e._1        val endTime = e._2        val flow = e._3        //如果不是第一行数据,比力这一行的开始时间和temp(上一行的竣事时间),大于10分钟,将flag设为1,否则为0        if (temp != 0) {          //如果这一行的开始时间减去上一行的竣事时间大于10分钟,开一个窗口设为1,否则为0          if ((startTime - temp) / 60000 > 10) {            flag = 1          } else {            flag = 0          }        }        //如果是第一行数据,将竣事时间赋给temp        temp = endTime        //返回这一行的开始时间,竣事时间,流量,sumOver        sum += flag        (startTime, endTime, flow, sum)      })    })    val res: RDD[((String, Int), (Long, Long, Long))] = sumOver.map {      case (uid, (startTime, endTime, flow, sum)) => {        //把startTime和endTime不高出10分钟的放到一组        ((uid, sum), (flow, startTime, endTime))      }    }.reduceByKey((a, b) => {      //根据uid,sum 聚合流量,      (a._1 + b._1, Math.min(a._2, b._2), Math.max(a._3, b._3))    })    //整理数据,把时间戳转换为日期    val result = res.mapPartitions(it => {      val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")      it.map {        case ((uid, _), (flow, startTime, endTime)) => {          (uid, dateFormat.format(new Date(startTime)), dateFormat.format(new Date(endTime)), flow)        }      }    }).collect()    println(result.toBuffer)    sc.stop()  }}
复制代码
来源:https://blog.csdn.net/weixin_46959672/article/details/112068113
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

发布主题

专注素材教程免费分享
全国免费热线电话

18768367769

周一至周日9:00-23:00

反馈建议

27428564@qq.com 在线QQ咨询

扫描二维码关注我们

Powered by Discuz! X3.4© 2001-2013 Comsenz Inc.( 蜀ICP备2021001884号-1 )