diff --git a/inc/MessageTypes.h b/inc/MessageTypes.h index 2697d2e3eb305a5bd0e2dcb08c886bc29816620a..bf49267557191c6bbb27b8a4267d3ea4da12659d 100644 --- a/inc/MessageTypes.h +++ b/inc/MessageTypes.h @@ -22,14 +22,22 @@ enum MessageType { DELETE, //leader handles deletion and disseminates info GETNULL, //sent if user tries to get a file from leader and it has 0 copies in the system REREPLICATE, - REREPLICATEGET + REREPLICATEGET, + MAPLESTART, //send to master to initiate maple phase + JUICESTART, //send to master to initiate juice phase + MAPLEACK, + CHUNK, //send to nodes so they have information about what kind of get request to send + APPEND, //process get request and send an append request, need to add this to all replicas too. Check into blackout stuff + APPENDACK, + CHUNKACK, //after append ack received, send this back to master to know when things are Done }; enum PayloadType { REGULAR=97, //start of actual message (membershipList) FILEPAIR, //start of filelist FILENAME, //start of filename - FILEPOSITIONS //start of comma seperated file positions string + FILEPOSITIONS, //start of comma seperated file positions string + FILESIZE }; enum LogType { diff --git a/inc/Node.h b/inc/Node.h index 3d359d69031ba0e49d0e489a7a5a9fc16b7f90bc..d72e1cf13e5347c8ccb2ab2d7f31737bcc46bbcd 100644 --- a/inc/Node.h +++ b/inc/Node.h @@ -33,6 +33,7 @@ using namespace std; #define N_b 5 // how many nodes GOSSIP want to use #define T_election 15 // in T_period #define T_switch 3 // in seconds +#define T_maples // lines to process // void *runUdpServer(void *udpSocket); @@ -74,6 +75,12 @@ public: map<string, tuple<string, string, string>> pendingSenderRequests; //? map<string, tuple<bool, bool, bool>> pendingRequestSent; //? + //master properties for MAPLEJUICE + map<string, vector<tuple<string, string, string>>> mapleProccessing; //ip -> [ (file, chunk_start, originIP) ] + map<string, tuple<long int, int>> fileSizes; //used so master can partition in the map phase tuple is (bytes, lines) + HashRing *mapleRing; + map<string, vector<tuple<string, string>>> mapleSending; //originIP -> (file, chunk_start); + Node(); Node(ModeType mode); diff --git a/inc/TcpSocket.h b/inc/TcpSocket.h index 0634131b41e6f2c6d624325c0219825ddc589b4b..6d2d07a67f861308409c23935c8ec72e86e6e98a 100644 --- a/inc/TcpSocket.h +++ b/inc/TcpSocket.h @@ -50,9 +50,11 @@ public: queue<string> qMessages; //election messages added to this queue queue<string> regMessages;//other messages added here queue<string> pendSendMessages;//keeps messages for the tcp client to send + queue<string> mapleMessages; //keeps track of sending void bindServer(string port); void sendFile(string ip, string port, string localfilename, string sdfsfilename, string remoteLocalfilename); + void sendLines(string ip, string port, string localfilename, int start, int end); void sendMessage(string ip, string port, string message); int messageHandler(int sockfd, string payloadMessage, string returnID); int createConnection(string ip, string port); diff --git a/inc/Utils.h b/inc/Utils.h index 26d1eafd348bef0e3e3b1f4f845baf84e56916b0..2bf32598626d2b9cc2d5128a7d3118fa30c2e37e 100644 --- a/inc/Utils.h +++ b/inc/Utils.h @@ -24,6 +24,8 @@ vector<string> splitString(string s, string delimiter); string getIP(); string getIP(const char * host); int new_thread_id(); +template<typename T> +vector<T> randItems(int numItems, vector<T> toChoose); bool isInVector(vector<int> v, int i); //adapted from https://stackoverflow.com/questions/23030267/custom-sorting-a-vector-of-tuples template<int M, template<typename> class F = std::less> diff --git a/src/Master.cpp b/src/Master.cpp new file mode 100644 index 0000000000000000000000000000000000000000..d78d4cd545b1237d31740697e5100b839fe691f5 --- /dev/null +++ b/src/Master.cpp @@ -0,0 +1,3 @@ +#include "../inc/Node.h" + +//Introducer = Master of MapReduce diff --git a/src/Node.cpp b/src/Node.cpp index 523e9d6d713e0bbfb7b383aed29160d9b05cc205..8cb956288e0c496e82db2aa81a0a8f502437d24b 100644 --- a/src/Node.cpp +++ b/src/Node.cpp @@ -1,14 +1,18 @@ #include "../inc/Node.h" +//TODO, single file for each key, and ussing an append to each replica of it? +//keep a local map, if you find a new key, just let the master know about it and hold off on results in buffer until it exists Node::Node(){ udpServent = new UdpSocket(); tcpServent = new TcpSocket(); hashRing = new HashRing(); + mapleRing = new HashRing(); localTimestamp = 0; heartbeatCounter = 0; runningMode = ALL2ALL; activeRunning = false; prepareToSwitch = false; + masterIP = getIP(INTRODUCER); logWriter = new Logger(LOGGING_FILE_NAME); leaderPosition = -1; proposedTime = 0; @@ -142,6 +146,41 @@ int Node::failureDetection(){ fileList[element.first] = newEntry; } + ////////////////////////////////////////////////////// + //1) remove from HashRing + //2) if processing, reassign + //2a) if no extra nodes, assign to successor, else add new node to ring + //3) if a sender, reassign replica holders as new senders for each thing sent + vector<tuple<string,string>> aliveNodes; + for (auto &e : membershipList) aliveNodes.push_back(e.first); + vector<tuple<string,string,string>> mapleNodes; + int nextId; + if ((mapleRing->nodePositions.size()-1) == hashRing->nodePositions.size()){ + nextId = mapleRing->getSuccessor(deletedNodePostion); + } else { + while (1){ + mapleNodes = randItems(1, aliveNodes); + if (mapleRing->getValue(mapleNodes[0]).compare("No node found") != 0) continue; + break; + } + Member m(get<0>(mapleNodes[0]), get<1>(mapleNodes[0])); + mapleRing->addNode(get<0>(mapleNodes[0]), hashingId(m, get<2>(mapleNodes[0]))); + nextId = hashingId(m, get<2>(mapleNodes[0])) + } + mapleRing->removeNode(deletedNodePostion); + + auto vecCopy(mapleProcessing[get<0>(keyTuple)]); + mapleProcessing[nextId] = vecCopy; + mapleProcessing.erase(get<0>(keyTuple)); + + for (auto &e : mapleSending[get<0>(keyTuple)]){ + vector<string> temp = randItems(1, fileList[e.first]); + mapleSend[temp[0]] = make_tuple(e.first, e.second); + string mapleS = temp[0] + "::" + e.first + "::" + e.second + "::" + to_string(stoi(e.second) + T_maples) + "::"; + mapleMessages.push(mapleS); + } + + // chech if the failure is the sender in pending requests for (auto& senders: pendingSenderRequests) { string sdfsfilename = senders.first; @@ -729,18 +768,7 @@ vector<tuple<string,string, string>> Node::getRandomNodesToGossipTo() } switch (this->runningMode) { case GOSSIP: { - srand(time(NULL)); - // N_b is a predefined number - if (availableNodes <= N_b) return availableNodesInfo; - int nodeCount = 0; - while (nodeCount < N_b) { - int randomNum = rand() % availableNodes; - selectedNodesInfo.push_back(availableNodesInfo[indexList[randomNum]]); - indexList.erase(indexList.begin() + randomNum); - availableNodes--; - nodeCount++; - } - return selectedNodesInfo; + return randItems(N_b, availableNodesInfo); } //ALL2ALL default: { @@ -773,6 +801,100 @@ void Node::handleTcpMessage() Messages msg(msgType, payload); // cout << "Has " << msg.type << " with " << msg.payload << endl; switch (msg.type) { + case JUICESTART: { + if (mapleAssignments.size()) {tcpServent->regMessages.push(msg); break;} + //TODO + } + case MAPLESTART: { + //leader only function + //currently running something, dont start a new phase + if (mapleAssignments.size()) {tcpServent->regMessages.push(msg); break;} + if (inMsg.size() >= 4){ + string maple_exe = inMsg[0], num_maples = inMsg[1], sdfs_pre = inMsg[2], sdfs_dir = inMsg[3]; + int workers = stoi(num_maples); + if (workers > hashRing->nodePositions.size()-1) workers = hashRing->nodePositions.size()-1; + int total_lines = 0; + vector<tuple<string,int>> directory; + for (auto &e: fileSizes){ + if (strncmp(e.first.c_str(), sdfs_dir.c_str(), sdfs_dir.size()) == 0){ + directory.push_back(make_tuple(e.first, get<1>(e.second))); + total_lines += get<1>(e.second); + } + } + vector<tuple<string,string>> aliveNodes; + for (auto &e : membershipList) aliveNodes.push_back(e.first); + vector<string> mapleNodes = randItems(workers, aliveNodes); + for (auto &e : mapleNodes) { + Member m(get<0>(e), get<1>(e)); + mapleRing->addNode(get<0>(e), hashingId(m, get<2>(e))); + } + int start = 0, id = 0; + string s; + for (auto &e: directory){ + start = 0; + while (start < e.second){ + s = e.first + "::" + to_string(start); + id = mapleRing->locateClosestNode(s); + vector<string> temp = randItems(1, fileList[e.first]); + mapleProccessing[mapleRing->getValue(id)].push_back(make_tuple(e.first, to_string(start), temp[0])); + mapleSending[temp[0]].push_back(make_tuple(e.first, to_string(start))); + string maplemsg = mapleRing->getValue(id) + "::" + s; + this->tcpServent->mapleMessages.push(maplemsg); + start += T_maples; + } + } + } + break; + } + + case CHUNK: { + vector<string> inMsg = splitString(msg.payload, "::"); + cout << "[CHUNK] " << "we will put sdfsfilename: " << msg.payload[1] << " from chunk: " << msg.payload[2]; + cout << " to node " << msg.payload[0] << endl; + string sendMsg = msg.payload + "::" + to_string(stoi(msg.payload[2]) + T_maples) + "::"; + this->tcpServent->pendSendMessages.push(sendMsg); + break; + } + + case CHUNKACK: { + vector<string> inMsg = splitString(msg.payload, "::"); + if (!isLeader) { + //forward to know that the file was put okay + this->tcpServent->sendMessage(leaderIP, TCPPORT, msg.toString()); + //TODO start PROCESSING !!!!! its in inMsg[3] for the file to process + //if processing success, send out TCP MAPLEACK + break; + } + vector<tuple<string,string>> temp; + for (auto &e : mapleSending[inMsg[0]]){ + if (e.first.compare(inMsg[1]) == 0){ + if (e.second.compare(inMsg[2]) == 0){ + continue; + } + } + temp.push_back(e); + } + if (temp.size()) mapleSending[inMsg[0]] = temp; + else mapleSending.erase(inMsg[0]); + break; + } + + case MAPLEACK: { + vector<string> inMsg = splitString(msg.payload, "::"); + vector<tuple<string,string,string>> temp; + for (auto &e : mapleProccessing[inMsg[0]]){ + if (e.first.compare(inMsg[1]) == 0){ + if (e.second.compare(inMsg[2]) == 0){ + continue; + } + } + temp.push_back(e); + } + if (temp.size()) mapleSending[inMsg[0]] = temp; + else mapleSending.erase(inMsg[0]); + break; + } + case PUTACK: { vector<string> inMsg = splitString(msg.payload, "::"); if(inMsg.size() >= 4){ @@ -793,6 +915,7 @@ void Node::handleTcpMessage() cout << "[DELETE] " << "inMsgIP: " << inMsgIP << " sdfsfilename: " << sdfsfilename << endl; localFilelist.erase(sdfsfilename); fileList.erase(sdfsfilename); + fileSizes.erase(sdfsfilename); // This is TCP, so we don't need to ACK } } @@ -813,6 +936,7 @@ void Node::handleTcpMessage() // the file is not available cout << "[DNSGET] sdfsfilename " << sdfsfilename << " is not available" << endl; fileList.erase(sdfsfilename); + fileSizes.erase(sdfsfilename); Messages outMsg(GETNULL, sdfsfilename+": the file is not available::"); this->tcpServent->sendMessage(inMsgIP, TCPPORT, outMsg.toString()); break; @@ -844,16 +968,18 @@ void Node::handleTcpMessage() // Check hashring, get positions and send out DNS ANS isBlackout = true; vector<string> inMsg = splitString(msg.payload, "::"); - if(inMsg.size() >= 4){ + if(inMsg.size() >= 6){ string inMsgIP = inMsg[0]; int nodePosition = stoi(inMsg[1]); string sdfsfilename = inMsg[2]; string localfilename = inMsg[3]; - + long int size = stol(inMsg[4]); + int lines = stoi(inMsg[5]); cout << "[DNS] Got " << "inMsgIP: " << inMsgIP << ", sdfsfilename: " << sdfsfilename; cout << ", localfilename: " << localfilename << ", pos: " << nodePosition << endl; // update fileList, client itself is one of the replicas updateFileList(sdfsfilename, nodePosition); + fileSizes[sdfsfilename] = make_tuple(size, lines); hashRing->debugHashRing(); int closestNode = hashRing->locateClosestNode(sdfsfilename); int pred = hashRing->getPredecessor(closestNode); @@ -910,7 +1036,6 @@ void Node::handleTcpMessage() cout << "[REREPLICATEGET] Put localfilename " << localfilename << " to nodeIP " << nodeIP << endl; string sendMsg = nodeIP+"::"+localfilename+"::"+sdfsfilename+"::"+remoteLocalfilename; this->tcpServent->pendSendMessages.push(sendMsg); - //this->tcpServent->sendFile(nodeIP, TCPPORT, localfilename, sdfsfilename, remoteLocalfilename); } break; } @@ -1088,31 +1213,49 @@ string Node::encapsulateFileList() } //cout << "sdfsfilename " << sdfsfilename << endl; //cout << "positions " << positions << endl; - char *cstr = new char[sdfsfilename.length()+positions.length()+7]; + string size = to_string(get<0>(fileSizes[sdfsfilename])) + "," + to_string(get<1>(fileSizes[sdfsfilename])); + char *cstr = new char[sdfsfilename.length()+positions.length()+size.length()+3+3+3+1]; size_t len = sdfsfilename.length()+3; - cstr[0] = len & 0xff; - cstr[1] = (len >> 8) & 0xff; - if (cstr[1] == 0) { // avoid null - cstr[1] = 0xff; + int index = 0; + cstr[index++] = len & 0xff; + cstr[index] = (len >> 8) & 0xff; + if (cstr[index] == 0) { // avoid null + cstr[index] = 0xff; } + index++; //printf("cstr[0] %x, cstr[1] %x\n", cstr[0], cstr[1]); - cstr[2] = FILENAME; + cstr[index++] = FILENAME; for (uint i=0; i<sdfsfilename.length(); i++) { - cstr[i+3] = sdfsfilename.c_str()[i]; + cstr[index+i] = sdfsfilename.c_str()[i]; } + index += sdfsfilename.length(); size_t len2 = positions.length()+3; - cstr[sdfsfilename.length()+3] = len2 & 0xff; - cstr[sdfsfilename.length()+4] = (len2 >> 8) & 0xff; - if (cstr[sdfsfilename.length()+4] == 0) { // avoid null - cstr[sdfsfilename.length()+4] = 0xff; + cstr[index++] = len2 & 0xff; + cstr[index] = (len2 >> 8) & 0xff; + if (cstr[index] == 0) { // avoid null + cstr[index] = 0xff; } + index++; //printf("cstr[3] %x, cstr[4] %x\n", cstr[0], cstr[1]); - cstr[sdfsfilename.length()+5] = FILEPOSITIONS; + cstr[index++] = FILEPOSITIONS; //printf("cstr[%lu] %d\n", sdfsfilename.length()+2, cstr[sdfsfilename.length()+2]); for (uint i=0; i<positions.length(); i++) { - cstr[sdfsfilename.length()+6+i] = positions.c_str()[i]; + cstr[index+i] = positions.c_str()[i]; } - cstr[sdfsfilename.length()+positions.length()+6] = '\0'; + index += positions.length(); + size_t len3 = size.length()+3; + cstr[index++] = len3 & 0xff; + cstr[index] = (len3 >> 8) & 0xff; + if (cstr[index] == 0) { // avoid null + cstr[index] = 0xff; + } + index++; + cstr[index++] = FILESIZE; + for (uint i=0; i<size.length(); i++) { + cstr[index+i] = size.c_str()[i]; + } + index += size.length(); + cstr[index] = '\0'; //printf("cstrFile %s\n", cstr); string enMegFile(cstr); //cout << "enMegFile " << enMegFile << endl; @@ -1162,6 +1305,7 @@ void Node::decapsulateFileList(string payload) int size = payload.length(); uint pos = 0; fileList.clear(); + fileSizes.clear(); string lastFilename = ""; while (size > 0) { size_t length; @@ -1198,6 +1342,11 @@ void Node::decapsulateFileList(string payload) fileList[lastFilename] = positions; break; } + case FILESIZE: { + vector<string> temp = splitString(deMegPart, ","); + fileSizes[lastFilename] = make_tuple(stol(temp[0]),stoi(temp[1])); + break; + } default: break; } diff --git a/src/TcpSocket.cpp b/src/TcpSocket.cpp index a833e093500cc2bbf34d5203765f29fdc9bab0c8..60a5f10d3090f18394aa3dcfe4d9cb22db5939af 100644 --- a/src/TcpSocket.cpp +++ b/src/TcpSocket.cpp @@ -129,6 +129,32 @@ void TcpSocket::sendFile(string ip, string port, close(sockfd); } +void TcpSocket::sendLines(string ip, string port, string localfilename, int start, int end) +{ + int sockfd = 0, lineCounter = -1; + if ((sockfd = createConnection(ip, port)) == -1) return; + // send lines and filename first + string toSend = localfilename + "," + to_string(start) + "," + localfilename + to_string(start) + "temp"; + Messages msg(PUT, toSend); + string payload = msg.toString(); + if (send(sockfd, payload.c_str(), strlen(payload.c_str()), 0) == -1) { + perror("send"); + } + sleep(1); + ifstream file(localfilename.c_str()); + string str; + while (std::getline(file, str)) + { + lineCounter++; + if (lineCounter < start) continue; + if (lineCounter >= end) break; + if (send(sockfd, str.c_str(), strlen(str.c_str()), 0) == -1) { + perror("send"); + } + } + close(sockfd); +} + void TcpSocket::sendMessage(string ip, string port, string message) { int sockfd; @@ -186,22 +212,27 @@ int TcpSocket::messageHandler(int sockfd, string payloadMessage, string returnIP string sdfsfilename = "", incomingChecksum = "", remoteLocalname = "", overwriteFilename = ""; // format: size,checksum,sdfsfilename vector<string> fields = splitString(msg.payload, ","); + int start = -1; if (fields.size() >= 5) { filesize = stoi(fields[0]); incomingChecksum = fields[1]; sdfsfilename = fields[2]; remoteLocalname = fields[3]; overwriteFilename = fields[4]; + cout << "file is " << sdfsfilename << " with size " << filesize << " and checksum " << incomingChecksum << endl; + time_t fileTimestamp; + time(&fileTimestamp); + string localfilename = sdfsfilename+"_"+to_string(fileTimestamp); + if (overwriteFilename.compare("") != 0) { + localfilename = overwriteFilename; + cout << "it's GET with filename " << overwriteFilename << endl; + } + cout << "backup filename " << localfilename << endl; + } else { + localfilename = fields[2]; + sdfsfilename = fields[0]; + start = fields[1]; } - cout << "file is " << sdfsfilename << " with size " << filesize << " and checksum " << incomingChecksum << endl; - time_t fileTimestamp; - time(&fileTimestamp); - string localfilename = sdfsfilename+"_"+to_string(fileTimestamp); - if (overwriteFilename.compare("") != 0) { - localfilename = overwriteFilename; - cout << "it's GET with filename " << overwriteFilename << endl; - } - cout << "backup filename " << localfilename << endl; fp = fopen(localfilename.c_str(), "wb"); if (fp == NULL) { cout << "file error" << endl; @@ -222,14 +253,16 @@ int TcpSocket::messageHandler(int sockfd, string payloadMessage, string returnIP fclose(fp); FileObject f(localfilename); - if(incomingChecksum.compare(f.checksum) != 0){ + if(incomingChecksum.compare(f.checksum) != 0 && incomingChecksum.compare("") != 0){ cout << "[ERROR] FILE CORRUPTED" << endl; // TODO: Handel file corruption here } else { - Messages putack(PUTACK, returnIP + "::" + sdfsfilename + "::" + localfilename+"::"+remoteLocalname); + if (start != -1){ + Messages putack(CHUNKACK, returnIP + "::" + sdfsfilename + "::" + start + "::" + localfilename); + } + else Messages putack(PUTACK, returnIP + "::" + sdfsfilename + "::" + localfilename+"::"+remoteLocalname); regMessages.push(putack.toString()); } - break; } case DNSANS: diff --git a/src/Threads.cpp b/src/Threads.cpp index ede43630881476a4f57c603bbb3267d4ff54e9b8..0a62bc207f3c747d64749ada99cca3b57cde0d55 100644 --- a/src/Threads.cpp +++ b/src/Threads.cpp @@ -21,6 +21,12 @@ void *runTcpSender(void *tcpSocket) TcpSocket* tcp; tcp = (TcpSocket*) tcpSocket; while (1) { + while (!tcp->mapleMessages.empty()) { + vector<string> msgSplit = splitString(tcp->mapleMessages.front(), "::"); + Message msg(CHUNK, tcp->mapleMessages.front()); + tcp->sendMessage(msgSplit[0], TCPPORT, msg); + tcp->pendSendMessages.pop(); + } while (!tcp->pendSendMessages.empty()) { vector<string> msgSplit = splitString(tcp->pendSendMessages.front(), "::"); if (msgSplit.size() >= 4) { @@ -28,7 +34,12 @@ void *runTcpSender(void *tcpSocket) string sdfsfilename = msgSplit[2], remoteLocalfilename = msgSplit[3]; cout << "[DOSEND] nodeIP " << nodeIP << ", localfilename " << localfilename; cout << ", sdfsfilename " << sdfsfilename << ", remoteLocalfilename " << remoteLocalfilename << endl; - tcp->sendFile(nodeIP, TCPPORT, localfilename, sdfsfilename, remoteLocalfilename); + if (msgSplit.size() == 5){ + start = stoi(msgSplit[2]); + end = stoi(msgSplit[3]); + tcp->sendLines(nodeIP, TCPPORT, localfilename, start, end); + } + else tcp->sendFile(nodeIP, TCPPORT, localfilename, sdfsfilename, remoteLocalfilename); } tcp->pendSendMessages.pop(); } @@ -43,7 +54,7 @@ void *runSenderThread(void *node) nodeOwn->activeRunning = true; // heartbeat to introducer to join the system - Member introducer(getIP(INTRODUCER), PORT); + Member introducer(node->masterIP, PORT); nodeOwn->joinSystem(introducer); while (nodeOwn->activeRunning) { diff --git a/src/Utils.cpp b/src/Utils.cpp index e42735764e0cc111b52cdf46da197e29b37a4549..b768a11d812f5862b5f36c41cdf05e36b220b0d2 100644 --- a/src/Utils.cpp +++ b/src/Utils.cpp @@ -46,3 +46,22 @@ bool isInVector(vector<int> v, int i){ } return false; } + +template<typename T> +vector<T> randItems(int numItems, vector<T> toChoose){ + srand(time(NULL)); + vector<int> availableNodesInfo(toChoose); + vector<int> indexList; + int availableNodes = availableNodesInfo.size(); + for (int i = 0; i < availableNodes; i++) indexList.push_back(i); + if (availableNodes <= toChoose) return availableNodesInfo; + int nodeCount = 0; + while (nodeCount < N_b) { + int randomNum = rand() % availableNodes; + selectedNodesInfo.push_back(availableNodesInfo[indexList[randomNum]]); + indexList.erase(indexList.begin() + randomNum); + availableNodes--; + nodeCount++; + } + return availableNodesInfo; +} diff --git a/src/main.cpp b/src/main.cpp index 1e155624c8959117429eb6e95d6b3e619d46a159..dac0665644624e18be696d2bd96c30c2ce8b7b9e 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -37,6 +37,7 @@ int main(int argc, char *argv[]) exit(-1); } node->localFilelist.clear(); // for testing + node->fileSize.clear(); /*node->localFilelist["sdfsfilename1"] = "localfilename1"; node->localFilelist["sdfsfilename2"] = "localfilename2";*/ @@ -90,13 +91,22 @@ int main(int argc, char *argv[]) if (!node->isBlackout) { string localfilename = cmdLineInput[1]; string sdfsfilename = cmdLineInput[2]; - Messages outMsg(DNS, node->nodeInformation.ip + "::" + to_string(node->hashRingPosition) + "::" + sdfsfilename + "::" + localfilename); - cout << "[PUT] Got localfilename: " << localfilename << " with sdfsfilename: " << sdfsfilename << endl; - if (access(localfilename.c_str(), F_OK) != -1) { - node->tcpServent->sendMessage(node->leaderIP, TCPPORT, outMsg.toString()); - } else { - cout << "[PUT] The file " << localfilename << " is not existed" << endl; + fp = fopen(localfilename.c_str(), "rb"); + if (fp == NULL) { + cout << "[PUT] The file " << localfilename << " does not exist" << endl; + continue; } + fseek(fp, 0, SEEK_END); + long int size = ftell(fp); + fseek(fp, 0, SEEK_SET); + fclose(fp); + int number_of_lines = 0; + string line; + ifstream myfile(localfilename.c_str()); + while (getline(myfile, line)) ++number_of_lines; + Messages outMsg(DNS, node->nodeInformation.ip + "::" + to_string(node->hashRingPosition) + "::" + sdfsfilename + "::" + localfilename + "::" + to_string(size) + "::" + to_string(number_of_lines)); + cout << "[PUT] Got localfilename: " << localfilename << " with sdfsfilename: " << sdfsfilename << endl; + node->tcpServent->sendMessage(node->leaderIP, TCPPORT, outMsg.toString()); } else { cout << "[BLACKOUT] Leader cannot accept the request" << endl; } @@ -120,7 +130,6 @@ int main(int argc, char *argv[]) continue; } } - Messages outMsg(DNSGET, node->nodeInformation.ip + "::" + to_string(node->hashRingPosition) + "::" + sdfsfilename + "::" + localfilename); cout << "[GET] Got sdfsfilename: " << sdfsfilename << " with localfilename: " << localfilename << endl; node->tcpServent->sendMessage(node->leaderIP, TCPPORT, outMsg.toString()); @@ -152,6 +161,45 @@ int main(int argc, char *argv[]) node->listLocalFiles(); } else if (cmd == "lsall"){ debugSDFSFileList(node); + } else if (cmd == "maple" && joined){ + if(cmdLineInput.size() < 5){ + cout << "USAGE: maple maple_exe num_maples sdfs_intermediate_dir sdfs_src_dir" << endl; + continue; + } + if (FILE *file = fopen(cmdLineInput[1].c_str(), "r")) { + fclose(file); + } else { + cout << "[MAPLE] " << cmdLineInput[1] << " does not exist locally" << endl; + continue; + } + if (!node->isBlackout){ + string msg = cmdLineInput[1] + "," + cmdLineInput[2] + "," + cmdLineInput[3] + "," + cmdLineInput[4] + "\n"; + Messages outMsg(MAPLESTART, msg); + cout << "[MAPLE] forwarding request to " << node->leaderIP << endl; + node->tcpServent->sendMessage(node->leaderIP, TCPPORT, outMsg.toString()); + } else { + cout << "[BLACKOUT] Leader cannot accept the request" << endl; + } + } else if (cmd == "juice" && joined){ + if(cmdLineInput.size() < 6){ + cout << "USAGE: juice juice_exe num_juices sdfs_intermediate_dir sdfs_out_file delete={0,1}" << endl; + continue; + } + if (FILE *file = fopen(cmdLineInput[1].c_str(), "r")) { + fclose(file); + } else { + cout << "[JUICE] " << cmdLineInput[1] << " does not exist locally" << endl; + continue; + } + if (!node->isBlackout){ + //TODO + } else { + cout << "[BLACKOUT] Leader cannot accept the request" << endl; + } + } else if (cmd == "ip"){ + cout << getIP() << endl; + } else if (cmd == "leader"){ + cout << "Leader IP " << node->leaderIP << endl; } else { cout << "[join] join to a group via fixed introducer" << endl; cout << "[leave] leave the group" << endl;