Demo entry 6746396

Coursera - Getting and Cleaning Data

   

Submitted by anonymous on May 30, 2018 at 04:18
Language: Scala. Code size: 8.4 kB.

import org.apache.spark.{SparkContext}
import org.apache.spark.SparkConf
import org.apache.spark.sql._

/**
  * Spark batch job that scans one pipe-delimited vehicle telemetry CSV and reports
  * vehicles that repeatedly match a "suspicious" state during fixed night-time windows.
  *
  * Arguments: `inputFilePath saveFilePath` (raw data path, result directory).
  *
  * Example:
  * {{{
  * /opt/spark/bin/spark-submit --master local[3] --executor-memory 10G \
  *   --total-executor-cores 3 --class task \
  *   /home/stone/IdeaProjects/dataprocess/target/scala-2.11/dataprocess_2.11-0.1.jar \
  *   "file:///home/stone/rt_basic_20180418.csv" "hdfs://localhost:9000/result"
  * }}}
  * The result directory has to exist beforehand, e.g.
  * `/opt/hadoop/bin/hdfs dfs -mkdir /result`.
  *
  * Two CSV outputs are written under `saveFilePath`:
  *  - `<day>_vintime.csv`: (vin, time) pairs of the flagged vehicles;
  *  - `<day>_pivot.csv`: one row per flagged vehicle with a 0/1 column per window and a total.
  */
object task {

  /** Start of each sampling window in seconds since midnight: every 30 min from 01:00 to 04:30. */
  private val WindowStartSeconds: Seq[Int] = 3600 to 16200 by 1800

  /** Length of each sampling window in seconds; both bounds are inclusive. */
  private val WindowLengthSeconds: Int = 30

  /**
    * SQL expression converting the time part of `trace_time` to seconds since midnight.
    * The substring offsets (12, 15, 18) assume a `yyyy-MM-dd HH:mm:ss` layout.
    */
  private val SecondsOfDayExpr: String =
    "(int(substring(trace_time,12,2))*3600" +
      "+int(substring(trace_time,15,2))*60" +
      "+int(substring(trace_time,18,2)))"

  /** SQL predicate that is true when `trace_time` falls inside any sampling window. */
  private val InWindowPredicate: String =
    WindowStartSeconds
      .map(start => s"($SecondsOfDayExpr between $start and ${start + WindowLengthSeconds})")
      .mkString("(", " or ", ")")

  /**
    * Builds the query selecting distinct (vin, HH:mm) samples from `table1` that fall in a
    * sampling window and match the given protocol-specific status codes, with
    * `hvbatsoc < 99` and `total_current = 0`.
    *
    * @param protVer      expected `prot_ver` value
    * @param vehicleStat  expected `vehicle_stat` value
    * @param chargingStat expected `charging_stat` value
    */
  private def suspiciousSamplesSql(protVer: String, vehicleStat: String, chargingStat: String): String =
    s"""select distinct vin, substring(trace_time,12,5) as time, 1 as num
       |from table1
       |where $InWindowPredicate
       |  and prot_ver = '$protVer'
       |  and vehicle_stat = '$vehicleStat'
       |  and charging_stat = '$chargingStat'
       |  and hvbatsoc < 99
       |  and total_current = 0""".stripMargin

  /**
    * Pivot query over `table4`: one column per window (`_HH_mm`, 1 = suspicious, 0 = fine)
    * plus the total count, restricted to the vehicles listed in `table5`.
    */
  private val PivotSql: String = {
    val windowColumns = WindowStartSeconds.map { start =>
      val hh = f"${start / 3600}%02d"
      val mm = f"${start % 3600 / 60}%02d"
      s"sum(case time when '$hh:$mm' then num else 0 end) as _${hh}_$mm"
    }
    (Seq("vin") ++ windowColumns ++ Seq("sum(num) as sum")).mkString("select ", ", ", " ") +
      "from table4 " +
      "where vin in (select vin from table5) " +
      "group by vin"
  }

  /**
    * Entry point. Validates the arguments, runs the job and always stops the Spark context.
    *
    * @param args `Array(inputFilePath, saveFilePath)`
    */
  def main(args: Array[String]): Unit = {
    // Validate before starting Spark so a usage error does not spin up (and leak) a context.
    if (args.length != 2) {
      println("input: inputFilePath saveFilePath")
      System.exit(1)
    }
    val inputFilePath = args(0)
    val saveFilePath = args(1)

    val conf = new SparkConf().setAppName("task1")
    //.setMaster("local[3]")
    val sc = new SparkContext(conf)
    try run(new SQLContext(sc), inputFilePath, saveFilePath)
    finally sc.stop()
  }

  /** Runs the whole pipeline: load, clean, filter, aggregate and write both result files. */
  private def run(sqlContext: SQLContext, inputFilePath: String, saveFilePath: String): Unit = {
    // Load the header-less, pipe-delimited file and keep only the columns that are used.
    sqlContext.read
      .option("header", "false")
      .option("charset", "utf-8")
      .option("delimiter", "|")
      .csv(inputFilePath)
      .select("_c0", "_c1", "_c4", "_c10", "_c21", "_c29", "_c25")
      .withColumnRenamed("_c0", "vin")
      .withColumnRenamed("_c1", "trace_time")
      .withColumnRenamed("_c4", "charging_stat")
      .withColumnRenamed("_c10", "hvbatsoc")
      .withColumnRenamed("_c21", "prot_ver")
      .withColumnRenamed("_c29", "vehicle_stat")
      .withColumnRenamed("_c25", "total_current")
      .createOrReplaceTempView("table0")

    // Strip surrounding whitespace and convert the numeric columns.
    // (The original statement lacked a comma and a closing parenthesis and could not be parsed.)
    sqlContext.sql(
      """select trim(vin) as vin,
        |  trim(trace_time) as trace_time,
        |  trim(prot_ver) as prot_ver,
        |  trim(vehicle_stat) as vehicle_stat,
        |  trim(charging_stat) as charging_stat,
        |  int(trim(hvbatsoc)) as hvbatsoc,
        |  float(trim(total_current)) as total_current
        |from table0""".stripMargin
    ).createOrReplaceTempView("table1")

    // Samples matching the national-standard protocol: distinct vehicle and time.
    sqlContext.sql(suspiciousSamplesSql(protVer = "1", vehicleStat = "02", chargingStat = "03"))
      .createOrReplaceTempView("table2")

    // Samples matching the private protocol: distinct vehicle and time.
    sqlContext.sql(suspiciousSamplesSql(protVer = "2", vehicleStat = "0", chargingStat = "0"))
      .createOrReplaceTempView("table3")

    // Merge both result sets.
    sqlContext.sql("select * from table2 union select * from table3")
      .createOrReplaceTempView("table4")

    // Keep vehicles that show up in at least two windows.
    sqlContext.sql(
      "select vin,count(time) as num " +
        "from table4 " +
        "group by vin " +
        "having num>=2"
    ).createOrReplaceTempView("table5")

    // (vehicle, time) rows of table4 restricted to the vehicles in table5.
    val flaggedSamples =
      sqlContext.sql("select vin,time from table4 where vin in (select vin from table5)")

    // Pivot table: 1 means suspicious, 0 means fine.
    val pivot = sqlContext.sql(PivotSql)

    // Date the data belongs to, used as the output file prefix. `min` keeps the choice
    // deterministic and ignores null timestamps; the value is read as a typed column
    // instead of being sliced out of Row.toString.
    val day = Option(
      sqlContext.sql("select min(substring(trace_time,1,10)) as day from table1").first().getString(0)
    ).getOrElse(
      throw new IllegalStateException(s"no trace_time value found in $inputFilePath")
    )

    // Intermediate result: the times at which the suspicious vehicles were seen.
    flaggedSamples.coalesce(1).write.option("header", "true").csv(saveFilePath + "/" + day + "_vintime.csv")
    // Pivot table: the times at which the suspicious vehicles were seen, plus the total count.
    pivot.coalesce(1).write.option("header", "true").csv(saveFilePath + "/" + day + "_pivot.csv")
  }
}

This snippet took 0.01 seconds to highlight.

Back to the Entry List or Home.

Delete this entry (admin only).