Writing Blogel Application Programs

 

This page details the concrete APIs that users need when writing application code. We assume that users are already familiar with the basic concepts of the system; if not, we recommend reading the system architecture overview first.

 

How to Run a Blogel Job?

Blogel supports both vertex-centric computing and block-centric computing. The latter further supports three modes: B-mode, V-mode and VB-mode. We summarize the two computing styles below:

Vertex-Centric Computing: include "basic/pregel-dev.h" and subclass the "Vertex" and "Worker" classes.

Block-Centric Computing: include the header files in the "blogel" folder as needed, and subclass the "BVertex", "Block" and "BWorker" classes.

All worker/partitioner classes have a run() function, which is called by users to start the specified job:

void run(const WorkerParams & params)

Here, the type WorkerParams is defined in utils/global.h with the following fields of interest:

string input_path;    //HDFS data path of the input graph

string output_path;    //output path on HDFS

bool force_write;    //whether to overwrite the output path if it already exists

Two other functions defined in utils/global.h are also important for job running:

init_workers()

worker_finalize()

The first function should be called at the beginning of your main() function so that each process (worker) gets its worker ID, while the second should be called at the end of main() to terminate the MPI program.

In between, set the parameters (e.g., the input/output paths) in a WorkerParams object param, and call run(param) of your worker to start the job.
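
The call order described above can be sketched as follows. To keep the block compilable without Blogel, WorkerParamsSketch is a hypothetical stand-in that mirrors only the three fields listed earlier, and the actual Blogel calls appear in a comment; MyWorker and the HDFS paths in that comment are hypothetical.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in mirroring only the three WorkerParams fields listed
// above; the real struct in utils/global.h may contain more fields.
struct WorkerParamsSketch {
    std::string input_path;    // HDFS path of the input graph
    std::string output_path;   // output path on HDFS
    bool force_write = false;  // overwrite output_path if it already exists
};

// Hypothetical helper that fills the parameters later passed to run().
WorkerParamsSketch make_params(const std::string& in, const std::string& out) {
    assert(!in.empty() && !out.empty());
    WorkerParamsSketch p;
    p.input_path = in;
    p.output_path = out;
    p.force_write = true;
    return p;
}

// With the real Blogel headers, main() would follow this order
// (MyWorker is a hypothetical Worker subclass, the paths are made up):
//
//   int main(int argc, char* argv[]) {
//       init_workers();                    // each process gets its worker ID
//       WorkerParams param;
//       param.input_path = "/toyFolder";
//       param.output_path = "/toyOutput";
//       param.force_write = true;
//       MyWorker worker;
//       worker.run(param);                 // start the job
//       worker_finalize();                 // terminate the MPI program
//       return 0;
//   }
```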

 

Global API

The following variables/functions are defined in utils/global.h, and can be used anywhere in your program.

int _my_rank;    //the worker ID of the current process, set by init_workers()

int _num_workers;    //the total number of workers, set by init_workers()

int get_worker_id()    //get _my_rank

int get_num_workers()    //get _num_workers

worker_barrier()    //wait till all workers reach this line of code

step_num()    //get the current superstep number (starting from 1)

set_combiner()    //the input is an object of your vertex-centric Combiner subclass

set_aggregator()    //the input is an object of your Aggregator subclass

void* getAgg()    //get the aggregated value from the previous superstep; cast the result to (FinalT *)

int get_vnum()    //get the total vertex number

int active_vnum()    //get the total number of active vertices

wakeAll()    //set all vertices to be active for the next superstep

forceTerminate()    //once called, the job terminates after the current superstep ends

Among them, _my_rank is usually used for debugging. For example, to print some information to the screen from the master only, you may add the following statement to your code:

if(_my_rank == MASTER_RANK)    cout<<your_information<<endl;    //MASTER_RANK is 0

The functions set_combiner() and set_aggregator() are usually called in your main() function before calling run() of your worker. Note, however, that the Aggregator subclass still needs to be specified as the second template argument of the Worker class that you subclass.
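
For readers new to combiners, the following toy sketch shows the effect a combiner has on outgoing messages: the buffer is sorted by target vertex and messages to the same target are merged into one. MinCombiner, its combine(old, new) hook and combine_buffer are hypothetical names chosen for illustration, not Blogel's own Combiner base class.

```cpp
#include <algorithm>
#include <cassert>
#include <utility>
#include <vector>

// Hypothetical min-combiner: folds a new message into an existing one.
struct MinCombiner {
    void combine(int& old_msg, const int& new_msg) const {
        if (new_msg < old_msg) old_msg = new_msg;
    }
};

using Msg = std::pair<int, int>;  // (target vertex ID, message value)

// Sorts an outgoing buffer by target and keeps one combined message per target.
std::vector<Msg> combine_buffer(std::vector<Msg> buf, const MinCombiner& cb) {
    std::sort(buf.begin(), buf.end(), [](const Msg& a, const Msg& b) {
        return a.first < b.first;  // only operator< on the key type is needed
    });
    std::vector<Msg> out;
    for (const Msg& m : buf) {
        // After sorting, equal keys are adjacent; "equal" means !(a < b).
        if (!out.empty() && !(out.back().first < m.first))
            cb.combine(out.back().second, m.second);
        else
            out.push_back(m);
    }
    return out;
}
```

Because min is associative and commutative, the result does not depend on how std::sort orders messages that share a target; a combiner is only safe for order-insensitive operations like this.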

The function step_num() is usually called in the compute() function of vertices/blocks, and in the "stepPartial" and "stepFinal" functions of aggregators, to branch between different operations. For example, the operation performed in the first superstep usually differs from the rest, since there are no incoming messages yet. In this case, we can branch in compute() like:

if(step_num() == 1)    {...}

else    {...}
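
To make the branching on step_num() concrete, here is a self-contained toy superstep loop (not Blogel code) that propagates the maximum vertex value over an undirected graph. The local counter step stands in for step_num(), and the comments mark where send_message() and vote_to_halt() would be called; ToyVertex and run_max_propagation are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Toy vertex for a single-process superstep loop (not Blogel's Vertex class).
struct ToyVertex {
    int value = 0;
    std::vector<int> nbs;  // undirected: each edge listed at both endpoints
    bool active = true;
};

// Propagates the maximum value; returns the number of supersteps executed.
int run_max_propagation(std::vector<ToyVertex>& g) {
    std::vector<std::vector<int>> inbox(g.size()), next(g.size());
    int step = 0;
    while (true) {
        ++step;  // plays the role of step_num(), which starts from 1
        for (std::size_t i = 0; i < g.size(); ++i) {
            ToyVertex& v = g[i];
            if (!inbox[i].empty()) v.active = true;  // a message wakes a halted vertex
            if (!v.active) continue;
            bool changed = false;
            if (step == 1) {
                changed = true;  // first superstep: no messages yet, just broadcast
            } else {
                for (int m : inbox[i])
                    if (m > v.value) { v.value = m; changed = true; }
            }
            if (changed) {
                for (int u : v.nbs) {
                    assert(u >= 0 && static_cast<std::size_t>(u) < g.size());
                    next[u].push_back(v.value);  // like send_message(u, value)
                }
            }
            v.active = false;  // like vote_to_halt()
        }
        // Deliver messages; stop when all vertices halted and none are in flight.
        bool pending = false;
        for (std::size_t i = 0; i < g.size(); ++i) {
            inbox[i].swap(next[i]);
            next[i].clear();
            if (!inbox[i].empty()) pending = true;
        }
        if (!pending) return step;
    }
}
```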

The function getAgg() is usually called in compute() and in the aggregator's init(). For example, for a cumulative aggregator that aggregates over all supersteps, init() sets the current aggregated value to the return value of getAgg(). However, getAgg() is usually not called in the first superstep, since nothing has been aggregated yet. The function is usually called like:

FinalT* agg_val = (FinalT *)getAgg();

//use "agg_val->some_field" later
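
The cumulative-aggregator pattern described above can be sketched on a single worker as follows. CountT, CumulativeSum and run_supersteps are hypothetical, the pointer prev stands in for (FinalT *)getAgg(), and the merging of partial values across workers (stepFinal()) is deliberately left out.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical FinalT: a running total over all supersteps.
struct CountT {
    long long total = 0;
};

// Toy cumulative aggregator (not Blogel's Aggregator base class).
class CumulativeSum {
public:
    // `prev` plays the role of (CountT*)getAgg(); it is null in the first
    // superstep, when nothing has been aggregated yet.
    void init(const CountT* prev) { cur_ = prev ? *prev : CountT{}; }
    void add(long long contribution) { cur_.total += contribution; }  // per vertex
    CountT finish() const { return cur_; }  // what the next superstep would read
private:
    CountT cur_;
};

// Runs the aggregator over several supersteps on one worker; contribs[s]
// holds the values contributed by vertices in superstep s + 1.
CountT run_supersteps(const std::vector<std::vector<long long>>& contribs) {
    CountT last;
    const CountT* prev = nullptr;
    for (std::size_t s = 0; s < contribs.size(); ++s) {
        CumulativeSum agg;
        agg.init(prev);
        for (long long c : contribs[s]) agg.add(c);
        last = agg.finish();
        prev = &last;
    }
    return last;
}
```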

The function forceTerminate() is usually called in compute() and in the aggregator's finishFinal() to terminate the job early when some end condition is met. For example, when computing point-to-point reachability by bidirectional BFS from both the source and the target vertex, the job can terminate as soon as some vertex is reached by both searches.
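
The early-termination idea can be illustrated with a sequential bidirectional BFS. This is a plain single-machine sketch, not a Blogel program: each loop iteration corresponds to one superstep, and returning as soon as a vertex is marked by both searches corresponds to calling forceTerminate(). The function name and the edge-list input are assumptions for illustration.

```cpp
#include <cassert>
#include <utility>
#include <vector>

// Returns the number of rounds until the two searches meet, 0 if s == t,
// or -1 if t is not reachable from s. Edges are directed (u -> v).
int bidirectional_reach(int n, const std::vector<std::pair<int, int>>& edges, int s, int t) {
    assert(s >= 0 && s < n && t >= 0 && t < n);
    if (s == t) return 0;
    std::vector<std::vector<int>> out(n), in(n);
    for (const auto& e : edges) {
        out[e.first].push_back(e.second);  // forward search follows out-edges
        in[e.second].push_back(e.first);   // backward search follows in-edges
    }
    std::vector<char> seen_fwd(n, 0), seen_bwd(n, 0);
    std::vector<int> front_fwd{s}, front_bwd{t};
    seen_fwd[s] = 1;
    seen_bwd[t] = 1;
    int round = 0;
    while (!front_fwd.empty() || !front_bwd.empty()) {
        ++round;  // one "superstep": both frontiers advance by one hop
        std::vector<int> next_fwd, next_bwd;
        for (int u : front_fwd)
            for (int v : out[u])
                if (!seen_fwd[v]) { seen_fwd[v] = 1; next_fwd.push_back(v); }
        for (int u : front_bwd)
            for (int v : in[u])
                if (!seen_bwd[v]) { seen_bwd[v] = 1; next_bwd.push_back(v); }
        // A vertex reached by both searches proves s reaches t: stop early,
        // which is the role forceTerminate() plays in a Blogel job.
        for (int v : next_fwd) if (seen_bwd[v]) return round;
        for (int v : next_bwd) if (seen_fwd[v]) return round;
        front_fwd.swap(next_fwd);
        front_bwd.swap(next_bwd);
    }
    return -1;
}
```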

The global variables/functions defined in utils/global.h are for both vertex-centric and block-centric computation. Additionally, if block-centric computing is used, one may include "blogel/BGlobal.h" to use the following global variables/functions:

set_sampRate(double psamp)        //e.g., psamp = 0.001

set_maxHop(int δmax)        //e.g., δmax = 10

set_maxVCSize(int bmax)        //e.g., bmax = 100000

set_factor(double f)        //e.g., f = 2.0

set_stopRatio(double γ)        //e.g., γ = 0.9

set_maxRate(double pmax)        //e.g., pmax = 0.1

int get_bnum()    //get the total block number

set_bcombiner(void* cb)    //the input is an object of your block-centric Combiner subclass

 

Vertex API

The Vertex class takes four template arguments:

KeyT: type of vertex ID

ValueT: type of vertex value, such as its state and adjacency list(s)

MessageT: type of the messages that a vertex sends

HashT: the hash function type

Here, KeyT is the type of vertex ID, which is usually an integer type (e.g. int, long long). One may also use VertexID, which is defined as int in utils/global.h. However, KeyT does not have to be an integer type; it can be any user-defined type as long as the following requirements are met.

1. Operator < is overloaded for KeyT. This is used by the system to sort messages in the message buffer by target vertex, before performing message combining. For basic types, the comparators are already defined.

2. (De)serialization function is defined for KeyT.

3. __gnu_cxx::hash<KeyT> is defined with operator () overloaded to implement the hashing logic. When a message targeted at vertex v arrives at the receiving worker, the worker looks up v's incoming message buffer in a hash table to insert the message, and thus hash(KeyT) must be defined. For basic types, the hash functions are already defined by g++.

4. One should define a type HashT with operator () overloaded to compute the machine (worker) ID from the vertex ID. The type HashT is then provided as the fourth template argument of the Vertex class. For integer types, one may omit the template argument HashT, in which case DefaultHash<KeyT> defined in basic/Vertex.h is used.

We have defined two other types, intpair (with HashT = IntPairHash) and inttriplet (with HashT = IntTripletHash), in utils/type.h, which can be directly used as KeyT for the Vertex class. To define other key types, one may use these two types as a reference. Our "Biconnected Component" application code uses these types as vertex keys, so one may study it to see their usage.
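
A user-defined key type satisfying the four requirements above might look like the following sketch. PairKey and its helpers are hypothetical and are not the intpair/IntPairHash code in utils/type.h: serialization writes to a std::vector<char> rather than Blogel's own binary streams, the table hash is a plain functor rather than a __gnu_cxx::hash specialization, and g_num_workers stands in for _num_workers.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

// Hypothetical two-integer vertex key.
struct PairKey {
    int v1;
    int v2;
    // Requirement 1: lexicographic operator<, used to sort messages by target.
    bool operator<(const PairKey& o) const {
        return v1 < o.v1 || (v1 == o.v1 && v2 < o.v2);
    }
    bool operator==(const PairKey& o) const { return v1 == o.v1 && v2 == o.v2; }
};

// Requirement 2 (sketch): (de)serialization into a plain byte buffer.
void serialize(std::vector<char>& buf, const PairKey& k) {
    const char* p1 = reinterpret_cast<const char*>(&k.v1);
    const char* p2 = reinterpret_cast<const char*>(&k.v2);
    buf.insert(buf.end(), p1, p1 + sizeof(int));
    buf.insert(buf.end(), p2, p2 + sizeof(int));
}

PairKey deserialize(const std::vector<char>& buf, std::size_t& pos) {
    assert(pos + 2 * sizeof(int) <= buf.size());
    PairKey k;
    std::memcpy(&k.v1, buf.data() + pos, sizeof(int));
    std::memcpy(&k.v2, buf.data() + pos + sizeof(int), sizeof(int));
    pos += 2 * sizeof(int);
    return k;
}

// Requirement 3: hash for the receiving worker's message hash table.
struct PairKeyTableHash {
    std::size_t operator()(const PairKey& k) const {
        std::size_t h = static_cast<std::size_t>(static_cast<unsigned>(k.v1));
        h ^= static_cast<unsigned>(k.v2) + 0x9e3779b9u + (h << 6) + (h >> 2);
        return h;
    }
};

// Requirement 4: HashT maps a vertex ID to the ID of the worker that owns it.
int g_num_workers = 4;  // stand-in for Blogel's _num_workers

struct PairKeyWorkerHash {
    int operator()(const PairKey& k) const {
        assert(g_num_workers > 0);
        unsigned long long h = static_cast<unsigned>(k.v1) * 31ULL + static_cast<unsigned>(k.v2);
        return static_cast<int>(h % static_cast<unsigned long long>(g_num_workers));
    }
};
```

The worker hash only needs to be deterministic and spread keys evenly; any vertex ID must always map to the same worker so that messages for it arrive at the worker that stores it.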

The Vertex class has the following variables/functions, which are usually used in Vertex::compute(), Worker::toVertex() and Worker::toline().

KeyT id    //vertex ID field

ValueT& value()    //vertex value field

bool is_active()

void activate()

void vote_to_halt()

void send_message(const KeyT & tgt, const MessageT & msg)

void add_vertex(VertexT * v)    //create the new vertex with "new VertexT"; it will be moved to the right worker in the next superstep

For simplicity, we do not implement delete_vertex(v), since physical deletion of v can usually be replaced by voting v to halt and never activating it again (e.g. never sending messages to v). Edge addition/deletion can be directly done by manipulating the adjacency list field of value().
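
Since edges live in value(), adding or deleting an edge is ordinary container manipulation. The sketch below assumes a hypothetical value type CCValue whose adjacency list is a std::vector<int>.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Hypothetical ValueT: per-vertex state plus an adjacency list,
// i.e. what value() would return for such a vertex.
struct CCValue {
    int color;
    std::vector<int> edges;
};

// Edge addition: append nb unless it is already a neighbor.
void add_edge(CCValue& val, int nb) {
    if (std::find(val.edges.begin(), val.edges.end(), nb) == val.edges.end())
        val.edges.push_back(nb);
}

// Edge deletion: erase all occurrences of nb (erase-remove idiom).
void delete_edge(CCValue& val, int nb) {
    val.edges.erase(std::remove(val.edges.begin(), val.edges.end(), nb), val.edges.end());
}
```

In an undirected graph each edge is stored at both endpoints, and the other endpoint may reside on another worker, so its adjacency list would have to be updated there, for example after it receives a message.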

 

Worker API

The Worker class takes two template arguments:

VertexT: the user-defined Vertex subclass

AggregatorT: the user-defined Aggregator subclass

One may omit AggregatorT if no aggregator is used, in which case the default DummyAgg defined in utils/Aggregator.h is used.

As discussed in the system architecture overview, the Worker class has the following two vertex loading/dumping functions for users to implement:

VertexT* toVertex(char* line)

void toline(VertexT* v, BufferedWriter& writer)

In toVertex(.), one may call "new VertexT" to create a vertex object, set its fields according to the line representing it, and return the vertex (to the system). To parse the line, one may use C++ stringstream, but this requires copying the line into the internal buffer of the stringstream object. A more efficient method is to tokenize the line with the strtok(.) or strtok_r(.) functions. See any of our application code for detailed usage.
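
A minimal toVertex-style parser using strtok_r might look like this. The line format "vid \t k nb1 nb2 ... nbk" and the AdjVertex type are assumptions for illustration; strtok_r is a POSIX function, so the sketch relies on a POSIX C library such as glibc.

```cpp
#include <cassert>
#include <cstdlib>
#include <cstring>
#include <vector>

// Hypothetical vertex: an ID plus an adjacency list (stand-in for a Vertex subclass).
struct AdjVertex {
    int id = 0;
    std::vector<int> nbs;
};

// Parses a line assumed to look like "vid\tk nb1 nb2 ... nbk".
// strtok_r overwrites delimiters with '\0', i.e. it tokenizes the buffer in
// place without copying; toVertex(char* line) receives a non-const buffer.
AdjVertex* to_vertex(char* line) {
    assert(line != nullptr);
    const char* delims = " \t";
    char* save = nullptr;
    char* tok = strtok_r(line, delims, &save);
    if (tok == nullptr) return nullptr;  // empty line
    AdjVertex* v = new AdjVertex;
    v->id = std::atoi(tok);
    tok = strtok_r(nullptr, delims, &save);
    int k = (tok != nullptr) ? std::atoi(tok) : 0;
    for (int i = 0; i < k; ++i) {
        tok = strtok_r(nullptr, delims, &save);
        if (tok == nullptr) break;  // tolerate a truncated neighbor list
        v->nbs.push_back(std::atoi(tok));
    }
    return v;
}
```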

In toline(v, writer), one has access to a writer of type BufferedWriter, which is defined in utils/ydhdfs.h. The writer has a function BufferedWriter::write(char * str), which writes a string to the output path on HDFS. One may construct the string representing vertex v and call writer.write(.) to output it (possibly appending "\n"). See any of our application code for detailed usage.
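
The following sketch shows the toline pattern of formatting a vertex into a character buffer and passing it to write(). FakeWriter is a hypothetical stand-in for BufferedWriter that only accumulates text, and LabeledVertex and the "id \t label" output format are assumptions.

```cpp
#include <cassert>
#include <cstdio>
#include <string>

// Hypothetical stand-in for BufferedWriter that just accumulates the text.
struct FakeWriter {
    std::string out;
    void write(const char* str) { out += str; }
};

// Hypothetical output vertex: ID plus a computed label (e.g. a component ID).
struct LabeledVertex {
    int id;
    int label;
};

// Writes one "id\tlabel\n" record for vertex v.
void to_line(const LabeledVertex* v, FakeWriter& writer) {
    char buf[32];  // two ints, a tab, a newline and '\0' always fit
    int len = std::snprintf(buf, sizeof(buf), "%d\t%d\n", v->id, v->label);
    assert(len > 0 && len < static_cast<int>(sizeof(buf)));  // no truncation
    writer.write(buf);
}
```

Since buf is a plain char array, the same call would also match the write(char * str) signature described above.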

 

API for Partitioning/Block-Centric Computing

Most of these APIs have been described in the system architecture overview (Part II: Graph Partitioning and Part III: Block-Centric Computation). Here, we summarize the relevant header files:

BVertex.h        //defines the BVertex class

Block.h        //defines the Block class

BWorker.h        //defines the BWorker class

BGlobal.h        //defines the functions for GVD partitioning, and other global functions for block-centric computing

BAggregator.h        //defines the BAggregator class

BType.h        //defines the triplet class, i.e. <vertexID, blockID, workerID>

Voronoi.h        //defines the classes for GVD partitioning

STRPart.h        //defines the classes for the first round of 2D partitioning

STRPartR2.h        //defines the classes for the second round of 2D partitioning