diff --git a/Wc b/Wc index 7d0a7a47ad0b836354ff01771b0109aef81d136b..9362530b9059e1b7be4f4bb6f021ab87049d23cc 100755 Binary files a/Wc and b/Wc differ diff --git a/Wr b/Wr index 2d4c046a3d59a7139bf105f8ab8efddbb91ceda1..bfec6873eedecb0c4d66fc802e363bc41a4a669d 100755 Binary files a/Wr and b/Wr differ diff --git a/src/Node.cpp b/src/Node.cpp index cbd4f097742740ef35a148a7634f7ee5bad73387..362a895b57500f64b674c86b03b67c153a0574fc 100644 --- a/src/Node.cpp +++ b/src/Node.cpp @@ -765,41 +765,33 @@ void Node::checkFileListConsistency(){ } for (auto& element: fileList) { if(element.second.size() < 4){ - set<int> currentLocs; - for (auto e: element.second) currentLocs.insert(e); - vector<int> aliveNodes; - for (auto &e : membershipList) { - Member m(get<0>(e.first), get<1>(e.first)); - int id = hashingId(m, get<2>(e.first)); - if ((currentLocs.count(id) == 0) && (get<2>(e.second) == 0)) { - Member m(get<0>(e.first), get<1>(e.first)); - aliveNodes.push_back(id); - } - } - int neededReps = 4 - element.second.size(); - auto mapleNodes = randItems(neededReps, aliveNodes); - int index = 0; - for (auto &mn : mapleNodes){ - string nodeInfo = hashRing->getValue(mn); - Messages outMsg(DNSGET, nodeInfo + "::" + to_string(mn) + "::" + element.first + "::"); - tuple<int, int, int> req = pendingRequests[element.first]; - if (index == 0){ - pendingRequests[element.first] = make_tuple(get<0>(req), get<1>(req), mn); - get<2>(pendingRequestSent[element.first]) = true; - } - if (index == 1) { - pendingRequests[element.first] = make_tuple(get<0>(req), mn, get<2>(req)); - get<1>(pendingRequestSent[element.first]) = true; - } - if (index == 2) { - pendingRequests[element.first] = make_tuple(mn, get<1>(req), get<2>(req)); - get<0>(pendingRequestSent[element.first]) = true; + cout << "Need to rereplicate " << element.first << endl; + //First check the closest node, successor and predecessor + int closestNodePostion = hashRing->locateClosestNode(element.first); + int pred = hashRing->getPredecessor(closestNodePostion); + int succ = hashRing->getSuccessor(closestNodePostion); + while (succ == pred || succ == closestNodePostion) succ = hashRing->getRandomNode(tuple<int, int, int>(closestNodePostion, pred, succ)); + int randomNode = hashRing->getRandomNode(tuple<int, int, int>(closestNodePostion, pred, succ)); + vector<int> nodesToCheck = {closestNodePostion, pred, succ, randomNode}; + for(unsigned int i = 0; i < nodesToCheck.size(); i++){ + if (!isInVector(element.second, nodesToCheck[i])) + { + string nodeInfo = hashRing->getValue(nodesToCheck[i]); + Messages outMsg(DNSGET, nodeInfo + "::" + to_string(nodesToCheck[i]) + "::" + element.first + "::"); + tuple<int, int, int> request = pendingRequests[element.first]; + if(get<0>(request) != -1 || get<1>(request) != -1 || get<2>(request) != -1){ + //cout << "on put " << get<0>(request) << "/" << get<1>(request) << "/" << get<2>(request) << endl; + break; + } + pendingRequests[element.first] = tuple<int, int, int>(-1, -1, nodesToCheck[i]); + pendingRequestSent[element.first] = tuple<int, int, int>(true, true, true); + tcpServent->sendMessage(leaderIP, TCPPORT, outMsg.toString()); + break; } - tcpServent->sendMessage(leaderIP, TCPPORT, outMsg.toString()); - index++; } } } + } vector<tuple<string,string, string>> Node::getRandomNodesToGossipTo() @@ -843,7 +835,7 @@ void Node::handleMaplejuiceQ(){ void Node::handleTcpMessage() { //Before we do anything here, we should have the leader check to see if the file list is consistent or not. - //checkFileListConsistency(); + checkFileListConsistency(); queue<string> qCopy(tcpServent->regMessages); tcpServent->regMessages = queue<string>(); int size = qCopy.size(); diff --git a/src/Threads.cpp b/src/Threads.cpp index a0d3518d671affb2c150bc19c29e9c37176c5871..55a0f69ba0b343f051dd0c8e506e7eeb4ed26281 100644 --- a/src/Threads.cpp +++ b/src/Threads.cpp @@ -93,7 +93,6 @@ void *runSenderThread(void *node) nodeOwn->heartbeatToNode(); //5a. check for queue maple/juice messages - if (nodeOwn->isLeader) nodeOwn->checkFileListConsistency(); nodeOwn->handleMaplejuiceQ(); //5b. check for regular TCP messages nodeOwn->handleTcpMessage();