diff --git a/.DS_Store b/.DS_Store index a92f0e8b3dc2154df370e680759186f4f45cbfa9..b7ac86341431de932ecac05905aaffe5b05547fa 100644 Binary files a/.DS_Store and b/.DS_Store differ diff --git a/Wc b/Wc index 45fdddda2c0937027dda46c63dcdde62e4d1bc8f..7a48f640692644ffdf69ab771730c20c533e5b04 100755 Binary files a/Wc and b/Wc differ diff --git a/Wr b/Wr index af286f9ac8dcbcdaa54f7b37e68f0a995e0234a8..87fe453185225e8a06c5e0b95729d6cb6cd80140 100755 Binary files a/Wr and b/Wr differ diff --git a/inc/MessageTypes.h b/inc/MessageTypes.h index 838ddb2036ef85d0b58514532de0fa2ea14e7724..1235c3ff5d4581f9c3d31b1e1cb110fa0a21c09f 100644 --- a/inc/MessageTypes.h +++ b/inc/MessageTypes.h @@ -35,7 +35,9 @@ enum MessageType { MERGE, //merge messages used for combining keys at end of maple phased STARTMERGE, MERGECOMPLETE, - MERGEFAIL + MERGEFAIL, + REPACK, + REP }; enum PayloadType { diff --git a/inc/Node.h b/inc/Node.h index 7a30e433844cf7317487598b5b2c0b762a5ac3d2..72eff0d05f83a5a4175e806143c9bca3d07d3873 100644 --- a/inc/Node.h +++ b/inc/Node.h @@ -101,6 +101,7 @@ public: map<string, tuple<long int, int>> fileSizes; //used so master can partition in the map phase tuple is (bytes, lines) HashRing *workerRing; //hashring of worker nodes maple/juice map<string, vector<tuple<string, string>>> mapleSending; //originIP -> (file, chunk_start); + map<string, map<tuple<string,int>, int>> outputReplication; set<string> mapleKeys; //keys produced in maple phase map<string, int> filesAtWorker; queue<string> maplejuiceQ; @@ -144,6 +145,7 @@ public: void replicateKeys(); void resetMapleJuice(); void handleMaplejuiceQ(); + void debugOutputRep(); private: string populateMembershipMessage(); //membershipList to string based on mode type string populateIntroducerMembershipMessage(); //entire membership list to string diff --git a/inc/Utils.h b/inc/Utils.h index 1d6c414d6b6569b94eae224d4606781ec60aedec..4d42ec292ee2013559d4923e6ee34887a1c55e52 100644 --- a/inc/Utils.h +++ b/inc/Utils.h @@ -55,9 +55,8 @@ vector<T> randItems(int numItems, vector<T> toChoose){ vector<int> indexList; int availableNodes = toChoose.size(); for (int i = 0; i < availableNodes; i++) indexList.push_back(i); - if (availableNodes <= numItems) return toChoose; int nodeCount = 0; - while (nodeCount < numItems) { + while (nodeCount < numItems && (availableNodes > 0)) { int randomNum = rand() % availableNodes; selectedNodesInfo.push_back(toChoose[indexList[randomNum]]); indexList.erase(indexList.begin() + randomNum); diff --git a/src/Node.cpp b/src/Node.cpp index e84cdab5f04a1c0de795adf9ed0f62844d223379..5be4cbad9748ad0d45fe46707c7bbe2a580c24f9 100644 --- a/src/Node.cpp +++ b/src/Node.cpp @@ -59,6 +59,33 @@ string Node::updateNodeHeartbeatAndTime() return startTime; } +void Node::debugOutputRep(){ + //outputReplication + set<int> members; + set<string> membersIP; + string debug = "[DEBUGREP] ---------------------------\n"; + vector<tuple<string,string,int>> toDel; + for (auto pos : hashRing->nodePositions) { members.insert(pos); membersIP.insert(hashRing->getValue(pos)); } + for (auto &mapping : outputReplication){ + for (auto &reps: mapping.second){ + if (membersIP.count(get<0>(reps.first)) || members.count(get<1>(reps.first))){ + toDel.push_back(make_tuple(mapping.first, get<0>(reps.first), get<1>(reps.first))); + } + else{ + debug += ("FILE: " + mapping.first + " | (" + get<0>(reps.first) + "->"); + debug += (to_string(get<1>(reps.first)) + ")" + " REPFACTOR: " + to_string(reps.second)); + debug += "\n"; + } + } + } + for (auto &item : toDel){ + outputReplication[get<0>(item)].erase(make_tuple(get<1>(item),get<2>(item))); + if (outputReplication[get<0>(item)].size() == 0) outputReplication.erase(get<0>(item)); + } + debug += "[DEBUGREP] ---------------------------\n"; + cout << debug << endl; +} + string Node::populateMembershipMessage() { //The string we send will be seperated line by line --> IP,PORT,HeartbeatCounter,FailFlag @@ -265,6 +292,30 @@ int Node::failureDetection(){ } } + //outputReplication + set<int> locations; + for (auto &mapping : outputReplication){ + locations.clear(); + for (auto &curLoc : fileList[mapping.first]) locations.insert(curLoc); + for (auto &pairings : mapping.second){ + if (deletedNodePostion == get<1>(pairings.first)){ + //failed as receiver + int repTarget = hashRing->getRandomNode(tuple<int, int, int>(hashRingPosition, hashRingPosition, hashRingPosition)); + while (locations.count(repTarget) != 0) { + repTarget = hashRing->getRandomNode(tuple<int, int, int>(hashRingPosition, hashRingPosition, hashRingPosition)); + } + outputReplication[mapping.first][make_tuple(get<0>(pairings.first),repTarget)] = pairings.second; + Messages outMsg(REP, hashRing->getValue(repTarget)+"::"+to_string(pairings.second)+"::"+mapping.first+"::"+to_string(get<0>(fileSizes[mapping.first]))); + tcpServent->sendMessage(get<0>(pairings.first), TCPPORT, outMsg.toString()); + } + if (deletedNode.ip.compare(get<0>(pairings.first)) == 0){ + vector<int> randSender = randItems(1,fileList[mapping.first]); + outputReplication[mapping.first][make_tuple(hashRing->getValue(randSender[0]),get<1>(pairings.first))] = pairings.second; + Messages outMsg(REP, hashRing->getValue(get<1>(pairings.first))+"::"+to_string(pairings.second)+"::"+mapping.first+"::"+to_string(get<0>(fileSizes[mapping.first]))); + tcpServent->sendMessage(get<0>(pairings.first), TCPPORT, outMsg.toString()); + } + } + } } } @@ -821,10 +872,7 @@ vector<tuple<string,string, string>> Node::getRandomNodesToGossipTo() } void Node::handleMaplejuiceQ(){ - if (!maplejuiceQ.empty() && !workerRing->size() && !isBlackout && !tcpServent->mapleMessages.size()){ - for (auto &e: fileList) { - if (e.second.size() < 2) return; //make sure replication is good first - } + if (!maplejuiceQ.empty() && !workerRing->size() && !isBlackout && !tcpServent->mapleMessages.size() && !outputReplication.size()){ string msgCopy(maplejuiceQ.front()); cout << "[QUEUE] sending next maple/juice to be processed " << msgCopy << endl; tcpServent->regMessages.push(msgCopy); @@ -1248,27 +1296,22 @@ void Node::handleTcpMessage() auto it = mapleKeys.begin(); while (it != mapleKeys.end()){ string file = sdfsPre + (*it); - it++; - isBlackout = true; updateFileList(file, hashRingPosition); - fileSizes[file] = make_tuple(-1, -1); - int closestNode = hashRing->locateClosestNode(file); - int pred = hashRing->getPredecessor(closestNode); - int succ = hashRing->getSuccessor(closestNode); - if (!hashRing->getValue(closestNode).compare(nodeInformation.ip)) { - closestNode = hashRing->getRandomNode(tuple<int, int, int>(closestNode, pred, succ)); - } - if (hashRing->getValue(pred).compare(nodeInformation.ip)==0) { - pred = hashRing->getRandomNode(tuple<int, int, int>(closestNode, pred, succ)); + FILE *fp = fopen(file.c_str(), "rb"); + fseek(fp, 0, SEEK_END); + int size = ftell(fp); + fseek(fp, 0, SEEK_SET); + fclose(fp); + fileSizes[file] = make_tuple(size, -1); + int repTarget = hashRing->getRandomNode(tuple<int, int, int>(hashRingPosition, hashRingPosition, hashRingPosition)); + while (hashRing->getValue(repTarget).compare(nodeInformation.ip)==0) { + repTarget = hashRing->getRandomNode(tuple<int, int, int>(hashRingPosition, hashRingPosition, hashRingPosition)); } - if (hashRing->getValue(succ).compare(nodeInformation.ip)==0) { - succ = hashRing->getRandomNode(tuple<int, int, int>(closestNode, pred, succ)); - } - pendingRequests[file] = tuple<int, int, int>(closestNode, pred, succ); - pendingRequestSent[file] = tuple<int, int, int>(true, false, false); - pendingSenderRequests[file] = tuple<string, string, string>(nodeInformation.ip, "", ""); - string sendMsg = hashRing->getValue(closestNode)+"::"+file+"::"+file+"::"; - this->tcpServent->pendSendMessages.push(sendMsg); + outputReplication[file][make_tuple(nodeInformation.ip,repTarget)] = 3; + //mergeFiles(string ip, string port, string handler, string filedest, string header, string toSend, string starts) + string sendMsg = hashRing->getValue(repTarget) + "::" + to_string(REPACK) + "::" + file + "::" + to_string(3) + "::" + file + "," + to_string(get<0>(fileSizes[file])) + "::"; + this->tcpServent->mergeMessages.push(sendMsg); + it++; } cout << "[MAPLE] ------------ complete ---------- " << endl; resetMapleJuice(); @@ -1410,6 +1453,43 @@ void Node::handleTcpMessage() } break; } + case REP: { + string sendMsg = inMsg[0] + "::" + to_string(REPACK) + "::" + inMsg[2] + "::" + inMsg[1] + "::" + inMsg[2] + "," + inMsg[3] + "::"; + this->tcpServent->mergeMessages.push(sendMsg); + break; + } + case REPACK: { + //senderIP, receiverHash, newreceiverHash, file, repfactor + if (isLeader){ + int repFactor = stoi(inMsg[4]) - 1; + int receiverHash = stoi(inMsg[1]); + string receiverIP = hashRing->getValue(receiverHash); + outputReplication[inMsg[3]].erase(make_tuple(inMsg[0],receiverHash)); + if (repFactor > 0) outputReplication[inMsg[3]][make_tuple(receiverIP,stoi(inMsg[2]))] = repFactor; + if (outputReplication[inMsg[3]].size() == 0) outputReplication.erase(inMsg[3]); + } + else{ //ip,file,size,rep + int repFactor = stoi(inMsg[3]) - 1; + int repTarget = 0; + updateFileList(inMsg[1], hashRingPosition); + fileSizes[inMsg[1]] = make_tuple(stoi(inMsg[2]), -1); + if (repFactor > 0){ + set<int> locations; + for (auto &e: fileList[inMsg[1]]) locations.insert(e); + repTarget = hashRing->getRandomNode(tuple<int, int, int>(hashRingPosition, hashRingPosition, hashRingPosition)); + while (locations.count(repTarget)!=0) { + repTarget = hashRing->getRandomNode(tuple<int, int, int>(hashRingPosition, hashRingPosition, hashRingPosition)); + } + } + Messages outMsg(REPACK, inMsg[0] + "::" + to_string(hashRingPosition) + "::" + to_string(repTarget) +"::" + inMsg[1] + "::" + to_string(repFactor)); + this->tcpServent->sendMessage(leaderIP, TCPPORT, outMsg.toString()); + if (repFactor > 0){ + string sendMsg = hashRing->getValue(repTarget) + "::" + to_string(REPACK) + "::" + inMsg[1] + "::" + to_string(repFactor) + "::" + inMsg[1] + "," + inMsg[2] + "::"; + this->tcpServent->mergeMessages.push(sendMsg); + } + } + break; + } case REREPLICATEGET: { if (inMsg.size() >= 3) { int nodePosition = stoi(inMsg[0]); @@ -1812,6 +1892,8 @@ string Node::populateSDFSFileList(MessageType type, string mem_list_to_send) void Node::resetMapleJuice(){ cleanupTmpFiles("tmp-"); + for (auto &e: outputReplication) e.second.clear(); + outputReplication.clear(); workerProcessing.clear(); workerTasks.clear(); mapleSending.clear(); diff --git a/src/TcpSocket.cpp b/src/TcpSocket.cpp index 6ca3bee6fe4e6ccd90de8753687f7569dad3e57e..ff30bf72bb3d219f27b4e0b7f1d56359d845cb7e 100644 --- a/src/TcpSocket.cpp +++ b/src/TcpSocket.cpp @@ -244,12 +244,13 @@ int TcpSocket::messageHandler(int sockfd, string payloadMessage, string returnIP vector<string> filesAndSizes = splitString(payload, ","); int returnType = stoi(filesAndSizes[0]), processedCounter = 0; string returnTypeString = "", remoteLocalname = "", execfilename = "", mode = "wb"; - int dirSize = filesAndSizes.size(), fail = 0; + size_t dirSize = filesAndSizes.size(); + int fail = 0; size_t index = 3; int bytesLeft = 0, offset = strlen(extra.c_str()), buffersize = DEFAULT_TCP_BLKSIZE; vector<string> format; char c; - string preProcessed = ""; + string preProcessed = "", repFactor = ""; string filedest = filesAndSizes[1], processed = "", filename = "", sdfsfilename = ""; if (returnType == MAPLEACK) returnTypeString = "MAPLEMERGE"; @@ -270,6 +271,10 @@ int TcpSocket::messageHandler(int sockfd, string payloadMessage, string returnIP //cout << "[PUT] sdfs: " << sdfsfilename << ", remoteLocal: " << remoteLocalname << endl; index = 4; } + if (returnType == REPACK){ + returnTypeString = "REPACK"; + repFactor = filesAndSizes[2]; + } if (returnType == CHUNKACK) { returnTypeString = "CHUNK"; sdfsfilename = filesAndSizes[2]; //sdfs file @@ -363,6 +368,7 @@ int TcpSocket::messageHandler(int sockfd, string payloadMessage, string returnIP else if (returnType == JUICEACK){ Messages ack(JUICEACK, returnIP + "::" + preProcessed); regMessages.push(ack.toString()); } else if (returnType == CHUNKACK){ Messages ack(CHUNKACK, returnIP + "::" + execfilename + "::" + filesAndSizes[4] + "::" + filename + "::" + sdfsfilename); regMessages.push(ack.toString()); } else if (returnType == PUTACK){ Messages ack(PUTACK, returnIP + "::" + sdfsfilename + "::" + filename+ "::" + remoteLocalname); regMessages.push(ack.toString()); } + else if (returnType == REPACK && !fail){ Messages ack(REPACK, returnIP + "::" + filename + "::" + to_string(filesize) + "::" + repFactor); regMessages.push(ack.toString()); } else { cout << "[MERGE bad return type " << to_string(returnType) << endl;} break; } @@ -386,6 +392,8 @@ int TcpSocket::messageHandler(int sockfd, string payloadMessage, string returnIP case STARTMERGE: case MERGECOMPLETE: case MERGEFAIL: + case REPACK: + case REP: case DNS:{ //cout << "["<< messageTypes[msg.type] << "] payloadMessage: " << payloadMessage << endl; regMessages.push(payloadMessage); //handle from queue diff --git a/src/Threads.cpp b/src/Threads.cpp index 55a0f69ba0b343f051dd0c8e506e7eeb4ed26281..5400fb9fdb7c2126f32915fb2e41f76873e1444f 100644 --- a/src/Threads.cpp +++ b/src/Threads.cpp @@ -97,6 +97,8 @@ void *runSenderThread(void *node) //5b. check for regular TCP messages nodeOwn->handleTcpMessage(); + if (nodeOwn->isLeader) nodeOwn->debugOutputRep(); + // 6. check leader (If hashRing is sent via heartbeat, then we have a leader) if (!nodeOwn->checkLeaderExist()) { // If no leader nodeOwn->tcpElectionProcessor();