#include "./system/subg-dev.h"

#include <algorithm>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <iterator>
#include <map>
#include <mutex>
#include <set>
#include <vector>

using namespace std;

// NOTE(review): the previous text of this file had every <...> template
// argument stripped and was collapsed onto a few lines. Standard-library
// arguments below are reconstructed from how the values are used. The
// framework arguments (Aggregator<int, int, int>, Task<int>,
// Worker<CliqueTask>) are inferred from the "Context = int" comments and the
// int-typed aggregator methods. Confirm them against system/subg-dev.h.

// Adjacency-set graph: vertex ID -> set of neighbour IDs (std::set keeps them sorted).
typedef map<VertexID, set<VertexID> > Graph;

// Max-aggregator over clique sizes (Context = int).
// stepPartial folds a task's context value; stepFinal folds a partial result
// from another worker. Both keep the largest value seen.
class CountAgg : public Aggregator<int, int, int>
{
private:
    int count;
    mutex mtx;

    // Raise count to candidate if it is larger. The comparison and the write
    // happen under one lock. The previous version compared outside the lock,
    // so on the master (where the compute and context_sync threads both call
    // in) two threads could pass the check together and a smaller value could
    // overwrite a larger one. The lock is uncontended on other ranks.
    void raiseTo(int candidate)
    {
        lock_guard<mutex> lck(mtx);
        if (candidate > count)
            count = candidate;
    }

public:
    virtual ~CountAgg() {}

    virtual void init() { count = 0; }

    virtual void stepPartial(int& v) { raiseTo(v); }

    virtual void stepFinal(int* part) { raiseTo(*part); }

    // NOTE(review): both finish* methods hand out a pointer to the live
    // counter; reads through it are not synchronised with concurrent step*.
    virtual int* finishPartial() { return &count; }

    virtual int* finishFinal() { return &count; }
};

// Searches for the largest clique formed by the seed vertex plus the pulled
// frontier vertices (its higher-ID neighbours), using colour-bounded branch
// and bound. Context = int: the best clique size known to this task.
class CliqueTask : public Task<int>
{
public:
    // A vertex ID paired with its degree. operator< is inverted so that
    // std::sort orders by descending degree.
    struct vtxDegree
    {
        VertexID vid;
        int degree;
        inline bool operator<(const vtxDegree& rhs) const { return degree > rhs.degree; }
    };

    /*
     * Output: ListR
     * Appends the vertices of g to ListR in descending order of degree.
     * g is taken by const reference; it used to be copied on every call.
     */
    void getListR(const Graph& g, vector<VertexID>& ListR)
    {
        vector<vtxDegree> vtxVec;
        vtxVec.reserve(g.size());
        for (const auto& entry : g) {
            vtxDegree item;
            item.vid = entry.first;
            item.degree = static_cast<int>(entry.second.size());
            vtxVec.push_back(item);
        }
        sort(vtxVec.begin(), vtxVec.end());
        ListR.reserve(ListR.size() + vtxVec.size());
        for (const vtxDegree& item : vtxVec)
            ListR.push_back(item.vid);
    }

    /*
     * Greedy colouring. Each vertex of ListR, in ListR order, joins the first
     * colour class that holds none of its neighbours, or opens a new class.
     * Output: ListR is rewritten in increasing colour order (ascending ID
     * within a class), and color[v] gets the 1-based class number of v.
     * Reference: "An efficient branch-and-bound algorithm for finding a
     * maximum clique", http://dl.acm.org/citation.cfm?id=1783736
     */
    void colorSort(Graph& g, vector<VertexID>& ListR, map<VertexID, int>& color)
    {
        vector<set<VertexID> > classes;
        for (VertexID p : ListR) {
            const set<VertexID>& adjP = g[p];
            size_t k = 0;
            while (k < classes.size() && !sharesNoVertex(classes[k], adjP))
                ++k;
            if (k == classes.size())
                classes.push_back(set<VertexID>());
            classes[k].insert(p);
        }

        ListR.clear();
        for (size_t k = 0; k < classes.size(); ++k) {
            for (VertexID v : classes[k]) {
                ListR.push_back(v);
                color[v] = static_cast<int>(k) + 1;
            }
        }
    }

    /*
     * Colour-bounded branch and bound.
     * Input:
     *   g:       the subgraph at the current stage
     *   ListR:   its vertices in increasing order of colour (consumed)
     *   color:   colour number of each vertex in ListR
     *   Q:       the clique built so far (restored before returning)
     *   maxSize: size of the largest clique found so far (raised in place)
     */
    void maxClique(Graph& g, vector<VertexID>& ListR, map<VertexID, int>& color,
                   vector<VertexID>& Q, int& maxSize)
    {
        while (!ListR.empty()) {
            VertexID p = ListR.back();
            ListR.pop_back();
            // p and every vertex still before it in ListR use at most color[p]
            // colours, so any clique among them has at most color[p] vertices.
            // ListR is in increasing colour order, so this bound only shrinks
            // from here on, and the whole loop can stop.
            if (static_cast<int>(Q.size()) + color[p] <= maxSize)
                return;

            Q.push_back(p);
            const set<VertexID>& adjP = g[p];

            // Vertices that can extend Q + p: the remaining ones adjacent to p.
            set<VertexID> cand;
            for (VertexID u : ListR)
                if (adjP.count(u))
                    cand.insert(u);

            if (!cand.empty()) {
                // Induced subgraph on cand (every candidate is a key, even if isolated).
                Graph subg;
                for (VertexID u : cand) {
                    const set<VertexID>& adjU = g[u];
                    set<VertexID>& subAdj = subg[u];
                    set_intersection(cand.begin(), cand.end(), adjU.begin(), adjU.end(),
                                     inserter(subAdj, subAdj.begin()));
                }
                vector<VertexID> subList;
                map<VertexID, int> subColor;
                getListR(subg, subList);
                colorSort(subg, subList, subColor);
                maxClique(subg, subList, subColor, Q, maxSize);
            } else if (static_cast<int>(Q.size()) > maxSize) {
                // Q cannot be extended further: it is a maximal clique.
                maxSize = static_cast<int>(Q.size());
            }
            Q.pop_back();
        }
    }

    // Builds the subgraph induced by the pulled frontier vertices, then runs
    // the clique search rooted at the seed vertex. Stores the best size
    // (at least the aggregator's current value) in context.
    // Returns false: the task finishes in a single round.
    virtual bool compute(SubgraphT& g, ContextType& context, vector<VertexT*>& frontier)
    {
        int maxSize = *(int*)getAgg();
        vector<VertexID> Q;
        Q.push_back(g.getNodes()[0].id); // the seed vertex

        if (!frontier.empty()) {
            set<VertexID> ids;
            for (VertexT* f : frontier)
                ids.insert(f->id);

            // Adjacency of each frontier vertex restricted to the frontier.
            // The membership test does not depend on the order of AdjVtxList.
            Graph tempG;
            for (VertexT* f : frontier) {
                AdjVtxList& adjList = f->getAdjList();
                set<VertexID>& subAdj = tempG[f->id];
                for (size_t k = 0; k < adjList.size(); ++k) {
                    VertexID nb = adjList[k].id;
                    if (ids.count(nb))
                        subAdj.insert(nb);
                }
            }

            vector<VertexID> ListR;
            map<VertexID, int> color;
            getListR(tempG, ListR);
            colorSort(tempG, ListR, color);
            maxClique(tempG, ListR, color, Q, maxSize);
        } else if (static_cast<int>(Q.size()) > maxSize) {
            maxSize = static_cast<int>(Q.size());
        }

        context = maxSize; // the max clique size known after this task
        return false;
    }

private:
    // True when the two sorted sets have no element in common.
    // Stops at the first shared element.
    static bool sharesNoVertex(const set<VertexID>& a, const set<VertexID>& b)
    {
        set<VertexID>::const_iterator i = a.begin(), j = b.begin();
        while (i != a.end() && j != b.end()) {
            if (*i < *j)
                ++i;
            else if (*j < *i)
                ++j;
            else
                return false;
        }
        return true;
    }
};

class CliqueWorker : public Worker<CliqueTask>
{
public:
    CliqueWorker() { SLEEP_TIME = 5; }

    // Spawns one task per vertex v. The task pulls only v's neighbours with
    // larger IDs, so each clique is explored from its smallest vertex only.
    // Relies on v's adjacency list being sorted by ID (toVertex sorts it with
    // AdjVertex's operator<, presumably by id — confirm).
    virtual void seedTask_gene(VertexT* v)
    {
        AdjVtxList& adjList = v->getAdjList();
        AdjVtxList candidates; // the vertices to be pulled in the next round
        VertexID vid = v->id;

        AdjVtxIter vIter = adjList.begin();
        while (vIter != adjList.end() && vIter->id <= vid)
            ++vIter;
        candidates.insert(candidates.end(), vIter, adjList.end());

        // A clique containing v has at most candidates.size() + 1 vertices;
        // skip the task if that cannot beat the current maximum.
        int maxSize = *(int*)getAgg();
        if (static_cast<int>(candidates.size()) >= maxSize) {
            CliqueTask* task = new CliqueTask;
            task->pull(candidates);
            NodeT node;
            v->setNode(node);
            task->subG.addNode(node);
            task->context = 0;
            addTask(task);
        }
    }

    virtual void print_result()
    {
        int* agg = (int*)getAgg();
        // MASTER_RANK for consistency with the rest of this file (previously a literal 0).
        if (_my_rank == MASTER_RANK) {
            cout << "The size of max clique is " << *agg << endl;
        }
    }

    // Parses one input line: "<vid> ...\t<nbr> <wid> <nbr> <wid> ...".
    // Everything between the vertex ID and the first tab is skipped.
    // A trailing neighbour ID without a worker ID is dropped; it used to
    // reach atoi(NULL), which is undefined behaviour.
    virtual VertexT* toVertex(char* line)
    {
        VertexT* v = new VertexT;
        char* pch = strtok(line, " ");
        v->id = atoi(pch);
        strtok(NULL, "\t");
        while ((pch = strtok(NULL, " ")) != NULL) {
            char* widTok = strtok(NULL, " ");
            if (widTok == NULL)
                break;
            AdjVertex item;
            item.id = atoi(pch);
            item.wid = atoi(widTok);
            v->adjList.push_back(item);
        }
        sort(v->adjList.begin(), v->adjList.end());
        return v;
    }
};

int main(int argc, char* argv[])
{
    init_worker(&argc, &argv);
    // argv[1..3] are required; indexing them blindly was undefined behaviour.
    if (argc < 4) {
        if (_my_rank == MASTER_RANK)
            cerr << "Usage: " << argv[0]
                 << " <local_tmp_folder> <hdfs_input_path> <hdfs_output_path>" << endl;
        worker_finalize();
        return 1;
    }

    WorkerParams param;
    param.local_root = argv[1];  // a local tmp folder
    param.input_path = argv[2];  // input path in HDFS
    param.output_path = argv[3]; // output path in HDFS
    param.force_write = true;
    param.native_dispatcher = false;

    CliqueWorker worker;
    CountAgg agg;
    worker.setAggregator(&agg, 0);
    worker.run(param, 1000000); // cache size
    worker_finalize();
    return 0;
}