#include "ol/pregel-ol-dev.h"
#include "invidx.h"

#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <string>
#include <vector>

// SLCA (Smallest Lowest Common Ancestor) keyword search on the Pregel+ online-query ("ol") system.
//
// Each vertex is one node of an XML tree. A query is a list of keywords. The answer is every node
// whose subtree contains all keywords while no child subtree already does, i.e. the smallest
// condition-satisfying sub-trees. An inverted index (keyword -> vertex positions) is loaded once,
// so that a query only activates the vertices that contain at least one of its keywords.
//
// Execution of one query:
//   superstep 1  : vertices do nothing; SLCAggregator computes the maximum vertex level.
//   superstep s>1: only vertices on level (max_level - (s - 2)) run. Each one merges its children's
//                  keyword bitmaps, decides whether it is a result, and reports to its parent.
//
// TODO: let the aggregator force termination when no active vertex that is not all-one remains.
//
// NOTE(review): the framework template-argument lists (VertexOL, Aggregator, WorkerOL_auto,
// Combiner), the inverted-index type name, the index UDF signature (load2Idx) and the index-driven
// activation in SLCAWorker::init are best-effort and must be confirmed against
// ol/pregel-ol-dev.h and invidx.h.

// SCLAQvalue::bitmap is a char with one bit per query keyword, so it can hold at most 8 keywords.
// Queries with more keywords cannot produce correct results.
const int SLCA_MAX_KEYWORDS = 8;

// Query-independent vertex value, filled in once by SLCAWorker::toVertex().
struct SLCANqvalue {
    hash_set<string> content;   // keywords of this node, as produced by filter()
    int level;                  // tree level; queries are processed from the largest level to the root
    int start;                  // start label read from the input line, echoed in the output
    int end;                    // end label read from the input line, echoed in the output
    VertexID parent;            // parent vertex id, or -1 for the root
    vector<VertexID> children;  // child vertex ids (parsed, not used by the SLCA computation)
};

// Serialization of SLCANqvalue; field order must be identical in operator<< and operator>>.
ibinstream& operator<<(ibinstream& m, const SLCANqvalue& v)
{
    m << v.content;
    m << v.level;
    m << v.start;
    m << v.end;
    m << v.parent;
    m << v.children;
    return m;
}

obinstream& operator>>(obinstream& m, SLCANqvalue& v)
{
    m >> v.content;
    m >> v.level;
    m >> v.start;
    m >> v.end;
    m >> v.parent;
    m >> v.children;
    return m;
}

// Per-query vertex value; the same type is also used as the message type.
//   bitmap   : bit i is set when query keyword i occurs in this node or in a child subtree
//              reported so far.
//   is_result: as a vertex value, "this node is an SLCA"; as a message, "the sender's subtree
//              contains every keyword" (see SLCAVertex::compute).
struct SCLAQvalue {
    char bitmap;     // one bit per keyword, see SLCA_MAX_KEYWORDS
    bool is_result;

    SCLAQvalue() : bitmap(0), is_result(false) {}
};

// Serialization of SCLAQvalue; field order must be identical in operator<< and operator>>.
ibinstream& operator<<(ibinstream& m, const SCLAQvalue& v)
{
    m << v.bitmap;
    m << v.is_result;
    return m;
}

obinstream& operator>>(obinstream& m, SCLAQvalue& v)
{
    m >> v.bitmap;
    m >> v.is_result;
    return m;
}

class SLCAVertex : public VertexOL<VertexID, SCLAQvalue, SLCANqvalue, SCLAQvalue, vector<string> > {
public:
    // Number of keywords in the current query.
    int num_of_keywords()
    {
        return (int)get_query().size();
    }

    // Level processed in the current superstep, as published by SLCAggregator.
    int current_level()
    {
        return *(int*)get_agg();
    }

    // Initial per-query value. Bit i is set for every query keyword contained in this node. The node
    // is provisionally a result when it alone contains every keyword.
    virtual SCLAQvalue init_value(vector<string>& qkeys)
    {
        SCLAQvalue qval;
        const int num = (int)qkeys.size();
        hash_set<string>& content = nqvalue().content;
        for (int i = 0; i < num; i++) {
            if (content.find(qkeys[i]) != content.end())
                setbit(qval.bitmap, i);
        }
        qval.is_result = isAllOne(qval.bitmap, num);
        return qval;
    }

    virtual void compute(MessageContainer& messages)
    {
        // Superstep 1 only lets SLCAggregator find the deepest level; there is nothing to do here.
        if (superstep() == 1)
            return;

        // Later supersteps handle one level at a time, deepest first. Vertices on other levels
        // stay active and wait for their turn.
        if (nqvalue().level != current_level())
            return;

        const int num = num_of_keywords();

        // Merge the children's bitmaps, and remember whether some child subtree already holds
        // every keyword.
        bool child_all_one = false;
        for (size_t i = 0; i < messages.size(); i++) {
            qvalue().bitmap |= messages[i].bitmap;
            if (messages[i].is_result)
                child_all_one = true;
        }

        // This node is a result only if its subtree holds every keyword and no child subtree does.
        // Otherwise a smaller sub-tree already answers the query. This also clears a provisional
        // self-match when a child matches too.
        const bool all_one = isAllOne(qvalue().bitmap, num);
        qvalue().is_result = all_one && !child_all_one;

        if (nqvalue().parent != -1) { // not the root
            // In a message, is_result means "my subtree holds every keyword". That must be reported
            // even when this node is not itself a result, so qvalue() cannot be forwarded as is.
            SCLAQvalue msg = qvalue();
            msg.is_result = all_one;
            send_message(nqvalue().parent, msg);
        }
        vote_to_halt();
    }
};

// Superstep 1: computes the maximum level over the vertices it sees.
// Superstep s>1: publishes (previous value - 1), i.e. the next level to process.
class SLCAggregator : public Aggregator<SLCAVertex, int, int> {
public:
    int level;

public:
    virtual void init()
    {
        if (SLCAVertex::superstep() == 1)
            level = -1;
        else
            level = *(int*)SLCAVertex::get_agg() - 1;
    }

    virtual void stepPartial(SLCAVertex* v)
    {
        if (SLCAVertex::superstep() == 1 && v->nqvalue().level > level)
            level = v->nqvalue().level;
    }

    virtual void stepFinal(int* part)
    {
        if (SLCAVertex::superstep() == 1 && *part > level)
            level = *part;
    }

    virtual int* finishPartial() { return &level; }
    virtual int* finishFinal() { return &level; }
};

// NOTE(review): name of the inverted-index class declared in invidx.h; confirm.
typedef InvIdx SLCAIndex;

class SLCAWorker : public WorkerOL_auto<SLCAVertex, SLCAggregator, SLCAIndex> {
public:
    char buf[100]; // output line buffer used by dump()

    SLCAWorker() : WorkerOL_auto<SLCAVertex, SLCAggregator, SLCAIndex>(true, false, true) {}

    vector<string> parser; // scratch token list reused by toVertex()

    // Step 6.1 (UDF): parse one input line into a vertex. The layout, as read below, is:
    //   vid \t <content> level start end parent num_of_children child_1 ... child_k
    // The first character after the tab is skipped, and the content runs until the next '>'.
    virtual SLCAVertex* toVertex(char* line)
    {
        char* pch = strtok(line, "\t");
        if (pch == NULL)
            parse_error("vertex id");
        SLCAVertex* v = new SLCAVertex;
        v->id = atoi(pch);

        pch = strtok(NULL, ">");
        if (pch == NULL)
            parse_error("content");
        string str = pch + 1; // skip the first character (presumably the opening '<')
        parser.clear();
        filter(str.c_str(), parser);
        SLCANqvalue& nq = v->nqvalue();
        for (size_t i = 0; i < parser.size(); i++)
            nq.content.insert(parser[i]);

        nq.level = next_int("level");
        nq.start = next_int("start");
        nq.end = next_int("end");
        nq.parent = next_int("parent");
        const int num_of_children = next_int("num_of_children");
        for (int i = 0; i < num_of_children; i++)
            nq.children.push_back(next_int("child"));
        return v;
    }

    // Step 6.2 (UDF): split the query line into keywords, using the same filter() as toVertex().
    virtual vector<string> toQuery(char* line)
    {
        vector<string> result;
        filter(line, result);
        if ((int)result.size() > SLCA_MAX_KEYWORDS && _my_rank == MASTER_RANK)
            fprintf(stderr, "SLCA: query has %d keywords, at most %d are supported\n",
                    (int)result.size(), SLCA_MAX_KEYWORDS);
        return result;
    }

    // Step 6.3 (UDF): activate every vertex whose content contains one of the query keywords,
    // looked up in the inverted index.
    // NOTE(review): reconstructed; confirm the vpos_list() return type and activate() against
    // invidx.h / the ol worker.
    virtual void init(VertexContainer& vertex_vec)
    {
        vector<string> qkeys = get_query();
        for (size_t i = 0; i < qkeys.size(); i++) {
            vector<int>& vposlist = idx().vpos_list(qkeys[i]); // positions of vertices containing qkeys[i]
            for (size_t j = 0; j < vposlist.size(); j++)
                activate(vposlist[j]);
        }
    }

    // Index UDF: register every keyword of vertex v under the given position.
    // NOTE(review): function name and signature reconstructed; confirm against the ol worker.
    virtual void load2Idx(SLCAVertex* v, int position, SLCAIndex& idx)
    {
        hash_set<string>& content = v->nqvalue().content;
        for (hash_set<string>::iterator it = content.begin(); it != content.end(); ++it)
            idx.insert(*it, position);
    }

    virtual void idx_init()
    {
        load_idx_from_vertexes();
    }

    // Writes "vid\tstart end" for every vertex that is an SLCA of the current query.
    virtual void dump(SLCAVertex* vertex, BufferedWriter& writer)
    {
        if (!vertex->qvalue().is_result)
            return;
        snprintf(buf, sizeof(buf), "%d\t%d %d\n", vertex->id, vertex->nqvalue().start,
                 vertex->nqvalue().end);
        writer.write(buf);
    }

private:
    // Reports a truncated input line and terminates, instead of dereferencing a NULL token.
    static void parse_error(const char* field)
    {
        fprintf(stderr, "SLCAWorker::toVertex: missing field '%s' in input line\n", field);
        exit(EXIT_FAILURE);
    }

    // Next space-separated token of the line being tokenized by toVertex(), converted with atoi().
    static int next_int(const char* field)
    {
        char* pch = strtok(NULL, " ");
        if (pch == NULL)
            parse_error(field);
        return atoi(pch);
    }
};

// Merges messages sent to the same parent: OR the keyword bitmaps and keep "some child subtree
// holds every keyword" if any message says so.
class SLCACombiner : public Combiner<SCLAQvalue> {
public:
    virtual void combine(SCLAQvalue& old, const SCLAQvalue& new_msg)
    {
        old.bitmap |= new_msg.bitmap;
        old.is_result = old.is_result || new_msg.is_result;
    }
};

// Usage: ./run [inputpath [outputpath]]. Missing arguments fall back to the defaults below.
int main(int argc, char* argv[])
{
    SLCAWorker worker;

    WorkerParams param;
    param.input_path = (argc > 1) ? argv[1] : "/uwm_level";
    param.output_path = (argc > 2) ? argv[2] : "/ol_out";
    param.force_write = true;
    param.native_dispatcher = false;

    SLCACombiner combiner;
    const bool use_combiner = true;
    if (use_combiner)
        worker.setCombiner(&combiner);

    worker.run(param);
    return 0;
}