diff --git a/.DS_Store b/.DS_Store index 666eed3262c1264d8536aef30d046f36b010c374..e8e3af3e2bcffb0c60e392afbba877937c42ff68 100644 Binary files a/.DS_Store and b/.DS_Store differ diff --git a/README.md b/README.md index b3a44269a17eae688d6c4d72ca7d08953128fdf7..fa78587fb496931c65023f9e428f035fdfe2a494 100644 --- a/README.md +++ b/README.md @@ -33,7 +33,7 @@ $ [leader] print the leader/masters IP ``` $ join //at each node $ put local sdfs //every file you want to process in maple, names as follows "dir_name"-"file_name" - $ maple exe workers dir_name output_dir_name + $ maple exe workers output_dir_name src_dir_name $ juice exe workers output_dir_name result_file_name delete_files={0,1} use_range={0,1} ``` diff --git a/Wc b/Wc index 647b6bbefe11636dc2ce2c9182d7985532920625..007bd1685e07db3dce290a4940625a172683c726 100755 Binary files a/Wc and b/Wc differ diff --git a/Wr b/Wr index a455eb1735fdf9d0b701d09f3860ed50e925db06..389caffb443d91d1fee1729caeee5c262245c488 100755 Binary files a/Wr and b/Wr differ diff --git a/inc/Node.h b/inc/Node.h index 7a30e433844cf7317487598b5b2c0b762a5ac3d2..546a1ca1a59bd3a2cf4088ba5460199d6cd7ccc3 100644 --- a/inc/Node.h +++ b/inc/Node.h @@ -34,7 +34,7 @@ using namespace std; #define INTRODUCER "fa20-cs425-g02-01.cs.illinois.edu" // VM1 -#define PORT "6000" +#define PORT "2000" #define LOGGING_FILE_NAME "logs.txt" #define EXEC_CMD "./" //python for python script, ./ for an executable @@ -100,7 +100,7 @@ public: map<string, tuple<long int, int>> fileSizes; //used so master can partition in the map phase tuple is (bytes, lines) HashRing *workerRing; //hashring of worker nodes maple/juice - map<string, vector<tuple<string, string>>> mapleSending; //originIP -> (file, chunk_start); + map<tuple<string,string>, tuple<string,int>> mapleSending; //(file,chunk) -> (sender, isSent) set<string> mapleKeys; //keys produced in maple phase map<string, int> filesAtWorker; queue<string> maplejuiceQ; diff --git a/inc/TcpSocket.h b/inc/TcpSocket.h index 4f305dbef19e458a31b7b9a1a84f6c6962306b25..34603c3e8ad3026fa11ae7a0bb45422f05f6f8a6 100644 --- a/inc/TcpSocket.h +++ b/inc/TcpSocket.h @@ -41,7 +41,7 @@ using std::get; #define DEFAULT_TCP_BLKSIZE (128 * 1024) #define BACKLOG 10 -#define TCPPORT "4950" +#define TCPPORT "3000" class TcpSocket { public: //tcp server directly handles PUTs. If put received, request from diff --git a/src/Node.cpp b/src/Node.cpp index d001d85598f8a44e9b0e24dcb96ff17709e38cf5..485174b29a32d713300b461a8c8eeebe1f8ebd20 100644 --- a/src/Node.cpp +++ b/src/Node.cpp @@ -200,66 +200,62 @@ int Node::failureDetection(){ //2) if processing, reassign //2a) if no extra nodes, assign to successor, else add new node to ring //3) if a sender, reassign replica holders as new senders for each thing sent - if (workerRing->size() > 0){ - vector<tuple<string,string,string>> aliveNodes; - for (auto &e : membershipList) aliveNodes.push_back(e.first); - vector<tuple<string,string,string>> mapleNodes; + if (workerRing->size() > 0 && workerRing->getValue(deletedNodePostion).compare("No node found")){ + set<int> currentWorkers; + for (auto &w: workerRing->nodePositions) currentWorkers.insert(w); int nextId; - size_t newSize = workerRing->nodePositions.size()-1; - if (newSize <= hashRing->nodePositions.size()){ + string nextIp = ""; + int newSize = workerRing->nodePositions.size()-1; + int available = hashRing->nodePositions.size()-1; //cant include master node + if (newSize >= available){ nextId = workerRing->getSuccessor(deletedNodePostion); + nextIp = workerRing->getValue(nextId); } else { - while (1){ - mapleNodes.clear(); - mapleNodes = randItems(1, aliveNodes); - Member m(get<0>(mapleNodes[0]), get<1>(mapleNodes[0])); - nextId = hashingId(m, get<2>(mapleNodes[0])); - if (workerRing->getValue(nextId).compare("No node found") == 0) break; + for (auto &w : hashRing->nodePositions){ + if (currentWorkers.count(w) == 0){ + nextId = w; + nextIp = hashRing->getValue(nextId); + break; + } } - Messages startMsg(PHASESTART, "filling in for failed worker"); - tcpServent->sendMessage(get<0>(mapleNodes[0]), TCPPORT, startMsg.toString()); - workerRing->addNode(get<0>(mapleNodes[0]), nextId); } workerRing->removeNode(deletedNodePostion); - string newNode = workerRing->getValue(nextId); - //if deleted from workerTasks its been fully processed and doesnt need to be re-scheduled + workerRing->addNode(nextIp, nextId); + Messages startMsg(PHASESTART, "filling in for failed worker"); + tcpServent->sendMessage(nextIp, TCPPORT, startMsg.toString()); + string workToDo = "", mapleS = ""; + vector<string> messagesToSend; auto vecCopy(workerProcessing[get<0>(keyTuple)]); if (workerTasks.find(get<0>(keyTuple)) != workerTasks.end()){ - string workToDo = ""; for (auto el : vecCopy) { - workToDo += get<0>(el); - workerProcessing[newNode].push_back(make_tuple(get<0>(el), get<1>(el))); - workerTasks[newNode].insert(make_tuple(get<0>(el), get<1>(el))); - } - struct dirent *entry = nullptr; - DIR *dp = nullptr; - string match = "tmp-" + get<0>(keyTuple) + "-"; - int matchLen = match.size(); - if ((dp = opendir(".")) == nullptr) { cout << "tmp directory error " << endl;} - while ((entry = readdir(dp))){ - if (strncmp(entry->d_name, match.c_str(), matchLen) == 0) { remove(entry->d_name); } - } - if (isJuicePhase){ - Messages outMsg(JUICE, exe + "::" + sdfsOut + "::" + workToDo); - tcpServent->sendMessage(newNode, TCPPORT, outMsg.toString()); + workerProcessing[nextIp].push_back(make_tuple(get<0>(el), get<1>(el))); + workerTasks[nextIp].insert(make_tuple(get<0>(el), get<1>(el))); + if (!isJuicePhase){ + string sender = get<0>(mapleSending[make_tuple(get<0>(el), get<1>(el))]); + mapleS = sender + "::" + nextIp + "::" + exe + "::" + get<0>(el) + "::" + get<1>(el); + messagesToSend.push_back(mapleS); + } + else workToDo += get<0>(el); } + if (isJuicePhase) messagesToSend.push_back("JUICE::" + exe + "::" + sdfsOut + "::" + workToDo); } - //come back workerProcessing.erase(get<0>(keyTuple)); workerTasks.erase(get<0>(keyTuple)); - - for (auto &e : mapleSending[get<0>(keyTuple)]){ - vector<int> temp = randItems(1, fileList[get<0>(e)]); - auto task = make_tuple(get<0>(e), get<1>(e)); - mapleSending[hashRing->getValue(temp[0])].push_back(task); + for (auto &e : mapleSending){ + if (get<0>(e.second).compare(get<0>(keyTuple))) continue; + if (get<1>(e.second) == 1) continue; + vector<int> temp = randItems(1, fileList[get<0>(e.first)]); + string tempIp = hashRing->getValue(temp[0]); + auto task = make_tuple(get<0>(e.first), get<1>(e.first)); + mapleSending[e.first] = make_tuple(tempIp, 0); string processor = ""; for (auto &worker : workerTasks){ if (worker.second.count(task) > 0) processor = worker.first; } - //sender + "::" + processor + "::" + exe + "::" + s; - string mapleS = hashRing->getValue(temp[0]) + "::" + processor + "::" + exe + "::" + get<0>(e)+ "::" + get<1>(e); - tcpServent->mapleMessages.push(mapleS); + mapleS = tempIp + "::" + processor + "::" + exe + "::" + get<0>(e.first) + "::" + get<1>(e.first); + messagesToSend.push_back(mapleS); } + for (auto mapleMsg : messagesToSend) tcpServent->mapleMessages.push(mapleMsg); } } } @@ -1036,7 +1032,6 @@ void Node::handleTcpMessage() cout << "[MAPLE] Leader starting new Maple phase" << endl; if (inMsg.size() < 4) break; mapleKeys.clear(); - isJuicePhase = false; string exe = inMsg[0], num_maples = inMsg[1], sdfs_dir = inMsg[3] + "-", s = ""; sdfsPre = inMsg[2] + "-"; int workers = stoi(num_maples), ringSize = hashRing->nodePositions.size(); @@ -1086,7 +1081,7 @@ void Node::handleTcpMessage() workerProcessing[processor].push_back(make_tuple(file, to_string(start))); workerTasks[processor].insert(make_tuple(file, to_string(start))); //cout << "[MAPLE] assign file " << file << " at " << to_string(start) << " to " << processor << endl; - mapleSending[sender].push_back(make_tuple(file, to_string(start))); + mapleSending[make_tuple(file, to_string(start))] = make_tuple(sender, 0); string maplemsg = sender + "::" + processor + "::" + exe + "::" + s; //sender, processor, exec, file, start messagesToSend.push_back(maplemsg); @@ -1146,17 +1141,8 @@ void Node::handleTcpMessage() break; } //cout << "[CHUNKACK] leader confirming " << inMsg[4] << "::" << inMsg[2] << " was received" << endl; - vector<tuple<string,string>> temp; - for (auto &e : mapleSending[inMsg[0]]){ - if (get<0>(e).compare(inMsg[4]) == 0){ - if (get<1>(e).compare(inMsg[2]) == 0){ - continue; - } - } - temp.push_back(e); - } - if (temp.size()) mapleSending[inMsg[0]] = temp; - else mapleSending.erase(inMsg[0]); + tuple<string, int> status = mapleSending[make_tuple(inMsg[4], inMsg[2])]; + mapleSending[make_tuple(inMsg[4], inMsg[2])] = make_tuple(get<0>(status), 1); break; } diff --git a/src/TcpSocket.cpp b/src/TcpSocket.cpp index 6ca3bee6fe4e6ccd90de8753687f7569dad3e57e..8edb955746f2791841b7d3d45d87cca9ba2d0938 100644 --- a/src/TcpSocket.cpp +++ b/src/TcpSocket.cpp @@ -125,7 +125,8 @@ void TcpSocket::mergeFiles(string ip, string port, string handler, string filede if (!toSend.size()) return; vector<string> toProcess = splitString(toSend, ","); vector<string> toStart = splitString(starts, ","); - int dirSize = toProcess.size(), mode = stoi(handler); + int dirSize = toProcess.size(); + int mode = stoi(handler); string payload = handler + "," + filedest + "," + header + "," + toSend; payload = to_string(payload.size()) + "," + payload; //cout << "[PUT] payload: " << payload << ", range: " << starts << " sending to " << ip << endl; diff --git a/src/Threads.cpp b/src/Threads.cpp index 55a0f69ba0b343f051dd0c8e506e7eeb4ed26281..7d2a8dc5f8248cfb7b52fcc34f6fc089545ca24e 100644 --- a/src/Threads.cpp +++ b/src/Threads.cpp @@ -25,9 +25,16 @@ void *runTcpSender(void *tcpSocket) vector<string> msgSplit = splitString(tcp->mapleMessages.front(), "::"); string removeSender = tcp->mapleMessages.front().substr(msgSplit[0].size() + 2); //cout << "[TEST] " << removeSender << endl; - Messages msg(CHUNK, removeSender); + string msgString = ""; + if (removeSender.compare("JUICE") == 0){ + Messages msg(JUICE, removeSender); + msgString = msg.toString(); + } else{ + Messages msg(CHUNK, removeSender); + msgString = msg.toString(); + } //processor, exec, file, start, prefix - tcp->sendMessage(msgSplit[0], TCPPORT, msg.toString()); + tcp->sendMessage(msgSplit[0], TCPPORT, msgString); tcp->mapleMessages.pop(); } while (!tcp->pendSendMessages.empty()) {