请选择 进入手机版 | 继续访问电脑版

【Spark】Spark Streaming编程实践(简单易懂 快速上手)

[复制链接]
余峻 发表于 2021-1-2 17:36:47 | 显示全部楼层 |阅读模式 打印 上一主题 下一主题
文章目次



知识储备

1、数据处置处罚的方式:


  • 流式数据处置处罚(Streaming)
  • 批量数据处置处罚(batch)
2、数据处置处罚延迟的是非


  • 实时数据处置处罚:毫秒级别
  • 离线数据处置处罚:小时 or 天级别
3、Spark streaming 是准实时(秒、分钟)、微批次(时间)的数据处置处罚框架
1 根本概念

1、Spark streaming用于流式数据的处置处罚。


  • 支持的数据输入源许多,比方:kafka、Flume、Twitter、ZeroMQ和简朴的TCP套接字等
  • 数据输入后可以使用Spark的高度抽象原语如:map、reduce、join、window等举行运算
  • 效果可以生存在许多地方,比方HDFS、数据库
2、Spark Streaming使用离散化流作为抽象体现,叫做DStream,是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为RDD存在,而DStream是由这些RDD组成的序列。
3、Spark Streaming步伐根本步调

  • 通过创建输入DStream来界说输入源。
  • 通过对DStream应用转换操纵和输出操纵来界说流盘算。
  • 使用streamingContext.start()来开始吸收数据和处置处罚流程。
  • 通过streamingContext.awaitTermination()方法来等候处置处罚竣事(手动竣事或因为错误而竣事)。
  • 可以通过streamingContext.stop()来手动竣事流盘算进程。
2 WordCount入门

  1. object word {  def main(args: Array[String]): Unit = {    // 创建情况对象,通报两个参数,第一个参数体现情况设置,第二个参数体现批量处置处罚的周期(收罗周期)    val conf = new SparkConf().setMaster("local").setAppName("SparkStreaming")    val ssc = new StreamingContext(conf,Seconds(3))    // 逻辑处置处罚    // 获取端口数据    val lines = ssc.socketTextStream("brace", 9999)    val words = lines.flatMap(_.split(" "))    val wordToOne = words.map((_,1))    val WordToCount = wordToOne.reduceByKey(_+_)    WordToCount.print()    // 关闭情况,由于SparkStreaming收罗器是恒久执行的任务,不能直接关闭    // 如果main方法执行完毕,应用步伐也会自动竣事,所以不能让main方法执行完毕//    ssc.stop()    // 1、启动收罗器    ssc.start()    // 2、等候收罗器的关闭    ssc.awaitTermination()  }}
复制代码
在虚拟机输入:nc -lp 9999,传入数据
3 DStream的创建

3.1 文件数据源

3.1.1 用法和说明

文件数据流:可以大概读取所有HDFS API兼容的文件系统文件,通过fileStream方法举行读取,SparkStreaming会监控dataDirectory目次并不绝处置处罚移动进来的文件。
  1. streamingContext.textFileStream(dataDirectory)
复制代码
注意:


  • 文件需要有相同的数据格式
  • 文件进入目次的方式需要通过移动或重定名来实现
  • 一旦文件移动进目次,则不能修改
3.2 RDD队列

3.2.1 用法

测试过程中,通过ssc.queueStream(queueOfRDDs)创建DStream,每一个推送到这个队列的RDD都会作为一个DStream处置处罚
3.2.2 案例

  1. object word2 {  def main(args: Array[String]): Unit = {    val conf = new SparkConf().setMaster("local").setAppName("SparkStreaming")    val ssc = new StreamingContext(conf,Seconds(3))    // 创建RDD队列    val rddQueue = new mutable.Queue[RDD[Int]]()    val inputStream = ssc.queueStream(rddQueue,false)    val mappedStream = inputStream.map((_,1))    val reducedStream = mappedStream.reduceByKey(_+_)    reducedStream.print()    // 1、启动收罗器    ssc.start()    for(i "brace:9092",      ConsumerConfig.GROUP_ID_CONFIG -> "aaa",      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",    )    // 工具类    val kafkaDtatDS = KafkaUtils.createDirectStream[String,String](      ssc,      LocationStrategies.PreferConsistent,      ConsumerStrategies.Subscribe[String,String](Set("aaa"),kafkaParams)    )        kafkaDtatDS.map(_.value()).print()    ssc.start()    ssc.awaitTermination()  }}
复制代码
3、创建topic


  • 查询
  1. /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server brace:9092 --listor/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper brace:2181
复制代码


  • 创建
  1. /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server brace:9092 --create --topic aaa  --partitions 3 --replication-factor 1  or/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper brace:2181 --replication-factor 1 --partitions 3 --topic aaa
复制代码


  • 使用
  1. /usr/local/kafka/bin/kafka-console-producer.sh --broker-list brace:9092 --topic aaa
复制代码


  • 消费
  1. /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server brace:9092 --topic aaa --from-beginning
复制代码
3.5 Flume

3.5.1

4 DStream转换

DStream上的操纵与RDD相似,分为Transformations(转换)和Output Operations(输出)两种,别的转换操纵中另有一些比力特殊的原语,如:updateStateByKey()、transform()


  • 无状态转换:每个批次的处置处罚不依赖于之前批次的数据。
  • 有状态转换:当前批次的处置处罚需要使用之前批次的数据大概中间效果。有状态转换包罗基于滑动窗口的转换和追踪状态厘革的转换(updateStateByKey)。
4.1 无状态转化操纵

无状态操纵就是把简朴的RDD转化操纵应用的每个批次上,也就是转化DStream中的每一个RDD。
Transform

通过对源DStream的每个RDD应用RDD-to-RDD函数,创建一个新的DStream。支持在新的DStream中做任何RDD操纵。
join

两个流之间的join将相同key的数据毗连到一起
4.2 有状态转化操纵

4.2.1 UpdateStateByKey

用于纪录汗青纪录,有时候我们需要在DStream中跨批次维护状态(比方wordcount),针对这种情况该方法为我们提供了一个状态变量的访问,用于键值对形式的Dstream。
UpdateStateByKey的效果是一个新的DStream,其内部的RDD序列是由每个时间区间对应的(键,状态)对组成的
两步:


  • 界说状态,状态可以是任意的数据范例
  • 界说状态更新函数
例子

  1. object word5 {  def main(args: Array[String]): Unit = {    val conf = new SparkConf().setMaster("local
  2. [*]").setAppName("SparkStreaming")    val ssc = new StreamingContext(conf,Seconds(3))    ssc.checkpoint("cp")    val datas = ssc.socketTextStream("brace",9999)    val wordToOne = datas.map((_,1))    // updateStateByKey根据key对数据的状态举行更新    // 通报的参数中含有两个值    // 第一个值体现相同key的value数据    // 第二个值体现缓存区相同key的value数据    val state = wordToOne.updateStateByKey (      (seq:Seq[Int], buff:Option[Int])=>{        val newCount = buff.getOrElse(0)+seq.sum      Option(newCount)      }    )    state.print()        ssc.start()    ssc.awaitTermination()  }}
复制代码
4.2.2 Window Operations

可以设置窗口的巨细和滑动窗口的隔断来动态的获取当前Steaming的允许状态。基于窗口的操纵会在一个比streamingcontext的批次隔断更长的时间范围内,通过整合多个批次的效果,盘算出整个窗口的效果。
应用场景

厘革(交通流量等)
  1. object word6 {  def main(args: Array[String]): Unit = {    val conf = new SparkConf().setMaster("local
  2. [*]").setAppName("SparkStreaming")    val ssc = new StreamingContext(conf,Seconds(3))    val kafkaParams = Map[String, Object](      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "brace:9092",      ConsumerConfig.GROUP_ID_CONFIG -> "aaa",      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",    )    val kafkaDataDS = KafkaUtils.createDirectStream[String,String](      ssc,      LocationStrategies.PreferConsistent,      ConsumerStrategies.Subscribe[String,String](Set("aaa"),kafkaParams)    )    val value = kafkaDataDS.map(_.value())    // 参数:隔断(巨细:收罗周期的整数倍)、滑动隔断(步长:收罗周期的整数倍)    value.window(Seconds(9),Seconds(3))    value.print()    ssc.start()    ssc.awaitTermination()  }}
复制代码
5 DStream输出

输出操纵如下:


  • print()
  • saveAsTextFile(prefix,[suffix]):以txt文件形式存储DStream的内容
  • saveAsObjectFiles(prefix,[suffix]):以java对象序列化的方式将数据生存为SequenceFiles
  • saveAsHadoopFiles
  • foreachRDD()
  1. import java.sql.{PreparedStatement, Connection, DriverManager}object word8 {  def main(args: Array[String]): Unit = {    val conf = new SparkConf().setMaster("local
  2. [*]").setAppName("SparkStreaming")    val ssc = new StreamingContext(conf,Seconds(3))    val lines = ssc.socketTextStream("brace", 9998)    val words = lines.flatMap(_.split(" "))    val wordToOne = words.map((_,1))    val WordToCount = wordToOne.reduceByKey(_+_)    WordToCount.print()    WordToCount.foreachRDD(rdd => {      //内部函数      def func(records: Iterator[(String,Int)]) {        var conn: Connection = null        var stmt: PreparedStatement = null                try {          val url = "jdbc:mysql://localhost:3306/spark"          val user = "root"          val password = "Hive@2020"          conn = DriverManager.getConnection(url, user, password)                    records.foreach(p => {            val sql = "insert into wordcount(word,count) values (?,?)"            stmt = conn.prepareStatement(sql);            stmt.setString(1, p._1.trim)            stmt.setInt(2,p._2.toInt)            stmt.executeUpdate()          })                  } catch {          case e: Exception => e.printStackTrace()        } finally {          if (stmt != null) {            stmt.close()          }          if (conn != null) {            conn.close()          }        }      }            val repartitionedRDD = rdd.repartition(3)      repartitionedRDD.foreachPartition(func)    })    ssc.start()    ssc.awaitTermination()  }}
复制代码
6 优雅地关闭


来源:https://blog.csdn.net/qq_45650899/article/details/112008647
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

发布主题

专注素材教程免费分享
全国免费热线电话

18768367769

周一至周日9:00-23:00

反馈建议

27428564@qq.com 在线QQ咨询

扫描二维码关注我们

Powered by Discuz! X3.4© 2001-2013 Comsenz Inc.( 蜀ICP备2021001884号-1 )