From 2e654e662a3a335d9d130ba097deca8ee45b35b6 Mon Sep 17 00:00:00 2001 From: afederici <ajf5@illinois.edu> Date: Sun, 29 Nov 2020 09:34:09 -0600 Subject: [PATCH] try now --- .DS_Store | Bin 6148 -> 6148 bytes Wc | Bin 189120 -> 189120 bytes Wr | Bin 184720 -> 184720 bytes src/Node.cpp | 1151 +++++++++++++++++++++++++++++++------------------- 4 files changed, 714 insertions(+), 437 deletions(-) diff --git a/.DS_Store b/.DS_Store index 28cec4ad244dc77ce75c401e526282a619bd8688..666eed3262c1264d8536aef30d046f36b010c374 100644 GIT binary patch delta 27 jcmZoMXffDO!NeThdU5i3Cb7vAnG%_zA8h`@Bp?a^lS>Ly delta 27 jcmZoMXffDO!Ni<?K78_dCb7vAnG%_b6*hli5)cIdjd2Nw diff --git a/Wc b/Wc index 587d7dc940a69b04202ac7e76321ffda59bbbfb5..647b6bbefe11636dc2ce2c9182d7985532920625 100755 GIT binary patch delta 68 zcmV-K0K5Od#S6g23xKo%@OT1dzqj&u0nRo8XTP_?uK~KU4mCDmG%z(TZvbCyVQFrc a5orQ4x4dEkEj$J_H#K81VYly|0?|YIxEogh delta 68 zcmX?bl>5L@?uIRlA1aul*Khw+!FbLL#5lB`an~k3BST}0loY*uhWOmX%)IG>nM_98 X_argt+OV0Wr6pM;ZGS(H>7p|L`Q012 diff --git a/Wr b/Wr index 2979ec3dc482f628efef6d8ebf053e0ef5209e29..a455eb1735fdf9d0b701d09f3860ed50e925db06 100755 GIT binary patch delta 68 zcmV-K0K5N?qYIFu3xKo%NO%Hgzqd+w0ZuUjXur3bsR5;}4mU7iF=J&eZvbCyVQFrc a?)(8Vw|`dxJT?Y6VKg^nIk&-$0?$K;<{9n) delta 68 zcmbQRn0vxv?uIRl9u-V6>$iJVF!~vS7_(L~E?vu)Y-wa>W~7(T5TBcvnK%92A4a3? X_2Eo5W^5)VDHaBX+Yk0Kop%NRaYq>@ diff --git a/src/Node.cpp b/src/Node.cpp index 92870cc..d001d85 100644 --- a/src/Node.cpp +++ b/src/Node.cpp @@ -1,20 +1,12 @@ #include "../inc/Node.h" - -//add another function to Node class for failure detection -//call function before sender (heartbeat) after listenForHeartbeat - -Node::Node() -{ - // create a udp object +Node::Node(){ udpServent = new UdpSocket(); tcpServent = new TcpSocket(); hashRing = new HashRing(); + workerRing = new HashRing(); localTimestamp = 0; heartbeatCounter = 0; - //time(&startTimestamp); - // byteSent = 0; - // byteReceived = 0; runningMode = ALL2ALL; activeRunning = false; prepareToSwitch = false; @@ -22,108 +14,69 @@ Node::Node() leaderPosition = -1; proposedTime = 0; electedTime = 0; + isJuicePhase = false; joinTimestamp = ""; + exe = ""; + sdfsPre = ""; possibleSuccessorIP = ""; leaderIP = ""; leaderPort = ""; + maplejuiceClear = false; isBlackout = true; + struct sigaction sa; + sa.sa_handler = sigchld_handler; // reap all dead processes + sigemptyset(&sa.sa_mask); + sa.sa_flags = SA_RESTART; + if (sigaction(SIGCHLD, &sa, NULL) == -1) { + perror("sigaction"); + exit(1); + } } -Node::Node(ModeType mode) -{ - // create a udp object - udpServent = new UdpSocket(); - tcpServent = new TcpSocket(); - hashRing = new HashRing(); - localTimestamp = 0; - heartbeatCounter = 0; - //time(&startTimestamp); - // byteSent = 0; - // byteReceived = 0; - runningMode = mode; - activeRunning = false; - prepareToSwitch = false; - logWriter = new Logger(LOGGING_FILE_NAME); - leaderPosition = -1; - proposedTime = 0; - electedTime = 0; - joinTimestamp = ""; - possibleSuccessorIP = ""; - leaderIP = ""; - leaderPort = ""; - isBlackout = true; -} +Node::Node(ModeType mode) : Node() { runningMode = mode; } void Node::startActive() { - membershipList.clear(); + queue<string> empty; + swap( maplejuiceQ , empty ); + resetMapleJuice(); restartElection(); // inserting its own into the list time(&startTimestamp); - string startTime = ctime(&startTimestamp); - startTime = startTime.substr(0, startTime.find("\n")); - tuple<string,string,string> mapKey(nodeInformation.ip, nodeInformation.port, startTime); - tuple<int, int, int> valueTuple(nodeInformation.heartbeatCounter, nodeInformation.timestamp, 0); - membershipList[mapKey] = valueTuple; - - debugMembershipList(); + string startTime = updateNodeHeartbeatAndTime(); + debugMembershipList(this); joinTimestamp = startTime; // for hashRing getPositionOnHashring(); // update its hashRingPosition } -void Node::computeAndPrintBW(double diff) -{ -#ifdef LOG_VERBOSE - cout << "total " << udpServent->byteSent << " bytes sent" << endl; - cout << "total " << udpServent->byteReceived << " bytes received" << endl; - printf("elasped time is %.2f s\n", diff); -#endif - if (diff > 0) { - double bandwidth = udpServent->byteSent/diff; - string message = "["+to_string(this->localTimestamp)+"] B/W usage: "+to_string(bandwidth)+" bytes/s"; -#ifdef LOG_VERBOSE - printf("%s\n", message.c_str()); -#endif - this->logWriter->printTheLog(BANDWIDTH, message); - } -} - -void Node::updateNodeHeartbeatAndTime() +string Node::updateNodeHeartbeatAndTime() { string startTime = ctime(&startTimestamp); startTime = startTime.substr(0, startTime.find("\n")); tuple<string, string, string> keyTuple(nodeInformation.ip, nodeInformation.port,startTime); tuple<int, int, int> valueTuple(heartbeatCounter, localTimestamp, 0); this->membershipList[keyTuple] = valueTuple; + return startTime; } string Node::populateMembershipMessage() { //The string we send will be seperated line by line --> IP,PORT,HeartbeatCounter,FailFlag string mem_list_to_send = ""; - //Assume destination already exists in the membership list of this node, just a normal heartbeat switch (this->runningMode) { case GOSSIP: - for (auto& element: this->membershipList) { - tuple<string, string, string> keyTuple = element.first; - tuple<int, int, int> valueTuple = element.second; - mem_list_to_send += get<0>(keyTuple) + "," + get<1>(keyTuple) + "," + get<2>(keyTuple) + ","; - mem_list_to_send += to_string(get<0>(valueTuple)) + "," + to_string(get<2>(valueTuple)) + "\n"; - } - break; - + return populateIntroducerMembershipMessage(); // code re-use default: string startTime = ctime(&startTimestamp); startTime = startTime.substr(0, startTime.find("\n")); mem_list_to_send += nodeInformation.ip + "," + nodeInformation.port + "," + startTime + ","; mem_list_to_send += to_string(heartbeatCounter) + "," + to_string(0) + "\n"; - break; + return mem_list_to_send; } - return mem_list_to_send; } -string Node::populateIntroducerMembershipMessage(){ +string Node::populateIntroducerMembershipMessage() { string mem_list_to_send = ""; for (auto& element: this->membershipList) { tuple<string, string, string> keyTuple = element.first; @@ -134,53 +87,29 @@ string Node::populateIntroducerMembershipMessage(){ return mem_list_to_send; } -/** - * - * HeartbeatToNode: Sends a string version of the membership list to the receiving node. The receiving node will convert the string to - * a <string, long> map where the key is the Addresss (IP + PORT) and value is the heartbeat counter. We then compare the Member. - * - **/ int Node::heartbeatToNode() { - // 3. prepare to send heartbeating, and + string msg; string mem_list_to_send = populateMembershipMessage(); vector<tuple<string,string,string>> targetNodes = getRandomNodesToGossipTo(); - - //Now we have messages ready to send, need to invoke UDP client to send #ifdef LOG_VERBOSE cout << "pick " << targetNodes.size() << " of " << this->membershipList.size()-1; cout << " members" << endl; #endif - - // 4. do gossiping for (uint i=0; i<targetNodes.size(); i++) { - //cout << targetNodes[i].first << "/" << targetNodes[i].second << endl; Member destination(get<0>(targetNodes[i]), get<1>(targetNodes[i])); - string message = "["+to_string(this->localTimestamp)+"] node "+destination.ip+"/"+destination.port+"/"+get<2>(targetNodes[i]); #ifdef LOG_VERBOSE cout << "[Gossip]" << message.c_str() << endl; #endif this->logWriter->printTheLog(GOSSIPTO, message); - - //cout << mem_list_to_send.size() << " Bytes sent..." << endl; - // byteSent += mem_list_to_send.size(); if (isLeader) { - //Messages msg(LEADERHEARTBEAT, mem_list_to_send); - if (isBlackout) { - string msg = populateSDFSFileList(LEADERPENDING, mem_list_to_send); - udpServent->sendMessage(destination.ip, destination.port, msg); - } else { - string msg = populateSDFSFileList(LEADERHEARTBEAT, mem_list_to_send); - udpServent->sendMessage(destination.ip, destination.port, msg); - } - } else { - //Messages msg(HEARTBEAT, mem_list_to_send); - string msg = populateSDFSFileList(HEARTBEAT, mem_list_to_send); - udpServent->sendMessage(destination.ip, destination.port, msg); + if (isBlackout) msg = populateSDFSFileList(LEADERPENDING, mem_list_to_send); + else msg = populateSDFSFileList(LEADERHEARTBEAT, mem_list_to_send); } + else msg = populateSDFSFileList(HEARTBEAT, mem_list_to_send); + udpServent->sendMessage(destination.ip, destination.port, msg); } - return 0; } @@ -197,12 +126,12 @@ int Node::failureDetection(){ cout << "checking " << get<0>(keyTuple) << "/" << get<1>(keyTuple) << "/" << get<2>(keyTuple) << endl; #endif if ((get<0>(keyTuple).compare(nodeInformation.ip) == 0) && (get<1>(keyTuple).compare(nodeInformation.port) == 0)) { - // do not check itself #ifdef LOG_VERBOSE - cout << "skip it" << endl; + cout << "do not check itself" << endl; #endif continue; } + //node has not failed if(get<2>(valueTuple) == 0){ if(localTimestamp - get<1>(valueTuple) > T_timeout){ //cout << "Got " << get<0>(keyTuple) << "/" << get<1>(keyTuple) << "/" << get<2>(keyTuple) << endl; @@ -266,21 +195,78 @@ int Node::failureDetection(){ } } + ////////////////////////////////////////////////////// + //1) remove from HashRing + //2) if processing, reassign + //2a) if no extra nodes, assign to successor, else add new node to ring + //3) if a sender, reassign replica holders as new senders for each thing sent + if (workerRing->size() > 0){ + vector<tuple<string,string,string>> aliveNodes; + for (auto &e : membershipList) aliveNodes.push_back(e.first); + vector<tuple<string,string,string>> mapleNodes; + int nextId; + size_t newSize = workerRing->nodePositions.size()-1; + if (newSize <= hashRing->nodePositions.size()){ + nextId = workerRing->getSuccessor(deletedNodePostion); + } else { + while (1){ + mapleNodes.clear(); + mapleNodes = randItems(1, aliveNodes); + Member m(get<0>(mapleNodes[0]), get<1>(mapleNodes[0])); + nextId = hashingId(m, get<2>(mapleNodes[0])); + if (workerRing->getValue(nextId).compare("No node found") == 0) break; + } + Messages startMsg(PHASESTART, "filling in for failed worker"); + tcpServent->sendMessage(get<0>(mapleNodes[0]), TCPPORT, startMsg.toString()); + workerRing->addNode(get<0>(mapleNodes[0]), nextId); + } + workerRing->removeNode(deletedNodePostion); + string newNode = workerRing->getValue(nextId); + //if deleted from workerTasks its been fully processed and doesnt need to be re-scheduled + auto vecCopy(workerProcessing[get<0>(keyTuple)]); + if (workerTasks.find(get<0>(keyTuple)) != workerTasks.end()){ + string workToDo = ""; + for (auto el : vecCopy) { + workToDo += get<0>(el); + workerProcessing[newNode].push_back(make_tuple(get<0>(el), get<1>(el))); + workerTasks[newNode].insert(make_tuple(get<0>(el), get<1>(el))); + } + struct dirent *entry = nullptr; + DIR *dp = nullptr; + string match = "tmp-" + get<0>(keyTuple) + "-"; + int matchLen = match.size(); + if ((dp = opendir(".")) == nullptr) { cout << "tmp directory error " << endl;} + while ((entry = readdir(dp))){ + if (strncmp(entry->d_name, match.c_str(), matchLen) == 0) { remove(entry->d_name); } + } + if (isJuicePhase){ + Messages outMsg(JUICE, exe + "::" + sdfsOut + "::" + workToDo); + tcpServent->sendMessage(newNode, TCPPORT, outMsg.toString()); + } + } + //come back + workerProcessing.erase(get<0>(keyTuple)); + workerTasks.erase(get<0>(keyTuple)); + + for (auto &e : mapleSending[get<0>(keyTuple)]){ + vector<int> temp = randItems(1, fileList[get<0>(e)]); + auto task = make_tuple(get<0>(e), get<1>(e)); + mapleSending[hashRing->getValue(temp[0])].push_back(task); + string processor = ""; + for (auto &worker : workerTasks){ + if (worker.second.count(task) > 0) processor = worker.first; + } + //sender + "::" + processor + "::" + exe + "::" + s; + string mapleS = hashRing->getValue(temp[0]) + "::" + processor + "::" + exe + "::" + get<0>(e)+ "::" + get<1>(e); + tcpServent->mapleMessages.push(mapleS); + } + } } - } - } - else{ + } else { //check for cleanup on already failed nodes if(localTimestamp - get<1>(valueTuple) > T_cleanup){ - // core dumped happened here; bug fix auto iter = this->membershipList.find(keyTuple); if (iter != this->membershipList.end()) { - //cout << "Got " << get<0>(iter->first) << "/" << get<1>(iter->first) << "/" << get<2>(iter->first); - //cout << " with " << to_string(get<0>(iter->second)) << "/"; - //cout << to_string(get<1>(iter->second)) << "/"; - //cout << to_string(get<2>(iter->second)) << endl; - //cout << this->membershipList[keyTuple] - //this->membershipList.erase(iter); removedVec.push_back(keyTuple); } } @@ -299,12 +285,10 @@ int Node::failureDetection(){ } this->membershipList.erase(iter); - string message = "["+to_string(this->localTimestamp)+"] node "+get<0>(removedVec[i])+"/"+get<1>(removedVec[i])+"/"+get<2>(removedVec[i])+": REMOVED FROM LOCAL MEMBERSHIP LIST"; cout << "[REMOVE]" << message.c_str() << endl; this->logWriter->printTheLog(REMOVE, message); - - //this->debugMembershipList(); + //debugMembershipList(this); } } if (this->membershipList.size()==1 || leaderRemoved) { // Only me or leader failed, restart leader election @@ -315,13 +299,10 @@ int Node::failureDetection(){ return 0; } - int Node::joinSystem(Member introducer) { string mem_list_to_send = populateMembershipMessage(); - //Messages msg(JOIN, mem_list_to_send); string msg = populateSDFSFileList(JOIN, mem_list_to_send); - string message = "["+to_string(this->localTimestamp)+"] sent a request to "+introducer.ip+"/"+introducer.port+", I am "+nodeInformation.ip+"/"+nodeInformation.port; cout << "[JOIN]" << message.c_str() << endl; this->logWriter->printTheLog(JOINGROUP, message); @@ -332,23 +313,18 @@ int Node::joinSystem(Member introducer) int Node::requestSwitchingMode() { string message = nodeInformation.ip+","+nodeInformation.port; - //Messages msg(SWREQ, message); string msg = populateSDFSFileList(SWREQ, message); for(auto& element: this->membershipList) { tuple<string,string,string> keyTuple = element.first; - //tuple<int, int, int> valueTuple = element.second; cout << "[SWITCH] sent a request to " << get<0>(keyTuple) << "/" << get<1>(keyTuple) << endl; udpServent->sendMessage(get<0>(keyTuple), get<1>(keyTuple), msg); } return 0; } -int Node::SwitchMyMode() -{ - // wait for a while - sleep(T_switch); - // empty all messages - udpServent->qMessages = queue<string>(); +int Node::SwitchMyMode() { + sleep(T_switch); // wait for a while + udpServent->qMessages = queue<string>(); // empty all messages switch (this->runningMode) { case GOSSIP: { this->runningMode = ALL2ALL; @@ -363,76 +339,30 @@ int Node::SwitchMyMode() default: break; } - // finishing up - prepareToSwitch = false; + prepareToSwitch = false; // finishing up return 0; } -int Node::listenToHeartbeats() -{ - //look in queue for any strings --> if non empty, we have received a message and need to check the membership list - +int Node::listenToHeartbeats() { // 1. deepcopy and handle queue queue<string> qCopy(udpServent->qMessages); udpServent->qMessages = queue<string>(); - - int size = qCopy.size(); + size_t size = qCopy.size(); //cout << "Got " << size << " messages in the queue" << endl; //cout << "checking queue size " << nodeOwn->udpServent->qMessages.size() << endl; - - // 2. merge membership list - for (int j = 0; j < size; j++) { + for (size_t j = 0; j < size; j++) { //cout << qCopy.front() << endl; - readMessage(qCopy.front()); - - // Volunteerily leave - if(this->activeRunning == false){ - return 0; - } - // byteReceived += qCopy.front().size(); + handleUdpMessage(qCopy.front()); + if(this->activeRunning == false) return 0; qCopy.pop(); } - return 0; } -void Node::debugMembershipList() -{ - cout << "Membership list [" << this->membershipList.size() << "]:" << endl; - if (isLeader) { - cout << "[T] IP/Port/JoinedTime:Heartbeat/LocalTimestamp/FailFlag" << endl; - } else { - cout << "[T] IP/Port/JoinedTime:Heartbeat/LocalTimestamp/FailFlag" << endl; - } - string message = ""; - - for (auto& element: this->membershipList) { - tuple<string,string,string> keyTuple = element.first; - tuple<int, int, int> valueTuple = element.second; - - if (nodeInformation.ip.compare(get<0>(keyTuple))==0) { // Myself - if (isLeader) { - message += "[L/M] "; - } else { - message += "[M] "; - } - } else if (leaderIP.compare(get<0>(keyTuple))==0) { - message += "[L] "; - } else { - if (isLeader) { - message += " "; - } else { - message += " "; - } - } - - message += get<0>(keyTuple)+"/"+get<1>(keyTuple)+"/"+get<2>(keyTuple); - message += ": "+to_string(get<0>(valueTuple))+"/"+to_string(get<1>(valueTuple))+"/"+to_string(get<2>(valueTuple))+"\n"; - } - cout << message.c_str() << endl; - this->logWriter->printTheLog(MEMBERS, message); -} - +/* + * Take a hearbeat message, if the member doesn't exist add it, update hashring, and disseminate out memberList + * If it exists, check for failure, and if there is update fail flag, otherwise try ot update heartbeat +*/ void Node::processHeartbeat(string message) { bool changed = false; vector<string> incomingMembershipList = splitString(message, "\n"); @@ -446,10 +376,7 @@ void Node::processHeartbeat(string message) { } membershipListEntry.clear(); membershipListEntry = splitString(list_entry, ","); - if (membershipListEntry.size() != 5) { - // circumvent craching - continue; - } + if (membershipListEntry.size() != 5) continue; int incomingHeartbeatCounter = stoi(membershipListEntry[3]); int failFlag = stoi(membershipListEntry[4]); @@ -459,16 +386,13 @@ void Node::processHeartbeat(string message) { // Volunteerily leave if you are marked as failed if(failFlag == 1){ this->activeRunning = false; - string message = "["+to_string(this->localTimestamp)+"] node "+this->nodeInformation.ip+"/"+this->nodeInformation.port+" is left"; cout << "[VOLUNTARY LEAVE]" << message.c_str() << endl; this->logWriter->printTheLog(LEAVE, message); return; } - - // do not check itself heartbeat #ifdef LOG_VERBOSE - cout << "skip it" << endl; + cout << "do not check itself heartbeat" << endl; #endif continue; } @@ -501,7 +425,6 @@ void Node::processHeartbeat(string message) { if(incomingHeartbeatCounter > currentHeartbeatCounter){ get<0>(this->membershipList[mapKey]) = incomingHeartbeatCounter; get<1>(this->membershipList[mapKey]) = localTimestamp; - // get<2>(this->membershipList[mapKey]) = failFlag; string message = "["+to_string(this->localTimestamp)+"] node "+get<0>(mapKey)+"/"+get<1>(mapKey)+"/"+get<2>(mapKey)+" from "+to_string(currentHeartbeatCounter)+" to "+to_string(incomingHeartbeatCounter); #ifdef LOG_VERBOSE cout << "[UPDATE]" << message.c_str() << endl; @@ -526,43 +449,11 @@ void Node::processHeartbeat(string message) { break; } } - } else { - // do nothing } } } - // If membership list changed in all-to-all, full membership list will be sent - if(changed && this->runningMode == ALL2ALL){ - string mem_list_to_send = populateIntroducerMembershipMessage(); - vector<tuple<string,string,string>> targetNodes = getRandomNodesToGossipTo(); - - for (uint i=0; i<targetNodes.size(); i++) { - Member destination(get<0>(targetNodes[i]), get<1>(targetNodes[i])); - - string message = "["+to_string(this->localTimestamp)+"] node "+destination.ip+"/"+destination.port+"/"+get<2>(targetNodes[i]); -#ifdef LOG_VERBOSE - cout << "[Gossip]" << message.c_str() << endl; -#endif - this->logWriter->printTheLog(GOSSIPTO, message); - - if (isLeader) { - //Messages msg(LEADERHEARTBEAT, mem_list_to_send); - if (isBlackout) { - string msg = populateSDFSFileList(LEADERPENDING, mem_list_to_send); - udpServent->sendMessage(destination.ip, destination.port, msg); - } else { - string msg = populateSDFSFileList(LEADERHEARTBEAT, mem_list_to_send); - udpServent->sendMessage(destination.ip, destination.port, msg); - } - } else { - //Messages msg(HEARTBEAT, mem_list_to_send); - string msg = populateSDFSFileList(HEARTBEAT, mem_list_to_send); - udpServent->sendMessage(destination.ip, destination.port, msg); - } - - } - } + if(changed && this->runningMode == ALL2ALL) heartbeatToNode(); } void Node::setUpLeader(string message, bool pending) @@ -579,9 +470,9 @@ void Node::setUpLeader(string message, bool pending) if (pending != isBlackout) { if (isBlackout) { - cout << "[BLACKOUT] Leader is ready now" << endl; + //cout << "[BLACKOUT] Leader is ready now" << endl; } else { - cout << "[BLACKOUT] Leader is busy now" << endl; + //cout << "[BLACKOUT] Leader is busy now" << endl; } } if (pending) { @@ -594,24 +485,19 @@ void Node::setUpLeader(string message, bool pending) /** * given a string message which contains a membership list, we will take the string, split it by returns, and then split it by commas, to then compare the heartbeat counters * of each IP,PORT,timestamp tuple with the membership list of the receiving Node. - * * Found help on how to do string processing part of this at https://stackoverflow.com/questions/14265581/parse-split-a-string-in-c-using-string-delimiter-standard-c */ -void Node::readMessage(string message){ - - // decapsulate with specific messages - //cout << "readMessage " << message << endl; +void Node::handleUdpMessage(string message){ + //cout << "handleUdpMessage " << message << endl; string deMeg = decapsulateMessage(message); bool pending = true; - //cout << "readMessage deMeg " << deMeg << endl; - + //cout << "handleUdpMessage deMeg " << deMeg << endl; Messages msg(deMeg); switch (msg.type) { case LEADERHEARTBEAT: // Note: not for Gossip-style, only All-to-All //cout << "LEADERHEARTBEAT: " << msg.payload << endl; pending = false; - case LEADERPENDING: - setUpLeader(msg.payload, pending); + case LEADERPENDING: setUpLeader(msg.payload, pending); case HEARTBEAT: case JOINRESPONSE:{ processHeartbeat(msg.payload); @@ -624,13 +510,11 @@ void Node::readMessage(string message){ Member member(fields[0], fields[1]); int checkPosition = hashingId(member, fields[2]); if (checkHashNodeCollision(checkPosition)) { - //Messages response(JOINREJECT, ""); string response = populateSDFSFileList(JOINREJECT, ""); udpServent->sendMessage(fields[0], fields[1], response); } else { string introducerMembershipList; introducerMembershipList = populateIntroducerMembershipMessage(); - //Messages response(JOINRESPONSE, introducerMembershipList); string response = populateSDFSFileList(JOINRESPONSE, introducerMembershipList); udpServent->sendMessage(fields[0], fields[1], response); } @@ -646,7 +530,6 @@ void Node::readMessage(string message){ //Messages msgReply(SWRESP, messageReply); string msgReply = populateSDFSFileList(SWRESP, messageReply); udpServent->sendMessage(fields[0], fields[1], msgReply); - prepareToSwitch = true; } break; @@ -660,7 +543,6 @@ void Node::readMessage(string message){ break; } case JOINREJECT: { - // TODO: the node should leave cout << "[JOINREJECT] There is a collision, and I have to leave..." << endl; this->activeRunning = false; pthread_exit(NULL); @@ -669,29 +551,7 @@ void Node::readMessage(string message){ default: break; } - //debugMembershipList(); -} - -vector<string> Node::splitString(string s, string delimiter){ - vector<string> result; - size_t pos_start = 0, pos_end, delim_len = delimiter.length(); - string token; - - while ((pos_end = s.find (delimiter, pos_start)) != string::npos) { - token = s.substr (pos_start, pos_end - pos_start); - pos_start = pos_end + delim_len; - result.push_back (token); - } - - result.push_back (s.substr (pos_start)); - return result; -} - -int Node::hashingId(Member nodeMember, string joinTime) -{ - string toBeHashed = "NODE::" + nodeMember.ip + "::" + nodeMember.port + "::" + joinTime; - int ringPosition = hash<string>{}(toBeHashed) % HASHMODULO; - return ringPosition; + //debugMembershipList(this); } int Node::getPositionOnHashring(){ @@ -792,29 +652,23 @@ void Node::restartElection() // haven't tested yet leaderPort = ""; } -void Node::leaderCreateHashRing() -{ - // The leader or notes creates hashRing +void Node::leaderCreateHashRing() { hashRing->clear(); for (auto& element: this->membershipList) { // update hashRing tuple<string, string, string> keyTuple = element.first; Member member(get<0>(keyTuple), get<1>(keyTuple)); int pos = hashingId(member, get<2>(keyTuple)); - //hashRing->addNode("NODE::"+get<0>(keyTuple), pos); // since we don't store file, remove NODE hashRing->addNode(get<0>(keyTuple), pos); } - //hashRing->debugHashRing(); } -void Node::proposeToBeLeader() -{ - // Start election +void Node::proposeToBeLeader() { Messages msg(ELECTION, to_string(hashRingPosition)); cout << "[ELECTION] Propose to be leader, send to " << possibleSuccessorIP << endl; tcpServent->sendMessage(possibleSuccessorIP, TCPPORT, msg.toString()); } -void Node::processElection(Messages messages) +void Node::electionMessageHandler(Messages messages) { switch (messages.type) { case ELECTION: { // check id @@ -860,26 +714,19 @@ void Node::processElection(Messages messages) } } -void Node::processTcpMessages() +void Node::tcpElectionProcessor() { queue<string> qCopy(tcpServent->qMessages); tcpServent->qMessages = queue<string>(); - - int size = qCopy.size(); //cout << "Got " << size << " TCP messages" << endl; - - for (int j=0; j<size; j++) { + for (size_t j=0; j<qCopy.size(); j++) { //cout << qCopy.front() << endl; Messages msg(qCopy.front()); //cout << "Has " << msg.type << " with " << msg.payload << endl; switch (msg.type) { case ELECTION: - case ELECTIONACK: { - processElection(msg); - break; - } - default: - break; + case ELECTIONACK: electionMessageHandler(msg); + default: break; } qCopy.pop(); } @@ -946,21 +793,54 @@ void Node::checkFileListConsistency(){ } } -void Node::processRegMessages() +vector<tuple<string,string, string>> Node::getRandomNodesToGossipTo() +{ + vector<tuple<string, string, string>> availableNodesInfo; + vector<tuple<string, string, string>> selectedNodesInfo; + vector<int> indexList; + int availableNodes = 0; + for(auto& element: this->membershipList){ + tuple<string, string, string> keyPair = element.first; + tuple<int, int, int> valueTuple = element.second; + //dont gossip to self or failed nodes + if(get<0>(keyPair).compare(this->nodeInformation.ip) && (get<2>(valueTuple) != 1)){ + availableNodesInfo.push_back(keyPair); + indexList.push_back(availableNodes++); + } + } + switch (this->runningMode) { + case GOSSIP: { + return randItems(N_b, availableNodesInfo); + } + //ALL2ALL + default: { + return availableNodesInfo; + } + } +} + +void Node::handleMaplejuiceQ(){ + if (!maplejuiceQ.empty() && !workerRing->size() && !isBlackout){ + for (auto &e: fileList) { + if (e.second.size() < 4) return; //make sure replication is good first + } + string msgCopy(maplejuiceQ.front()); + cout << "[QUEUE] sending next maple/juice to be processed " << msgCopy << endl; + tcpServent->regMessages.push(msgCopy); + maplejuiceQ.pop(); + } +} + +void Node::handleTcpMessage() { //Before we do anything here, we should have the leader check to see if the file list is consistent or not. - //We do this by: - //1. Checking the files in the filelist, making sure each one has 4 entries. If not, then we need to rereplicate. - // We can initiate a PUT, put pending request, setting as -1, -1, and then last one as target node that we want to replicate to (new node to replace the one that failed) if(isLeader){ checkFileListConsistency(); } queue<string> qCopy(tcpServent->regMessages); tcpServent->regMessages = queue<string>(); - int size = qCopy.size(); //cout << "Got " << size << " TCP messages" << endl; - for (int j=0; j<size; j++) { // cout << qCopy.front() << endl; vector<string> msgSplit = splitString(qCopy.front(), "::"); @@ -970,26 +850,445 @@ void Node::processRegMessages() } string payload = ""; for(uint k = 1; k < msgSplit.size(); k++){ - if(k == msgSplit.size() - 1){ - payload += msgSplit[k]; - } else { - payload += msgSplit[k] + "::"; - } + if(k == msgSplit.size() - 1) payload += msgSplit[k]; + else payload += msgSplit[k] + "::"; } MessageType msgType = static_cast<MessageType>(stoi(msgSplit[0])); Messages msg(msgType, payload); - // cout << "Has " << msg.type << " with " << msg.payload << endl; + vector<string> inMsg = splitString(msg.payload, "::"); + //cout << "[TCP] Has " << msg.type << " with " << msg.payload << endl; switch (msg.type) { - case PUTACK: { - vector<string> inMsg = splitString(msg.payload, "::"); - if(inMsg.size() >= 4){ - string inMsgIP = inMsg[0]; - string sdfsfilename = inMsg[1]; - string localfilename = inMsg[2]; - string remoteLocalname = inMsg[3]; + case JUICESTART: { + if (workerRing->size()) { + maplejuiceQ.push(msg.toString()); + cout << "[JUICE] maple or juice in progress" << endl; + break; + } + cout << "[JUICE] Debug: " << msg.toString() << endl; + if (inMsg.size() < 6) { cout << "[JUICE] message too short " << to_string(inMsg.size()) << endl; break; } + //juice_exe num_juices sdfs_intermediate_dir sdfs_out_file delete={0,1} hash_or_range={0,1} + cout << "[JUICE] Leader starting new Juice phase" << endl; + isJuicePhase = true; + string includedDebug = ""; + sdfsOut = inMsg[3], sdfsPre = inMsg[2] + "-"; + exe = inMsg[0]; + maplejuiceClear = (stoi(inMsg[4])) ? true : false; + int workers = stoi(inMsg[1]); + int isRangePartition = stoi(inMsg[5]); + int ringSize = hashRing->nodePositions.size(); + if (workers > ringSize-1) workers = ringSize-1; + vector<string> directory; + vector<tuple<string,string,string>> aliveNodes; + //cout << "[DIRECTORY] - " << sdfsPre; + for (auto &e: fileList){ + if (strncmp(e.first.c_str(), sdfsPre.c_str(), sdfsPre.size()) == 0){ + //cout << e.first << ", "; + directory.push_back(e.first); + } + } + //cout << endl; + sort(directory.begin(), directory.end()); + for (auto &e : membershipList) if (get<0>(e.first).compare(nodeInformation.ip)) aliveNodes.push_back(e.first); + vector<tuple<string,string,string>> juiceNodes = randItems(workers, aliveNodes); + for (auto &e : juiceNodes) { + Member m(get<0>(e), get<1>(e)); + workerRing->addNode(get<0>(e), hashingId(m, get<2>(e))); + if (includedDebug.size()) includedDebug += " | "; + includedDebug += get<0>(e); + Messages startMsg(PHASESTART, "start juice"); + tcpServent->sendMessage(get<0>(e), TCPPORT, startMsg.toString()); + } + cout << "[JUICE] " << includedDebug << " are the worker nodes" << endl; + if (isRangePartition){ + int rangeSplit = (int) (round(double(directory.size()) / double(workers))); + int workerAssigned = 0; + int fileIndex = 0; + for (auto &e: directory){ + string processor = get<0>(juiceNodes[workerAssigned]); + workerProcessing[processor].push_back(make_tuple(e, "0")); //dont care about line # + fileIndex++; + if (fileIndex >= ((workerAssigned+1)*rangeSplit)) { workerAssigned++; } + } + } + else { + for (auto &e: directory){ + string processor = workerRing->getValue(workerRing->locateClosestNode(e)); + workerProcessing[processor].push_back(make_tuple(e, "0")); //dont care about line # + } + } + for (auto &work : workerProcessing) { + string msg = exe + "::" + sdfsOut + "::"; + int comma = 0; + for (auto &f : work.second){ + if (comma) msg += ","; + comma = 1; + msg += get<0>(f); + } + Messages outMsg(JUICE, msg); + tcpServent->sendMessage(work.first, TCPPORT, outMsg.toString()); + } + break; + } + case JUICE: { + auto piecesOfWork = splitString(inMsg[2], ","); + string retry = ""; + int comma = 0; + string header = ""; + time_t fileTimestamp; + time(&fileTimestamp); + string match = "tmp-", output = "tmp-" + inMsg[1] + "-" + to_string(fileTimestamp); + for (string s : piecesOfWork){ + if (localFilelist.find(s) == localFilelist.end()) + { + Messages outMsg(DNSGET, nodeInformation.ip + "::" + to_string(hashRingPosition) + "::" + s + "::" + s); + tcpServent->sendMessage(leaderIP, TCPPORT, outMsg.toString()); + if (comma) retry += ","; + comma = 1; + retry += s; + } + else + { + cout << "[JUICE] " << s << " has arrived" << endl; + string execName = EXEC_CMD + inMsg[0]; + string fileName = localFilelist[s]; + if (runExecutable(execName, fileName) < 0) { cout << "[EXEC] ERROR" << endl; break;} + string scopy(s); + auto splitter = splitString(scopy, "-"); + string strToMerge = "tmp-" + splitter[1]; + if (header.size()) header += ","; + header += splitter[1]; + ofstream keyFile; + keyFile.open(output, ios::app | ios::out); + ifstream toMerge(strToMerge); + if (!toMerge.is_open() || !keyFile.is_open()) { + cout << "bad file permissions for " << strToMerge << " and/or " << output << endl; + if (keyFile.is_open()) keyFile.close(); + break; + } + keyFile << toMerge.rdbuf(); + keyFile.close(); + remove(strToMerge.c_str()); + } + } + FILE *fp = fopen(output.c_str(), "rb"); + fseek(fp, 0, SEEK_END); + int size = ftell(fp); + fseek(fp, 0, SEEK_SET); + fclose(fp); + cout << "[TEST] merging " << to_string(size) << " bytes" << endl; + string mergeMsg = leaderIP + "::" + to_string(JUICEACK) + "::" + inMsg[1] + "::" + header + "::"+ output + "," + to_string(size) + "::"; + if (header.size()) tcpServent->mergeMessages.push(mergeMsg); + if (retry.size()) { + Messages outMsg(JUICE, inMsg[0] + "::" + inMsg[1] + "::" + retry); + tcpServent->regMessages.push(outMsg.toString()); + } + break; + } + + case JUICEACK: { + if (!isLeader) break; + vector<string> completedJuices = splitString(inMsg[1], ","); + for (string &task : completedJuices){ + cout << "[JUICEACK] task: " << task << " status: ..."; + string matchStr = sdfsPre + task; + auto element = make_tuple(matchStr, "0"); + auto it = find(workerProcessing[inMsg[0]].begin(), workerProcessing[inMsg[0]].end(), element); + if (it != workerProcessing[inMsg[0]].end()) { + cout << "completed." << endl; + workerProcessing[inMsg[0]].erase(it); + } + } + cout << endl; + if (!workerProcessing[inMsg[0]].size()) workerProcessing.erase(inMsg[0]); + if (!workerProcessing.size()) { + cout <<"[JUICEACK] replicate final results " << endl; + Messages outMsg(DNS, nodeInformation.ip + "::" + to_string(hashRingPosition) + "::" + sdfsOut + "::" + sdfsOut + "::" + "-1" + "::" + "-1" + "::"); + tcpServent->regMessages.push(outMsg.toString()); + if (maplejuiceClear){ + cout << "[JUICEACK] clearing files.... " << endl; + for (auto &f : fileList){ + if (strncmp(f.first.c_str(), sdfsPre.c_str(), sdfsPre.size()) == 0){ + Messages outMsg(DELETE, nodeInformation.ip + "::" + f.first); + tcpServent->regMessages.push(outMsg.toString()); + } + } + } + cout << "[JUICE] ------------ complete ---------- " << endl; + resetMapleJuice(); + } + break; + } + case PHASESTART: { + //TODO: reset things when you reach the end of MAPLESTART OR JUICESTART + //that if the work dictionaries are empty, theres nothing to do and just end the phase + //right now youll just stay stuck in the phase + cout << "[PHASESTART] go " << inMsg[0] << endl; + resetMapleJuice(); + break; + } + case MAPLESTART: { + //leader only function + if (workerRing->size()) { + maplejuiceQ.push(msg.toString()); + cout << "[MAPLE] maple or juice in progress" << endl; + break; + } + cout << "[MAPLE] Leader starting new Maple phase" << endl; + if (inMsg.size() < 4) break; + mapleKeys.clear(); + isJuicePhase = false; + string exe = inMsg[0], num_maples = inMsg[1], sdfs_dir = inMsg[3] + "-", s = ""; + sdfsPre = inMsg[2] + "-"; + int workers = stoi(num_maples), ringSize = hashRing->nodePositions.size(); + if (workers > ringSize-1) workers = ringSize-1; + int total_lines = 0, start = 0, id = 0; + vector<tuple<string,int>> directory; + vector<tuple<string,string,string>> aliveNodes; + //3 workers and a master is a condition set for correct working of the program. + //This assumption is similarly made in other places based on the design specification of 3 simul fails + if (ringSize <= 3){ + cout << "[ERROR] Not enough nodes for Maple. Need 4 minimum (3 workers, 1 leader)" << endl; + break; + } + //cout << "[DIRECTORY] " << sdfs_dir; + for (auto &e: fileSizes){ + //cout << e.first << " | " << to_string(get<1>(e.second)) << " "; + if (strncmp(e.first.c_str(), sdfs_dir.c_str(), sdfs_dir.size()) == 0){ + //cout << " was a match "; + directory.push_back(make_tuple(e.first, get<1>(e.second))); + total_lines += get<1>(e.second); + } + } + //cout << endl << "[MAPLE] need to process " << to_string(total_lines) << endl; + for (auto &e : membershipList) if (get<0>(e.first).compare(nodeInformation.ip)) aliveNodes.push_back(e.first); + vector<tuple<string,string,string>> mapleNodes = randItems(workers, aliveNodes); + string includedDebug = ""; + for (auto &e : mapleNodes) { + Member m(get<0>(e), get<1>(e)); + workerRing->addNode(get<0>(e), hashingId(m, get<2>(e))); + if (includedDebug.size()) includedDebug += " | "; + includedDebug += get<0>(e); + } + vector<string> messagesToSend; //used so we get our full assignments before scheduling + //cout << "[MAPLE] " << includedDebug << " are the worker nodes" << endl; + for (auto &e: directory){ + start = 0; + string file = get<0>(e); + int lines = get<1>(e); + //cout << "[MAPLE] file: " << file << " - " << to_string(lines) << endl; + while (start < lines){ + s = file + "::" + to_string(start); + id = workerRing->locateClosestNode(s); + srand(time(NULL)); + vector<int> temp = randItems(1, fileList[file]); + string sender = hashRing->getValue(temp[0]); //because files are part of sdfs anyone can be the sender + string processor = workerRing->getValue(id); //processor is a maple worker + workerProcessing[processor].push_back(make_tuple(file, to_string(start))); + workerTasks[processor].insert(make_tuple(file, to_string(start))); + //cout << "[MAPLE] assign file " << file << " at " << to_string(start) << " to " << processor << endl; + mapleSending[sender].push_back(make_tuple(file, to_string(start))); + string maplemsg = sender + "::" + processor + "::" + exe + "::" + s; + //sender, processor, exec, file, start + messagesToSend.push_back(maplemsg); + start = start + T_maples; + } + } + for (auto mapleMsg : messagesToSend) tcpServent->mapleMessages.push(mapleMsg); + break; + } - cout << "[PUTACK] " << "inMsgIP: " << inMsgIP << " sdfsfilename: " << sdfsfilename << " localfilename: " << localfilename << endl; + case CHUNK: { + //processor, exec, sdfs, start + int end = stoi(inMsg[3]) + T_maples, start = stoi(inMsg[3]); + string starts = inMsg[3] + "," + to_string(end); + string localfile = "", sdfsFile = inMsg[2]; + if (localFilelist.find(inMsg[2]) != localFilelist.end()) localfile = localFilelist[inMsg[2]]; + else localfile = inMsg[2]; + + vector<string> unDirectory = splitString(sdfsFile, "-"); //get rid of timestamp + string fileDest = "tmp-"+inMsg[3]+"-"+sdfsFile.substr(unDirectory[0].size()+1); + + int lineCounter = -1, numbytes = 0, readLines = 0; + ifstream file(localfile); + string str; + while (getline(file, str) && (lineCounter < end - 1)) + { + lineCounter++; + if (lineCounter < start) continue; + numbytes += (str.size()); + readLines++; + } + //cout << "[SENDLINES] " << to_string(readLines) << endl; + file.clear(); // clear fail and eof bits + file.seekg(0); // back to the start! + + string toSend = localfile + "," + to_string(numbytes); + string header = sdfsFile + "," + inMsg[1] + "," + inMsg[3]; + //processor, return type, file dest, header, toSend, starts + string sendMsg = inMsg[0] + "::" + to_string(CHUNKACK) + "::" + fileDest + "::" + header + "::" + toSend + "::" + starts; + this->tcpServent->mergeMessages.push(sendMsg); + break; + } + + case CHUNKACK: { + //cout << "[CHUNKACK] receiving the put worked!" << endl; + //IP, exec, start, temp, sdfs file + if (!isLeader) { + //forward to know that the file was put okay + this->tcpServent->sendMessage(leaderIP, TCPPORT, msg.toString()); + string execName = EXEC_CMD + inMsg[1]; + if (runExecutable(execName, inMsg[3]) < 0) { cout << "[EXEC] ERROR" << endl; break;} + string ackStr = nodeInformation.ip + "::" + inMsg[4] + "::" + inMsg[2]; //IP, file, chunk + + Messages ackMsg(MAPLEACK, ackStr); + tcpServent->sendMessage(leaderIP, TCPPORT, ackMsg.toString()); + break; + } + //cout << "[CHUNKACK] leader confirming " << inMsg[4] << "::" << inMsg[2] << " was received" << endl; + vector<tuple<string,string>> temp; + for (auto &e : mapleSending[inMsg[0]]){ + if (get<0>(e).compare(inMsg[4]) == 0){ + if (get<1>(e).compare(inMsg[2]) == 0){ + continue; + } + } + temp.push_back(e); + } + if (temp.size()) mapleSending[inMsg[0]] = temp; + else mapleSending.erase(inMsg[0]); + break; + } + + case MAPLEACK: { + if (isLeader){ + vector<tuple<string,string>> temp; + cout << "[MAPLEACK] " << inMsg[0] << " processed " << inMsg[1] << "," << inMsg[2] << " | remaining: "; + for (auto &e : workerTasks[inMsg[0]]){ + if (!get<0>(e).compare(inMsg[1]) && !get<1>(e).compare(inMsg[2])){ + temp.push_back(e); + } + else cout << "(" << get<0>(e) << "," << get<1>(e) << ") | "; + } + cout << endl; + for (auto &e : temp) workerTasks[inMsg[0]].erase(e); + if (!workerTasks[inMsg[0]].size()) { + Messages outMsg(STARTMERGE, ""); + this->tcpServent->sendMessage(inMsg[0], TCPPORT, outMsg.toString()); + } + } + break; + } + + case STARTMERGE: { + if (!isLeader){ + auto entireDir = splitString(tcpServent->getDirMetadata(), ","); + int filesHere = entireDir.size() / 2; + Messages outMsg(STARTMERGE, nodeInformation.ip + "::" + to_string(filesHere)); + this->tcpServent->sendMessage(leaderIP, TCPPORT, outMsg.toString()); + string toProcess = ""; + int index = 0, range = entireDir.size(), partition = 10; + while (index < range){ + while ((index < range) && (partition > 0)){ + if (toProcess.size()) toProcess += ","; + toProcess += (entireDir[index] + "," + entireDir[index+1]); + index += 2; + partition--; + } + //cout << "[STARTMERGE] " << toProcess << endl; + string sendMsg = leaderIP + "::" + to_string(MAPLEACK) + "::" + "::" + "::" + toProcess + "::"; + this->tcpServent->mergeMessages.push(sendMsg); + partition = 10; + toProcess = ""; + } + } + else{ + filesAtWorker[inMsg[0]] = stoi(inMsg[1]); + } + break; + } + + case MERGECOMPLETE: { + struct dirent *entry = nullptr; + DIR *dp = nullptr; + string match = "tmp-" + inMsg[0] + "-"; + auto mergedFiles = splitString(inMsg[1], ","); + set<string> mergeSet; + string toInsert = ""; + for (auto &e: mergedFiles) mergeSet.insert(match + e); + filesAtWorker[inMsg[0]] -= mergedFiles.size(); + if ((dp = opendir(".")) == nullptr) { cout << "tmp directory error " << endl;} + //cout << "[MERGECOMPLETE] processing files matching " << match << " and replacing with prefix: " << sdfsPre << ". Files left: " << to_string(filesAtWorker[inMsg[0]]) << endl; + while ((entry = readdir(dp))){ + if (mergeSet.count(entry->d_name)){ + string entryName(entry->d_name); + toInsert = entryName.substr(match.size()); + mapleKeys.insert(toInsert); + string mapleOutput = sdfsPre + toInsert; + ofstream keyFile; + keyFile.open(mapleOutput, ios::app); + ifstream toMerge(entry->d_name); + if (!toMerge.is_open() || !keyFile.is_open()) { + cout << "bad file permissions for " << entry->d_name << " and/or " << mapleOutput << endl; + mapleKeys.erase(toInsert); + break; + } + keyFile << toMerge.rdbuf(); + keyFile.close(); + remove(entry->d_name); + mergeSet.erase(entry->d_name); + } + } + if (filesAtWorker[inMsg[0]] <= 0){ + int oldSize = workerTasks.size(); + workerTasks.erase(inMsg[0]); + cout << "[WORKERS] " << to_string(oldSize) << " -> " << to_string(workerTasks.size()) << " remaining" << endl; + } + //Done with maple phase + if (!workerTasks.size()) { + auto it = mapleKeys.begin(); + while (it != mapleKeys.end()){ + string file = sdfsPre + (*it); + it++; + isBlackout = true; + updateFileList(file, hashRingPosition); + fileSizes[file] = make_tuple(-1, -1); + int closestNode = hashRing->locateClosestNode(file); + int pred = hashRing->getPredecessor(closestNode); + int succ = hashRing->getSuccessor(closestNode); + if (!hashRing->getValue(closestNode).compare(nodeInformation.ip)) { + closestNode = hashRing->getRandomNode(tuple<int, int, int>(closestNode, pred, succ)); + } + if (hashRing->getValue(pred).compare(nodeInformation.ip)==0) { + pred = hashRing->getRandomNode(tuple<int, int, int>(closestNode, pred, succ)); + } + if (hashRing->getValue(succ).compare(nodeInformation.ip)==0) { + succ = hashRing->getRandomNode(tuple<int, int, int>(closestNode, pred, succ)); + } + pendingRequests[file] = tuple<int, int, int>(closestNode, pred, succ); + pendingRequestSent[file] = tuple<int, int, int>(true, false, false); + pendingSenderRequests[file] = tuple<string, string, string>(nodeInformation.ip, "", ""); + string sendMsg = hashRing->getValue(closestNode)+"::"+file+"::"+file+"::"; + this->tcpServent->pendSendMessages.push(sendMsg); + } + cout << "[MAPLE] ------------ complete ---------- " << endl; + resetMapleJuice(); + } + break; + } + + case MERGEFAIL: { + cout << "[MERGEFAIL] retrying!!!!!!!!! at " << nodeInformation.ip << endl; + Messages outMsg(STARTMERGE, ""); + this->tcpServent->sendMessage(inMsg[0], TCPPORT, outMsg.toString()); + break; + } + + case PUTACK: { + if(inMsg.size() >= 4){ + string inMsgIP = inMsg[0], sdfsfilename = inMsg[1]; + string localfilename = inMsg[2], remoteLocalname = inMsg[3]; + //cout << "[PUTACK] " << "inMsgIP: " << inMsgIP << " sdfsfilename: " << sdfsfilename << " localfilename: " << localfilename << endl; localFilelist[sdfsfilename] = localfilename; Messages outMsg(ACK, to_string(this->hashRingPosition)+"::"+sdfsfilename+"::"+remoteLocalname); this->tcpServent->sendMessage(inMsgIP, TCPPORT, outMsg.toString()); @@ -998,14 +1297,13 @@ void Node::processRegMessages() } case DELETE: { if (isLeader) { - vector<string> inMsg = splitString(msg.payload, "::"); if(inMsg.size() >= 2){ - string inMsgIP = inMsg[0]; - string sdfsfilename = inMsg[1]; - - cout << "[DELETE] " << "inMsgIP: " << inMsgIP << " sdfsfilename: " << sdfsfilename << endl; + string inMsgIP = inMsg[0], sdfsfilename = inMsg[1]; + //cout << "[DELETE] " << "inMsgIP: " << inMsgIP << " sdfsfilename: " << sdfsfilename << endl; + if (localFilelist.find(sdfsfilename) != localFilelist.end()) remove(localFilelist[sdfsfilename].c_str()); localFilelist.erase(sdfsfilename); fileList.erase(sdfsfilename); + fileSizes.erase(sdfsfilename); // This is TCP, so we don't need to ACK } } @@ -1013,44 +1311,43 @@ void Node::processRegMessages() } case DNSGET: { if(isLeader){ - // Do replicating to the node - //isBlackout = true; - vector<string> inMsg = splitString(msg.payload, "::"); - cout << "msg.payload " << msg.payload << endl; + //cout << "msg.payload " << msg.payload << endl; if(inMsg.size() >= 4){ string inMsgIP = inMsg[0]; int nodePosition = stoi(inMsg[1]); int selectedNodePosition = nodePosition; - string sdfsfilename = inMsg[2]; - string localfilename = inMsg[3]; - cout << "[DNSGET] Got " << "inMsgIP: " << inMsgIP << ", sdfsfilename: " << sdfsfilename << ", localfilename: " << localfilename << endl; + string sdfsfilename = inMsg[2], localfilename = inMsg[3]; + //cout << "[DNSGET] Got " << "inMsgIP: " << inMsgIP << ", sdfsfilename: " << sdfsfilename << ", localfilename: " << localfilename << endl; vector<int> positions = fileList[sdfsfilename]; if (positions.size() == 0) { // the file is not available - cout << "[DNSGET] sdfsfilename " << sdfsfilename << " is not available" << endl; + //cout << "[DNSGET] sdfsfilename " << sdfsfilename << " is not available" << endl; fileList.erase(sdfsfilename); + fileSizes.erase(sdfsfilename); Messages outMsg(GETNULL, sdfsfilename+": the file is not available::"); this->tcpServent->sendMessage(inMsgIP, TCPPORT, outMsg.toString()); - //isBlackout = false; break; } - cout << "[DNSGET] we have "; + unsigned seed = chrono::system_clock::now().time_since_epoch().count(); + shuffle (positions.begin(), positions.end(), default_random_engine(seed)); for (uint i=0; i<positions.size(); i++) { // pick any node other than the requested node - cout << positions[i] << " "; - if (positions[i]!=nodePosition) { + //cout << positions[i] << " "; + if (positions[i]!=nodePosition && positions[i]!=hashRingPosition) { selectedNodePosition = positions[i]; } } - cout << endl; - cout << "[DNSGET] we picks " << selectedNodePosition << endl; + if (selectedNodePosition==nodePosition) selectedNodePosition = hashRingPosition; + //cout << endl; + //cout << "[DNSGET] we picks " << selectedNodePosition << endl; pendingRequests[sdfsfilename] = tuple<int, int, int>(-1, -1, nodePosition); pendingRequestSent[sdfsfilename] = tuple<int, int, int>(true, true, true); string nodeIP = hashRing->getValue(selectedNodePosition); pendingSenderRequests[sdfsfilename] = tuple<string, string, string>("", "", nodeIP); Messages outMsg(REREPLICATEGET, to_string(nodePosition) + "::" + sdfsfilename+ "::" +localfilename); - cout << "[DNSGET] Ask node " << nodeIP << " to replicate on pos "; + //cout << "[DNSGET] Ask node " << nodeIP << " to replicate on pos "; cout << to_string(nodePosition) << endl; this->tcpServent->sendMessage(nodeIP, TCPPORT, outMsg.toString()); + } } break; @@ -1060,40 +1357,41 @@ void Node::processRegMessages() if(isLeader){ // Check hashring, get positions and send out DNS ANS isBlackout = true; - vector<string> inMsg = splitString(msg.payload, "::"); - if(inMsg.size() >= 4){ + if(inMsg.size() >= 7){ string inMsgIP = inMsg[0]; int nodePosition = stoi(inMsg[1]); string sdfsfilename = inMsg[2]; string localfilename = inMsg[3]; - - cout << "[DNS] Got " << "inMsgIP: " << inMsgIP << ", sdfsfilename: " << sdfsfilename; - cout << ", localfilename: " << localfilename << ", pos: " << nodePosition << endl; - //this->localFilelist[sdfsfilename] = localfilename; + long int size = stol(inMsg[4]); + int lines = stoi(inMsg[5]); + string overwriteFilename = inMsg[6]; + //cout << "[DNS] Got " << "inMsgIP: " << inMsgIP << ", sdfsfilename: " << sdfsfilename; + //cout << ", localfilename: " << localfilename << ", pos: " << nodePosition << endl; // update fileList, client itself is one of the replicas updateFileList(sdfsfilename, nodePosition); - hashRing->debugHashRing(); + fileSizes[sdfsfilename] = make_tuple(size, lines); + //hashRing->debugHashRing(); int closestNode = hashRing->locateClosestNode(sdfsfilename); int pred = hashRing->getPredecessor(closestNode); int succ = hashRing->getSuccessor(closestNode); if (hashRing->getValue(closestNode).compare(inMsgIP)==0) { closestNode = hashRing->getRandomNode(tuple<int, int, int>(closestNode, pred, succ)); - cout << "[DNS] we need one more node " << closestNode << endl; + //cout << "[DNS] we need one more node " << closestNode << endl; } if (hashRing->getValue(pred).compare(inMsgIP)==0) { pred = hashRing->getRandomNode(tuple<int, int, int>(closestNode, pred, succ)); - cout << "[DNS] we need one more node " << pred << endl; + //cout << "[DNS] we need one more node " << pred << endl; } if (hashRing->getValue(succ).compare(inMsgIP)==0) { succ = hashRing->getRandomNode(tuple<int, int, int>(closestNode, pred, succ)); - cout << "[DNS] we need one more node " << succ << endl; + //cout << "[DNS] we need one more node " << succ << endl; } - cout << "[DNS] we have nodes [" << closestNode << " (closestNode), "; - cout << pred << " (pred), " << succ << " (succ)], reply " << closestNode << endl; + //cout << "[DNS] we have nodes [" << closestNode << " (closestNode), "; + //cout << pred << " (pred), " << succ << " (succ)], reply " << closestNode << endl; pendingRequests[sdfsfilename] = tuple<int, int, int>(closestNode, pred, succ); pendingRequestSent[sdfsfilename] = tuple<int, int, int>(true, false, false); pendingSenderRequests[sdfsfilename] = tuple<string, string, string>(inMsgIP, "", ""); - Messages outMsg(DNSANS, to_string(closestNode) + "::" + localfilename + "::" + sdfsfilename); + Messages outMsg(DNSANS, to_string(closestNode) + "::" + localfilename + "::" + sdfsfilename + "::" + overwriteFilename); this->tcpServent->sendMessage(inMsgIP, TCPPORT, outMsg.toString()); } } @@ -1102,24 +1400,18 @@ void Node::processRegMessages() } case DNSANS:{ // Read the answer and send a PUT msg to dest - vector<string> inMsg = splitString(msg.payload, "::"); - if(inMsg.size() >= 3){ + if(inMsg.size() >= 4){ int nodePosition = stoi(inMsg[0]); // since we do not keep files in hashRing, the value itself is IPaddress, not NODE:IP_Address string nodeIP = hashRing->getValue(nodePosition); //cout << "nodeIP " << nodeIP << endl; - - cout << "[DNSANS] " << "we will put sdfsfilename: " << inMsg[2] << " to nodeIP: " << nodeIP; - cout << " using localfilename: " << inMsg[1] << endl; - - string sendMsg = nodeIP+"::"+inMsg[1]+"::"+inMsg[2]+"::"; + //cout << "[DNSANS] " << "we will put sdfsfilename: " << inMsg[2] << " to nodeIP: " << nodeIP << " using localfilename: " << inMsg[1] << endl; + string sendMsg = nodeIP+"::"+inMsg[1]+"::"+inMsg[2]+"::"+inMsg[3]; this->tcpServent->pendSendMessages.push(sendMsg); - //this->tcpServent->sendFile(nodeIP, TCPPORT, inMsg[1], inMsg[2], ""); } break; } case REREPLICATEGET: { - vector<string> inMsg = splitString(msg.payload, "::"); if (inMsg.size() >= 3) { int nodePosition = stoi(inMsg[0]); // since we do not keep files in hashRing, the value itself is IPaddress, not NODE:IP_Address @@ -1127,33 +1419,29 @@ void Node::processRegMessages() string sdfsfilename = inMsg[1]; string remoteLocalfilename = inMsg[2]; string localfilename = this->localFilelist[sdfsfilename]; - cout << "[REREPLICATEGET] Got a request of sdfsfilename " << sdfsfilename << " to nodeIP " << nodeIP << endl; - cout << "[REREPLICATEGET] Put localfilename " << localfilename << " to nodeIP " << nodeIP << endl; + //cout << "[REREPLICATEGET] Got a request of sdfsfilename " << sdfsfilename << " to nodeIP " << nodeIP << endl; + //cout << "[REREPLICATEGET] Put localfilename " << localfilename << " to nodeIP " << nodeIP << endl; string sendMsg = nodeIP+"::"+localfilename+"::"+sdfsfilename+"::"+remoteLocalfilename; this->tcpServent->pendSendMessages.push(sendMsg); - //this->tcpServent->sendFile(nodeIP, TCPPORT, localfilename, sdfsfilename, remoteLocalfilename); } break; } case REREPLICATE:{ // Read the answer and send a PUT msg to dest - vector<string> inMsg = splitString(msg.payload, "::"); if (inMsg.size() >= 2) { int nodePosition = stoi(inMsg[0]); // since we do not keep files in hashRing, the value itself is IPaddress, not NODE:IP_Address string nodeIP = hashRing->getValue(nodePosition); string sdfsfilename = inMsg[1]; string localfilename = this->localFilelist[sdfsfilename]; - cout << "[REREPLICATE] Got a request of sdfsfilename " << sdfsfilename << " to nodeIP " << nodeIP << endl; - cout << "[REREPLICATE] Put localfilename " << localfilename << " to nodeIP " << nodeIP << endl; + //cout << "[REREPLICATE] Got a request of sdfsfilename " << sdfsfilename << " to nodeIP " << nodeIP << endl; + //cout << "[REREPLICATE] Put localfilename " << localfilename << " to nodeIP " << nodeIP << endl; string sendMsg = nodeIP+"::"+localfilename+"::"+sdfsfilename+"::"; this->tcpServent->pendSendMessages.push(sendMsg); - //this->tcpServent->sendFile(nodeIP, TCPPORT, localfilename, sdfsfilename, ""); } break; } case GETNULL: { - vector<string> inMsg = splitString(msg.payload, "::"); if (inMsg.size() >= 1) { cout << "[GETNULL] " << inMsg[0] << endl; } @@ -1161,16 +1449,14 @@ void Node::processRegMessages() } case ACK:{ - vector<string> inMsg = splitString(msg.payload, "::"); if (inMsg.size() >= 3) { string nodePosition = inMsg[0]; string sdfsfilename = inMsg[1]; string localfilename = inMsg[2]; localFilelist[sdfsfilename] = localfilename; - Messages outMsg(LEADERACK, this->nodeInformation.ip + "::" + to_string(this->hashRingPosition) + "::" + msg.payload); - cout << "[ACK] Done replicated sdfsfilename " << sdfsfilename; - cout << " on node " << nodePosition << ", and ACK back to the leader" << endl; + //cout << "[ACK] Done replicated sdfsfilename " << sdfsfilename; + //cout << " on node " << nodePosition << ", and ACK back to the leader" << endl; this->tcpServent->sendMessage(leaderIP, TCPPORT, outMsg.toString()); } @@ -1178,8 +1464,6 @@ void Node::processRegMessages() } case LEADERACK:{ if(isLeader){ - //TODO: tick the list off - vector<string> inMsg = splitString(msg.payload, "::"); if(inMsg.size() >= 4){ string inMsgIP = inMsg[0]; int inMsgnodePosition = stoi(inMsg[1]); @@ -1187,7 +1471,7 @@ void Node::processRegMessages() string sdfsfilename = inMsg[3]; string replicatedNodeIP = hashRing->getValue(nodePosition); - cout << "[LEADERACK] Got ACK inMsgIP: " << inMsgIP << " sdfsfilename: " << sdfsfilename << " done on " << replicatedNodeIP << endl; + //cout << "[LEADERACK] Got ACK inMsgIP: " << inMsgIP << " sdfsfilename: " << sdfsfilename << " done on " << replicatedNodeIP << endl; string closestNodeIP = ""; // update fileList @@ -1195,44 +1479,44 @@ void Node::processRegMessages() updateFileList(sdfsfilename, nodePosition); vector<int> temp; - cout << "pendingRequests: "; + //cout << "pendingRequests: "; if (get<0>(pendingRequests[sdfsfilename]) == nodePosition) { closestNodeIP = hashRing->getValue(get<0>(pendingRequests[sdfsfilename])); temp.push_back(-1); } else { temp.push_back(get<0>(pendingRequests[sdfsfilename])); } - cout << temp[0] << " (sent: " << get<0>(pendingRequestSent[sdfsfilename]); - cout << ", from " << get<0>(pendingSenderRequests[sdfsfilename]) << "), "; + //cout << temp[0] << " (sent: " << get<0>(pendingRequestSent[sdfsfilename]); + //cout << ", from " << get<0>(pendingSenderRequests[sdfsfilename]) << "), "; if (get<1>(pendingRequests[sdfsfilename]) == nodePosition) { temp.push_back(-1); } else { temp.push_back(get<1>(pendingRequests[sdfsfilename])); } - cout << temp[1] << " (sent: " << get<1>(pendingRequestSent[sdfsfilename]); - cout << ", from " << get<1>(pendingSenderRequests[sdfsfilename]) << "), "; + //cout << temp[1] << " (sent: " << get<1>(pendingRequestSent[sdfsfilename]); + //cout << ", from " << get<1>(pendingSenderRequests[sdfsfilename]) << "), "; if (get<2>(pendingRequests[sdfsfilename]) == nodePosition) { temp.push_back(-1); } else { temp.push_back(get<2>(pendingRequests[sdfsfilename])); } - cout << temp[2] << " (sent:" << get<2>(pendingRequestSent[sdfsfilename]); - cout << ", from " << get<2>(pendingSenderRequests[sdfsfilename]) << ")" << endl; + //cout << temp[2] << " (sent:" << get<2>(pendingRequestSent[sdfsfilename]); + //cout << ", from " << get<2>(pendingSenderRequests[sdfsfilename]) << ")" << endl; pendingRequests[sdfsfilename] = tuple<int, int, int>(temp[0], temp[1], temp[2]); if(get<1>(pendingRequests[sdfsfilename]) == -1 && get<2>(pendingRequests[sdfsfilename])== -1){ pendingRequests.erase(sdfsfilename); pendingRequestSent.erase(sdfsfilename); pendingSenderRequests.erase(sdfsfilename); - cout << "[LEADERACK] 3 or more Replicated files are done" << endl; + //cout << "[LEADERACK] 3 or more Replicated files are done" << endl; isBlackout = false; break; } if((get<1>(pendingRequests[sdfsfilename])!=-1) && (!get<1>(pendingRequestSent[sdfsfilename]))){ Messages outMsg(REREPLICATE, to_string(get<1>(pendingRequests[sdfsfilename])) + "::" + sdfsfilename); // cout << "Sending out rereplicate to " << inMsgIP << "with message " << outMsg.toString() << endl; - cout << "[LEADERACK] Ask node incoming " << inMsgIP << " to replicate on pos "; - cout << to_string(get<1>(pendingRequests[sdfsfilename])) << endl; + //cout << "[LEADERACK] Ask node incoming " << inMsgIP << " to replicate on pos "; + //cout << to_string(get<1>(pendingRequests[sdfsfilename])) << endl; this->tcpServent->sendMessage(inMsgIP, TCPPORT, outMsg.toString()); pendingRequestSent[sdfsfilename] = tuple<int, int, int>(get<0>(pendingRequestSent[sdfsfilename]), true, get<2>(pendingRequestSent[sdfsfilename])); pendingSenderRequests[sdfsfilename] = tuple<string, string, string>(get<0>(pendingSenderRequests[sdfsfilename]), inMsgIP, get<2>(pendingSenderRequests[sdfsfilename])); @@ -1240,8 +1524,8 @@ void Node::processRegMessages() if((get<2>(pendingRequests[sdfsfilename]) != -1) && (!get<2>(pendingRequestSent[sdfsfilename]))){ Messages outMsg(REREPLICATE, to_string(get<2>(pendingRequests[sdfsfilename])) + "::" + sdfsfilename); // cout << "Sending out rereplicate to " << closestNodeIP << "with message " << outMsg.toString() << endl; - cout << "[LEADERACK] Ask node closest " << closestNodeIP << " to replicate on pos "; - cout << to_string(get<2>(pendingRequests[sdfsfilename])) << endl; + //cout << "[LEADERACK] Ask node closest " << closestNodeIP << " to replicate on pos "; + //cout << to_string(get<2>(pendingRequests[sdfsfilename])) << endl; this->tcpServent->sendMessage(closestNodeIP, TCPPORT, outMsg.toString()); pendingRequestSent[sdfsfilename] = tuple<int, int, int>(get<0>(pendingRequestSent[sdfsfilename]), get<1>(pendingRequestSent[sdfsfilename]), true); pendingSenderRequests[sdfsfilename] = tuple<string, string, string>(get<0>(pendingSenderRequests[sdfsfilename]), get<1>(pendingSenderRequests[sdfsfilename]), inMsgIP); @@ -1257,28 +1541,6 @@ void Node::processRegMessages() } } -/** - * Store the given filename in your sdfs filename, discard the original name, and - * give it a new name. The hashing will be done based on this sdfs filename. - * - * Can be called by any node, this one will be called by sender - * -*/ -// int Node::putFileSender(string filename, string sdfsfilename){ -// tcpServent->sendFile(leaderIP, leaderPort, filename); -// return 0; - -// } - -// int Node::putFileMaster(string sdfsfilename){ -// return 0; -// } - -// int Node::putFileReeiver(string sdfsfilename){ -// return 0; - -// } - void Node::listLocalFiles(){ cout << "sdfsfilename ---> localfilename" << endl; for (auto& element: localFilelist) { @@ -1286,22 +1548,6 @@ void Node::listLocalFiles(){ } } -void Node::debugSDFSFileList() { - cout << "sdfsfilename ---> positions,..." << endl; - for (auto& element: fileList) { - cout << element.first << " ---> "; - for (uint i=0; i<element.second.size(); i++) { - cout << element.second[i]; - if (i == element.second.size()-1) { - continue; - } else { - cout << ", "; - } - } - cout << endl; - } -} - void Node::listSDFSFileList(string sdfsfilename) { bool found = false; vector<int> foundPositions; @@ -1313,22 +1559,25 @@ void Node::listSDFSFileList(string sdfsfilename) { } } if (found) { + cout << "sdfsfilename " << sdfsfilename << " is stored at..." << endl; if (foundPositions.size() > 0) { - cout << "sdfsfilename " << sdfsfilename << " is stored at..." << endl; - cout << "=========" << endl; for (uint i=0; i<foundPositions.size(); i++) { string storedIP = hashRing->getValue(foundPositions[i]); cout << storedIP << " at " << foundPositions[i] << endl; } - } else { - cout << "sdfsfilename " << sdfsfilename << " is stored at..." << endl; - cout << "=== Current list is empty ===" << endl; - } + } else { cout << "=== Current list is empty ===" << endl; } } else { cout << "sdfsfilename " << sdfsfilename << " is not existed" << endl; } } +/* + * Leader sends out fileList in the following string format: + * first 2 bytes are filename len, FILENAME msg type, filename itself, + * 2 bytes for the number of positions the file has, FILEPOSITION msg type, + * and a string of a commas seperated list of positions following that, ending in null byte. + * All files are encapsulated in this way and joined to make one string +*/ string Node::encapsulateFileList() { string enMeg = ""; @@ -1345,31 +1594,49 @@ string Node::encapsulateFileList() } //cout << "sdfsfilename " << sdfsfilename << endl; //cout << "positions " << positions << endl; - char *cstr = new char[sdfsfilename.length()+positions.length()+7]; + string size = to_string(get<0>(fileSizes[sdfsfilename])) + "," + to_string(get<1>(fileSizes[sdfsfilename])); + char *cstr = new char[sdfsfilename.length()+positions.length()+size.length()+3+3+3+1]; size_t len = sdfsfilename.length()+3; - cstr[0] = len & 0xff; - cstr[1] = (len >> 8) & 0xff; - if (cstr[1] == 0) { // avoid null - cstr[1] = 0xff; + int index = 0; + cstr[index++] = len & 0xff; + cstr[index] = (len >> 8) & 0xff; + if (cstr[index] == 0) { // avoid null + cstr[index] = 0xff; } + index++; //printf("cstr[0] %x, cstr[1] %x\n", cstr[0], cstr[1]); - cstr[2] = FILENAME; + cstr[index++] = FILENAME; for (uint i=0; i<sdfsfilename.length(); i++) { - cstr[i+3] = sdfsfilename.c_str()[i]; + cstr[index+i] = sdfsfilename.c_str()[i]; } + index += sdfsfilename.length(); size_t len2 = positions.length()+3; - cstr[sdfsfilename.length()+3] = len2 & 0xff; - cstr[sdfsfilename.length()+4] = (len2 >> 8) & 0xff; - if (cstr[sdfsfilename.length()+4] == 0) { // avoid null - cstr[sdfsfilename.length()+4] = 0xff; + cstr[index++] = len2 & 0xff; + cstr[index] = (len2 >> 8) & 0xff; + if (cstr[index] == 0) { // avoid null + cstr[index] = 0xff; } + index++; //printf("cstr[3] %x, cstr[4] %x\n", cstr[0], cstr[1]); - cstr[sdfsfilename.length()+5] = FILEPOSITIONS; + cstr[index++] = FILEPOSITIONS; //printf("cstr[%lu] %d\n", sdfsfilename.length()+2, cstr[sdfsfilename.length()+2]); for (uint i=0; i<positions.length(); i++) { - cstr[sdfsfilename.length()+6+i] = positions.c_str()[i]; + cstr[index+i] = positions.c_str()[i]; + } + index += positions.length(); + size_t len3 = size.length()+3; + cstr[index++] = len3 & 0xff; + cstr[index] = (len3 >> 8) & 0xff; + if (cstr[index] == 0) { // avoid null + cstr[index] = 0xff; + } + index++; + cstr[index++] = FILESIZE; + for (uint i=0; i<size.length(); i++) { + cstr[index+i] = size.c_str()[i]; } - cstr[sdfsfilename.length()+positions.length()+6] = '\0'; + index += size.length(); + cstr[index] = '\0'; //printf("cstrFile %s\n", cstr); string enMegFile(cstr); //cout << "enMegFile " << enMegFile << endl; @@ -1380,6 +1647,7 @@ string Node::encapsulateFileList() return enMeg; } +//(len, PayloadType, message, \0) encoding where len is 2 bytes. string Node::encapsulateMessage(map<PayloadType,string> payloads) { string enMeg = ""; @@ -1390,7 +1658,6 @@ string Node::encapsulateMessage(map<PayloadType,string> payloads) //cout << "message " << message << endl; //cout << "message.length " << message.length() << endl; //cout << "type " << type << endl; - char *cstr = new char[message.length()+4]; size_t len = message.length()+3; cstr[0] = len & 0xff; @@ -1419,6 +1686,7 @@ void Node::decapsulateFileList(string payload) int size = payload.length(); uint pos = 0; fileList.clear(); + fileSizes.clear(); string lastFilename = ""; while (size > 0) { size_t length; @@ -1455,6 +1723,11 @@ void Node::decapsulateFileList(string payload) fileList[lastFilename] = positions; break; } + case FILESIZE: { + vector<string> temp = splitString(deMegPart, ","); + fileSizes[lastFilename] = make_tuple(stol(temp[0]),stoi(temp[1])); + break; + } default: break; } @@ -1472,8 +1745,9 @@ void Node::decapsulateFileList(string payload) } } for (uint i=0; i<fileToDelete.size(); i++) { + if (localFilelist.find(fileToDelete[i]) != localFilelist.end()) remove(localFilelist[fileToDelete[i]].c_str()); localFilelist.erase(fileToDelete[i]); - cout << "[DELETE] sdfsfilename " << fileToDelete[i] << endl; + //cout << "[DELETE] sdfsfilename " << fileToDelete[i] << endl; } } } @@ -1537,9 +1811,12 @@ string Node::populateSDFSFileList(MessageType type, string mem_list_to_send) return enMeg; } -void Node::findNodesWithFile(string sdfsfilename){ - /*tuple<int, int, int> nodes = fileList[sdfsfilename]; - cout << hashRing->getValue(get<0>(nodes)) << endl; - cout << hashRing->getValue(get<1>(nodes)) << endl; - cout << hashRing->getValue(get<2>(nodes)) << endl;*/ +void Node::resetMapleJuice(){ + cleanupTmpFiles("tmp-"); + workerProcessing.clear(); + workerTasks.clear(); + mapleSending.clear(); + filesAtWorker.clear(); + workerRing->clear(); + isJuicePhase = false; } -- GitLab