#define USE_ATTRIBUTE #include "./system/subg-dev.h" using namespace std; struct gMatchContext{ public: unsigned long long count; int round; gMatchContext(){} friend ibinstream& operator<<(ibinstream& m, const gMatchContext& v) { m << v.count; m << v.round; return m; } friend obinstream& operator>>(obinstream& m, gMatchContext& v) { m >> v.count; m >> v.round; return m; } friend ifbinstream& operator<<(ifbinstream& m, const gMatchContext& v) { m << v.count; m << v.round; return m; } friend ofbinstream& operator>>(ofbinstream& m, gMatchContext & v) { m >> v.count; m >> v.round; return m; } }; class CountAgg:public Aggregator //Context = unsigned long long { private: unsigned long long count; public: virtual ~CountAgg(){} virtual void init(){ count = 0; } virtual void stepPartial(gMatchContext & v) { count += v.count; } virtual void stepFinal(unsigned long long * part) { count += *part; } virtual unsigned long long* finishPartial(){ return &count; } virtual unsigned long long* finishFinal(){ return &count; } }; class gMatchTask:public Task{ public: bool hasLabel(NodeT & node, char label){ AdjNodeList & adjList = node.getAdjList(); for(int i= 0 ; i < adjList.size(); i++){ if(adjList[i].attr == label){ return true; } } return false; } void print_Q(vector & Q){ for(int i = 0; i < Q.size(); i++){ cout << Q[i] << " "; } cout << endl; } void backtrack(int level, SubgraphT & g, vector & Q, vector & nodes, unsigned long long & count){ level++; if(level == 1){ //match C for(int i = 0 ; i < nodes.size(); i++){ Q.push_back(nodes[i]); NodeT * c = g.getNode(nodes[i]); AdjNodeList & adjList = c->getAdjList(); vector b_vec; for(int j = 0 ; j < adjList.size(); j++){ if(adjList[j].attr == 'b') b_vec.push_back(adjList[j].id); } if(b_vec.size() >= 2){ for(int j = 0; j < b_vec.size(); j++){ VertexID n = b_vec[j]; NodeT * b = g.getNode(n); if(hasLabel(*b,'a')){ //match in_b Q.push_back(n); vector out_b = b_vec; out_b.erase(out_b.begin()+j); backtrack(level, g, Q, out_b, count); Q.pop_back(); } } } Q.pop_back(); } }else if(level == 2){ //match out_b for(int i = 0 ; i < nodes.size(); i++){ Q.push_back(nodes[i]); NodeT * out_b = g.getNode(nodes[i]); AdjNodeList & adj = out_b->getAdjList(); for(int j= 0 ; j < adj.size(); j++){ if(adj[j].attr == 'd'){ Q.push_back(adj[j].id); // print_Q(Q); count++; Q.pop_back(); } } Q.pop_back(); } } } virtual bool compute(SubgraphT & g, ContextType & context, vector & frontier){ int & round = context.round; round++; if(round == 1){ hash_map label_b; vector label_c; //get the b,c vertexes from a.adjList for(int i = 0 ; i < frontier.size(); i++){ if(frontier[i]->attr == 'b') label_b[frontier[i]->id] = frontier[i]; else if(frontier[i]->attr == 'c') label_c.push_back(frontier[i]); } set bcheck; for(int k = 0 ; k < label_c.size(); k++){ VertexT * node_c = label_c[k]; AdjVtxList & adjList = node_c->getAdjList(); AdjVtxList in_bset; //b vertexes already exist in subg AdjVtxList out_bset; //b vertexes need to be pulled in next round for(int i = 0 ; i < adjList.size(); i++){ AdjVertex & node = adjList[i]; if(node.attr == 'b'){ if(label_b.find(node.id) != label_b.end()){ //find node b in label_b in_bset.push_back(node); }else{ out_bset.push_back(node); } } } bool match_c = false; //in following two cases: //c is matched //case 1: //has only one in_b in a.adjList and at least one out_b to be pulled if(in_bset.size() == 1 && out_bset.size() > 0){ match_c = true; for(int i = 0 ; i < out_bset.size(); i++){ bcheck.insert(out_bset[i]); } } //case 2: //has at least two in_b in a.adjList, all of them are the candidates of out_b, pull them all else if(in_bset.size() >= 2){ match_c = true; for(int i = 0 ; i < in_bset.size(); i++){ bcheck.insert(in_bset[i]); } for(int i = 0 ; i < out_bset.size(); i++){ bcheck.insert(out_bset[i]); } } if(match_c){ NodeT c(node_c->id, node_c->attr); NodeT & a = g.getNodes()[0]; g.addEdge(a,c); for(int i = 0 ; i < in_bset.size(); i++){ AdjVertex & v = in_bset[i]; if(g.hasNode(v.id)){ NodeT * b = g.getNode(v.id); g.addEdge(*b,c); }else{ NodeT & a = g.getNodes()[0]; NodeT b(v.id, v.attr); g.addEdge(b,c); g.addEdge(a,b); g.addNode(b); } } g.addNode(c); } } //pull bcheck set::iterator sIter; for(sIter = bcheck.begin(); sIter != bcheck.end(); sIter++){ pull(*sIter); } return true; }else if(round == 2){ //get all c vertexes in subg vector & tmp = g.getNodes(); set c_set; for(int i = 0 ; i < tmp.size(); i++) if(tmp[i].attr == 'c') c_set.insert(tmp[i].id); for(int i = 0 ; i < frontier.size(); i++){ VertexT * b_p = frontier[i]; AdjVtxList & adjList = b_p->getAdjList(); set c_nb; AdjVtxList d_vec; for(int j = 0 ; j < adjList.size(); j++){ AdjVertex & n = adjList[j]; if(n.attr == 'c'){ c_nb.insert(n.id); }else if(n.attr == 'd'){ d_vec.push_back(n); } } if(!d_vec.empty()){ if(g.hasNode(b_p->id)){ for(int j = 0; j < d_vec.size(); j++){ AdjVertex & v = d_vec[j]; NodeT * b = g.getNode(b_p->id); if(g.hasNode(v.id)){ NodeT * d = g.getNode(v.id); g.addEdge(*b,*d); }else{ NodeT d(v.id, v.attr); g.addEdge(*b,d); g.addNode(d); } } }else{ set interC; set::iterator sIter; set_intersection(c_nb.begin(), c_nb.end(), c_set.begin(),c_set.end(), inserter(interC, interC.begin())); NodeT b(b_p->id, b_p->attr); for(sIter = interC.begin(); sIter != interC.end(); sIter++){ NodeT * c = g.getNode(*sIter); g.addEdge(b,*c); } for(int j = 0; j < d_vec.size(); j++){ AdjVertex & v = d_vec[j]; if(g.hasNode(v.id)){ NodeT * d = g.getNode(v.id); g.addEdge(b,*d); }else{ NodeT d(v.id,v.attr); g.addEdge(b,d); g.addNode(d); } } g.addNode(b); } } } //count subgraph by backtracking vector & nodes = g.getNodes(); VertexID a = nodes[0].id; vector c_nodes; for(int i = 1; i < nodes.size(); i++){ if(nodes[i].attr == 'c') c_nodes.push_back(nodes[i].id); } vector Q; Q.push_back(a); unsigned long long & count = context.count; backtrack(0, g, Q, c_nodes, count); return false; } } }; class gMatchWorker:public Worker { public: virtual void seedTask_gene(VertexT * v){ if(v->attr == 'a'){ AdjVtxList & adjList= v->getAdjList(); AdjVtxList label_b; AdjVtxList label_c; for(int i = 0 ; i < adjList.size(); i++){ if(adjList[i].attr == 'b') label_b.push_back(adjList[i]); else if(adjList[i].attr == 'c') label_c.push_back(adjList[i]); } if(!label_b.empty() && !label_c.empty()){ gMatchTask * task = new gMatchTask; task->pull(label_b); task->pull(label_c); NodeT node; v->setNode(node); task->subG.addNode(node); task->context.count = 0; task->context.round = 0; addTask(task); } } } virtual void print_result(){ unsigned long long* agg=(unsigned long long*)getAgg(); if (_my_rank == 0) { cout << "The size of matched subgraph is " << *agg << endl; } } virtual VertexT* toVertex(char* line) { VertexT* v = new VertexT; char * pch; pch=strtok(line, " "); v->id=atoi(pch); pch=strtok(NULL, " "); v->attr= *pch; strtok(NULL,"\t"); while((pch=strtok(NULL, " ")) != NULL) { AdjVertex item; item.id = atoi(pch); pch=strtok(NULL, " "); item.attr= *pch; pch=strtok(NULL, " "); item.wid = atoi(pch); v->adjList.push_back(item); } return v; } }; int main(int argc, char* argv[]) { init_worker(&argc, &argv); WorkerParams param; param.local_root = argv[1]; //an local tmp folder param.input_path = argv[2]; //input path in HDFS param.output_path = argv[3]; //output path in HDFS param.force_write=true; param.native_dispatcher=false; //---------- gMatchWorker worker; CountAgg agg; worker.setAggregator(&agg,0); worker.run(param, 1000000); //cachesize worker_finalize(); return 0; }