Writing Pregel+ Application Programs

 

This page describes the concrete APIs that users need when writing application code. We assume that users are already familiar with the basic concepts of the system; if not, we recommend reading the system architecture overview first.

 

How to Run a Pregel Job?

Pregel+ supports three modes, which correspond to different worker and vertex classes. We summarize them below:

Basic Mode: include "basic/pregel-dev.h" and subclass the "Vertex" and "Worker" classes.

Mirroring Mode: include "ghost/ghost-dev.h" and subclass the "GVertex" and "GWorker" classes.

Req-Resp Mode: include "reqresp/req-dev.h" and subclass the "RVertex" and "RWorker" classes.

All worker classes have a run() function, which is called by users to start the specified job:

void run(const WorkerParams & params)

Here, the object of type WorkerParams is defined in utils/global.h with the following fields:

string input_path;    //HDFS data path of the input graph

string output_path;    //output path on HDFS

bool force_write;    //whether to overwrite the output path if it exists

bool native_dispatcher;    //whether the input graph was written by another Pregel+ job; if true, loading can be optimized, while false always works

Two other functions defined in utils/global.h are also important for job running:

init_workers()

worker_finalize()

The first function should be called at the beginning of your main() function so that each process (worker) gets its worker ID, while the second function should be called at the end of main() to indicate the termination of the MPI program.

In between them, you should set the parameters (such as the input/output paths) in a WorkerParams object param, and call run(param) of your worker to start the job.
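The call order described above can be sketched as follows. This is only a minimal, self-contained illustration: WorkerParams, init_workers() and worker_finalize() are replaced by local stand-ins so that the snippet compiles without Pregel+, MyWorker is a hypothetical worker subclass, the paths are made up, and job_main() plays the role of main().

```cpp
#include <string>
#include <vector>

// --- Stand-ins for utils/global.h so that the sketch compiles on its own ---
struct WorkerParams {
    std::string input_path;
    std::string output_path;
    bool force_write;
    bool native_dispatcher;
};
std::vector<std::string> call_log;  // records the call order, for illustration only
void init_workers()    { call_log.push_back("init_workers"); }
void worker_finalize() { call_log.push_back("worker_finalize"); }

// Hypothetical stand-in for a user-defined Worker subclass.
struct MyWorker {
    void run(const WorkerParams& params) {
        call_log.push_back("run:" + params.input_path);
    }
};

// Body of what would be main() in a real application.
int job_main() {
    init_workers();                      // first: every process gets its worker ID

    WorkerParams param;
    param.input_path = "/toy_graph";     // hypothetical HDFS paths
    param.output_path = "/toy_output";
    param.force_write = true;
    param.native_dispatcher = false;     // false always works

    MyWorker worker;
    worker.run(param);                   // start the job

    worker_finalize();                   // last: terminate the MPI program
    return 0;
}
```

In a real program, the stand-ins would be dropped in favor of the header of the chosen mode (e.g. "basic/pregel-dev.h"), and set_combiner()/set_aggregator() would be called before run() if they are needed.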

 

Global API

The following variables/functions are defined in utils/global.h, and can be used everywhere in your program.

int _my_rank;    //the worker ID of the current process, set by init_workers()

int _num_workers;    //the total number of workers, set by init_workers()

int get_worker_id()    //get _my_rank

int get_num_workers()    //get _num_workers

worker_barrier()    //wait till all workers reach this line of code

step_num()    //get the current superstep number (starting from 1)

set_combiner()    //the input is an object of your Combiner subclass

set_aggregator()    //the input is an object of your Aggregator subclass

void* getAgg()    //get the aggregated value from the previous superstep, should be cast to (FinalT *)

int get_vnum()    //get the total vertex number

int active_vnum()    //get the total number of active vertices

wakeAll()    //set all vertices to be active for the next superstep

forceTerminate()    //once called, the job terminates after the current superstep ends

Among them, _my_rank is usually used for debugging. For example, if you want to print some information to the screen, and you just want the master to print, you may add the following statement in your code:

if(_my_rank == MASTER_RANK)    cout<<your_information<<endl;    //MASTER_RANK is 0

The functions set_combiner() and set_aggregator() are usually called in your main() function before calling run() of your worker. Note, however, that the Aggregator subclass still needs to be specified as the second template argument of the Worker class that you subclass.

The function step_num() is usually called in Vertex::compute(), Aggregator::stepPartial() and Aggregator::stepFinal() to branch between different operations. For example, the operation performed in the first superstep is usually different from the rest since there are no incoming messages. In this case, we can branch in compute() like:

if(step_num() == 1)    {...}

else    {...}

The function getAgg() is usually called in Vertex::compute() and Aggregator::init(). For example, for a cumulative aggregator that aggregates over all supersteps, we set the current aggregated value to be the return value of getAgg() in Aggregator::init(). However, we usually do not call it in the first superstep, since no aggregation has been done yet. The function is usually called like:

FinalT* agg_val = (FinalT *)getAgg();

//use "agg_val->some_field" later
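The two patterns above (branching on step_num() and casting the result of getAgg()) can be combined as in the following sketch. It is self-contained only because getAgg() and step_num() are replaced by local stand-ins; the FinalT struct with a sum field and the helper previous_sum() are hypothetical names chosen for illustration.

```cpp
// --- Stand-ins for the globals of utils/global.h, for illustration only ---
struct FinalT { long long sum; };   // hypothetical aggregate type
static FinalT prev_agg = {0};       // value "aggregated in the previous superstep"
static int current_step = 1;        // supersteps start from 1
void* getAgg()  { return &prev_agg; }
int step_num()  { return current_step; }

// Hypothetical helper, as it might be used inside compute() or Aggregator::init():
// returns the previously aggregated sum, or 0 in the first superstep.
long long previous_sum() {
    if (step_num() == 1) {
        return 0;                            // nothing has been aggregated yet
    }
    FinalT* agg_val = (FinalT*)getAgg();     // cast the untyped pointer
    return agg_val->sum;
}
```

Guarding the call with step_num() avoids reading an aggregate that no superstep has produced yet.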

The function forceTerminate() is usually called in Vertex::compute() and Aggregator::finishFinal() to terminate the job early when some end condition is met. For example, when computing point-to-point reachability by BFS from the source vertex, the job can stop as soon as the target vertex is reached.
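To make the reachability example concrete, here is a small sequential simulation of BFS supersteps with early termination. It does not use Pregel+ at all: the flag force_terminate merely imitates the effect of forceTerminate() (the job stops once the current superstep ends), and the function name and graph representation are assumptions made for this sketch.

```cpp
#include <vector>

// Simulates BFS supersteps on an adjacency-list graph.
// Returns the superstep (starting from 1) in which tgt is processed,
// or -1 if tgt is unreachable from src.
int bfs_supersteps(const std::vector<std::vector<int> >& adj, int src, int tgt) {
    std::vector<bool> visited(adj.size(), false);
    std::vector<int> frontier(1, src);      // vertices active in this superstep
    visited[src] = true;
    bool force_terminate = false;           // imitates the effect of forceTerminate()
    int step = 1;
    while (!frontier.empty()) {
        std::vector<int> next;
        for (int u : frontier) {
            if (u == tgt) force_terminate = true;   // end condition met
            for (int v : adj[u]) {
                if (!visited[v]) {                  // "send a message" to v
                    visited[v] = true;
                    next.push_back(v);
                }
            }
        }
        if (force_terminate) return step;   // stop after the current superstep ends
        frontier.swap(next);
        ++step;
    }
    return -1;                              // no active vertices left
}
```

Without the early exit, the simulation would keep expanding the frontier until the whole reachable part of the graph had been visited.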

 

Vertex API

The Vertex class takes four template arguments:

KeyT: type of vertex ID

ValueT: type of vertex value, such as its state and adjacency list(s)

MessageT: type of the messages that a vertex sends

HashT: the hash function type

Here, KeyT is the type of vertex ID, which is usually an integer type (e.g. int, long long). One may also use VertexID, which is defined as int in utils/global.h. However, KeyT does not have to be an integer type; it can be any user-defined type as long as the following requirements are met.

1. Operator < is overloaded for KeyT. This is used by the system to sort messages in the message buffer by target vertex, before performing message combining. For basic types, the comparators are already defined.

2. (De)serialization functions are defined for KeyT.

3. __gnu_cxx::hash<KeyT> is defined with operator () overloaded indicating the hashing logic. When a message targeted at vertex v arrives at the receiving worker, the worker looks up v's incoming message buffer from a hash table to insert the message, and thus hash(KeyT) should be defined. For basic types, the hash functions are already defined by g++.

4. One should define a type HashT with operator () overloaded indicating how to compute the machine ID from the vertex ID. The type HashT is then provided as the fourth template argument of the Vertex class. For integer types, one may omit the template argument HashT, in which case DefaultHash<KeyT> defined in basic/Vertex.h will be used.

We have defined two other types, intpair (with HashT = IntPairHash) and inttriplet (with HashT = IntTripletHash), in utils/type.h, which can be directly used as KeyT for the Vertex class. To define other key types, one may use these two types as a reference. Our "Biconnected Component" application code uses these types as vertex keys, and one may study it to see how they are used.
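As a rough illustration of requirements 1 and 4, the sketch below defines a hypothetical key type EdgeKey with operator < and a hypothetical EdgeKeyHash functor that maps a key to a worker ID. It is not taken from utils/type.h: the mixing formula is an arbitrary choice, toy_num_workers is a stand-in for the global worker count, and the (de)serialization functions and the __gnu_cxx::hash specialization (requirements 2 and 3) are left out.

```cpp
#include <algorithm>
#include <vector>

int toy_num_workers = 4;   // stand-in for the worker count set by init_workers()

// Hypothetical user-defined key: a pair of integers.
struct EdgeKey {
    int src;
    int dst;
    // Requirement 1: ordering, so that messages can be sorted by target vertex.
    bool operator<(const EdgeKey& o) const {
        return src < o.src || (src == o.src && dst < o.dst);
    }
};

// Requirement 4: maps a key to the ID of the worker that owns the vertex.
struct EdgeKeyHash {
    int operator()(const EdgeKey& key) const {
        // Arbitrary mixing formula, computed on unsigned values so that the
        // result is never negative.
        unsigned long long mixed =
            1000003ULL * (unsigned)key.src + (unsigned)key.dst;
        return (int)(mixed % (unsigned long long)toy_num_workers);
    }
};

// Sorting by target key, which relies on operator < above.
std::vector<EdgeKey> sorted_targets(std::vector<EdgeKey> keys) {
    std::sort(keys.begin(), keys.end());
    return keys;
}
```

Whatever formula is used, it has to be deterministic and return a value in [0, number of workers), since every worker must agree on where a given vertex lives.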

The Vertex class has the following variables/functions, which are usually used in Vertex::compute(), Worker::toVertex() and Worker::toline().

KeyT id    //vertex ID field

ValueT& value()    //vertex value field

bool is_active()

void activate()

void vote_to_halt()

void send_message(const KeyT & tgt, const MessageT & msg)

void add_vertex(VertexT * v)    //one may call "new VertexT" to create a new vertex to add, it will be moved to the right worker in the next superstep

For simplicity, we do not implement delete_vertex(v), since physical deletion of v can usually be replaced by voting v to halt and never activating it again (e.g. never sending messages to v). Edge addition/deletion can be directly done by manipulating the adjacency list field of value().
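Edge updates through the adjacency list can look like the following sketch. MyValue is a hypothetical ValueT with a label and a list of out-neighbor IDs, and the two helpers are illustrative names; inside compute() they would be applied to value().

```cpp
#include <algorithm>
#include <vector>

// Hypothetical vertex value type: some state plus the adjacency list.
struct MyValue {
    int label;
    std::vector<int> out_edges;   // IDs of the out-neighbors
};

// Edge addition: append the neighbor ID to the adjacency list.
void add_edge(MyValue& val, int neighbor) {
    val.out_edges.push_back(neighbor);
}

// Edge deletion: erase every occurrence of the neighbor ID (erase-remove idiom).
void delete_edge(MyValue& val, int neighbor) {
    val.out_edges.erase(
        std::remove(val.out_edges.begin(), val.out_edges.end(), neighbor),
        val.out_edges.end());
}
```

For example, a vertex could call delete_edge(value(), v_id) after deciding that the edge to v_id is no longer needed.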

 

Worker API

The Worker class contains two template arguments:

VertexT: the user-defined Vertex subclass

AggregatorT: the user-defined Aggregator subclass

One may omit AggregatorT if no aggregator is used, in which case the default DummyAgg defined in utils/Aggregator.h is used.

As discussed in the system architecture overview, the Worker class has the following two vertex loading/dumping functions for users to specify:

VertexT* toVertex(char* line)

void toline(VertexT* v, BufferedWriter& writer)

In toVertex(.), one may call "new VertexT" to create a vertex object, set its fields properly according to the line representing it, and return the vertex (to the system). To parse the line, one may use C++ stringstream which requires copying the line string to the internal buffer of the stringstream object. A more efficient method is to use the strtok(.) or strtok_r(.) functions. See any of our application code for the detailed usage.
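The strtok-based parsing mentioned above can be sketched as follows. The line format (vertex ID, a tab, the number of neighbors, then the space-separated neighbor IDs) is an assumption made for this example, and ParsedVertex is a plain stand-in struct rather than a Vertex subclass; a real toVertex() would fill in a "new VertexT" in the same way.

```cpp
#include <cstdlib>
#include <cstring>
#include <vector>

// Stand-in for the fields a real toVertex() would set on a new VertexT.
struct ParsedVertex {
    int id;
    std::vector<int> neighbors;
};

// Parses a line of the assumed form "id \t num_neighbors nb1 nb2 ...".
// Note that strtok modifies the buffer in place, so line must be writable.
ParsedVertex parse_line(char* line) {
    ParsedVertex v;
    char* tok = std::strtok(line, "\t");     // vertex ID
    v.id = std::atoi(tok);
    tok = std::strtok(nullptr, " ");         // number of neighbors
    int num = std::atoi(tok);
    for (int i = 0; i < num; i++) {
        tok = std::strtok(nullptr, " ");     // next neighbor ID
        v.neighbors.push_back(std::atoi(tok));
    }
    return v;
}
```

This sketch assumes well-formed input; strtok returns a null pointer when tokens run out, so malformed lines would need extra checks. strtok also keeps hidden global state, which is why strtok_r is the safer choice when several threads parse lines at once.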

In toline(v, writer), one has access to a writer of type BufferedWriter, which is defined in utils/ydhdfs.h. The writer has a function BufferedWriter::write(char * str), which writes a string to the output path on HDFS. One may construct the string representing vertex v and call writer.write(.) to output it (possibly appending "\n"). See any of our application code for the detailed usage.
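Building the output line can be sketched like this. StubWriter is a stand-in that collects the written text in memory (the real BufferedWriter writes to HDFS), dump_vertex is a hypothetical helper with the kind of logic toline() would contain, and the output format is the same assumed format as a line with the vertex ID, a tab, the neighbor count and the neighbor IDs.

```cpp
#include <cstddef>
#include <cstdio>
#include <string>
#include <vector>

// Stand-in for BufferedWriter: collects everything written into a string.
struct StubWriter {
    std::string out;
    void write(const char* str) { out += str; }
};

// Writes "id \t num_neighbors nb1 nb2 ...\n" for one vertex.
void dump_vertex(int id, const std::vector<int>& neighbors, StubWriter& writer) {
    char buf[64];
    std::snprintf(buf, sizeof(buf), "%d\t%d", id, (int)neighbors.size());
    writer.write(buf);
    for (std::size_t i = 0; i < neighbors.size(); i++) {
        std::snprintf(buf, sizeof(buf), " %d", neighbors[i]);
        writer.write(buf);
    }
    writer.write("\n");   // one vertex per line
}
```

Formatting into a small fixed buffer and calling write() several times avoids building one large temporary string per vertex.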

 

GVertex API

For the mirroring mode, we only discuss the API of GVertex. The API of GWorker is similar to that of Worker (though run() is implemented differently) and is thus omitted.

Unlike the Vertex class, GVertex isolates the adjacency list from the vertex value field, and requires that the adjacency list be of type vector<EdgeT> and store all out-neighbors. Accordingly, GVertex has the following template arguments:

KeyT: type of vertex ID

ValueT: type of vertex value, now without adjacency list

MessageT: type of the messages that a vertex sends

EdgeT: the type of edge value; accordingly, the adjacency list is of type vector<EdgeT>

HashT: the hash function type

If we do not need any value field for each edge (other than the neighbor vertex ID), we do not need to specify EdgeT. In this case, the default edge type DefaultGEdge<KeyT, MessageT> (defined in ghost/GVertex.h) is used, which only contains one field id indicating the neighbor ID. In contrast, if each edge has a field (e.g. edge weight) of type EValT, one should specify EdgeT as:

GEdge<KeyT, MessageT, EValT>

The GEdge is also defined in ghost/GVertex.h, and has two fields: (1) id of type KeyT indicating the neighbor ID, and (2) eval of type EValT indicating the edge value.

Both DefaultGEdge and GEdge have a function that does nothing:

void relay(MessageT& msg)

This function can be redefined with other logic by subclassing DefaultGEdge or GEdge, and it indicates how to post-process the message msg sent along the current edge. We will return to relay() and see its usage shortly.

The GVertex class has variables/functions similar to those of the Vertex class, with slight differences. Firstly, send_message(tgt, msg) is no longer supported. Instead, users can call the following function in GVertex::compute():

void broadcast(const MessageT & msg)

Usually, msg is a vertex state maintained by GVertex::value(). However, sometimes the message value also depends on the value of the edge that it is sent along. For example, when computing single-source shortest paths, each vertex maintains a field dist indicating its current distance estimate to the source s. If a vertex u's distance estimate gets updated by the incoming messages, it should notify all its neighbors about the state change by calling:

broadcast(value().dist)

However, the receiver v should not simply receive u's distance estimate to s; it should obtain the distance estimate from v to s (through u). In this case, we need to add the length of edge (u, v) to the message value, and this is what EdgeT::relay(msg) does. See our application code [Mirroring Mode] -> [SSSP] for the detailed usage.
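The effect of relay() in the SSSP example can be imitated with the following standalone sketch. WeightedEdge is a hypothetical edge type modeled on the two fields of GEdge (id and eval) with relay() adding the edge length, and simulate_broadcast() is an illustrative function that shows what each neighbor would end up receiving; how and where the system itself invokes relay() is not shown here.

```cpp
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical edge type modeled on GEdge: neighbor ID plus edge length.
struct WeightedEdge {
    int id;        // neighbor ID
    double eval;   // edge length
    // Post-process a message sent along this edge: add the edge length.
    void relay(double& msg) const { msg += eval; }
};

// Imitates the outcome of broadcast(msg): one (target, value) pair per
// out-edge, with the value adjusted by that edge's relay().
std::vector<std::pair<int, double> > simulate_broadcast(
        double msg, const std::vector<WeightedEdge>& neighbors) {
    std::vector<std::pair<int, double> > delivered;
    for (std::size_t i = 0; i < neighbors.size(); i++) {
        double copy = msg;               // each edge gets its own copy
        neighbors[i].relay(copy);        // dist(u) + length(u, v)
        delivered.push_back(std::make_pair(neighbors[i].id, copy));
    }
    return delivered;
}
```

With a broadcast value of 2.0 and out-edges of length 1.5 and 4.0, the two neighbors would receive 3.5 and 6.0 respectively, which are the candidate distances through u.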

For simplicity, we do not allow add_vertex(v) in the mirroring mode.

Finally, since one may need to initialize the adjacency list in GWorker::toVertex(line), the GVertex class has one additional function to return the adjacency list:

EdgeContainer& neighbors()    //GVertex::EdgeContainer is defined as vector<EdgeT>

 

RVertex API

For the request-respond mode, we only discuss the additional API of RVertex besides that of Vertex. The API of RWorker is similar to that of Worker (though run() is implemented differently) and is thus omitted.

RVertex<KeyT, ValueT, MessageT, RespondT, HashT> takes an additional template argument RespondT indicating the type of the response. In one superstep, a vertex can call the following functions in order to make responses ready for use in the next superstep:

void request(KeyT & tgt)    //data pulling: if u calls request(v), the response from v will be available to u in the next superstep

void exp_respond(KeyT & tgt)    //data pushing (explicit responding): if v calls exp_respond(u), the response from v will be available to u in the next superstep

If the above functions were called in the previous superstep, a vertex can call the following functions to get the response:

RespondT get_respond(KeyT& tgt)    //to obtain tgt's response; this function is unsafe and users should guarantee that the response exists

RespondT* get_respond_safe(KeyT& tgt)    //the safe version: if there is no response from tgt, it returns NULL; otherwise, it returns the address of the received response

So far, we have not discussed how to specify what a vertex should respond. The response is usually a state of the requested/responding vertex, and users specify it by implementing the following abstract function in their RVertex subclass:

RespondT respond()

Similar to compute(), one may implement respond() to return different fields of a vertex in different supersteps.
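The timing of request() and get_respond_safe() across two supersteps can be imitated with the toy model below. Nothing here is Pregel+ code: ToyVertex, ToyRespondTable and end_superstep() are hypothetical names, the response is assumed to be an int field called parent, and all vertices live in one in-memory map instead of being spread over workers.

```cpp
#include <map>
#include <set>

// Toy vertex whose response is one of its fields.
struct ToyVertex {
    int id;
    int parent;
    int respond() const { return parent; }   // what this vertex answers
};

// Toy model of the request/response bookkeeping for one requesting vertex.
struct ToyRespondTable {
    std::set<int> requested;        // targets requested in the current superstep
    std::map<int, int> responses;   // responses usable in the next superstep

    void request(int tgt) { requested.insert(tgt); }

    // Imitates the superstep boundary: each requested vertex answers
    // with respond(), and the answers become available to the requester.
    void end_superstep(const std::map<int, ToyVertex>& vertices) {
        responses.clear();
        for (int tgt : requested) {
            std::map<int, ToyVertex>::const_iterator it = vertices.find(tgt);
            if (it != vertices.end()) responses[tgt] = it->second.respond();
        }
        requested.clear();
    }

    // Safe lookup: null pointer if there is no response from tgt.
    const int* get_respond_safe(int tgt) const {
        std::map<int, int>::const_iterator it = responses.find(tgt);
        return it == responses.end() ? nullptr : &it->second;
    }
};
```

The model shows why the safe lookup is useful: a response exists only for targets that were requested (or that pushed a response) in the previous superstep.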