From 17f977ddb237e944b261b382e95bfe968e1da645 Mon Sep 17 00:00:00 2001 From: afederici <ajf5@illinois.edu> Date: Sun, 29 Nov 2020 18:44:41 -0600 Subject: [PATCH] fault taulerance 2 --- Wc | Bin 189120 -> 189120 bytes Wr | Bin 184720 -> 184720 bytes src/Node.cpp | 23 +++++++++++++---------- src/TcpSocket.cpp | 6 +++--- 4 files changed, 16 insertions(+), 13 deletions(-) diff --git a/Wc b/Wc index cf7fc079211070fa1d779b37f895108514bd1be4..f199fae57be9d45ba1bad4740a9adc9fbe1b03d0 100755 GIT binary patch delta 54 zcmV-60LlNr#S6g23xKo%@OT1NK)3RE0nRo8S3tMIuK~KU1vNP_GB~$^VghJ924!Y5 MVKHR4@16qDLxYSKOaK4? delta 54 zcmV-60LlNr#S6g23xKo%@OT0fG`I420nRo86g0QOuK~KU1vX)3IWxC`VghJ9203PB MV=yzf@16qDLvw=_<NyEw diff --git a/Wr b/Wr index f3746b11f73f4f9382e44b47f7faaa49891ff39a..af7b681799dbda5694777dfcf259713feb989820 100755 GIT binary patch delta 68 zcmV-K0K5N?qYIFu3xKo%NO%HQK(|VG0ZuUjSU|U$sR5;}4q`MhHa9dbZvbCyVQFrc a?)(8Vw|`dxJT?YnF=aVnHn+iz0?$K6+8Cbz delta 68 zcmV-K0K5N?qYIFu3xKo%NO%GiG`C840ZuUj6*RY+sR5;}4mUA3H92N3ZvbCyVQFrc a?)(8Vw|`dxJT?Y1HD)tmW4FPL0?$Jxz8DAq diff --git a/src/Node.cpp b/src/Node.cpp index 0986237..11fb936 100644 --- a/src/Node.cpp +++ b/src/Node.cpp @@ -166,7 +166,7 @@ int Node::failureDetection(){ (get<0>(pendingRequests[sdfsfilename])!=-1)) { // it sent out, and is not finished, cannot help // we lost the data from the client - cout << "[PUT] client itself fails, we cannot help, remove request" << endl; + cout << "[REREPLICATE] client itself fails, we cannot help, remove request" << endl; isBlackout = false; pendingRequests.erase(sdfsfilename); pendingRequestSent.erase(sdfsfilename); @@ -177,7 +177,7 @@ int Node::failureDetection(){ (get<1>(pendingRequests[sdfsfilename])!=-1)) { // the sender fails during 2nd pass // replace the sent - cout << "[PUT] One of the sender " << get<0>(keyTuple) << " failed, try again" << endl; + cout << "[REREPLICATE] One of the sender " << get<0>(keyTuple) << " failed, try again" << endl; if (get<2>(pendingRequests[sdfsfilename])!=-1) { tuple<int, int, int>(get<0>(pendingRequestSent[sdfsfilename]), false, get<2>(pendingRequestSent[sdfsfilename])); } else { @@ -190,7 +190,7 @@ int Node::failureDetection(){ (get<2>(pendingRequests[sdfsfilename])!=-1)) { // it sent out, but replicates are failed // restart again - cout << "[PUT/REREPLICATE] The sender " << get<0>(keyTuple) << " failed, try again" << endl; + cout << "[REREPLICATE] The sender " << get<0>(keyTuple) << " failed, try again" << endl; pendingRequests.erase(sdfsfilename); } } @@ -200,7 +200,7 @@ int Node::failureDetection(){ //2) if processing, reassign //2a) if no extra nodes, assign to successor, else add new node to ring //3) if a sender, reassign replica holders as new senders for each thing sent - if (workerRing->size() > 0 && workerRing->getValue(deletedNodePostion).compare("No node found")){ + if (workerRing->size() > 0 && workerRing->getValue(deletedNodePostion).compare("No node found") != 0){ set<int> currentWorkers; for (auto &w: workerRing->nodePositions) currentWorkers.insert(w); int nextId; @@ -215,14 +215,14 @@ int Node::failureDetection(){ if (currentWorkers.count(w) == 0){ nextId = w; nextIp = hashRing->getValue(nextId); + Messages startMsg(PHASESTART, "filling in for failed worker"); + tcpServent->sendMessage(nextIp, TCPPORT, startMsg.toString()); break; } } } workerRing->removeNode(deletedNodePostion); workerRing->addNode(nextIp, nextId); - Messages startMsg(PHASESTART, "filling in for failed worker"); - tcpServent->sendMessage(nextIp, TCPPORT, startMsg.toString()); string workToDo = "", mapleS = ""; vector<string> messagesToSend; auto vecCopy(workerProcessing[get<0>(keyTuple)]); @@ -255,7 +255,10 @@ int Node::failureDetection(){ mapleS = tempIp + "::" + processor + "::" + exe + "::" + get<0>(e.first) + "::" + get<1>(e.first); messagesToSend.push_back(mapleS); } - for (auto mapleMsg : messagesToSend) tcpServent->mapleMessages.push(mapleMsg); + for (auto mapleMsg : messagesToSend) { + cout << "[FIX] " << mapleMsg << endl; + tcpServent->mapleMessages.push(mapleMsg); + } } } } @@ -748,7 +751,7 @@ void Node::updateFileList(string sdfsfilename, int nodePosition) //Can only be called when we are the leader node, since we are going to be calling REREPLICATE here, and checking the global file list. //Called in ProcessRegMessages before we do anything else, since we want to check the global file list consistency before we process the other messages -//In the Queue. +//In the Queue.v void Node::checkFileListConsistency(){ if (membershipList.size() < 4) { cout << "[ERROR] The number of members are too small, we need at least 4" << endl; @@ -772,7 +775,7 @@ void Node::checkFileListConsistency(){ if(get<0>(request) == 0 && get<1>(request) == 0 && get<2>(request) == 0){ pendingRequests[element.first] = tuple<int, int, int>(-1, -1, nodesToCheck[i]); pendingRequestSent[element.first] = tuple<int, int, int>(true, true, true); - tcpServent->sendMessage(leaderIP, TCPPORT, outMsg.toString()); + tcpServent->regMessages.push(outMsg.toString()); break; } if(get<0>(request) != -1 || get<1>(request) != -1 || get<2>(request) != -1){ @@ -781,7 +784,7 @@ void Node::checkFileListConsistency(){ } pendingRequests[element.first] = tuple<int, int, int>(-1, -1, nodesToCheck[i]); pendingRequestSent[element.first] = tuple<int, int, int>(true, true, true); - tcpServent->sendMessage(leaderIP, TCPPORT, outMsg.toString()); + tcpServent->regMessages.push(outMsg.toString()); break; } } diff --git a/src/TcpSocket.cpp b/src/TcpSocket.cpp index 5fc3f3a..dc058af 100644 --- a/src/TcpSocket.cpp +++ b/src/TcpSocket.cpp @@ -245,7 +245,7 @@ int TcpSocket::messageHandler(int sockfd, string payloadMessage, string returnIP vector<string> filesAndSizes = splitString(payload, ","); int returnType = stoi(filesAndSizes[0]), processedCounter = 0; string returnTypeString = "", remoteLocalname = "", execfilename = "", mode = "wb"; - int dirSize = filesAndSizes.size(), fail = 0; + int dirSize = filesAndSizes.size() - 1, fail = 0; size_t index = 3; int bytesLeft = 0, offset = strlen(extra.c_str()), buffersize = DEFAULT_TCP_BLKSIZE; vector<string> format; @@ -280,12 +280,12 @@ int TcpSocket::messageHandler(int sockfd, string payloadMessage, string returnIP } cout << "[MERGE] type:" << returnTypeString << ", payload: " << payload << endl; //cout << "[MERGE] type:" << returnTypeString << ", start index: " << to_string(index) << ", correction to extra -> " << extra << endl; - while (index < dirSize - 1){ + while (index < dirSize){ format.clear(); string scopy(filesAndSizes[index]); format = splitString(scopy, "-"); //cut the tmp off filename = (filedest.size()) ? filedest : "tmp-" + returnIP + "-" + format[1]; - cout << "[MERGE] index:" << to_string(index) << " , dest:" << filename << " , size:" << filesAndSizes[index+1] << endl; + //cout << "[MERGE] index:" << to_string(index) << " , dest:" << filename << " , size:" << filesAndSizes[index+1] << endl; numbytes = 0; filesize = stoi(filesAndSizes[index+1]); bytesLeft = filesize; -- GitLab