//hub-acc: merge outputs of indexing to one file #include "basic/pregel-dev.h" #include "utils/type.h" string entry_exit_path = "/ol_index"; string matrix_path = "/ol_format"; string out_path = "/ol_merged"; //input: from hubacc_ug_index_batch.cpp //input 1: entry_exit_path line format: vid \t in_H num nb1 nb2 ... h_num h1 hop1 h2 hop2 ... //input 2: matrix_path line format:# srcHVid \t dstHVid1 hop1 dstHVid2 hop2 ... (from FormatHighDeg) //output line format: //vid \t in_H==false num nb1 nb2 ... h_num h1 hop1 h2 hop2 ... //vid \t in_H==true num nb1 nb2 ... dstHVid1 hop1 dstHVid2 hop2 ... class HubUGMergeVertex:public Vertex { public: virtual void compute(MessageContainer & messages) { if(step_num()==1) { if(id.v2==1)//Type 1 vertex sends info to Type 0 vertex { intpair tgt(id.v1, 0); send_message(tgt, value()); } } else { value()+=messages[0]; } vote_to_halt(); } }; class HubUGMergeWorker:public Worker { char buf[100]; public: //C version virtual HubUGMergeVertex* toVertex(char* line) { HubUGMergeVertex* v=new HubUGMergeVertex; char * pch; if(line[0]=='#') { pch=strtok(line+2, "\t"); v->id.v1=atoi(pch); v->id.v2=1; pch=strtok(NULL, "\n"); v->value()=pch; } else { pch=strtok(line, "\t"); v->id.v1=atoi(pch); v->id.v2=0; pch=strtok(NULL, "\n"); v->value()=pch; //============== pch=strtok(pch, " "); int in_H=atoi(pch); if(in_H) { v->value().clear(); v->value() += pch; v->value() += " "; pch=strtok(NULL, " "); int num=atoi(pch); v->value() += pch; v->value() += " "; for(int i=0; ivalue()+=pch; v->value()+=" "; } } } return v; } virtual void toline(HubUGMergeVertex* v, BufferedWriter & writer) { if(v->id.v2==0) { sprintf(buf, "%d\t", v->id.v1); writer.write(buf); writer.write(v->value().c_str()); writer.write("\n"); } } }; int main(int argc, char* argv[]){ init_workers(); MultiInputParams param; param.add_input_path(entry_exit_path); param.add_input_path(matrix_path); param.output_path=out_path; param.force_write=true; param.native_dispatcher=false; HubUGMergeWorker* worker=new HubUGMergeWorker; worker->run(param); delete worker; worker_finalize(); return 0; }