#include "basic/pregel-dev.h" using namespace std; int src=100; int dst=10000; struct ReachValue { char tag; vector in_edges; vector out_edges; }; ibinstream & operator<<(ibinstream & m, const ReachValue & v){ m<>(obinstream & m, ReachValue & v){ m>>v.tag; m>>v.in_edges; m>>v.out_edges; return m; } class ReachVertex:public Vertex { public: virtual void compute(MessageContainer & messages) { if(step_num()==1) { char & mytag=value().tag; if(mytag==2)//v->dst { vector & nbs=value().in_edges; for(int i=0; iv { vector & nbs=value().out_edges; for(int i=0; idst { vector & nbs=value().in_edges; for(int i=0; iv { vector & nbs=value().out_edges; for(int i=0; i { char buf[100]; public: //input format: vid \t num_inNB nb1 nb2 ... \t num_outNB nb1 nb2 ... //output format: no output if not reachable virtual ReachVertex* toVertex(char* line) { char * pch; pch=strtok(line, "\t"); ReachVertex* v=new ReachVertex; v->id=atoi(pch); vector & in_edges=v->value().in_edges; pch=strtok(NULL, " "); int indegree=atoi(pch); for(int i=0; i & out_edges=v->value().out_edges; pch=strtok(NULL, " "); int outdegree=atoi(pch); for(int i=0; iid==src) v->value().tag=1; else if(v->id==dst) v->value().tag=2; else { v->value().tag=0; v->vote_to_halt(); } return v; } virtual void toline(ReachVertex* v, BufferedWriter & writer) { if(v->value().tag==3){ sprintf(buf, "Reachable through Vertex %d\n", v->id); writer.write(buf); } } }; class ReachCombiner:public Combiner { public: virtual void combine(char & old, const char & new_msg) { old|=new_msg; } }; int main(int argc, char* argv[]){ init_workers(); WorkerParams param; param.input_path="/directed"; param.output_path="/reach"; param.force_write=true; param.native_dispatcher=false; ReachWorker worker; ReachCombiner combiner; worker.setCombiner(&combiner); worker.run(param); worker_finalize(); return 0; }