GraphX
Introduction
GraphX is a distributed graph processing framework that was built on top of Spark.
In this lab, we introduce GraphX's Pregel-style vertex-centric programming model and apply it to implement some graph algorithms. Familiarity with Scala is required.
Example: find out the maximum vertex value among all vertices
Get Ready
- Download the materials for this lab here and unzip them.
- Import the project into Eclipse and set "java build path" by adding the jar "spark-assembly-1.6.0-hadoop2.6.0.jar" in folder "lib".
Vertex-centric Thinking
The Pregel-style vertex-centric computation model consists of a sequence of iterations, called supersteps. In each superstep S, every vertex v follows a GAS (Gather-Apply-Scatter) paradigm:
- Gather: it receives and reads the messages that were sent to v in the previous superstep S-1.
- Apply: it applies a user-defined function f to each vertex in parallel; that is, f specifies the behaviour of a single vertex v at a particular superstep S.
- Scatter: it may send messages to other vertices, which will receive them in the next superstep S+1.
To find the maximum vertex value among all vertices, each vertex can carry a pair (newValue, oldValue) as its property.
During a superstep, each vertex
- receives all messages sent to it in the previous superstep and computes their maximum value maxMsg. [Gather]
- assigns the current newValue to oldValue and updates newValue with max(maxMsg, newValue). [Apply]
- propagates its newValue to all its outgoing neighbours only when newValue and oldValue are not equal. [Scatter]
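The three steps above can be traced without Spark. The following is a minimal plain-Scala sketch, not GraphX code: the object name MaxValueSketch and the explicit set of active vertices are assumptions made for illustration, and the graph is the four-vertex graph used in this lab's example.

```scala
object MaxValueSketch {
  type VertexId = Long

  // Directed edges (source, destination) of the four-vertex example graph.
  val edges: Seq[(VertexId, VertexId)] =
    Seq((1L, 2L), (2L, 3L), (2L, 4L), (3L, 4L), (3L, 1L))

  // vertexId -> (newValue, oldValue)
  val initial: Map[VertexId, (Int, Int)] =
    Map((1L, (9, -1)), (2L, (1, -1)), (3L, (6, -1)), (4L, (8, -1)))

  /** Runs supersteps until no message is sent and returns the final vertex state. */
  def run(): Map[VertexId, (Int, Int)] = {
    var state = initial
    // At the start every vertex is active (assumption: mirrors the initial message).
    var active: Set[VertexId] = state.keySet
    while (active.nonEmpty) {
      // Scatter: an active vertex sends newValue along its out-edges
      // only if newValue differs from oldValue.
      val messages: Seq[(VertexId, Int)] = edges.collect {
        case (src, dst) if active(src) && state(src)._1 != state(src)._2 =>
          (dst, state(src)._1)
      }
      // Gather: each receiver keeps the maximum of its incoming messages.
      val maxMsg: Map[VertexId, Int] =
        messages.groupBy(_._1).map { case (dst, ms) => (dst, ms.map(_._2).max) }
      // Apply: oldValue = newValue, newValue = max(maxMsg, newValue).
      state = state.map { case (id, (newValue, oldValue)) =>
        maxMsg.get(id) match {
          case Some(m) => (id, (math.max(m, newValue), newValue))
          case None    => (id, (newValue, oldValue))
        }
      }
      // Only vertices that received a message take part in the next superstep.
      active = maxMsg.keySet
    }
    state
  }

  def main(args: Array[String]): Unit =
    run().toSeq.sortBy(_._1).foreach { case (id, (newValue, _)) =>
      println(s"vertex $id: $newValue")
    }
}
```

The loop stops once no vertex sends a message, which is also when a Pregel computation terminates before reaching its iteration limit.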
Go Through the Example
Now let's turn to the code. The first step is to create the graph:
    import org.apache.spark.graphx._
    import org.apache.spark.rdd.RDD

    // sc is the SparkContext of the running application.
    val vertexArray = Array(
      (1L, (9, -1)), // (vertexID, (new vertex value, old vertex value))
      (2L, (1, -1)),
      (3L, (6, -1)),
      (4L, (8, -1)))

    val edgeArray = Array(
      Edge(1L, 2L, 1), // the edge from Vertex 1 to Vertex 2 with unit weight 1
      Edge(2L, 3L, 1),
      Edge(2L, 4L, 1),
      Edge(3L, 4L, 1),
      Edge(3L, 1L, 1))

    // Collections of vertices/edges are parallelized to form distributed
    // datasets that can be operated on in parallel.
    val vertexRDD: RDD[(Long, (Int, Int))] = sc.parallelize(vertexArray)
    val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)
    val graph: Graph[(Int, Int), Int] = Graph(vertexRDD, edgeRDD)
In this example, we build the graph from two RDDs:
- The RDD for vertices has the element type (Long, (Int, Int)), where Long is the type of each vertex's identifier and (Int, Int) is the type of each vertex's property (i.e., the new value and the old value). A vertex property can be viewed as a vertex weight and can be of any type.
- The RDD for edges has the element type Edge[Int], where Int is the type of each edge's property (e.g., the edge weight). This example does not actually need any edge property, so we simply assign 1 to every edge as a dummy unit weight.
Now we are ready to call GraphX's Pregel API on this graph to find the maximum vertex value. The function signature is listed as comments.
    val maxGraph = graph.pregel(   // def pregel[A]
      Int.MaxValue,                //   (initialMsg: A,
      100,                         //    maxIter: Int = Int.MaxValue,
      EdgeDirection.Out)(          //    activeDir: EdgeDirection = EdgeDirection.Out)
      vprog,                       //   (vprog: (VertexId, VD, A) => VD,
      sendMsg,                     //    sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg)                    //    mergeMsg: (A, A) => A)
                                   //   : Graph[VD, ED]
Note that the pregel function takes two argument lists.
The first argument list contains three configuration parameters:
- initialMsg is the message that all vertices receive at the start of superstep 0. Here we use Int.MaxValue purely as a marker for identifying superstep 0 (as we shall see later).
- maxIter is the maximum number of iterations (i.e., supersteps). Here we set it to 100.
- activeDir selects the edges on which sendMsg is run, relative to the active vertices (those that received a message in the previous superstep). The recipient of each message is then decided by sendMsg itself. Taking the four-vertex graph above as an example, assume Vertex 2 is the only active vertex. If
  - EdgeDirection.Out is set, sendMsg runs on the outgoing edges of Vertex 2 (to Vertex 3 and Vertex 4).
  - EdgeDirection.In is set, sendMsg runs on the incoming edge of Vertex 2 (from Vertex 1).
  - EdgeDirection.Either is set, sendMsg runs on all three edges that connect Vertex 2 to the other vertices.
  - EdgeDirection.Both is set, sendMsg runs on the edge between Vertex 2 and Vertex 1, Vertex 3 or Vertex 4 only when that other vertex is also active.
In this example we use EdgeDirection.Out.
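The effect of activeDir can be illustrated with a small plain-Scala sketch that, given a set of active vertices, lists the edges on which sendMsg would be run. It does not use GraphX: the Direction values and the helper edgesToScan are hypothetical stand-ins for EdgeDirection and for GraphX's internal edge filtering, written under the assumption that a vertex is active when it received a message in the previous superstep.

```scala
object ActiveDirSketch {
  type VertexId = Long

  // Hypothetical stand-ins for GraphX's EdgeDirection values.
  sealed trait Direction
  case object OutDir extends Direction
  case object InDir extends Direction
  case object EitherDir extends Direction
  case object BothDir extends Direction

  // Directed edges (source, destination) of the four-vertex example graph.
  val edges: Seq[(VertexId, VertexId)] =
    Seq((1L, 2L), (2L, 3L), (2L, 4L), (3L, 4L), (3L, 1L))

  /** Edges on which sendMsg would run for the given active vertices. */
  def edgesToScan(active: Set[VertexId], dir: Direction): Seq[(VertexId, VertexId)] =
    edges.filter { case (src, dst) =>
      dir match {
        case OutDir    => active(src)                // out-edges of active vertices
        case InDir     => active(dst)                // in-edges of active vertices
        case EitherDir => active(src) || active(dst) // at least one endpoint active
        case BothDir   => active(src) && active(dst) // both endpoints active
      }
    }

  def main(args: Array[String]): Unit = {
    val onlyVertex2 = Set(2L)
    Seq(OutDir, InDir, EitherDir, BothDir).foreach { d =>
      println(s"$d: ${edgesToScan(onlyVertex2, d)}")
    }
  }
}
```

With only Vertex 2 active, the BothDir case selects no edge at all; it starts selecting the edge from Vertex 2 to Vertex 3 once Vertex 3 is active as well.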
The second argument list contains the functions that conduct the GAS process during a superstep:
    // Gather
    def mergeMsg(msg1: Int, msg2: Int): Int = msg1 max msg2

    // Apply
    def vprog(vertexId: VertexId, value: (Int, Int), message: Int): (Int, Int) = {
      // The else branch is a concise form for
      // "oldValue = newValue, newValue = max(newValue, msgValue), return (newValue, oldValue)".
      if (message == Int.MaxValue) // superstep 0
        value
      else // superstep > 0
        (message max value._1, value._1)
    }

    // Scatter
    def sendMsg(triplet: EdgeTriplet[(Int, Int), Int]): Iterator[(VertexId, Int)] = {
      val sourceVertex = triplet.srcAttr
      if (sourceVertex._1 == sourceVertex._2) // newValue == oldValue for source vertex?
        Iterator.empty // do nothing
      else
        Iterator((triplet.dstId, sourceVertex._1)) // propagate new (updated) value to the destination vertex
    }
- mergeMsg merges multiple messages reaching the same vertex at the start of a superstep. In this example, it simply computes the maximum value of all received messages. [Gather]
- vprog handles the behaviour of a vertex after receiving the merged message. In this example, it updates the new vertex value and the old vertex value accordingly. [Apply]
- sendMsg determines the message to send out for the next iteration and where to send it. [Scatter]
Notice the type EdgeTriplet[VD, ED] in the signature of sendMsg. Unlike a vertex or an edge on its own, a triplet bundles an edge with the properties of both of its endpoints.
An edge triplet has five properties:
- srcId: The source vertex id.
- srcAttr: The source vertex property.
- dstId: The destination vertex id.
- dstAttr: The destination vertex property.
- attr: The edge property.
sendMsg decides whether to propagate the source vertex value by checking whether its new value and old value are equal (cf. the Vertex-centric Thinking section).
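To make the five triplet fields concrete, here is a plain-Scala sketch that builds triplets by joining each edge with the properties of its two endpoints, and then applies the same send rule as in the example. The case class Triplet and the helper triplets are hypothetical stand-ins that only mirror the field names of GraphX's EdgeTriplet; they are not part of the GraphX API.

```scala
object TripletSketch {
  type VertexId = Long

  // Hypothetical stand-in mirroring the five fields of GraphX's EdgeTriplet.
  final case class Triplet[VD, ED](
      srcId: VertexId, srcAttr: VD,
      dstId: VertexId, dstAttr: VD,
      attr: ED)

  /** Joins every edge (src, dst, attr) with the properties of its endpoints. */
  def triplets[VD, ED](
      vertices: Map[VertexId, VD],
      edges: Seq[(VertexId, VertexId, ED)]): Seq[Triplet[VD, ED]] =
    edges.map { case (src, dst, attr) =>
      Triplet(src, vertices(src), dst, vertices(dst), attr)
    }

  // Same rule as sendMsg in the example, written against the stand-in type:
  // send newValue to the destination only if it differs from oldValue.
  def sendMsg(t: Triplet[(Int, Int), Int]): Iterator[(VertexId, Int)] =
    if (t.srcAttr._1 == t.srcAttr._2) Iterator.empty
    else Iterator((t.dstId, t.srcAttr._1))

  def main(args: Array[String]): Unit = {
    val vertices = Map((1L, (9, -1)), (2L, (1, -1)))
    val ts = triplets(vertices, Seq((1L, 2L, 1)))
    ts.foreach(t => println(s"$t sends ${sendMsg(t).toList}"))
  }
}
```

In GraphX itself, the corresponding view is available through graph.triplets, so no manual join is needed there.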
Finally, to check the correctness of the code, we have:
    for ((vertexId, (new_value, old_value)) <- maxGraph.vertices.collect) {
      println(new_value)
    }
Run the Program
Following the same procedure as in Spark-lab, we export the project as a JAR file (say, GraphXExample.jar) and run the Spark program:
bigdata@bigdata-VirtualBox:~/Programs/spark$ ./bin/spark-submit --class "example.MaxVertexValue" --master spark://localhost:7077 /home/bigdata/GraphXExample.jar
The program should print the value 9 four times, once for each vertex.
Exercise
Create the following undirected graph and use GraphX's Pregel API to find the shortest path distance from vertex 4 to all the other vertices.
Reference Implementation of inline mergeMsg/vprog/sendMsg functions using Dijkstra's algorithm:
val sssp = initialGraph.pregel(Double.PositiveInfinity)( |