Writing Quegel Server Application Programs

 

This webpage details the concrete APIs that users are interested in when writing the server-side application code. We assume that users are already familiar with the basic concepts of the system; if not, we recommend you to read the system architecture overview first.

 

How to Run a Quegel Server Program?

To write a Quegel server program, a user needs to include ol/pregel-ol-dev.h and subclass the VertexOL and WorkerOL_auto (or WorkerOL if batch querying is not used) classes.

The WorkerOL_auto (and WorkerOL) class has a run() function, which is called by users to start the server program:

void run(const WorkerParams & params)

Here, the object of type WorkerParams is defined in utils/global.h with the following fields of interest:

string input_path;    //HDFS data path of the input graph

string output_path;    //output path on HDFS

bool force_write;    //whether to overwrite to output path if it exists

You should specify parameters like the input/output path in param properly, and call run(param) of your worker to start the job.

 

Global API

The following variables/functions are defined in utils/global.h, and can be used everywhere in your program.

int _my_rank;    //the worker ID of the current process, set by init_workers()

int _num_workers;    //the total number of workers, set by init_workers()

int get_worker_id()    //get _my_rank

int get_num_workers()    //get _num_workers

worker_barrier()    //wait till all workers reach this line of code

int get_vnum()    //get the total vertex number

Among them, _my_rank is usually used for debugging. For example, if you want to print some information to the screen, and you just want the master to print, you may add the following statement in your code:

if(_my_rank == MASTER_RANK)    cout<<your_information<<endl;    //MASTER_RANK is 0

Note that some functions in utils/global.h are not used by Quegel (but Pregel+ uses them) and thus not listed, such as step_num() and set_aggregator(). This is because concepts like superstep number and aggregator are now query-dependent rather than globally unique, and their functionality are covered by replacement functions to be presented shortly, which obtains required data according to the query context.

In fact, Quegel uses some new global variables/functions to manage queries (in addition to those used by Pregel+ and Blogel), which are defined in ol/global_ol.h. However, these variables/functions are only used by the system and tranparent to application developers. For example, global_vertexes tracks the array of vertices in the current worker, active_queries tracks the list of queries being processed, global_query_id tracks the ID of the query that the current worker is processing at the moment, and global_query tracks the information of the query (of the user-transparent type ol/Task) that the current worker is processing at the moment.

 

Vertex API

The VertexOL class contains four template arguments:

KeyT: type of vertex ID

QValueT: type of query-dependent vertex value, such as the current vertex state

NQValueT: type of query-independent vertex value, such as adjacency list(s)

MessageT: type of the messages that a vertex sends

QueryT: type of query content (e.g., source and target vertices in a shortest path query)

HashT: the hash function type

Here, KeyT is the type of vertex ID, which is usually integer (e.g. int, long long). One may also use VertexID which is defined as int in utils/global.h. However, KeyT does not have to be integer, and it can be of any user-defined type as long as the following requirements are met.

1. Operator < is overloaded for KeyT. This is used by the system to sort messages in the message buffer by target vertex, before performing message combining. For basic types, the comparators are already defined.

2. (De)serialization function is defined for KeyT.

3. __gnu_cxx::hash<KeyT> is defined with operator () overloaded indicating the hashing logic. When a message targeted at vertex v arrives at the receiving worker, the worker looks up v's incoming message buffer from a hash table to insert the message, and thus hash(KeyT) should be defined. For basic types, the hash functions are already defined by g++.

4. One should define a type HashT with operator () overloaded indicating how to compute the machine ID from the vertex ID. The type HashT is then provided as the fourth template argument of the Vertex class. For integer types, one may not specify the template argument HashT, and if so, DefaultHash<KeyT> defined in basic/Vertex.h will be used.

We have defined additional types in utils/type.h that can be directly used as KeyT, such as intpair (with HashT = IntPairHash), inttriplet (with HashT = IntTripletHash) and string (with HashT = StringHash). To define other key types, one may take these example types for reference.

The VertexOL class has the following variables/functions, which are can be used in the UDFs of VertexOL and WorkerOL_auto (or WorkerOL) decribed in system architecture overview.

KeyT id    //vertex ID field

NQValueT& nqvalue(), or _value    //get query-independent vertex value field

QValueT& qvalue()    //get query-dependent vertex value of the current query

QueryT get_query()    //get the content of the current query

bool is_active()    //for the current query

void activate()    //for the current query

void vote_to_halt()    //for the current query

int superstep()    //get superstep of the current query

void* get_agg()    //get aggregator value of the current query, need type conversion

void send_message(const KeyT & tgt, const MessageT & msg)

void forceTerminate()    ////once called, the current query terminates after the current superstep ends

Note that the return value of get_agg() needs to be converted to FinalT * for use, where FinalT is a template argument of the Aggregator class (see system architecture overview for details).

 

Worker API

The WorkerOL_auto (or WorkerOL) class contains three template arguments:

VertexOLT: the user-defined VertexOL subclass

AggregatorT (optional): the user-defined Aggregator subclass

IndexT (optional): the user-defined index class

One may not specify AggregatorT if aggregator is not used, in which case the default DummyAgg defined by utils/Aggregator.h is specified. If local indexing is not enabled, one may not specify IndexT. See the XML Keyword Search application here for an example of how to use indexing.

We have discussed in system architecture overview about the UDFs of WorkerOL_auto (or WorkerOL). In toVertex(.), one may call new VertexOLT to create a vertex object, set its fields properly according to the line representing it, and return the vertex (to the system). To parse the line, one may use C++ stringstream which requires copying the line string to the internal buffer of the stringstream object. A more efficient method is to use the strtok(.) or strtok_r(.) functions. See any of our application code for the detailed usage. In dump(v, writer) and save(v, writer), one have access to a writer of type BufferedWriter, which is defined in utils/ydhdfs.h. The writer has a function BufferedWriter::write(char * str), which is called to write a string content to the output path on HDFS. One may construct the string content representing vertex v and call writer.write(.) to output it (possibly appending "\n"). See any of our application code for the detailed usage.

The Worker class has a few more functions that users need to know in order to write the server-side program. The first function is the constructor:

WorkerOL_auto(bool agg_used = false, bool save_at_last = false, bool idx_used = false)

Here, input argument agg_used indicates whether aggregator is enabled; if aggregator is enabled, each query will automatically be associated with an aggregator. Input argument save_at_last indicates whether the graph data will be saved to HDFS when the server program exits (i.e., whether UDF save(v, writer) will be called). If enabled, WorkerOL_auto::set_file2save(path) should be called to set the output path on HDFS. Input argument idx_used indicates whether a local index should be constructed on each machine before query processing starts. If enabled, each worker maintains a local index at WorkerOL_auto::index.

Other important functions are listed below, which are self-explanatory:

setCombiner(cb)    //the input is an object of your Combiner subclass

FinalT* get_agg()    //get the aggregated value from the previous superstep

load_idx_from_file(idxpath)    //called in UDF idx_init() to load an index previously saved to HDFS

load_idx_from_file()    //called in UDF idx_init() to build an index from vertices in local machine

QueryT get_query()    //get the query content of the current query

int get_vpos(vertexID)    //get the vertex position in the current worker's vertex array

void activate(position)    //activate the vertex at the specified position in the current worker's vertex array

set_file2save(path)    //set the HDFS output path for saving the graph data when the server program exits