//s-t shortest path on unweighted graph, by bi-directional BFS //only for directed graph //Updated version: Use aggregator to check if at some superstep there's no forward or backward message broadcasting #include "ol/pregel-ol-dev.h" #include "utils/type.h" string in_path = "/reach_yeslabel"; string out_path = "/ol_out"; bool use_combiner = true; #define DEBUG_MODE //input line format: vertexID \t level preorder maxpre minpost postorder in_num in1 in2 ... out_num out1 out2 ... //output line format: src_vertexID tgt_vertexID \t reachable/not reachable struct ReachBiBFSNQValue //NQvalue { int level; int preorder; int maxpre; int minpost; int postorder; vector in_nbs; vector out_nbs; }; struct ReachBiBFSQValue //Qvalue { int v1; int v2; bool reach; bool has_fwd_msg; bool has_back_msg; ReachBiBFSQValue() { v1 = -1; v2 = -1; reach = false; has_fwd_msg = false; has_fwd_msg = false; } }; ibinstream & operator<<(ibinstream & m, const ReachBiBFSNQValue & v) { m << v.level; m << v.preorder; m << v.maxpre; m << v.minpost; m << v.postorder; m << v.in_nbs; m << v.out_nbs; return m; } obinstream & operator>>(obinstream & m, ReachBiBFSNQValue & v) { m >> v.level; m >> v.preorder; m >> v.maxpre; m >> v.minpost; m >> v.postorder; m >> v.in_nbs; m >> v.out_nbs; return m; } //-------------------------------------------------- //Step 2: define query type: here, it is intpair (src, dst) //-------------------------------------------------- //Step 3: define query-specific vertex state: //here, it is intpair (hop_to_src, hop_to_dst) //-------------------------------------------------- //Step 4: define msg type: here, it is char char fwd_msg = 1; //01 char back_msg = 2; //10 char met_msg = 3; //11 //-------------------------------------------------- //Step 5: define vertex class const int not_reached = -1; const int reached = 1; //Define src&tdt info struct for aggregator to operate struct info_of_st { int src_level; int src_pre; int src_maxpre; int src_minpost; int src_post; int tgt_level; int tgt_pre; int tgt_maxpre; int tgt_minpost; int tgt_post; bool reach; bool has_fwd_msg; bool has_back_msg; info_of_st() { src_level = -1; src_pre = -1; src_maxpre = -1; src_minpost = -1; src_post = -1; tgt_level = -1; tgt_pre = -1; tgt_maxpre = -1; tgt_minpost = -1; tgt_post = -1; reach = false; has_fwd_msg = false; has_back_msg = false; } }; ibinstream& operator<<(ibinstream& m, const info_of_st& v) { m << v.src_level; m << v.src_pre; m << v.src_maxpre; m << v.src_minpost; m << v.src_post; m << v.tgt_level; m << v.tgt_pre; m << v.tgt_maxpre; m << v.tgt_minpost; m << v.tgt_post; m << v.reach; m << v.has_fwd_msg; m << v.has_back_msg; return m; } obinstream& operator>>(obinstream& m, info_of_st& v) { m >> v.src_level; m >> v.src_pre; m >> v.src_maxpre; m >> v.src_minpost; m >> v.src_post; m >> v.tgt_level; m >> v.tgt_pre; m >> v.tgt_maxpre; m >> v.tgt_minpost; m >> v.tgt_post; m >> v.reach; m >> v.has_fwd_msg; m >> v.has_back_msg; return m; } //Helper Functions //Return true if u can not reach v, when u's level is greater than or equal to v's level bool levelAssert(int u_level, int v_level) { return (u_level >= v_level); } //Return true if u can not reach, when v's nolabel is not contained by u's nolabel bool nolabelAssert(int u_minpost, int u_post, int v_minpost, int v_post) { bool not_belong = !(v_minpost >= u_minpost && v_post <= u_post); return not_belong; } //Return true if u can reach v, when v's yeslabel is contained by v's yeslabel bool yeslabelAssert(int u_pre, int u_maxpre, int v_pre, int v_maxpre) { bool belong = (v_pre >= u_pre && v_maxpre <= u_maxpre); return belong; } class ReachBiBFSVertex: public VertexOL { public: //Step 5.1: define UDF1: query -> vertex's query-specific init state virtual ReachBiBFSQValue init_value(intpair& query) { ReachBiBFSQValue pair; if (id == query.v1) pair.v1 = reached; if (id == query.v2) pair.v2 = reached; return pair; } //Step 5.2: vertex.compute virtual void compute(MessageContainer& messages) { intpair query = get_query(); if (superstep() == 1) //At 1st superstep,only check if src and tgt are the same vertex { if (query.v1 == query.v2) { qvalue().reach = true; } } else if (superstep() == 2) //Use 3 assert way to check src and tgt vertex, and let them broadcast. Only src and tgt vertices are active in this superstep { info_of_st st_info = *(info_of_st*) get_agg(); //At the first of the 2nd iteration, we can get src&tgt info #ifdef DEBUG_MODE cout << "At 2nd superstep----------------------------------" << endl; cout << "src_info level:" << st_info.src_level << " preorder: " << st_info.src_pre << " maxpre: " << st_info.src_maxpre << " minpost: " << st_info.src_minpost << " postorder: " << st_info.src_post << endl; cout << "tgt_info level:" << st_info.tgt_level << " preorder: " << st_info.tgt_pre << " maxpre: " << st_info.tgt_maxpre << " minpost: " << st_info.tgt_minpost << " postorder: " << st_info.tgt_post << endl; #endif if (levelAssert(st_info.src_level, st_info.tgt_level)) { #ifdef DEBUG_MODE cout << "Query (" << query.v1 << ", " << query.v2 << "), Stop as unreachable because l(s) = " << st_info.src_level << " >= l(t) = " << st_info.tgt_level << endl; #endif forceTerminate(); } if (nolabelAssert(st_info.src_minpost, st_info.src_post, st_info.tgt_minpost, st_info.tgt_post)) { #ifdef DEBUG_MODE cout << "Query (" << query.v1 << ", " << query.v2 << "), Stop as unreachable because no(s) = [" << st_info.src_minpost << ", " << st_info.src_post << "] does not contain [" << st_info.tgt_minpost << ", " << st_info.tgt_post << "]" << endl; #endif forceTerminate(); } if (yeslabelAssert(st_info.src_pre, st_info.src_maxpre, st_info.tgt_pre, st_info.tgt_maxpre)) { #ifdef DEBUG_MODE cout << "Query (" << query.v1 << ", " << query.v2 << "), Stop as reachable because yes(s) = [" << st_info.src_pre << ", " << st_info.src_maxpre << "] contains [" << st_info.tgt_pre << ", " << st_info.tgt_maxpre << "]" << endl; #endif qvalue().reach = true; //No need to force-terminate here because the aggregator will check if some vertex's reach qvalue is ture.If so, force-terminate in aggregator } //forward broadcast if (id == query.v1) { vector & out_nbs = nqvalue().out_nbs; if(out_nbs.size() != 0) qvalue().has_fwd_msg = true; //has fwd broadcast for (int i = 0; i < out_nbs.size(); i++) send_message(out_nbs[i], fwd_msg); } //backward broadcast if (id == query.v2) { vector & in_nbs = nqvalue().in_nbs; if(in_nbs.size() != 0) qvalue().has_back_msg = true; //send backward broadcast for (int i = 0; i < in_nbs.size(); i++) send_message(in_nbs[i], back_msg); } vote_to_halt(); } else //Check each active vertex in the following superstep { info_of_st st_info = *(info_of_st*) get_agg(); char bor = 0; for (int i = 0; i < messages.size(); i++) { bor |= messages[i]; if (bor == met_msg) break; } if ((bor & fwd_msg) != 0) //recv msgs from forward propagation { if (qvalue().v1 == not_reached) { qvalue().v1 = reached; //v-->t?? if (yeslabelAssert(nqvalue().preorder, nqvalue().maxpre, st_info.tgt_pre, st_info.tgt_maxpre)) { qvalue().reach = true; } if (!levelAssert(nqvalue().level, st_info.tgt_level) && !nolabelAssert(nqvalue().minpost, nqvalue().postorder, st_info.tgt_minpost, st_info.tgt_post)) { vector & out_nbs = nqvalue().out_nbs; if(out_nbs.size() != 0) { qvalue().has_fwd_msg = true; //has fwd broadcast for (int i = 0; i < out_nbs.size(); i++) send_message(out_nbs[i], fwd_msg); } } } } if ((bor & back_msg) != 0) //recv msgs from backward propagation { if (qvalue().v2 == not_reached) { qvalue().v2 = reached; //s-->v?? if (yeslabelAssert(st_info.src_pre, st_info.src_maxpre, nqvalue().preorder, nqvalue().maxpre)) { qvalue().reach = true; } if (!levelAssert(st_info.src_level, nqvalue().level) && !nolabelAssert(st_info.src_minpost, st_info.src_post, nqvalue().minpost, nqvalue().postorder)) { vector & in_nbs = nqvalue().in_nbs; if(in_nbs.size() != 0) qvalue().has_back_msg = true; //send backward broadcast for (int i = 0; i < in_nbs.size(); i++) send_message(in_nbs[i], back_msg); } } } //check met? if ((qvalue().v1 != not_reached) && (qvalue().v2 != not_reached)) { qvalue().reach = true; } vote_to_halt(); } } }; //-------------------------------------------------- //Step 6: define aggregator logic class ReachBiBFSAgg: public Aggregator { public: info_of_st info; virtual void init() { if (ReachBiBFSVertex::superstep() > 1) //Initialize the info from last superstep's result { info = *(info_of_st *) ReachBiBFSVertex::get_agg(); info.has_fwd_msg=false; info.has_back_msg=false; } } virtual void stepPartial(ReachBiBFSVertex* v) { //If some vertex's reach qvalue is true, it means src and tgt meet or reachable if (v->qvalue().reach) info.reach = true; //Scan all vertices to set the info struct to contain source and info information if (ReachBiBFSVertex::superstep() == 1) { intpair query = ReachBiBFSVertex::get_query(); if (v->id == query.v1) { info.src_level = v->nqvalue().level; info.src_pre = v->nqvalue().preorder; info.src_maxpre = v->nqvalue().maxpre; info.src_minpost = v->nqvalue().minpost; info.src_post = v->nqvalue().postorder; } if (v->id == query.v2) { info.tgt_level = v->nqvalue().level; info.tgt_pre = v->nqvalue().preorder; info.tgt_maxpre = v->nqvalue().maxpre; info.tgt_minpost = v->nqvalue().minpost; info.tgt_post = v->nqvalue().postorder; } } if(ReachBiBFSVertex::superstep() > 1){ if(v->qvalue().has_fwd_msg == true) { info.has_fwd_msg = true; v->qvalue().has_fwd_msg = false; } if(v->qvalue().has_back_msg == true) { info.has_back_msg = true; v->qvalue().has_back_msg = false; } } } virtual void stepFinal(info_of_st* part) { if (part->reach) info.reach = true; if (ReachBiBFSVertex::superstep() == 1) { if (part->src_level != -1) { info.src_level = part->src_level; info.src_maxpre = part->src_maxpre; info.src_pre = part->src_pre; info.src_post = part->src_post; info.src_minpost = part->src_minpost; } if (part->tgt_level != -1) { info.tgt_level = part->tgt_level; info.tgt_maxpre = part->tgt_maxpre; info.tgt_pre = part->tgt_pre; info.tgt_post = part->tgt_post; info.tgt_minpost = part->tgt_minpost; } } if(ReachBiBFSVertex::superstep() > 1){ if(part->has_fwd_msg == true) info.has_fwd_msg = true; if(part->has_back_msg == true) info.has_back_msg = true; } } virtual info_of_st* finishPartial() { return &info; } virtual info_of_st* finishFinal() { //If some vertex's reach qvalue is true, it means src and tgt meet or reachable if (info.reach) ReachBiBFSVertex::forceTerminate(); //If at this superstep(>=2) there's no forward or backward message broadcasted,then we can force terminate the program if(ReachBiBFSVertex::superstep() > 1){ if(info.has_fwd_msg == false || info.has_back_msg == false) ReachBiBFSVertex::forceTerminate(); } return &info; } }; //-------------------------------------------------- //Step 7: define worker class class ReachBiBFSWorkerOL: public WorkerOL_auto { public: char buf[100]; ReachBiBFSWorkerOL() : WorkerOL_auto(true) { } //Step 6.1: UDF: line -> vertex virtual ReachBiBFSVertex* toVertex(char* line) { char * pch; pch = strtok(line, "\t"); ReachBiBFSVertex* v = new ReachBiBFSVertex; int id = atoi(pch); v->id = id; //id pch = strtok(NULL, " "); v->nqvalue().level = atoi(pch); //level pch = strtok(NULL, " "); v->nqvalue().preorder = atoi(pch); //preorder pch = strtok(NULL, " "); v->nqvalue().maxpre = atoi(pch); //maxpre pch = strtok(NULL, " "); v->nqvalue().minpost = atoi(pch); //minpost pch = strtok(NULL, " "); v->nqvalue().postorder = atoi(pch); //postorder pch = strtok(NULL, " "); int in_num = atoi(pch); for (int i = 0; i < in_num; i++) //in_neighbors { pch = strtok(NULL, " "); int nb = atoi(pch); v->nqvalue().in_nbs.push_back(nb); } pch = strtok(NULL, " "); int out_num = atoi(pch); for (int i = 0; i < out_num; i++) //out_neighbors { pch = strtok(NULL, " "); int nb = atoi(pch); v->nqvalue().out_nbs.push_back(nb); } return v; } //Step 6.2: UDF: query string -> query (src_id) virtual intpair toQuery(char* line) { char * pch; pch = strtok(line, " "); int src = atoi(pch); pch = strtok(NULL, " "); int dst = atoi(pch); return intpair(src, dst); } //Step 6.3: UDF: vertex init virtual void init(VertexContainer& vertex_vec) { int src = get_query().v1; int pos = get_vpos(src); if (pos != -1) activate(pos); //------ int dst = get_query().v2; pos = get_vpos(dst); if (pos != -1) activate(pos); } //Step 6.4: UDF: task_dump virtual void dump(ReachBiBFSVertex* vertex, BufferedWriter& writer) { intpair pair = ReachBiBFSVertex::get_query(); info_of_st info = *(info_of_st *) ReachBiBFSVertex::get_agg(); if ((vertex->id == pair.v1)) { if (!info.reach) sprintf(buf, "%d %d\tnot reachable\n", vertex->id, pair.v2); else sprintf(buf, "%d %d\treachable\n", vertex->id, pair.v2); writer.write(buf); } } }; class ReachBiBFSCombiner: public Combiner { public: virtual void combine(char & old, const char& new_msg) { old |= new_msg; } }; int main(int argc, char* argv[]) { ReachBiBFSWorkerOL worker; if(argc != 3) { if(_my_rank==MASTER_RANK) cout<<"Usage: ./run inputpath outputpath"; return -1; } WorkerParams param; param.input_path=argv[1]; param.output_path=argv[2]; param.force_write=true; param.native_dispatcher=false; //------ ReachBiBFSCombiner combiner; bool use_combiner = true; if(use_combiner) worker.setCombiner(&combiner); worker.run(param); return 0; }