#include "RecodeVertex.h"
#include "RecodeComper.h"
#include "RecodeRunner.h"
#include "Combiner.h"
#include <cfloat>   // DBL_MAX
#include <cstdio>   // fprintf
#include <cstdlib>  // atoi
#include <string>
#include <vector>

// NOTE(review): this file was recovered from a copy that had been collapsed
// onto a single line and had every "<...>" span stripped. Template arguments
// and one lost region (inside SPComper, up to the start of main) are marked
// with NOTE(review) below and must be confirmed against the project headers
// or version control.

// ID (as stored in a vertex's old_id) of the shortest-path source vertex.
// Overwritten from argv[3] in main().
int src = 0;

// One outgoing edge: its length and the ID of the neighbor it leads to.
struct SPEdge
{
    double len;
    VertexID nb;
};

// Stream operators for SPEdge. All four write/read `len` first, then `nb`,
// so the field order must stay identical across them.
obinstream & operator<<(obinstream & m, const SPEdge & e)
{
    m << e.len;
    m << e.nb;
    return m;
}

ibinstream & operator>>(ibinstream & m, SPEdge & e)
{
    m >> e.len;
    m >> e.nb;
    return m;
}

ofbinstream & operator<<(ofbinstream & m, const SPEdge & e)
{
    m << e.len;
    m << e.nb;
    return m;
}

ifbinstream & operator>>(ifbinstream & m, SPEdge & e)
{
    m >> e.len;
    m >> e.nb;
    return m;
}

// Message combiner that keeps the smaller of two candidate distances.
class SPCombiner:public Combiner<double>
{
public:
    // Replaces `old` with `new_msg` when `new_msg` is strictly smaller.
    virtual void combine(double& old, const double& new_msg)
    {
        if (old > new_msg)
            old = new_msg;
    }
};

// Vertex program for single-source shortest paths.
// NOTE(review): the base-class template arguments were stripped from the
// recovered text. <double, SPEdge, double, SPCombiner> is inferred from the
// double-typed `value`/`agg_msg`, the SPEdge edge list and the
// "with combiner" remark; confirm the parameter order in RecodeVertex.h.
class SPVertex:public RecodeVertex<double, SPEdge, double, SPCombiner> //with combiner
{
public:
    // Sends (value + edge length) to the neighbor at the end of every edge.
    void broadcast(std::vector<SPEdge>& edges)
    {
        // size_t index avoids the signed/unsigned comparison with size().
        for (size_t i = 0; i < edges.size(); ++i)
        {
            send_message(edges[i].nb, value + edges[i].len);
        }
    }

    // Step 1: the vertex whose old_id equals `src` sets its value to 0 and
    // broadcasts; every other vertex sets its value to DBL_MAX.
    // Later steps: if `agg_msg` is smaller than the current value, adopt it
    // and broadcast again.
    // The vertex votes to halt at the end of every call.
    virtual void compute(double agg_msg, std::vector<SPEdge>& edges)
    {
        if (step_num() == 1)
        {
            if (old_id == src)
            {
                value = 0;
                broadcast(edges);
            }
            else
            {
                value = DBL_MAX;
            }
        }
        else
        {
            if (agg_msg < value)
            {
                value = agg_msg;
                broadcast(edges);
            }
        }
        vote_to_halt();
    }
};

// NOTE(review): the base-class template argument was stripped from the
// recovered text; <SPVertex> is a guess based on the to_line(SPVertex&, ...)
// signature below. Confirm against RecodeComper.h.
class SPComper:public RecodeComper<SPVertex>
{
    char buf[100];

public:
    // Returns DBL_MAX, the same value compute() assigns to non-source
    // vertices in step 1.
    virtual double get_zero()
    {
        return DBL_MAX;
    }

    /*
    virtual void to_line(SPVertex& v, ofstream& fout)
    {
        fout<  (the rest of this disabled method was lost in the recovered text)
    }
    */

    // NOTE(review): everything between "fout<" above and the runner
    // declaration in main() is missing from the recovered text. Any further
    // members of SPComper (for example an active to_line override, which
    // would explain the otherwise unused `buf`) must be restored from
    // version control.
};

// Entry point. Arguments: argv[1] = HDFS input path, argv[2] = HDFS output
// path, argv[3] = source vertex ID.
int main(int argc, char* argv[])
{
    // argv[3] is read below; without this check a short command line would
    // hand a null pointer to atoi.
    if (argc < 4)
    {
        fprintf(stderr, "usage: %s <hdfs_inpath> <hdfs_outpath> <src_vertex_id>\n",
                argc > 0 ? argv[0] : "run");
        return 1;
    }

    // NOTE(review): the runner's template arguments were stripped from the
    // recovered text; <SPVertex, SPComper> is a guess. Confirm against
    // RecodeRunner.h.
    RecodeRunner<SPVertex, SPComper> runner;

    src = atoi(argv[3]);
    std::string hdfs_inpath = argv[1];  // kept for the CLI contract; not passed to runLH below
    std::string hdfs_outpath = argv[2];
    std::string local_root = "iopregel_localspace";
    bool dump_with_edges = false;
    bool sender_combine = true;
    (void)hdfs_inpath;

    // Original remark: "HDFS Load, HDFS Dump".
    // NOTE(review): the call is runLH and hdfs_inpath is never passed, so the
    // "HDFS Load" part of that remark looks stale; confirm what runLH loads.
    runner.runLH(hdfs_outpath, local_root, dump_with_edges, sender_combine, argc, argv);
    return 0;
}