//hub-acc indexing for undirected graph #include "ol/pregel-ol-dev.h" #include "utils/type.h" enum key_type { INDEGREE_ONLY, OUTDEGREE_ONLY, INOUT_DEGREE }; string in_path = "/toy_inout"; string out_path = "/ol_out"; string save_path = "/ol_index"; bool use_combiner = true; int deg_th = 197; //from TopSelect_2 key_type keytype = INDEGREE_ONLY; //-------------------------------------------------- //Step 1: define static field of vertex: adj-list struct SPIdxNQValue { vector in_nbs; vector out_nbs; vector fwd_hubgate; vector back_hubgate; bool in_H; }; ibinstream & operator<<(ibinstream & m, const SPIdxNQValue & v) { m << v.in_nbs; m << v.out_nbs; m << v.fwd_hubgate; m << v.back_hubgate; m << v.in_H; return m; } obinstream & operator>>(obinstream & m, SPIdxNQValue & v) { m >> v.in_nbs; m >> v.out_nbs; m >> v.fwd_hubgate; m >> v.back_hubgate; m >> v.in_H; return m; } //-------------------------------------------------- //Step 2: define query type: here, it is int (src) //-------------------------------------------------- //Step 3: define query-specific vertex state: //here, it is (hop_from/to_src, has_pred_in_H) struct SPIdxQValue { bool fwd_pred_in_H; bool back_pred_in_H; int fwd_hop; int back_hop; }; ibinstream & operator<<(ibinstream & m, const SPIdxQValue & v) { m << v.fwd_hop; m << v.fwd_pred_in_H; m << v.back_hop; m << v.back_pred_in_H; return m; } obinstream & operator>>(obinstream & m, SPIdxQValue & v) { m >> v.fwd_hop; m >> v.fwd_pred_in_H; m >> v.back_hop; m >> v.back_pred_in_H; return m; } //-------------------------------------------------- //Step 4: define msg type: here, it is bool (whether there's a predecessor other than src) /* * The msg consists of 4 bits. We use 0000ABCD to describe a byte, the A bit is 1 if there exists a forward message * B bit is 1 if fwd_pred_in_H is true, C bit is 1 if there exists a backward message, D bit is 1 if back_pred_in_H is true */ char back_has_pred_in_H = 1; char back_has_msg = 2; char fwd_has_pred_in_H = 4; char fwd_has_msg = 8; //-------------------------------------------------- //Step 5: define vertex class class SPIdxVertex: public VertexOL { public: //Step 5.1: define UDF1: query -> vertex's query-specific init state virtual SPIdxQValue init_value(int & query) { SPIdxQValue qval; if (id == query) { qval.fwd_hop = 0; qval.back_hop = 0; } else { qval.fwd_hop = INT_MAX; qval.back_hop = INT_MAX; } qval.fwd_pred_in_H = false; qval.back_pred_in_H = false; return qval; } //Step 5.2: vertex.compute virtual void compute(MessageContainer& messages) { if (superstep() == 1) { //notice there are two hubgates, forward hubgates and backward hub gates. //only src is active //fwd broadcast vector & out_nbs = nqvalue().out_nbs; for (int i = 0; i < out_nbs.size(); i++) send_message(out_nbs[i], fwd_has_msg); //msg is false //rule out src !!!, if another in_H vertex is encountered, later src is not an entry/exit of later vertices //backward broadcast vector & in_nbs = nqvalue().in_nbs; for (int i = 0; i < in_nbs.size(); i++) send_message(in_nbs[i], back_has_msg); } else { char msg = 0; for (int i = 0; i < messages.size(); i++) { msg |= messages[i]; } if (qvalue().fwd_hop == INT_MAX) { //not reached forward before //check if there exsits a fwd_msg if (msg & fwd_has_msg) { qvalue().fwd_hop = superstep() - 1; if (msg & fwd_has_pred_in_H) { qvalue().fwd_pred_in_H = true; } vector & out_nbs = nqvalue().out_nbs; if (nqvalue().in_H) { //send forward message and indicating there exist a H for (int i = 0; i < out_nbs.size(); i++) { send_message(out_nbs[i], fwd_has_msg | fwd_has_pred_in_H); } } else { //notice the send message char send = (qvalue().fwd_pred_in_H) ? fwd_has_pred_in_H : 0; for (int i = 0; i < out_nbs.size(); i++) { send_message(out_nbs[i], fwd_has_msg | send); } } } } if (qvalue().back_hop == INT_MAX) { //not reached backward before if (msg & back_has_msg) { qvalue().back_hop = superstep() - 1; if (msg & back_has_pred_in_H) { qvalue().back_pred_in_H = true; } vector & in_nbs = nqvalue().in_nbs; if (nqvalue().in_H) { for (int i = 0; i < in_nbs.size(); i++) { send_message(in_nbs[i], back_has_msg | back_has_pred_in_H); } } else { for (int i = 0; i < in_nbs.size(); i++) { send_message(in_nbs[i], back_has_msg | qvalue().back_pred_in_H); } } } } vote_to_halt(); } } }; //-------------------------------------------------- //Step 6: define worker class class SPIdxWorkerOL: public WorkerOL_auto { public: char buf[50]; SPIdxWorkerOL() : WorkerOL_auto(false, true) { } //Step 6.1: UDF: line -> vertex virtual SPIdxVertex* toVertex(char* line) { SPIdxVertex* v = new SPIdxVertex; char * pch; pch = strtok(line, "\t"); v->id = atoi(pch); pch = strtok(NULL, " "); int in_num = atoi(pch); for (int i = 0; i < in_num; i++) { pch = strtok(NULL, " "); int nb = atoi(pch); v->nqvalue().in_nbs.push_back(nb); } pch = strtok(NULL, " "); int out_num = atoi(pch); for (int i = 0; i < out_num; i++) { pch = strtok(NULL, " "); int nb = atoi(pch); v->nqvalue().out_nbs.push_back(nb); } //--------------------------- if (keytype == INDEGREE_ONLY) { if (in_num >= deg_th) v->nqvalue().in_H = true; else v->nqvalue().in_H = false; } else if (keytype == OUTDEGREE_ONLY) { if (out_num >= deg_th) v->nqvalue().in_H = true; else v->nqvalue().in_H = false; } else { if (in_num + out_num >= deg_th) v->nqvalue().in_H = true; else v->nqvalue().in_H = false; } return v; } //Step 6.2: UDF: query string -> query (src_id) virtual int toQuery(char* line) { char * pch; pch = strtok(line, " "); int src = atoi(pch); return src; } //Step 6.3: UDF: vertex init virtual void init(VertexContainer& vertex_vec) { VertexID src = get_query(); int pos = get_vpos(src); if (pos != -1) activate(pos); } //Step 6.4: UDF: task_dump virtual void dump(SPIdxVertex* vertex, BufferedWriter& writer) { int src = get_query(); //1. set entry/exit_vertex_set if (vertex->qvalue().fwd_pred_in_H == false && vertex->qvalue().fwd_hop != INT_MAX) { vertex->nqvalue().back_hubgate.push_back( intpair(src, vertex->qvalue().fwd_hop)); } if (vertex->qvalue().back_pred_in_H == false && vertex->qvalue().back_hop != INT_MAX) { vertex->nqvalue().fwd_hubgate.push_back( intpair(src, vertex->qvalue().back_hop)); } //2. dump matrix entry if (vertex->nqvalue().in_H) { sprintf(buf, "%d %d\t%d\n", src, vertex->id, vertex->qvalue().fwd_hop); writer.write(buf); } } //Step 6.5: UDF: server_vertex-saving virtual void save(SPIdxVertex* vertex, BufferedWriter& writer) { vector & in_nbs = vertex->nqvalue().in_nbs; sprintf(buf, "%d\t%d %d", vertex->id, vertex->nqvalue().in_H, in_nbs.size()); writer.write(buf); for (int i = 0; i < in_nbs.size(); i++) { sprintf(buf, " %d", in_nbs[i]); writer.write(buf); } vector & out_nbs = vertex->nqvalue().out_nbs; sprintf(buf, " %d", out_nbs.size()); writer.write(buf); for (int i = 0; i < out_nbs.size(); i++) { sprintf(buf, " %d", out_nbs[i]); writer.write(buf); } vector & fwd_hubgate = vertex->nqvalue().fwd_hubgate; sprintf(buf, " %d", fwd_hubgate.size()); writer.write(buf); for (int i = 0; i < fwd_hubgate.size(); i++) { sprintf(buf, " %d %d", fwd_hubgate[i].v1, fwd_hubgate[i].v2); writer.write(buf); } vector & back_hubgate = vertex->nqvalue().back_hubgate; sprintf(buf, " %d", back_hubgate.size()); writer.write(buf); for (int i = 0; i < back_hubgate.size(); i++) { sprintf(buf, " %d %d", back_hubgate[i].v1, back_hubgate[i].v2); writer.write(buf); } writer.write("\n"); } }; class SPIdxCombiner: public Combiner { public: virtual void combine(char & old, const char& new_msg) { old |= new_msg; } }; int main(int argc, char* argv[]) { WorkerParams param; param.input_path = in_path; param.output_path = out_path; param.force_write = true; param.native_dispatcher = false; SPIdxWorkerOL worker; worker.set_file2save(save_path); SPIdxCombiner combiner; if (use_combiner) worker.setCombiner(&combiner); worker.run(param); return 0; }