#include "./system/subg-dev.h"

#include <algorithm>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <vector>

using namespace std;

// Triangle counting.
//
// Each triangle {u < a < b} is counted once, by a task seeded at its smallest
// vertex u:
//   * seedTask_gene(u) pulls u's neighbours with ids > u, except the largest
//     neighbour, whose id alone is kept in the task context (its adjacency
//     list is never needed);
//   * respond(x) replies with only the entries of x's adjacency list whose ids
//     are >= x's own id;
//   * compute() walks each pulled neighbour a's trimmed adjacency list against
//     the ids of all later neighbours b and counts every match (edge a-b).
// Per-task counts are summed by CountAgg and printed on rank 0.

/// Per-task state: the running triangle count and the id of the seed's
/// largest neighbour (referenced by id only, never pulled).
struct TriangleContext {
    // NOTE(review): a seed with d higher neighbours can close up to
    // d*(d-1)/2 triangles, which overflows int for very high degrees.
    // Widening this also changes the serialised format below, so confirm the
    // stream classes support the wider type before changing it.
    int count;
    VertexID last_id;

    // Value-initialise so a default-constructed context never holds
    // indeterminate values.
    TriangleContext() : count(0), last_id() {}

    friend ibinstream& operator<<(ibinstream& m, const TriangleContext& v) {
        m << v.count;
        m << v.last_id;
        return m;
    }

    friend obinstream& operator>>(obinstream& m, TriangleContext& v) {
        m >> v.count;
        m >> v.last_id;
        return m;
    }

    friend ifbinstream& operator<<(ifbinstream& m, const TriangleContext& v) {
        m << v.count;
        m << v.last_id;
        return m;
    }

    friend ofbinstream& operator>>(ofbinstream& m, TriangleContext& v) {
        m >> v.count;
        m >> v.last_id;
        return m;
    }
};

/// Sums triangle counts: per-task contexts -> per-worker partial -> total.
// NOTE(review): template arguments (value, partial, final types) were
// reconstructed from the method signatures below; confirm against the
// Aggregator declaration in the framework headers.
class CountAgg : public Aggregator<TriangleContext, unsigned long long, unsigned long long> {
private:
    unsigned long long count;

public:
    CountAgg() : count(0) {}
    virtual ~CountAgg() {}

    virtual void init() { count = 0; }

    /// Adds one finished task's count to this worker's partial sum.
    virtual void stepPartial(TriangleContext& v) { count += v.count; }

    /// Adds one worker's partial sum to the global total.
    virtual void stepFinal(unsigned long long* part) { count += *part; }

    virtual unsigned long long* finishPartial() { return &count; }
    virtual unsigned long long* finishFinal() { return &count; }
};

// NOTE(review): template argument reconstructed from the use of ContextType
// with TriangleContext fields; confirm against the Task declaration.
class TriangleTask : public Task<TriangleContext> {
public:
    /// Counts the triangles closed among the seed's pulled neighbours.
    ///
    /// @param g        not used by this task.
    /// @param context  supplies last_id (the seed's largest neighbour) and
    ///                 accumulates the triangle count.
    /// @param frontier the pulled neighbours, each carrying the trimmed
    ///                 adjacency list built by TriangleWorker::respond().
    ///                 Assumed to arrive in the ascending-id order requested by
    ///                 seedTask_gene() -- TODO confirm the framework preserves
    ///                 pull order. It is not modified.
    /// @return false (the task never asks for another round).
    virtual bool compute(SubgraphT& g, ContextType& context, vector<VertexT*>& frontier) {
        const size_t pulled = frontier.size();

        // Candidate j has id frontier[j]->id for j < pulled; candidate
        // `pulled` is the unpulled largest neighbour, known only by
        // context.last_id. Reading it directly avoids placing a pointer to a
        // stack object into the caller's vector, where it would dangle after
        // this function returns.
        for (size_t i = 0; i < pulled; ++i) {
            AdjVtxList& adj = frontier[i]->getAdjList();
            const size_t adj_size = adj.size();
            size_t pos = 0;  // merge cursor into adj; only moves forward

            for (size_t j = i + 1; j <= pulled; ++j) {
                const VertexID id_j = (j < pulled) ? frontier[j]->id : context.last_id;

                while (pos < adj_size && adj[pos].id < id_j) ++pos;
                if (pos >= adj_size) break;  // every later candidate id is larger still

                if (adj[pos].id == id_j) {  // edge (frontier[i], candidate j) closes a triangle
                    ++context.count;
                    ++pos;
                }
            }
        }
        return false;
    }
};

// NOTE(review): template arguments reconstructed from usage (TriangleTask
// supplies the vertex/context types, CountAgg is passed to setAggregator in
// main); confirm against the Worker declaration.
class TriangleWorker : public Worker<TriangleTask, CountAgg> {
public:
    /// Builds the reply for a task that pulls @p v: a new vertex with v's id and
    /// only the adjacency entries whose id is >= v's id.
    /// The reply is allocated with `new` and not retained here; presumably the
    /// framework frees it -- confirm.
    virtual VertexT* respond(VertexT* v) {
        const KeyT vid = v->id;
        VertexT* reply = new VertexT;
        reply->id = vid;

        AdjVtxList& adjList = v->getAdjList();
        AdjVtxIter first = adjList.begin();
        // toVertex() sorts the list (assumes AdjVertex orders by id), so the
        // lower-id entries form a prefix that can be skipped.
        while (first < adjList.end() && first->id < vid) ++first;

        reply->adjList.insert(reply->adjList.end(), first, adjList.end());
        return reply;
    }

    /// Seeds one task at @p v when v has at least two neighbours with larger ids.
    /// The task pulls those neighbours except the largest one. That neighbour's
    /// id is stored in context.last_id instead, because compute() never reads
    /// its adjacency list.
    virtual void seedTask_gene(VertexT* v) {
        AdjVtxList& adjList = v->getAdjList();
        if (adjList.size() <= 1) return;

        const VertexID vid = v->id;
        AdjVtxIter last = adjList.end() - 1;  // largest neighbour (list is sorted)
        AdjVtxIter first = adjList.begin();
        // Skip neighbours with ids <= vid, including any self-loop.
        while (first < last && first->id <= vid) ++first;

        // Fewer than two higher neighbours: no triangle has v as its smallest vertex.
        if (first == last) return;

        AdjVtxList candidates;  // the vertices to be pulled in the next round
        candidates.insert(candidates.end(), first, last);

        TriangleTask* task = new TriangleTask;
        task->pull(candidates);
        task->context.count = 0;
        task->context.last_id = last->id;
        addTask(task);
    }

    /// Prints the global triangle total on rank 0.
    virtual void print_result() {
        unsigned long long* agg = (unsigned long long*)getAgg();
        if (_my_rank == 0) {
            cout << "The sum of all triangles is " << *agg << endl;
        }
    }

    /// Parses one input line into a vertex.
    /// Layout, as read by the tokenisation below: the vertex id, a space, a
    /// field ending at the next tab (skipped), then space-separated pairs of
    /// (neighbour id, weight). The adjacency list is sorted before returning.
    virtual VertexT* toVertex(char* line) {
        VertexT* v = new VertexT;

        char* pch = strtok(line, " ");
        v->id = atoi(pch);
        strtok(NULL, "\t");  // skip the field between the id and the tab

        while ((pch = strtok(NULL, " ")) != NULL) {
            AdjVertex item;
            item.id = atoi(pch);

            pch = strtok(NULL, " ");
            // An odd trailing token (e.g. a stray '\r') has no weight. Ignore it
            // instead of calling atoi(NULL), which is undefined behaviour.
            if (pch == NULL) break;
            item.wid = atoi(pch);

            v->adjList.push_back(item);
        }

        // respond(), seedTask_gene() and compute() all rely on ascending order.
        sort(v->adjList.begin(), v->adjList.end());
        return v;
    }
};

int main(int argc, char* argv[]) {
    init_worker(&argc, &argv);

    // argv[1..3] are required; fail cleanly instead of reading past argv.
    if (argc < 4) {
        cerr << "usage: " << argv[0] << " <local_root> <input_path> <output_path>" << endl;
        worker_finalize();
        return 1;
    }

    WorkerParams param;
    param.local_root = argv[1];   // a local tmp folder
    param.input_path = argv[2];   // input path in HDFS
    param.output_path = argv[3];  // output path in HDFS
    param.force_write = true;
    param.native_dispatcher = false;

    TriangleWorker worker;
    CountAgg agg;
    worker.setAggregator(&agg, 0);
    worker.run(param, 1000000);

    worker_finalize();
    return 0;
}