diff --git a/.DS_Store b/.DS_Store index b7ac86341431de932ecac05905aaffe5b05547fa..666eed3262c1264d8536aef30d046f36b010c374 100644 Binary files a/.DS_Store and b/.DS_Store differ diff --git a/Wc b/Wc index 587d7dc940a69b04202ac7e76321ffda59bbbfb5..647b6bbefe11636dc2ce2c9182d7985532920625 100755 Binary files a/Wc and b/Wc differ diff --git a/Wr b/Wr index 2979ec3dc482f628efef6d8ebf053e0ef5209e29..a455eb1735fdf9d0b701d09f3860ed50e925db06 100755 Binary files a/Wr and b/Wr differ diff --git a/inc/MessageTypes.h b/inc/MessageTypes.h index 1235c3ff5d4581f9c3d31b1e1cb110fa0a21c09f..838ddb2036ef85d0b58514532de0fa2ea14e7724 100644 --- a/inc/MessageTypes.h +++ b/inc/MessageTypes.h @@ -35,9 +35,7 @@ enum MessageType { MERGE, //merge messages used for combining keys at end of maple phased STARTMERGE, MERGECOMPLETE, - MERGEFAIL, - REPACK, - REP + MERGEFAIL }; enum PayloadType { diff --git a/inc/Node.h b/inc/Node.h index 72eff0d05f83a5a4175e806143c9bca3d07d3873..7a30e433844cf7317487598b5b2c0b762a5ac3d2 100644 --- a/inc/Node.h +++ b/inc/Node.h @@ -101,7 +101,6 @@ public: map<string, tuple<long int, int>> fileSizes; //used so master can partition in the map phase tuple is (bytes, lines) HashRing *workerRing; //hashring of worker nodes maple/juice map<string, vector<tuple<string, string>>> mapleSending; //originIP -> (file, chunk_start); - map<string, map<tuple<string,int>, int>> outputReplication; set<string> mapleKeys; //keys produced in maple phase map<string, int> filesAtWorker; queue<string> maplejuiceQ; @@ -145,7 +144,6 @@ public: void replicateKeys(); void resetMapleJuice(); void handleMaplejuiceQ(); - void debugOutputRep(); private: string populateMembershipMessage(); //membershipList to string based on mode type string populateIntroducerMembershipMessage(); //entire membership list to string diff --git a/inc/Utils.h b/inc/Utils.h index 4d42ec292ee2013559d4923e6ee34887a1c55e52..1d6c414d6b6569b94eae224d4606781ec60aedec 100644 --- a/inc/Utils.h +++ b/inc/Utils.h @@ -55,8 +55,9 @@ vector<T> randItems(int numItems, vector<T> toChoose){ vector<int> indexList; int availableNodes = toChoose.size(); for (int i = 0; i < availableNodes; i++) indexList.push_back(i); + if (availableNodes <= numItems) return toChoose; int nodeCount = 0; - while (nodeCount < numItems && (availableNodes > 0)) { + while (nodeCount < numItems) { int randomNum = rand() % availableNodes; selectedNodesInfo.push_back(toChoose[indexList[randomNum]]); indexList.erase(indexList.begin() + randomNum); diff --git a/src/Node.cpp b/src/Node.cpp index 3c2eaa07e28dacbdca3fe394fa12f302b4423962..d001d85598f8a44e9b0e24dcb96ff17709e38cf5 100644 --- a/src/Node.cpp +++ b/src/Node.cpp @@ -59,61 +59,6 @@ string Node::updateNodeHeartbeatAndTime() return startTime; } -void Node::debugOutputRep(){ - //outputReplication - set<int> members; - set<string> membersIP; - set<int> locations; - //string debug = "[DEBUGREP] ---------------------------\n", - string randSenderIp; - vector<tuple<string,string,int>> toDel; - tuple<string,int> minKey; - int repTarget; - srand(time(NULL)); - for (auto pos : hashRing->nodePositions) { members.insert(pos); membersIP.insert(hashRing->getValue(pos)); } - for (auto &mapping : outputReplication){ - locations.clear(); - for (auto &f: fileList[mapping.first]) locations.insert(f); - for (auto &reps: mapping.second){ - randSenderIp = ""; - repTarget = -1; - if (reps.second == 0) { - toDel.push_back(make_tuple(mapping.first, get<0>(reps.first), get<1>(reps.first))); - continue; - } - if (!membersIP.count(get<0>(reps.first))){ - vector<int> randSender = randItems(1, fileList[mapping.first]); - randSenderIp = hashRing->getValue(randSender[0]); - } - if (!members.count(get<1>(reps.first))){ - int index = 0; - repTarget = hashRingPosition; - while (locations.count(repTarget) && index < 20){ - repTarget = hashRing->getRandomNode(tuple<int,int,int>(repTarget,repTarget,repTarget)); - index++; - } - if (repTarget == -1) repTarget = hashRing->getSuccessor(leaderPosition); - } - if (randSenderIp.size() || (repTarget != -1)){ - if (repTarget == -1) repTarget = get<1>(reps.first); - if (!randSenderIp.size()) randSenderIp = get<0>(reps.first); - outputReplication[mapping.first][make_tuple(randSenderIp,repTarget)] = reps.second; - Messages outMsg(REP, randSenderIp + "::" + hashRing->getValue(repTarget)+"::"+to_string(reps.second)+"::"+mapping.first+"::"+to_string(get<0>(fileSizes[mapping.first]))); - tcpServent->regMessages.push(outMsg.toString()); - toDel.push_back(make_tuple(mapping.first, get<0>(reps.first), get<1>(reps.first))); - cout << "[REPLACE] OLD: " << get<0>(reps.first) << " -> " << to_string(get<1>(reps.first)) << " | NEW: " << randSenderIp << " -> " << to_string(repTarget) << endl; - } - else toDel.push_back(make_tuple(mapping.first, get<0>(reps.first), get<1>(reps.first))); - } - } - for (auto &item : toDel){ - outputReplication[get<0>(item)].erase(make_tuple(get<1>(item),get<2>(item))); - if (outputReplication[get<0>(item)].size() == 0) outputReplication.erase(get<0>(item)); - } - //debug += "[DEBUGREP] ---------------------------\n"; - //if (added) cout << debug << endl; -} - string Node::populateMembershipMessage() { //The string we send will be seperated line by line --> IP,PORT,HeartbeatCounter,FailFlag @@ -188,33 +133,68 @@ int Node::failureDetection(){ } //node has not failed if(get<2>(valueTuple) == 0){ - //timeout passed, set as failed if(localTimestamp - get<1>(valueTuple) > T_timeout){ //cout << "Got " << get<0>(keyTuple) << "/" << get<1>(keyTuple) << "/" << get<2>(keyTuple) << endl; //cout << "local time " << localTimestamp << " vs. " << get<1>(valueTuple) << endl; get<1>(this->membershipList[keyTuple]) = localTimestamp; get<2>(this->membershipList[keyTuple]) = 1; + string message = "["+to_string(this->localTimestamp)+"] node "+get<0>(keyTuple)+"/"+get<1>(keyTuple)+"/"+get<2>(keyTuple)+": Local Failure"; cout << "[FAIL]" << message.c_str() << endl; this->logWriter->printTheLog(FAIL, message); if(isLeader){ + // clearn up fileList Member deletedNode(get<0>(keyTuple), get<1>(keyTuple)); int deletedNodePostion = hashingId(deletedNode, get<2>(keyTuple)); hashRing->removeNode(deletedNodePostion); - //for each file, remove the deleted node from its location vector for (auto& element: fileList) { vector<int> newEntry; for(unsigned int i = 0; i < element.second.size(); i++){ if(element.second[i] != deletedNodePostion){ newEntry.push_back(element.second[i]); } - else { - outputReplication[element.first][make_tuple("-1", -1)] = 1; - } } fileList[element.first] = newEntry; } + // chech if the failure is the sender in pending requests + for (auto& senders: pendingSenderRequests) { + string sdfsfilename = senders.first; + tuple<string,string,string> sender = senders.second; + if ((get<0>(keyTuple).compare(get<0>(sender))==0) && + get<0>(pendingRequestSent[sdfsfilename]) && + (get<0>(pendingRequests[sdfsfilename])!=-1)) { + // it sent out, and is not finished, cannot help + // we lost the data from the client + cout << "[PUT] client itself fails, we cannot help, remove request" << endl; + isBlackout = false; + pendingRequests.erase(sdfsfilename); + pendingRequestSent.erase(sdfsfilename); + continue; + } + if ((get<0>(keyTuple).compare(get<1>(sender))==0) && + get<1>(pendingRequestSent[sdfsfilename]) && + (get<1>(pendingRequests[sdfsfilename])!=-1)) { + // the sender fails during 2nd pass + // replace the sent + cout << "[PUT] One of the sender " << get<0>(keyTuple) << " failed, try again" << endl; + if (get<2>(pendingRequests[sdfsfilename])!=-1) { + tuple<int, int, int>(get<0>(pendingRequestSent[sdfsfilename]), false, get<2>(pendingRequestSent[sdfsfilename])); + } else { + pendingRequests.erase(sdfsfilename); + } + continue; + } + if ((get<0>(keyTuple).compare(get<2>(sender))==0) && + get<2>(pendingRequestSent[sdfsfilename]) && + (get<2>(pendingRequests[sdfsfilename])!=-1)) { + // it sent out, but replicates are failed + // restart again + cout << "[PUT/REREPLICATE] The sender " << get<0>(keyTuple) << " failed, try again" << endl; + pendingRequests.erase(sdfsfilename); + } + } + ////////////////////////////////////////////////////// //1) remove from HashRing //2) if processing, reassign @@ -222,9 +202,7 @@ int Node::failureDetection(){ //3) if a sender, reassign replica holders as new senders for each thing sent if (workerRing->size() > 0){ vector<tuple<string,string,string>> aliveNodes; - for (auto &e : membershipList) { - if (get<2>(e.second) == 0) aliveNodes.push_back(e.first); - } + for (auto &e : membershipList) aliveNodes.push_back(e.first); vector<tuple<string,string,string>> mapleNodes; int nextId; size_t newSize = workerRing->nodePositions.size()-1; @@ -232,7 +210,8 @@ int Node::failureDetection(){ nextId = workerRing->getSuccessor(deletedNodePostion); } else { while (1){ - vector<tuple<string,string,string>> mapleNodes = randItems(1, aliveNodes); + mapleNodes.clear(); + mapleNodes = randItems(1, aliveNodes); Member m(get<0>(mapleNodes[0]), get<1>(mapleNodes[0])); nextId = hashingId(m, get<2>(mapleNodes[0])); if (workerRing->getValue(nextId).compare("No node found") == 0) break; @@ -282,50 +261,9 @@ int Node::failureDetection(){ tcpServent->mapleMessages.push(mapleS); } } - - // chech if the failure is the sender in pending requests - for (auto& senders: pendingSenderRequests) { - string sdfsfilename = senders.first; - tuple<string,string,string> sender = senders.second; - if ((get<0>(keyTuple).compare(get<0>(sender))==0) && - get<0>(pendingRequestSent[sdfsfilename]) && - (get<0>(pendingRequests[sdfsfilename])!=-1)) - { - // it sent out, and is not finished, cannot help - // we lost the data from the client - //cout << "[PUT] client itself fails, we cannot help, remove request" << endl; - isBlackout = false; - pendingRequests.erase(sdfsfilename); - pendingRequestSent.erase(sdfsfilename); - continue; - } - if ((get<0>(keyTuple).compare(get<1>(sender))==0) && - get<1>(pendingRequestSent[sdfsfilename]) && - (get<1>(pendingRequests[sdfsfilename])!=-1)) - { - // the sender fails during 2nd pass - // replace the sent - //cout << "[PUT] One of the sender " << get<0>(keyTuple) << " failed, try again" << endl; - if (get<2>(pendingRequests[sdfsfilename])!=-1) { - tuple<int, int, int>(get<0>(pendingRequestSent[sdfsfilename]), false, get<2>(pendingRequestSent[sdfsfilename])); - } else { - pendingRequests.erase(sdfsfilename); - } - continue; - } - if ((get<0>(keyTuple).compare(get<2>(sender))==0) && - get<2>(pendingRequestSent[sdfsfilename]) && - (get<2>(pendingRequests[sdfsfilename])!=-1)) - { - pendingRequests.erase(sdfsfilename); - } - } } - } - } - //check for cleanup on already failed nodes - else{ + } else { //check for cleanup on already failed nodes if(localTimestamp - get<1>(valueTuple) > T_cleanup){ auto iter = this->membershipList.find(keyTuple); if (iter != this->membershipList.end()) { @@ -812,6 +750,49 @@ void Node::updateFileList(string sdfsfilename, int nodePosition) } } +//Can only be called when we are the leader node, since we are going to be calling REREPLICATE here, and checking the global file list. +//Called in ProcessRegMessages before we do anything else, since we want to check the global file list consistency before we process the other messages +//In the Queue. +void Node::checkFileListConsistency(){ + if (membershipList.size() < 4) { + cout << "[ERROR] The number of members are too small, we need at least 4" << endl; + return; + } + for (auto& element: fileList) { + if(element.second.size() < 4){ + //Need to rereplicate --> do this one at a time + //First check the closest node, successor and predecessor + int closestNodePostion = hashRing->locateClosestNode(element.first); + int pred = hashRing->getPredecessor(closestNodePostion); + int succ = hashRing->getSuccessor(closestNodePostion); + int randomNode = hashRing->getRandomNode(tuple<int, int, int>(closestNodePostion, pred, succ)); + vector<int> nodesToCheck = {closestNodePostion, pred, succ, randomNode}; + for(unsigned int i = 0; i < nodesToCheck.size(); i++){ + if (!isInVector(element.second, nodesToCheck[i])) + { + string nodeInfo = hashRing->getValue(nodesToCheck[i]); + Messages outMsg(DNSGET, nodeInfo + "::" + to_string(nodesToCheck[i]) + "::" + element.first + "::"); + tuple<int, int, int> request = pendingRequests[element.first]; + if(get<0>(request) == 0 && get<1>(request) == 0 && get<2>(request) == 0){ + pendingRequests[element.first] = tuple<int, int, int>(-1, -1, nodesToCheck[i]); + pendingRequestSent[element.first] = tuple<int, int, int>(true, true, true); + tcpServent->sendMessage(leaderIP, TCPPORT, outMsg.toString()); + break; + } + if(get<0>(request) != -1 || get<1>(request) != -1 || get<2>(request) != -1){ + cout << "on put " << get<0>(request) << "/" << get<1>(request) << "/" << get<2>(request) << endl; + break; + } + pendingRequests[element.first] = tuple<int, int, int>(-1, -1, nodesToCheck[i]); + pendingRequestSent[element.first] = tuple<int, int, int>(true, true, true); + tcpServent->sendMessage(leaderIP, TCPPORT, outMsg.toString()); + break; + } + } + } + } +} + vector<tuple<string,string, string>> Node::getRandomNodesToGossipTo() { vector<tuple<string, string, string>> availableNodesInfo; @@ -839,7 +820,10 @@ vector<tuple<string,string, string>> Node::getRandomNodesToGossipTo() } void Node::handleMaplejuiceQ(){ - if (!maplejuiceQ.empty() && !workerRing->size() && !isBlackout && !tcpServent->mapleMessages.size() && !outputReplication.size()){ + if (!maplejuiceQ.empty() && !workerRing->size() && !isBlackout){ + for (auto &e: fileList) { + if (e.second.size() < 4) return; //make sure replication is good first + } string msgCopy(maplejuiceQ.front()); cout << "[QUEUE] sending next maple/juice to be processed " << msgCopy << endl; tcpServent->regMessages.push(msgCopy); @@ -849,6 +833,10 @@ void Node::handleMaplejuiceQ(){ void Node::handleTcpMessage() { + //Before we do anything here, we should have the leader check to see if the file list is consistent or not. + if(isLeader){ + checkFileListConsistency(); + } queue<string> qCopy(tcpServent->regMessages); tcpServent->regMessages = queue<string>(); int size = qCopy.size(); @@ -1014,20 +1002,8 @@ void Node::handleTcpMessage() if (!workerProcessing[inMsg[0]].size()) workerProcessing.erase(inMsg[0]); if (!workerProcessing.size()) { cout <<"[JUICEACK] replicate final results " << endl; - updateFileList(sdfsOut, hashRingPosition); - FILE *fp = fopen(sdfsOut.c_str(), "rb"); - fseek(fp, 0, SEEK_END); - int size = ftell(fp); - fseek(fp, 0, SEEK_SET); - fclose(fp); - fileSizes[sdfsOut] = make_tuple(size, -1); - int repTarget = hashRing->getRandomNode(tuple<int, int, int>(hashRingPosition, hashRingPosition, hashRingPosition)); - while (hashRing->getValue(repTarget).compare(nodeInformation.ip)==0) { - repTarget = hashRing->getRandomNode(tuple<int, int, int>(hashRingPosition, hashRingPosition, hashRingPosition)); - } - outputReplication[sdfsOut][make_tuple(nodeInformation.ip,repTarget)] = 3; - string sendMsg = hashRing->getValue(repTarget) + "::" + to_string(REPACK) + "::" + sdfsOut + "::" + to_string(3) + "::" + sdfsOut + "," + to_string(get<0>(fileSizes[sdfsOut])) + "::"; - this->tcpServent->mergeMessages.push(sendMsg); + Messages outMsg(DNS, nodeInformation.ip + "::" + to_string(hashRingPosition) + "::" + sdfsOut + "::" + sdfsOut + "::" + "-1" + "::" + "-1" + "::"); + tcpServent->regMessages.push(outMsg.toString()); if (maplejuiceClear){ cout << "[JUICEACK] clearing files.... " << endl; for (auto &f : fileList){ @@ -1038,7 +1014,6 @@ void Node::handleTcpMessage() } } cout << "[JUICE] ------------ complete ---------- " << endl; - isJuicePhase = false; resetMapleJuice(); } break; @@ -1274,22 +1249,27 @@ void Node::handleTcpMessage() auto it = mapleKeys.begin(); while (it != mapleKeys.end()){ string file = sdfsPre + (*it); + it++; + isBlackout = true; updateFileList(file, hashRingPosition); - FILE *fp = fopen(file.c_str(), "rb"); - fseek(fp, 0, SEEK_END); - int size = ftell(fp); - fseek(fp, 0, SEEK_SET); - fclose(fp); - fileSizes[file] = make_tuple(size, -1); - int repTarget = hashRing->getRandomNode(tuple<int, int, int>(hashRingPosition, hashRingPosition, hashRingPosition)); - while (hashRing->getValue(repTarget).compare(nodeInformation.ip)==0) { - repTarget = hashRing->getRandomNode(tuple<int, int, int>(hashRingPosition, hashRingPosition, hashRingPosition)); + fileSizes[file] = make_tuple(-1, -1); + int closestNode = hashRing->locateClosestNode(file); + int pred = hashRing->getPredecessor(closestNode); + int succ = hashRing->getSuccessor(closestNode); + if (!hashRing->getValue(closestNode).compare(nodeInformation.ip)) { + closestNode = hashRing->getRandomNode(tuple<int, int, int>(closestNode, pred, succ)); } - outputReplication[file][make_tuple(nodeInformation.ip,repTarget)] = 3; - //mergeFiles(string ip, string port, string handler, string filedest, string header, string toSend, string starts) - string sendMsg = hashRing->getValue(repTarget) + "::" + to_string(REPACK) + "::" + file + "::" + to_string(3) + "::" + file + "," + to_string(get<0>(fileSizes[file])) + "::"; - this->tcpServent->mergeMessages.push(sendMsg); - it++; + if (hashRing->getValue(pred).compare(nodeInformation.ip)==0) { + pred = hashRing->getRandomNode(tuple<int, int, int>(closestNode, pred, succ)); + } + if (hashRing->getValue(succ).compare(nodeInformation.ip)==0) { + succ = hashRing->getRandomNode(tuple<int, int, int>(closestNode, pred, succ)); + } + pendingRequests[file] = tuple<int, int, int>(closestNode, pred, succ); + pendingRequestSent[file] = tuple<int, int, int>(true, false, false); + pendingSenderRequests[file] = tuple<string, string, string>(nodeInformation.ip, "", ""); + string sendMsg = hashRing->getValue(closestNode)+"::"+file+"::"+file+"::"; + this->tcpServent->pendSendMessages.push(sendMsg); } cout << "[MAPLE] ------------ complete ---------- " << endl; resetMapleJuice(); @@ -1365,7 +1345,7 @@ void Node::handleTcpMessage() pendingSenderRequests[sdfsfilename] = tuple<string, string, string>("", "", nodeIP); Messages outMsg(REREPLICATEGET, to_string(nodePosition) + "::" + sdfsfilename+ "::" +localfilename); //cout << "[DNSGET] Ask node " << nodeIP << " to replicate on pos "; - //cout << to_string(nodePosition) << endl; + cout << to_string(nodePosition) << endl; this->tcpServent->sendMessage(nodeIP, TCPPORT, outMsg.toString()); } @@ -1431,49 +1411,6 @@ void Node::handleTcpMessage() } break; } - case REP: { - if (inMsg[0].compare(nodeInformation.ip)){ - Messages forward(REP, inMsg[0] + "::" + inMsg[1] + "::" + inMsg[2] + "::" + inMsg[3] + "::" + inMsg[4]); - this->tcpServent->sendMessage(inMsg[0], TCPPORT, forward.toString()); - } - else{ - string sendMsg = inMsg[1] + "::" + to_string(REPACK) + "::" + inMsg[3] + "::" + inMsg[2] + "::" + inMsg[3] + "," + inMsg[4] + "::"; - cout << "[REP] " << sendMsg << endl; - this->tcpServent->mergeMessages.push(sendMsg); - } - break; - } - case REPACK: { - //senderIP, receiverHash, newreceiverHash, file, repfactor - if (isLeader){ - int repFactor = stoi(inMsg[4]); - int receiverHash = stoi(inMsg[1]); - updateFileList(inMsg[3], receiverHash); - string receiverIP = hashRing->getValue(receiverHash); - outputReplication[inMsg[3]].erase(make_tuple(inMsg[0],receiverHash)); - if (repFactor > 0) outputReplication[inMsg[3]][make_tuple(receiverIP,stoi(inMsg[2]))] = repFactor; - if (outputReplication[inMsg[3]].size() == 0) outputReplication.erase(inMsg[3]); - } - else{ //ip,file,size,rep - int repFactor = stoi(inMsg[3]) - 1; - int repTarget = 0; - if (repFactor > 0){ - set<int> locations; - for (auto &e: fileList[inMsg[1]]) locations.insert(e); - repTarget = hashRing->getSuccessor(hashRingPosition); - while (locations.count(repTarget) || (hashRing->getValue(repTarget).compare(leaderIP) == 0)) { - repTarget = hashRing->getSuccessor(repTarget); - } - } - Messages outMsg(REPACK, inMsg[0] + "::" + to_string(hashRingPosition) + "::" + to_string(repTarget) +"::" + inMsg[1] + "::" + to_string(repFactor)); - this->tcpServent->sendMessage(leaderIP, TCPPORT, outMsg.toString()); - if (repFactor > 0){ - string sendMsg = hashRing->getValue(repTarget) + "::" + to_string(REPACK) + "::" + inMsg[1] + "::" + to_string(repFactor) + "::" + inMsg[1] + "," + inMsg[2] + "::"; - this->tcpServent->mergeMessages.push(sendMsg); - } - } - break; - } case REREPLICATEGET: { if (inMsg.size() >= 3) { int nodePosition = stoi(inMsg[0]); @@ -1876,10 +1813,10 @@ string Node::populateSDFSFileList(MessageType type, string mem_list_to_send) void Node::resetMapleJuice(){ cleanupTmpFiles("tmp-"); - outputReplication.clear(); workerProcessing.clear(); workerTasks.clear(); mapleSending.clear(); filesAtWorker.clear(); workerRing->clear(); + isJuicePhase = false; } diff --git a/src/TcpSocket.cpp b/src/TcpSocket.cpp index ff30bf72bb3d219f27b4e0b7f1d56359d845cb7e..6ca3bee6fe4e6ccd90de8753687f7569dad3e57e 100644 --- a/src/TcpSocket.cpp +++ b/src/TcpSocket.cpp @@ -244,13 +244,12 @@ int TcpSocket::messageHandler(int sockfd, string payloadMessage, string returnIP vector<string> filesAndSizes = splitString(payload, ","); int returnType = stoi(filesAndSizes[0]), processedCounter = 0; string returnTypeString = "", remoteLocalname = "", execfilename = "", mode = "wb"; - size_t dirSize = filesAndSizes.size(); - int fail = 0; + int dirSize = filesAndSizes.size(), fail = 0; size_t index = 3; int bytesLeft = 0, offset = strlen(extra.c_str()), buffersize = DEFAULT_TCP_BLKSIZE; vector<string> format; char c; - string preProcessed = "", repFactor = ""; + string preProcessed = ""; string filedest = filesAndSizes[1], processed = "", filename = "", sdfsfilename = ""; if (returnType == MAPLEACK) returnTypeString = "MAPLEMERGE"; @@ -271,10 +270,6 @@ int TcpSocket::messageHandler(int sockfd, string payloadMessage, string returnIP //cout << "[PUT] sdfs: " << sdfsfilename << ", remoteLocal: " << remoteLocalname << endl; index = 4; } - if (returnType == REPACK){ - returnTypeString = "REPACK"; - repFactor = filesAndSizes[2]; - } if (returnType == CHUNKACK) { returnTypeString = "CHUNK"; sdfsfilename = filesAndSizes[2]; //sdfs file @@ -368,7 +363,6 @@ int TcpSocket::messageHandler(int sockfd, string payloadMessage, string returnIP else if (returnType == JUICEACK){ Messages ack(JUICEACK, returnIP + "::" + preProcessed); regMessages.push(ack.toString()); } else if (returnType == CHUNKACK){ Messages ack(CHUNKACK, returnIP + "::" + execfilename + "::" + filesAndSizes[4] + "::" + filename + "::" + sdfsfilename); regMessages.push(ack.toString()); } else if (returnType == PUTACK){ Messages ack(PUTACK, returnIP + "::" + sdfsfilename + "::" + filename+ "::" + remoteLocalname); regMessages.push(ack.toString()); } - else if (returnType == REPACK && !fail){ Messages ack(REPACK, returnIP + "::" + filename + "::" + to_string(filesize) + "::" + repFactor); regMessages.push(ack.toString()); } else { cout << "[MERGE bad return type " << to_string(returnType) << endl;} break; } @@ -392,8 +386,6 @@ int TcpSocket::messageHandler(int sockfd, string payloadMessage, string returnIP case STARTMERGE: case MERGECOMPLETE: case MERGEFAIL: - case REPACK: - case REP: case DNS:{ //cout << "["<< messageTypes[msg.type] << "] payloadMessage: " << payloadMessage << endl; regMessages.push(payloadMessage); //handle from queue diff --git a/src/Threads.cpp b/src/Threads.cpp index 3531a86ba0d8d281433c2bcb29944a2f0b16ec9b..55a0f69ba0b343f051dd0c8e506e7eeb4ed26281 100644 --- a/src/Threads.cpp +++ b/src/Threads.cpp @@ -95,11 +95,8 @@ void *runSenderThread(void *node) //5a. check for queue maple/juice messages nodeOwn->handleMaplejuiceQ(); //5b. check for regular TCP messages - if (nodeOwn->isLeader) nodeOwn->debugOutputRep(); - nodeOwn->handleTcpMessage(); - // 6. check leader (If hashRing is sent via heartbeat, then we have a leader) if (!nodeOwn->checkLeaderExist()) { // If no leader nodeOwn->tcpElectionProcessor();