Apache Spark GraphX: Getting Started
Linking
To add the Spark GraphX component to your Maven project, add the following dependency to your pom.xml file:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-graphx_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
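For projects built with sbt rather than Maven, the same dependency can be sketched as below. This is only an illustration: the `scalaVersion` value is an assumption chosen to match the `_2.11` suffix of the Maven artifact, and the `%%` operator appends the Scala binary version to the artifact name.

```scala
// build.sbt (sketch): the exact 2.11.x patch version is an assumption
scalaVersion := "2.11.8"

// Resolves to the same artifact as the Maven snippet: spark-graphx_2.11, version 2.1.0
libraryDependencies += "org.apache.spark" %% "spark-graphx" % "2.1.0"
```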
Imports
import org.apache.spark._
import org.apache.spark.graphx._
// To make some of the examples work we will also need RDD
import org.apache.spark.rdd.RDD
Create Graph RDD
Let's create a simple graph. Suppose we have three vertices, each representing a city: Kazan, Moscow and Saint Petersburg:
1) Create an RDD for vertices
// create the vertices RDD: (vertex id, city name) pairs
val vertices = Array((1L, "Kazan"), (2L, "Moscow"), (3L, "Petersburg"))
val verticesRDD = sc.parallelize(vertices)
2) Create an RDD for edges
// create the edges RDD: Edge(source id, destination id, edge attribute)
val edges = Array(Edge(1L, 2L, 810), Edge(1L, 3L, 1500), Edge(2L, 3L, 700))
val edgesRDD = sc.parallelize(edges)
3) Create a Graph
// create a Graph from the vertices and edges
val graph = Graph(verticesRDD, edgesRDD)
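The three steps above can also be written with explicit types, which is where the `RDD` import becomes useful. The following is a minimal sketch, not the only way to do it: the object name `TypedGraphSketch` and the helper `build` are hypothetical names introduced for illustration, and the context is created in local mode so that the example runs outside spark-shell.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

// Hypothetical helper object, used only for this illustration.
object TypedGraphSketch {
  // Builds the three-city graph with the RDD types spelled out.
  def build(sc: SparkContext): Graph[String, Int] = {
    // Vertex attribute type is String (the city name).
    val vertices: RDD[(VertexId, String)] =
      sc.parallelize(Seq((1L, "Kazan"), (2L, "Moscow"), (3L, "Petersburg")))
    // Edge attribute type is Int.
    val edges: RDD[Edge[Int]] =
      sc.parallelize(Seq(Edge(1L, 2L, 810), Edge(1L, 3L, 1500), Edge(2L, 3L, 700)))
    Graph(vertices, edges)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("TypedGraphSketch").setMaster("local[*]"))
    val graph = build(sc)
    // prints "vertices = 3, edges = 3"
    println(s"vertices = ${graph.numVertices}, edges = ${graph.numEdges}")
    sc.stop()
  }
}
```

Spelling out `Graph[String, Int]` makes it visible that the first type parameter is the vertex attribute type and the second is the edge attribute type.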
4) Print vertices and edges
//print all vertices of graph
graph.vertices.foreach(println)
//print all edges of graph
graph.edges.foreach(println)
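Note that `foreach(println)` runs on the executors, so on a cluster the lines end up in the executor output rather than on the driver console, and the order is not deterministic. For a small graph like this one, a possible alternative is to collect the data to the driver first. The sketch below assumes local mode; `PrintOnDriverSketch`, `build`, `vertexLines` and `edgeLines` are hypothetical names introduced for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph}

// Hypothetical helper object, used only for this illustration.
object PrintOnDriverSketch {
  // Same three-city graph as in the steps above.
  def build(sc: SparkContext): Graph[String, Int] = {
    val vertices = sc.parallelize(Seq((1L, "Kazan"), (2L, "Moscow"), (3L, "Petersburg")))
    val edges = sc.parallelize(Seq(Edge(1L, 2L, 810), Edge(1L, 3L, 1500), Edge(2L, 3L, 700)))
    Graph(vertices, edges)
  }

  // collect() brings the vertices to the driver; sorting by id fixes the order.
  def vertexLines(graph: Graph[String, Int]): Seq[String] =
    graph.vertices.collect().sortBy(_._1).map { case (id, name) => s"$id: $name" }.toSeq

  // Edges are mapped to plain tuples before collecting, then sorted by (source, destination).
  def edgeLines(graph: Graph[String, Int]): Seq[String] =
    graph.edges
      .map(e => (e.srcId, e.dstId, e.attr))
      .collect()
      .sortBy(e => (e._1, e._2))
      .map { case (src, dst, attr) => s"$src -> $dst: $attr" }
      .toSeq

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("PrintOnDriverSketch").setMaster("local[*]"))
    val graph = build(sc)
    vertexLines(graph).foreach(println) // 1: Kazan, 2: Moscow, 3: Petersburg (one per line)
    edgeLines(graph).foreach(println)   // 1 -> 2: 810, 1 -> 3: 1500, 2 -> 3: 700 (one per line)
    sc.stop()
  }
}
```

Collecting is only reasonable when the graph fits in driver memory; for large graphs, `take(n)` on the RDD is a safer way to inspect a sample.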
5) Print edge triplets
A triplet extends an edge with the attributes of its source and destination vertices.
//print edge triplets
graph.triplets.collect.foreach(println)
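Each triplet is printed in the form ((srcId, srcAttr), (dstId, dstAttr), attr). For example, the edge from Kazan to Moscow appears in the output as:
((1,Kazan),(2,Moscow),810)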
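Because a triplet exposes `srcAttr`, `dstAttr` and `attr` together, it is convenient for producing readable descriptions of each edge. The following is a minimal sketch under the same assumptions as the steps above (local mode, the three-city graph); `TripletSketch`, `build` and `describe` are hypothetical names introduced for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph}

// Hypothetical helper object, used only for this illustration.
object TripletSketch {
  // Same three-city graph as in the steps above.
  def build(sc: SparkContext): Graph[String, Int] = {
    val vertices = sc.parallelize(Seq((1L, "Kazan"), (2L, "Moscow"), (3L, "Petersburg")))
    val edges = sc.parallelize(Seq(Edge(1L, 2L, 810), Edge(1L, 3L, 1500), Edge(2L, 3L, 700)))
    Graph(vertices, edges)
  }

  // Formats every triplet as "source -> destination: attribute".
  // Mapping to strings before collect() keeps only plain values on the driver;
  // sorting makes the order deterministic.
  def describe(graph: Graph[String, Int]): Seq[String] =
    graph.triplets
      .map(t => s"${t.srcAttr} -> ${t.dstAttr}: ${t.attr}")
      .collect()
      .sorted
      .toSeq

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("TripletSketch").setMaster("local[*]"))
    // prints:
    // Kazan -> Moscow: 810
    // Kazan -> Petersburg: 1500
    // Moscow -> Petersburg: 700
    describe(build(sc)).foreach(println)
    sc.stop()
  }
}
```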
The full source code:
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
object GraphExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SparkGraphX")
      .set("spark.executor.memory", "2g")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    // create the vertices RDD: (vertex id, city name) pairs
    val vertices = Array((1L, "Kazan"), (2L, "Moscow"), (3L, "Petersburg"))
    val verticesRDD = sc.parallelize(vertices)
    // create the edges RDD: Edge(source id, destination id, edge attribute)
    val edges = Array(Edge(1L, 2L, 810), Edge(1L, 3L, 1500), Edge(2L, 3L, 700))
    val edgesRDD = sc.parallelize(edges)
    // create a Graph from the vertices and edges
    val graph = Graph(verticesRDD, edgesRDD)
    // print all vertices of the graph
    graph.vertices.foreach(println)
    // print all edges of the graph
    graph.edges.foreach(println)
    // print edge triplets
    graph.triplets.collect.foreach(println)
    // release the resources held by the SparkContext
    sc.stop()
  }
}