// ============================================================================
// Overview (review notes; the original text below is left untouched)
// ============================================================================
// Block-centric point-to-point shortest-path query. Each framework "vertex"
// (SSSP_BlogelVertex) holds one whole block of graph vertices.
//
// NOTE(review): this copy of the file is whitespace-mangled. The whole program
// sits on five physical lines, so every `//` comment now swallows the code that
// follows it on the same line. Template argument lists also appear to be
// missing (e.g. `hash_map v2b_index;`, `typedef vector blockQValue;`,
// `class SSSP_BlogelVertex:public VertexOL`), one `#include` has no header name,
// and three `for` headers are truncated (`for(int i=0; i dis)`,
// `for(int i=0; i tmp;`, `for(int i=0; i& edges`). Restore the line breaks and
// the lost text from the original source before building; do not guess at them.
//
// NOTE(review): the leading description says "unweighted graph, by
// bi-directional BFS", but the visible compute() relaxes weighted edges
// (`Edge::len`) with a heap and a single source. The description looks stale.
//
// NOTE(review): the "input line format" comment lists `vid num nb1 len1 bid1`,
// whereas toVertex() documents and reads `vid x y z num nb1 len1 bid1 ...`.
//
// Types
//   V2BIdx        vertex id -> block id map; get_bid() returns -1 when the
//                 vertex id is not present.
//   sp_qvalue     per-vertex query state; constructed as prev = -1,
//                 dist = DBL_MAX, updated = false.
//   Edge          nb_id (neighbour vertex id), len (edge length), bid.
//                 toVertex() rewrites `bid` of every in-block edge
//                 (index <= split) to the neighbour's position inside the block.
//   sp_nqvalue    one vertex of a block: vid, x/y/z, edges, and `split`, the
//                 index of the last edge whose target is in the same block.
//   blockNQValue  the block's vertex list plus a vid -> local position map.
//   msg_val       message payload: prev and dist.
//
// SSSP_BlogelVertex
//   init_value()  sizes the query-value vector to the number of vertices in
//                 the block.
//   compute()     Superstep 1: sets dist = 0 for the vertex whose vid equals
//                 get_query().v1 and puts it in the heap. Later supersteps:
//                 applies every message that lowers the target's dist, then
//                 puts all `updated` vertices in the heap. It then pops the
//                 heap until it is empty or the top key is DBL_MAX. Edges
//                 [0, split] are relaxed locally using `edges[i].bid` as the
//                 local position; edges (split, end) produce a message keyed by
//                 `nb_id` and sent to block `edges[i].bid`. Heap elements are
//                 allocated with `new` and deleted before vote_to_halt().
//   NOTE(review): `nqvalue().map[msg.key]` uses operator[]; if the underlying
//                 hash_map default-inserts, an unknown key would resolve to
//                 position 0 - confirm keys are always present.
//   NOTE(review): `u` is a reference obtained from hp.peek() and is read after
//                 hp.remove(); whether it stays valid depends on the heap
//                 implementation in utils/Heap.h - confirm.
//
// SSSP_BlogelAgg (state type: coord)
//   init()        Superstep 1: min = -1, dst_dist = -1 (x/y/z are not assigned
//                 here). Later: min = DBL_MAX; dst_dist and x/y/z are copied
//                 from the previous aggregate.
//   stepPartial() Part of this function's text is missing in this copy. The
//                 surviving tail keeps the smallest `dis`, clears `updated`,
//                 and records the dist of the vertex whose vid equals
//                 get_query().v2.
//   stepFinal()   Superstep 1: takes x/y/z from the partial whose min is 0.
//                 Later: keeps the smaller min and any dst_dist != -1.
//   finishFinal() After superstep 1, calls forceTerminate() when
//                 dst_dist != -1 and state.min > dst_dist * dst_dist.
//
// SSSP_BlogelWorker
//   toVertex()    Parses one block line with strtok/atoi/atof. Per vertex it
//                 moves in-block edges to the front, sets `split`, then
//                 rewrites in-block `bid` to local positions.
//                 NOTE(review): strtok() results are passed to atoi/atof
//                 without a NULL check, so a short line would dereference NULL.
//   toQuery()     Parses "src dst" into an intpair.
//   init()        Activates the block holding the source vertex, if it is
//                 found.
//   load2Idx()    Registers every vertex of the block in the V2BIdx.
//   dump()        For the block holding get_query().v2, writes
//                 "Distance from <src> to <dst> : <dist>" via sprintf into
//                 the fixed 1000-char `buf`.
//
// SSSP_BlogelCombiner
//   NOTE(review): combine() copies `old` into `new_msg` when
//                 old.dist > new_msg.dist, so the larger distance ends up in
//                 `new_msg` and `old` is left unchanged. This looks inverted
//                 for a shortest-path combiner, but which argument the
//                 framework keeps is not visible here. Confirm before
//                 re-enabling; the combiner is currently commented out in main().
//
// main()          Uses hard-coded paths "/usaxyz_3" (input) and
//                 "/usaxyz_test_out" (output), with force_write = true and
//                 native_dispatcher = false.
// ============================================================================
//s-t shortest path on unweighted graph, by bi-directional BFS //only for directed graph //compute the reachability of s and t #define SECONDARY_KEY_MSG //use Msg2ndKey type specified combiner #include "utils/Combiner.h" #include "ol/pregel-ol-dev.h" #include "utils/type.h" #include "utils/Heap.h" #include //input line format: bid\t[vid num nb1 len1 bid1 nb2 len2 bid2...][vid num nb1 len1 bid1 nb2 len2 bid2...]... //output line format: query output //*************************************** //Define index class.Used for a mapping from vertex's id to the block's id which it belongs to class V2BIdx{ hash_map v2b_index; public: //API void vertex2block(int vid, int bid){ v2b_index[vid] = bid; } int get_bid(int vid){ hash_map::iterator it = v2b_index.find(vid); if(it != v2b_index.end()) { return it->second; } else { return -1; } } }; //Definition of query value type struct sp_qvalue { int prev; double dist; bool updated; sp_qvalue() { prev = -1; dist = DBL_MAX; updated = false; } }; typedef vector blockQValue; //Definition of non-query value type struct Edge{ int nb_id; double len; int bid; }; ibinstream & operator<<(ibinstream & m, const Edge & e){ m << e.nb_id; m << e.len; m << e.bid; return m; } obinstream & operator>>(obinstream & m, Edge & e){ m >> e.nb_id; m >> e.len; m >> e.bid; return m; } struct sp_nqvalue{ int vid; double x; double y; double z; vector edges; int split; }; ibinstream & operator<<(ibinstream & m, const sp_nqvalue & v){ m << v.vid; m << v.x; m << v.y; m << v.z; m << v.edges; m << v.split; return m; } obinstream & operator>>(obinstream & m, sp_nqvalue & v){ m >> v.vid; m >> v.x; m >> v.y; m >> v.z; m >> v.edges; m >> v.split; return m; } struct blockNQValue { vector vertexes; hash_map map; }; ibinstream & operator<<(ibinstream & m, const blockNQValue & v){ m << v.vertexes; m << v.map; return m; } obinstream & operator>>(obinstream & m, blockNQValue & v){ m >> v.vertexes; m >> v.map; return m; } //Definition of message type struct msg_val{ int prev; 
double dist; }; ibinstream & operator<<(ibinstream & m, const msg_val & v){ m << v.prev; m << v.dist; return m; } obinstream & operator>>(obinstream & m,msg_val & v){ m >> v.prev; m >> v.dist; return m; } typedef Msg2ndKey sp_msg; class SSSP_BlogelVertex:public VertexOL { public: typedef qelem elemT; virtual blockQValue init_value(intpair& query) //Qvalue Initialization { blockQValue bq; bq.resize(nqvalue().vertexes.size()); return bq; } //Step 5.2: vertex.compute virtual void compute(MessageContainer& messages) { blockQValue & qval = qvalue(); int num = nqvalue().vertexes.size(); //------ heap hp; vector tag(num, false); //Help to check whether the vertex has been processed vector eles(num); for(int i = 0; i < num; i++) eles[i] = NULL; //------ if(superstep() == 1){ //Set qvalue for the src vertex for(int i = 0; i < num; i++){ if(nqvalue().vertexes[i].vid == get_query().v1){ qval[i].prev = -1; qval[i].dist = 0; //---- double key = qval[i].dist; int val = i; eles[val] = new elemT(key, val); hp.add(*eles[val]); } } } else { //Vertex-Centric Computation: Receive messages and update vertex's qvalue(distance and previous node) for(int i = 0; i < messages.size();i++){ sp_msg & msg = messages[i]; int pos = nqvalue().map[msg.key]; //For each msg, find its target vertex in this block if(qval[pos].dist > msg.val.dist){ //Update if msg's distance is smaller than current dist qval[pos].updated = true; qval[pos].dist = msg.val.dist; qval[pos].prev = msg.val.prev; } } //------ for(int i = 0; i < num; i++){ if(qval[i].updated) { double key = qval[i].dist; int val = i; eles[val] = new elemT(key, val); hp.add(*eles[val]); } } } //------ //run dijkstra algorithm while(hp.size() > 0){ elemT & u = hp.peek(); if(u.key == DBL_MAX) break; hp.remove(); int uindex = u.val; double udist = qval[uindex].dist; tag[uindex] = true; sp_nqvalue & cur = nqvalue().vertexes[uindex]; int split = cur.split; vector & edges = cur.edges; //in-block processing for(int i = 0;i <= split;i++){ int pos = 
edges[i].bid; if(tag[pos] == false){ double alt = udist + edges[i].len; if(alt < qval[pos].dist){ if(eles[pos] == NULL){ eles[pos] = new elemT(alt, pos); hp.add(*eles[pos]); } else { eles[pos]->key = alt; hp.fix(*eles[pos]); } qval[pos].updated = true; qval[pos].dist = alt; qval[pos].prev = cur.vid; } } } //out-block processing for(int i = split+1; i < edges.size(); i++){ sp_msg msg; msg.val.dist = udist + edges[i].len; msg.val.prev = cur.vid; msg.key = edges[i].nb_id; send_message(edges[i].bid, msg); } } for(int i = 0;i < num; i++){ if(eles[i] != NULL) delete eles[i]; } vote_to_halt(); } }; struct coord{ //Used in aggregator:(x,y,z) stores source's coordination, min stores minimum distance, dst_dist stores target's distance double x; double y; double z; double min; double dst_dist; }; ibinstream & operator<<(ibinstream & m, const coord & v){ m << v.x; m << v.y; m << v.z; m << v.min; m << v.dst_dist; return m; } obinstream & operator>>(obinstream & m, coord & v){ m >> v.x; m >> v.y; m >> v.z; m >> v.min; m >> v.dst_dist; return m; } class SSSP_BlogelAgg:public Aggregator { public: coord state; virtual void init() { if(SSSP_BlogelVertex::superstep() == 1) { //For the first superstep, we only get source's coordination state.min = -1; state.dst_dist = -1; } else { //For the following superstep, we get smallest direct distance of updated vertices, and compare it with destination node's current distance state.min = DBL_MAX; coord * agg=(coord *)SSSP_BlogelVertex::get_agg(); state.dst_dist = agg->dst_dist; state.x = agg->x; state.y = agg->y; state.z = agg->z; } } virtual void stepPartial(SSSP_BlogelVertex* v) { vector & vertices = v->nqvalue().vertexes; vector & qvertices = v->qvalue(); if(SSSP_BlogelVertex::superstep() == 1) { int src = SSSP_BlogelVertex::get_query().v1; for(int i=0; i dis) state.min = dis; qvertices[i].updated =false; } if(vertices[i].vid == SSSP_BlogelVertex::get_query().v2){ state.dst_dist = qvertices[i].dist; } } } } virtual void stepFinal(coord* 
part) { if(SSSP_BlogelVertex::superstep() == 1) { if(part->min == 0) { state.min = 0; state.x = part->x; state.y = part->y; state.z = part->z; } } else { if(state.min > part->min) state.min = part->min; if(part->dst_dist != -1) state.dst_dist = part->dst_dist; } } virtual coord* finishPartial() { return &state; } virtual coord* finishFinal() { if(SSSP_BlogelVertex::superstep() > 1){ if(state.dst_dist != -1) { double adjust_dist = state.dst_dist * state.dst_dist; if(state.min > adjust_dist){ SSSP_BlogelVertex::forceTerminate(); } } } return &state; } }; class SSSP_BlogelWorker:public WorkerOL_auto { char buf[1000]; sp_nqvalue dummy; Edge dummyEdge; public: SSSP_BlogelWorker():WorkerOL_auto(true,false,true){} virtual SSSP_BlogelVertex* toVertex(char* line) { //format: bid \t [vid x y z num nb1 len1 bid1 nb2 len2 bid2...] [vid x y z num nb1 len1 bid1 nb2 len2 bid2...] ... SSSP_BlogelVertex* v = new SSSP_BlogelVertex; char *pch; pch = strtok(line, "\t"); v->id = atoi(pch); //read the block_id of one block vector& vertexes = v->nqvalue().vertexes; int count = 0; hash_map& map = v->nqvalue().map; while( (pch=strtok(NULL, " ")) != NULL ) { vertexes.push_back(dummy); sp_nqvalue & cur = vertexes.back(); //load current vertex in the block cur.vid = atoi(pch); //read the vid of current vertex map[cur.vid] = count; //logic(local) id of the vertex in block count++; pch=strtok(NULL," "); //read x cur.x = atof(pch); pch=strtok(NULL," "); //read y cur.y = atof(pch); pch=strtok(NULL," "); //read z cur.z = atof(pch); pch=strtok(NULL, " "); int num = atoi(pch); vector& edges = cur.edges; for(int i=0; i tmp; vector tmp1; for (int i = 0; i < edges.size(); i++) { if (edges[i].bid == v->id) { tmp.push_back(edges[i]); } else tmp1.push_back(edges[i]); } edges.swap(tmp); cur.split = cur.edges.size() - 1; //split defined to be the last element's position which belongs to the current block edges.insert(cur.edges.end(), tmp1.begin(), tmp1.end()); } //------ for(int i=0; i& edges = cur.edges; 
for(int j=0; j<=cur.split; j++) { edges[j].bid = map[edges[j].nb_id]; } } return v; } virtual intpair toQuery(char* line) { char * pch; pch=strtok(line, " "); int src=atoi(pch); pch=strtok(NULL, " "); int dst=atoi(pch); return intpair(src, dst); } virtual void init(VertexContainer& vertex_vec) { int src = SSSP_BlogelVertex::get_query().v1; int src_bid = idx().get_bid(src); int pos = get_vpos(src_bid); if(pos != -1) activate(pos); } virtual void load2Idx(SSSP_BlogelVertex* v, int position, V2BIdx& idx) { vector& vertexes = v->nqvalue().vertexes; for(int i = 0;i < vertexes.size();i++){ idx.vertex2block(vertexes[i].vid, v->id); } } virtual void idx_init(){ load_idx_from_vertexes(); } virtual void dump(SSSP_BlogelVertex* vertex, BufferedWriter& writer) { VertexID dst = SSSP_BlogelVertex::get_query().v2; int des_bid = idx().get_bid(dst); if(des_bid == -1) return; VertexID src = SSSP_BlogelVertex::get_query().v1; if(vertex->id == des_bid){ int pos = vertex->nqvalue().map[dst]; sprintf(buf,"Distance from %d to %d : %f \n", src, dst, vertex->qvalue()[pos].dist); writer.write(buf); } } }; class SSSP_BlogelCombiner: public Combiner { virtual void combine(msg_val& old, msg_val& new_msg){ if(old.dist > new_msg.dist){ new_msg.dist = old.dist; new_msg.prev = old.prev; } } }; int main(int argc, char* argv[]) { WorkerParams param; param.input_path="/usaxyz_3"; param.output_path="/usaxyz_test_out"; param.force_write=true; param.native_dispatcher=false; SSSP_BlogelWorker worker; // SSSP_BlogelCombiner combiner; // bool use_combiner = false; // if(use_combiner) worker.setCombiner(&combiner); worker.run(param); return 0; }