diff --git a/Wc b/Wc index d6b044c26b13deec26661164bb52afa8df9e32c9..2f25d268fed9a59e20f7423d32e5c70e2b7a4571 100755 Binary files a/Wc and b/Wc differ diff --git a/Wr b/Wr index fa64bb38e3524ad4d0ed0b8613405882448bc84f..bc977af4de7b2fbfe76e016cb6c4ef4c65cf3bd4 100755 Binary files a/Wr and b/Wr differ diff --git a/inc/Messages.h b/inc/Messages.h index 6f2c3100cbc583197a19eb5dfc28ad4bb8dc39f9..0bf7d4fe79cb9db35a65c9bfe890104c63e6debd 100644 --- a/inc/Messages.h +++ b/inc/Messages.h @@ -7,6 +7,8 @@ using namespace std; +#define T_maples 50 // lines to process, 2 for testing, probably 100 for real + class Messages { public: MessageType type; diff --git a/inc/Node.h b/inc/Node.h index 6a3f50e5ea9e0d341ba0f88787cb4ec2a09415c2..e6c6831ae61974a585e46f1c51caada4d5b6ffd5 100644 --- a/inc/Node.h +++ b/inc/Node.h @@ -46,7 +46,6 @@ using namespace std; #define N_b 5 // how many nodes GOSSIP want to use #define T_election 50 // in T_period #define T_switch 5 // in seconds -#define T_maples 50 // lines to process, 2 for testing, probably 100 for real // void *runUdpServer(void *udpSocket); diff --git a/src/Node.cpp b/src/Node.cpp index 3b5b3488e90b502f2f08a1f1a0dd412b582f1d28..32be8ac06e3395dcd3cca7c0f73deb068b7a4305 100644 --- a/src/Node.cpp +++ b/src/Node.cpp @@ -228,6 +228,7 @@ int Node::failureDetection(){ workerRing->addNode(nextIp, nextId); string workToDo = "", mapleS = ""; for (auto &e : mapleSending){ + srand(time(NULL)); if (get<0>(e.second).compare(get<0>(keyTuple))) continue; vector<int> temp = randItems(1, fileList[get<0>(e.first)]); string tempIp = hashRing->getValue(temp[0]); @@ -289,15 +290,9 @@ int Node::failureDetection(){ if (aliveIps.count(s[1]) == 0) continue; //cout << "[FIX] " << scopy << endl; if (s[0].compare(nodeInformation.ip) == 0) { - if (s[0].compare("JUICE") == 0){ - string removeSender = scopy.substr(s[0].size() + 2 + s[1].size() + 2); - Messages msg(JUICE, removeSender); - tcpServent->regMessages.push(msg.toString()); - } else{ - string removeSender = scopy.substr(s[0].size() + 2); - Messages msg(CHUNK, removeSender); - tcpServent->regMessages.push(msg.toString()); - } + string removeSender = scopy.substr(s[0].size() + 2); + Messages msg(CHUNK, removeSender); + tcpServent->regMessages.push(msg.toString()); } else tcpServent->mapleMessages.push(scopy); } @@ -1098,7 +1093,7 @@ void Node::handleTcpMessage() if (includedDebug.size()) includedDebug += " | "; includedDebug += get<0>(e); } - vector<string> messagesToSend; //used so we get our full assignments before scheduling + map<tuple<string,string,string>,vector<string>> linesForFile; //(sender,processer,file) -> <lines> for this file //cout << "[MAPLE] " << includedDebug << " are the worker nodes" << endl; for (auto &e: directory){ start = 0; @@ -1106,57 +1101,68 @@ void Node::handleTcpMessage() int lines = get<1>(e); //cout << "[MAPLE] file: " << file << " - " << to_string(lines) << endl; while (start < lines){ - s = file + "::" + to_string(start); - id = workerRing->locateClosestNode(s); + id = workerRing->locateClosestNode(file + "::" + to_string(start)); srand(time(NULL)); vector<int> temp = randItems(1, fileList[file]); string sender = hashRing->getValue(temp[0]); //because files are part of sdfs anyone can be the sender string processor = workerRing->getValue(id); //processor is a maple worker - workerProcessing[processor].push_back(make_tuple(file, to_string(start))); - workerTasks[processor].insert(make_tuple(file, to_string(start))); + linesForFile[make_tuple(sender, processor, file)].push_back(to_string(start)); //cout << "[MAPLE] assign file " << file << " at " << to_string(start) << " to " << processor << endl; - mapleSending[make_tuple(file, to_string(start))] = make_tuple(sender, 0); - string maplemsg = sender + "::" + processor + "::" + exe + "::" + s; - //sender, processor, exec, file, start - messagesToSend.push_back(maplemsg); - start = start + T_maples; + start += T_maples; + } + } + //tasks, processing, sender, and message + for (auto &unitWork : linesForFile){ + string lines = ""; + for (auto &lineNum : unitWork.second) { + if (lines.size()) { lines += ","; } + lines += lineNum; } + auto fileLines = make_tuple(get<2>(unitWork.first), lines); + workerTasks[get<1>(unitWork.first)].insert(fileLines); + workerProcessing[get<1>(unitWork.first)].push_back(fileLines); + mapleSending[fileLines] = make_tuple(get<0>(unitWork.first), 0); + string mapleMsg = get<0>(unitWork.first) + "::" + get<1>(unitWork.first) + "::"; + mapleMsg += (exe + "::" + get<2>(unitWork.first) + "::" + lines); + tcpServent->mapleMessages.push(mapleMsg); } - for (auto mapleMsg : messagesToSend) tcpServent->mapleMessages.push(mapleMsg); break; } case CHUNK: { - //processor, exec, sdfs, start - int end, start; - try {end = stoi(inMsg[3]) + T_maples, start = stoi(inMsg[3]); } - catch(...) { cout << "[ERROR] stoi: bad CHUNK start and end. Last arg is start " << msg.toString() << endl; fflush(stdout);exit(1);} - string starts = inMsg[3] + "," + to_string(end); - + //processor, exec, sdfs, starts string localfile = "", sdfsFile = inMsg[2]; if (localFilelist.find(inMsg[2]) != localFilelist.end()) localfile = localFilelist[inMsg[2]]; else localfile = inMsg[2]; - + vector<string> starts = splitString(inMsg[3], ","); vector<string> unDirectory = splitString(sdfsFile, "-"); //get rid of timestamp - string fileDest = "tmp-"+inMsg[3]+"-"+sdfsFile.substr(unDirectory[0].size()+1); - - int lineCounter = -1, numbytes = 0, readLines = 0; + string fileDest = "tmp-chunk-"+sdfsFile.substr(unDirectory[0].size()+1); + int lineCounter = -1, numbytes = 0, readLines = 0, startInd = 0, start = stoi(starts[startInd]); ifstream file(localfile); string str; - while (getline(file, str) && (lineCounter < end - 1)) + while (getline(file, str)) { lineCounter++; if (lineCounter < start) continue; numbytes += (str.size()); readLines++; + if (lineCounter >= (start+T_maples-1)){ + startInd++; + if (startInd >= starts.size()) break; + else start = stoi(starts[startInd]); + } } file.clear(); // clear fail and eof bits file.seekg(0); // back to the start! - + string chunkStartsFormatted = ""; + for (auto s : starts){ + if (chunkStartsFormatted.size()) chunkStartsFormatted += "-"; + chunkStartsFormatted += s; + } string toSend = localfile + "," + to_string(numbytes); - string header = sdfsFile + "," + inMsg[1] + "," + inMsg[3]; + string header = sdfsFile + "," + inMsg[1] + "," + chunkStartsFormatted; //processor, return type, file dest, header, toSend, starts - string sendMsg = inMsg[0] + "::" + to_string(CHUNKACK) + "::" + fileDest + "::" + header + "::" + toSend + "::" + starts; + string sendMsg = inMsg[0] + "::" + to_string(CHUNKACK) + "::" + fileDest + "::" + header + "::" + toSend + "::" + inMsg[3]; this->tcpServent->mergeMessages.push(sendMsg); break; } diff --git a/src/TcpSocket.cpp b/src/TcpSocket.cpp index c933d9b1e659b2faf1199331b3298346faff3de1..73c66cbe0cf6db407f19c687ac95d156200085f5 100644 --- a/src/TcpSocket.cpp +++ b/src/TcpSocket.cpp @@ -130,7 +130,6 @@ void TcpSocket::mergeFiles(string ip, string port, string handler, string filede try { mode = stoi(handler); } catch (...) { cout << "[ERROR] stoi: " << handler << endl; fflush(stdout);exit(1);} string payload = handler + "," + filedest + "," + header + "," + toSend; - //73,11,dir-w100_1606725536,dir-w100,dataset/100.txt,dataset/100.txt,10502025 payload = to_string(payload.size()) + "," + payload; //cout << "[PUT] payload: " << payload << ", range: " << starts << " sending to " << ip << endl; Messages msg(MERGE, payload); @@ -140,12 +139,13 @@ void TcpSocket::mergeFiles(string ip, string port, string handler, string filede } while (index < dirSize){ if (mode == CHUNKACK) { - try { - int si = stoi(toStart[index]); - int sj = stoi(toStart[index+1]); + for (auto startLine : toStart){ + int si; + try { si = stoi(startLine); } + catch (...) { cout << "[ERROR] stoi: " << "send lines " << toStart[index] << "," << toStart[index+1] << endl; fflush(stdout);exit(1);} + int sj = si + T_maples; sendLines(sockfd, toProcess[index], si, sj); } - catch (...) { cout << "[ERROR] stoi: " << "send lines " << toStart[index] << "," << toStart[index+1] << endl; fflush(stdout);exit(1);} } else{ fp = fopen(toProcess[index].c_str(), "rb"); @@ -270,7 +270,7 @@ int TcpSocket::messageHandler(int sockfd, string payloadMessage, string returnIP int bytesLeft = 0, offset = strlen(extra.c_str()), buffersize = DEFAULT_TCP_BLKSIZE; vector<string> format; char c; - string preProcessed = ""; + string preProcessed = "", startLines = ""; string filedest = filesAndSizes[1], processed = "", filename = "", sdfsfilename = ""; if (returnType == MAPLEACK) returnTypeString = "MAPLEMERGE"; @@ -295,6 +295,12 @@ int TcpSocket::messageHandler(int sockfd, string payloadMessage, string returnIP returnTypeString = "CHUNK"; sdfsfilename = filesAndSizes[2]; //sdfs file execfilename = filesAndSizes[3]; //exec file name + vector<string> reProcessLines = splitString(filesAndSizes[4], "-"); + for (auto re : reProcessLines){ + if (startLines.size()) startLines += ","; + startLines += re; + } + //cout << "[CHUNK] exec: " << execfilename << ", start: " << filesAndSizes[4] << ", sdfs: " << sdfsfilename << endl; index = 5; } @@ -383,7 +389,7 @@ int TcpSocket::messageHandler(int sockfd, string payloadMessage, string returnIP if (fail && (returnType == MAPLEACK)) { Messages ack(MERGEFAIL, returnIP + "::"); regMessages.push(ack.toString()); } else if (returnType == MAPLEACK){ Messages ack(MERGECOMPLETE, returnIP + "::" + processed); regMessages.push(ack.toString()); } else if (returnType == JUICEACK){ Messages ack(JUICEACK, returnIP + "::" + preProcessed); regMessages.push(ack.toString()); } - else if (returnType == CHUNKACK){ Messages ack(CHUNKACK, returnIP + "::" + execfilename + "::" + filesAndSizes[4] + "::" + filename + "::" + sdfsfilename); regMessages.push(ack.toString()); } + else if (returnType == CHUNKACK){ Messages ack(CHUNKACK, returnIP + "::" + execfilename + "::" + startLines + "::" + filename + "::" + sdfsfilename); regMessages.push(ack.toString()); } else if (returnType == PUTACK){ Messages ack(PUTACK, returnIP + "::" + sdfsfilename + "::" + filename+ "::" + remoteLocalname); regMessages.push(ack.toString()); } else { cout << "[MERGE bad return type " << to_string(returnType) << endl;} break;