// Triangle-counting vertex program written against the project's Pregel-style
// API (basic/pregel-dev.h), with a fault-injection hook in compute().
//
// NOTE(review): this text appears to have lost its line breaks and several
// "<...>" spans (template arguments, a stream-operator body, loop conditions
// and bodies) -- see e.g. "m<>(obinstream & m", "typedef Edge TCEdge",
// "vector & nbs", "for(; i -> msgs", "for(int i=0; i { char buf[100];".
// It is not expected to compile as shown; restore the pristine file before
// changing any logic. The summary below covers only what is still visible.
//
//  - C (10): multiplier used in compute() as num_pending_msg = C * value().deg
//    ("quota of requests").
//  - TCValue_pregel: per-vertex value with count ("number of triangles"),
//    send_count, deg, and pos_i / pos_j ("iterator position"). The visible
//    operator>> reads count, send_count, deg, pos_i, pos_j in that order;
//    the matching operator<< body is missing from this text.
//  - TCEdge: operator< orders by eval first, then by id; operator== requires
//    both eval and id to be equal.
//  - TCEdge_pair: two TCEdge values, streamed as p1 then p2.
//  - TCVertex_pregel::compute(): when newly_respawned is false, the process
//    with _my_rank == 1 prints a message and raises SIGKILL at
//    step_num() == 17 (marked "FT-change", "only kill for the first time").
//    On odd steps it adds messages.size() to value().count and then advances
//    the (pos_i, pos_j) position; most of the remaining logic is missing here.
//  - toVertex(): tokenizes the input line with strtok -- id up to the first
//    tab, then deg as the next space-delimited token (both via atoi) -- and,
//    in the visible part, sets count, pos_i and pos_j to 0. The enclosing
//    class header is missing; presumably TCWorker_pregel -- TODO confirm.
//  - toline(): formats "<id>\t<count>\n" into buf and writes it.
//  - setCPdisable(): returns false when step_num() is odd, true otherwise.
//  - pregel_triangle(): fills WorkerParams (force_write = true,
//    native_dispatcher = false), calls setCPGap(10) and runs the worker.
//  - main(): init_workers, pregel_triangle(argv[1], argv[2]), worker_finalize.
//    NOTE(review): argv[1] / argv[2] are used without checking argc.
#include "basic/pregel-dev.h" #include "utils/type.h" #include "signal.h" #define C 10 struct TCValue_pregel { // number of triangles int count; int send_count; int deg; // iterator position size_t pos_i; size_t pos_j; }; ibinstream & operator<<(ibinstream & m, const TCValue_pregel & v) { m<>(obinstream & m, TCValue_pregel & v) { m>>v.count; m>>v.send_count; m>>v.deg; m>>v.pos_i; m>>v.pos_j; return m; } typedef Edge TCEdge; bool operator<(const TCEdge& lhs, const TCEdge& rhs) { return (lhs.eval < rhs.eval) || ((lhs.eval == rhs.eval) && (lhs.id < rhs.id)); } bool operator==(const TCEdge& lhs, const TCEdge& rhs) { return (lhs.eval == rhs.eval) && (lhs.id == rhs.id); } struct TCEdge_pair { TCEdge p1; TCEdge p2; }; ibinstream & operator<<(ibinstream & m, const TCEdge_pair & v) { m << v.p1; m << v.p2; return m; } obinstream & operator>>(obinstream & m, TCEdge_pair & v) { m >> v.p1; m >> v.p2; return m; } class TCVertex_pregel : public EVertex { public: virtual void compute(MessageContainer & messages) { //*** FT-change if(!newly_respawned) {//only kill for the first time if(_my_rank == 1 && step_num()==17) { printf("%d: I am killing myself !!!\n", _my_rank); raise(SIGKILL); while(1); } } vector & nbs = edges(); size_t num_pending_msg = C*value().deg;//quota of requests int num_msgs = 0; if(step_num() % 2 == 1) { //count triangles value().count += messages.size(); //odd, request //================== part 1: -> //forwarding size_t i = value().pos_i; size_t j = value().pos_j; size_t len = nbs.size(); j++;//forward to next (i, j) for(; i -> msgs //get value() again i = value().pos_i; j = value().pos_j; num_msgs = value().send_count; //generate msgs from value() if(j>=len) { j=len-1; i=j-1; } for(int k=0; k & nbs = edges(); for(int i=0; i { char buf[100]; public: virtual TCVertex_pregel * toVertex(char * line) { char * pch; pch = strtok(line, "\t"); TCVertex_pregel * v = new TCVertex_pregel; v->id = atoi(pch); pch = strtok(NULL, " "); v->value().deg = atoi(pch); TCEdge me; 
me.id = v->id; me.eval = v->value().deg; vector & nbs = v->edges(); for(int i=0; ivalue().count = 0; v->value().pos_i = 0; v->value().pos_j = 0; //now is (0, 0), so that next is (0, 1) return v; } virtual void toline(TCVertex_pregel * v, BufferedWriter & writer) { sprintf(buf, "%d\t%d\n", v->id, v->value().count); writer.write(buf); } virtual bool setCPdisable() { if (step_num() % 2 == 1) return false; else return true; } }; void pregel_triangle(std::string input_path, std::string output_path) { WorkerParams param; param.input_path = input_path; param.output_path = output_path; param.force_write = true; param.native_dispatcher = false; TCWorker_pregel worker; worker.setCPGap(10); //###### set CP period here worker.run(param); } int main(int argc, char ** argv) { init_workers(&argc, &argv); pregel_triangle(argv[1], argv[2]); worker_finalize(); return 0; }