//hub-acc indexing for undirected graph #include "ol/pregel-ol-dev.h" #include "utils/type.h" string in_path = "/toy_ug"; string out_path = "/ol_out"; string save_path = "/ol_index"; bool use_combiner = true; int deg_th = 78826; //from TopSelect_2 //input line format: vid \t num nb1 nb2 ... //edge lengths are assumed to be 1 //output line format: query_src vid \t dist //index line format: vid \t in_H num nb1 nb2 ... h_num h1 hop1 h2 hop2 ... //- "h_num h1 hop1 h2 hop2 ..." refers to the entry/exit_vertex_set //-------------------------------------------------- //Step 1: define static field of vertex: adj-list struct SPIdxNQValue { vector nbs; vector hubgate; bool in_H; }; ibinstream & operator<<(ibinstream & m, const SPIdxNQValue & v){ m<>(obinstream & m, SPIdxNQValue & v){ m>>v.nbs; m>>v.hubgate; m>>v.in_H; return m; } //-------------------------------------------------- //Step 2: define query type: here, it is int (src) //-------------------------------------------------- //Step 3: define query-specific vertex state: //here, it is (hop_from/to_src, has_pred_in_H) struct SPIdxQValue { int hop; bool pred_in_H; }; ibinstream & operator<<(ibinstream & m, const SPIdxQValue & v){ m<>(obinstream & m, SPIdxQValue & v){ m>>v.hop; m>>v.pred_in_H; return m; } //-------------------------------------------------- //Step 4: define msg type: here, it is bool (whether there's a predecessor other than src) //-------------------------------------------------- //Step 5: define vertex class class SPIdxVertex:public VertexOL { public: //Step 5.1: define UDF1: query -> vertex's query-specific init state virtual SPIdxQValue init_value(int & query) { SPIdxQValue qval; if(id==query) qval.hop=0; else qval.hop=INT_MAX; qval.pred_in_H=false; return qval; } //Step 5.2: vertex.compute virtual void compute(MessageContainer& messages) { if(superstep()==1) { //only src is active vector & nbs=nqvalue().nbs; for(int i=0; i & nbs=nqvalue().nbs; if(nqvalue().in_H) { for(int i=0; i { public: char buf[50]; SPIdxWorkerOL():WorkerOL_auto(false, true){} //Step 6.1: UDF: line -> vertex virtual SPIdxVertex* toVertex(char* line) { SPIdxVertex* v=new SPIdxVertex; char * pch; pch=strtok(line, "\t"); v->id=atoi(pch); pch=strtok(NULL, " "); int num=atoi(pch); if(num >= deg_th) v->nqvalue().in_H=true; else v->nqvalue().in_H=false; for(int i=0; inqvalue().nbs.push_back(nb); } return v; } //Step 6.2: UDF: query string -> query (src_id) virtual int toQuery(char* line) { char * pch; pch=strtok(line, " "); int src=atoi(pch); return src; } //Step 6.3: UDF: vertex init virtual void init(VertexContainer& vertex_vec) { int src=get_query(); int pos=get_vpos(src); if(pos!=-1) activate(pos); } //Step 6.4: UDF: task_dump virtual void dump(SPIdxVertex* vertex, BufferedWriter& writer) { int src=get_query(); //1. set entry/exit_vertex_set if(vertex->qvalue().pred_in_H==false) { vertex->nqvalue().hubgate.push_back(intpair(src, vertex->qvalue().hop)); } //2. dump matrix entry if(vertex->nqvalue().in_H) { sprintf(buf, "%d %d\t%d\n", src, vertex->id, vertex->qvalue().hop); writer.write(buf); } } //Step 6.5: UDF: server_vertex-saving virtual void save(SPIdxVertex* vertex, BufferedWriter& writer){ vector & nbs=vertex->nqvalue().nbs; sprintf(buf, "%d\t%d %d", vertex->id, vertex->nqvalue().in_H, nbs.size()); writer.write(buf); for(int i=0; i & hubgate=vertex->nqvalue().hubgate; sprintf(buf, " %d", hubgate.size()); writer.write(buf); for(int i=0; i { public: virtual void combine(bool & old, const bool& new_msg) { if(new_msg) old=true; } }; int main(int argc, char* argv[]){ WorkerParams param; param.input_path=in_path; param.output_path=out_path; param.force_write=true; param.native_dispatcher=false; SPIdxWorkerOL worker; worker.set_file2save(save_path); SPIdxCombiner combiner; if(use_combiner) worker.setCombiner(&combiner); worker.run(param); return 0; }