diff --git a/Wc b/Wc index afde4ac0ff3f60057722451018ad852015c7ba90..a0d8b25729f6c87be617338e1b4f0ff042a8d4f6 100755 Binary files a/Wc and b/Wc differ diff --git a/inc/MessageTypes.h b/inc/MessageTypes.h index f7a19773f4490f9dc7a2775585d1f2675afd52a5..e3deb01d81add885c5a0e29b3d3972cc8b772323 100644 --- a/inc/MessageTypes.h +++ b/inc/MessageTypes.h @@ -1,35 +1,7 @@ #ifndef MESSAGESTYPES_H #define MESSAGESTYPES_H -const static char *messageTypes[] = -{ - "ACK", - "JOIN", - "LEADERHEARTBEAT", - "LEADERPENDING", - "HEARTBEAT", - "SWREQ", - "SWRESP", - "JOINRESPONSE", - "JOINREJECT", - "ELECTION", - "ELECTIONACK", - "PUT", - "PUTACK", - "LEADERACK", - "DNS", - "DNSANS", - "DNSGET", - "DELETE", - "GETNULL", - "REREPLICATE", - "REREPLICATEGET", - "MAPLESTART", - "JUICESTART", - "MAPLEACK", - "CHUNK", - "CHUNKACK", -}; +extern const char *messageTypes[]; enum MessageType { ACK, diff --git a/src/Logger.cpp b/src/Logger.cpp index 5ef29950776badb079c614a537bbe6c67b4019fe..88abd05a3fa0c1aaae8d335698e6d8d217bb0597 100644 --- a/src/Logger.cpp +++ b/src/Logger.cpp @@ -1,5 +1,35 @@ #include "../inc/Logger.h" +const char *messageTypes[] = +{ + "ACK", + "JOIN", + "LEADERHEARTBEAT", + "LEADERPENDING", + "HEARTBEAT", + "SWREQ", + "SWRESP", + "JOINRESPONSE", + "JOINREJECT", + "ELECTION", + "ELECTIONACK", + "PUT", + "PUTACK", + "LEADERACK", + "DNS", + "DNSANS", + "DNSGET", + "DELETE", + "GETNULL", + "REREPLICATE", + "REREPLICATEGET", + "MAPLESTART", + "JUICESTART", + "MAPLEACK", + "CHUNK", + "CHUNKACK", +}; + Logger::Logger() { filename = LOGFILE; } Logger::Logger(string fileName) { filename = fileName; } diff --git a/src/Node.cpp b/src/Node.cpp index 971d0ec0d0d18400d05ac9e22046af2f071a86f8..9f377a9bd03924270635e1afc33e109fe21e3910 100644 --- a/src/Node.cpp +++ b/src/Node.cpp @@ -855,16 +855,20 @@ void Node::handleTcpMessage() string s; for (auto &e: directory){ start = 0; - cout << "[MAPLE] file: " << get<0>(e) << " - " << to_string(get<1>(e)) << endl; - while (start < get<1>(e)){ - s = get<0>(e) + "::" + to_string(start); + string file = get<0>(e); + int lines = get<1>(e); + cout << "[MAPLE] file: " << file << " - " << to_string(lines) << endl; + while (start < lines){ + s = file + "::" + to_string(start); id = mapleRing->locateClosestNode(s); - vector<int> temp = randItems(1, fileList[get<0>(e)]); - mapleProcessing[mapleRing->getValue(id)].push_back(make_tuple(get<0>(e), to_string(start), mapleRing->getValue(temp[0]))); - cout << "[MAPLE] assign file " << get<0>(e) << " at " << to_string(start) << " to " << mapleRing->getValue(temp[0]) << endl; - mapleSending[mapleRing->getValue(temp[0])].push_back(make_tuple(get<0>(e), to_string(start))); - string maplemsg = mapleRing->getValue(id) + "::" +mapleExe + "::" + s + "::" + sdfsPre; - //IP, exec, file, start, prefix + vector<int> temp = randItems(1, fileList[file]); + string sender = hashRing->getValue(temp[0]); //because files are part of sdfs anyone can be the sender + string processor = mapleRing->getValue(id); //processor is a maple worker + mapleProcessing[processor].push_back(make_tuple(file, to_string(start), sender)); + cout << "[MAPLE] assign file " << file << " at " << to_string(start) << " to " << processor << endl; + mapleSending[sender].push_back(make_tuple(file, to_string(start))); + string maplemsg = sender + "::" + processor + "::" +mapleExe + "::" + s + "::" + sdfsPre; + //sender, processor, exec, file, start, prefix tcpServent->mapleMessages.push(maplemsg); start = start + T_maples; } diff --git a/src/TcpSocket.cpp b/src/TcpSocket.cpp index 37feaf55d4484966a275340dc54f71729af96bc6..14b379e58c3f2d3ec5f5f0861592eece76996940 100644 --- a/src/TcpSocket.cpp +++ b/src/TcpSocket.cpp @@ -146,8 +146,9 @@ void TcpSocket::sendLines(string ip, string port, string execfile, string readfi if (lineCounter >= end) break; numbytes += str.size(); } - file.clear(); // clear fail and eof bits - file.seekg(0, std::ios::beg); // back to the start! + file.clear(); // clear fail and eof bits + file.seekg(0); // back to the start! + lineCounter = -1; string toSend = to_string(numbytes) + "," + execfile + "," + readfile + "," + to_string(start) + "," + prefix+"-tmp"+to_string(start)+"-"+unDirectory[1]; Messages msg(PUT, toSend); cout << "[CHUNK] " << messageTypes[msg.type] << " | " << msg.toString() << endl; diff --git a/src/Threads.cpp b/src/Threads.cpp index bda88a928b02e68f886b2ff3f4953cd1c130ea7a..2689e1601b895bb1d71759de1edab9aad97631c7 100644 --- a/src/Threads.cpp +++ b/src/Threads.cpp @@ -23,7 +23,9 @@ void *runTcpSender(void *tcpSocket) while (1) { while (!tcp->mapleMessages.empty()) { vector<string> msgSplit = splitString(tcp->mapleMessages.front(), "::"); - Messages msg(CHUNK, tcp->mapleMessages.front()); + string removeSender = tcp->mapleMessages.front().substr(msgSplit[0].size() + 2); + cout << "[TEST] " << removeSender << endl; + Messages msg(CHUNK, removeSender); tcp->sendMessage(msgSplit[0], TCPPORT, msg.toString()); tcp->mapleMessages.pop(); }