diff --git a/Wc b/Wc index eefd2132ef5c2f6c15ef8e0203d069a5b54d6c83..39b695f56e837a420befc3d41d00cc4a68a0e147 100755 Binary files a/Wc and b/Wc differ diff --git a/inc/FileObject.h b/inc/FileObject.h index 059190e046634b265f07fe2581f4770a58d2863d..ca0be7254b5dad71f3b93f1af32391297f69b104 100644 --- a/inc/FileObject.h +++ b/inc/FileObject.h @@ -16,6 +16,7 @@ using namespace std; string getMostRecentFile(string readfile); +void cleanupTmpFiles(); class FileObject { public: diff --git a/inc/Utils.h b/inc/Utils.h index 1b0029570a00aeefbc593302f0c32f78316e4359..2bab74708d6c17b83d8e58f692e3d688685950fd 100644 --- a/inc/Utils.h +++ b/inc/Utils.h @@ -33,9 +33,8 @@ vector<string> splitString(string s, string delimiter); string getIP(); string getIP(const char * host); int new_thread_id(); -void handlePipe(int file, string prefix); +void handlePipe(int file); bool isInVector(vector<int> v, int i); - void sigchld_handler(int s); //adapted from https://stackoverflow.com/questions/23030267/custom-sorting-a-vector-of-tuples diff --git a/src/FileObject.cpp b/src/FileObject.cpp index 00ef92aac36ff22cc9f7fe7cc9e9f899b0ca5764..5042a49045275aff0bfd730093cc559415d44a31 100644 --- a/src/FileObject.cpp +++ b/src/FileObject.cpp @@ -37,3 +37,16 @@ string getMostRecentFile(string readfile){ sort(fileVersions.begin(), fileVersions.end()); return fileVersions[fileVersions.size()-1]; } + +void cleanupTmpFiles(){ + struct dirent *entry = nullptr; + DIR *dp = nullptr; + string match = "tmp-"; + int matchLen = match.size(); + if ((dp = opendir(".")) == nullptr) { cout << "tmp directory error " << endl;} + while ((entry = readdir(dp))){ + if (strncmp(entry->d_name, match.c_str(), matchLen) == 0){ + remove(entry->d_name); + } + } +} diff --git a/src/Node.cpp b/src/Node.cpp index cd24e166a54560f08bac59866ec99994bd3c7b29..5efec0267887973febf393026d292ff5091cbae8 100644 --- a/src/Node.cpp +++ b/src/Node.cpp @@ -747,7 +747,7 @@ void Node::checkFileListConsistency(){ Messages outMsg(DNSGET, nodeInfo + "::" + to_string(nodesToCheck[i]) + "::" + element.first + "::"); tuple<int, int, int> request = pendingRequests[element.first]; if(get<0>(request) != -1 || get<1>(request) != -1 || get<2>(request) != -1){ - cout << "on put " << get<0>(request) << "/" << get<1>(request) << "/" << get<2>(request) << endl; + //cout << "on put " << get<0>(request) << "/" << get<1>(request) << "/" << get<2>(request) << endl; break; } pendingRequests[element.first] = tuple<int, int, int>(-1, -1, nodesToCheck[i]); @@ -823,6 +823,7 @@ void Node::handleTcpMessage() //currently running something, dont start a new phase if (mapleProcessing.size()) {tcpServent->regMessages.push(msg.toString()); cout << "[MAPLE] already mapling" << endl; break;} cout << "[MAPLE] Leader starting new Maple phase" << endl; + cleanupTmpFiles(); if (inMsg.size() >= 4){ string mapleExe = inMsg[0], num_maples = inMsg[1], sdfsPre = inMsg[2], sdfs_dir = inMsg[3] + "-"; int workers = stoi(num_maples); @@ -830,17 +831,16 @@ void Node::handleTcpMessage() if (workers > hashRing->nodePositions.size()-2) workers = hashRing->nodePositions.size()-2; int total_lines = 0; vector<tuple<string,int>> directory; - cout << "[DIRECTORY] " << sdfs_dir << endl; + cout << "[DIRECTORY] " << sdfs_dir; for (auto &e: fileSizes){ - cout << e.first << " | " << to_string(get<1>(e.second)); + cout << e.first << " | " << to_string(get<1>(e.second)) << " "; if (strncmp(e.first.c_str(), sdfs_dir.c_str(), sdfs_dir.size()) == 0){ cout << " was a match "; directory.push_back(make_tuple(e.first, get<1>(e.second))); total_lines += get<1>(e.second); } - cout << endl; } - cout << "[MAPLE] need to process " << to_string(total_lines) << endl; + cout << endl << "[MAPLE] need to process " << to_string(total_lines) << endl; vector<tuple<string,string,string>> aliveNodes; for (auto &e : membershipList) if (get<0>(e.first).compare(nodeInformation.ip)) aliveNodes.push_back(e.first); vector<tuple<string,string,string>> mapleNodes = randItems(workers, aliveNodes); @@ -892,7 +892,7 @@ void Node::handleTcpMessage() pid_t pid = fork(); if (pid){ //parent process, DONT need to waitpid because of signal handler set up close(dataPipe[1]); - handlePipe(dataPipe[0], sdfsPre); + handlePipe(dataPipe[0]); } else if (pid < 0) { fprintf (stderr, "Fork failed.\n"); break; } else { //child process @@ -1038,8 +1038,8 @@ void Node::handleTcpMessage() int lines = stoi(inMsg[5]); string overwriteFilename = inMsg[6]; string overwrite = inMsg[7]; - cout << "[DNS] Got " << "inMsgIP: " << inMsgIP << ", sdfsfilename: " << sdfsfilename; - cout << ", localfilename: " << localfilename << ", pos: " << nodePosition << endl; + //cout << "[DNS] Got " << "inMsgIP: " << inMsgIP << ", sdfsfilename: " << sdfsfilename; + //cout << ", localfilename: " << localfilename << ", pos: " << nodePosition << endl; // update fileList, client itself is one of the replicas updateFileList(sdfsfilename, nodePosition); fileSizes[sdfsfilename] = make_tuple(size, lines); @@ -1078,8 +1078,8 @@ void Node::handleTcpMessage() // since we do not keep files in hashRing, the value itself is IPaddress, not NODE:IP_Address string nodeIP = hashRing->getValue(nodePosition); //cout << "nodeIP " << nodeIP << endl; - cout << "[DNSANS] " << "we will put sdfsfilename: " << inMsg[2] << " to nodeIP: " << nodeIP; - cout << " using localfilename: " << inMsg[1] << endl; + //cout << "[DNSANS] " << "we will put sdfsfilename: " << inMsg[2] << " to nodeIP: " << nodeIP; + //cout << " using localfilename: " << inMsg[1] << endl; string sendMsg = nodeIP+"::"+inMsg[1]+"::"+inMsg[2]+"::"+inMsg[3]+"::"+inMsg[4]; this->tcpServent->pendSendMessages.push(sendMsg); } diff --git a/src/TcpSocket.cpp b/src/TcpSocket.cpp index 863cb4aed2bfdbddb5d81d8dbf4fbc0fef8b39e3..277bd031719cb61158e36cc3fd1e68de2e7f8bde 100644 --- a/src/TcpSocket.cpp +++ b/src/TcpSocket.cpp @@ -293,7 +293,7 @@ int TcpSocket::messageHandler(int sockfd, string payloadMessage, string returnIP case CHUNK: case CHUNKACK: case DNS:{ - cout << "["<< messageTypes[msg.type] << "] payloadMessage: " << payloadMessage << endl; + //cout << "["<< messageTypes[msg.type] << "] payloadMessage: " << payloadMessage << endl; regMessages.push(payloadMessage); //handle from queue break; } diff --git a/src/Utils.cpp b/src/Utils.cpp index dbcdc7f5db1d505ffe17c59bafccbfcd0156c310..5c7cd89d6ec10795389807584b14ad83eae55a45 100644 --- a/src/Utils.cpp +++ b/src/Utils.cpp @@ -50,9 +50,9 @@ bool isInVector(vector<int> v, int i){ return false; } -void handlePipe(int file, string prefix) { +void handlePipe(int file) { size_t bufSize = 1024; - cout << "[PIPE] sleeping for data. " << " Prefix: " << prefix << endl; + cout << "[PIPE] sleeping for data. " << endl; sleep(5); FILE *stream = fdopen(file, "r"); FILE *tmp; char str[bufSize]; @@ -62,7 +62,7 @@ void handlePipe(int file, string prefix) { lines++; std::string key(strtok(str, delim)); std::string val(strtok(NULL, delim)); - string keyFile = "tmp-" + prefix + "-" + key; + string keyFile = "tmp-" + key; string write = key + "," + val + "\n"; tmp = fopen(keyFile.c_str(), "ab"); fwrite(write.c_str(),sizeof(char),write.size(),tmp);