请选择 进入手机版 | 继续访问电脑版

SparkStreaming三种流计算方式

[复制链接]
甜蜜的负担 发表于 2021-1-2 17:41:11 | 显示全部楼层 |阅读模式 打印 上一主题 下一主题
几个知识:
文件流 FileStream继承与Stream类,一个FileStream类的实例实际上代表一个文件流,使用FileStream类可以对文件系统上的文件举行读取、写入、打开和关闭操纵。
打开一个终端窗口,创建需要的文件夹以及文件
Spark Streaming是构建在Spark上的实时盘算框架,且是对Spark Core API的一个扩展,它可以大概实现对流数据举行实时处理处罚,并具有很好的可扩展性、高吞吐量和容错性。

DStream会被按照时间隔断分别成一批一批的RDD(准实时性/近实时性(不是100%的实时)
创建steamingContext的方式

DStream是Spark Streaming提供的根本抽象。它表现一连的数据流,可以是从源吸收的输入数据流,也可以是通过转换输入流生成的已处理处罚数据流。
一.文件流
A.
1.进入spark-shell
hadoop@ubuntu:/usr/local/spark$ ./bin/spark-shell
Spark-Shell:Spark的shell作为一个强大的交互式数据分析工具,提供了一个简朴的方式学习API。它可以使用Scala(在Java虚拟机上运行现有的Java库的一个很好方式)或Python。

scala> import org.apache.spark.streaming._
scala> val ssc = new StreamingContext(sc, Seconds(20))//分段周期是20秒
scala> val lines = ssc.textFileStream(“file:///usr/local/spark/mycode/streaming/logfile”)//界说输入源,文件流范例的inputstream
scala> val words = lines.flatMap(.split(" "))
scala> val wordCounts = words.map(x => (x, 1)).reduceByKey(
+ _)
scala> wordCounts.print()
scala> ssc.start()//调用start开始担当数据和处理处罚流程
scala> ssc.awaitTermination()
2.导入包

3.为了初始化SparkStreaming步调,得以进入SparkSreaming的所有流操纵,需要创建一个Spark Streaming所有流操纵的主要入口。
创建SparkStreaming的入口StreamingContext。
val ssc = new StreamingContext(sc, Seconds(20))//分段周期是20秒

方法是通过SparkContext和批处理处罚时间隔断,SparkContext是指
类似于javaSpringContext,是一个容器,内里装各种各样的资源。

4.界说输入源:文件流范例

5.对文件流范例的输入源举行转换操纵


map: 对RDD每个元素转换
flatMap: 对RDD每个元素转换, 然后再扁平化即将所有对象归并为一个zhuan对象)

6.举行词频统计


7.开始吸收数据和处理处罚数据

8.监听效果
没有新文件在文件目次下时

在/usr/local/spark/mycode/streaming/logfile目次下面创建log2.txt文件

可以看到监听窗口监听到了文件夹中的内容

B.接纳独立应用步调方式创建文件流
在监听窗口:
在目次下创建scala文件,写入相应内容

创建sbt文件

在sbt文件中写入下列内容

SBT是一个现代构建工具,SBT安装。从https://www.scala-sbt.org/download.html官网上寻找所需要的安装包

将下载的sbt文件中的sbt-launch.jar拷贝到/usr/local/下的sbt目次中的一级目次下


直接传输会拒绝访问

先放进可以放的地方,再用下令移动。

执行sbt打包编译

启动步调

到文件创建的Shell窗口

按键盘Ctrl+C大概Ctrl+D停止监听步调,就可以看到监听窗口的屏幕上会打印出单词统计信息

按照‘二’的调文件部分调解文件后
开启步调

/usr/local/spark/bin/spark-submit --class “WordCountStreaming” /usr/local/spark/mycode/streaming/target/scala-2.11/simple-project_2.11-1.0.jar
在另一个窗口创建文件,输入内容

可以看到监听效果

二.套接字流

Socket是应用层与TCP/IP协议族通信的中间软件抽象层,它是一组接口。在设计模式中,Socket实在就是一个门面模式,它把复杂的TCP/IP协议族隐藏在Socket接口反面,对用户来说,一组简朴的接口就是全部,让Socket去组织数据,以符合指定的协议。以下是socket工作原理:
1.服务器根据地点范例(ipv4,ipv6)、socket范例、协议创建socket
2.服务器为socket绑定ip地点和端标语
3.服务器socket监听端标语请求,随时准备吸收客户端发来的毗连,这时候服务器的socket并没有被打开
4.客户端创建socket
5.客户端打开socket,根据服务器ip地点和端标语试图毗连服务器socket
6.服务器socket吸收到客户端socket请求,被动打开,开始吸收客户端请求,直到客户端返回毗连信息。
7.客户端毗连乐成,向服务器发送毗连状态信息,服务器accept方法返回,毗连乐成
8.客户端向socket写入信息
9.服务器读取信息
10.客户端关闭 服务器端关闭、
在对应目次下分别创建两个文件

一个写入
package org.apache.spark.examples.streaming
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel//存储级别
object NetworkWordCount {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("Usage: NetworkWordCount “)
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()//设置日志显示级别
val sparkConf = new SparkConf().setAppName(“NetworkWordCount”).setMaster(“local[2]”)
val ssc = new StreamingContext(sparkConf, Seconds(1))
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)//生存数据方式,内存和磁盘同时
val words = lines.flatMap(.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(
+ _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
另一个写入
package org.apache.spark.examples.streaming
import org.apache.spark.internal.Logging
import org.apache.log4j.{Level, Logger}
/** Utility functions for Spark Streaming examples. /
object StreamingExamples extends Logging {
/
* Set reasonable logging levels for streaming if the user has not configured log4j. */
def setStreamingLogLevels() {
val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
if (!log4jInitialized) {
// We first log something to initialize Spark’s default logging, then we override the
// logging level.
logInfo(“Setting log level to [WARN] for streaming example.” +
" To override add a custom log4j.properties to the classpath.”)
Logger.getRootLogger.setLevel(Level.WARN)
}
}
}
打包

执行下列下令时堕落


启动nc窗口
调解文件

打包文件

执行步调

/usr/local/spark/bin/spark-submit --class “org.apache.spark.examples.streaming.NetworkWordCount” /usr/local/spark/mycode/streaming/target/scala-2.11/simple-project_2.11-1.0.jar localhost 9999
将日志处理处罚文件也复制过来

打包

输入/usr/local/spark/bin/spark-submit --class “org.apache.spark.examples.streaming.NetworkWordCount” /usr/local/spark/mycode/streaming/target/scala-2.11/simple-project_2.11-1.0.jar localhost 9998执行步调

nc窗口,启动nc步调

三.RDD队列流
参考http://dblab.xmu.edu.cn/blog/1085-2/
1.启动spark

2.使用streamingContext.queueStream(queueOfRDD)创建基于RDD队列的DStream
新建一个TestRDDQueueStream.scala代码文件,功能是:每隔1秒创建一个RDD,Streaming每隔2秒就对数据举行处理处罚

sbt打包


按照下面步调执行步调



来源:https://blog.csdn.net/tianchengyiyi/article/details/111361811
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则


专注素材教程免费分享
全国免费热线电话

18768367769

周一至周日9:00-23:00

反馈建议

27428564@qq.com 在线QQ咨询

扫描二维码关注我们

Powered by Discuz! X3.4© 2001-2013 Comsenz Inc.( 蜀ICP备2021001884号-1 )