博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
spark-Streaming
阅读量:4512 次
发布时间:2019-06-08

本文共 9500 字,大约阅读时间需要 31 分钟。

目录

监听文件-定时文件监听
import org.apache.spark.streaming.{Seconds, StreamingContext}import org.apache.spark.SparkConf// local[3] 至少开启2个; 一个用于监听文件,一个用于处理数据val sparkConf = new SparkConf().setAppName("fileStream").setMaster("local[3]")// 间隔20秒查看一次val ssc = new StreamingContext(sparkConf, Seconds(10))// 设置监听的文件夹val lines = ssc.textFileStream("D:\\sparkStreamLog")// 处理并打印监听到的内容lines.print()// 开启 spark streamssc.start()//阻塞等待计算ssc.awaitTermination()
-------------------------------------------Time: 1565595490000 ms-------------------------------------------此处打印监听的内容...-------------------------------------------Time: 1565595500000 ms-------------------------------------------
监听文件-结构化数据流

数据

{"name":"json","age":23,"hobby":"running"}{"name":"charles","age":32,"hobby":"basketball"}{"name":"tom","age":28,"hobby":"football"}{"name":"lili","age":24,"hobby":"running"}{"name":"bob","age":20,"hobby":"swimming"}
import org.apache.spark.SparkConfimport org.apache.spark.rdd.RDDimport org.apache.spark.sql.SparkSessionimport org.apache.spark.sql.types.StructTypeimport org.apache.spark.streaming.{Seconds, StreamingContext}val spark = SparkSession.builder().appName("dStream_1").master("local[*]").getOrCreate()// 导入rdd的隐式转换import spark.implicits._// 因为输入的数据是结构化数据,因此在创建DataFrame时,需要先定义好schema,这样spark程序才知道如何解析json数据;val userSchema = new StructType().add("name","string") // 定义name字段.add("age","integer").add("hobby","string")// 创建DataFrameval userDF = spark.readStream.schema(userSchema)  // 设置字段解析.json("D:/JsonFile")  // 读取数据文件val userlittler25DF = userDF.filter($"age"<25)  // 筛选age小于25的数据val hobbyDF = userlittler25DF.groupBy("hobby").count()  // 对hobby字段进行分组求和val query = hobbyDF.writeStream.outputMode("complete")  // 设置输出模式为complete.format("console") // 输出到控制台.start() // 开始执行query.awaitTermination()  // 等待执行结果
-------------------------------------------Batch: 1-------------------------------------------+--------+-----+|   hobby|count|+--------+-----+| running|    2||swimming|    1|+--------+-----+
监听端口套接字

随机读取文件数据发送

table.txt 数据

scala javajava C++C C++ PHPC++ PHPpython C++PHP java

数据发送服务程序

import java.io.PrintWriterimport java.net.ServerSocketimport scala.io.Sourcedef main(args: Array[String]): Unit = {    val file = "D:\\sparkStreamLog\\change\\table.txt"    //读取文件的每一行数据到list里面    val lines = Source.fromFile(file).getLines().toList    //获得数据行数    val rowCount = lines.length    //设置客户端的端口    val listen = new ServerSocket(6666)    while (true) {        // 等待端口被连接        val socket = listen.accept()        // 创建一个端口连接后的处理线程        val thread = new Thread() {            //重写run方法,线程启动后自动调用run方法            override def run = {                // 打印客户端的 IP地址                println("客户端地址为:" + socket.getInetAddress)                // 获取的客户端的输出流(可以向服务器发送(写)数据)                val send = new PrintWriter(socket.getOutputStream, true)                while (true) {                    //每隔3秒发送一次数据                    Thread.sleep(3000)                    // 随机获取 list里的一条数据                    val content = lines(index(rowCount))                    println("******")                    println(content)                    // 向服务器发送一条数据                    send.write(content + "\n")                    // 刷新写入                    send.flush()                }                // 当连接断开时,socket也断开连接                socket.close()            }        }        //启动处理线程        thread.start()    }}// 生成一个 0到length的随机数def index(length: Int): Int = {    val rd = new java.util.Random()    rd.nextInt(length) //随机获取0-length 之间的一个数}
import org.apache.spark.SparkConfimport org.apache.spark.rdd.RDDimport org.apache.spark.sql.SparkSessionimport org.apache.spark.streaming.{Seconds, StreamingContext}val Conf = new SparkConf().setAppName("套接字流").setMaster("local[2]")val ss = new StreamingContext(Conf, Seconds(6))// 监听客户端正在使用的 6666 端口,接收发送的信息val lines = ss.socketTextStream("localhost", 6666)// 处理和打印 发送过来的数据 进行WordCount处理lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey((x, y) => x + y).print()//启动 spark streamss.start()// 等待接收和处理数据ss.awaitTermination()
-------------------------------------------Time: 1565598120000 ms-------------------------------------------(PHP,1)(java,2)(C++,1)-------------------------------------------Time: 1565598126000 ms-------------------------------------------(scala,2)(java,2)

监听端口数据-统计包括历史数据

import java.sql.{Connection, DriverManager, PreparedStatement}import org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext}val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCountStateful")val sc = new StreamingContext(conf, Seconds(5))// 设置检查点,检查点具有容错机制; 用于存放之前处理好的数据的文件夹sc.checkpoint("D:\\sparkStreamLog\\change")// 从6666端口读取发送过来的数据val lines = sc.socketTextStream("localhost", 6666)//定义状态更新函数,历史数据和新数据的处理方式val updateFunc = (values: Seq[Int], state: Option[Int]) => {    //      values: 新数据key的values   state: 历史数据key的values    // 新数据的value求和    val currentCount = values.foldLeft(0)(_ + _) //初始值为 0 防止遍历到其他key的value为空时相加出现异常    // 获取历史数据key对应的值,没有则返回 0    val previousCount = state.getOrElse(0)    println("#################", values.toBuffer, state.toBuffer, "\t result: " + (currentCount + previousCount))    //返回 新数据value+历史数据value    Some(currentCount + previousCount)}// 处理数据val wordDstream = lines.flatMap(_.split(" ")).map(x => (x, 1)).updateStateByKey[Int](updateFunc) // 更新数据, 将之前的结果和现在处理的结果合并统计并输出//    保存数据到MySQLwordDstream.foreachRDD(rdd => {    // 创建用于将每条数据插入MySQL数据库的方法, 接受一个装数据的迭代器    def funChange(words: Iterator[(String, Int)]): Unit = {        // 创建MySQL连接        var conn: Connection = null        // 用于执行SQL语句        var stat: PreparedStatement = null        try {            val url = "jdbc:mysql://localhost:3306/testdb?serverTimezone=UTC"            val user = "root"            val password = "123456"            // 连接MySQL            conn = DriverManager.getConnection(url, user, password)            // 遍历每一条数据            words.foreach(word => {                val sql = "insert into fromsparkdata(word,counts) values (?,?)"                // 包装SQL语句(反SQL语句注入)                stat = conn.prepareStatement(sql)                // 向第一个 ? 放入数据                stat.setString(1, word._1.trim)                // 第二个 ? 放入数据                stat.setInt(2, word._2.toInt)                // 提交执行 更新数据的操作                stat.executeUpdate()            })        } catch {            case e: Exception => e.printStackTrace()        } finally {            // 关闭操作            if (stat != null) {                stat.close()            }            // 关闭连接            if (conn != null) {                conn.close()            }        }    }    // 重新划分为3个分区    val reparRdd = rdd.repartition(3)    // 遍历每一个分区的数据迭代    reparRdd.foreachPartition(funChange)})//    保存数据到文件夹//    wordDstream.saveAsTextFiles("D:\\sparkStreamLog\\change\\data\\")// 开启 spark stream 开始监听sc.start()sc.awaitTermination()
新数据统计的个数  历史数据的个数       返回的个数(#################,ArrayBuffer(1),ArrayBuffer(),     result: 1)(#################,ArrayBuffer(1),ArrayBuffer(),     result: 1)(#################,ArrayBuffer(1, 1),ArrayBuffer(),  result: 2)(#################,ArrayBuffer(1),ArrayBuffer(),     result: 1)(#################,ArrayBuffer(1),ArrayBuffer(1),    result: 2)(#################,ArrayBuffer(),ArrayBuffer(1),     result: 1)
监听端口结构化数据
import org.apache.spark.SparkConfimport org.apache.spark.rdd.RDDimport org.apache.spark.sql.SparkSessionimport org.apache.spark.streaming.{Seconds, StreamingContext}val spark = SparkSession.builder().appName("dStream_1").master("local[*]").getOrCreate()import spark.implicits._val lines = spark.readStream.format("socket") // 读取的数据流类型.option("host", "localhost") // IP地址.option("port", 6666) // 端口.load() // 监听数据 返回DataFrame类型// lines.as[String]将DataFrame转换成DataSet,其实DataFrame只是DataSet的特例 type DataFrame = Dataset[Row]val words = lines.as[String].flatMap(_.split(" "))val wordCounts = words.groupBy("value").count()val query = wordCounts.writeStream.outputMode("complete") // outputMode设置了’complete’模式,即每次都输出全部结果数据.format("console") // format定义输出媒介,这里为控制台.start() // 开始 查询query.awaitTermination() // 等待查询结果
-------------------------------------------Batch: 0-------------------------------------------+-----+-----+|value|count|+-----+-----+|  C++|    1||    C|    1||  PHP|    1|+-----+-----+-------------------------------------------Batch: 1-------------------------------------------+------+-----+| value|count|+------+-----+|   C++|    5||     C|    2|| scala|    1||   PHP|    5||  java|    2||python|    1|+------+-----+
监听RDD队列数据
import org.apache.spark.SparkConfimport org.apache.spark.rdd.RDDimport org.apache.spark.sql.SparkSessionimport org.apache.spark.streaming.{Seconds, StreamingContext}val sparkConf = new SparkConf().setAppName("RDDQueue").setMaster("local[2]")// 10 秒检查一次队列是否有新数据(以添加一次的数据就处理一次,时间只是处理的间隔时间)val ssc = new StreamingContext(sparkConf, Seconds(2))// 创建可变的rdd队列val rddQueue = new scala.collection.mutable.SynchronizedQueue[RDD[Int]]()// 监听rdd队列val queueStream = ssc.queueStream(rddQueue)//处理数据并打印queueStream.map(r => (r % 10, 1)).reduceByKey(_ + _).print()//启动 spark streaming 开始监听ssc.start()//每隔3秒添加数据到队列里面for (i <- 1 to 5) {rddQueue += ssc.sparkContext.makeRDD(1 to 100, 2)Thread.sleep(3000)}// 等待数据处理完再关闭Thread.sleep(30000)ssc.stop()
-------------------------------------------Time: 1565598778000 ms-------------------------------------------(4,10)(0,10)(6,10)(8,10)(2,10)(1,10)(3,10)(7,10)(9,10)(5,10)

转载于:https://www.cnblogs.com/studyNotesSL/p/11341313.html

你可能感兴趣的文章
ctypes调用dll的参数问题
查看>>
微信支付接口的调用(转)
查看>>
XSS攻击
查看>>
百度下的SEO规则
查看>>
【资料整理】一些英语面试题整理
查看>>
Android真机调试的时候logcat中无法输出调试信息的解决办法
查看>>
这个七夕,送你一份程序员教科书级别的告白指南
查看>>
如何优雅的写一篇安利文-以Sugar ORM为例
查看>>
Could not find a storyboard named 'Main' in bundle NSBundle
查看>>
HTML5 Drag & Drop
查看>>
关于git
查看>>
来自java文档 File类
查看>>
.net下使用最小堆实现TopN算法
查看>>
最强日期正则表达式
查看>>
get与post区别
查看>>
mysql中级操作
查看>>
python类之魔法方法
查看>>
International Conference for Smart Health 2015 Call for Papers
查看>>
面向对象编程的介绍:三大特征:封装性, 继承, 多态.
查看>>
netty是什么?
查看>>