GraphX

Introduction

GraphX is a distributed graph processing framework that was built on top of Spark.

In this lab, we introduce GraphX's Pregel-style vertex-centric programming model and apply it to implement some graph algorithms. A working knowledge of Scala is required.

Example: find out the maximum vertex value among all vertices

Get Ready

  • Download the materials for this lab here and unzip them.
  • Import the project into Eclipse and set "java build path" by adding the jar "spark-assembly-1.6.0-hadoop2.6.0.jar" in folder "lib".

Vertex-centric Thinking

The Pregel-style vertex-centric computation model consists of a sequence of iterations called supersteps. In each superstep S, every vertex v follows a GAS (Gather-Apply-Scatter) paradigm:

  • Gather: v receives and reads the messages that were sent to it in the previous superstep S-1.
  • Apply: a user-defined function f is applied to all vertices in parallel; that is, f specifies the behaviour of a single vertex v at a particular superstep S.
  • Scatter: v may send messages to other vertices, which will receive them in the next superstep S+1.

In the application of finding the maximum vertex value among all vertices, the property of each vertex holds a pair (newValue, oldValue). During a superstep, each vertex

  • receives all messages sent to it in the previous superstep and computes their maximum value maxMsg. [Gather]
  • assigns the original newValue to oldValue and updates newValue with max(maxMsg, newValue). [Apply]
  • propagates its newValue to all its outgoing neighbours only when newValue and oldValue are not equal. [Scatter]
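
The three steps above can be traced without Spark. The following is a minimal sketch in plain Scala collections, not GraphX code: the object name, the run helper and the explicit active set are assumptions made for illustration, and the graph is the four-vertex example used later in this lab. Each loop iteration first scatters the values changed in the previous superstep, then gathers and applies them.

```scala
object MaxValueSupersteps {
  // Directed edges (source, destination) of the four-vertex example graph.
  val edges: Seq[(Long, Long)] = Seq((1L, 2L), (2L, 3L), (2L, 4L), (3L, 4L), (3L, 1L))

  // vertexId -> (newValue, oldValue); -1 is a placeholder old value.
  val initial: Map[Long, (Int, Int)] =
    Map(1L -> (9, -1), 2L -> (1, -1), 3L -> (6, -1), 4L -> (8, -1))

  def run(start: Map[Long, (Int, Int)], maxIter: Int): Map[Long, (Int, Int)] = {
    var state = start
    var active: Set[Long] = start.keySet // at the start every vertex is active
    var i = 0
    while (active.nonEmpty && i < maxIter) {
      // Scatter: an active vertex whose value changed sends newValue along its out-edges.
      val messages: Seq[(Long, Int)] = edges.collect {
        case (src, dst) if active(src) && state(src)._1 != state(src)._2 =>
          dst -> state(src)._1
      }
      // Gather: keep only the maximum message per destination vertex.
      val maxMsg: Map[Long, Int] =
        messages.groupBy(_._1).map { case (dst, ms) => dst -> ms.map(_._2).max }
      // Apply: vertices that received a message shift newValue into oldValue.
      state = state.map { case (id, (newValue, oldValue)) =>
        id -> maxMsg.get(id)
          .map(m => (math.max(m, newValue), newValue))
          .getOrElse((newValue, oldValue))
      }
      active = maxMsg.keySet // only message receivers stay active
      i += 1
    }
    state
  }

  def main(args: Array[String]): Unit =
    run(initial, 100).toSeq.sortBy(_._1).foreach { case (id, (newValue, _)) =>
      println(s"vertex $id: $newValue") // every vertex ends with 9
    }
}
```

The loop stops as soon as no vertex receives a message, which is why the comparison of newValue and oldValue matters: without it, vertices would keep re-sending unchanged values.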

Go Through the Example

Now let's turn to the code. The first step is to create the graph:

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val vertexArray = Array(
	(1L, (9, -1)),		// (vertexID, (new vertex value, old vertex value) )
	(2L, (1, -1)),
	(3L, (6, -1)),
	(4L, (8, -1)))

val edgeArray = Array(
	Edge(1L, 2L, 1),	// Indicate the edge from Vertex 1 to Vertex 2 with unit weight 1
	Edge(2L, 3L, 1),
	Edge(2L, 4L, 1),
	Edge(3L, 4L, 1),
	Edge(3L, 1L, 1))

// sc is the application's SparkContext. parallelize turns each local collection
// into an RDD, a distributed dataset that can be operated on in parallel.
val vertexRDD: RDD[(Long, (Int, Int))] = sc.parallelize(vertexArray)
val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)

val graph: Graph[(Int, Int), Int] = Graph(vertexRDD, edgeRDD)

In this example, we build the graph from two RDDs:

  • The RDD for vertices has the element type (Long, (Int, Int)), where Long is the type of each vertex's identifier and (Int, Int) is the type of each vertex's property (i.e., the new value and the old value) in this example. A vertex property can be viewed as a vertex weight, and it can be of any type.
  • The RDD for edges has the element type Edge[Int], where Int is the type of each edge's property (e.g., edge weight). We do not actually need any edge property in this example, so we simply assign 1 to every edge as a dummy unit weight.
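
The listing above assumes that a SparkContext named sc already exists. As a minimal sketch, the graph can be built and inspected in a standalone program like the one below. The object name, the build helper and the local[2] master are assumptions chosen for quick local testing; they are not part of the lab materials.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

object BuildGraphSketch {
  // Builds the four-vertex example graph on the given SparkContext.
  def build(sc: SparkContext): Graph[(Int, Int), Int] = {
    val vertexRDD: RDD[(VertexId, (Int, Int))] = sc.parallelize(Seq(
      (1L, (9, -1)), (2L, (1, -1)), (3L, (6, -1)), (4L, (8, -1))))
    val edgeRDD: RDD[Edge[Int]] = sc.parallelize(Seq(
      Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(2L, 4L, 1),
      Edge(3L, 4L, 1), Edge(3L, 1L, 1)))
    Graph(vertexRDD, edgeRDD)
  }

  def main(args: Array[String]): Unit = {
    // local[2] runs Spark inside this JVM with two threads (assumption for testing).
    val conf = new SparkConf().setAppName("BuildGraphSketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val graph = build(sc)
      println(s"vertices = ${graph.numVertices}, edges = ${graph.numEdges}")
      // prints "vertices = 4, edges = 5"
    } finally {
      sc.stop()
    }
  }
}
```

VertexId is GraphX's alias for Long, so RDD[(VertexId, (Int, Int))] and RDD[(Long, (Int, Int))] denote the same type.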

Now we are ready to call GraphX's Pregel API on this graph to find the maximum vertex value. The function signature is listed as comments.

val maxGraph = graph.pregel(     // def pregel[A]
    Int.MaxValue,                //   (initialMsg: A,
    100,                         //    maxIterations: Int = Int.MaxValue,
    EdgeDirection.Out)(          //    activeDirection: EdgeDirection = EdgeDirection.Either)
    vprog,                       //   (vprog: (VertexId, VD, A) => VD,
    sendMsg,                     //    sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
    mergeMsg)                    //    mergeMsg: (A, A) => A)
                                 //   : Graph[VD, ED]

Note that the pregel function takes two argument lists. The first argument list contains three configuration parameters:

  • initialMsg is the message that all vertices receive at the start of superstep 0. Here we use Int.MaxValue purely to identify superstep 0 (as we shall see later).
  • maxIterations is the maximum number of iterations (i.e., supersteps). Here we set it to 100.
  • activeDirection selects the edges on which sendMsg is run, relative to the active vertices (those that received a message in the previous superstep). Take the above graph with four vertices as an example and assume Vertex 2 is active. If
    • EdgeDirection.Out is set, sendMsg runs on the outgoing edges of Vertex 2, i.e., the edges to Vertex 3 and Vertex 4.
    • EdgeDirection.In is set, sendMsg runs on the incoming edge of Vertex 2, i.e., the edge from Vertex 1.
    • EdgeDirection.Either is set, sendMsg runs on all three edges incident to Vertex 2. This is the default.
    • EdgeDirection.Both is set, sendMsg runs on an edge of Vertex 2 only when the vertex at its other end (Vertex 1, Vertex 3 or Vertex 4) is also active.
    Here we use EdgeDirection.Out, so a vertex that has just received a message may propagate its value along its outgoing edges.
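
To make the four settings concrete, the sketch below lists the edges around Vertex 2 that each EdgeDirection value refers to. It is plain Scala over the example edge list, not GraphX's implementation, and the object and helper names are hypothetical.

```scala
object IncidentEdges {
  // Directed edges (source, destination) of the four-vertex example graph.
  val edges: Seq[(Long, Long)] = Seq((1L, 2L), (2L, 3L), (2L, 4L), (3L, 4L), (3L, 1L))

  // Edges leaving v (the EdgeDirection.Out case).
  def outEdges(v: Long): Seq[(Long, Long)] = edges.filter(_._1 == v)

  // Edges entering v (the EdgeDirection.In case).
  def inEdges(v: Long): Seq[(Long, Long)] = edges.filter(_._2 == v)

  // Edges touching v at either end (the EdgeDirection.Either case).
  def eitherEdges(v: Long): Seq[(Long, Long)] =
    edges.filter(e => e._1 == v || e._2 == v)

  // Edges whose two endpoints are both active (the EdgeDirection.Both case).
  def bothEdges(active: Set[Long]): Seq[(Long, Long)] =
    edges.filter(e => active(e._1) && active(e._2))

  def main(args: Array[String]): Unit = {
    println(outEdges(2L))            // the edges 2 -> 3 and 2 -> 4
    println(inEdges(2L))             // the edge 1 -> 2
    println(eitherEdges(2L).size)    // 3
    println(bothEdges(Set(2L, 3L)))  // only the edge 2 -> 3
  }
}
```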

The second argument list contains the functions that conduct the GAS process during a superstep:

// Gather
def mergeMsg(msg1: Int, msg2: Int): Int = msg1 max msg2  

// Apply
def vprog(vertexId: VertexId, value: (Int, Int), message: Int): (Int, Int) = {
    if (message == Int.MaxValue)	// superstep 0
      value
    else				// superstep > 0	
      (message max value._1, value._1)	// concise form for "oldValue = newValue, newValue = max(newValue, msgValue), return (newValue, oldValue)"
}

// Scatter
def sendMsg(triplet: EdgeTriplet[(Int, Int), Int]): Iterator[(VertexId, Int)] = {
    val sourceVertex = triplet.srcAttr

    if (sourceVertex._1 == sourceVertex._2)		// newValue == oldValue for source vertex?
      Iterator.empty					// do nothing
    else
      Iterator((triplet.dstId, sourceVertex._1))	// propagate new (updated) value to the destination vertex
}
  • mergeMsg merges multiple messages reaching the same vertex at the start of a superstep. In this example, it simply computes the maximum value of all received messages. [Gather]
  • vprog handles the behaviour of a vertex after receiving the merged message. In this example, it updates the new vertex value and the old vertex value accordingly. [Apply]
  • sendMsg determines the message to send out for the next iteration and where to send it to. [Scatter]
    Notice the type EdgeTriplet[VD, ED] in its signature. A triplet differs from a vertex or an edge in that it joins an edge with the properties of both of its endpoint vertices.
    An edge triplet therefore has five properties:
    • srcId: The source vertex id.
    • srcAttr: The source vertex property.
    • dstId: The destination vertex id.
    • dstAttr: The destination vertex property.
    • attr: The edge property.
    In this example, sendMsg decides whether to propagate the source vertex value by checking whether its new value and old value are equal (cf. the Vertex-centric Thinking section).
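
Because mergeMsg and vprog are pure functions, their behaviour can be checked on a few values without starting Spark. The sketch below copies the two definitions from above into a hypothetical object and declares VertexId locally as Long, which matches the alias GraphX uses. The sample calls follow Vertex 4 and Vertex 2 through the first two supersteps.

```scala
object GasFunctionsTrace {
  type VertexId = Long // local stand-in for GraphX's VertexId alias

  // Gather: keep the larger of two incoming messages.
  def mergeMsg(msg1: Int, msg2: Int): Int = msg1 max msg2

  // Apply: leave the value alone in superstep 0, otherwise shift and update.
  def vprog(vertexId: VertexId, value: (Int, Int), message: Int): (Int, Int) =
    if (message == Int.MaxValue) value
    else (message max value._1, value._1)

  def main(args: Array[String]): Unit = {
    // Superstep 0: the initial message leaves Vertex 4 unchanged.
    println(vprog(4L, (8, -1), Int.MaxValue)) // (8,-1)

    // Superstep 1: Vertex 4 receives 1 (from Vertex 2) and 6 (from Vertex 3).
    val merged = mergeMsg(1, 6)
    println(merged)                           // 6
    println(vprog(4L, (8, -1), merged))       // (8,8): unchanged, so nothing to send

    // Superstep 1: Vertex 2 receives 9 from Vertex 1.
    println(vprog(2L, (1, -1), 9))            // (9,1): changed, so 9 is propagated
  }
}
```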

Finally, to check the correctness of the code, we print the new value of every vertex:

for ( (vertexId, (new_value, old_value)) <- maxGraph.vertices.collect) {
	println(new_value)
}

Run the Program

Following the same procedure as in the Spark lab, we export the project as a JAR file (say GraphXExample.jar) and execute the Spark program:

bigdata@bigdata-VirtualBox:~/Programs/spark$ ./bin/spark-submit --class "example.MaxVertexValue" --master spark://localhost:7077 /home/bigdata/GraphXExample.jar 

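The result should be the value 9 printed four times, once for each vertex, since 9 is the maximum vertex value in the graph.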

Exercise

Create the following undirected graph and use GraphX's Pregel API to find the shortest path distance from vertex 4 to all the other vertices.

Reference implementation of the inline vprog/sendMsg/mergeMsg functions, which repeatedly relax edge distances (in the style of the Bellman-Ford algorithm rather than Dijkstra's):

val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  (id, dist, newDist) => math.min(dist, newDist), // Vertex Program
  triplet => { // Send Message
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    }
  },
  (a, b) => math.min(a, b) // Merge Message
  )
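
The reference code leaves initialGraph undefined. One possible way to prepare it is sketched below, under several assumptions: the edge list is a hypothetical four-vertex graph (not the exercise graph), each undirected edge is stored as two directed edges, and the object and helper names are invented for illustration. The source vertex starts at distance 0.0 and every other vertex at infinity.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

object SsspSketch {
  // Hypothetical undirected weighted edges (a, b, weight); not the exercise graph.
  val undirected: Seq[(VertexId, VertexId, Double)] =
    Seq((1L, 2L, 2.0), (2L, 3L, 1.0), (3L, 4L, 4.0), (1L, 4L, 10.0))

  def distancesFrom(sc: SparkContext, sourceId: VertexId): Map[VertexId, Double] = {
    // Store each undirected edge as one directed edge per direction.
    val edges: RDD[Edge[Double]] = sc.parallelize(
      undirected.flatMap { case (a, b, w) => Seq(Edge(a, b, w), Edge(b, a, w)) })
    val graph: Graph[Double, Double] = Graph.fromEdges(edges, Double.PositiveInfinity)

    // Distance 0 at the source, infinity everywhere else.
    val initialGraph = graph.mapVertices((id, _) =>
      if (id == sourceId) 0.0 else Double.PositiveInfinity)

    val sssp = initialGraph.pregel(Double.PositiveInfinity)(
      (id, dist, newDist) => math.min(dist, newDist), // Vertex Program
      triplet => {                                    // Send Message
        if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
          Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
        } else {
          Iterator.empty
        }
      },
      (a, b) => math.min(a, b))                       // Merge Message
    sssp.vertices.collect.toMap
  }

  def main(args: Array[String]): Unit = {
    // local[2] is an assumption for quick local testing.
    val sc = new SparkContext(
      new SparkConf().setAppName("SsspSketch").setMaster("local[2]"))
    try {
      distancesFrom(sc, 4L).toSeq.sortBy(_._1).foreach { case (id, d) =>
        println(s"$id -> $d") // 1 -> 7.0, 2 -> 5.0, 3 -> 4.0, 4 -> 0.0
      }
    } finally {
      sc.stop()
    }
  }
}
```

In this hypothetical graph the direct edge from Vertex 4 to Vertex 1 costs 10.0, but the path through Vertex 3 and Vertex 2 costs 7.0, so the relaxation replaces the first estimate in a later superstep.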

Reference