数据流模仿器代码实现如下:
- package dong.spark
- import java.io.{PrintWriter}
- import java.net.ServerSocket
- import scala.io.Source
- objectSocketSimulation {
- //定义随机获取整数的办法。
- def index(length: Int)={
- import java.util.Random
- val rdm = new Random
- rdm.nextInt(length)
- }
- def main(args:Array[String]): Unit ={
- if(args.length!=3){
- /*调用数据流模仿器须要三个参数:文件路径、端标语和批处理时光距离时光(单位:毫秒)。*/
- System.err.println("Usage:<filename><port><millisecond>")
- System.exit(1)
- }
- //获取指定文件总的行数。
- val filename = args(0)
- val lines = Source.fromFile(filename).getLines().toList
- val filerow=lines.length
- //指定监听参数args(1)指定的端口,当外部法度榜样请求时建立连接。
- val listener =new ServerSocket(args(1).toInt)
- while(true){
- val socket = listener.accept()
- new Thread(){
- override def run={
- println("Got client connected from:"+socket.getInetAddress)
- val out = new PrintWriter(socket.getOutputStream(),true)
- while(true){
- Thread.sleep(args(2).toLong)
- //当该端口接收请求时,随机获取某行数据发送给对方。
- val content= lines(index(filerow))
- println(content)
- out.write(content+'\n')
- out.flush()
- }
- socket.close()
- }
- }.start()
- }
- }
- }
- package dong.spark
- import org.apache.spark.streaming.{Milliseconds,Seconds, StreamingContext}
- import org.apache.spark.streaming.StreamingContext._
- import org.apache.spark.storage.StorageLevel
- object NetworkWordCount {
- def main (args:Array[String]) ={
- //以local模式运行,并设定master节点工作线程数为2。
- val conf=new SparkConf().setAppName("NetworkWordCount").
- setMaster("local[2]")
- val sc=new SparkContext(conf)
- val ssc=new StreamingContext(sc, Seconds(5))
- /*经由过程socketTextStream获取指定节点指定端口的数据创建DStream,并保存在内存和硬盘中,个中节点与端口分别对应参数args(0)和args(1)。*/
- val lines=ssc.socketTextStream(args(0),
- args(1).toInt,
- StorageLevel.MEMORY_AND_DISK_SER)
推荐阅读
实现了功能虚拟化的收集可以或许使通信办事供给商快速供给办事、分析和主动化的收集,加快新办事投向市场的周期,并有效应用数据中间的通用平台。收集功能虚拟化旨在赞助电信行业加快立异>>>详细阅读
本文标题:大数据分析技术与实战之Spark Streaming
地址:http://www.17bianji.com/lsqh/37783.html
1/2 1