文本文件数据处理案例
功能需求
及时`听并获取本地home/dong/Streamingtext目次中新生成的文件(文件均为英文文本文件,单词之间应用空格进行距离),并对文件中各单词出现的次数进行统计。
代码实现
- package dong.spark
- import org.apache.spark.SparkConf
- import org.apache.spark.streaming.{Seconds,StreamingContext}
- import org.apache.spark.streaming.StreamingContext._
- object StreamingFileWordCount {
- def main(args: Array[String]): Unit ={
- //以local模式运行,并设定master节点工作线程数为2。
- val sparkConf = new SparkConf().
- setAppName("StreamingFileWordCount").
- setMaster("local[2]")
- /*创建StreamingContext实例,设定批处理时光距离为20秒。*/
- val ssc = new StreamingContext(sparkConf,Seconds(20))
- /*指定命据源来自本地home/dong/Streamingtext。*/
- val lines = ssc.textFileStream("/home/dong/Streamingtext")
- /*在每个批处理时光距离内,对指定文件夹中变更的数据进行单词统计并打印。*/
- val words= lines.flatMap(_.split(" "))
- val wordcounts=words.map(x=>(x,1)).reduceByKey(_+_)
- wordcounts.print()
- ssc.start()
- ssc.awaitTermination()
- }
- }
运行演示
第1步,启动Hadoop与Spark。
第2步,创建Streaming监控目次。
- $ mkdir /home/dong/Streamingtext
在dong用户主目次下创建Streamingtext为Spark Streaming监控的目次,创建后如图6所示。
第3步,在IntelliJ IDEA中编辑运行Streaming法度榜样。在IntelliJ IDEA中创建工程StreamingFileWordCount,编辑对象StreamingFileWordCount,如图7所示。
对于批处理时光和窗口大年夜小的设定,并没有同一的标准。平日是先年腋荷琐比较大年夜的批处理时光(10秒阁下)开端,然后赓续地应用更小的值进行比较测试。如不雅Spark Streaming用户界面中显示的处理时光保持不变,则可以进一步设定更小的值;如不雅处理时光开端增长,则可能已经达到了应用的极限,再减小该值袈潋可能会影响体系的机能。
图7 IntelliJ IDEA中StreamingFileWordCount示意图
因为该示例没有输入参数,是以不须要设备参数,可直接单击右键->单击"Run‘StreamingFileWordCount’ "。
第4步,在监听目次下创建文本文件。在master节点上的/home/dong/Streamingtext平分别创建file1.txt与file2.txt。
file1.txt内容如下:
- aa
- bb
file2.txt内容如下:
- ee
- dd
- cc
创建后,/home/dong/Streamingtext中内容如图8所示。
图8 Streamingtext文件夹内容示意图
终端窗口输出了每个批处理时光距离(20秒)内,/home/dong/Streamingtext中新生成文件所包含的各单词个数,如图9所示。
图9 StreamingFileWordCount运行结不雅示意图
收集数据处理案例
功能需求
监听本地节点指定端口传输的数据流(本案例为master节点9999端口的英文文本数据,以逗号距离单词),每5秒统计一次该时光距离内收集到的各单词典个数。
代码实现
本案例涉及数据流模仿苹赝分析器两部分。为了更接近真实的收集情况,起首定义数据流模仿器,该模仿器以Socket方法监听收集中指定节点上的指定端标语(master节点9999端口),当外部法度榜样经由过程该端口连接并请求数据时,数据流模仿器将准时地大年夜指定文本文件中随机拔取数据发送至指定端口(每距离1秒钟数据流模仿器大年夜master节点上的/home/dong/Streamingtext/file1.txt中随机朝长进步一行文本发送给master节点的9999端口),经由过程这种方法模仿收集情况下源源赓续的数据流。针对获取到的及时数据,再定义分析器(Spark Streaming应用法度榜样),用以统计时光距离(5秒)内收集到的单词个数。
推荐阅读
实现了功能虚拟化的收集可以或许使通信办事供给商快速供给办事、分析和主动化的收集,加快新办事投向市场的周期,并有效应用数据中间的通用平台。收集功能虚拟化旨在赞助电信行业加快立异>>>详细阅读
本文标题:大数据分析技术与实战之Spark Streaming
地址:http://www.17bianji.com/lsqh/37783.html
1/2 1