diff --git a/Wc b/Wc new file mode 100755 index 0000000000000000000000000000000000000000..d15d2d85a73f16d87c4d3a21e442e7a02c528382 Binary files /dev/null and b/Wc differ diff --git a/Wr b/Wr new file mode 100755 index 0000000000000000000000000000000000000000..95470cf196f077380be1b7dcc3d1a25291623a6e Binary files /dev/null and b/Wr differ diff --git a/src/Node.cpp b/src/Node.cpp index 011aacd17f78f619882ed35b20ed16b9f1d843b4..09da2a4879492fabc9be6f01435cb96a8712e75c 100644 --- a/src/Node.cpp +++ b/src/Node.cpp @@ -120,6 +120,7 @@ int Node::failureDetection(){ //3. for already failed nodes, check to see if curr_time - time stamp > T_cleanup //4. If yes, remove node from membership list vector<tuple<string,string,string>> removedVec; + vector<string> messagesToSend; for(auto& element: this->membershipList){ tuple<string,string,string> keyTuple = element.first; tuple<int, int, int> valueTuple = element.second; @@ -225,9 +226,22 @@ int Node::failureDetection(){ workerRing->removeNode(deletedNodePostion); workerRing->addNode(nextIp, nextId); string workToDo = "", mapleS = ""; - vector<string> messagesToSend; - auto vecCopy(workerProcessing[get<0>(keyTuple)]); + for (auto &e : mapleSending){ + if (get<0>(e.second).compare(get<0>(keyTuple))) continue; + vector<int> temp = randItems(1, fileList[get<0>(e.first)]); + string tempIp = hashRing->getValue(temp[0]); + auto task = make_tuple(get<0>(e.first), get<1>(e.first)); + mapleSending[e.first] = make_tuple(tempIp, 0); + string processor = ""; + for (auto &worker : workerTasks){ + if (worker.second.count(task) > 0) processor = worker.first; + } + mapleS = tempIp + "::" + processor + "::" + exe + "::" + get<0>(e.first) + "::" + get<1>(e.first); + if (get<1>(e.second) == 0) messagesToSend.push_back(mapleS); + } + if (workerTasks.find(get<0>(keyTuple)) != workerTasks.end()){ + auto vecCopy(workerTasks[get<0>(keyTuple)]); for (auto el : vecCopy) { workerProcessing[nextIp].push_back(make_tuple(get<0>(el), get<1>(el))); workerTasks[nextIp].insert(make_tuple(get<0>(el), get<1>(el))); @@ -238,8 +252,9 @@ int Node::failureDetection(){ } else workToDo += get<0>(el); } - if (isJuicePhase) messagesToSend.push_back("JUICE::" + exe + "::" + sdfsOut + "::" + workToDo); + if (isJuicePhase) messagesToSend.push_back("JUICE::" + nextIp + "::" + exe + "::" + sdfsOut + "::" + workToDo); } + //come back workerProcessing.erase(get<0>(keyTuple)); workerTasks.erase(get<0>(keyTuple)); struct dirent *entry = nullptr; @@ -252,24 +267,6 @@ int Node::failureDetection(){ } } closedir(dp); - for (auto &e : mapleSending){ - if (get<0>(e.second).compare(get<0>(keyTuple))) continue; - if (get<1>(e.second) == 1) continue; - vector<int> temp = randItems(1, fileList[get<0>(e.first)]); - string tempIp = hashRing->getValue(temp[0]); - auto task = make_tuple(get<0>(e.first), get<1>(e.first)); - mapleSending[e.first] = make_tuple(tempIp, 0); - string processor = ""; - for (auto &worker : workerTasks){ - if (worker.second.count(task) > 0) processor = worker.first; - } - mapleS = tempIp + "::" + processor + "::" + exe + "::" + get<0>(e.first) + "::" + get<1>(e.first); - messagesToSend.push_back(mapleS); - } - for (auto mapleMsg : messagesToSend) { - cout << "[FIX] " << mapleMsg << endl; - tcpServent->mapleMessages.push(mapleMsg); - } } } } @@ -282,6 +279,16 @@ int Node::failureDetection(){ } } } + set<string> aliveIps; + for (auto alive : hashRing->nodePositions) aliveIps.insert(hashRing->getValue(alive)); + for (auto pendMsg : messagesToSend){ + string scopy(pendMsg); + vector<string> s = splitString(pendMsg, "::"); + if (aliveIps.count(s[0]) == 0 && s[0].compare("JUICE")) continue; + if (aliveIps.count(s[1]) == 0) continue; + cout << "[FIX] " << scopy << endl; + tcpServent->mapleMessages.push(scopy); + } // O(c*n) operation, but it ensures safety bool leaderRemoved = false; @@ -1155,12 +1162,11 @@ void Node::handleTcpMessage() string execName = EXEC_CMD + inMsg[1]; if (runExecutable(execName, inMsg[3]) < 0) { cout << "[EXEC] ERROR" << endl; break;} string ackStr = nodeInformation.ip + "::" + inMsg[4] + "::" + inMsg[2]; //IP, file, chunk - Messages ackMsg(MAPLEACK, ackStr); tcpServent->sendMessage(leaderIP, TCPPORT, ackMsg.toString()); break; } - //cout << "[CHUNKACK] leader confirming " << inMsg[4] << "::" << inMsg[2] << " was received" << endl; + cout << "[CHUNKACK] leader confirming " << inMsg[4] << "::" << inMsg[2] << " was received" << endl; tuple<string, int> status = mapleSending[make_tuple(inMsg[4], inMsg[2])]; mapleSending[make_tuple(inMsg[4], inMsg[2])] = make_tuple(get<0>(status), 1); break; diff --git a/src/TcpSocket.cpp b/src/TcpSocket.cpp index 057e78524685ee3fdc2e5b48953fb242fdc9445b..ba0071b8d2aaba167c6906b350a612c60b5e409b 100644 --- a/src/TcpSocket.cpp +++ b/src/TcpSocket.cpp @@ -245,8 +245,8 @@ int TcpSocket::messageHandler(int sockfd, string payloadMessage, string returnIP vector<string> filesAndSizes = splitString(payload, ","); int returnType = stoi(filesAndSizes[0]), processedCounter = 0; string returnTypeString = "", remoteLocalname = "", execfilename = "", mode = "wb"; - int dirSize = filesAndSizes.size() - 1, fail = 0; - size_t index = 3; + int dirSize = (int)filesAndSizes.size() - 1, fail = 0; + int index = 3; int bytesLeft = 0, offset = strlen(extra.c_str()), buffersize = DEFAULT_TCP_BLKSIZE; vector<string> format; char c; @@ -258,7 +258,7 @@ int TcpSocket::messageHandler(int sockfd, string payloadMessage, string returnIP returnTypeString = "JUICEMERGE"; mode = "ab"; index--; - while (index < (filesAndSizes.size() - 2)){ + while (index < (dirSize - 1)){ if (preProcessed.size()) preProcessed += ","; preProcessed += filesAndSizes[index]; index++; diff --git a/src/Threads.cpp b/src/Threads.cpp index 7d2a8dc5f8248cfb7b52fcc34f6fc089545ca24e..450df8d72da8e3e9fa0044e54e0dcfb1d88df111 100644 --- a/src/Threads.cpp +++ b/src/Threads.cpp @@ -23,18 +23,16 @@ void *runTcpSender(void *tcpSocket) while (1) { while (!tcp->mapleMessages.empty()) { vector<string> msgSplit = splitString(tcp->mapleMessages.front(), "::"); - string removeSender = tcp->mapleMessages.front().substr(msgSplit[0].size() + 2); - //cout << "[TEST] " << removeSender << endl; - string msgString = ""; - if (removeSender.compare("JUICE") == 0){ + if (msgSplit[0].compare("JUICE") == 0){ + string removeSender = tcp->mapleMessages.front().substr(msgSplit[0].size() + 2 + msgSplit[1].size() + 2); Messages msg(JUICE, removeSender); - msgString = msg.toString(); + tcp->sendMessage(msgSplit[1], TCPPORT, msg.toString()); } else{ + string removeSender = tcp->mapleMessages.front().substr(msgSplit[0].size() + 2); Messages msg(CHUNK, removeSender); - msgString = msg.toString(); + //processor, exec, file, start, prefix + tcp->sendMessage(msgSplit[0], TCPPORT, msg.toString()); } - //processor, exec, file, start, prefix - tcp->sendMessage(msgSplit[0], TCPPORT, msgString); tcp->mapleMessages.pop(); } while (!tcp->pendSendMessages.empty()) {