#include "ol/pregel-ol-dev.h"
#include "invidx.h"

//ToDo: aggregator force_terminates if no active not_all_1 vertex is found

//SLCA algorithm implementation on pregel+ system
//Accept online query and return the smallest condition-satisfying sub-tree of the XML structure
//Using index loading component to maintain inverted index data structure for quick access to vertex's location

//NOTE(review): the line structure and the template-argument lists in this file
//were reconstructed from how each name is used below (the previous copy had
//them stripped). The base-class argument lists of MAXMATCHVertex,
//MAXMATCHAggregator and MAXMATCHWorker in particular are inferred from the
//overridden signatures -- confirm them against the framework headers.

string input_path = "/uwm_level";
string output_path = "/ol_out";

//Non-query value definition: all will be initialized when reading file
struct MAXMATCHNqvalue {
    string value;               //raw content string of the vertex (set in toVertex)
    hash_set<string> content;   //keywords extracted from `value` by filter()
    int level;                  //level of the vertex in the tree
    VertexID parent;            //parent vertex id; -1 is treated as "root" in compute()
    vector<VertexID> children;  //ids of the child vertices
};

//Serialize a MAXMATCHNqvalue field by field.
ibinstream & operator<<(ibinstream & m, const MAXMATCHNqvalue & v)
{
    m << v.value;
    m << v.content;
    m << v.level;
    m << v.parent;
    m << v.children;
    return m;
}

//Deserialize a MAXMATCHNqvalue; field order mirrors operator<<.
obinstream & operator>>(obinstream & m, MAXMATCHNqvalue & v)
{
    m >> v.value;
    m >> v.content;
    m >> v.level;
    m >> v.parent;
    m >> v.children;
    return m;
}

//Query-dependent value kept per vertex while a query is being evaluated.
struct MAXMATCHQvalue {
    char bitmap;        //one bit per query keyword, set via setbit() in init_value()
    bool is_result;     //or has_all_one
    bool is_max_match;  //set in phase 2; dump() only writes vertices having it set
    hash_map<VertexID, char> childbit; //child id -> bitmap reported by that child

    MAXMATCHQvalue()
    {
        bitmap = 0;
        is_result = false;
        is_max_match = false;
    }
};

ibinstream & operator<<(ibinstream & m, const MAXMATCHQvalue & v)
{
    m << v.bitmap;
    m << v.is_result;
    m << v.childbit;
    //no need for is_max_match, always false
    return m;
}

obinstream & operator>>(obinstream & m, MAXMATCHQvalue & v)
{
    m >> v.bitmap;
    m >> v.is_result;
    m >> v.childbit;
    //no need for is_max_match, always false
    return m;
}

//Value shared through the aggregator: the tree level being processed and the
//current phase (1 = bottom-up bitmap propagation, 2 = top-down marking).
struct MAXMATCHAggfield {
    int level;
    int phase;
};

ibinstream & operator<<(ibinstream & m, const MAXMATCHAggfield & a)
{
    m << a.level;
    m << a.phase;
    return m;
}

obinstream & operator>>(obinstream & m, MAXMATCHAggfield & a)
{
    m >> a.level;
    m >> a.phase;
    return m;
}

//Message exchanged between a vertex and its parent/children.
struct MAXMATCHMessage {
    int id;          //id of the sending vertex
    char bitmap;     //sender's keyword bitmap
    bool is_all_one; //true when the sender's bitmap already covers every keyword

    MAXMATCHMessage()
    {
        id = 0;
        bitmap = 0;
        is_all_one = false;
    }
};

ibinstream & operator<<(ibinstream & m, const MAXMATCHMessage & ms)
{
    m << ms.id;
    m << ms.bitmap;
    m << ms.is_all_one;
    return m;
}

obinstream & operator>>(obinstream & m, MAXMATCHMessage & ms)
{
    m >> ms.id;
    m >> ms.bitmap;
    m >> ms.is_all_one;
    return m;
}

//Vertex program. setbit(), isAllOne() and dominates() are helpers defined
//outside this file.
class MAXMATCHVertex: public VertexOL<VertexID, MAXMATCHQvalue, MAXMATCHNqvalue, MAXMATCHMessage, vector<string> > {
public:
    //Number of keywords in the current query.
    int num_of_keywords()
    {
        return get_query().size();
    }

    //Initialize vertex's qvalue referring to query: bit i of the bitmap is set
    //when qkeys[i] occurs in this vertex's content.
    virtual MAXMATCHQvalue init_value(vector<string>& qkeys)
    {
        MAXMATCHQvalue qval;
        int num = qkeys.size();
        for (int i = 0; i < num; i++) {
            hash_set<string>::iterator it = nqvalue().content.find(qkeys[i]);
            bool is_in = (it != nqvalue().content.end()); //query key in this vertex's content
            if (is_in)
                setbit(qval.bitmap, i);
        }
        if (isAllOne(qval.bitmap, num))
            qval.is_result = true;
        return qval;
    }

    //Superstep 1: idle (the aggregator computes the max level).
    //Superstep 2: vertices on the max level report their bitmap to the parent.
    //Later supersteps, phase 1: the level published by the aggregator merges the
    //child bitmaps and reports upwards. Phase 2: every vertex that is still
    //active is marked as max-match and forwards to its non-dominated children.
    virtual void compute(MessageContainer& messages)
    {
        if (superstep() == 1) {
            //do nothing, let agg compute the max level of the graph
            return;
        } else if (superstep() == 2) {
            if (nqvalue().level == ((MAXMATCHAggfield*) get_agg())->level) {
                if (nqvalue().parent != -1) //not root
                {
                    MAXMATCHMessage msg;
                    msg.id = id;
                    msg.bitmap = qvalue().bitmap;
                    msg.is_all_one = qvalue().is_result;
                    send_message(nqvalue().parent, msg);
                }
                if (!qvalue().is_result) { //!!! keep matched node alive for phase 2
                    vote_to_halt();
                }
            }
        } else {
            MAXMATCHAggfield* agg = (MAXMATCHAggfield *) get_agg();
            if (agg->phase == 1) {
                if (nqvalue().level == agg->level) {
                    bool has_all_one = false;
                    for (int i = 0; i < messages.size(); i++) {
                        qvalue().bitmap |= messages[i].bitmap;
                        //record child bitmap to "childbit"
                        qvalue().childbit[messages[i].id] = messages[i].bitmap;
                        if (messages[i].is_all_one) //msg.has_all_one
                        {
                            has_all_one = true;
                            qvalue().is_result = false; //to avoid the case where self-match and child-also-match
                            //break; //cannot have more one
                        }
                    }
                    int num = num_of_keywords();
                    if (isAllOne(qvalue().bitmap, num) == true && (!has_all_one)) { //only check once
                        qvalue().is_result = true;
                    }
                    if (nqvalue().parent != -1) { //not root
                        MAXMATCHMessage msg;
                        msg.id = id;
                        msg.bitmap = qvalue().bitmap;
                        if (isAllOne(msg.bitmap, num))
                            msg.is_all_one = true; //msg.has_all_one
                        //cannot send qvalue() directly, since if a child is all one, is_result is false, but should send true
                        send_message(nqvalue().parent, msg);
                    }
                    if (!qvalue().is_result) { //!!! keep matched node alive for phase 2
                        vote_to_halt();
                    }
                }
            } else {
                //phase 2
                qvalue().is_max_match = true;
                //Bound by reference: the child list is only read here, no copy needed.
                vector<VertexID>& children = nqvalue().children;
                //this marks if the bit is dominated by others
                vector<bool> comparison(children.size(), false); //true = dominated
                for (int i = 0; i < children.size(); i++) {
                    char bit_me = qvalue().childbit[children[i]];
                    for (int j = 0; j < children.size(); j++) {
                        if (j != i) {
                            //An already-dominated sibling is not used as a dominator.
                            if (comparison[j])
                                continue;
                            char bit_other = qvalue().childbit[children[j]];
                            if (dominates(bit_other, bit_me)) {
                                comparison[i] = true;
                                break;
                            }
                        }
                    }
                }
                //The message content is irrelevant here (default-constructed); it
                //is sent only to the children that were not dominated.
                for (int i = 0; i < children.size(); i++) {
                    MAXMATCHMessage msg;
                    if (!comparison[i])
                        send_message(children[i], msg);
                }
                vote_to_halt();
            }
        }
    }
};

//Only compute max level in the first superstep; afterwards the level is
//decremented once per superstep until it reaches -1, which switches to phase 2.
class MAXMATCHAggregator: public Aggregator<MAXMATCHVertex, MAXMATCHAggfield, MAXMATCHAggfield> {
public:
    MAXMATCHAggfield field;

public:
    virtual void init()
    {
        if (MAXMATCHVertex::superstep() == 1) {
            field.level = -2;
            field.phase = 1;
        } else {
            if (((MAXMATCHAggfield *) MAXMATCHVertex::get_agg())->phase == 1) {
                int max_level = ((MAXMATCHAggfield *) MAXMATCHVertex::get_agg())->level;
                //Stated explicitly instead of relying on the value left over
                //from superstep 1; the value is the same.
                field.phase = 1;
                field.level = max_level - 1;
            } else {
                field.phase = 2;
                field.level = -2;
            }
        }
    }

    //NOTE(review): an unfinished comment stood here: "The problem is how to know th e"
    virtual void stepPartial(MAXMATCHVertex* v)
    {
        if (MAXMATCHVertex::superstep() == 1) {
            if (v->nqvalue().level > field.level)
                field.level = v->nqvalue().level;
        }
    }

    virtual void stepFinal(MAXMATCHAggfield* part)
    {
        if (MAXMATCHVertex::superstep() == 1) {
            if (part->level > field.level)
                field.level = part->level;
        }
    }

    virtual MAXMATCHAggfield* finishPartial()
    {
        return &field;
    }

    //Level -1 means level 0 has just been processed: switch to phase 2.
    virtual MAXMATCHAggfield* finishFinal()
    {
        if (field.level == -1) {
            field.phase = 2;
            field.level = -2;
        }
        return &field;
    }
};

//Debug process : worker works well
class MAXMATCHWorker: public WorkerOL_auto<MAXMATCHVertex, MAXMATCHAggregator, InvIdx> {
public:
    char buf[100000];

    //The meaning of the three flags is defined by WorkerOL_auto.
    MAXMATCHWorker() :
            WorkerOL_auto(true, false, true)
    {
    }

    vector<string> parser; //scratch buffer reused by toVertex()

    //Step 6.1: UDF: line -> vertex
    //Layout as parsed below:
    //  vid \t content-token-ending-in-'>' level skipped skipped parent num_of_children child1 child2 ...
    //(the original comment named the two skipped fields "start end").
    //The input is trusted: missing fields are not detected.
    virtual MAXMATCHVertex* toVertex(char* line)
    {
        char * pch;
        pch = strtok(line, "\t");
        MAXMATCHVertex* v = new MAXMATCHVertex;
        v->id = atoi(pch); //vertex's id
        pch = strtok(NULL, ">");
        //pch + 1 skips the first character of the token
        //(presumably an opening '<' -- confirm against the input format)
        string& str = v->nqvalue().value = pch + 1; //vertex's content(string)
        parser.clear();
        filter(str.c_str(), parser);
        for (int i = 0; i < parser.size(); i++)
            v->nqvalue().content.insert(parser[i]);
        pch = strtok(NULL, " ");
        v->nqvalue().level = atoi(pch); //vertex's level
        pch = strtok(NULL, " "); //skipped field
        pch = strtok(NULL, " "); //skipped field
        pch = strtok(NULL, " ");
        v->nqvalue().parent = atoi(pch);
        pch = strtok(NULL, " ");
        int num_of_children = atoi(pch);
        for (int i = 0; i < num_of_children; i++) //vertex's children
        {
            pch = strtok(NULL, " ");
            v->nqvalue().children.push_back(atoi(pch));
        }
        return v;
    }

    //Step 6.2: UDF: query string -> query (src_id)
    virtual vector<string> toQuery(char* line)
    {
        vector<string> result;
        filter(line, result);
        return result;
    }

    //Step 6.3: UDF: vertex init
    virtual void init(VertexContainer& vertex_vec)
    {
        vector<string> qkeys = get_query(); //qkeys contains the key-strings of the query
        for (int i = 0; i < qkeys.size(); i++) {
            //vposlist contains all vertices' location, if its content contains qkeys in the query
            vector<int>& vposlist = idx().vpos_list(qkeys[i]);
            for (int j = 0; j < vposlist.size(); j++)
                activate(vposlist[j]); //activate those vertices
        }
    }

    //Register every keyword of v in the inverted index under v's position.
    virtual void load2Idx(MAXMATCHVertex* v, int position, InvIdx& idx)
    {
        hash_set<string>::iterator it;
        hash_set<string>& content = v->nqvalue().content;
        for (it = content.begin(); it != content.end(); it++)
            idx.insert(*it, position);
    }

    virtual void idx_init()
    {
        load_idx_from_vertexes();
    }

    //Write one line per max-match vertex.
    //output format: vid \t parent node_content
    virtual void dump(MAXMATCHVertex* vertex, BufferedWriter& writer)
    {
        if (vertex->qvalue().is_max_match) {
            //Bounded formatting: node_content has arbitrary length and must
            //not overrun buf.
            int written = snprintf(buf, sizeof(buf), "%d\t%d %s\n", vertex->id,
                    vertex->nqvalue().parent, vertex->nqvalue().value.c_str());
            if (written < 0)
                return; //formatting failed, nothing reliable to write
            if (written >= (int) sizeof(buf)) {
                //Output was truncated: keep the record newline-terminated so the
                //next record still starts on its own line.
                buf[sizeof(buf) - 2] = '\n';
            }
            writer.write(buf);
        }
    }
};

int main(int argc, char* argv[])
{
    MAXMATCHWorker worker;
    WorkerParams param;
    param.input_path = input_path;
    param.output_path = output_path;
    param.force_write = true;
    param.native_dispatcher = false;
    //------
    worker.run(param);
    return 0;
}