#include "basic/pregel-dev.h"
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <string>
#include <vector>
using namespace std;

// Computes, for every vertex of a forest, its level (distance from its root).
//
// input line format:  vertexID \t <content> start end parent num_of_children child1 child2...
// output line format: vertexID \t <content> level start end parent num_of_children child1 child2...
//
// NOTE(review): this file was reconstructed from a copy in which all line
// breaks and every "<...>" span had been stripped. Template arguments and the
// pieces marked NOTE(review) below were restored from how the surrounding code
// uses them; confirm them against the original source.

// Per-vertex state.
struct SLCALevelValue_pregel
{
    string content;             // text between '<' and '>' on the input line
    int level;                  // -1 until compute() assigns it
    int start;                  // start position, copied through from the input
    int end;                    // end position, copied through from the input
    VertexID parent;            // parent vertex id; -1 marks a root
    vector<VertexID> children;  // ids of the child vertices
};

// Serializes a vertex value; the field order must match operator>> below.
ibinstream & operator<<(ibinstream & m, const SLCALevelValue_pregel & v)
{
    m<<v.content;
    m<<v.level;
    m<<v.start;
    m<<v.end;
    m<<v.parent;
    m<<v.children;
    return m;
}

// Deserializes a vertex value; the field order must match operator<< above.
obinstream & operator>>(obinstream & m, SLCALevelValue_pregel & v)
{
    m>>v.content;
    m>>v.level;
    m>>v.start;
    m>>v.end;
    m>>v.parent;
    m>>v.children;
    return m;
}

//====================================

// Messages carry no payload: receiving one is the whole signal.
char dummy_msg = 0;

class SLCALevelVertex_pregel:public Vertex<VertexID, SLCALevelValue_pregel, char>
{
public:
    // Sends the dummy message to every child of this vertex.
    void broadcast()
    {
        vector<VertexID> & nbs=value().children;
        //only broadcast info to children
        for(size_t i=0; i<nbs.size(); i++)
        {
            send_message(nbs[i], dummy_msg);
        }
    }

    // NOTE(review): the original body of compute() was lost in extraction.
    // This reconstruction assumes the usual top-down scheme: toVertex() leaves
    // only roots active, so a vertex runs in the superstep in which the
    // message from its parent arrives, and its level is that superstep number
    // minus one (assuming step_num() starts at 1, i.e. roots get level 0).
    // Confirm the numbering against the original before relying on it.
    virtual void compute(MessageContainer & messages)
    {
        value().level = step_num() - 1;
        broadcast();
        vote_to_halt();
    }
};

class SLCALevelWorker_pregel:public Worker<SLCALevelVertex_pregel>
{
    char buf[100000];  // scratch space for formatting the integer fields in toline()

public:
    //C version
    // Parses one input line into a newly allocated vertex.
    // The line is tokenized in place with strtok, so `line` is modified.
    virtual SLCALevelVertex_pregel* toVertex(char* line)
    {
        //vid\t<content> start end parent num_of_children child1 child2 ...
        char * pch;
        pch=strtok(line, "\t");
        SLCALevelVertex_pregel* v=new SLCALevelVertex_pregel;
        v->id=atoi(pch);

        pch=strtok(NULL, ">");
        v->value().content = pch + 1; //vertex's content(string); +1 skips the leading '<'

        // The input carries no level. Give it a defined value so that a vertex
        // whose compute() never runs does not print an indeterminate int.
        v->value().level = -1;

        pch = strtok(NULL," ");
        v->value().start = atoi(pch); //vertex's start position
        pch = strtok(NULL," ");
        v->value().end = atoi(pch); //vertex's end position
        pch = strtok(NULL," ");
        int parent = atoi(pch); //vertex's parent
        v->value().parent = parent;

        // Only roots (parent == -1) stay active after loading.
        if(parent != -1) v->vote_to_halt();

        pch = strtok(NULL," ");
        int num_of_children = atoi(pch);
        for(int i=0; i<num_of_children; i++)
        {
            pch = strtok(NULL," ");
            if(pch == NULL) break; // fewer ids than announced: keep what was read
            v->value().children.push_back(atoi(pch));
        }
        return v;
    }

    // Writes one output line for `v` through `writer`.
    virtual void toline(SLCALevelVertex_pregel* v, BufferedWriter & writer)
    {
        //vertexID\t<content> level start end parent num_of_children child1 child2...
        int num_of_children = v->value().children.size();

        // The content string has no length limit, so it is written directly
        // instead of being formatted into the fixed-size buffer; only the
        // integer fields go through buf.
        snprintf(buf, sizeof(buf), "%d\t<", v->id);
        writer.write(buf);
        writer.write(v->value().content.c_str());
        snprintf(buf, sizeof(buf), "> %d %d %d %d %d", v->value().level, v->value().start, v->value().end, v->value().parent, num_of_children);
        writer.write(buf);

        for (int i = 0; i < num_of_children; i++)
        {
            snprintf(buf, sizeof(buf), " %d", v->value().children[i]);
            writer.write(buf);
        }
        writer.write("\n");
    }
};

// All messages are identical dummies, so combining them needs no work:
// the message already held stands for any number of further ones.
class SLCALevelCombiner_pregel:public Combiner<char>
{
public:
    virtual void combine(char & old, const char & new_msg){}
};

// Runs the level computation, reading from in_path and writing to out_path.
// The combiner is installed only when use_combiner is true.
void pregel_SLCAlevel(string in_path, string out_path, bool use_combiner)
{
    WorkerParams param;
    param.input_path=in_path;
    param.output_path=out_path;
    param.force_write=true;
    param.native_dispatcher=false;
    SLCALevelWorker_pregel worker;
    SLCALevelCombiner_pregel combiner;
    if(use_combiner) worker.setCombiner(&combiner);
    worker.run(param);
}

// Entry point. The input and output paths are currently hard-coded; the
// command-line variant is kept below, disabled.
int main(int argc, char* argv[])
{
    init_workers();
    /*
    if(argc != 3)
    {
        if(_my_rank==MASTER_RANK) cout<<"Usage: ./treelevel inputpath outputpath";
        return 0;
    }
    pregel_SLCAlevel(argv[1], argv[2], true);
    */
    pregel_SLCAlevel("/uwm", "/uwm_level", true);
    worker_finalize();
    return 0;
}