diff --git a/Wc b/Wc index 3c1fead01a06843bfc0429b84a8b5916a3292d22..c7dcfbb588800efe9e9e4acf9f0bba8e307328d6 100755 Binary files a/Wc and b/Wc differ diff --git a/inc/FileObject.h b/inc/FileObject.h index ca0be7254b5dad71f3b93f1af32391297f69b104..cc4eb6ec1839b49a2252cdd3e113bdb5314c54ca 100644 --- a/inc/FileObject.h +++ b/inc/FileObject.h @@ -16,7 +16,7 @@ using namespace std; string getMostRecentFile(string readfile); -void cleanupTmpFiles(); +void cleanupTmpFiles(string match); class FileObject { public: diff --git a/inc/MessageTypes.h b/inc/MessageTypes.h index e3deb01d81add885c5a0e29b3d3972cc8b772323..63368e5c153909f6189956e31293413ad7caee4f 100644 --- a/inc/MessageTypes.h +++ b/inc/MessageTypes.h @@ -30,6 +30,10 @@ enum MessageType { MAPLEACK, CHUNK, //send to nodes so they have information about what kind of get request to send CHUNKACK, //after append ack received, send this back to master to know when things are Done + MERGE, + STARTMERGE, + MERGECOMPLETE, + MERGEFAIL }; enum PayloadType { diff --git a/inc/Node.h b/inc/Node.h index 7d8d123a22be7907fe5beb54c1d579f32e9293d3..b43e94b394a07a57d6d725f952c658834992f9eb 100644 --- a/inc/Node.h +++ b/inc/Node.h @@ -5,13 +5,14 @@ #include <string> #include <vector> #include <map> +#include <set> #include <pthread.h> #include <time.h> #include <signal.h> #include <unistd.h> #include <dirent.h> #include <sys/types.h> - +#include <fstream> #include <stdio.h> #include <sys/stat.h> #include <unistd.h> @@ -83,7 +84,8 @@ public: map<string, tuple<bool, bool, bool>> pendingRequestSent; //? //master properties for MAPLEJUICE - map<string, vector<tuple<string, string, string>>> mapleProcessing; //ip -> [ (file, chunk_start, originIP) ] + map<string, vector<tuple<string, string>>> mapleProcessing; //ip -> [ (file, chunk_start) ] + map<string, set<tuple<string,string>>> workerTasks; //above is static, this removes tasks when done map<string, tuple<long int, int>> fileSizes; //used so master can partition in the map phase tuple is (bytes, lines) HashRing *mapleRing; map<string, vector<tuple<string, string>>> mapleSending; //originIP -> (file, chunk_start); diff --git a/inc/TcpSocket.h b/inc/TcpSocket.h index 69c9a2ba76a0b929f165c208d35dd876128fabaf..076f8aa5f8f421b0d2d0156505743f4fe291e328 100644 --- a/inc/TcpSocket.h +++ b/inc/TcpSocket.h @@ -51,15 +51,19 @@ public: queue<string> regMessages;//other messages added here queue<string> pendSendMessages;//keeps messages for the tcp client to send queue<string> mapleMessages; //keeps track of sending + queue<string> mergeMessages; //keeps track of sending void bindServer(string port); - void sendFile(string ip, string port, string localfilename, string sdfsfilename, string remoteLocalfilename, string overwrite); + void sendFile(string ip, string port, FILE * fp, int size); + void putFile(string ip, string port, string localfilename, string sdfsfilename, string remoteLocalfilename); + void putDirectory(string ip, string port); //put everything in tmp directory void sendLines(string ip, string port, string execFile, string localFile, string prefix, int start, int end); void sendMessage(string ip, string port, string message); int messageHandler(int sockfd, string payloadMessage, string returnID); int createConnection(string ip, string port); TcpSocket(); private: - string getFileMetadata(int size, string checksum, string sdfsfilename, string localfilename, string remoteLocalfilename, string overwrite); + string getFileMetadata(int size, string checksum, string sdfsfilename, string localfilename, string remoteLocalfilename); + string getDirMetadata(); }; #endif //TCPSOCKET_H diff --git a/mappers/wc.cpp b/mappers/wc.cpp index 6a8535ee81159f92601e7742e8d4a544b8470d0e..e8b2d6ef36196ead19cb9255f59477b1d65c55a3 100644 --- a/mappers/wc.cpp +++ b/mappers/wc.cpp @@ -9,7 +9,7 @@ int main(int argc, char **argv) { while (std::getline(file, str)) { for (size_t i = 0; i < str.size(); i++) { - if (str[i] == '.' || str[i] == ',' || str[i] == '?' || str[i] == ';' || str[i] == '!') str[i] = ' '; + if (str[i] == '.' || str[i] == ',' || str[i] == '?' || str[i] == ';' || str[i] == '!' || str[i] == '-') str[i] = ' '; } std::transform(str.begin(), str.end(), str.begin(), ::tolower); std::vector<std::string> temp = splitString(str, delim); diff --git a/src/FileObject.cpp b/src/FileObject.cpp index 5042a49045275aff0bfd730093cc559415d44a31..209f5df4ed1e60b43da01ed28924ce634d442e74 100644 --- a/src/FileObject.cpp +++ b/src/FileObject.cpp @@ -28,20 +28,20 @@ string getMostRecentFile(string readfile){ DIR *dp = nullptr; int matchLen = readfile.size(); vector<string> fileVersions; - if ((dp = opendir(".")) == nullptr) { cout << "tmp directory error " << endl; return ""; } + if ((dp = opendir(".")) == nullptr) { cout << "temp directory error " << endl; closedir(dp); return ""; } while ((entry = readdir(dp))){ if (strncmp(entry->d_name, readfile.c_str(), matchLen) == 0){ fileVersions.push_back(entry->d_name); } } sort(fileVersions.begin(), fileVersions.end()); + closedir(dp); return fileVersions[fileVersions.size()-1]; } -void cleanupTmpFiles(){ +void cleanupTmpFiles(string match){ struct dirent *entry = nullptr; DIR *dp = nullptr; - string match = "tmp-"; int matchLen = match.size(); if ((dp = opendir(".")) == nullptr) { cout << "tmp directory error " << endl;} while ((entry = readdir(dp))){ @@ -49,4 +49,5 @@ void cleanupTmpFiles(){ remove(entry->d_name); } } + closedir(dp); } diff --git a/src/Node.cpp b/src/Node.cpp index d603037cfb6434e4febb58de4bf468b830c0d071..2905d03fa8c8f2d310304ea1e589bdd3d21c1075 100644 --- a/src/Node.cpp +++ b/src/Node.cpp @@ -178,7 +178,9 @@ int Node::failureDetection(){ auto vecCopy(mapleProcessing[get<0>(keyTuple)]); mapleProcessing[get<0>(mapleNodes[0])] = vecCopy; + for (auto el : vecCopy) workerTasks[get<0>(mapleNodes[0])].insert(el); mapleProcessing.erase(get<0>(keyTuple)); + workerTasks.erase(get<0>(keyTuple)); for (auto &e : mapleSending[get<0>(keyTuple)]){ vector<int> temp = randItems(1, fileList[get<0>(e)]); @@ -810,15 +812,16 @@ void Node::handleTcpMessage() // cout << "Has " << msg.type << " with " << msg.payload << endl; switch (msg.type) { case JUICESTART: { - if (mapleProcessing.size()) {tcpServent->regMessages.push(msg.toString()); break;} - //TODO + if (workerTasks.size()) {tcpServent->regMessages.push(msg.toString()); break;} + //TODO JUICESTART + //tell all nodes they can delete tmp files. } case MAPLESTART: { //leader only function //currently running something, dont start a new phase - if (mapleProcessing.size()) {tcpServent->regMessages.push(msg.toString()); cout << "[MAPLE] already mapling" << endl; break;} + if (workerTasks.size()) {tcpServent->regMessages.push(msg.toString()); cout << "[MAPLE] already mapling" << endl; break;} cout << "[MAPLE] Leader starting new Maple phase" << endl; - cleanupTmpFiles(); + cleanupTmpFiles("tmp-"); if (inMsg.size() >= 4){ string mapleExe = inMsg[0], num_maples = inMsg[1], sdfsPre = inMsg[2], sdfs_dir = inMsg[3] + "-"; int workers = stoi(num_maples); @@ -860,7 +863,8 @@ void Node::handleTcpMessage() vector<int> temp = randItems(1, fileList[file]); string sender = hashRing->getValue(temp[0]); //because files are part of sdfs anyone can be the sender string processor = mapleRing->getValue(id); //processor is a maple worker - mapleProcessing[processor].push_back(make_tuple(file, to_string(start), sender)); + mapleProcessing[processor].push_back(make_tuple(file, to_string(start))); + workerTasks[processor].insert(make_tuple(file, to_string(start))); cout << "[MAPLE] assign file " << file << " at " << to_string(start) << " to " << processor << endl; mapleSending[sender].push_back(make_tuple(file, to_string(start))); string maplemsg = sender + "::" + processor + "::" +mapleExe + "::" + s + "::" + sdfsPre; @@ -896,7 +900,7 @@ void Node::handleTcpMessage() int dataPipe[2]; if (pipe(dataPipe)){ fprintf (stderr, "Pipe failed.\n"); break; } pid_t pid = fork(); - if (pid){ //parent process, DONT need to waitpid because of signal handler set up + if (pid){ close(dataPipe[1]); handlePipe(dataPipe[0]); } else if (pid < 0) { @@ -912,37 +916,6 @@ void Node::handleTcpMessage() int status; waitpid(pid, &status, 0); close(dataPipe[0]);close(dataPipe[1]); - //go thorugh and process things from datapipe - //if processing success, send out TCP MAPLEACK - string match = "tmp-"; - int matchLen = match.size(); - struct dirent *entry = nullptr; - DIR *dp = nullptr; - if ((dp = opendir(".")) == nullptr) { cout << "tmp directory error " << match << endl; break; } - while ((entry = readdir(dp))){ - //cout << "[FILES] found " << entry->d_name << " looking to match " << to_string(matchLen) << " chars from " << match << endl; - if (strncmp(entry->d_name, match.c_str(), matchLen) == 0){ - vector<string> tempVec = splitString(entry->d_name, "-"); - if (tempVec.size() != 2) continue; //temp keys in form tmp-key - string keyFile = inMsg[5] + "-" + tempVec[tempVec.size()-1]; - cout << "[CHUNKACK] transform from: " << entry->d_name << " to " << keyFile << endl; - - /* TODO - * Need to change up how key comnbining works (also need master to keep track of all keys it sees for the next phase) - * 1) Master keeps track of the status of processing node assignments - * 2) on failure, instead of partial re-mapping, the entire node's assignments get redone - * 3) No longer merge temp files after assignment, instead just send the MAPLEACK - * 4) When the master marks all of a nodes assignments as done, send a message telling it to start merging - * 4b) may need to modify from PUT to a dedicated merge message - * 5) on success, notify the master and only then can the master remove the worker node from map of remaining tasks - */ - Messages outMsg(DNS, nodeInformation.ip + "::" + to_string(hashRingPosition) + "::" + keyFile + "::" + entry->d_name + "::" + to_string(-1) + "::" + to_string(-1) + "::" + keyFile + "::" + "0"); - - //cout << "[PUT] Got localfilename: " << entry->d_name << " with sdfsfilename: " << target << endl; - tcpServent->sendMessage(leaderIP, TCPPORT, outMsg.toString()); - } - } - closedir(dp); string ackStr = inMsg[0] + "::" + inMsg[4] + "::" + inMsg[2]; //IP, file, chunk Messages ackMsg(MAPLEACK, ackStr); tcpServent->sendMessage(leaderIP, TCPPORT, ackMsg.toString()); @@ -964,17 +937,65 @@ void Node::handleTcpMessage() } case MAPLEACK: { - vector<tuple<string,string,string>> temp; - for (auto &e : mapleProcessing[inMsg[0]]){ + vector<tuple<string,string>> temp; + for (auto &e : workerTasks[inMsg[0]]){ if (get<0>(e).compare(inMsg[1]) == 0){ if (get<1>(e).compare(inMsg[2]) == 0){ - continue; + temp.push_back(e); } } - temp.push_back(e); } - if (temp.size()) mapleProcessing[inMsg[0]] = temp; - else mapleProcessing.erase(inMsg[0]); + for (auto &e : temp) workerTasks[inMsg[0]].erase(e); + if (!workerTasks[inMsg[0]].size()) { + Messages outMsg(STARTMERGE, ""); + this->tcpServent->sendMessage(inMsg[0], TCPPORT, outMsg.toString()); + } + break; + } + + case STARTMERGE: { + string sendMsg = hashRing->getValue(leaderPosition) + "::" + TCPPORT; + this->tcpServent->mergeMessages.push(sendMsg); + } + + case MERGECOMPLETE: { + workerTasks.erase(inMsg[0]); + mapleProcessing.erase(inMsg[0]); + //actually merge files in + struct dirent *entry = nullptr; + DIR *dp = nullptr; + string match = "tmp-" + inMsg[0] + "-"; + int matchLen = match.size(); + if ((dp = opendir(".")) == nullptr) { cout << "tmp directory error " << endl;} + cout << "[MERGECOMPLETE] processing files matching " << match << " and replacing with prefix: " << sdfsPre << endl; + while ((entry = readdir(dp))){ + if (strncmp(entry->d_name, match.c_str(), matchLen) == 0){ + string entryName(entry->d_name); + string mapleOutput = sdfsPre + "-" + entryName.substr(matchLen); + ofstream keyFile; + keyFile.open(mapleOutput, ios::app); + ifstream toMerge(entry->d_name); + if (!toMerge.is_open() || !keyFile.is_open()) { + cout << "bad file permissions for " << entry->d_name << " and/or " << mapleOutput << endl; + break; + } + keyFile << toMerge.rdbuf(); + keyFile.close(); + } + } + + if (!mapleProcessing.size()) { //start replication of key files + + } + + break; + } + + //because TCP if we get a fail, we know that the node failed + //so re-requesting the files to merge will be taken care of in failureDetection() + case MERGEFAIL: { + string tmpFiles = "tmp-" + inMsg[0] + "-"; + cleanupTmpFiles(tmpFiles); break; } @@ -1047,7 +1068,7 @@ void Node::handleTcpMessage() if(isLeader){ // Check hashring, get positions and send out DNS ANS isBlackout = true; - if(inMsg.size() >= 8){ + if(inMsg.size() >= 7){ string inMsgIP = inMsg[0]; int nodePosition = stoi(inMsg[1]); string sdfsfilename = inMsg[2]; @@ -1055,7 +1076,6 @@ void Node::handleTcpMessage() long int size = stol(inMsg[4]); int lines = stoi(inMsg[5]); string overwriteFilename = inMsg[6]; - string overwrite = inMsg[7]; //cout << "[DNS] Got " << "inMsgIP: " << inMsgIP << ", sdfsfilename: " << sdfsfilename; //cout << ", localfilename: " << localfilename << ", pos: " << nodePosition << endl; // update fileList, client itself is one of the replicas @@ -1082,7 +1102,7 @@ void Node::handleTcpMessage() pendingRequests[sdfsfilename] = tuple<int, int, int>(closestNode, pred, succ); pendingRequestSent[sdfsfilename] = tuple<int, int, int>(true, false, false); pendingSenderRequests[sdfsfilename] = tuple<string, string, string>(inMsgIP, "", ""); - Messages outMsg(DNSANS, to_string(closestNode) + "::" + localfilename + "::" + sdfsfilename + "::" + overwriteFilename + "::" + overwrite); + Messages outMsg(DNSANS, to_string(closestNode) + "::" + localfilename + "::" + sdfsfilename + "::" + overwriteFilename); this->tcpServent->sendMessage(inMsgIP, TCPPORT, outMsg.toString()); } } @@ -1091,14 +1111,14 @@ void Node::handleTcpMessage() } case DNSANS:{ // Read the answer and send a PUT msg to dest - if(inMsg.size() >= 5){ + if(inMsg.size() >= 4){ int nodePosition = stoi(inMsg[0]); // since we do not keep files in hashRing, the value itself is IPaddress, not NODE:IP_Address string nodeIP = hashRing->getValue(nodePosition); //cout << "nodeIP " << nodeIP << endl; //cout << "[DNSANS] " << "we will put sdfsfilename: " << inMsg[2] << " to nodeIP: " << nodeIP; //cout << " using localfilename: " << inMsg[1] << endl; - string sendMsg = nodeIP+"::"+inMsg[1]+"::"+inMsg[2]+"::"+inMsg[3]+"::"+inMsg[4]; + string sendMsg = nodeIP+"::"+inMsg[1]+"::"+inMsg[2]+"::"+inMsg[3]; this->tcpServent->pendSendMessages.push(sendMsg); } break; @@ -1113,7 +1133,7 @@ void Node::handleTcpMessage() string localfilename = this->localFilelist[sdfsfilename]; cout << "[REREPLICATEGET] Got a request of sdfsfilename " << sdfsfilename << " to nodeIP " << nodeIP << endl; cout << "[REREPLICATEGET] Put localfilename " << localfilename << " to nodeIP " << nodeIP << endl; - string sendMsg = nodeIP+"::"+localfilename+"::"+sdfsfilename+"::"+remoteLocalfilename+"::"+"1"; + string sendMsg = nodeIP+"::"+localfilename+"::"+sdfsfilename+"::"+remoteLocalfilename; this->tcpServent->pendSendMessages.push(sendMsg); } break; @@ -1128,9 +1148,8 @@ void Node::handleTcpMessage() string localfilename = this->localFilelist[sdfsfilename]; cout << "[REREPLICATE] Got a request of sdfsfilename " << sdfsfilename << " to nodeIP " << nodeIP << endl; cout << "[REREPLICATE] Put localfilename " << localfilename << " to nodeIP " << nodeIP << endl; - string sendMsg = nodeIP+"::"+localfilename+"::"+sdfsfilename+"::"+"::"+"1"; + string sendMsg = nodeIP+"::"+localfilename+"::"+sdfsfilename+"::"; this->tcpServent->pendSendMessages.push(sendMsg); - //this->tcpServent->sendFile(nodeIP, TCPPORT, localfilename, sdfsfilename, ""); } break; } diff --git a/src/TcpSocket.cpp b/src/TcpSocket.cpp index 22b1d8d4401a397082ee3e312f9652d59eefe5ad..c923da1cbae3fe32febb9c84cef0df457c05274c 100644 --- a/src/TcpSocket.cpp +++ b/src/TcpSocket.cpp @@ -85,39 +85,89 @@ void TcpSocket::bindServer(string port) } string TcpSocket::getFileMetadata(int size, string checksum, - string sdfsfilename, string localfilename, string remoteLocalfilename, string overwrite) + string sdfsfilename, string localfilename, string remoteLocalfilename) { // format: size,checksum,sdfsfilename - string msg = to_string(size) + "," + checksum + "," + sdfsfilename+","+localfilename+","+remoteLocalfilename+","+overwrite; + string msg = to_string(size) + "," + checksum + "," + sdfsfilename+","+localfilename+","+remoteLocalfilename; return msg; } -void TcpSocket::sendFile(string ip, string port, - string localfilename, string sdfsfilename, string remoteLocalfilename, string overwrite) +string TcpSocket::getDirMetadata() { - int numbytes, sockfd; - char buf[DEFAULT_TCP_BLKSIZE]; - FILE *fp; - int size = 0, sendSize = 0; - bzero(buf, sizeof(buf)); - if ((sockfd = createConnection(ip, port)) == -1) return; - fp = fopen(localfilename.c_str(), "rb"); + struct dirent *entry = nullptr; + DIR *dp = nullptr; + FILE * fp; + string match = "tmp-"; + int matchLen = match.size(); + vector<string> split; + int size = 0; + string msg; + if ((dp = opendir(".")) == nullptr) { cout << "tmp directory error " << endl;} + while ((entry = readdir(dp))){ + if (strncmp(entry->d_name, match.c_str(), matchLen) == 0){ + split.clear(); + split = splitString(entry->d_name, "-"); + if (split.size() > 2) continue; + fp = fopen(entry->d_name, "rb"); + if (fp == NULL) { + printf("Could not open file to send."); + continue; + } + fseek(fp, 0, SEEK_END); + size = ftell(fp); + fseek(fp, 0, SEEK_SET); + if (msg.size()) msg += ","; + msg += split[1]; + msg += to_string(size); + fclose(fp); + } + } + closedir(dp); + return msg; +} + +void TcpSocket::putDirectory(string ip, string port) { + string toSend = getDirMetadata(); + if (!toSend.size()) return; + Messages msg(MERGE, toSend); + sendMessage(ip, port, msg.toString()); + vector<string> toProcess = splitString(toSend, ","); + FILE * fp; + int index = 0; + while (index < toProcess.size() - 1){ + fp = fopen(toProcess[index].c_str(), "rb"); + if (fp == NULL) { + printf("Could not open file to send."); + continue; + } + sendFile(ip, port, fp, stoi(toProcess[index+1])); + fclose(fp); + index += 2; + } +} + + +void TcpSocket::putFile(string ip, string port, string localfilename, string sdfsfilename, string remoteLocalfilename){ + FILE *fp = fopen(localfilename.c_str(), "rb"); if (fp == NULL) { printf("Could not open file to send."); return; } fseek(fp, 0, SEEK_END); - size = ftell(fp); + int size = ftell(fp); fseek(fp, 0, SEEK_SET); - - // send bytes and filename first FileObject f(localfilename); - Messages msg(PUT, getFileMetadata(size, f.checksum, sdfsfilename, localfilename, remoteLocalfilename, overwrite)); - string payload = msg.toString(); - if (send(sockfd, payload.c_str(), strlen(payload.c_str()), 0) == -1) { - perror("send"); - } + Messages msg(PUT, getFileMetadata(size, f.checksum, sdfsfilename, localfilename, remoteLocalfilename)); + sendMessage(ip, port, msg.toString()); + sendFile(ip, port, fp, size); +} + +void TcpSocket::sendFile(string ip, string port, FILE * fp, int size) { sleep(1); + int numbytes, sockfd, sendSize; + if ((sockfd = createConnection(ip, port)) == -1) return; + char buf[DEFAULT_TCP_BLKSIZE]; + bzero(buf, sizeof(buf)); while (!feof(fp) && size > 0) { sendSize = (size < DEFAULT_TCP_BLKSIZE) ? size : DEFAULT_TCP_BLKSIZE; bzero(buf, sizeof(buf)); @@ -214,7 +264,8 @@ int TcpSocket::createConnection(string ip, string port){ int TcpSocket::messageHandler(int sockfd, string payloadMessage, string returnIP){ char buf[DEFAULT_TCP_BLKSIZE]; - int numbytes = 0; + int numbytes = 0, filesize = 0, byteReceived = 0; + FILE *fp; Messages msg(payloadMessage); switch (msg.type) { case ELECTION: @@ -222,22 +273,45 @@ int TcpSocket::messageHandler(int sockfd, string payloadMessage, string returnIP qMessages.push(payloadMessage); break; } + case MERGE: { + vector<string> filesAndSizes = splitString(msg.payload, ","); + int index = 0; + int fail = 0; + while (index < filesAndSizes.size() - 1){ + string filename = "tmp-" + returnIP + "-" + filesAndSizes[index]; + filesize = stoi(filesAndSizes[index+1]); + numbytes = 0; + byteReceived = 0; + fp = fopen(filename.c_str(), "wb"); + bzero(buf, sizeof(buf)); + while ((numbytes=recv(sockfd, buf, DEFAULT_TCP_BLKSIZE, 0)) > 0) { + fwrite(buf, sizeof(char), numbytes, fp); + byteReceived += numbytes; + if (byteReceived >= filesize) { + break; + } + bzero(buf, sizeof(buf)); + } + if (byteReceived < filesize) fail = 1; + cout << "we have " << to_string(byteReceived) << " bytes from this connection" << endl; + fclose(fp); + index += 2; + } + if (fail) { Messages ack(MERGEFAIL, returnIP + "::"); regMessages.push(ack.toString()); } + else { Messages ack(MERGECOMPLETE, returnIP + "::"); regMessages.push(ack.toString()); } + break; + } case PUT: { - FILE *fp; - int filesize = 0, byteReceived = 0; - string mode = "wb"; - string sdfsfilename = "", incomingChecksum = "", remoteLocalname = "", overwriteFilename = "", prefix = "", overwrite = "", localfilename = ""; + string sdfsfilename = "", incomingChecksum = "", remoteLocalname = "", overwriteFilename = "", prefix = "", localfilename = ""; // format: size,checksum,sdfsfilename vector<string> fields = splitString(msg.payload, ","); int start = -1; - if (fields.size() == 6) { + if (fields.size() == 5) { filesize = stoi(fields[0]); incomingChecksum = fields[1]; sdfsfilename = fields[2]; remoteLocalname = fields[3]; overwriteFilename = fields[4]; - overwrite = fields[5]; - if ((stoi(overwrite)) == 0) mode = "ab"; cout << "[PUT] file is " << sdfsfilename << " with size " << filesize << " and checksum " << incomingChecksum << endl; time_t fileTimestamp; time(&fileTimestamp); @@ -247,7 +321,8 @@ int TcpSocket::messageHandler(int sockfd, string payloadMessage, string returnIP //cout << "it's GET with filename " << overwriteFilename << endl; } //cout << "backup filename " << localfilename << endl; - } else { + } + if (fields.size() == 6){ //size, exec, read, start, tmp, prefix filesize = stoi(fields[0]); localfilename = fields[4]; //tempfile to read from @@ -257,13 +332,7 @@ int TcpSocket::messageHandler(int sockfd, string payloadMessage, string returnIP prefix = fields[5]; cout << "[PUT] bytes: " << filesize << " exec: " << sdfsfilename << ", actual: " << remoteLocalname << ", start: " << to_string(start) << ", temp: " << localfilename << ", prefix: " << prefix << endl; } - fp = fopen(localfilename.c_str(), mode.c_str()); - if (fp == NULL) { - cout << "file error" << endl; - close(sockfd); - return 1; - } - + fp = fopen(localfilename.c_str(), "wb"); bzero(buf, sizeof(buf)); while ((numbytes=recv(sockfd, buf, DEFAULT_TCP_BLKSIZE, 0)) > 0) { fwrite(buf, sizeof(char), numbytes, fp); @@ -305,6 +374,9 @@ int TcpSocket::messageHandler(int sockfd, string payloadMessage, string returnIP case MAPLEACK: case CHUNK: case CHUNKACK: + case STARTMERGE: + case MERGECOMPLETE: + case MERGEFAIL: case DNS:{ //cout << "["<< messageTypes[msg.type] << "] payloadMessage: " << payloadMessage << endl; regMessages.push(payloadMessage); //handle from queue diff --git a/src/TestBench.cpp b/src/TestBench.cpp index 94d1255a78c06afcc3489e70aee6c41ace510a49..93cb1ce62257f6ebdade6eef951ac8e84c3ec644 100644 --- a/src/TestBench.cpp +++ b/src/TestBench.cpp @@ -118,7 +118,7 @@ void *runTcpClientTEST(void *tcpSocket) // testing to send file for (int i=0; i<2; i++) { sleep(1); - tcp->sendFile("127.0.0.1", TCPPORT, "file_example_MP3_700KB.mp3", "file_example_MP3_700KB.mp3", "file_example_MP3_700KB.mp3", "1"); + tcp->putFile("127.0.0.1", TCPPORT, "file_example_MP3_700KB.mp3", "file_example_MP3_700KB.mp3", "file_example_MP3_700KB.mp3"); } pthread_exit(NULL); } diff --git a/src/Threads.cpp b/src/Threads.cpp index dea6f26b6ba06c216faa8b08668e5591ef307031..091d3910c312673b57db69e420aada9b392b9211 100644 --- a/src/Threads.cpp +++ b/src/Threads.cpp @@ -24,7 +24,7 @@ void *runTcpSender(void *tcpSocket) while (!tcp->mapleMessages.empty()) { vector<string> msgSplit = splitString(tcp->mapleMessages.front(), "::"); string removeSender = tcp->mapleMessages.front().substr(msgSplit[0].size() + 2); - cout << "[TEST] " << removeSender << endl; + //cout << "[TEST] " << removeSender << endl; Messages msg(CHUNK, removeSender); //processor, exec, file, start, prefix tcp->sendMessage(msgSplit[0], TCPPORT, msg.toString()); @@ -32,9 +32,8 @@ void *runTcpSender(void *tcpSocket) } while (!tcp->pendSendMessages.empty()) { vector<string> msgSplit = splitString(tcp->pendSendMessages.front(), "::"); - if (msgSplit.size() >= 5) { - string nodeIP = msgSplit[0], localfilename = msgSplit[1], overwrite = msgSplit[4]; - string sdfsfilename = msgSplit[2], remoteLocalfilename = msgSplit[3]; + if (msgSplit.size() >= 4) { + string nodeIP = msgSplit[0], localfilename = msgSplit[1], sdfsfilename = msgSplit[2], remoteLocalfilename = msgSplit[3]; cout << "[DOSEND] nodeIP " << nodeIP << ", localfilename " << localfilename; cout << ", sdfsfilename " << sdfsfilename << ", remoteLocalfilename " << remoteLocalfilename << endl; if (msgSplit.size() == 6){ @@ -44,10 +43,15 @@ void *runTcpSender(void *tcpSocket) cout << "[CHUNK] sending : " << sdfsfilename << " from " << msgSplit[3] << " to " << msgSplit[5] << endl; tcp->sendLines(nodeIP, TCPPORT, localfilename, sdfsfilename, msgSplit[4], start, end); //exec, file, start, end } - else tcp->sendFile(nodeIP, TCPPORT, localfilename, sdfsfilename, remoteLocalfilename, overwrite); + else tcp->putFile(nodeIP, TCPPORT, localfilename, sdfsfilename, remoteLocalfilename); } tcp->pendSendMessages.pop(); } + while (!tcp->mergeMessages.empty()) { + vector<string> msgSplit = splitString(tcp->mergeMessages.front(), "::"); + tcp->putDirectory(msgSplit[0], msgSplit[1]); + tcp->mergeMessages.pop(); + } } pthread_exit(NULL); } diff --git a/src/Utils.cpp b/src/Utils.cpp index 2c06f2c00a5675dd5f12b51b9ef33eb1093fdc43..099b7b7bb1d5d28f4a84487c18699aeac61c7aa6 100644 --- a/src/Utils.cpp +++ b/src/Utils.cpp @@ -54,7 +54,7 @@ void handlePipe(int file) { size_t bufSize = 1024; cout << "[PIPE] sleeping for data. " << endl; sleep(5); - FILE *stream = fdopen(file, "r"); FILE *tmp; + FILE *stream = fdopen(file, "rb"); FILE *tmp; char str[bufSize]; const char * delim = ","; int lines = 0; diff --git a/src/main.cpp b/src/main.cpp index b11ba4a858dd0974b80c4f55e124ac17025aff91..87b41b7b51c27e1cab7821c6e7c577ac04dfd53f 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -104,7 +104,7 @@ int main(int argc, char *argv[]) string line; ifstream myfile(localfilename.c_str()); while (getline(myfile, line)) ++number_of_lines; - Messages outMsg(DNS, node->nodeInformation.ip + "::" + to_string(node->hashRingPosition) + "::" + sdfsfilename + "::" + localfilename + "::" + to_string(size) + "::" + to_string(number_of_lines) + "::" + "::" + "1"); + Messages outMsg(DNS, node->nodeInformation.ip + "::" + to_string(node->hashRingPosition) + "::" + sdfsfilename + "::" + localfilename + "::" + to_string(size) + "::" + to_string(number_of_lines) + "::"); cout << "[PUT] Got localfilename: " << localfilename << " with sdfsfilename: " << sdfsfilename << endl; node->tcpServent->sendMessage(node->leaderIP, TCPPORT, outMsg.toString()); } else {