// Keyword search over a labelled graph, written against the pregel-ol headers
// included below ("ol/pregel-ol-dev.h", "invidx.h", "utils/type.h", "triple.h").
//
// NOTE(review): this text looks damaged by extraction. The file is collapsed
// onto three physical lines (so every `//` comment swallows the code that
// follows it on the same line), template argument lists are missing (e.g.
// `set content;`, `vector > literal;`, `typedef Msg2ndKey gqmsg;`,
// `VertexOL >`), and several spans are cut mid-statement
// (`for(int j=0; j& nb_id = ...`, `for(int k=0; k { public:`,
// `for(int i=0; iqvalue().hop[i], ...`). It is not expected to compile as-is;
// restore it from the original source before changing any logic.
//
// What the visible text shows:
//  - RADIUS is defined as 4; no use of it appears in the visible text.
//  - GRAPHSEARCHNqvalue: per-vertex data with fields content, literal, llabel,
//    nb_id and elabel. operator<< and operator>> stream the fields in the
//    same order: content, literal, llabel, elabel, nb_id.
//  - GRAPHSEARCHQvalue: per-query state. init(qnum, deg, lnum) sets
//    vertex_bitmap and final_bitmap to 0, resizes ebitmap to deg and lbitmap
//    to lnum (new slots 0), and resizes hop and source to qnum (new slots
//    INT_MAX and -1 respectively).
//    NOTE(review): vertex_bitmap and final_bitmap are single chars and
//    setbit() is called with the keyword position, which presumably caps a
//    query at 8 keywords; confirm against setbit()/isAllOne().
//  - GRAPHSEARCHVertex::num_of_keywords: returns get_query().size().
//  - GRAPHSEARCHVertex::init_value: builds a GRAPHSEARCHQvalue sized from
//    query.size(), nb_id.size() and literal.size(), and returns it right after
//    init() when superstep() > 1. Otherwise, for each query keyword found in
//    content, it sets that keyword's bit in vertex_bitmap and records
//    source[i] = id and hop[i] = 0. The remainder of this method and the
//    message-passing logic that follows it are cut off in this text.
//  - GRAPHSEARCHWorkerOL::toVertex: parses one input line into a vertex
//    allocated with new. It reads the id, then two resource fields whose
//    filter() tokens are inserted into content, then loops until o.eol(l).
//    When o.nextChar(l) is a digit the entry is read as a number pushed to
//    nb_id plus a predicate whose tokens form a new elabel set; otherwise it is
//    read with readNode() into a new literal set plus a predicate whose tokens
//    form a new llabel set.
//  - toQuery: fills the result vector by calling filter() on the query line.
//  - init: for every query keyword, calls activate() on each entry of
//    idx().vpos_list(keyword).
//  - load2Idx: inserts every token of content, literal, llabel and elabel into
//    the index under the given position. idx_init calls
//    load_idx_from_vertexes().
//  - dump: only for a vertex whose final_bitmap satisfies isAllOne(..., qnum),
//    writes "root = <id>", then one sprintf'd record per keyword using hop[i]
//    and source[i] (the format string is lost), then a newline.
//  - main: requires argc == 3 (input path, output path). Otherwise the master
//    rank prints the usage string and main returns 0. It sets
//    force_write = true and native_dispatcher = false, then runs the worker.
//    The combiner setup is commented out.
#define SECONDARY_KEY_MSG #define RADIUS 4 #include "ol/pregel-ol-dev.h" #include "invidx.h" #include "utils/type.h" #include "triple.h" struct GRAPHSEARCHNqvalue { set content; //literal vector > literal; vector > llabel; //in-edges vector nb_id; vector > elabel; }; ibinstream & operator<<(ibinstream &m, const GRAPHSEARCHNqvalue &v) { m << v.content; m << v.literal; m << v.llabel; m << v.elabel; m << v.nb_id; return m; } obinstream & operator>>(obinstream &m, GRAPHSEARCHNqvalue &v) { m >> v.content; m >> v.literal; m >> v.llabel; m >> v.elabel; m >> v.nb_id; return m; } //step 2: define query type in this case is a sentence with many keywords //we shall first find the keywords in the index and activate the corresponding vertexes; //step 3: define query specific type struct GRAPHSEARCHQvalue { char vertex_bitmap; vector ebitmap; vector lbitmap; char final_bitmap;// bit-or of vertex_bitmap & lbitmap, not including ebitmap vector hop; vector source;//from whom the keyword is matched void init(int qnum, int deg, int lnum) { vertex_bitmap = 0; ebitmap.resize(deg, 0); lbitmap.resize(lnum, 0); final_bitmap = 0; hop.resize(qnum, INT_MAX); source.resize(qnum, -1); } }; //step 4: define msg type [keyword_pos, (hop, src)] typedef Msg2ndKey gqmsg; //step 5: define vertex type class GRAPHSEARCHVertex: public VertexOL > { public: int num_of_keywords(){ return get_query().size(); } virtual GRAPHSEARCHQvalue init_value(vector& query) { GRAPHSEARCHQvalue qval; GRAPHSEARCHNqvalue & nqval = nqvalue(); vector& nb_id = nqval.nb_id; int num = query.size(); int deg = nb_id.size(); int lnum = nqval.literal.size(); qval.init(num, deg, lnum); if(superstep() > 1) return qval; set& content = nqval.content; vector >& literal = nqval.literal; vector >& llabel = nqval.llabel; vector >& elabel = nqval.elabel; for(int i = 0; i < num; i++){ set::iterator it = content.find(query[i]); bool is_found = (it != content.end());//query key in this vertex's content if(is_found) { setbit(qval.vertex_bitmap, 
i); qval.source[i] = id; qval.hop[i] = 0; } //------ for(int j=0; j& nb_id = nqval.nb_id; vector >& elabel = nqval.elabel; int num = num_of_keywords(); GRAPHSEARCHQvalue& qval = qvalue(); gqmsg msg; for(int k=0; k { public: char buf[200]; GRAPHSEARCHWorkerOL() : WorkerOL_auto(false, false, true) { } vector parser; // TODO for every local vertex on a worker? virtual GRAPHSEARCHVertex* toVertex(char *line) { GRAPHSEARCHVertex* v=new GRAPHSEARCHVertex; statement o; const char* l=line; o.skipWhiteSpace(l); v->id = o.readNum(l); o.skipWhiteSpace(l); string cont = o.readResource(l); string context = o.filterURI(cont); parser.clear(); filter(context.c_str(), parser); for(int i = 0; i < parser.size(); i++) v->nqvalue().content.insert(parser[i]); o.skipWhiteSpace(l); o.skipWhiteSpace(l); string subj = o.readResource(l); string subject = o.filter_subject(subj); parser.clear(); filter(subject.c_str(), parser); for(int i = 0; i < parser.size(); i++) v->nqvalue().content.insert(parser[i]); o.skipWhiteSpace(l); while(!o.eol(l)) { char c = o.nextChar(l); if(isdigit(c)) { int nb = o.readNum(l); v->nqvalue().nb_id.push_back(nb); o.skipWhiteSpace(l); //---- string pred = o.readResource(l); string predicate = o.filterURI(pred); set set_; v->nqvalue().elabel.push_back(set_); parser.clear(); filter(predicate.c_str(), parser); for(int i = 0; i < parser.size(); i++) v->nqvalue().elabel.back().insert(parser[i]); o.skipWhiteSpace(l); } else { string obj = o.readNode(l); string object = o.filter_object(obj); set set_; v->nqvalue().literal.push_back(set_); parser.clear(); filter(object.c_str(), parser); for(int i = 0; i < parser.size(); i++) v->nqvalue().literal.back().insert(parser[i]); o.skipWhiteSpace(l); //---- string pred = o.readResource(l); string predicate =o.filterURI(pred); v->nqvalue().llabel.push_back(set_); parser.clear(); filter(predicate.c_str(), parser); for(int i = 0; i < parser.size(); i++) v->nqvalue().llabel.back().insert(parser[i]); o.skipWhiteSpace(l); } } return v; } 
virtual vector toQuery(char* line) { vector result; filter(line, result); return result; } // TODO called when there comes a query? virtual void init(VertexContainer &vc) { vector query = get_query(); for (int i = 0; i < query.size(); i++) { vector & vposlist = idx().vpos_list(query[i]); for (int j = 0; j < vposlist.size(); j++) { activate(vposlist[j]); } } } // TODO in this case, each worker has an index? virtual void load2Idx(GRAPHSEARCHVertex* v, int position, InvIdx& idx) { //this function will be called in 'load_idx_from_vertexes()'{ set & content = v->nqvalue().content; for(set::iterator i = content.begin(); i != content.end(); ++ i){ idx.insert(*i, position); } for(int i=0; inqvalue().literal.size(); i++) { set & set_ = v->nqvalue().literal[i]; for(set::iterator i = set_.begin(); i != set_.end(); ++ i){ idx.insert(*i, position); } } for(int i=0; inqvalue().llabel.size(); i++) { set & set_ = v->nqvalue().llabel[i]; for(set::iterator i = set_.begin(); i != set_.end(); ++ i){ idx.insert(*i, position); } } for(int i=0; inqvalue().elabel.size(); i++) { set & set_ = v->nqvalue().elabel[i]; for(set::iterator i = set_.begin(); i != set_.end(); ++ i){ idx.insert(*i, position); } } } virtual void idx_init() { load_idx_from_vertexes(); } virtual void dump(GRAPHSEARCHVertex* v, BufferedWriter& writer) { int qnum = get_query().size(); if(isAllOne(v->qvalue().final_bitmap, qnum)){ sprintf(buf,"root = %d", v->id); writer.write(buf); for(int i=0; iqvalue().hop[i], v->qvalue().source[i]); writer.write(buf); } writer.write("\n"); } } }; int main(int argc, char* argv[]) { GRAPHSEARCHWorkerOL worker; if(argc != 3) { if(_my_rank==MASTER_RANK) cout<<"Usage: ./run inputpath outputpath"; return 0; } WorkerParams param; param.input_path=argv[1]; param.output_path=argv[2]; param.force_write=true; param.native_dispatcher=false; //------ //SLCACombiner combiner; //worker.setCombiner(&combiner); worker.run(param); return 0; }