// Vertex-centric shortest-distance program built on the project's Vertex / Comper /
// Runner / Combiner headers. The global `src` holds the source vertex id; the driver
// code at the end of the file sets it with atoi(argv[3]).
//
// Visible pieces, in order:
//   * SPEdge           - an edge record with a length `len` and a neighbor id `nb`.
//                        The four stream operators write / read `len` first, then `nb`.
//   * SPCombiner       - combine() overwrites `old` with `new_msg` when `new_msg` is
//                        smaller, i.e. it keeps the minimum of the two.
//   * SPVertex         - broadcast() sends `value + edges[i].len` to every `edges[i].nb`.
//                        compute(): when step_num() == 1 the vertex whose id equals `src`
//                        sets value to 0 and broadcasts, every other vertex sets value to
//                        DBL_MAX; in any other step it takes the minimum of the incoming
//                        messages and, only if that is below `value`, stores it and
//                        broadcasts. vote_to_halt() is called at the end of every
//                        compute() call.
//   * SPComper         - parseVertex() tokenizes `line` with strtok ("\t" first, then " ")
//                        and writes to `file_stream`: the id, DBL_MAX, a bool that is true
//                        only when id == src, and the neighbor count `num`; it returns id.
//                        `#ifndef WEIGHTED_GRAPH` chooses between an unweighted and a
//                        weighted variant; the weighted one also prints "here" to cout.
//   * driver code      - takes argv[1] / argv[2] as the HDFS input / output paths, uses
//                        "iopregel_localspace" as local_root and dump_with_edges = false,
//                        then calls runner.runHH(...).
//
// NOTE(review): this copy of the file looks damaged and should be restored from the
// original before it is compiled or edited further - TODO confirm against upstream:
//   - all line breaks appear to be lost, so as stored every `//` comment runs to the
//     end of the physical line and hides the code that follows it;
//   - text that presumably sat between angle brackets seems to be missing: the header
//     name after the last #include, the template arguments of Combiner / Vertex /
//     Comper / vector, the bodies of the neighbor loops in parseVertex, the start of
//     the weighted parseVertex, the end of the commented-out to_line, and the
//     signature of main together with the declaration of `runner`.
// The two original lines are kept unchanged below.
#include "Vertex.h" #include "Comper.h" #include "Runner.h" #include "Combiner.h" #include //#define WEIGHTED_GRAPH int src = 0; struct SPEdge { double len; VertexID nb; }; obinstream & operator<<(obinstream & m, const SPEdge & e) { m << e.len; m << e.nb; return m; } ibinstream & operator>>(ibinstream & m, SPEdge & e) { m >> e.len; m >> e.nb; return m; } ofbinstream & operator<<(ofbinstream & m, const SPEdge & e) { m << e.len; m << e.nb; return m; } ifbinstream & operator>>(ifbinstream & m, SPEdge & e) { m >> e.len; m >> e.nb; return m; } class SPCombiner:public Combiner { public: virtual void combine(double& old, const double& new_msg) { if (old > new_msg) old = new_msg; } }; //class SPVertex:public Vertex //no combiner class SPVertex:public Vertex //with combiner { public: void broadcast(vector& edges) { for (int i = 0; i < edges.size(); ++ i) send_message(edges[i].nb, value+edges[i].len); } virtual void compute(vector& msgs, vector& edges) { if(step_num()==1) { if (id == src) { value = 0; broadcast(edges); } else { value = DBL_MAX; } } else { double min = DBL_MAX; for (int i = 0; i < msgs.size(); i++) { if (msgs[i] < min) min = msgs[i]; } if (min < value) { value = min; broadcast(edges); } } vote_to_halt(); } }; class SPComper:public Comper { char buf[100]; public: #ifndef WEIGHTED_GRAPH //unweighted graph virtual VertexID parseVertex(char* line, obinstream& file_stream) { char * pch = strtok(line, "\t"); VertexID id = atoll(pch); file_stream << id; //write file_stream << DBL_MAX; //write , init DBL_MAX if (id == src) file_stream << true; //write else file_stream << false; pch=strtok(NULL, " "); int num=atoi(pch); file_stream << num; //write numNbs for(int i=0; i } return id; } #else //weighted graph virtual VertexID parseVertex(char* line, obinstream& file_stream) { cout<<"here"< file_stream << DBL_MAX; //write , init DBL_MAX if (id == src) file_stream << true; //write else file_stream << false; pch=strtok(NULL, " "); int num=atoi(pch); file_stream << num; 
//write numNbs for(int i=0; i } return id; } #endif /* virtual void to_line(SPVertex& v, ofstream& fout) { fout< runner; src = atoi(argv[3]); string hdfs_inpath = argv[1]; string hdfs_outpath = argv[2]; string local_root = "iopregel_localspace"; bool dump_with_edges = false; runner.runHH(hdfs_inpath, hdfs_outpath, local_root, dump_with_edges, argc, argv); //HDFS Load, HDFS Dump return 0; }