Demo entry 6629771

DataFrame

Submitted by anonymous on Jul 07, 2017 at 00:17
Language: Scala. Code size: 874 Bytes.

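import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.streaming.dstream.DStream

/** DataFrame operations inside your streaming program */
val dataStream: DStream[String] = ... // stream creation is elided in the original

// Load the model we trained earlier once, on the driver, instead of reloading it for every batch
val trainedModel = PipelineModel.load("/tmp/spark-logistic-regression-model")

// For each RDD (micro-batch) in the stream
dataStream.foreachRDD { rdd =>

  // Get the singleton instance of SparkSession
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._

  // Convert RDD[String] to a single-column DataFrame. The column is named "text",
  // assuming the saved pipeline's first stage reads its input from a "text" column
  val streamDataFrame = rdd.toDF("text")

  // Make predictions on the streaming data using the Transformer.transform() method.
  // The stream carries no "id" column, so only text, probability and prediction are selected.
  trainedModel.transform(streamDataFrame)
    .select("text", "probability", "prediction")
    .collect() // brings results to the driver; only suitable for small batches
    .foreach { case Row(text: String, prob: Vector, prediction: Double) =>
      println(s"$text --> prob=$prob, prediction=$prediction")
    }
}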
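
The snippet relies on a PipelineModel that was already trained and saved to /tmp/spark-logistic-regression-model. As a hedged sketch of how such a model might have been produced, the batch program below fits a pipeline modeled on the text-classification example in the Spark ML Pipelines guide: a Tokenizer reading a "text" column, a HashingTF, and a LogisticRegression. The object name TrainAndSaveModel, the toy training rows, the local[*] master and the hyperparameters are all hypothetical; the original entry does not say how its model was trained.

```scala
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.sql.SparkSession

// Hypothetical training job; the pipeline layout is an assumption, not taken from the original entry
object TrainAndSaveModel {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("TrainLogisticRegressionPipeline")
      .master("local[*]") // assumption: local mode for a quick test run
      .getOrCreate()
    import spark.implicits._

    // Hypothetical toy training data with columns (id, text, label)
    val training = Seq(
      (0L, "a b c d e spark", 1.0),
      (1L, "b d", 0.0),
      (2L, "spark f g h", 1.0),
      (3L, "hadoop mapreduce", 0.0)
    ).toDF("id", "text", "label")

    // Stage 1: split each "text" value into lowercase words
    val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
    // Stage 2: hash the words into a fixed-size term-frequency feature vector
    val hashingTF = new HashingTF()
      .setNumFeatures(1000)
      .setInputCol(tokenizer.getOutputCol)
      .setOutputCol("features")
    // Stage 3: logistic regression over "features" and "label"
    val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.001)

    val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))
    val model = pipeline.fit(training)

    // Persist the fitted pipeline to the path the streaming snippet loads from
    model.write.overwrite().save("/tmp/spark-logistic-regression-model")

    // Sanity check: reload the saved model and score one unlabeled line of text
    val reloaded = PipelineModel.load("/tmp/spark-logistic-regression-model")
    reloaded.transform(Seq("spark i j k").toDF("text"))
      .select("text", "probability", "prediction")
      .show(truncate = false)

    spark.stop()
  }
}
```

With this particular pipeline, any DataFrame passed to transform() needs a string column named "text"; the "label" column is only required when fitting.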
