#include "ol/pregel-ol-dev.h"
#include "invidx.h"

// SLCA (smallest lowest common ancestor) keyword search over a tree of
// vertices, built on the ol/pregel-ol-dev.h query framework.
//
// NOTE(review): the template argument lists in this file were missing from the
// copy this revision was made from. They are reconstructed from how each name
// is used below (e.g. query keywords are looked up in `content` and in the
// inverted index, children are parsed with atoi, load2Idx receives an InvIdx&).
// Confirm VertexOL<...>, WorkerOL_auto<...>, Combiner<...> and the
// InvIdx::vpos_list return type against ol/pregel-ol-dev.h and invidx.h.

string in_path = "/uwm";
string out_path = "/ol_out";
bool use_combiner = true;

// The input carries no level(v) field.
// Input line format, as parsed by SLCAWorkerOL::toVertex:
//   id '\t' <one skipped char><keyword text>'>' sta_pos end_pos parentID num_child child1 child2 ...
//   (presumably the skipped char is '<' - confirm against the data set)
// Output line format, as written by SLCAWorkerOL::dump (result vertices only):
//   id '\t' sta_pos end_pos

// step 1: define the static (query-independent) field of a vertex.
struct SLCANqvalue
{
    hash_set<string> content;   // keywords extracted from this vertex's text
    int start;                  // sta_pos from the input line
    int end;                    // end_pos from the input line
    VertexID parent;            // parent vertex id; -1 means "no parent"
    vector<VertexID> children;  // child vertex ids
};

ibinstream & operator<<(ibinstream &m, const SLCANqvalue &v)
{
    m << v.content;
    m << v.start;
    m << v.end;
    m << v.parent;
    m << v.children;
    return m;
}

obinstream & operator>>(obinstream &m, SLCANqvalue &v)
{
    m >> v.content;
    m >> v.start;
    m >> v.end;
    m >> v.parent;
    m >> v.children;
    return m;
}

// step 2: the query type is a list of keywords (vector<string>). The worker
// looks each keyword up in the inverted index and activates the matching vertices.

// step 3: define the query-specific per-vertex value.
struct SCLAQvalue
{
    // Bit i is set when query keyword i has been seen in this vertex or, after
    // messages are merged, anywhere below it. A char holds 8 bits, so this
    // presumably supports at most 8 keywords - setbit/isAllOne are defined
    // elsewhere; confirm.
    char bitmap;
    // true while this vertex is considered an SLCA answer.
    bool is_result;

    SCLAQvalue()
    {
        bitmap = 0;
        is_result = false;
    }
};

ibinstream & operator<<(ibinstream &m, const SCLAQvalue &s)
{
    m << s.bitmap;
    m << s.is_result;
    return m;
}

obinstream & operator>>(obinstream &m, SCLAQvalue &s)
{
    m >> s.bitmap;
    m >> s.is_result;
    return m;
}

// step 4: the message type is SCLAQvalue (a child's bitmap plus its result flag).

// step 5: define the vertex type.
class SLCAVertex : public VertexOL<VertexID, SCLAQvalue, SLCANqvalue, SCLAQvalue, vector<string> >
{
public:
    // Number of keywords in the current query.
    int num_of_keywords()
    {
        return get_query().size();
    }

    // Builds the initial query value from this vertex's own keywords: sets
    // bit i for every query keyword contained locally, and marks the vertex
    // as a result if it already contains every keyword.
    virtual SCLAQvalue init_value(vector<string>& query)
    {
        SCLAQvalue qval;
        const hash_set<string>& content = nqvalue().content;
        int num = query.size();
        for (int i = 0; i < num; i++)
        {
            if (content.find(query[i]) != content.end())
                setbit(qval.bitmap, i);
        }
        if (isAllOne(qval.bitmap, num))
            qval.is_result = true;
        return qval;
    }

    // Superstep 1: every active vertex sends its bitmap to its parent.
    // Later supersteps: merge the children's bitmaps upward. A vertex stays a
    // result only if it covers all keywords and no descendant reported that it
    // covers all keywords too.
    virtual void compute(MessageContainer& messages)
    {
        if (superstep() == 1)
        {
            if (nqvalue().parent != -1)
                send_message(nqvalue().parent, qvalue());
            vote_to_halt();
            return;
        }

        int num = num_of_keywords();
        if (isAllOne(qvalue().bitmap, num))
        {
            // Already covers every keyword and is not a result: nothing to do.
            if (!qvalue().is_result)
            {
                vote_to_halt();
                return;
            }
            // A child that covers every keyword makes this vertex a
            // non-smallest ancestor.
            for (size_t i = 0; i < messages.size(); i++)
            {
                if (messages[i].is_result)
                {
                    qvalue().is_result = false;
                    break;
                }
            }
        }
        else
        {
            bool has_all_one = false;
            char bor = 0;  // must start empty: it accumulates the children's bitmaps
            for (size_t i = 0; i < messages.size(); i++)
            {
                bor |= messages[i].bitmap;
                if (messages[i].is_result)
                    has_all_one = true;
            }

            char old_bitmap = qvalue().bitmap;
            qvalue().bitmap |= bor;
            // Only a change in coverage needs to be reported upward.
            if (old_bitmap != qvalue().bitmap)
            {
                if (has_all_one)
                    qvalue().is_result = false;  // a descendant already covers everything
                else if (isAllOne(qvalue().bitmap, num))
                    qvalue().is_result = true;

                if (nqvalue().parent != -1)
                {
                    SCLAQvalue msg = qvalue();
                    // Tell the parent "something below you covers every keyword",
                    // even when this vertex itself is not the result.
                    if (isAllOne(msg.bitmap, num))
                        msg.is_result = true;
                    send_message(nqvalue().parent, msg);
                }
            }
        }
        vote_to_halt();
    }
};

// step 6: the combiner is defined after the worker (SLCACombiner).

// step 7: define the worker.
class SLCAWorkerOL : public WorkerOL_auto<SLCAVertex, InvIdx>
{
public:
    char buf[100];  // scratch buffer for dump()

    SLCAWorkerOL() : WorkerOL_auto<SLCAVertex, InvIdx>(false, false, true)
    {
    }

    vector<string> parser;  // reused keyword buffer for toVertex()

    // Parses one input line (format described at the top of this file).
    // NOTE(review): strtok results are not checked, so a malformed line
    // dereferences NULL.
    virtual SLCAVertex* toVertex(char* line)
    {
        SLCAVertex* v = new SLCAVertex;
        char* pch = strtok(line, "\t");
        v->id = atoi(pch);

        // Keyword text runs up to the next '>'; its first character is skipped.
        pch = strtok(NULL, ">");
        pch++;
        parser.clear();
        filter(pch, parser);
        for (size_t i = 0; i < parser.size(); i++)
            v->nqvalue().content.insert(parser[i]);

        pch = strtok(NULL, " ");
        v->nqvalue().start = atoi(pch);
        pch = strtok(NULL, " ");
        v->nqvalue().end = atoi(pch);
        pch = strtok(NULL, " ");
        v->nqvalue().parent = atoi(pch);
        pch = strtok(NULL, " ");
        int num_children = atoi(pch);
        for (int j = 0; j < num_children; j++)
        {
            pch = strtok(NULL, " ");
            v->nqvalue().children.push_back(atoi(pch));
        }
        return v;
    }

    // A query line is split into keywords by filter() (declared elsewhere).
    virtual vector<string> toQuery(char* line)
    {
        vector<string> result;
        filter(line, result);
        return result;
    }

    // Activates every vertex that contains at least one query keyword,
    // using the inverted index built in idx_init().
    virtual void init(VertexContainer& vc)
    {
        const vector<string>& query = get_query();
        for (size_t i = 0; i < query.size(); i++)
        {
            vector<int>& vposlist = idx().vpos_list(query[i]);
            for (size_t j = 0; j < vposlist.size(); j++)
                activate(vposlist[j]);
        }
    }

    // Called from load_idx_from_vertexes(): indexes every keyword of v under
    // the vertex's position.
    virtual void load2Idx(SLCAVertex* v, int position, InvIdx& idx)
    {
        hash_set<string>& content = v->nqvalue().content;
        for (hash_set<string>::iterator it = content.begin(); it != content.end(); ++it)
            idx.insert(*it, position);
    }

    virtual void idx_init()
    {
        load_idx_from_vertexes();
    }

    // Writes "id\tstart end" for every vertex marked as a result.
    virtual void dump(SLCAVertex* v, BufferedWriter& writer)
    {
        if (v->qvalue().is_result)
        {
            snprintf(buf, sizeof(buf), "%d\t%d %d\n", v->id, v->nqvalue().start, v->nqvalue().end);
            writer.write(buf);
        }
    }
};

// step 6: define the combiner - merges messages by OR-ing bitmaps and result flags.
class SLCACombiner : public Combiner<SCLAQvalue>
{
public:
    virtual void combine(SCLAQvalue& old, const SCLAQvalue& new_msg)
    {
        old.bitmap |= new_msg.bitmap;
        if (new_msg.is_result)
            old.is_result = true;
    }
};

int main(int argc, char* argv[])
{
    WorkerParams param;
    param.input_path = in_path;
    param.output_path = out_path;
    param.force_write = true;
    param.native_dispatcher = false;
    SLCAWorkerOL worker;
    SLCACombiner combiner;
    if (use_combiner)
    {
        worker.setCombiner(&combiner);
    }
    worker.run(param);
    return 0;
}