Demo entry 6364042

streaming architecture


Submitted by anonymous on May 15, 2017 at 10:46
Language: Scala. Code size: 3.5 kB.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.rdd.RDDFunctions._
import scala.sys.process._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import java.util.HashMap
import org.apache.commons.io.FilenameUtils
import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import java.util.{Date, Properties}
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import StreamingContext._

object Traccia2014{
  def main(args: Array[String]){
    if (args.length < 4) {
      System.err.println(s"""
        |Usage: <brokers> <topicRisultato> <slice> <filtro>
        |  <brokers> is a list of one or more Kafka brokers
        |  <topicRisultato> is the Kafka topic the filtered ratios are published to
        |  <slice> is the width of the time bucket applied to the first CSV column
        |  <filtro> is the minimum src/dst ratio that gets reported
        |
        """.stripMargin)
      System.exit(1)
    }
    val Array(brokers, risultato, slice, filtro) = args
    val sparkConf = new SparkConf().setAppName("Traccia2014")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(8))
    // HDFS input directory; possibly switch to S3 later. Skip files still being copied.
    val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](
      directory = "user/root/split",
      filter = (path: org.apache.hadoop.fs.Path) => !path.getName.endsWith(".csv._COPYING_"),
      newFilesOnly = true).map(_._2.toString)
    println("listening for new files")
    // ********* Producer configuration *********
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    lines.foreachRDD { rdd =>
      if (!rdd.isEmpty) {
        val start = System.currentTimeMillis() / 1000
        // Earliest timestamp (first CSV column) in this batch, compared numerically
        val min = rdd.map(x => x.split(",")(0)).reduce((a, b) => if (a.toInt < b.toInt) a else b)
        if (!min.isEmpty) {
          // Occurrences per (time bucket, destination IP); bucket start = (t - min) / slice * slice
          val ipDst = rdd.map(x => ((x.split(",")(0).toInt - min.toInt) / slice.toInt * slice.toInt + " " + x.split(",")(2), 1)).reduceByKey(_ + _)
          if (!ipDst.isEmpty) {
            // Occurrences per (time bucket, source IP)
            val ipSrc = rdd.map(x => ((x.split(",")(0).toInt - min.toInt) / slice.toInt * slice.toInt + " " + x.split(",")(1), 1)).reduceByKey(_ + _)
            if (!ipSrc.isEmpty) {
              // Ratio of packets sent to packets received per (bucket, IP); a missing dst count defaults to 1
              val Rapporto = ipSrc.leftOuterJoin(ipDst).mapValues { case (x, y) => x / y.getOrElse(1) }
              val RapportoFiltrato = Rapporto.filter { case (key, value) => value > filtro.toInt }
              println("###(ConsumerScala) Ratio computation: ###")
              // **** publish the result ****
              val end = System.currentTimeMillis() / 1000
              val differenza = end - start
              val str = RapportoFiltrato.collect().mkString("\n")
              println(s"###(ConsumerScala) Producing result: ${str}\nExecution time 1: ${differenza} sec.\n")
              val message = new ProducerRecord[String, String](risultato, null, str)
              producer.send(message)
              // Thread.sleep(1000)
            } else {
              println("src empty")
            }
          } else {
            println("dst empty")
          }
        } else {
          println("min empty")
        }
      } else {
        println("rdd empty")
      }
    } // foreachRDD
    ssc.start()
    ssc.awaitTermination()
  }
}
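
The per-batch logic above buckets each record by its first column (a timestamp), counts how often each IP appears as source and as destination per bucket, and keeps the src/dst ratios above <filtro>. Below is a minimal, self-contained sketch of that same computation in Spark local mode; the three-column CSV format "timestamp,srcIP,dstIP", the object name RapportoDemo, and the sample values are assumptions for illustration, not taken from the snippet.

import org.apache.spark.{SparkConf, SparkContext}

object RapportoDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("RapportoDemo").setMaster("local[*]"))
    val slice = 10 // bucket width, in the same unit as the first CSV column
    val lines = sc.parallelize(Seq(
      "100,10.0.0.1,10.0.0.2",
      "103,10.0.0.1,10.0.0.3",
      "111,10.0.0.1,10.0.0.2"))
    val min = lines.map(_.split(",")(0).toInt).min() // earliest timestamp in the batch
    // key = "<bucket start> <IP>", value = 1, then summed per key as in the job above
    val ipSrc = lines.map { l => val f = l.split(","); ((f(0).toInt - min) / slice * slice + " " + f(1), 1) }.reduceByKey(_ + _)
    val ipDst = lines.map { l => val f = l.split(","); ((f(0).toInt - min) / slice * slice + " " + f(2), 1) }.reduceByKey(_ + _)
    // packets sent vs. received per (bucket, IP); a missing dst count defaults to 1
    val rapporto = ipSrc.leftOuterJoin(ipDst).mapValues { case (s, d) => s / d.getOrElse(1) }
    rapporto.collect().foreach(println)
    sc.stop()
  }
}

With these sample values the output is (0 10.0.0.1,2) and (10 10.0.0.1,1): 10.0.0.1 never appears as a destination, so its dst count falls back to 1 in both buckets.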
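The imports of kafka.serializer.StringDecoder and org.apache.spark.streaming.kafka._ are unused in this snippet, which suggests a companion job consumes the <topicRisultato> topic that the producer writes to. A minimal sketch of what such a consumer could look like with the spark-streaming-kafka 0.8 direct API follows; the broker address, topic name, and object name are assumptions, not taken from the snippet.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object RisultatoConsumer {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("RisultatoConsumer"), Seconds(8))
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092") // hypothetical broker list
    val topics = Set("risultato")                                     // hypothetical result topic name
    // Direct (receiver-less) stream: one Kafka partition per RDD partition
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    // The producer above sends the filtered ratios as the record value
    stream.map(_._2).print()
    ssc.start()
    ssc.awaitTermination()
  }
}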
