// NOTE(review): the line breaks of this file appear to have been lost.
// - Everything on the next physical line follows a `//`, so it is lexically one long comment
//   (includes, globals, BiBFSValue, the stream operators and BiBFSVertex are all inside it).
// - On the physical line after that, everything following `//min` is likewise a comment.
// - Text that sat between `<` and `>` also appears to be missing, e.g. `vector in_nbs`,
//   `m<>(obinstream & m, ...)`, `class BiBFSVertex:public VertexOL {`, `for(int i=0; i & in_nbs=...`.
//   The template arguments, the bodies of the broadcast loops and the later-superstep branch of
//   BiBFSVertex::compute() therefore cannot be read from this copy.
// TODO: restore the original formatting from a pristine copy instead of re-typing from here.
//
// What the visible fragments of the vertex code establish:
// - BiBFSValue holds in_nbs, out_nbs and a char tag; operator>> reads them in that order.
// - Message values are bit flags: fwd_msg=1, back_msg=2, met_msg=3.
// - init_value() starts both hop counts at not_reached (-1) and sets v1=0 on the query source
//   and v2=0 on the query destination.
// - compute() clears tag on every call; in superstep 1 it calls forceTerminate() when src==dst,
//   otherwise the source sets the fwd_msg bit and the destination sets the back_msg bit in tag.
//s-t shortest path on unweighted graph, by bi-directional BFS //only for directed graph #include "ol/pregel-ol-dev.h" #include "utils/type.h" string in_path = "/toy_inout"; string out_path = "/ol_out"; bool use_combiner = true; //input line format: vertexID \t in_num in1 in2 ... out_num out1 out2 ... //edge lengths are assumed to be 1 //output line format: met_vertex \t path_length //-------------------------------------------------- //Step 1: define static field of vertex: adj-list struct BiBFSValue { vector in_nbs; vector out_nbs; char tag; }; ibinstream & operator<<(ibinstream & m, const BiBFSValue & v){ m<>(obinstream & m, BiBFSValue & v){ m>>v.in_nbs; m>>v.out_nbs; m>>v.tag; return m; } //-------------------------------------------------- //Step 2: define query type: here, it is intpair (src, dst) //-------------------------------------------------- //Step 3: define query-specific vertex state: //here, it is intpair (hop_to_src, hop_to_dst) //-------------------------------------------------- //Step 4: define msg type: here, it is char char fwd_msg=1;//01 char back_msg=2;//10 char met_msg=3;//11 //-------------------------------------------------- //Step 5: define vertex class int not_reached=-1; class BiBFSVertex:public VertexOL { public: //Step 5.1: define UDF1: query -> vertex's query-specific init state virtual intpair init_value(intpair& query) { intpair pair(not_reached, not_reached); if(id==query.v1) pair.v1=0; if(id==query.v2) pair.v2=0; return pair; } //Step 5.2: vertex.compute virtual void compute(MessageContainer& messages) { nqvalue().tag = 0; if(superstep()==1) { intpair query=get_query(); if(query.v1==query.v2) { forceTerminate(); } else { //forward broadcast if(id==query.v1) { vector & out_nbs=nqvalue().out_nbs; nqvalue().tag |= fwd_msg; for(int i=0; i & in_nbs=nqvalue().in_nbs; nqvalue().tag |= back_msg; for(int i=0; i & out_nbs=nqvalue().out_nbs; for(int i=0; i & in_nbs=nqvalue().in_nbs; for(int i=0; i { public: inttriplet triplet; virtual 
// Aggregator fragment (its class header sits at the damaged end of the line above).
// The aggregated value is an inttriplet:
//   triplet.v1 - minimum of qvalue().v1 + qvalue().v2 over vertices where both are != -1;
//                initialised to INT_MAX, so INT_MAX means no such vertex has been seen
//   triplet.v2 - number of vertices whose tag has the fwd_msg bit set in this step
//   triplet.v3 - number of vertices whose tag has the back_msg bit set in this step
// stepFinal() merges a partial result (min of v1, sums of v2 and v3).
// finishFinal() calls forceTerminate() once superstep() > 1 and either count is zero.
//
// BiBFSWorkerOL fragment:
//   toVertex() - tokenises the line with strtok/atoi: id up to the tab, then in_num and
//                out_num, each followed by a neighbour loop whose body is damaged here
//   toQuery()  - parses "src dst" separated by a space into intpair(src, dst)
//   init()     - activates the source and the destination vertex when get_vpos() != -1
//   dump()     - writes output only for the vertex whose id equals the query source
void init() { triplet.v1=INT_MAX; //min triplet.v2=0;//fwd_msg triplet.v3=0;//back_msg } virtual void stepPartial(BiBFSVertex* v) { int dist1 = v->qvalue().v1; int dist2 = v->qvalue().v2; char tag = v->nqvalue().tag; if(dist1 != -1 && dist2 != -1)//both reachable { int dist = dist1 + dist2; if(dist < triplet.v1) { triplet.v1 = dist; } } if(tag & fwd_msg){ triplet.v2 ++; } if(tag & back_msg){ triplet.v3 ++; } } virtual void stepFinal(inttriplet* part) { if(part->v1 < triplet.v1) { triplet.v1 = part->v1; } triplet.v2 += part->v2; triplet.v3 += part->v3; } virtual inttriplet* finishPartial() { return &triplet; } virtual inttriplet* finishFinal() { if(BiBFSVertex::superstep() > 1){ if(triplet.v2 == 0 || triplet.v3 == 0){ BiBFSVertex::forceTerminate(); } } return &triplet; } }; //-------------------------------------------------- //Step 7: define worker class class BiBFSWorkerOL:public WorkerOL_auto { public: char buf[50]; BiBFSWorkerOL():WorkerOL_auto(true){} //Step 6.1: UDF: line -> vertex virtual BiBFSVertex* toVertex(char* line) { char * pch; pch=strtok(line, "\t"); BiBFSVertex* v=new BiBFSVertex; int id=atoi(pch); v->id=id; pch=strtok(NULL, " "); int in_num=atoi(pch); for(int i=0; inqvalue().in_nbs.push_back(nb); } pch=strtok(NULL, " "); int out_num=atoi(pch); for(int i=0; inqvalue().out_nbs.push_back(nb); } return v; } //Step 6.2: UDF: query string -> query (src_id) virtual intpair toQuery(char* line) { char * pch; pch=strtok(line, " "); int src=atoi(pch); pch=strtok(NULL, " "); int dst=atoi(pch); return intpair(src, dst); } //Step 6.3: UDF: vertex init virtual void init(VertexContainer& vertex_vec) { int src=get_query().v1; int pos=get_vpos(src); if(pos!=-1) activate(pos); //------ int dst=get_query().v2; pos=get_vpos(dst); if(pos!=-1) activate(pos); } //Step 6.4: UDF: task_dump virtual void dump(BiBFSVertex* vertex, BufferedWriter& writer) { intpair pair=BiBFSVertex::get_query(); if((vertex->id == pair.v1)) { int dist = get_agg()->v1; if(dist == INT_MAX) 
// Tail of dump(): the line is formatted into the 50-byte member buffer buf and written out as
//   "src dst<TAB>not reachable" when the aggregated minimum is still INT_MAX,
//   "src dst<TAB>dist"          otherwise.
// BiBFSCombiner merges two char messages with bitwise OR, so the fwd/back flag bits accumulate.
// main() fills WorkerParams from the in_path/out_path globals with force_write=true and
// native_dispatcher=false, installs the combiner only when use_combiner is set, then runs
// the worker.
sprintf(buf,"%d %d\tnot reachable\n", vertex->id, pair.v2); else sprintf(buf, "%d %d\t%d\n", vertex->id, pair.v2, dist); writer.write(buf); } } }; class BiBFSCombiner:public Combiner { public: virtual void combine(char & old, const char& new_msg) { old|=new_msg; } }; int main(int argc, char* argv[]){ WorkerParams param; param.input_path=in_path; param.output_path=out_path; param.force_write=true; param.native_dispatcher=false; BiBFSWorkerOL worker; BiBFSCombiner combiner; if(use_combiner) worker.setCombiner(&combiner); worker.run(param); return 0; }