Pipeline Example: RandomForestClassifier

The full source code for the task: San Francisco Crime Classification

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import java.time.LocalTime
import java.time.format.DateTimeFormatter

import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

/**
  * Created by ALINA on 24.02.2018.
  *
  * Trains a random-forest classifier that predicts the `Category` column of a
  * crime-records CSV file from the day of week, police district, hour of day
  * and month, selects the impurity measure with 3-fold cross-validation, and
  * prints the error measured on a held-out 30% split.
  */
object CrimeRFClassifier {

  /**
    * Entry point.
    *
    * @param args command-line arguments; `args(0)` is the path of the CSV file to read.
    *             The file must have a header row and provide at least the columns
    *             `Dates`, `DayOfWeek`, `PdDistrict` and `Category`.
    * @throws IllegalArgumentException if no input path is supplied
    */
  def main(args: Array[String]): Unit = {
    // Fail with a usage message instead of a bare ArrayIndexOutOfBoundsException.
    require(args.nonEmpty, "Usage: CrimeRFClassifier <path-to-input-csv>")
    val inputFile = args(0)

    // Silence the framework loggers so that only the program's own output is printed.
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    // Initialize SparkSession
    val sparkSession = SparkSession
      .builder()
      .appName("spark-read-csv")
      .master("local[*]")
      .getOrCreate()

    try {
      // Read file to DF
      val crimes = sparkSession.read
        .option("header", "true")
        .option("delimiter", ",")
        .option("nullValue", "")
        .option("treatEmptyValuesAsNulls", "true")
        .option("inferSchema", "true")
        .csv(inputFile)

      crimes.show(100)
      crimes.printSchema()

      // Hours 6 to 17 inclusive count as "Day", everything else as "Night".
      val dayOrNight = udf { (h: Int) =>
        if (h > 5 && h < 18) "Day" else "Night"
      }

      val weekend = udf { (day: String) =>
        if (day == "Sunday" || day == "Saturday") "Weekend" else "NotWeekend"
      }

      // Calendar features derived from the timestamp column (each one derived once).
      val withCalendar = crimes
        .withColumn("HourOfDay", hour(col("Dates")))
        .withColumn("Month", month(col("Dates")))
        .withColumn("Year", year(col("Dates")))

      // "DayOrNight" and "Weekend" stay in the data set (and therefore show up in the
      // printed predictions), but they are not part of the feature vector assembled below.
      val features = withCalendar
        .withColumn("DayOrNight", dayOrNight(col("HourOfDay")))
        .withColumn("Weekend", weekend(col("DayOfWeek")))

      // The indexers are fitted on training data only (and on each cross-validation
      // fold), so a value that occurs only in held-out rows would make `transform`
      // fail under the default "error" policy. "skip" drops such rows instead.
      val categoryIndex = new StringIndexer()
        .setInputCol("Category")
        .setOutputCol("CategoryIndex")
        .setHandleInvalid("skip")
      val dayIndex = new StringIndexer()
        .setInputCol("DayOfWeek")
        .setOutputCol("DayOfWeekIndex")
        .setHandleInvalid("skip")
      val districtIndex = new StringIndexer()
        .setInputCol("PdDistrict")
        .setOutputCol("PdDistrictIndex")
        .setHandleInvalid("skip")

      val assembler = new VectorAssembler()
        .setInputCols(Array("DayOfWeekIndex", "PdDistrictIndex", "HourOfDay", "Month"))
        .setOutputCol("indexedFeatures")

      val Array(training, test) = features.randomSplit(Array(0.7, 0.3))

      val rf = new RandomForestClassifier()
        .setLabelCol("CategoryIndex")
        .setFeaturesCol("indexedFeatures")
        .setMaxDepth(10)
        .setMaxBins(100)

      val pipeline = new Pipeline()
        .setStages(Array(categoryIndex, dayIndex, districtIndex, assembler, rf))

      val evaluator = new MulticlassClassificationEvaluator()
        .setLabelCol("CategoryIndex")
        .setPredictionCol("prediction")
        .setMetricName("accuracy")

      // Only the impurity measure is tuned; depth and bin count are fixed above.
      val paramGrid = new ParamGridBuilder()
        .addGrid(rf.impurity, Array("entropy", "gini"))
        .build()

      val cv = new CrossValidator()
        .setEstimator(pipeline)
        .setEvaluator(evaluator)
        .setEstimatorParamMaps(paramGrid)
        .setNumFolds(3)

      val model = cv.fit(training)

      val predictions = model.transform(test)
      predictions.show()

      val accuracy = evaluator.evaluate(predictions)
      println(s"Test Error = ${1.0 - accuracy}")
    } finally {
      // Release the session even when reading, training or evaluation fails.
      sparkSession.stop()
    }
  }
}

results matching ""

    No results matching ""