#include "../inc/Node.h" //TODO, single file for each key, and ussing an append to each replica of it? //keep a local map, if you find a new key, just let the master know about it and hold off on results in buffer until it exists Node::Node(){ udpServent = new UdpSocket(); tcpServent = new TcpSocket(); hashRing = new HashRing(); mapleRing = new HashRing(); localTimestamp = 0; heartbeatCounter = 0; runningMode = ALL2ALL; activeRunning = false; prepareToSwitch = false; logWriter = new Logger(LOGGING_FILE_NAME); leaderPosition = -1; proposedTime = 0; electedTime = 0; joinTimestamp = ""; mapleExe = ""; sdfsPre = ""; possibleSuccessorIP = ""; leaderIP = ""; leaderPort = ""; isBlackout = true; struct sigaction sa; sa.sa_handler = sigchld_handler; // reap all dead processes sigemptyset(&sa.sa_mask); sa.sa_flags = SA_RESTART; if (sigaction(SIGCHLD, &sa, NULL) == -1) { perror("sigaction"); exit(1); } } Node::Node(ModeType mode) : Node() { runningMode = mode; } void Node::startActive() { membershipList.clear(); restartElection(); // inserting its own into the list time(&startTimestamp); string startTime = updateNodeHeartbeatAndTime(); debugMembershipList(this); joinTimestamp = startTime; // for hashRing getPositionOnHashring(); // update its hashRingPosition } string Node::updateNodeHeartbeatAndTime() { string startTime = ctime(&startTimestamp); startTime = startTime.substr(0, startTime.find("\n")); tuple<string, string, string> keyTuple(nodeInformation.ip, nodeInformation.port,startTime); tuple<int, int, int> valueTuple(heartbeatCounter, localTimestamp, 0); this->membershipList[keyTuple] = valueTuple; return startTime; } string Node::populateMembershipMessage() { //The string we send will be seperated line by line --> IP,PORT,HeartbeatCounter,FailFlag string mem_list_to_send = ""; switch (this->runningMode) { case GOSSIP: return populateIntroducerMembershipMessage(); // code re-use default: string startTime = ctime(&startTimestamp); startTime = startTime.substr(0, startTime.find("\n")); mem_list_to_send += nodeInformation.ip + "," + nodeInformation.port + "," + startTime + ","; mem_list_to_send += to_string(heartbeatCounter) + "," + to_string(0) + "\n"; return mem_list_to_send; } } string Node::populateIntroducerMembershipMessage() { string mem_list_to_send = ""; for (auto& element: this->membershipList) { tuple<string, string, string> keyTuple = element.first; tuple<int, int, int> valueTuple = element.second; mem_list_to_send += get<0>(keyTuple) + "," + get<1>(keyTuple) + "," + get<2>(keyTuple) + ","; mem_list_to_send += to_string(get<0>(valueTuple)) + "," + to_string(get<2>(valueTuple)) + "\n"; } return mem_list_to_send; } int Node::heartbeatToNode() { string msg; string mem_list_to_send = populateMembershipMessage(); vector<tuple<string,string,string>> targetNodes = getRandomNodesToGossipTo(); #ifdef LOG_VERBOSE cout << "pick " << targetNodes.size() << " of " << this->membershipList.size()-1; cout << " members" << endl; #endif for (uint i=0; i<targetNodes.size(); i++) { Member destination(get<0>(targetNodes[i]), get<1>(targetNodes[i])); string message = "["+to_string(this->localTimestamp)+"] node "+destination.ip+"/"+destination.port+"/"+get<2>(targetNodes[i]); #ifdef LOG_VERBOSE cout << "[Gossip]" << message.c_str() << endl; #endif this->logWriter->printTheLog(GOSSIPTO, message); if (isLeader) { if (isBlackout) msg = populateSDFSFileList(LEADERPENDING, mem_list_to_send); else msg = populateSDFSFileList(LEADERHEARTBEAT, mem_list_to_send); } else msg = populateSDFSFileList(HEARTBEAT, mem_list_to_send); udpServent->sendMessage(destination.ip, destination.port, msg); } return 0; } int Node::failureDetection(){ //1. check local membership list for any timestamps whose curr_time - timestamp > T_fail //2. If yes, mark node as local failure, update fail flag to 1 and update timestamp to current time //3. for already failed nodes, check to see if curr_time - time stamp > T_cleanup //4. If yes, remove node from membership list vector<tuple<string,string,string>> removedVec; for(auto& element: this->membershipList){ tuple<string,string,string> keyTuple = element.first; tuple<int, int, int> valueTuple = element.second; #ifdef LOG_VERBOSE cout << "checking " << get<0>(keyTuple) << "/" << get<1>(keyTuple) << "/" << get<2>(keyTuple) << endl; #endif if ((get<0>(keyTuple).compare(nodeInformation.ip) == 0) && (get<1>(keyTuple).compare(nodeInformation.port) == 0)) { #ifdef LOG_VERBOSE cout << "do not check itself" << endl; #endif continue; } //node has not failed if(get<2>(valueTuple) == 0){ //timeout passed, set as failed if(localTimestamp - get<1>(valueTuple) > T_timeout){ //cout << "Got " << get<0>(keyTuple) << "/" << get<1>(keyTuple) << "/" << get<2>(keyTuple) << endl; //cout << "local time " << localTimestamp << " vs. " << get<1>(valueTuple) << endl; get<1>(this->membershipList[keyTuple]) = localTimestamp; get<2>(this->membershipList[keyTuple]) = 1; string message = "["+to_string(this->localTimestamp)+"] node "+get<0>(keyTuple)+"/"+get<1>(keyTuple)+"/"+get<2>(keyTuple)+": Local Failure"; cout << "[FAIL]" << message.c_str() << endl; this->logWriter->printTheLog(FAIL, message); if(isLeader){ Member deletedNode(get<0>(keyTuple), get<1>(keyTuple)); int deletedNodePostion = hashingId(deletedNode, get<2>(keyTuple)); hashRing->removeNode(deletedNodePostion); //for each file, remove the deleted node from its location vector for (auto& element: fileList) { vector<int> newEntry; for(unsigned int i = 0; i < element.second.size(); i++){ if(element.second[i] != deletedNodePostion){ newEntry.push_back(element.second[i]); } } fileList[element.first] = newEntry; } ////////////////////////////////////////////////////// //1) remove from HashRing //2) if processing, reassign //2a) if no extra nodes, assign to successor, else add new node to ring //3) if a sender, reassign replica holders as new senders for each thing sent vector<tuple<string,string,string>> aliveNodes; for (auto &e : membershipList) aliveNodes.push_back(e.first); vector<tuple<string,string,string>> mapleNodes; int nextId; if ((mapleRing->nodePositions.size()-1) == hashRing->nodePositions.size()){ nextId = mapleRing->getSuccessor(deletedNodePostion); } else { while (1){ Member m(get<0>(mapleNodes[0]), get<1>(mapleNodes[0])); nextId = hashingId(m, get<2>(mapleNodes[0])); mapleNodes = randItems(1, aliveNodes); if (mapleRing->getValue(nextId).compare("No node found") != 0) continue; break; } mapleRing->addNode(get<0>(mapleNodes[0]), nextId); } mapleRing->removeNode(deletedNodePostion); auto vecCopy(mapleProcessing[get<0>(keyTuple)]); mapleProcessing[get<0>(mapleNodes[0])] = vecCopy; mapleProcessing.erase(get<0>(keyTuple)); for (auto &e : mapleSending[get<0>(keyTuple)]){ vector<int> temp = randItems(1, fileList[get<0>(e)]); mapleSending[hashRing->getValue(temp[0])].push_back(make_tuple(get<0>(e), get<1>(e))); //mapleRing->getValue(id) + "::" maple_exe + ":: " + s + "::" + sdfs_pre; string mapleS = hashRing->getValue(temp[0]) + "::" + mapleExe + "::" + get<0>(e)+ "::" + get<1>(e)+ "::" + sdfsPre; tcpServent->mapleMessages.push(mapleS); } // chech if the failure is the sender in pending requests for (auto& senders: pendingSenderRequests) { string sdfsfilename = senders.first; tuple<string,string,string> sender = senders.second; if ((get<0>(keyTuple).compare(get<0>(sender))==0) && get<0>(pendingRequestSent[sdfsfilename]) && (get<0>(pendingRequests[sdfsfilename])!=-1)) { // it sent out, and is not finished, cannot help // we lost the data from the client cout << "[PUT] client itself fails, we cannot help, remove request" << endl; isBlackout = false; pendingRequests.erase(sdfsfilename); pendingRequestSent.erase(sdfsfilename); continue; } if ((get<0>(keyTuple).compare(get<1>(sender))==0) && get<1>(pendingRequestSent[sdfsfilename]) && (get<1>(pendingRequests[sdfsfilename])!=-1)) { // the sender fails during 2nd pass // replace the sent cout << "[PUT] One of the sender " << get<0>(keyTuple) << " failed, try again" << endl; if (get<2>(pendingRequests[sdfsfilename])!=-1) { tuple<int, int, int>(get<0>(pendingRequestSent[sdfsfilename]), false, get<2>(pendingRequestSent[sdfsfilename])); } else { pendingRequests.erase(sdfsfilename); } continue; } if ((get<0>(keyTuple).compare(get<2>(sender))==0) && get<2>(pendingRequestSent[sdfsfilename]) && (get<2>(pendingRequests[sdfsfilename])!=-1)) { // it sent out, but replicates are failed // restart again cout << "[PUT/REREPLICATE] The sender " << get<0>(keyTuple) << " failed, try again" << endl; pendingRequests.erase(sdfsfilename); } } } } } //check for cleanup on already failed nodes else{ if(localTimestamp - get<1>(valueTuple) > T_cleanup){ auto iter = this->membershipList.find(keyTuple); if (iter != this->membershipList.end()) { removedVec.push_back(keyTuple); } } } } // O(c*n) operation, but it ensures safety bool leaderRemoved = false; for (uint i=0; i<removedVec.size(); i++) { auto iter = this->membershipList.find(removedVec[i]); if (iter != this->membershipList.end()) { if (leaderIP.compare(get<0>(removedVec[i]))==0) { // this is the leader leaderRemoved = true; cout << "[ELECTION] leader " << leaderIP << " is removed" << endl; } this->membershipList.erase(iter); string message = "["+to_string(this->localTimestamp)+"] node "+get<0>(removedVec[i])+"/"+get<1>(removedVec[i])+"/"+get<2>(removedVec[i])+": REMOVED FROM LOCAL MEMBERSHIP LIST"; cout << "[REMOVE]" << message.c_str() << endl; this->logWriter->printTheLog(REMOVE, message); //debugMembershipList(this); } } if (this->membershipList.size()==1 || leaderRemoved) { // Only me or leader failed, restart leader election if (checkLeaderExist()) { // restart if we have a leader restartElection(); } } return 0; } int Node::joinSystem(Member introducer) { string mem_list_to_send = populateMembershipMessage(); string msg = populateSDFSFileList(JOIN, mem_list_to_send); string message = "["+to_string(this->localTimestamp)+"] sent a request to "+introducer.ip+"/"+introducer.port+", I am "+nodeInformation.ip+"/"+nodeInformation.port; cout << "[JOIN]" << message.c_str() << endl; this->logWriter->printTheLog(JOINGROUP, message); udpServent->sendMessage(introducer.ip, introducer.port, msg); return 0; } int Node::requestSwitchingMode() { string message = nodeInformation.ip+","+nodeInformation.port; string msg = populateSDFSFileList(SWREQ, message); for(auto& element: this->membershipList) { tuple<string,string,string> keyTuple = element.first; cout << "[SWITCH] sent a request to " << get<0>(keyTuple) << "/" << get<1>(keyTuple) << endl; udpServent->sendMessage(get<0>(keyTuple), get<1>(keyTuple), msg); } return 0; } int Node::SwitchMyMode() { sleep(T_switch); // wait for a while udpServent->qMessages = queue<string>(); // empty all messages switch (this->runningMode) { case GOSSIP: { this->runningMode = ALL2ALL; cout << "[SWITCH] === from gossip to all-to-all ===" << endl; break; } case ALL2ALL: { this->runningMode = GOSSIP; cout << "[SWITCH] === from all-to-all to gossip ===" << endl; break; } default: break; } prepareToSwitch = false; // finishing up return 0; } int Node::listenToHeartbeats() { // 1. deepcopy and handle queue queue<string> qCopy(udpServent->qMessages); udpServent->qMessages = queue<string>(); int size = qCopy.size(); //cout << "Got " << size << " messages in the queue" << endl; //cout << "checking queue size " << nodeOwn->udpServent->qMessages.size() << endl; for (int j = 0; j < size; j++) { //cout << qCopy.front() << endl; handleUdpMessage(qCopy.front()); if(this->activeRunning == false) return 0; qCopy.pop(); } return 0; } /* * Take a hearbeat message, if the member doesn't exist add it, update hashring, and disseminate out memberList * If it exists, check for failure, and if there is update fail flag, otherwise try ot update heartbeat */ void Node::processHeartbeat(string message) { bool changed = false; vector<string> incomingMembershipList = splitString(message, "\n"); vector<string> membershipListEntry; for(string list_entry: incomingMembershipList){ #ifdef LOG_VERBOSE cout << "handling with " << list_entry << endl; #endif if (list_entry.size() == 0) { continue; } membershipListEntry.clear(); membershipListEntry = splitString(list_entry, ","); if (membershipListEntry.size() != 5) continue; int incomingHeartbeatCounter = stoi(membershipListEntry[3]); int failFlag = stoi(membershipListEntry[4]); tuple<string,string,string> mapKey(membershipListEntry[0], membershipListEntry[1], membershipListEntry[2]); if ((get<0>(mapKey).compare(nodeInformation.ip) == 0) && (get<1>(mapKey).compare(nodeInformation.port) == 0)) { // Volunteerily leave if you are marked as failed if(failFlag == 1){ this->activeRunning = false; string message = "["+to_string(this->localTimestamp)+"] node "+this->nodeInformation.ip+"/"+this->nodeInformation.port+" is left"; cout << "[VOLUNTARY LEAVE]" << message.c_str() << endl; this->logWriter->printTheLog(LEAVE, message); return; } #ifdef LOG_VERBOSE cout << "do not check itself heartbeat" << endl; #endif continue; } map<tuple<string,string,string>, tuple<int, int, int>>::iterator it; it = this->membershipList.find(mapKey); if (it == this->membershipList.end() && failFlag == 0) { tuple<int, int, int> valueTuple(incomingHeartbeatCounter, localTimestamp, failFlag); this->membershipList[mapKey] = valueTuple; updateHashRing(); string message = "["+to_string(this->localTimestamp)+"] new node "+get<0>(mapKey)+"/"+get<1>(mapKey)+"/"+get<2>(mapKey)+" is joined"; cout << "[JOIN]" << message.c_str() << endl; this->logWriter->printTheLog(JOINGROUP, message); changed = true; } else if(it != this->membershipList.end()) { // update heartbeat count and local timestamp if fail flag of node is not equal to 1. If it equals 1, we ignore it. if(get<2>(this->membershipList[mapKey]) != 1){ //if incoming membership list has node with fail flag = 1, but fail flag in local membership list = 0, we have to set fail flag = 1 in local switch (this->runningMode) { case GOSSIP: { if(failFlag == 1){ get<2>(this->membershipList[mapKey]) = 1; get<1>(this->membershipList[mapKey]) = localTimestamp; string message = "["+to_string(this->localTimestamp)+"] node "+get<0>(mapKey)+"/"+get<1>(mapKey)+"/"+get<2>(mapKey)+": Disseminated Failure"; cout << "[FAIL]" << message.c_str() << endl; this->logWriter->printTheLog(FAIL, message); } else{ int currentHeartbeatCounter = get<0>(this->membershipList[mapKey]); if(incomingHeartbeatCounter > currentHeartbeatCounter){ get<0>(this->membershipList[mapKey]) = incomingHeartbeatCounter; get<1>(this->membershipList[mapKey]) = localTimestamp; string message = "["+to_string(this->localTimestamp)+"] node "+get<0>(mapKey)+"/"+get<1>(mapKey)+"/"+get<2>(mapKey)+" from "+to_string(currentHeartbeatCounter)+" to "+to_string(incomingHeartbeatCounter); #ifdef LOG_VERBOSE cout << "[UPDATE]" << message.c_str() << endl; #endif this->logWriter->printTheLog(UPDATE, message); } } break; } default: { // ALL2ALL doesn't disseminate int currentHeartbeatCounter = get<0>(this->membershipList[mapKey]); if(incomingHeartbeatCounter > currentHeartbeatCounter){ get<0>(this->membershipList[mapKey]) = incomingHeartbeatCounter; get<1>(this->membershipList[mapKey]) = localTimestamp; get<2>(this->membershipList[mapKey]) = failFlag; string message = "["+to_string(this->localTimestamp)+"] node "+get<0>(mapKey)+"/"+get<1>(mapKey)+"/"+get<2>(mapKey)+" from "+to_string(currentHeartbeatCounter)+" to "+to_string(incomingHeartbeatCounter); #ifdef LOG_VERBOSE cout << "[UPDATE]" << message.c_str() << endl; #endif this->logWriter->printTheLog(UPDATE, message); } break; } } } } } // If membership list changed in all-to-all, full membership list will be sent if(changed && this->runningMode == ALL2ALL) heartbeatToNode(); } void Node::setUpLeader(string message, bool pending) { string msg(message); vector<string> fields = splitString(msg, ","); if(fields.size() >= 3){ Member leader(fields[0], fields[1]); leaderPosition = hashingId(leader, fields[2]); leaderIP = fields[0]; leaderPort = fields[1]; } leaderCreateHashRing(); // local copy of hashRing on each node if (pending != isBlackout) { if (isBlackout) { cout << "[BLACKOUT] Leader is ready now" << endl; } else { cout << "[BLACKOUT] Leader is busy now" << endl; } } if (pending) { isBlackout = true; } else { isBlackout = false; } } /** * given a string message which contains a membership list, we will take the string, split it by returns, and then split it by commas, to then compare the heartbeat counters * of each IP,PORT,timestamp tuple with the membership list of the receiving Node. * Found help on how to do string processing part of this at https://stackoverflow.com/questions/14265581/parse-split-a-string-in-c-using-string-delimiter-standard-c */ void Node::handleUdpMessage(string message){ //cout << "handleUdpMessage " << message << endl; string deMeg = decapsulateMessage(message); bool pending = true; //cout << "handleUdpMessage deMeg " << deMeg << endl; Messages msg(deMeg); switch (msg.type) { case LEADERHEARTBEAT: // Note: not for Gossip-style, only All-to-All //cout << "LEADERHEARTBEAT: " << msg.payload << endl; pending = false; case LEADERPENDING: setUpLeader(msg.payload, pending); case HEARTBEAT: case JOINRESPONSE:{ processHeartbeat(msg.payload); break; } case JOIN:{ // introducer checks collision here vector<string> fields = splitString(msg.payload, ","); if(fields.size() >= 3){ Member member(fields[0], fields[1]); int checkPosition = hashingId(member, fields[2]); if (checkHashNodeCollision(checkPosition)) { string response = populateSDFSFileList(JOINREJECT, ""); udpServent->sendMessage(fields[0], fields[1], response); } else { string introducerMembershipList; introducerMembershipList = populateIntroducerMembershipMessage(); string response = populateSDFSFileList(JOINRESPONSE, introducerMembershipList); udpServent->sendMessage(fields[0], fields[1], response); } } break; } case SWREQ: { // got a request, send an ack back vector<string> fields = splitString(msg.payload, ","); if (fields.size() == 2) { cout << "[SWITCH] got a request from "+fields[0]+"/"+fields[1] << endl; string messageReply = nodeInformation.ip+","+nodeInformation.port; //Messages msgReply(SWRESP, messageReply); string msgReply = populateSDFSFileList(SWRESP, messageReply); udpServent->sendMessage(fields[0], fields[1], msgReply); prepareToSwitch = true; } break; } case SWRESP: { // got an ack vector<string> fields = splitString(msg.payload, ","); if (fields.size() == 2) { cout << "[SWITCH] got an ack from "+fields[0]+"/"+fields[1] << endl; } break; } case JOINREJECT: { // TODO: the node should leave cout << "[JOINREJECT] There is a collision, and I have to leave..." << endl; this->activeRunning = false; pthread_exit(NULL); break; } default: break; } //debugMembershipList(this); } int Node::getPositionOnHashring(){ hashRingPosition = hashingId(nodeInformation, joinTimestamp); cout << "[ELECTION] This node is at hash position: " << hashRingPosition << endl; return 0; } int Node::updateHashRing(){ bool needToUpdate = true; for(auto& it: membershipList){ needToUpdate = true; string ip = get<0>(it.first); for(int i: hashRing->nodePositions){ if(ip.compare(hashRing->getValue(i)) == 0){ needToUpdate = false; break; } } if(needToUpdate){ Member toBeInserted(ip, get<1>(it.first)); int hashPosition = hashingId(toBeInserted, get<2>(it.first)); hashRing->addNode(ip, hashPosition); } } return 0; } bool Node::checkLeaderExist() { return leaderPosition != -1; } bool Node::checkHashNodeCollision(int checkPosition) { // if True, the position is full for (auto& element: this->membershipList) { tuple<string, string, string> keyTuple = element.first; Member member(get<0>(keyTuple), get<1>(keyTuple)); if (nodeInformation.ip.compare(member.ip)==0) { // myself, skip it continue; } int pos = hashingId(member, get<2>(keyTuple)); if (pos == checkPosition) { return true; } } return false; } bool Node::findWillBeLeader() { bool beLeader = true; vector<int> positions; vector<string> ipAddresses; if (membershipList.size() > 1) { // only 1 member does not need the leader for (auto& element: this->membershipList) { tuple<string, string, string> keyTuple = element.first; Member member(get<0>(keyTuple), get<1>(keyTuple)); int pos = hashingId(member, get<2>(keyTuple)); if (pos < hashRingPosition) { //cout << get<0>(keyTuple) << " with id " << pos << " is smaller" << endl; beLeader = false; } if (nodeInformation.ip.compare(get<0>(keyTuple))!=0) { int posNext = (pos + (HASHMODULO-hashRingPosition)) % HASHMODULO; positions.push_back(posNext); ipAddresses.push_back(get<0>(keyTuple)); } } } else { beLeader = false; } if (positions.size() > 0) { int index = 0; int possibleSuccessor = positions[index]; for (uint i=1; i<positions.size(); i++) { if (positions[i] < possibleSuccessor) { possibleSuccessor = positions[i]; index = i; } } //cout << "[ELECTION] My Possible Successor is " << ipAddresses[index] << endl; possibleSuccessorIP = ipAddresses[index]; } return beLeader; } void Node::restartElection() // haven't tested yet { cout << "[ELECTION] No leader now, restart election..." << endl; electedTime = localTimestamp; isLeader = false; leaderPosition = -1; leaderIP = ""; leaderPort = ""; } void Node::leaderCreateHashRing() { hashRing->clear(); for (auto& element: this->membershipList) { // update hashRing tuple<string, string, string> keyTuple = element.first; Member member(get<0>(keyTuple), get<1>(keyTuple)); int pos = hashingId(member, get<2>(keyTuple)); hashRing->addNode(get<0>(keyTuple), pos); } } void Node::proposeToBeLeader() { Messages msg(ELECTION, to_string(hashRingPosition)); cout << "[ELECTION] Propose to be leader, send to " << possibleSuccessorIP << endl; tcpServent->sendMessage(possibleSuccessorIP, TCPPORT, msg.toString()); } void Node::electionMessageHandler(Messages messages) { switch (messages.type) { case ELECTION: { // check id int currentId = stoi(messages.payload); if (hashRingPosition > currentId) { //incoming is smaller, just forward cout << "[ELECTION] Got Election, agree on voting: " << messages.payload << endl; tcpServent->sendMessage(possibleSuccessorIP, TCPPORT, messages.toString()); } else if (hashRingPosition < currentId) { //incoming is biger, replace and send it cout << "[ELECTION] Got Election, against this voting " << messages.payload; cout << ", and using my id " << hashRingPosition << endl; Messages msg(ELECTION, to_string(hashRingPosition)); tcpServent->sendMessage(possibleSuccessorIP, TCPPORT, msg.toString()); } else { // finish 1st pass cout << "[ELECTION] Got Election, everyone voted on me and start acking" << endl; Messages msg(ELECTIONACK, to_string(hashRingPosition)); tcpServent->sendMessage(possibleSuccessorIP, TCPPORT, msg.toString()); } break; } case ELECTIONACK: { int currentId = stoi(messages.payload); if (hashRingPosition == currentId) { // finish 2 pass cout << "[ELECTION] I am the leader now" << endl; isBlackout = false; leaderPosition = hashRingPosition; isLeader = true; leaderIP = nodeInformation.ip; leaderPort = nodeInformation.port; leaderCreateHashRing(); } else { // Not me, just forward cout << "[ELECTION] Pass ACK " << messages.payload << endl; tcpServent->sendMessage(possibleSuccessorIP, TCPPORT, messages.toString()); } electedTime = localTimestamp; // update elected time cout << "[ELECTION] Elected at Local Time " << electedTime << endl; break; } default: break; } } void Node::tcpElectionProcessor() { queue<string> qCopy(tcpServent->qMessages); tcpServent->qMessages = queue<string>(); //cout << "Got " << size << " TCP messages" << endl; for (int j=0; j<qCopy.size(); j++) { //cout << qCopy.front() << endl; Messages msg(qCopy.front()); //cout << "Has " << msg.type << " with " << msg.payload << endl; switch (msg.type) { case ELECTION: case ELECTIONACK: electionMessageHandler(msg); default: break; } qCopy.pop(); } } void Node::updateFileList(string sdfsfilename, int nodePosition) { if (isLeader) { vector<int> positions = fileList[sdfsfilename]; bool existed = false; for (uint i=0; i<positions.size(); i++) { if (positions[i] == nodePosition) { existed = true; } } if (!existed) { positions.push_back(nodePosition); } vector<int> storedPositionsCopy(positions); fileList[sdfsfilename] = storedPositionsCopy; } } void Node::checkFileListConsistency(){ if (!isLeader) return; if (membershipList.size() < 4) { cout << "[ERROR] The number of members are too small, we need at least 4" << endl; return; } for (auto& element: fileList) { if(element.second.size() < 4){ //Need to rereplicate --> do this one at a time //First check the closest node, successor and predecessor int closestNodePostion = hashRing->locateClosestNode(element.first); int pred = hashRing->getPredecessor(closestNodePostion); int succ = hashRing->getSuccessor(closestNodePostion); int randomNode = hashRing->getRandomNode(tuple<int, int, int>(closestNodePostion, pred, succ)); vector<int> nodesToCheck = {closestNodePostion, pred, succ, randomNode}; for(unsigned int i = 0; i < nodesToCheck.size(); i++){ if (!isInVector(element.second, nodesToCheck[i])) { string nodeInfo = hashRing->getValue(nodesToCheck[i]); Messages outMsg(DNSGET, nodeInfo + "::" + to_string(nodesToCheck[i]) + "::" + element.first + "::"); tuple<int, int, int> request = pendingRequests[element.first]; if(get<0>(request) != -1 || get<1>(request) != -1 || get<2>(request) != -1){ cout << "on put " << get<0>(request) << "/" << get<1>(request) << "/" << get<2>(request) << endl; break; } pendingRequests[element.first] = tuple<int, int, int>(-1, -1, nodesToCheck[i]); pendingRequestSent[element.first] = tuple<int, int, int>(true, true, true); tcpServent->sendMessage(leaderIP, TCPPORT, outMsg.toString()); break; } } } } } vector<tuple<string,string, string>> Node::getRandomNodesToGossipTo() { vector<tuple<string, string, string>> availableNodesInfo; vector<tuple<string, string, string>> selectedNodesInfo; vector<int> indexList; int availableNodes = 0; for(auto& element: this->membershipList){ tuple<string, string, string> keyPair = element.first; tuple<int, int, int> valueTuple = element.second; //dont gossip to self or failed nodes if(get<0>(keyPair).compare(this->nodeInformation.ip) && (get<2>(valueTuple) != 1)){ availableNodesInfo.push_back(keyPair); indexList.push_back(availableNodes++); } } switch (this->runningMode) { case GOSSIP: { return randItems(N_b, availableNodesInfo); } //ALL2ALL default: { return availableNodesInfo; } } } void Node::handleTcpMessage() { //Before we do anything here, we should have the leader check to see if the file list is consistent or not. checkFileListConsistency(); queue<string> qCopy(tcpServent->regMessages); tcpServent->regMessages = queue<string>(); int size = qCopy.size(); //cout << "Got " << size << " TCP messages" << endl; for (int j=0; j<size; j++) { // cout << qCopy.front() << endl; vector<string> msgSplit = splitString(qCopy.front(), "::"); if (msgSplit.size() < 1){ qCopy.pop(); continue; } string payload = ""; for(uint k = 1; k < msgSplit.size(); k++){ if(k == msgSplit.size() - 1) payload += msgSplit[k]; else payload += msgSplit[k] + "::"; } MessageType msgType = static_cast<MessageType>(stoi(msgSplit[0])); Messages msg(msgType, payload); vector<string> inMsg = splitString(msg.payload, "::"); // cout << "Has " << msg.type << " with " << msg.payload << endl; switch (msg.type) { case JUICESTART: { if (mapleProcessing.size()) {tcpServent->regMessages.push(msg.toString()); break;} //TODO //cleanup tmp-prefix-* regardless as those were real temp files } case MAPLESTART: { //leader only function //currently running something, dont start a new phase if (mapleProcessing.size()) {tcpServent->regMessages.push(msg.toString()); cout << "[MAPLE] already mapling" << endl; break;} cout << "[MAPLE] Leader starting new Maple phase" << endl; if (inMsg.size() >= 4){ string mapleExe = inMsg[0], num_maples = inMsg[1], sdfsPre = inMsg[2], sdfs_dir = inMsg[3] + "-"; int workers = stoi(num_maples); if (workers > hashRing->nodePositions.size()-1) workers = hashRing->nodePositions.size()-1; int total_lines = 0; vector<tuple<string,int>> directory; cout << "[DIRECTORY] " << sdfs_dir << endl; for (auto &e: fileSizes){ cout << e.first << " | " << to_string(get<1>(e.second)); if (strncmp(e.first.c_str(), sdfs_dir.c_str(), sdfs_dir.size()) == 0){ cout << " was a match "; directory.push_back(make_tuple(e.first, get<1>(e.second))); total_lines += get<1>(e.second); } cout << endl; } cout << "[MAPLE] need to process " << to_string(total_lines) << endl; vector<tuple<string,string,string>> aliveNodes; for (auto &e : membershipList) aliveNodes.push_back(e.first); vector<tuple<string,string,string>> mapleNodes = randItems(workers, aliveNodes); string includedDebug = ""; for (auto &e : mapleNodes) { Member m(get<0>(e), get<1>(e)); mapleRing->addNode(get<0>(e), hashingId(m, get<2>(e))); includedDebug += get<0>(e) + ","; } cout << "[MAPLE] " << includedDebug << " are the worker nodes" << endl; int start = 0, id = 0; string s; for (auto &e: directory){ start = 0; cout << "[MAPLE] file: " << get<0>(e) << " - " << to_string(get<1>(e)) << endl; while (start < get<1>(e)){ s = get<0>(e) + "::" + to_string(start); id = mapleRing->locateClosestNode(s); vector<int> temp = randItems(1, fileList[get<0>(e)]); mapleProcessing[mapleRing->getValue(id)].push_back(make_tuple(get<0>(e), to_string(start), mapleRing->getValue(temp[0]))); cout << "[MAPLE] assign file " << get<0>(e) << " at " << to_string(start) << " to " << mapleRing->getValue(temp[0]) << endl; mapleSending[mapleRing->getValue(temp[0])].push_back(make_tuple(get<0>(e), to_string(start))); string maplemsg = mapleRing->getValue(id) + "::" +mapleExe + "::" + s + "::" + sdfsPre; //IP, exec, file, start, prefix tcpServent->mapleMessages.push(maplemsg); start = start + T_maples; } } } break; } case CHUNK: { cout << "[CHUNK] " << "we will put sdfsfilename: " << inMsg[2] << " from chunk: " << inMsg[3] << " to node " << inMsg[0] << endl; int end = stoi(inMsg[3]) + T_maples; string sendMsg = msg.payload + "::" + to_string(end); this->tcpServent->pendSendMessages.push(sendMsg); break; } case CHUNKACK: { cout << "[CHUNKACK] receiving the put worked!" << endl; //IP, exec, start, temp, actual file, prefix if (!isLeader) { //forward to know that the file was put okay this->tcpServent->sendMessage(leaderIP, TCPPORT, msg.toString()); int dataPipe[2]; if (pipe(dataPipe)){ fprintf (stderr, "Pipe failed.\n"); break; } pid_t pid = fork(); if (pid){ //parent process, DONT need to waitpid because of signal handler set up close(dataPipe[1]); handlePipe(dataPipe[0], inMsg[5]); } else if (pid < 0) { fprintf (stderr, "Fork failed.\n"); break; } else { //child process close(dataPipe[0]); dup2(dataPipe[1], 1); //stdout -> write end of pipe string execName = "./" + inMsg[4]; int status = execl(execName.c_str(),execName.c_str(),inMsg[3].c_str(),NULL); if (status < 0) exit(status); } close(dataPipe[0]);close(dataPipe[1]); //go thorugh and process things from datapipe //if processing success, send out TCP MAPLEACK string match = "tmp-" + inMsg[5]; int matchLen = match.size(); struct dirent *entry = nullptr; DIR *dp = nullptr; if ((dp = opendir(".")) == nullptr) { cout << "tmp directory error " << match << endl; break; } while ((entry = readdir(dp))){ cout << "[FILES] found " << entry->d_name << " looking to match " << to_string(matchLen) << " chars from " << match << endl; if (strncmp(entry->d_name, match.c_str(), matchLen) == 0){ string searcher(entry->d_name); string target = searcher.substr(4); cout << "[CHUNKACK] found " << entry->d_name << endl; Messages outMsg(DNS, nodeInformation.ip + "::" + to_string(hashRingPosition) + "::" + target + "::" + entry->d_name + "::" + to_string(-1) + "::" + to_string(-1) + "::" + target + "::" + "0"); cout << "[PUT] Got localfilename: " << entry->d_name << " with sdfsfilename: " << target << endl; tcpServent->sendMessage(leaderIP, TCPPORT, outMsg.toString()); } } closedir(dp); string ackStr = inMsg[0] + "::" + inMsg[4] + "::" + inMsg[2]; //IP, file, chunk Messages ackMsg(MAPLEACK, ackStr); tcpServent->sendMessage(leaderIP, TCPPORT, ackMsg.toString()); break; } cout << "[CHUNKACK] leader confirming the chunk was received" << endl; vector<tuple<string,string>> temp; for (auto &e : mapleSending[inMsg[0]]){ if (get<0>(e).compare(inMsg[4]) == 0){ if (get<1>(e).compare(inMsg[2]) == 0){ continue; } } temp.push_back(e); } if (temp.size()) mapleSending[inMsg[0]] = temp; else mapleSending.erase(inMsg[0]); break; } case MAPLEACK: { vector<tuple<string,string,string>> temp; for (auto &e : mapleProcessing[inMsg[0]]){ if (get<0>(e).compare(inMsg[1]) == 0){ if (get<1>(e).compare(inMsg[2]) == 0){ continue; } } temp.push_back(e); } if (temp.size()) mapleProcessing[inMsg[0]] = temp; else mapleProcessing.erase(inMsg[0]); break; } case PUTACK: { if(inMsg.size() >= 4){ string inMsgIP = inMsg[0], sdfsfilename = inMsg[1]; string localfilename = inMsg[2], remoteLocalname = inMsg[3]; cout << "[PUTACK] " << "inMsgIP: " << inMsgIP << " sdfsfilename: " << sdfsfilename << " localfilename: " << localfilename << endl; localFilelist[sdfsfilename] = localfilename; Messages outMsg(ACK, to_string(this->hashRingPosition)+"::"+sdfsfilename+"::"+remoteLocalname); this->tcpServent->sendMessage(inMsgIP, TCPPORT, outMsg.toString()); } break; } case DELETE: { if (isLeader) { if(inMsg.size() >= 2){ string inMsgIP = inMsg[0], sdfsfilename = inMsg[1]; cout << "[DELETE] " << "inMsgIP: " << inMsgIP << " sdfsfilename: " << sdfsfilename << endl; localFilelist.erase(sdfsfilename); fileList.erase(sdfsfilename); fileSizes.erase(sdfsfilename); // This is TCP, so we don't need to ACK } } break; } case DNSGET: { if(isLeader){ cout << "msg.payload " << msg.payload << endl; if(inMsg.size() >= 4){ string inMsgIP = inMsg[0]; int nodePosition = stoi(inMsg[1]); int selectedNodePosition = nodePosition; string sdfsfilename = inMsg[2], localfilename = inMsg[3]; cout << "[DNSGET] Got " << "inMsgIP: " << inMsgIP << ", sdfsfilename: " << sdfsfilename << ", localfilename: " << localfilename << endl; vector<int> positions = fileList[sdfsfilename]; if (positions.size() == 0) { // the file is not available cout << "[DNSGET] sdfsfilename " << sdfsfilename << " is not available" << endl; fileList.erase(sdfsfilename); fileSizes.erase(sdfsfilename); Messages outMsg(GETNULL, sdfsfilename+": the file is not available::"); this->tcpServent->sendMessage(inMsgIP, TCPPORT, outMsg.toString()); break; } cout << "[DNSGET] we have "; for (uint i=0; i<positions.size(); i++) { // pick any node other than the requested node cout << positions[i] << " "; if (positions[i]!=nodePosition) { selectedNodePosition = positions[i]; } } cout << endl; cout << "[DNSGET] we picks " << selectedNodePosition << endl; pendingRequests[sdfsfilename] = tuple<int, int, int>(-1, -1, nodePosition); pendingRequestSent[sdfsfilename] = tuple<int, int, int>(true, true, true); string nodeIP = hashRing->getValue(selectedNodePosition); pendingSenderRequests[sdfsfilename] = tuple<string, string, string>("", "", nodeIP); Messages outMsg(REREPLICATEGET, to_string(nodePosition) + "::" + sdfsfilename+ "::" +localfilename); cout << "[DNSGET] Ask node " << nodeIP << " to replicate on pos "; cout << to_string(nodePosition) << endl; this->tcpServent->sendMessage(nodeIP, TCPPORT, outMsg.toString()); } } break; } case DNS: { // TODO: finish DNS functionality here, send out DNSANS if(isLeader){ // Check hashring, get positions and send out DNS ANS isBlackout = true; if(inMsg.size() >= 8){ string inMsgIP = inMsg[0]; int nodePosition = stoi(inMsg[1]); string sdfsfilename = inMsg[2]; string localfilename = inMsg[3]; long int size = stol(inMsg[4]); int lines = stoi(inMsg[5]); string overwriteFilename = inMsg[6]; string overwrite = inMsg[7]; cout << "[DNS] Got " << "inMsgIP: " << inMsgIP << ", sdfsfilename: " << sdfsfilename; cout << ", localfilename: " << localfilename << ", pos: " << nodePosition << endl; // update fileList, client itself is one of the replicas updateFileList(sdfsfilename, nodePosition); fileSizes[sdfsfilename] = make_tuple(size, lines); hashRing->debugHashRing(); int closestNode = hashRing->locateClosestNode(sdfsfilename); int pred = hashRing->getPredecessor(closestNode); int succ = hashRing->getSuccessor(closestNode); if (hashRing->getValue(closestNode).compare(inMsgIP)==0) { closestNode = hashRing->getRandomNode(tuple<int, int, int>(closestNode, pred, succ)); cout << "[DNS] we need one more node " << closestNode << endl; } if (hashRing->getValue(pred).compare(inMsgIP)==0) { pred = hashRing->getRandomNode(tuple<int, int, int>(closestNode, pred, succ)); cout << "[DNS] we need one more node " << pred << endl; } if (hashRing->getValue(succ).compare(inMsgIP)==0) { succ = hashRing->getRandomNode(tuple<int, int, int>(closestNode, pred, succ)); cout << "[DNS] we need one more node " << succ << endl; } cout << "[DNS] we have nodes [" << closestNode << " (closestNode), "; cout << pred << " (pred), " << succ << " (succ)], reply " << closestNode << endl; pendingRequests[sdfsfilename] = tuple<int, int, int>(closestNode, pred, succ); pendingRequestSent[sdfsfilename] = tuple<int, int, int>(true, false, false); pendingSenderRequests[sdfsfilename] = tuple<string, string, string>(inMsgIP, "", ""); Messages outMsg(DNSANS, to_string(closestNode) + "::" + localfilename + "::" + sdfsfilename + "::" + overwriteFilename + "::" + overwrite); this->tcpServent->sendMessage(inMsgIP, TCPPORT, outMsg.toString()); } } break; } case DNSANS:{ // Read the answer and send a PUT msg to dest if(inMsg.size() >= 5){ int nodePosition = stoi(inMsg[0]); // since we do not keep files in hashRing, the value itself is IPaddress, not NODE:IP_Address string nodeIP = hashRing->getValue(nodePosition); //cout << "nodeIP " << nodeIP << endl; cout << "[DNSANS] " << "we will put sdfsfilename: " << inMsg[2] << " to nodeIP: " << nodeIP; cout << " using localfilename: " << inMsg[1] << endl; string sendMsg = nodeIP+"::"+inMsg[1]+"::"+inMsg[2]+"::"+inMsg[3]+"::"+inMsg[4]; this->tcpServent->pendSendMessages.push(sendMsg); } break; } case REREPLICATEGET: { if (inMsg.size() >= 3) { int nodePosition = stoi(inMsg[0]); // since we do not keep files in hashRing, the value itself is IPaddress, not NODE:IP_Address string nodeIP = hashRing->getValue(nodePosition); string sdfsfilename = inMsg[1]; string remoteLocalfilename = inMsg[2]; string localfilename = this->localFilelist[sdfsfilename]; cout << "[REREPLICATEGET] Got a request of sdfsfilename " << sdfsfilename << " to nodeIP " << nodeIP << endl; cout << "[REREPLICATEGET] Put localfilename " << localfilename << " to nodeIP " << nodeIP << endl; string sendMsg = nodeIP+"::"+localfilename+"::"+sdfsfilename+"::"+remoteLocalfilename+"::"+"1"; this->tcpServent->pendSendMessages.push(sendMsg); } break; } case REREPLICATE:{ // Read the answer and send a PUT msg to dest if (inMsg.size() >= 2) { int nodePosition = stoi(inMsg[0]); // since we do not keep files in hashRing, the value itself is IPaddress, not NODE:IP_Address string nodeIP = hashRing->getValue(nodePosition); string sdfsfilename = inMsg[1]; string localfilename = this->localFilelist[sdfsfilename]; cout << "[REREPLICATE] Got a request of sdfsfilename " << sdfsfilename << " to nodeIP " << nodeIP << endl; cout << "[REREPLICATE] Put localfilename " << localfilename << " to nodeIP " << nodeIP << endl; string sendMsg = nodeIP+"::"+localfilename+"::"+sdfsfilename+"::"+"::"+"1"; this->tcpServent->pendSendMessages.push(sendMsg); //this->tcpServent->sendFile(nodeIP, TCPPORT, localfilename, sdfsfilename, ""); } break; } case GETNULL: { if (inMsg.size() >= 1) { cout << "[GETNULL] " << inMsg[0] << endl; } break; } case ACK:{ if (inMsg.size() >= 3) { string nodePosition = inMsg[0]; string sdfsfilename = inMsg[1]; string localfilename = inMsg[2]; localFilelist[sdfsfilename] = localfilename; Messages outMsg(LEADERACK, this->nodeInformation.ip + "::" + to_string(this->hashRingPosition) + "::" + msg.payload); cout << "[ACK] Done replicated sdfsfilename " << sdfsfilename; cout << " on node " << nodePosition << ", and ACK back to the leader" << endl; this->tcpServent->sendMessage(leaderIP, TCPPORT, outMsg.toString()); } break; } case LEADERACK:{ if(isLeader){ if(inMsg.size() >= 4){ string inMsgIP = inMsg[0]; int inMsgnodePosition = stoi(inMsg[1]); int nodePosition = stoi(inMsg[2]); string sdfsfilename = inMsg[3]; string replicatedNodeIP = hashRing->getValue(nodePosition); cout << "[LEADERACK] Got ACK inMsgIP: " << inMsgIP << " sdfsfilename: " << sdfsfilename << " done on " << replicatedNodeIP << endl; string closestNodeIP = ""; // update fileList updateFileList(sdfsfilename, inMsgnodePosition); updateFileList(sdfsfilename, nodePosition); vector<int> temp; cout << "pendingRequests: "; if (get<0>(pendingRequests[sdfsfilename]) == nodePosition) { closestNodeIP = hashRing->getValue(get<0>(pendingRequests[sdfsfilename])); temp.push_back(-1); } else { temp.push_back(get<0>(pendingRequests[sdfsfilename])); } cout << temp[0] << " (sent: " << get<0>(pendingRequestSent[sdfsfilename]); cout << ", from " << get<0>(pendingSenderRequests[sdfsfilename]) << "), "; if (get<1>(pendingRequests[sdfsfilename]) == nodePosition) { temp.push_back(-1); } else { temp.push_back(get<1>(pendingRequests[sdfsfilename])); } cout << temp[1] << " (sent: " << get<1>(pendingRequestSent[sdfsfilename]); cout << ", from " << get<1>(pendingSenderRequests[sdfsfilename]) << "), "; if (get<2>(pendingRequests[sdfsfilename]) == nodePosition) { temp.push_back(-1); } else { temp.push_back(get<2>(pendingRequests[sdfsfilename])); } cout << temp[2] << " (sent:" << get<2>(pendingRequestSent[sdfsfilename]); cout << ", from " << get<2>(pendingSenderRequests[sdfsfilename]) << ")" << endl; pendingRequests[sdfsfilename] = tuple<int, int, int>(temp[0], temp[1], temp[2]); if(get<1>(pendingRequests[sdfsfilename]) == -1 && get<2>(pendingRequests[sdfsfilename])== -1){ pendingRequests.erase(sdfsfilename); pendingRequestSent.erase(sdfsfilename); pendingSenderRequests.erase(sdfsfilename); cout << "[LEADERACK] 3 or more Replicated files are done" << endl; isBlackout = false; break; } if((get<1>(pendingRequests[sdfsfilename])!=-1) && (!get<1>(pendingRequestSent[sdfsfilename]))){ Messages outMsg(REREPLICATE, to_string(get<1>(pendingRequests[sdfsfilename])) + "::" + sdfsfilename); // cout << "Sending out rereplicate to " << inMsgIP << "with message " << outMsg.toString() << endl; cout << "[LEADERACK] Ask node incoming " << inMsgIP << " to replicate on pos "; cout << to_string(get<1>(pendingRequests[sdfsfilename])) << endl; this->tcpServent->sendMessage(inMsgIP, TCPPORT, outMsg.toString()); pendingRequestSent[sdfsfilename] = tuple<int, int, int>(get<0>(pendingRequestSent[sdfsfilename]), true, get<2>(pendingRequestSent[sdfsfilename])); pendingSenderRequests[sdfsfilename] = tuple<string, string, string>(get<0>(pendingSenderRequests[sdfsfilename]), inMsgIP, get<2>(pendingSenderRequests[sdfsfilename])); } if((get<2>(pendingRequests[sdfsfilename]) != -1) && (!get<2>(pendingRequestSent[sdfsfilename]))){ Messages outMsg(REREPLICATE, to_string(get<2>(pendingRequests[sdfsfilename])) + "::" + sdfsfilename); // cout << "Sending out rereplicate to " << closestNodeIP << "with message " << outMsg.toString() << endl; cout << "[LEADERACK] Ask node closest " << closestNodeIP << " to replicate on pos "; cout << to_string(get<2>(pendingRequests[sdfsfilename])) << endl; this->tcpServent->sendMessage(closestNodeIP, TCPPORT, outMsg.toString()); pendingRequestSent[sdfsfilename] = tuple<int, int, int>(get<0>(pendingRequestSent[sdfsfilename]), get<1>(pendingRequestSent[sdfsfilename]), true); pendingSenderRequests[sdfsfilename] = tuple<string, string, string>(get<0>(pendingSenderRequests[sdfsfilename]), get<1>(pendingSenderRequests[sdfsfilename]), inMsgIP); } } } break; } default: break; } qCopy.pop(); } } void Node::listLocalFiles(){ cout << "sdfsfilename ---> localfilename" << endl; for (auto& element: localFilelist) { cout << element.first << " ---> " << element.second << endl; } } void Node::listSDFSFileList(string sdfsfilename) { bool found = false; vector<int> foundPositions; for (auto& element: fileList) { if(element.first.compare(sdfsfilename)==0) { // found sdfsfilename found = true; foundPositions = element.second; break; } } if (found) { cout << "sdfsfilename " << sdfsfilename << " is stored at..." << endl; if (foundPositions.size() > 0) { for (uint i=0; i<foundPositions.size(); i++) { string storedIP = hashRing->getValue(foundPositions[i]); cout << storedIP << " at " << foundPositions[i] << endl; } } else { cout << "=== Current list is empty ===" << endl; } } else { cout << "sdfsfilename " << sdfsfilename << " is not existed" << endl; } } /* * Leader sends out fileList in the following string format: * first 2 bytes are filename len, FILENAME msg type, filename itself, * 2 bytes for the number of positions the file has, FILEPOSITION msg type, * and a string of a commas seperated list of positions following that, ending in null byte. * All files are encapsulated in this way and joined to make one string */ string Node::encapsulateFileList() { string enMeg = ""; if (checkLeaderExist() && isLeader) { for (auto& element: fileList) { string positions = ""; string sdfsfilename = element.first; for (uint i=0; i<element.second.size(); i++) { positions += to_string(element.second[i]); if (i != element.second.size()-1) { positions += ","; } } //cout << "sdfsfilename " << sdfsfilename << endl; //cout << "positions " << positions << endl; string size = to_string(get<0>(fileSizes[sdfsfilename])) + "," + to_string(get<1>(fileSizes[sdfsfilename])); char *cstr = new char[sdfsfilename.length()+positions.length()+size.length()+3+3+3+1]; size_t len = sdfsfilename.length()+3; int index = 0; cstr[index++] = len & 0xff; cstr[index] = (len >> 8) & 0xff; if (cstr[index] == 0) { // avoid null cstr[index] = 0xff; } index++; //printf("cstr[0] %x, cstr[1] %x\n", cstr[0], cstr[1]); cstr[index++] = FILENAME; for (uint i=0; i<sdfsfilename.length(); i++) { cstr[index+i] = sdfsfilename.c_str()[i]; } index += sdfsfilename.length(); size_t len2 = positions.length()+3; cstr[index++] = len2 & 0xff; cstr[index] = (len2 >> 8) & 0xff; if (cstr[index] == 0) { // avoid null cstr[index] = 0xff; } index++; //printf("cstr[3] %x, cstr[4] %x\n", cstr[0], cstr[1]); cstr[index++] = FILEPOSITIONS; //printf("cstr[%lu] %d\n", sdfsfilename.length()+2, cstr[sdfsfilename.length()+2]); for (uint i=0; i<positions.length(); i++) { cstr[index+i] = positions.c_str()[i]; } index += positions.length(); size_t len3 = size.length()+3; cstr[index++] = len3 & 0xff; cstr[index] = (len3 >> 8) & 0xff; if (cstr[index] == 0) { // avoid null cstr[index] = 0xff; } index++; cstr[index++] = FILESIZE; for (uint i=0; i<size.length(); i++) { cstr[index+i] = size.c_str()[i]; } index += size.length(); cstr[index] = '\0'; //printf("cstrFile %s\n", cstr); string enMegFile(cstr); //cout << "enMegFile " << enMegFile << endl; enMeg += enMegFile; } //cout << "encapsulateFileList " << enMeg << endl; } return enMeg; } //(len, PayloadType, message, \0) encoding where len is 2 bytes. string Node::encapsulateMessage(map<PayloadType,string> payloads) { string enMeg = ""; //cout << "payloads.size " << payloads.size() << endl; for (auto& element: payloads) { PayloadType type = element.first; string message = element.second; //cout << "message " << message << endl; //cout << "message.length " << message.length() << endl; //cout << "type " << type << endl; char *cstr = new char[message.length()+4]; size_t len = message.length()+3; cstr[0] = len & 0xff; cstr[1] = (len >> 8) & 0xff; if (cstr[1] == 0) { // avoid null cstr[1] = 0xff; } //printf("cstr[0] %x, cstr[1] %x\n", cstr[0], cstr[1]); cstr[2] = type; //printf("cstr[2] %x\n", cstr[2]); for (uint i=0; i<message.length(); i++) { cstr[i+3] = message.c_str()[i]; } cstr[message.length()+3] = '\0'; //printf("cstrMsg %s\n", cstr); string enMegPart(cstr); //cout << "enMegPart " << enMegPart << endl; enMeg += enMegPart; } //cout << "encapsulateMessage " << enMeg << endl; return enMeg; } void Node::decapsulateFileList(string payload) { int size = payload.length(); uint pos = 0; fileList.clear(); fileSizes.clear(); string lastFilename = ""; while (size > 0) { size_t length; if ((payload.c_str()[1+pos] & 0xff) == 0xff) { length = 0; } else { length = (payload.c_str()[1+pos]) & 0xff; length = length << 8; } length += (payload.c_str()[0+pos]) & 0xff; PayloadType type = static_cast<PayloadType>(payload.c_str()[2+pos]); //printf(" len %lu, type %d\n", length, type); char cstr[length]; bzero(cstr, sizeof(cstr)); for (uint i=3; i<length; i++) { cstr[i-3] = payload.c_str()[pos+i]; } string deMegPart(cstr); switch (type) { case FILENAME: { //cout << "FILENAME " << deMegPart << endl; lastFilename = deMegPart; break; } case FILEPOSITIONS: { //cout << "FILEPOSITIONS " << deMegPart << endl; vector<string> temp = splitString(deMegPart, ","); vector<int> positions; for (uint i=0; i<temp.size(); i++) { if (temp[i].compare("")!=0) { positions.push_back(stoi(temp[i])); } } fileList[lastFilename] = positions; break; } case FILESIZE: { vector<string> temp = splitString(deMegPart, ","); fileSizes[lastFilename] = make_tuple(stol(temp[0]),stoi(temp[1])); break; } default: break; } size -= length; pos += length; } // check with local file list if (!isLeader) { vector<string> fileToDelete; for (auto& element: localFilelist) { //cout << "sdfsfilename " << element.first << endl; if (fileList[element.first].size() == 0) { fileToDelete.push_back(element.first); } } for (uint i=0; i<fileToDelete.size(); i++) { localFilelist.erase(fileToDelete[i]); cout << "[DELETE] sdfsfilename " << fileToDelete[i] << endl; } } } string Node::decapsulateMessage(string payload) { int size = payload.length(); uint pos = 0; //cout << "payload " << payload << endl; //cout << "size " << size << endl; string deMeg = ""; while (size > 0) { size_t length; if ((payload.c_str()[1+pos] & 0xff) == 0xff) { length = 0; } else { length = (payload.c_str()[1+pos]) & 0xff; length = length << 8; } length += (payload.c_str()[0+pos] & 0xff); //printf("lengthMeg %x %x %lu\n", payload.c_str()[0+pos], payload.c_str()[1+pos], length); PayloadType type = static_cast<PayloadType>(payload.c_str()[2+pos]); //printf(" len %lu, type %d\n", length, type); char cstr[length]; bzero(cstr, sizeof(cstr)); for (uint i=3; i<length; i++) { cstr[i-3] = payload.c_str()[pos+i]; } //printf("cstr %s\n", cstr); string deMegPart(cstr); //cout << "deMegPart " << deMegPart << endl; if (type == REGULAR) { deMeg = deMegPart; } else if (type == FILEPAIR) { if (checkLeaderExist() && !isLeader) { //cout << "FILEPAIR " << deMegPart << endl; decapsulateFileList(deMegPart); } } //cout << "size1 " << size << endl; size -= length; pos += length; //cout << "size2 " << size << endl; } //cout << "deMeg " << deMeg << endl; return deMeg; } // piggyback fileList in heartbeat string Node::populateSDFSFileList(MessageType type, string mem_list_to_send) { Messages msg(type, mem_list_to_send); //cout << "populateSDFSFileList " << msg.toString() << endl; map<PayloadType,string> payloads; payloads[REGULAR] = msg.toString(); if (isLeader) { // Only the leader includes the fileList payloads[FILEPAIR] = encapsulateFileList(); } string enMeg = encapsulateMessage(payloads); return enMeg; }