文章目次
知识储备
1、数据处置处罚的方式:
- 流式数据处置处罚(Streaming)
- 批量数据处置处罚(batch)
2、数据处置处罚延迟的是非
- 实时数据处置处罚:毫秒级别
- 离线数据处置处罚:小时 or 天级别
3、Spark streaming 是准实时(秒、分钟)、微批次(时间)的数据处置处罚框架
1 根本概念
1、Spark streaming用于流式数据的处置处罚。
- 支持的数据输入源许多,比方:kafka、Flume、Twitter、ZeroMQ和简朴的TCP套接字等
- 数据输入后可以使用Spark的高度抽象原语如:map、reduce、join、window等举行运算
- 效果可以生存在许多地方,比方HDFS、数据库等
2、Spark Streaming使用离散化流作为抽象体现,叫做DStream,是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为RDD存在,而DStream是由这些RDD组成的序列。
3、Spark Streaming步伐根本步调
- 通过创建输入DStream来界说输入源。
- 通过对DStream应用转换操纵和输出操纵来界说流盘算。
- 使用streamingContext.start()来开始吸收数据和处置处罚流程。
- 通过streamingContext.awaitTermination()方法来等候处置处罚竣事(手动竣事或因为错误而竣事)。
- 可以通过streamingContext.stop()来手动竣事流盘算进程。
2 WordCount入门
- object word { def main(args: Array[String]): Unit = { // 创建情况对象,通报两个参数,第一个参数体现情况设置,第二个参数体现批量处置处罚的周期(收罗周期) val conf = new SparkConf().setMaster("local").setAppName("SparkStreaming") val ssc = new StreamingContext(conf,Seconds(3)) // 逻辑处置处罚 // 获取端口数据 val lines = ssc.socketTextStream("brace", 9999) val words = lines.flatMap(_.split(" ")) val wordToOne = words.map((_,1)) val WordToCount = wordToOne.reduceByKey(_+_) WordToCount.print() // 关闭情况,由于SparkStreaming收罗器是恒久执行的任务,不能直接关闭 // 如果main方法执行完毕,应用步伐也会自动竣事,所以不能让main方法执行完毕// ssc.stop() // 1、启动收罗器 ssc.start() // 2、等候收罗器的关闭 ssc.awaitTermination() }}
复制代码 在虚拟机输入:nc -lp 9999,传入数据
3 DStream的创建
3.1 文件数据源
3.1.1 用法和说明
文件数据流:可以大概读取所有HDFS API兼容的文件系统文件,通过fileStream方法举行读取,SparkStreaming会监控dataDirectory目次并不绝处置处罚移动进来的文件。
- streamingContext.textFileStream(dataDirectory)
复制代码 注意:
- 文件需要有相同的数据格式
- 文件进入目次的方式需要通过移动或重定名来实现
- 一旦文件移动进目次,则不能修改
3.2 RDD队列
3.2.1 用法
测试过程中,通过ssc.queueStream(queueOfRDDs)创建DStream,每一个推送到这个队列的RDD都会作为一个DStream处置处罚
3.2.2 案例
- object word2 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("SparkStreaming") val ssc = new StreamingContext(conf,Seconds(3)) // 创建RDD队列 val rddQueue = new mutable.Queue[RDD[Int]]() val inputStream = ssc.queueStream(rddQueue,false) val mappedStream = inputStream.map((_,1)) val reducedStream = mappedStream.reduceByKey(_+_) reducedStream.print() // 1、启动收罗器 ssc.start() for(i "brace:9092", ConsumerConfig.GROUP_ID_CONFIG -> "aaa", "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer", "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer", ) // 工具类 val kafkaDtatDS = KafkaUtils.createDirectStream[String,String]( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String,String](Set("aaa"),kafkaParams) ) kafkaDtatDS.map(_.value()).print() ssc.start() ssc.awaitTermination() }}
复制代码 3、创建topic
- /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server brace:9092 --listor/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper brace:2181
复制代码
- /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server brace:9092 --create --topic aaa --partitions 3 --replication-factor 1 or/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper brace:2181 --replication-factor 1 --partitions 3 --topic aaa
复制代码
- /usr/local/kafka/bin/kafka-console-producer.sh --broker-list brace:9092 --topic aaa
复制代码
- /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server brace:9092 --topic aaa --from-beginning
复制代码 3.5 Flume
3.5.1
4 DStream转换
DStream上的操纵与RDD相似,分为Transformations(转换)和Output Operations(输出)两种,别的转换操纵中另有一些比力特殊的原语,如:updateStateByKey()、transform()
- 无状态转换:每个批次的处置处罚不依赖于之前批次的数据。
- 有状态转换:当前批次的处置处罚需要使用之前批次的数据大概中间效果。有状态转换包罗基于滑动窗口的转换和追踪状态厘革的转换(updateStateByKey)。
4.1 无状态转化操纵
无状态操纵就是把简朴的RDD转化操纵应用的每个批次上,也就是转化DStream中的每一个RDD。
Transform
通过对源DStream的每个RDD应用RDD-to-RDD函数,创建一个新的DStream。支持在新的DStream中做任何RDD操纵。
join
两个流之间的join将相同key的数据毗连到一起
4.2 有状态转化操纵
4.2.1 UpdateStateByKey
用于纪录汗青纪录,有时候我们需要在DStream中跨批次维护状态(比方wordcount),针对这种情况该方法为我们提供了一个状态变量的访问,用于键值对形式的Dstream。
UpdateStateByKey的效果是一个新的DStream,其内部的RDD序列是由每个时间区间对应的(键,状态)对组成的
两步:
- 界说状态,状态可以是任意的数据范例
- 界说状态更新函数
例子
- object word5 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local
- [*]").setAppName("SparkStreaming") val ssc = new StreamingContext(conf,Seconds(3)) ssc.checkpoint("cp") val datas = ssc.socketTextStream("brace",9999) val wordToOne = datas.map((_,1)) // updateStateByKey根据key对数据的状态举行更新 // 通报的参数中含有两个值 // 第一个值体现相同key的value数据 // 第二个值体现缓存区相同key的value数据 val state = wordToOne.updateStateByKey ( (seq:Seq[Int], buff:Option[Int])=>{ val newCount = buff.getOrElse(0)+seq.sum Option(newCount) } ) state.print() ssc.start() ssc.awaitTermination() }}
复制代码 4.2.2 Window Operations
可以设置窗口的巨细和滑动窗口的隔断来动态的获取当前Steaming的允许状态。基于窗口的操纵会在一个比streamingcontext的批次隔断更长的时间范围内,通过整合多个批次的效果,盘算出整个窗口的效果。
应用场景
厘革(交通流量等)
- object word6 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local
- [*]").setAppName("SparkStreaming") val ssc = new StreamingContext(conf,Seconds(3)) val kafkaParams = Map[String, Object]( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "brace:9092", ConsumerConfig.GROUP_ID_CONFIG -> "aaa", "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer", "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer", ) val kafkaDataDS = KafkaUtils.createDirectStream[String,String]( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String,String](Set("aaa"),kafkaParams) ) val value = kafkaDataDS.map(_.value()) // 参数:隔断(巨细:收罗周期的整数倍)、滑动隔断(步长:收罗周期的整数倍) value.window(Seconds(9),Seconds(3)) value.print() ssc.start() ssc.awaitTermination() }}
复制代码 5 DStream输出
输出操纵如下:
- print()
- saveAsTextFile(prefix,[suffix]):以txt文件形式存储DStream的内容
- saveAsObjectFiles(prefix,[suffix]):以java对象序列化的方式将数据生存为SequenceFiles
- saveAsHadoopFiles
- foreachRDD()
- import java.sql.{PreparedStatement, Connection, DriverManager}object word8 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local
- [*]").setAppName("SparkStreaming") val ssc = new StreamingContext(conf,Seconds(3)) val lines = ssc.socketTextStream("brace", 9998) val words = lines.flatMap(_.split(" ")) val wordToOne = words.map((_,1)) val WordToCount = wordToOne.reduceByKey(_+_) WordToCount.print() WordToCount.foreachRDD(rdd => { //内部函数 def func(records: Iterator[(String,Int)]) { var conn: Connection = null var stmt: PreparedStatement = null try { val url = "jdbc:mysql://localhost:3306/spark" val user = "root" val password = "Hive@2020" conn = DriverManager.getConnection(url, user, password) records.foreach(p => { val sql = "insert into wordcount(word,count) values (?,?)" stmt = conn.prepareStatement(sql); stmt.setString(1, p._1.trim) stmt.setInt(2,p._2.toInt) stmt.executeUpdate() }) } catch { case e: Exception => e.printStackTrace() } finally { if (stmt != null) { stmt.close() } if (conn != null) { conn.close() } } } val repartitionedRDD = rdd.repartition(3) repartitionedRDD.foreachPartition(func) }) ssc.start() ssc.awaitTermination() }}
复制代码 6 优雅地关闭
来源:https://blog.csdn.net/qq_45650899/article/details/112008647
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |