图15 IntelliJ IDEA平分析器运行结不雅
stateful应用案例
在很多半据流相干的实际应用处景中,对当前数据的统计分析须要借助于先前的数据处理结不雅完成。例如电商每距离10分钟统计某一商品当前累计发卖总额、趁魅站每隔3小时统计当前客流总量,等等。词攀类应用需求可借助于Spark Streaming的有状况转换操作实现。
功能需求
监听收集中某节点上指定端口传输的数据流(slave1节点9999端口的英文文本数据,以逗号距离单词),每5秒分别统计各单词典累计出现次数。
代码实现
本案例功能的实现涉及数据流模仿苹赝分析器两部分。
分析器代码:
- package dong.spark
- import org.apache.spark.{SparkContext, SparkConf}
- import org.apache.spark.streaming.{Milliseconds,Seconds, StreamingContext}
- import org.apache.spark.streaming.StreamingContext._
- object StatefulWordCount {
- def main(args:Array[String]): Unit ={
- /*定义更新状况办法,参数values为当前批处理时光距离内各单词出现的次数,state为以往所有批次各单词累计出现次数。*/
- val updateFunc=(values: Seq[Int],state:Option[Int])=>{
- val currentCount=values.foldLeft(0)(_+_)
- val previousCount=state.getOrElse(0)
- Some(currentCount+previousCount)
- }
- val conf=new SparkConf().
- setAppName("StatefulWordCount").
- setMaster("spark://192.168.149.132:7077")
- val sc=new SparkContext(conf)
- //创建StreamingContext,Spark Steaming运行时光距离为5秒。
- val ssc=new StreamingContext(sc, Seconds(5))
- /*应用updateStateByKey时须要checkpoint持久化接收到的数据。在集群模式下运行时,须要将持久化目次设为HDFS上的目次。*/
- ssc.checkpoint("hdfs://master:9000/user/dong/input/StatefulWordCountlog")
- /*经由过程Socket获取指定节点指定端口的数据创建DStream,个中节点与端口分别由参数args(0)和args(1)给出。*/
- val lines=ssc.socketTextStream(args(0),args(1).toInt)
- val words=lines.flatMap(_.split(","))
- val wordcounts=words.map(x=>(x,1))
- //应用updateStateByKey来更新状况,筒计大年夜运行开端以来单词总的次数。
- val stateDstream=wordcounts.updateStateByKey[Int](updateFunc)
- stateDstream.print()
- ssc.start()
- ssc.awaitTermination()
- }
- }
第2步,打包分析器。master节点启动IntelliJ IDEA创建工程StatefulWordCount编辑分析器,如图16所示,并将分析器直接打包至master节点dong用户的主目次下,如图17所示。
图16 IntelliJ IDEA中StatefulWordCount示意图
图17 master上的StatefulWordCount.jar示意图
第1步,slave1节点启动数据流模仿器。
推荐阅读
实现了功能虚拟化的收集可以或许使通信办事供给商快速供给办事、分析和主动化的收集,加快新办事投向市场的周期,并有效应用数据中间的通用平台。收集功能虚拟化旨在赞助电信行业加快立异>>>详细阅读
本文标题:大数据分析技术与实战之Spark Streaming
地址:http://www.17bianji.com/lsqh/37783.html
1/2 1