PageRank Algorithm
PageRank measures the importance of each vertex in a graph. It was developed by Google's founders, Larry Page and Sergey Brin, on the premise that the most important pages on the web are the ones with the most links pointing to them. PageRank also weighs the importance of each linking page, so a page with incoming links from highly ranked pages is itself ranked higher.
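The idea can be written as a single update rule. The formula below is the classic unnormalized form; the symbols d (damping factor, commonly set to 0.85), In(v) (the set of vertices linking to v) and outdeg(u) (the number of outgoing links of u) are notation introduced here for illustration, not taken from the text above.

```latex
PR(v) = (1 - d) + d \sum_{u \in \mathrm{In}(v)} \frac{PR(u)}{\mathrm{outdeg}(u)}
```

Each vertex u splits its rank evenly across its outgoing links, so a link from a highly ranked page with few outgoing links contributes the most. A vertex with no incoming links ends up with the minimum rank of 1 - d.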
Apache Spark GraphX PageRank
GraphX includes a number of built-in graph algorithms.
PageRank, created by Larry Page and popularized by the Google search engine, is one of the best known.
GraphX also includes an example social network dataset that we can run PageRank on. A set of users is given in data/graphx/users.txt, and a set of relationships between users is given in data/graphx/followers.txt. We compute the PageRank of each user as follows:
1) Suppose we have the following users (data/graphx/users.txt):
1,BarackObama,Barack Obama
2,ladygaga,Goddess of Love
3,jeresig,John Resig
4,justinbieber,Justin Bieber
6,matei_zaharia,Matei Zaharia
7,odersky,Martin Odersky
8,anonsys
2) And the following relationships between users (data/graphx/followers.txt), one edge per line as a source ID followed by a destination ID:
2 1
4 1
1 2
6 3
7 3
7 6
6 7
3 7
3) The source code for running GraphX PageRank on this data is:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.graphx.GraphLoader
import org.apache.spark.{SparkConf, SparkContext}

object PageRank {
  def main(args: Array[String]): Unit = {
    // Silence Spark's logging so only the program output is shown
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    // args(0): path to followers.txt, args(1): path to users.txt
    val followers_data = args(0)
    val users_data = args(1)

    val conf = new SparkConf()
      .setAppName("SparkGraphX")
      .set("spark.executor.memory", "2g")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Load the edges as a graph
    val graph = GraphLoader.edgeListFile(sc, followers_data)
    graph.triplets.foreach(println)

    // Run PageRank until the ranks converge within a tolerance of 0.0001
    val ranks = graph.pageRank(0.0001).vertices

    // Join the ranks with the usernames
    val users = sc.textFile(users_data).map { line =>
      val fields = line.split(",")
      (fields(0).toLong, fields(1))
    }
    val ranksByUsername = users.join(ranks).map {
      case (id, (username, rank)) => (username, rank)
    }

    // Print the result
    println(ranksByUsername.collect().mkString("\n"))

    // Release Spark resources before exiting
    sc.stop()
  }
}
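To see what the call graph.pageRank(0.0001) does conceptually, here is a minimal Spark-free sketch of an iterative PageRank on the same follower edges. It is not GraphX's implementation: the object name PageRankSketch, the helper pageRank, the starting rank of 1.0, and the stopping rule (largest per-vertex change below the tolerance) are all assumptions made for illustration. The reset probability of 0.15 is assumed to match the GraphX default.

```scala
object PageRankSketch {
  // Follower edges (source, destination), copied from data/graphx/followers.txt
  val edges: Seq[(Long, Long)] = Seq(
    (2L, 1L), (4L, 1L), (1L, 2L), (6L, 3L),
    (7L, 3L), (7L, 6L), (6L, 7L), (3L, 7L)
  )

  // Assumed update rule (hypothetical helper, not a GraphX API):
  // rank(v) = resetProb + (1 - resetProb) * sum over in-edges of rank(src) / outDegree(src)
  def pageRank(edges: Seq[(Long, Long)], tol: Double, resetProb: Double = 0.15): Map[Long, Double] = {
    val vertices = edges.flatMap { case (src, dst) => Seq(src, dst) }.distinct
    val outDegree = edges.groupBy(_._1).map { case (src, es) => src -> es.size }
    var ranks = vertices.map(v => v -> 1.0).toMap
    var maxDelta = Double.MaxValue
    while (maxDelta > tol) {
      // Each edge carries an equal share of its source's current rank
      val contribs = edges
        .map { case (src, dst) => dst -> ranks(src) / outDegree(src) }
        .groupBy(_._1)
        .map { case (dst, cs) => dst -> cs.map(_._2).sum }
      // Vertices without incoming edges receive only the reset probability
      val updated = vertices.map { v =>
        v -> (resetProb + (1 - resetProb) * contribs.getOrElse(v, 0.0))
      }.toMap
      // Stop once no vertex changed by more than the tolerance
      maxDelta = vertices.map(v => math.abs(updated(v) - ranks(v))).max
      ranks = updated
    }
    ranks
  }

  def main(args: Array[String]): Unit =
    pageRank(edges, tol = 0.0001).toSeq.sortBy(-_._2).foreach(println)
}
```

Because the stopping rule and iteration scheme differ from GraphX, the values produced by this sketch should be close to, but not necessarily identical to, what Spark reports. Vertex 4 has no incoming edges, so under this rule its rank settles at exactly the reset probability.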
Running the Spark program on the two files prints:
(justinbieber,0.15)
(matei_zaharia,0.7013599933629602)
(ladygaga,1.390049198216498)
(BarackObama,1.4588814096664682)
(jeresig,0.9993442038507723)
(odersky,1.2973176314422592)
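The output has six rows although users.txt lists seven users. The user anonsys (ID 8) appears in no edge, so the graph loaded from followers.txt contains no vertex for it, and the inner join between users and ranks drops it. The sketch below mimics that parse-and-join step with plain Scala collections; the names JoinSketch, parseUser and joinedUsernames are hypothetical and exist only for this illustration.

```scala
object JoinSketch {
  // Lines copied from data/graphx/users.txt
  val userLines: Seq[String] = Seq(
    "1,BarackObama,Barack Obama",
    "2,ladygaga,Goddess of Love",
    "3,jeresig,John Resig",
    "4,justinbieber,Justin Bieber",
    "6,matei_zaharia,Matei Zaharia",
    "7,odersky,Martin Odersky",
    "8,anonsys"
  )

  // Vertex IDs that occur in followers.txt; only these receive a rank
  val rankedIds: Set[Long] = Set(1L, 2L, 3L, 4L, 6L, 7L)

  // Parse "id,username[,full name]" into (id, username), as the Spark job does
  def parseUser(line: String): (Long, String) = {
    val fields = line.split(",")
    (fields(0).toLong, fields(1))
  }

  // Inner-join semantics: keep only users whose ID has a rank
  def joinedUsernames(lines: Seq[String], ids: Set[Long]): Seq[String] =
    lines.map(parseUser).collect { case (id, name) if ids.contains(id) => name }

  def main(args: Array[String]): Unit =
    joinedUsernames(userLines, rankedIds).foreach(println)
}
```

If unranked users should still be listed, replacing join with leftOuterJoin in the Spark program would keep them, with the rank wrapped in an Option that is None for users outside the graph.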