#include "ol/pregel-ol-dev.h"
#include "invidx.h"

// NOTE(review): this copy of the file appears damaged in two ways.
// (1) Its line breaks were lost, collapsing everything onto a few lines so that
//     each `//` comment swallowed the rest of its line; the breaks are restored
//     here without changing any code token.
// (2) Text that began at a '<' and ran to the next '>' was dropped (consistent
//     with HTML-tag stripping). Most template argument lists are gone, e.g. the
//     element types of hash_set/vector and the arguments of the VertexOL,
//     Aggregator, WorkerOL_auto and Combiner bases. A few longer stretches are
//     also missing, as flagged inline below.
// The lost text is NOT reconstructed here; recover it from version control
// before building.

// ToDo: aggregator force_terminates if no active not_all_1 vertex is found
// ELCA algorithm implementation on the Pregel+ system.
// Accepts online queries and returns the exclusive condition-satisfying
// sub-trees of the XML structure.
// Uses the index-loading component to maintain an inverted index for quick
// lookup of vertex locations.
// Main difference from SCLA: once a vertex is a satisfying result, it stops
// forwarding to its parent and halts directly.

// Non-query value: query-independent per-vertex data, all filled in by
// ELCAWorker::toVertex when the input file is read.
struct ELCANqvalue{
    hash_set content;   // keyword tokens of the vertex's text (filter() output in toVertex); element type lost
    int level;          // tree level; compute() handles one level per superstep
    int start;          // read from input; echoed by ELCAWorker::dump
    int end;            // read from input; echoed by ELCAWorker::dump
    VertexID parent;    // parent vertex id; compute() treats -1 as "root"
    vector children;    // child vertex ids read from input; element type lost
};

// Serialization of ELCANqvalue for the framework's binary streams.
ibinstream & operator<<(ibinstream & m, const ELCANqvalue & v) {
    // NOTE(review): text was lost between "m<" and ">(obinstream" on the next
    // line. It presumably held the field-by-field `<<` writes, `return m; }`
    // and the head of the matching `obinstream & operator>>`. Restore from history.
    m<>(obinstream & m, ELCANqvalue & v) {
    m>>v.content;
    m>>v.level;
    m>>v.start;
    m>>v.end;
    m>>v.parent;
    m>>v.children;
    return m;
}

// Query value: per-vertex state for the query currently being answered.
struct ELCAQvalue{
    // Bit i set => query keyword i is covered, either by this vertex's own
    // content (init_value) or by a child's bitmap merged in compute().
    // NOTE(review): a char holds 8 bits, so presumably at most 8 query keywords
    // are supported; this depends on setbit/isAllOne (not shown). Confirm.
    char bitmap;
    bool is_result; // or has_all_one: this vertex is an answer to the query
    ELCAQvalue(){
        bitmap = 0;
        is_result = false;
    }
};

// Serialization of ELCAQvalue for the framework's binary streams.
ibinstream & operator<<(ibinstream & m, const ELCAQvalue & v) {
    // NOTE(review): text was lost between "m<" and ">(obinstream" on the next
    // line, presumably the `<<` writes, `return m; }` and the head of the
    // matching `obinstream & operator>>`. Restore from history.
    m<>(obinstream & m, ELCAQvalue & v) {
    m>>v.bitmap;
    m>>v.is_result;
    return m;
}

// Vertex of the XML tree. Query-keyword bitmaps are ORed bottom-up, one tree
// level per superstep, until a vertex's bitmap becomes all-one (a result).
// NOTE(review): VertexOL's template arguments were lost (see NOTE at top).
class ELCAVertex:public VertexOL > {
public:
    // Number of keywords in the current query.
    int num_of_keywords() {
        return get_query().size();
    }

    // Initialize this vertex's qvalue for the query keys `qkeys`.
    // Bit i of the bitmap is set when qkeys[i] occurs in the vertex's content.
    // A vertex that already has every bit set is marked as a result.
    virtual ELCAQvalue init_value(vector& qkeys) {
        ELCAQvalue qval;
        int num = qkeys.size();
        for(int i = 0;i < num; i++){
            hash_set::iterator it = nqvalue().content.find(qkeys[i]);
            bool is_in = (it != nqvalue().content.end());// query key occurs in this vertex's content
            if(is_in) setbit(qval.bitmap, i);
        }
        if(isAllOne(qval.bitmap, num)) qval.is_result = true;
        return qval;
    }

    // Bottom-up pass, one tree level per superstep.
    //  - Superstep 1: no vertex work; ELCAggregator collects the maximum level.
    //  - Later supersteps: only vertices whose level equals the aggregated value
    //    act. That value is the max level in superstep 2 and drops by one each
    //    superstep (see ELCAggregator::init). Other vertices stay active and wait.
    //    An acting vertex:
    //      * merges its children's bitmaps,
    //      * becomes a result if its bitmap is all-one,
    //      * sends its bitmap to its parent unless it is the root or a result,
    //      * then halts.
    virtual void compute(MessageContainer& messages) {
        if(superstep() == 1) {
            // do nothing, let agg compute the max level of the graph
            return;
        }
        else if(superstep() == 2) {
            // Deepest level. Nothing is sent in superstep 1, so there are no
            // messages to merge yet.
            // Difference from SCLA: a vertex whose bitmap is already all-one
            // does not forward to its parent.
            if(nqvalue().level == *(int *)get_agg()){
                if(nqvalue().parent != -1 && !isAllOne(qvalue().bitmap, num_of_keywords())) // not root and bitmap is NOT all-one
                {
                    send_message(nqvalue().parent, qvalue());
                }
                vote_to_halt();
            }
        }
        else {
            if(nqvalue().level == *(int *)get_agg()){
                // Messages come from children: only parents are ever sent to.
                for(int i = 0; i < messages.size();i++) {
                    qvalue().bitmap |= messages[i].bitmap;
                }
                // Children that are results never send, so no all-one message
                // arrives. If the merged bitmap is now all-one, this vertex is a result.
                if(isAllOne(qvalue().bitmap,num_of_keywords())) qvalue().is_result = true;
                if(nqvalue().parent != -1 && qvalue().is_result == false) // not root and vertex is not a result
                {
                    send_message(nqvalue().parent, qvalue());
                }
                vote_to_halt();
            }
        }
    }
};

// Global int aggregator that drives the level-by-level schedule.
//  - Superstep 1: computes the maximum nqvalue().level over all vertices,
//    starting from -1.
//  - Every later superstep: init() sets `level` to the previously aggregated
//    value minus one, and the step functions do nothing.
// Assuming get_agg() in init() returns the previous superstep's global value,
// the aggregate therefore goes max, max-1, max-2, ...
// NOTE(review): Aggregator's template arguments were lost (see NOTE at top).
class ELCAggregator:public Aggregator {
    // Only compute max level in the first superstep
public:
    int level;
public:
    virtual void init(){
        if(ELCAVertex::superstep() == 1) level= -1;
        else{
            // After superstep 2 this is the previous level, not the graph's max.
            int max_level = *(int *)ELCAVertex::get_agg();
            level=max_level-1;
        }
    }

    // Fold one local vertex's level into the worker-local maximum (superstep 1 only).
    virtual void stepPartial(ELCAVertex* v) {
        if(ELCAVertex::superstep() == 1){
            if(v->nqvalue().level > level) level = v->nqvalue().level;
        }
    }

    // Fold one worker's partial maximum into the global maximum (superstep 1 only).
    virtual void stepFinal(int* part) {
        if(ELCAVertex::superstep() == 1){
            if(*part > level) level = *part;
        }
    }

    virtual int* finishPartial(){
        return &level;
    }
    virtual int* finishFinal(){return &level; }
};

// Debug process : worker works well
// Worker responsibilities:
//  - parse the input graph;
//  - parse queries;
//  - look up query keywords in the inverted index at query start (body partly lost);
//  - dump result vertices.
// NOTE(review): WorkerOL_auto's template arguments were lost (see NOTE at top).
class ELCAWorker:public WorkerOL_auto {
public:
    char buf[100];  // scratch buffer for the line formatted in dump()
    ELCAWorker():WorkerOL_auto(true, false, true){}  // NOTE(review): flag meanings are defined by WorkerOL_auto (not shown)
    vector parser;  // scratch token list, cleared and reused by toVertex

    // Step 6.1: UDF: line -> vertex
    // Input line, as parsed below:
    //  - vid, then TAB;
    //  - a content field ending at '>', whose first character is skipped
    //    (presumably an opening bracket; confirm the input format);
    //  - space-separated: level start end parent num_of_children child1 child2 ...
    // Returns a heap-allocated vertex; ownership presumably passes to the framework.
    // NOTE(review): no field is validated. On a short or malformed line, strtok
    // returns NULL, which atoi() and `pch+1` then use unchecked (undefined behavior).
    virtual ELCAVertex* toVertex(char* line) {
        char * pch;
        pch=strtok(line, "\t");
        ELCAVertex* v=new ELCAVertex;
        v->id=atoi(pch); // vertex's id
        pch=strtok(NULL, ">");
        string str = pch+1; // vertex's content (string)
        parser.clear();
        filter(str.c_str(), parser);
        for(int i = 0;i < parser.size();i++) v->nqvalue().content.insert(parser[i]);
        pch = strtok(NULL," ");
        v->nqvalue().level = atoi(pch); // vertex's level
        pch = strtok(NULL," ");
        v->nqvalue().start = atoi(pch);
        pch = strtok(NULL," ");
        v->nqvalue().end = atoi(pch);
        pch = strtok(NULL," ");
        v->nqvalue().parent = atoi(pch);
        pch = strtok(NULL," ");
        int num_of_children = atoi(pch);
        // NOTE(review): text was lost between "i" and "nqvalue()" on the next
        // line, presumably the loop bound and increment, a strtok call, and `v->`.
        for(int i=0; inqvalue().children.push_back(atoi(pch)); }
        return v;
    }

    // Step 6.2: UDF: query string -> list of query keywords
    // Splits the query line into keywords with filter() (defined elsewhere).
    virtual vector toQuery(char* line) {
        vector result;
        filter(line, result);
        return result;
    }

    // Step 6.3: UDF: vertex init
    // For each query keyword, fetches from the inverted index the positions of
    // the vertices whose content contains it.
    // NOTE(review): most of this method and the head of the next one were lost.
    // The two "for(" lines below are missing their conditions, their loop bodies,
    // and the closing of init(). The remaining lines are the tail of a per-vertex
    // index-loading method; it inserts every keyword of v's content into `idx`
    // under the vertex's `position`.
    virtual void init(VertexContainer& vertex_vec) {
        vector qkeys = get_query();// qkeys contains the key-strings of the query
        for(int i=0; i& vposlist=idx().vpos_list(qkeys[i]);// vposlist contains the locations of all vertices whose content contains qkeys[i]
        for(int j=0; j::iterator it;
        hash_set& content = v->nqvalue().content;
        for(it = content.begin(); it != content.end(); it++) idx.insert(*it, position);
    }

    // Builds the inverted index from the loaded vertices via the framework's
    // load_idx_from_vertexes() (not shown).
    virtual void idx_init(){
        load_idx_from_vertexes();
    }

    // Writes one line for each vertex marked as a result:
    // id, TAB, start, a space, end, newline.
    // Vertices that are not results produce no output.
    virtual void dump(ELCAVertex* vertex, BufferedWriter& writer) {
        if(vertex->qvalue().is_result == true){
            sprintf(buf,"%d\t%d %d\n", vertex->id, vertex->nqvalue().start, vertex->nqvalue().end);
            writer.write(buf);
        }
    }
};

// Merges messages bound for the same vertex. Bitmaps are ORed, and is_result
// is kept if either message has it (compute() only reads the bitmap).
// NOTE(review): Combiner's template argument was lost (see NOTE at top).
class ELCACombiner:public Combiner {
public:
    virtual void combine(ELCAQvalue & old, const ELCAQvalue& new_msg) {
        old.bitmap |= new_msg.bitmap;
        if(new_msg.is_result) old.is_result = true;
    }
};

// Entry point, currently commented out; the rest of it lies beyond this chunk.
/* int main(int argc, char* argv[]) { if(argc != 3) { init_workers(); if(_my_rank==MASTER_RANK) cout<<"Usage: ./run inputpath outputpath"<