Practice 8: Spark GraphX - Analyze Bike Routes Data
The goal of this practice is to perform basic graph analysis using the GraphFrames package available on spark-packages.org. We will analyze publicly available bike data from the Bay Area Bike Share portal.
Load Input Dataset
As input we have two files: bike stations and bike trips. Let's load the data into Spark DataFrames:
//Initialize SparkSession
import org.apache.spark.sql.SparkSession
val sparkSession = SparkSession
  .builder()
  .appName("spark-sql-basic")
  .master("local[*]")
  .getOrCreate()
//Read stations (stations_txt holds the path to the stations CSV file)
val stations = sparkSession.read
  .option("header", "true")
  .option("delimiter", ",")
  .option("nullValue", "")
  .option("treatEmptyValuesAsNulls", "true")
  .option("inferSchema", "true")
  .csv(stations_txt)
stations.show()
Load the trips data into a DataFrame named trips in the same way.
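A minimal sketch of loading the trips file, assuming it is a comma-separated file with a header row like the stations file. The path variable tripsPath and the "Trip ID" column are assumptions made for this example; only "Start Terminal" and "End Terminal" are column names used later in this practice. To keep the sketch runnable on its own, it first writes a two-row sample file to a temporary location; in the practice, point tripsPath at the real trips file instead.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Reuses the existing session if one is already running.
val spark = SparkSession
  .builder()
  .appName("load-trips-sketch")
  .master("local[*]")
  .getOrCreate()

// Hypothetical sample file standing in for the real trips CSV.
val tripsFile = Files.createTempFile("trips", ".csv")
Files.write(
  tripsFile,
  "Trip ID,Start Terminal,End Terminal\n1,50,70\n2,70,50\n".getBytes("UTF-8"))
val tripsPath = tripsFile.toString

// Same reader options as for the stations file.
val trips = spark.read
  .option("header", "true")
  .option("delimiter", ",")
  .option("inferSchema", "true")
  .csv(tripsPath)

trips.show()
```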
Stations dataframe:
Trips dataframe:
Create Trips GraphFrame
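GraphFrames is a graph processing library from Databricks that extends the functionality of Spark GraphX. See the GraphFrames documentation for more information.
GraphFrames support general graph processing, similar to Apache Spark’s GraphX library. However, GraphFrames are built on top of Spark DataFrames rather than RDDs, so vertices and edges can be queried with the usual DataFrame operations.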
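//Create a graph
import org.graphframes.GraphFrame
//GraphFrame expects an "id" column in the vertices and "src"/"dst" columns in the edges
val stationVertices = stations.withColumnRenamed("station_id", "id").distinct()
val tripEdges = trips
  .withColumnRenamed("Start Terminal", "src")
  .withColumnRenamed("End Terminal", "dst")
val stationGraph = GraphFrame(stationVertices, tripEdges)
stationGraph.cache()
//Show each trip together with its start and end station
stationGraph.triplets.show()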
Carry out some analysis
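1) Find the most popular bike stations via the PageRank algorithm:
import org.apache.spark.sql.functions.desc
//resetProbability is the random-jump probability, maxIter the number of iterations
val ranks = stationGraph.pageRank.resetProbability(0.15).maxIter(10).run()
ranks.vertices.orderBy(desc("pagerank")).show()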
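To make the two parameters concrete, here is a minimal plain-Scala sketch of an iterative PageRank on a small edge list. It is a simplified illustration and not the GraphFrames implementation: the function name pageRankSketch and the toy station IDs are made up for this example. It assumes every vertex starts at rank 1.0 and that each iteration sets rank = resetProbability + (1 - resetProbability) * (sum of incoming contributions), where a source station splits its rank evenly over its outgoing trips.

```scala
// Simplified PageRank over (src, dst) pairs; parallel edges count once each.
def pageRankSketch(
    edges: Seq[(String, String)],
    resetProbability: Double = 0.15,
    maxIter: Int = 10
): Map[String, Double] = {
  val vertices = edges.flatMap { case (s, d) => Seq(s, d) }.distinct
  // Number of outgoing edges per source vertex.
  val outDegree = edges.groupBy(_._1).map { case (s, es) => s -> es.size }
  var ranks = vertices.map(v => v -> 1.0).toMap

  for (_ <- 1 to maxIter) {
    // Each edge carries an equal share of its source's current rank.
    val contributions = edges
      .map { case (s, d) => d -> ranks(s) / outDegree(s) }
      .groupBy(_._1)
      .map { case (d, cs) => d -> cs.map(_._2).sum }
    ranks = vertices.map { v =>
      v -> (resetProbability + (1 - resetProbability) * contributions.getOrElse(v, 0.0))
    }.toMap
  }
  ranks
}

// Toy trips: stations 50 and 60 both send bikes to 70, and 70 sends back to 50.
val toyEdges = Seq(("50", "70"), ("60", "70"), ("70", "50"))
val toyRanks = pageRankSketch(toyEdges)
toyRanks.toSeq.sortBy { case (_, r) => -r }.foreach { case (v, r) => println(s"$v -> $r") }
```

In this toy graph station 60 has no inbound trips, so its rank stays at the reset value of 0.15, while stations that receive trips accumulate rank from their sources.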
2) Find the most common trips in the dataset, from start station to end station:
//Count trips per (src, dst) pair and keep the ten most frequent
val topTrips = stationGraph
  .edges
  .groupBy("src", "dst")
  .count()
  .orderBy(desc("count"))
  .limit(10)
topTrips.show()
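The aggregation above groups edges by their (src, dst) pair, counts each group, and sorts by the count in descending order. The same steps can be sketched on a plain Scala collection; the station IDs below are made-up toy values, not data from the bike share files.

```scala
// Toy (src, dst) pairs standing in for the edges of the station graph.
val toyTrips = Seq(("50", "70"), ("50", "70"), ("70", "50"), ("60", "50"), ("50", "70"))

// Group identical pairs, count them, sort by count descending, keep the top two.
val topToyTrips = toyTrips
  .groupBy(identity)
  .map { case (pair, trips) => pair -> trips.size }
  .toSeq
  .sortBy { case (_, n) => -n }
  .take(2)

topToyTrips.foreach { case ((src, dst), n) => println(s"$src -> $dst: $n") }
```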
3) Find the stations with the most inbound and outbound trips:
//inDegree counts trips ending at a station
val inDeg = stationGraph.inDegrees
inDeg.orderBy(desc("inDegree")).limit(5).show()
//outDegree counts trips starting at a station
val outDeg = stationGraph.outDegrees
outDeg.orderBy(desc("outDegree")).limit(5).show()
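The two degree tables can also be combined to rank stations by total traffic. The sketch below is one possible way to do this, shown on toy DataFrames that only mimic the column layout of inDegrees (id, inDegree) and outDegrees (id, outDegree). The names inDegToy, outDegToy and totalDegree, as well as the station IDs and counts, are assumptions made for this example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, desc}

// Reuses the existing session if one is already running.
val spark = SparkSession
  .builder()
  .appName("degree-sketch")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Toy stand-ins with the same columns as stationGraph.inDegrees / outDegrees.
val inDegToy = Seq(("A", 5), ("B", 1), ("C", 3)).toDF("id", "inDegree")
val outDegToy = Seq(("A", 4), ("B", 4), ("C", 1)).toDF("id", "outDegree")

// Inner join on station id, then sum both degrees and sort descending.
val totals = inDegToy
  .join(outDegToy, "id")
  .withColumn("totalDegree", col("inDegree") + col("outDegree"))
  .orderBy(desc("totalDegree"))

totals.show()
```

Note that the inner join keeps only stations that appear in both tables; a station with only inbound or only outbound trips would need an outer join with missing degrees filled with zero.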