Skip to content
Snippets Groups Projects
Commit b70fc7c5 authored by ajf5's avatar ajf5
Browse files

try again

parents aa1e221f 90259faa
No related branches found
No related tags found
No related merge requests found
No preview for this file type
No preview for this file type
......@@ -765,41 +765,33 @@ void Node::checkFileListConsistency(){
}
for (auto& element: fileList) {
if(element.second.size() < 4){
set<int> currentLocs;
for (auto e: element.second) currentLocs.insert(e);
vector<int> aliveNodes;
for (auto &e : membershipList) {
Member m(get<0>(e.first), get<1>(e.first));
int id = hashingId(m, get<2>(e.first));
if ((currentLocs.count(id) == 0) && (get<2>(e.second) == 0)) {
Member m(get<0>(e.first), get<1>(e.first));
aliveNodes.push_back(id);
}
}
int neededReps = 4 - element.second.size();
auto mapleNodes = randItems(neededReps, aliveNodes);
int index = 0;
for (auto &mn : mapleNodes){
string nodeInfo = hashRing->getValue(mn);
Messages outMsg(DNSGET, nodeInfo + "::" + to_string(mn) + "::" + element.first + "::");
tuple<int, int, int> req = pendingRequests[element.first];
if (index == 0){
pendingRequests[element.first] = make_tuple(get<0>(req), get<1>(req), mn);
get<2>(pendingRequestSent[element.first]) = true;
}
if (index == 1) {
pendingRequests[element.first] = make_tuple(get<0>(req), mn, get<2>(req));
get<1>(pendingRequestSent[element.first]) = true;
}
if (index == 2) {
pendingRequests[element.first] = make_tuple(mn, get<1>(req), get<2>(req));
get<0>(pendingRequestSent[element.first]) = true;
cout << "Need to rereplicate " << element.first << endl;
//First check the closest node, successor and predecessor
int closestNodePostion = hashRing->locateClosestNode(element.first);
int pred = hashRing->getPredecessor(closestNodePostion);
int succ = hashRing->getSuccessor(closestNodePostion);
while (succ == pred || succ == closestNodePostion) succ = hashRing->getRandomNode(tuple<int, int, int>(closestNodePostion, pred, succ));
int randomNode = hashRing->getRandomNode(tuple<int, int, int>(closestNodePostion, pred, succ));
vector<int> nodesToCheck = {closestNodePostion, pred, succ, randomNode};
for(unsigned int i = 0; i < nodesToCheck.size(); i++){
if (!isInVector(element.second, nodesToCheck[i]))
{
string nodeInfo = hashRing->getValue(nodesToCheck[i]);
Messages outMsg(DNSGET, nodeInfo + "::" + to_string(nodesToCheck[i]) + "::" + element.first + "::");
tuple<int, int, int> request = pendingRequests[element.first];
if(get<0>(request) != -1 || get<1>(request) != -1 || get<2>(request) != -1){
//cout << "on put " << get<0>(request) << "/" << get<1>(request) << "/" << get<2>(request) << endl;
break;
}
pendingRequests[element.first] = tuple<int, int, int>(-1, -1, nodesToCheck[i]);
pendingRequestSent[element.first] = tuple<int, int, int>(true, true, true);
tcpServent->sendMessage(leaderIP, TCPPORT, outMsg.toString());
break;
}
tcpServent->sendMessage(leaderIP, TCPPORT, outMsg.toString());
index++;
}
}
}
}
vector<tuple<string,string, string>> Node::getRandomNodesToGossipTo()
......@@ -843,7 +835,7 @@ void Node::handleMaplejuiceQ(){
void Node::handleTcpMessage()
{
//Before we do anything here, we should have the leader check to see if the file list is consistent or not.
//checkFileListConsistency();
checkFileListConsistency();
queue<string> qCopy(tcpServent->regMessages);
tcpServent->regMessages = queue<string>();
int size = qCopy.size();
......
......@@ -93,7 +93,6 @@ void *runSenderThread(void *node)
nodeOwn->heartbeatToNode();
//5a. check for queue maple/juice messages
if (nodeOwn->isLeader) nodeOwn->checkFileListConsistency();
nodeOwn->handleMaplejuiceQ();
//5b. check for regular TCP messages
nodeOwn->handleTcpMessage();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment