#include "basic/pregel-dev.h" using namespace std; //input line format: vertexID \t level preorder bvalue postorder in_num in1 in2 ... out_num out1 out2 ... //output line format: vertexID \t level preorder avalue bvalue postorder in_num in1 in2 ... out_num out1 out2 ... //Try to implement a more effective algorithm: At each iteration,only let vertex with specific level to broadcast struct YesLabelAlignValue_pregel { int avalue; int bvalue; int level; int preorder; int postorder; vector in_edges; vector out_edges; }; ibinstream & operator<<(ibinstream & m, const YesLabelAlignValue_pregel & v) { m<>(obinstream & m, YesLabelAlignValue_pregel & v) { m>>v.avalue; m>>v.bvalue; m>>v.level; m>>v.preorder; m>>v.postorder; m>>v.in_edges; m>>v.out_edges; return m; } //==================================== class YesLabelAlignVertex_pregel:public Vertex { public: void broadcast(int msg) { vector & nbs=value().in_edges; //only broadcast info to in_neighbors for(int i=0; i//typename:vertex typevalue/partial/finalist { //Compute the max-level of the graph at the first iteration and decrease by 1 at each iteration private: int level; public: virtual void init(){ if(step_num()==1) level= -1; else{ int max_level = *(int *)getAgg(); level=max_level-1; } } virtual void stepPartial(YesLabelAlignVertex_pregel* v) { if(step_num()==1){ if(v->value().level > level) level = v->value().level; } } virtual void stepFinal(int* part) { if(step_num()==1){ if(*part > level) level = *part; } } virtual int* finishPartial(){ return &level; } virtual int* finishFinal(){ return &level; } }; class YesLabelAlignWorker_pregel:public Worker { char buf[100]; public: //C version virtual YesLabelAlignVertex_pregel* toVertex(char* line) { char * pch; pch=strtok(line, "\t"); YesLabelAlignVertex_pregel* v=new YesLabelAlignVertex_pregel; v->id=atoi(pch); //id pch=strtok(NULL," "); v->value().level = atoi(pch); //level pch=strtok(NULL, " "); v->value().avalue = v->value().preorder = atoi(pch); //preorder and initialize avalue pch = strtok(NULL," "); v->value().bvalue = atoi(pch); //bvalue pch = strtok(NULL," "); v->value().postorder = atoi(pch); //postorder pch = strtok(NULL," "); int in_num=atoi(pch); for(int i=0; ivalue().in_edges.push_back(atoi(pch)); } pch = strtok(NULL," "); int out_num = atoi(pch); for (int i = 0; i < out_num; i++) { //out_neighbors pch = strtok(NULL," "); v->value().out_edges.push_back(atoi(pch)); } if (v->value().out_edges.size() > 0) v->vote_to_halt(); return v; } virtual void toline(YesLabelAlignVertex_pregel* v, BufferedWriter & writer) { sprintf(buf, "%d\t%d %d %d %d %d", v->id, v->value().level, v->value().preorder,v->value().avalue, v->value().bvalue, v->value().postorder); writer.write(buf); int in_num = v->value().in_edges.size(); sprintf(buf, " %d",in_num); writer.write(buf); for (int i = 0; i < in_num; i++) { sprintf(buf, " %d",v->value().in_edges[i]); writer.write(buf); } int out_num = v->value().out_edges.size(); sprintf(buf, " %d",out_num); writer.write(buf); for (int i = 0; i < out_num; i++) { sprintf(buf, " %d",v->value().out_edges[i]); writer.write(buf); } writer.write("\n"); } }; class YesLabelAlignCombiner_pregel:public Combiner { public: virtual void combine(int & old, const int & new_msg) { if(old < new_msg) old = new_msg; } }; void pregel_YeslabelAlign(string in_path, string out_path, bool use_combiner) { WorkerParams param; param.input_path=in_path; param.output_path=out_path; param.force_write=true; param.native_dispatcher=false; YesLabelAlignWorker_pregel worker; YesLabelAlignCombiner_pregel combiner; if(use_combiner) worker.setCombiner(&combiner); YesLabelAlignAgg_pregel agg; worker.setAggregator(&agg); worker.run(param); }