diff --git a/Distributed_System_MP2_Report_G24.pdf b/Distributed_System_MP2_Report_G24.pdf new file mode 100644 index 0000000000000000000000000000000000000000..98ff47c896797c26594bf61584bde9cd170aba01 Binary files /dev/null and b/Distributed_System_MP2_Report_G24.pdf differ diff --git a/Makefile b/Makefile new file mode 100644 index 0000000000000000000000000000000000000000..dce2206a4825fe9fda127ea71095e8d0875686ce --- /dev/null +++ b/Makefile @@ -0,0 +1,15 @@ +APP := Node +INC_DIR := ./inc +CFLAGS := -g -Wall -std=c++11 -I$(INC_DIR) +LIBS := -lpthread +SRC_FILES := src/Node.cpp src/Messages.cpp src/Member.cpp src/UdpSocket.cpp src/Threads.cpp src/RandomGenerator.cpp src/Logger.cpp src/TcpSocket.cpp src/main.cpp src/HashRing.cpp src/FileObject.cpp + +.PHONY: clean + +all: clean app + +app: + $(CXX) -o $(APP) $(SRC_FILES) $(CFLAGS) $(LIBS) + +clean: + $(RM) -f $(APP) *.o diff --git a/README.md b/README.md index e178f0a24613388010d10fd3f56eff28c0fe3438..7406b84625029d8f252c24600e9b72ae7fa94c53 100644 --- a/README.md +++ b/README.md @@ -1 +1,45 @@ -# MapReduce \ No newline at end of file +# MP2 Simple Distributed File System + +## Executing Instructions + * Building Node +``` +$ make all +``` + + * We are running All-to-All in MP2 (Note: Gossip-style is not tested) +``` +$ ./Node 0 +``` + + * Running time commands +``` +$ [join] join to a group via fixed introducer +$ [leave] leave the group +$ [id] print id (IP/PORT) +$ [member] print all membership list +$ [switch] switch to other mode (All-to-All to Gossip, and vice versa) +$ [mode] show in 0/1 [All-to-All/Gossip] modes +$ [exit] terminate process +$ === New since MP2 === +$ [put] localfilename sdfsfilename +$ [get] sdfsfilename localfilename +$ [delete] sdfsfilename +$ [ls] list all machine (VM) addresses where this file is currently being stored +$ [lsall] list all sdfsfilenames with positions +$ [store] list all files currently being stored at this machine +``` + + * Create files +``` +$ dd if=/dev/urandom of=test_file bs=2097152 count=2 +$ dd if=/dev/urandom of=test_file_07 bs=1000000 count=7 +``` + + + * All logs are in `logs.txt` under the mp2 folder + +## Acknowledgement + * [Beej's guide](http://beej.us/guide/bgnet/html/multi/index.html) + * [Multiple Threads](https://www.tutorialspoint.com/cplusplus/cpp_multithreading.htm) + * [String parser](https://stackoverflow.com/questions/14265581/parse-split-a-string-in-c-using-string-delimiter-standard-c) + * [Wikicorpus](https://www.cs.upc.edu/~nlp/wikicorpus/) \ No newline at end of file diff --git a/inc/FileObject.h b/inc/FileObject.h new file mode 100644 index 0000000000000000000000000000000000000000..7bbbdd4af87c166a97cbbada8da4a016f25789c5 --- /dev/null +++ b/inc/FileObject.h @@ -0,0 +1,25 @@ +#ifndef FILEOBJECT_H +#define FILEOBJECT_H + +#include <iostream> +#include <string> +#include <stdio.h> +#include <stdlib.h> +#include <fstream> + +#include "HashRing.h" + +using namespace std; + +class FileObject { +public: + string fileName; + string checksum; + int positionOnHashring; + FileObject(string fileName); + string toString(); + string getChecksum(); + int getPositionOnHashring(); +}; + +#endif //FILEOBJECT_H \ No newline at end of file diff --git a/inc/HashRing.h b/inc/HashRing.h new file mode 100644 index 0000000000000000000000000000000000000000..7e0ff968b8d1c317dd0817a9a9ef3258742802f1 --- /dev/null +++ b/inc/HashRing.h @@ -0,0 +1,39 @@ +#ifndef HASHRING_H +#define HASHRING_H + +#include <iostream> +#include <string> +#include <stdio.h> +#include <stdlib.h> +#include <map> +#include <vector> +#include <algorithm> + +using namespace std; + +#define HASHMODULO 360 + +class HashRing { +public: + vector<int> nodePositions; //allow for quick finding of node positions on the ring. + map<int, string> ring; + map<int, int> fileToClosestNode; // match up each file to its closest node -> may not need to use this, since we have the locateClosestNode function which maps file position + //to node position on hash ring. + + HashRing(); + vector<string> splitString(string s, string delimiter); + int locateClosestNode(string filename); + string getValue(int key); + int addNode(string nodeInfo, int position); // Here we may have to change where other files point to + int addFile(string fileInfo, int position); + int removeNode(int position); // Here we will have to change where all files that point to this node point to + int removeFile(int position); + int getSuccessor(int nodePosition); //Get position of successor node + int getPredecessor(int nodePosition); //Get position of predecessor node + void debugHashRing(); + void clear(); + int getRandomNode(tuple<int, int, int> excludedNodes); + +}; + +#endif //HASHRING_H \ No newline at end of file diff --git a/inc/Introducer.h b/inc/Introducer.h new file mode 100644 index 0000000000000000000000000000000000000000..06ce118c52d6cfdf8553d4d3efca09d5ea7afe35 --- /dev/null +++ b/inc/Introducer.h @@ -0,0 +1,30 @@ +#ifndef INTRODUCER_H +#define INTRODUCER_H + +#include <iostream> +#include <string> +#include <vector> +#include <map> +#include "Messages.h" +#include "Member.h" + +using namespace std; + +#define T_timeout 5 +#define T_cleanup 5 + +#define INTRODUCER "172.22.94.78" +#define PORT "6000" + +class Introducer { +public: + map<pair<string, string>, pair<int, int>> membershipList; + Member nodeInformation; + + Introducer(); + int heartbeatToNode(Member destination); + int joinSystem(Member introdcuer); + int listenForHeartbeat(); +}; + +#endif //INTRODUCER_H \ No newline at end of file diff --git a/inc/Logger.h b/inc/Logger.h new file mode 100644 index 0000000000000000000000000000000000000000..e6d189a5c487a57528886d5f5bfa068bb1625f85 --- /dev/null +++ b/inc/Logger.h @@ -0,0 +1,27 @@ +#ifndef LOGGER_H +#define LOGGER_H + +#include <iostream> +#include <string> +#include <fstream> +#include <stdio.h> + +#include "MessageTypes.h" + +#define LOGFILE "logs.txt" + +using namespace std; + +class Logger{ +public: + string filename; + //ofstream loggingFile; + Logger(); + Logger(string fileName); + int printTheLog(LogType type, string s); + //int closeFile(); +private: + int writeToFile(string messages); +}; + +#endif //LOGGER_H \ No newline at end of file diff --git a/inc/Member.h b/inc/Member.h new file mode 100644 index 0000000000000000000000000000000000000000..41984b6c8cb9cd76d1edb922a1931b3e42fb4b11 --- /dev/null +++ b/inc/Member.h @@ -0,0 +1,24 @@ +#ifndef MEMBER_H +#define MEMBER_H + +#include <iostream> +#include <string> + +using namespace std; + +class Member { +public: + string ip; + string port; + int timestamp; + int heartbeatCounter; + int failed_flag; + Member(); + Member(string nodeIp, string nodePort, int nodeTimestamp, int heartbeatCounter); + Member(string nodeIp, string nodePort); + Member(string nodeIp, string nodePort, int nodeTimestamp); + string toString(); +}; + + +#endif //MEMBER_H \ No newline at end of file diff --git a/inc/MessageTypes.h b/inc/MessageTypes.h new file mode 100644 index 0000000000000000000000000000000000000000..4410644dbb05a8070fc379125f16cdccab3975d4 --- /dev/null +++ b/inc/MessageTypes.h @@ -0,0 +1,44 @@ +#ifndef MESSAGESTYPES_H +#define MESSAGESTYPES_H + +enum MessageType { + ACK, + JOIN, + LEADERHEARTBEAT, + LEADERPENDING, + HEARTBEAT, + SWREQ, + SWRESP, + JOINRESPONSE, + JOINREJECT, + ELECTION, + ELECTIONACK, + PUT, + PUTACK, + LEADERACK, + DNS, + DNSANS, + DNSGET, + DELETE, + GETNULL, + REREPLICATE, + REREPLICATEGET}; + +enum PayloadType { + REGULAR=97, + FILEPAIR, + FILENAME, + FILEPOSITIONS}; + +enum LogType { + JOINGROUP, + UPDATE, + FAIL, + LEAVE, + REMOVE, + GOSSIPTO, + GOSSIPFROM, + BANDWIDTH, + MEMBERS}; + +#endif //MESSAGESTYPES_H \ No newline at end of file diff --git a/inc/Messages.h b/inc/Messages.h new file mode 100644 index 0000000000000000000000000000000000000000..ee1ddb6866e4bf0484eff95b4320bcb7b12c2d26 --- /dev/null +++ b/inc/Messages.h @@ -0,0 +1,20 @@ +#ifndef MESSAGES_H +#define MESSAGES_H + +#include <iostream> +#include <string> +#include "MessageTypes.h" + +using namespace std; + +class Messages { +public: + MessageType type; + string payload; + + Messages(string payloadMessage); + Messages(MessageType messageType, string payloadMessage); + string toString(); +}; + +#endif //MESSAGES_H \ No newline at end of file diff --git a/inc/Modes.h b/inc/Modes.h new file mode 100644 index 0000000000000000000000000000000000000000..e45037eaed1497dca2d0132177fe2e31a32dc217 --- /dev/null +++ b/inc/Modes.h @@ -0,0 +1,6 @@ +#ifndef MODES_H +#define MODES_H + +enum ModeType {ALL2ALL, GOSSIP}; + +#endif //MODES_H \ No newline at end of file diff --git a/inc/Node.h b/inc/Node.h new file mode 100644 index 0000000000000000000000000000000000000000..63931fa406df219085dd8fc948dd564b9fec3b8e --- /dev/null +++ b/inc/Node.h @@ -0,0 +1,136 @@ +#ifndef NODE_H +#define NODE_H + +#include <iostream> +#include <string> +#include <vector> +#include <map> +#include <pthread.h> +#include <time.h> +#include <signal.h> + +#include "Messages.h" +#include "Modes.h" +#include "Member.h" +#include "UdpSocket.h" +#include "TcpSocket.h" +#include "Logger.h" +#include "HashRing.h" + +using namespace std; + +#define INTRODUCER "172.22.94.78" // VM1 +//#define INTRODUCER "172.22.158.81" // VM9 +#define PORT "6000" + +//#define LOG_VERBOSE 1 + +#define LOGGING_FILE_NAME "logs.txt" + +// --- parameters (stay tuned) --- +#define T_period 300000 // in microseconds +#define T_timeout 15 // in T_period +#define T_cleanup 15 // in T_period +#define N_b 5 // how many nodes GOSSIP want to use + +#define T_election 15 // in T_period +// ------ + +#define T_switch 3 // in seconds + +void *runUdpServer(void *udpSocket); +void *runTcpServer(void *tcpSocket); +void *runTcpSender(void *tcpSocket); +void *runSenderThread(void *node); + +class Node { +public: + // (ip_addr, port_num, timestamp at insertion) -> (hb_count, timestamp, fail_flag) + map<tuple<string, string, string>, tuple<int, int, int>> membershipList; + Member nodeInformation; + UdpSocket *udpServent; + TcpSocket *tcpServent; + int localTimestamp; + int heartbeatCounter; + time_t startTimestamp; + string joinTimestamp; + // unsigned long byteSent; + // unsigned long byteReceived; + + + ModeType runningMode; + Logger* logWriter; + bool activeRunning; + bool prepareToSwitch; + + bool isLeader; + bool isBlackout; + int leaderPosition; // -1 for no leader + int hashRingPosition; + int proposedTime; + int electedTime; + string possibleSuccessorIP; + string leaderIP; + string leaderPort; + HashRing *hashRing; + + map<string, vector<int>> fileList; //vector of node positions on a hashring for a given file where we can find that file stored + map<string, string> localFilelist; // Map sdfsfilename -> localfilename + map<string, tuple<int, int, int>> pendingRequests; + map<string, tuple<string, string, string>> pendingSenderRequests; + map<string, tuple<bool, bool, bool>> pendingRequestSent; + + + Node(); + Node(ModeType mode); + int getPositionOnHashring(); + int heartbeatToNode(); + int joinSystem(Member introdcuer); + int listenToHeartbeats(); + int failureDetection(); + void updateNodeHeartbeatAndTime(); + void computeAndPrintBW(double diff); + int requestSwitchingMode(); + int SwitchMyMode(); + void debugMembershipList(); + void startActive(); + // Added below Since MP2 + bool checkHashNodeCollision(int checkPosition); + bool checkLeaderExist(); + bool findWillBeLeader(); + void proposeToBeLeader(); + void processElection(Messages messages); + void processTcpMessages(); + void processRegMessages(); + void restartElection(); + void setUpLeader(string message, bool pending); + string encapsulateFileList(); + void decapsulateFileList(string payload); + string encapsulateMessage(map<PayloadType,string> payloads); + string decapsulateMessage(string payload); + bool isInVector(vector<int> v, int i); + int updateHashRing(); + // leader related functions here + void leaderCreateHashRing(); + int putFileSender(string filename, string sdfsfilename); + int putFileMaster(string sdfsfilename); + int putFileReeiver(string sdfsfilename); + void listLocalFiles(); + void findNodesWithFile(string sdfsfilename); + void debugSDFSFileList(); + void listSDFSFileList(string sdfsfilename); + string populateSDFSFileList(MessageType type, string mem_list_to_send); + void updateFileList(string sdfsfilename, int nodePosition); + void checkFileListConsistency(); + +private: + string populateMembershipMessage(); + string populateIntroducerMembershipMessage(); + void readMessage(string message); + void processHeartbeat(string message); + vector<string> splitString(string s, string delimiter); + vector<tuple<string,string, string>> getRandomNodesToGossipTo(); + int hashingId(Member nodeMember, string joinTime); +}; + +#endif //NODE_H \ No newline at end of file diff --git a/inc/TcpSocket.h b/inc/TcpSocket.h new file mode 100644 index 0000000000000000000000000000000000000000..958e9f6c64d5cbbc7cdc3fcf483b8d2339bab543 --- /dev/null +++ b/inc/TcpSocket.h @@ -0,0 +1,48 @@ +#ifndef TCPSOCKET_H +#define TCPSOCKET_H + +#include <iostream> +#include <string> +#include <queue> + +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <errno.h> +#include <string.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <netdb.h> +#include <arpa/inet.h> +#include <sys/wait.h> +#include <signal.h> + +// #include "Messages.h" +// #include "UdpSocket.h" + +using namespace std; + +#define DEFAULT_TCP_BLKSIZE (128 * 1024) +#define BACKLOG 10 + +#define TCPPORT "4950" + +// void *runTcpServer(void *tcpSocket); +// void *runTcpClient(void *tcpSocket); + +class TcpSocket { +public: + queue<string> qMessages; + queue<string> regMessages; + queue<string> pendSendMessages; + + void bindServer(string port); + void sendFile(string ip, string port, string localfilename, string sdfsfilename, string remoteLocalfilename); + void sendMessage(string ip, string port, string message); + TcpSocket(); +private: + string getFileMetadata(int size, string checksum, string sdfsfilename, string localfilename, string remoteLocalfilename); + vector<string> splitString(string s, string delimiter); +}; +#endif //TCPSOCKET_H \ No newline at end of file diff --git a/inc/UdpSocket.h b/inc/UdpSocket.h new file mode 100644 index 0000000000000000000000000000000000000000..17402e07257bdc14e19ceca4c7d0bb1db274a2ce --- /dev/null +++ b/inc/UdpSocket.h @@ -0,0 +1,37 @@ +#ifndef UDPSOCKET_H +#define UDPSOCKET_H + +#include <iostream> +#include <string> +#include <queue> + +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <errno.h> +#include <string.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <netdb.h> + +using namespace std; + +#define MAXBUFLEN (128 * 1024) +#define LOSS_RATE 0 + +void *get_in_addr(struct sockaddr *sa); + +class UdpSocket { +public: + unsigned long byteSent; + unsigned long byteReceived; + queue<string> qMessages; + + void bindServer(string port); + void sendMessage(string ip, string port, string message); + UdpSocket(); + +}; +#endif //UDPSOCKET_H \ No newline at end of file diff --git a/src/FileObject.cpp b/src/FileObject.cpp new file mode 100644 index 0000000000000000000000000000000000000000..1097041850cde279bbcabcafe1e19615a520c283 --- /dev/null +++ b/src/FileObject.cpp @@ -0,0 +1,24 @@ +#include "../inc/FileObject.h" + +FileObject::FileObject(string fileName){ + this->fileName = fileName; + this->checksum = getChecksum(); +} + +string FileObject::getChecksum(){ + ifstream fileStream(fileName); + string fileContent((istreambuf_iterator<char>(fileStream)), + (istreambuf_iterator<char>())); + size_t hashResult = hash<string>{}(fileContent); + return to_string(hashResult); +} + +string FileObject::toString(){ + return fileName + "::" + checksum; +} + +int FileObject::getPositionOnHashring(){ + string toBeHashed = "FILE::" + fileName; + positionOnHashring = hash<string>{}(toBeHashed) % HASHMODULO; + return 0; +} diff --git a/src/HashRing.cpp b/src/HashRing.cpp new file mode 100644 index 0000000000000000000000000000000000000000..ea42b6d62bc75286169d717923a0e5b23282bf30 --- /dev/null +++ b/src/HashRing.cpp @@ -0,0 +1,180 @@ +#include "../inc/HashRing.h" + + +HashRing::HashRing(){ + +} + +string HashRing::getValue(int key){ + if(key == -1){ + return "No node found"; + } + return ring[key]; +} + +void HashRing::clear() +{ + ring.clear(); + nodePositions.clear(); +} + +void HashRing::debugHashRing() +{ + cout << "Current Ring: " << endl; + for (auto& element: ring) { + int position = element.first; + string object = element.second; + cout << object << " at " << position << endl; + } +} + +vector<string> HashRing::splitString(string s, string delimiter){ + vector<string> result; + size_t pos_start = 0, pos_end, delim_len = delimiter.length(); + string token; + + while ((pos_end = s.find (delimiter, pos_start)) != string::npos) { + token = s.substr (pos_start, pos_end - pos_start); + pos_start = pos_end + delim_len; + result.push_back (token); + } + + result.push_back (s.substr (pos_start)); + return result; +} + +int HashRing::addFile(string fileInfo, int position){ + ring[position] = fileInfo; + fileToClosestNode[position] = locateClosestNode(fileInfo); + //TODO: deal with hash collisions? + return 0; +} + +int HashRing::addNode(string nodeInfo, int position){ + ring[position] = nodeInfo; + nodePositions.push_back(position); //Add node position on hash ring to list of node positions + sort(nodePositions.begin(), nodePositions.end()); + //int insertPosition = -1; + //for(uint i = 0; i < nodePositions.size(); i++){ + // if(nodePositions[i] == position){ + // insertPosition = i; + // break; + // } + //} + /* commenting out this code since we are making hash ring node only + //Case where the newly inserted node is not at beginning of list + if(insertPosition != 0){ + for(map<int, int>::iterator it = fileToClosestNode.begin(); it != fileToClosestNode.end(); it++){ + if(it -> first <= nodePositions[insertPosition] && it -> first > nodePositions[insertPosition - 1]){ + it -> second = locateClosestNode(it -> first); + } + } + } + //If newly inserted not is at beginning of list, just need to look at files with indexes either less than that node's index, + //or at files with index greater than the highest node index + else{ + for(map<int, int>::iterator it = fileToClosestNode.begin(); it != fileToClosestNode.end(); it++){ + if(it -> first <= nodePositions[insertPosition] || it -> first > nodePositions[nodePositions.size() -1]){ + it -> second = locateClosestNode(it -> first); + } + } + } + */ + return 0; + //TODO: deal with hash collisions? +} + +int HashRing::removeFile(int position){ + ring.erase(position); + fileToClosestNode.erase(position); + return 0; + +} + +int HashRing::removeNode(int position){ + ring.erase(position); + for(uint i = 0; i < nodePositions.size(); i++){ + if(nodePositions[i] == position){ + nodePositions.erase(nodePositions.begin() + i); + } + } + //find all files that point to this node, and tell them to now point to their next closest node instead. + /* comment out this code since we are making hash ring node only at this point. + for(map<int, int>::iterator it = fileToClosestNode.begin(); it != fileToClosestNode.end(); it ++){ + if(it ->second == position){ + it ->second = locateClosestNode(it ->first); + } + }*/ + + return 0; +} + +int HashRing::locateClosestNode(string filename){ + int filePosition; + //Hash filename to get file position on hash ring, then use that to find closest node to it + filePosition = hash<string>{}(filename) % HASHMODULO; + for(int i : nodePositions){ + if(i >= filePosition){ + return i; + } + } + //If we cannnot find a Node at a entry on the hash circle greater than or equal to our file position, we wrap back around and take + // the first node's position as where that file should go. + return nodePositions[0]; +} + +int HashRing::getPredecessor(int nodePosition){ + if(nodePositions.size() == 1){ + return nodePosition; + } + unsigned int indexOfNode = -1; + for(uint i = 0; i < nodePositions.size(); i++){ + if(nodePositions[i] == nodePosition){ + indexOfNode = i; + break; + } + } + //If indexOfNode = 0, get the last entry in Node position as predecessor + if(indexOfNode == 0){ + return nodePositions[nodePositions.size() - 1]; + } + return nodePositions[indexOfNode - 1]; +} + +int HashRing::getSuccessor(int nodePosition){ + if(nodePositions.size() == 1){ + return nodePosition; + } + unsigned int indexOfNode = -1; + for(uint i = 0; i < nodePositions.size(); i++){ + if(nodePositions[i] == nodePosition){ + indexOfNode = i; + break; + } + } + //If indexOfNode = size of nodePosition - 1, get the first entry in Node position as successor + if(indexOfNode == nodePositions.size() - 1){ + return nodePositions[0]; + } + return nodePositions[indexOfNode + 1]; +} + + +//exclude the three positions in the tuple, +//return node position in hash ring of a random node that is not one of the +//three nodes in this tuple. +int HashRing::getRandomNode(tuple<int, int, int> excludedNodes){ + if(nodePositions.size() >= 4){ + //get random node + // return that random node + vector<int> indicesToPickFrom; + for(unsigned int i = 0; i < nodePositions.size(); i++){ + if(nodePositions[i] != get<0>(excludedNodes) && nodePositions[i] != get<1>(excludedNodes) && nodePositions[i] != get<2>(excludedNodes)){ + indicesToPickFrom.push_back(i); + } + } + int randomSelection = rand() % indicesToPickFrom.size(); + return nodePositions[indicesToPickFrom[randomSelection]]; + } + return -1; +} \ No newline at end of file diff --git a/src/Introducer.cpp b/src/Introducer.cpp new file mode 100644 index 0000000000000000000000000000000000000000..1e948d39ec66e5b1ec8a9d1d6f76e51291db2e7e --- /dev/null +++ b/src/Introducer.cpp @@ -0,0 +1,38 @@ +#include "../inc/Introducer.h" +#include <ctime> +#include <chrono> +Introducer::Introducer() +{ + +} +/** + * + * HeartbeatToNode: Sends a string version of the membership list to the receiving node. The receiving node will convert the string to + * a <string, long> map where the key is the Addresss (IP + PORT) and value is the heartbeat counter. We then compare the Member. + * + **/ +int Introducer::heartbeatToNode(Member destination){ + //The string we send will be seperated line by line --> IP,PORT,HeartbeatCounter + string mem_list_to_send = ""; + //Case 1 need to add destination to membership list + + + //Case 2: destination already exists in the membership list of introducer, just a normal heartbeat + for(auto& element: this->membershipList){ + pair<string, string> keyPair = element.first; + pair<int, int> valuePair = element.second; + mem_list_to_send += keyPair.first + "," + keyPair.second + ","; + mem_list_to_send += to_string(valuePair.first) + "\n"; + } + //Now we have messages ready to send, need to invoke UDP client to send + return 0; +} + +int Introducer::joinSystem(Member introducer){ + Messages join(JOIN, "IP: " + this->nodeInformation.ip + " PortNum: " + this->nodeInformation.port); + return 0; +} + +int Introducer::listenForHeartbeat(){ + +} diff --git a/src/Logger.cpp b/src/Logger.cpp new file mode 100644 index 0000000000000000000000000000000000000000..cc24aada4b1d7f65ddcf1945bda5e5342f48df59 --- /dev/null +++ b/src/Logger.cpp @@ -0,0 +1,83 @@ +#include "../inc/Logger.h" + +Logger::Logger() +{ + filename = LOGFILE; + //loggingFile.open(filename); +} + +Logger::Logger(string fileName) +{ + filename = fileName; + //loggingFile.open(filename); +} + +int Logger::writeToFile(string messages) +{ + FILE *fp; + fp = fopen(filename.c_str(), "a"); + fprintf(fp, "%s", messages.c_str()); + fclose(fp); + return 0; +} + +int Logger::printTheLog(LogType type, string s) +{ + switch (type) { + case JOINGROUP: { + string messages = "[JOIN]"+s+"\n"; + writeToFile(messages); + break; + } + case UPDATE: { + string messages = "[UPDATE]"+s+"\n"; + writeToFile(messages); + break; + } + case FAIL: { + string messages = "[FAIL]"+s+"\n"; + writeToFile(messages); + break; + } + case REMOVE: { + string messages = "[REMOVE]"+s+"\n"; + writeToFile(messages); + break; + } + case LEAVE: { + string messages = "[LEAVE]"+s+"\n"; + writeToFile(messages); + break; + } + case GOSSIPTO: { + string messages = "[SENT]"+s+"\n"; + writeToFile(messages); + break; + } + case GOSSIPFROM: { + string messages = "[RECEIVED]"+s+"\n"; + writeToFile(messages); + break; + } + case BANDWIDTH: { + string messages = "[BANDWIDTH]"+s+"\n"; + writeToFile(messages); + break; + } + case MEMBERS: { + string messages = "[MembershipList] "+s+"\n"; + writeToFile(messages); + break; + } + default: + break; + } + //loggingFile << s; + return 0; +} + +/*int Logger::closeFile() +{ + loggingFile.close(); + return 0; +}*/ \ No newline at end of file diff --git a/src/Member.cpp b/src/Member.cpp new file mode 100644 index 0000000000000000000000000000000000000000..a4177a0f2bb25640458a61b12d2d968a47e36550 --- /dev/null +++ b/src/Member.cpp @@ -0,0 +1,42 @@ +#include "../inc/Member.h" + +Member::Member(string nodeIp, string nodePort, int nodeTimestamp, int nodeHeartbeatCounter) +{ + ip = nodeIp; + port = nodePort; + failed_flag = 0; + timestamp = nodeTimestamp; + heartbeatCounter = nodeHeartbeatCounter; +} + +Member::Member(string nodeIp, string nodePort) +{ + ip = nodeIp; + port = nodePort; + timestamp = 0; + failed_flag = 0; + heartbeatCounter = 0; +} + +Member::Member(string nodeIp, string nodePort, int nodeTimestamp) +{ + ip = nodeIp; + port = nodePort; + timestamp = nodeTimestamp; + failed_flag = 0; + heartbeatCounter = 0; +} + +string Member::toString() +{ + string message = ip + "::" + port + "::" + to_string(timestamp); + return message; +} + +Member::Member(){ + ip = ""; + port = ""; + timestamp = 0; + failed_flag = 0; + heartbeatCounter = 0; +} \ No newline at end of file diff --git a/src/Messages.cpp b/src/Messages.cpp new file mode 100644 index 0000000000000000000000000000000000000000..360efad0c3f2d8043a8baf4d870217419b64a596 --- /dev/null +++ b/src/Messages.cpp @@ -0,0 +1,41 @@ +#include "../inc/Messages.h" + +Messages::Messages(string payloadMessage) +{ + string delimiter = "::"; + size_t pos = 0; + int line = 0; + string token; + while ((pos = payloadMessage.find(delimiter)) != string::npos) { + token = payloadMessage.substr(0, pos); +//#ifdef LOG_VERBOSE +// cout << line << "/" << token << endl; +//#endif + switch (line) { + case 0: { + type = static_cast<MessageType>(stoi(token)); + break; + } + default: + break; + } + payloadMessage.erase(0, pos + delimiter.length()); + line++; + } +//#ifdef LOG_VERBOSE +// cout << payloadMessage << endl; +//#endif + payload = payloadMessage; +} + +Messages::Messages(MessageType messageType, string payloadMessage) +{ + type = messageType; + payload = payloadMessage; +} + +string Messages::toString() +{ + string message = to_string(type) + "::" + payload; + return message; +} diff --git a/src/Node.cpp b/src/Node.cpp new file mode 100644 index 0000000000000000000000000000000000000000..33423c88b3879a55b7e8e8980c0ddc970c82f162 --- /dev/null +++ b/src/Node.cpp @@ -0,0 +1,1557 @@ +#include "../inc/Node.h" + + +//add another function to Node class for failure detection +//call function before sender (heartbeat) after listenForHeartbeat + +Node::Node() +{ + // create a udp object + udpServent = new UdpSocket(); + tcpServent = new TcpSocket(); + hashRing = new HashRing(); + localTimestamp = 0; + heartbeatCounter = 0; + //time(&startTimestamp); + // byteSent = 0; + // byteReceived = 0; + runningMode = ALL2ALL; + activeRunning = false; + prepareToSwitch = false; + logWriter = new Logger(LOGGING_FILE_NAME); + leaderPosition = -1; + proposedTime = 0; + electedTime = 0; + joinTimestamp = ""; + possibleSuccessorIP = ""; + leaderIP = ""; + leaderPort = ""; + isBlackout = true; +} + +Node::Node(ModeType mode) +{ + // create a udp object + udpServent = new UdpSocket(); + tcpServent = new TcpSocket(); + hashRing = new HashRing(); + localTimestamp = 0; + heartbeatCounter = 0; + //time(&startTimestamp); + // byteSent = 0; + // byteReceived = 0; + runningMode = mode; + activeRunning = false; + prepareToSwitch = false; + logWriter = new Logger(LOGGING_FILE_NAME); + leaderPosition = -1; + proposedTime = 0; + electedTime = 0; + joinTimestamp = ""; + possibleSuccessorIP = ""; + leaderIP = ""; + leaderPort = ""; + isBlackout = true; +} + +void Node::startActive() +{ + membershipList.clear(); + restartElection(); + // inserting its own into the list + time(&startTimestamp); + string startTime = ctime(&startTimestamp); + startTime = startTime.substr(0, startTime.find("\n")); + tuple<string,string,string> mapKey(nodeInformation.ip, nodeInformation.port, startTime); + tuple<int, int, int> valueTuple(nodeInformation.heartbeatCounter, nodeInformation.timestamp, 0); + membershipList[mapKey] = valueTuple; + + debugMembershipList(); + joinTimestamp = startTime; // for hashRing + getPositionOnHashring(); // update its hashRingPosition +} + +void Node::computeAndPrintBW(double diff) +{ +#ifdef LOG_VERBOSE + cout << "total " << udpServent->byteSent << " bytes sent" << endl; + cout << "total " << udpServent->byteReceived << " bytes received" << endl; + printf("elasped time is %.2f s\n", diff); +#endif + if (diff > 0) { + double bandwidth = udpServent->byteSent/diff; + string message = "["+to_string(this->localTimestamp)+"] B/W usage: "+to_string(bandwidth)+" bytes/s"; +#ifdef LOG_VERBOSE + printf("%s\n", message.c_str()); +#endif + this->logWriter->printTheLog(BANDWIDTH, message); + } +} + +void Node::updateNodeHeartbeatAndTime() +{ + string startTime = ctime(&startTimestamp); + startTime = startTime.substr(0, startTime.find("\n")); + tuple<string, string, string> keyTuple(nodeInformation.ip, nodeInformation.port,startTime); + tuple<int, int, int> valueTuple(heartbeatCounter, localTimestamp, 0); + this->membershipList[keyTuple] = valueTuple; +} + +string Node::populateMembershipMessage() +{ + //The string we send will be seperated line by line --> IP,PORT,HeartbeatCounter,FailFlag + string mem_list_to_send = ""; + //Assume destination already exists in the membership list of this node, just a normal heartbeat + switch (this->runningMode) + { + case GOSSIP: + for (auto& element: this->membershipList) { + tuple<string, string, string> keyTuple = element.first; + tuple<int, int, int> valueTuple = element.second; + mem_list_to_send += get<0>(keyTuple) + "," + get<1>(keyTuple) + "," + get<2>(keyTuple) + ","; + mem_list_to_send += to_string(get<0>(valueTuple)) + "," + to_string(get<2>(valueTuple)) + "\n"; + } + break; + + default: + string startTime = ctime(&startTimestamp); + startTime = startTime.substr(0, startTime.find("\n")); + mem_list_to_send += nodeInformation.ip + "," + nodeInformation.port + "," + startTime + ","; + mem_list_to_send += to_string(heartbeatCounter) + "," + to_string(0) + "\n"; + break; + } + return mem_list_to_send; +} + +string Node::populateIntroducerMembershipMessage(){ + string mem_list_to_send = ""; + for (auto& element: this->membershipList) { + tuple<string, string, string> keyTuple = element.first; + tuple<int, int, int> valueTuple = element.second; + mem_list_to_send += get<0>(keyTuple) + "," + get<1>(keyTuple) + "," + get<2>(keyTuple) + ","; + mem_list_to_send += to_string(get<0>(valueTuple)) + "," + to_string(get<2>(valueTuple)) + "\n"; + } + return mem_list_to_send; +} + +/** + * + * HeartbeatToNode: Sends a string version of the membership list to the receiving node. The receiving node will convert the string to + * a <string, long> map where the key is the Addresss (IP + PORT) and value is the heartbeat counter. We then compare the Member. + * + **/ +int Node::heartbeatToNode() +{ + // 3. prepare to send heartbeating, and + string mem_list_to_send = populateMembershipMessage(); + vector<tuple<string,string,string>> targetNodes = getRandomNodesToGossipTo(); + + //Now we have messages ready to send, need to invoke UDP client to send +#ifdef LOG_VERBOSE + cout << "pick " << targetNodes.size() << " of " << this->membershipList.size()-1; + cout << " members" << endl; +#endif + + // 4. do gossiping + for (uint i=0; i<targetNodes.size(); i++) { + //cout << targetNodes[i].first << "/" << targetNodes[i].second << endl; + Member destination(get<0>(targetNodes[i]), get<1>(targetNodes[i])); + + string message = "["+to_string(this->localTimestamp)+"] node "+destination.ip+"/"+destination.port+"/"+get<2>(targetNodes[i]); +#ifdef LOG_VERBOSE + cout << "[Gossip]" << message.c_str() << endl; +#endif + this->logWriter->printTheLog(GOSSIPTO, message); + + //cout << mem_list_to_send.size() << " Bytes sent..." << endl; + // byteSent += mem_list_to_send.size(); + if (isLeader) { + //Messages msg(LEADERHEARTBEAT, mem_list_to_send); + if (isBlackout) { + string msg = populateSDFSFileList(LEADERPENDING, mem_list_to_send); + udpServent->sendMessage(destination.ip, destination.port, msg); + } else { + string msg = populateSDFSFileList(LEADERHEARTBEAT, mem_list_to_send); + udpServent->sendMessage(destination.ip, destination.port, msg); + } + } else { + //Messages msg(HEARTBEAT, mem_list_to_send); + string msg = populateSDFSFileList(HEARTBEAT, mem_list_to_send); + udpServent->sendMessage(destination.ip, destination.port, msg); + } + } + + return 0; +} + +int Node::failureDetection(){ + //1. check local membership list for any timestamps whose curr_time - timestamp > T_fail + //2. If yes, mark node as local failure, update fail flag to 1 and update timestamp to current time + //3. for already failed nodes, check to see if curr_time - time stamp > T_cleanup + //4. If yes, remove node from membership list + vector<tuple<string,string,string>> removedVec; + for(auto& element: this->membershipList){ + tuple<string,string,string> keyTuple = element.first; + tuple<int, int, int> valueTuple = element.second; +#ifdef LOG_VERBOSE + cout << "checking " << get<0>(keyTuple) << "/" << get<1>(keyTuple) << "/" << get<2>(keyTuple) << endl; +#endif + if ((get<0>(keyTuple).compare(nodeInformation.ip) == 0) && (get<1>(keyTuple).compare(nodeInformation.port) == 0)) { + // do not check itself +#ifdef LOG_VERBOSE + cout << "skip it" << endl; +#endif + continue; + } + if(get<2>(valueTuple) == 0){ + if(localTimestamp - get<1>(valueTuple) > T_timeout){ + //cout << "Got " << get<0>(keyTuple) << "/" << get<1>(keyTuple) << "/" << get<2>(keyTuple) << endl; + //cout << "local time " << localTimestamp << " vs. " << get<1>(valueTuple) << endl; + get<1>(this->membershipList[keyTuple]) = localTimestamp; + get<2>(this->membershipList[keyTuple]) = 1; + + string message = "["+to_string(this->localTimestamp)+"] node "+get<0>(keyTuple)+"/"+get<1>(keyTuple)+"/"+get<2>(keyTuple)+": Local Failure"; + cout << "[FAIL]" << message.c_str() << endl; + this->logWriter->printTheLog(FAIL, message); + if(isLeader){ + // clearn up fileList + Member deletedNode(get<0>(keyTuple), get<1>(keyTuple)); + int deletedNodePostion = hashingId(deletedNode, get<2>(keyTuple)); + hashRing->removeNode(deletedNodePostion); + for (auto& element: fileList) { + vector<int> newEntry; + for(unsigned int i = 0; i < element.second.size(); i++){ + if(element.second[i] != deletedNodePostion){ + newEntry.push_back(element.second[i]); + } + } + fileList[element.first] = newEntry; + } + + // chech if the failure is the sender in pending requests + for (auto& senders: pendingSenderRequests) { + string sdfsfilename = senders.first; + tuple<string,string,string> sender = senders.second; + if ((get<0>(keyTuple).compare(get<0>(sender))==0) && + get<0>(pendingRequestSent[sdfsfilename]) && + (get<0>(pendingRequests[sdfsfilename])!=-1)) { + // it sent out, and is not finished, cannot help + // we lost the data from the client + cout << "[PUT] client itself fails, we cannot help, remove request" << endl; + isBlackout = false; + pendingRequests.erase(sdfsfilename); + pendingRequestSent.erase(sdfsfilename); + continue; + } + if ((get<0>(keyTuple).compare(get<1>(sender))==0) && + get<1>(pendingRequestSent[sdfsfilename]) && + (get<1>(pendingRequests[sdfsfilename])!=-1)) { + // the sender fails during 2nd pass + // replace the sent + cout << "[PUT] One of the sender " << get<0>(keyTuple) << " failed, try again" << endl; + if (get<2>(pendingRequests[sdfsfilename])!=-1) { + tuple<int, int, int>(get<0>(pendingRequestSent[sdfsfilename]), false, get<2>(pendingRequestSent[sdfsfilename])); + } else { + pendingRequests.erase(sdfsfilename); + } + continue; + } + if ((get<0>(keyTuple).compare(get<2>(sender))==0) && + get<2>(pendingRequestSent[sdfsfilename]) && + (get<2>(pendingRequests[sdfsfilename])!=-1)) { + // it sent out, but replicates are failed + // restart again + cout << "[PUT/REREPLICATE] The sender " << get<0>(keyTuple) << " failed, try again" << endl; + pendingRequests.erase(sdfsfilename); + } + } + + } + + } + } + else{ + if(localTimestamp - get<1>(valueTuple) > T_cleanup){ + // core dumped happened here; bug fix + auto iter = this->membershipList.find(keyTuple); + if (iter != this->membershipList.end()) { + //cout << "Got " << get<0>(iter->first) << "/" << get<1>(iter->first) << "/" << get<2>(iter->first); + //cout << " with " << to_string(get<0>(iter->second)) << "/"; + //cout << to_string(get<1>(iter->second)) << "/"; + //cout << to_string(get<2>(iter->second)) << endl; + //cout << this->membershipList[keyTuple] + //this->membershipList.erase(iter); + removedVec.push_back(keyTuple); + } + } + } + } + + // O(c*n) operation, but it ensures safety + bool leaderRemoved = false; + for (uint i=0; i<removedVec.size(); i++) { + auto iter = this->membershipList.find(removedVec[i]); + if (iter != this->membershipList.end()) { + + if (leaderIP.compare(get<0>(removedVec[i]))==0) { // this is the leader + leaderRemoved = true; + cout << "[ELECTION] leader " << leaderIP << " is removed" << endl; + } + + this->membershipList.erase(iter); + + string message = "["+to_string(this->localTimestamp)+"] node "+get<0>(removedVec[i])+"/"+get<1>(removedVec[i])+"/"+get<2>(removedVec[i])+": REMOVED FROM LOCAL MEMBERSHIP LIST"; + cout << "[REMOVE]" << message.c_str() << endl; + this->logWriter->printTheLog(REMOVE, message); + + //this->debugMembershipList(); + } + } + if (this->membershipList.size()==1 || leaderRemoved) { // Only me or leader failed, restart leader election + if (checkLeaderExist()) { // restart if we have a leader + restartElection(); + } + } + return 0; +} + + +int Node::joinSystem(Member introducer) +{ + string mem_list_to_send = populateMembershipMessage(); + //Messages msg(JOIN, mem_list_to_send); + string msg = populateSDFSFileList(JOIN, mem_list_to_send); + + string message = "["+to_string(this->localTimestamp)+"] sent a request to "+introducer.ip+"/"+introducer.port+", I am "+nodeInformation.ip+"/"+nodeInformation.port; + cout << "[JOIN]" << message.c_str() << endl; + this->logWriter->printTheLog(JOINGROUP, message); + udpServent->sendMessage(introducer.ip, introducer.port, msg); + return 0; +} + +int Node::requestSwitchingMode() +{ + string message = nodeInformation.ip+","+nodeInformation.port; + //Messages msg(SWREQ, message); + string msg = populateSDFSFileList(SWREQ, message); + for(auto& element: this->membershipList) { + tuple<string,string,string> keyTuple = element.first; + //tuple<int, int, int> valueTuple = element.second; + cout << "[SWITCH] sent a request to " << get<0>(keyTuple) << "/" << get<1>(keyTuple) << endl; + udpServent->sendMessage(get<0>(keyTuple), get<1>(keyTuple), msg); + } + return 0; +} + +int Node::SwitchMyMode() +{ + // wait for a while + sleep(T_switch); + // empty all messages + udpServent->qMessages = queue<string>(); + switch (this->runningMode) { + case GOSSIP: { + this->runningMode = ALL2ALL; + cout << "[SWITCH] === from gossip to all-to-all ===" << endl; + break; + } + case ALL2ALL: { + this->runningMode = GOSSIP; + cout << "[SWITCH] === from all-to-all to gossip ===" << endl; + break; + } + default: + break; + } + // finishing up + prepareToSwitch = false; + return 0; +} + +int Node::listenToHeartbeats() +{ + //look in queue for any strings --> if non empty, we have received a message and need to check the membership list + + // 1. deepcopy and handle queue + queue<string> qCopy(udpServent->qMessages); + udpServent->qMessages = queue<string>(); + + int size = qCopy.size(); + //cout << "Got " << size << " messages in the queue" << endl; + //cout << "checking queue size " << nodeOwn->udpServent->qMessages.size() << endl; + + // 2. merge membership list + for (int j = 0; j < size; j++) { + //cout << qCopy.front() << endl; + readMessage(qCopy.front()); + + // Volunteerily leave + if(this->activeRunning == false){ + return 0; + } + // byteReceived += qCopy.front().size(); + qCopy.pop(); + } + + return 0; +} + +void Node::debugMembershipList() +{ + cout << "Membership list [" << this->membershipList.size() << "]:" << endl; + if (isLeader) { + cout << "[T] IP/Port/JoinedTime:Heartbeat/LocalTimestamp/FailFlag" << endl; + } else { + cout << "[T] IP/Port/JoinedTime:Heartbeat/LocalTimestamp/FailFlag" << endl; + } + string message = ""; + + for (auto& element: this->membershipList) { + tuple<string,string,string> keyTuple = element.first; + tuple<int, int, int> valueTuple = element.second; + + if (nodeInformation.ip.compare(get<0>(keyTuple))==0) { // Myself + if (isLeader) { + message += "[L/M] "; + } else { + message += "[M] "; + } + } else if (leaderIP.compare(get<0>(keyTuple))==0) { + message += "[L] "; + } else { + if (isLeader) { + message += " "; + } else { + message += " "; + } + } + + message += get<0>(keyTuple)+"/"+get<1>(keyTuple)+"/"+get<2>(keyTuple); + message += ": "+to_string(get<0>(valueTuple))+"/"+to_string(get<1>(valueTuple))+"/"+to_string(get<2>(valueTuple))+"\n"; + } + cout << message.c_str() << endl; + this->logWriter->printTheLog(MEMBERS, message); +} + +void Node::processHeartbeat(string message) { + bool changed = false; + vector<string> incomingMembershipList = splitString(message, "\n"); + vector<string> membershipListEntry; + for(string list_entry: incomingMembershipList){ +#ifdef LOG_VERBOSE + cout << "handling with " << list_entry << endl; +#endif + if (list_entry.size() == 0) { + continue; + } + membershipListEntry.clear(); + membershipListEntry = splitString(list_entry, ","); + if (membershipListEntry.size() != 5) { + // circumvent craching + continue; + } + + int incomingHeartbeatCounter = stoi(membershipListEntry[3]); + int failFlag = stoi(membershipListEntry[4]); + tuple<string,string,string> mapKey(membershipListEntry[0], membershipListEntry[1], membershipListEntry[2]); + + if ((get<0>(mapKey).compare(nodeInformation.ip) == 0) && (get<1>(mapKey).compare(nodeInformation.port) == 0)) { + // Volunteerily leave if you are marked as failed + if(failFlag == 1){ + this->activeRunning = false; + + string message = "["+to_string(this->localTimestamp)+"] node "+this->nodeInformation.ip+"/"+this->nodeInformation.port+" is left"; + cout << "[VOLUNTARY LEAVE]" << message.c_str() << endl; + this->logWriter->printTheLog(LEAVE, message); + return; + } + + // do not check itself heartbeat +#ifdef LOG_VERBOSE + cout << "skip it" << endl; +#endif + continue; + } + + map<tuple<string,string,string>, tuple<int, int, int>>::iterator it; + it = this->membershipList.find(mapKey); + if (it == this->membershipList.end() && failFlag == 0) { + tuple<int, int, int> valueTuple(incomingHeartbeatCounter, localTimestamp, failFlag); + this->membershipList[mapKey] = valueTuple; + updateHashRing(); + string message = "["+to_string(this->localTimestamp)+"] new node "+get<0>(mapKey)+"/"+get<1>(mapKey)+"/"+get<2>(mapKey)+" is joined"; + cout << "[JOIN]" << message.c_str() << endl; + this->logWriter->printTheLog(JOINGROUP, message); + changed = true; + } else if(it != this->membershipList.end()) { + // update heartbeat count and local timestamp if fail flag of node is not equal to 1. If it equals 1, we ignore it. + if(get<2>(this->membershipList[mapKey]) != 1){ + //if incoming membership list has node with fail flag = 1, but fail flag in local membership list = 0, we have to set fail flag = 1 in local + switch (this->runningMode) { + case GOSSIP: { + if(failFlag == 1){ + get<2>(this->membershipList[mapKey]) = 1; + get<1>(this->membershipList[mapKey]) = localTimestamp; + string message = "["+to_string(this->localTimestamp)+"] node "+get<0>(mapKey)+"/"+get<1>(mapKey)+"/"+get<2>(mapKey)+": Disseminated Failure"; + cout << "[FAIL]" << message.c_str() << endl; + this->logWriter->printTheLog(FAIL, message); + } + else{ + int currentHeartbeatCounter = get<0>(this->membershipList[mapKey]); + if(incomingHeartbeatCounter > currentHeartbeatCounter){ + get<0>(this->membershipList[mapKey]) = incomingHeartbeatCounter; + get<1>(this->membershipList[mapKey]) = localTimestamp; + // get<2>(this->membershipList[mapKey]) = failFlag; + string message = "["+to_string(this->localTimestamp)+"] node "+get<0>(mapKey)+"/"+get<1>(mapKey)+"/"+get<2>(mapKey)+" from "+to_string(currentHeartbeatCounter)+" to "+to_string(incomingHeartbeatCounter); +#ifdef LOG_VERBOSE + cout << "[UPDATE]" << message.c_str() << endl; +#endif + this->logWriter->printTheLog(UPDATE, message); + } + } + break; + } + default: { // ALL2ALL doesn't disseminate + int currentHeartbeatCounter = get<0>(this->membershipList[mapKey]); + if(incomingHeartbeatCounter > currentHeartbeatCounter){ + get<0>(this->membershipList[mapKey]) = incomingHeartbeatCounter; + get<1>(this->membershipList[mapKey]) = localTimestamp; + get<2>(this->membershipList[mapKey]) = failFlag; + string message = "["+to_string(this->localTimestamp)+"] node "+get<0>(mapKey)+"/"+get<1>(mapKey)+"/"+get<2>(mapKey)+" from "+to_string(currentHeartbeatCounter)+" to "+to_string(incomingHeartbeatCounter); +#ifdef LOG_VERBOSE + cout << "[UPDATE]" << message.c_str() << endl; +#endif + this->logWriter->printTheLog(UPDATE, message); + } + break; + } + } + } else { + // do nothing + } + } + } + + // If membership list changed in all-to-all, full membership list will be sent + if(changed && this->runningMode == ALL2ALL){ + string mem_list_to_send = populateIntroducerMembershipMessage(); + vector<tuple<string,string,string>> targetNodes = getRandomNodesToGossipTo(); + + for (uint i=0; i<targetNodes.size(); i++) { + Member destination(get<0>(targetNodes[i]), get<1>(targetNodes[i])); + + string message = "["+to_string(this->localTimestamp)+"] node "+destination.ip+"/"+destination.port+"/"+get<2>(targetNodes[i]); +#ifdef LOG_VERBOSE + cout << "[Gossip]" << message.c_str() << endl; +#endif + this->logWriter->printTheLog(GOSSIPTO, message); + + if (isLeader) { + //Messages msg(LEADERHEARTBEAT, mem_list_to_send); + if (isBlackout) { + string msg = populateSDFSFileList(LEADERPENDING, mem_list_to_send); + udpServent->sendMessage(destination.ip, destination.port, msg); + } else { + string msg = populateSDFSFileList(LEADERHEARTBEAT, mem_list_to_send); + udpServent->sendMessage(destination.ip, destination.port, msg); + } + } else { + //Messages msg(HEARTBEAT, mem_list_to_send); + string msg = populateSDFSFileList(HEARTBEAT, mem_list_to_send); + udpServent->sendMessage(destination.ip, destination.port, msg); + } + + } + } +} + +void Node::setUpLeader(string message, bool pending) +{ + string msg(message); + vector<string> fields = splitString(msg, ","); + if(fields.size() >= 3){ + Member leader(fields[0], fields[1]); + leaderPosition = hashingId(leader, fields[2]); + leaderIP = fields[0]; + leaderPort = fields[1]; + } + leaderCreateHashRing(); // local copy of hashRing on each node + + if (pending != isBlackout) { + if (isBlackout) { + cout << "[BLACKOUT] Leader is ready now" << endl; + } else { + cout << "[BLACKOUT] Leader is busy now" << endl; + } + } + if (pending) { + isBlackout = true; + } else { + isBlackout = false; + } +} + +/** + * given a string message which contains a membership list, we will take the string, split it by returns, and then split it by commas, to then compare the heartbeat counters + * of each IP,PORT,timestamp tuple with the membership list of the receiving Node. + * + * Found help on how to do string processing part of this at https://stackoverflow.com/questions/14265581/parse-split-a-string-in-c-using-string-delimiter-standard-c + */ +void Node::readMessage(string message){ + + // decapsulate with specific messages + //cout << "readMessage " << message << endl; + string deMeg = decapsulateMessage(message); + bool pending = true; + //cout << "readMessage deMeg " << deMeg << endl; + + Messages msg(deMeg); + switch (msg.type) { + case LEADERHEARTBEAT: // Note: not for Gossip-style, only All-to-All + //cout << "LEADERHEARTBEAT: " << msg.payload << endl; + pending = false; + case LEADERPENDING: + setUpLeader(msg.payload, pending); + case HEARTBEAT: + case JOINRESPONSE:{ + processHeartbeat(msg.payload); + break; + } + case JOIN:{ + // introducer checks collision here + vector<string> fields = splitString(msg.payload, ","); + if(fields.size() >= 3){ + Member member(fields[0], fields[1]); + int checkPosition = hashingId(member, fields[2]); + if (checkHashNodeCollision(checkPosition)) { + //Messages response(JOINREJECT, ""); + string response = populateSDFSFileList(JOINREJECT, ""); + udpServent->sendMessage(fields[0], fields[1], response); + } else { + string introducerMembershipList; + introducerMembershipList = populateIntroducerMembershipMessage(); + //Messages response(JOINRESPONSE, introducerMembershipList); + string response = populateSDFSFileList(JOINRESPONSE, introducerMembershipList); + udpServent->sendMessage(fields[0], fields[1], response); + } + } + break; + } + case SWREQ: { + // got a request, send an ack back + vector<string> fields = splitString(msg.payload, ","); + if (fields.size() == 2) { + cout << "[SWITCH] got a request from "+fields[0]+"/"+fields[1] << endl; + string messageReply = nodeInformation.ip+","+nodeInformation.port; + //Messages msgReply(SWRESP, messageReply); + string msgReply = populateSDFSFileList(SWRESP, messageReply); + udpServent->sendMessage(fields[0], fields[1], msgReply); + + prepareToSwitch = true; + } + break; + } + case SWRESP: { + // got an ack + vector<string> fields = splitString(msg.payload, ","); + if (fields.size() == 2) { + cout << "[SWITCH] got an ack from "+fields[0]+"/"+fields[1] << endl; + } + break; + } + case JOINREJECT: { + // TODO: the node should leave + cout << "[JOINREJECT] There is a collision, and I have to leave..." << endl; + this->activeRunning = false; + pthread_exit(NULL); + break; + } + default: + break; + } + //debugMembershipList(); +} + +vector<string> Node::splitString(string s, string delimiter){ + vector<string> result; + size_t pos_start = 0, pos_end, delim_len = delimiter.length(); + string token; + + while ((pos_end = s.find (delimiter, pos_start)) != string::npos) { + token = s.substr (pos_start, pos_end - pos_start); + pos_start = pos_end + delim_len; + result.push_back (token); + } + + result.push_back (s.substr (pos_start)); + return result; +} + +int Node::hashingId(Member nodeMember, string joinTime) +{ + string toBeHashed = "NODE::" + nodeMember.ip + "::" + nodeMember.port + "::" + joinTime; + int ringPosition = hash<string>{}(toBeHashed) % HASHMODULO; + return ringPosition; +} + +int Node::getPositionOnHashring(){ + hashRingPosition = hashingId(nodeInformation, joinTimestamp); + cout << "[ELECTION] This node is at hash position: " << hashRingPosition << endl; + return 0; +} + +int Node::updateHashRing(){ + bool needToUpdate = true; + for(auto& it: membershipList){ + needToUpdate = true; + string ip = get<0>(it.first); + for(int i: hashRing->nodePositions){ + if(ip.compare(hashRing->getValue(i)) == 0){ + needToUpdate = false; + break; + } + } + if(needToUpdate){ + Member toBeInserted(ip, get<1>(it.first)); + int hashPosition = hashingId(toBeInserted, get<2>(it.first)); + hashRing->addNode(ip, hashPosition); + } + } + return 0; +} + +bool Node::checkLeaderExist() +{ + return leaderPosition != -1; +} + +bool Node::checkHashNodeCollision(int checkPosition) +{ + // if True, the position is full + for (auto& element: this->membershipList) { + tuple<string, string, string> keyTuple = element.first; + Member member(get<0>(keyTuple), get<1>(keyTuple)); + if (nodeInformation.ip.compare(member.ip)==0) { // myself, skip it + continue; + } + int pos = hashingId(member, get<2>(keyTuple)); + if (pos == checkPosition) { + return true; + } + } + return false; +} + +bool Node::findWillBeLeader() +{ + bool beLeader = true; + vector<int> positions; + vector<string> ipAddresses; + if (membershipList.size() > 1) { // only 1 member does not need the leader + for (auto& element: this->membershipList) { + tuple<string, string, string> keyTuple = element.first; + Member member(get<0>(keyTuple), get<1>(keyTuple)); + int pos = hashingId(member, get<2>(keyTuple)); + if (pos < hashRingPosition) { + //cout << get<0>(keyTuple) << " with id " << pos << " is smaller" << endl; + beLeader = false; + } + if (nodeInformation.ip.compare(get<0>(keyTuple))!=0) { + int posNext = (pos + (HASHMODULO-hashRingPosition)) % HASHMODULO; + positions.push_back(posNext); + ipAddresses.push_back(get<0>(keyTuple)); + } + } + } else { + beLeader = false; + } + + if (positions.size() > 0) { + int index = 0; + int possibleSuccessor = positions[index]; + for (uint i=1; i<positions.size(); i++) { + if (positions[i] < possibleSuccessor) { + possibleSuccessor = positions[i]; + index = i; + } + } + //cout << "[ELECTION] My Possible Successor is " << ipAddresses[index] << endl; + possibleSuccessorIP = ipAddresses[index]; + } + + return beLeader; +} + +void Node::restartElection() // haven't tested yet +{ + cout << "[ELECTION] No leader now, restart election..." << endl; + electedTime = localTimestamp; + isLeader = false; + leaderPosition = -1; + leaderIP = ""; + leaderPort = ""; +} + +void Node::leaderCreateHashRing() +{ + // The leader or notes creates hashRing + hashRing->clear(); + for (auto& element: this->membershipList) { // update hashRing + tuple<string, string, string> keyTuple = element.first; + Member member(get<0>(keyTuple), get<1>(keyTuple)); + int pos = hashingId(member, get<2>(keyTuple)); + //hashRing->addNode("NODE::"+get<0>(keyTuple), pos); // since we don't store file, remove NODE + hashRing->addNode(get<0>(keyTuple), pos); + } + //hashRing->debugHashRing(); +} + +void Node::proposeToBeLeader() +{ + // Start election + Messages msg(ELECTION, to_string(hashRingPosition)); + cout << "[ELECTION] Propose to be leader, send to " << possibleSuccessorIP << endl; + tcpServent->sendMessage(possibleSuccessorIP, TCPPORT, msg.toString()); +} + +void Node::processElection(Messages messages) +{ + switch (messages.type) { + case ELECTION: { // check id + int currentId = stoi(messages.payload); + if (hashRingPosition > currentId) { + //incoming is smaller, just forward + cout << "[ELECTION] Got Election, agree on voting: " << messages.payload << endl; + tcpServent->sendMessage(possibleSuccessorIP, TCPPORT, messages.toString()); + } else if (hashRingPosition < currentId) { + //incoming is biger, replace and send it + cout << "[ELECTION] Got Election, against this voting " << messages.payload; + cout << ", and using my id " << hashRingPosition << endl; + Messages msg(ELECTION, to_string(hashRingPosition)); + tcpServent->sendMessage(possibleSuccessorIP, TCPPORT, msg.toString()); + } else { // finish 1st pass + cout << "[ELECTION] Got Election, everyone voted on me and start acking" << endl; + Messages msg(ELECTIONACK, to_string(hashRingPosition)); + tcpServent->sendMessage(possibleSuccessorIP, TCPPORT, msg.toString()); + } + break; + } + case ELECTIONACK: { + int currentId = stoi(messages.payload); + if (hashRingPosition == currentId) { // finish 2 pass + cout << "[ELECTION] I am the leader now" << endl; + isBlackout = false; + leaderPosition = hashRingPosition; + isLeader = true; + leaderIP = nodeInformation.ip; + leaderPort = nodeInformation.port; + leaderCreateHashRing(); + } else { + // Not me, just forward + cout << "[ELECTION] Pass ACK " << messages.payload << endl; + tcpServent->sendMessage(possibleSuccessorIP, TCPPORT, messages.toString()); + } + electedTime = localTimestamp; // update elected time + cout << "[ELECTION] Elected at Local Time " << electedTime << endl; + break; + } + default: + break; + } +} + +void Node::processTcpMessages() +{ + queue<string> qCopy(tcpServent->qMessages); + tcpServent->qMessages = queue<string>(); + + int size = qCopy.size(); + //cout << "Got " << size << " TCP messages" << endl; + + for (int j=0; j<size; j++) { + //cout << qCopy.front() << endl; + Messages msg(qCopy.front()); + //cout << "Has " << msg.type << " with " << msg.payload << endl; + switch (msg.type) { + case ELECTION: + case ELECTIONACK: { + processElection(msg); + break; + } + default: + break; + } + qCopy.pop(); + } +} + +void Node::updateFileList(string sdfsfilename, int nodePosition) +{ + if (isLeader) { + vector<int> positions = fileList[sdfsfilename]; + bool existed = false; + for (uint i=0; i<positions.size(); i++) { + if (positions[i] == nodePosition) { + existed = true; + } + } + if (!existed) { + positions.push_back(nodePosition); + } + vector<int> storedPositionsCopy(positions); + fileList[sdfsfilename] = storedPositionsCopy; + } +} + +//Can only be called when we are the leader node, since we are going to be calling REREPLICATE here, and checking the global file list. +//Called in ProcessRegMessages before we do anything else, since we want to check the global file list consistency before we process the other messages +//In the Queue. +void Node::checkFileListConsistency(){ + if (membershipList.size() < 4) { + cout << "[ERROR] The number of members are too small, we need at least 4" << endl; + return; + } + for (auto& element: fileList) { + if(element.second.size() < 4){ + //Need to rereplicate --> do this one at a time + //First check the closest node, successor and predecessor + int closestNodePostion = hashRing->locateClosestNode(element.first); + int pred = hashRing->getPredecessor(closestNodePostion); + int succ = hashRing->getSuccessor(closestNodePostion); + int randomNode = hashRing->getRandomNode(tuple<int, int, int>(closestNodePostion, pred, succ)); + vector<int> nodesToCheck = {closestNodePostion, pred, succ, randomNode}; + for(unsigned int i = 0; i < nodesToCheck.size(); i++){ + if (!isInVector(element.second, nodesToCheck[i])) + { + string nodeInfo = hashRing->getValue(nodesToCheck[i]); + Messages outMsg(DNSGET, nodeInfo + "::" + to_string(nodesToCheck[i]) + "::" + element.first + "::"); + tuple<int, int, int> request = pendingRequests[element.first]; + if(get<0>(request) == 0 && get<1>(request) == 0 && get<2>(request) == 0){ + pendingRequests[element.first] = tuple<int, int, int>(-1, -1, nodesToCheck[i]); + pendingRequestSent[element.first] = tuple<int, int, int>(true, true, true); + tcpServent->sendMessage(leaderIP, TCPPORT, outMsg.toString()); + break; + } + if(get<0>(request) != -1 || get<1>(request) != -1 || get<2>(request) != -1){ + cout << "on put " << get<0>(request) << "/" << get<1>(request) << "/" << get<2>(request) << endl; + break; + } + pendingRequests[element.first] = tuple<int, int, int>(-1, -1, nodesToCheck[i]); + pendingRequestSent[element.first] = tuple<int, int, int>(true, true, true); + tcpServent->sendMessage(leaderIP, TCPPORT, outMsg.toString()); + break; + } + } + } + } + +} + +bool Node::isInVector(vector<int> v, int i){ + for(int element: v){ + if(element == i){ + return true; + } + } + return false; +} + + +void Node::processRegMessages() +{ + //Before we do anything here, we should have the leader check to see if the file list is consistent or not. + //We do this by: + //1. Checking the files in the filelist, making sure each one has 4 entries. If not, then we need to rereplicate. + // We can initiate a PUT, put pending request, setting as -1, -1, and then last one as target node that we want to replicate to (new node to replace the one that failed) + if(isLeader){ + checkFileListConsistency(); + } + queue<string> qCopy(tcpServent->regMessages); + tcpServent->regMessages = queue<string>(); + + int size = qCopy.size(); + //cout << "Got " << size << " TCP messages" << endl; + + for (int j=0; j<size; j++) { + // cout << qCopy.front() << endl; + vector<string> msgSplit = splitString(qCopy.front(), "::"); + if (msgSplit.size() < 1){ + qCopy.pop(); + continue; + } + string payload = ""; + for(uint k = 1; k < msgSplit.size(); k++){ + if(k == msgSplit.size() - 1){ + payload += msgSplit[k]; + } else { + payload += msgSplit[k] + "::"; + } + } + MessageType msgType = static_cast<MessageType>(stoi(msgSplit[0])); + Messages msg(msgType, payload); + // cout << "Has " << msg.type << " with " << msg.payload << endl; + switch (msg.type) { + case PUTACK: { + vector<string> inMsg = splitString(msg.payload, "::"); + if(inMsg.size() >= 4){ + string inMsgIP = inMsg[0]; + string sdfsfilename = inMsg[1]; + string localfilename = inMsg[2]; + string remoteLocalname = inMsg[3]; + + cout << "[PUTACK] " << "inMsgIP: " << inMsgIP << " sdfsfilename: " << sdfsfilename << " localfilename: " << localfilename << endl; + + localFilelist[sdfsfilename] = localfilename; + Messages outMsg(ACK, to_string(this->hashRingPosition)+"::"+sdfsfilename+"::"+remoteLocalname); + this->tcpServent->sendMessage(inMsgIP, TCPPORT, outMsg.toString()); + } + break; + } + case DELETE: { + if (isLeader) { + vector<string> inMsg = splitString(msg.payload, "::"); + if(inMsg.size() >= 2){ + string inMsgIP = inMsg[0]; + string sdfsfilename = inMsg[1]; + + cout << "[DELETE] " << "inMsgIP: " << inMsgIP << " sdfsfilename: " << sdfsfilename << endl; + localFilelist.erase(sdfsfilename); + fileList.erase(sdfsfilename); + // This is TCP, so we don't need to ACK + } + } + break; + } + case DNSGET: { + if(isLeader){ + // Do replicating to the node + //isBlackout = true; + vector<string> inMsg = splitString(msg.payload, "::"); + cout << "msg.payload " << msg.payload << endl; + if(inMsg.size() >= 4){ + string inMsgIP = inMsg[0]; + int nodePosition = stoi(inMsg[1]); + int selectedNodePosition = nodePosition; + string sdfsfilename = inMsg[2]; + string localfilename = inMsg[3]; + cout << "[DNSGET] Got " << "inMsgIP: " << inMsgIP << ", sdfsfilename: " << sdfsfilename << ", localfilename: " << localfilename << endl; + vector<int> positions = fileList[sdfsfilename]; + if (positions.size() == 0) { + // the file is not available + cout << "[DNSGET] sdfsfilename " << sdfsfilename << " is not available" << endl; + fileList.erase(sdfsfilename); + Messages outMsg(GETNULL, sdfsfilename+": the file is not available::"); + this->tcpServent->sendMessage(inMsgIP, TCPPORT, outMsg.toString()); + //isBlackout = false; + break; + } + cout << "[DNSGET] we have "; + for (uint i=0; i<positions.size(); i++) { // pick any node other than the requested node + cout << positions[i] << " "; + if (positions[i]!=nodePosition) { + selectedNodePosition = positions[i]; + } + } + cout << endl; + cout << "[DNSGET] we picks " << selectedNodePosition << endl; + pendingRequests[sdfsfilename] = tuple<int, int, int>(-1, -1, nodePosition); + pendingRequestSent[sdfsfilename] = tuple<int, int, int>(true, true, true); + string nodeIP = hashRing->getValue(selectedNodePosition); + pendingSenderRequests[sdfsfilename] = tuple<string, string, string>("", "", nodeIP); + Messages outMsg(REREPLICATEGET, to_string(nodePosition) + "::" + sdfsfilename+ "::" +localfilename); + cout << "[DNSGET] Ask node " << nodeIP << " to replicate on pos "; + cout << to_string(nodePosition) << endl; + this->tcpServent->sendMessage(nodeIP, TCPPORT, outMsg.toString()); + } + } + break; + } + case DNS: { + // TODO: finish DNS functionality here, send out DNSANS + if(isLeader){ + // Check hashring, get positions and send out DNS ANS + isBlackout = true; + vector<string> inMsg = splitString(msg.payload, "::"); + if(inMsg.size() >= 4){ + string inMsgIP = inMsg[0]; + int nodePosition = stoi(inMsg[1]); + string sdfsfilename = inMsg[2]; + string localfilename = inMsg[3]; + + cout << "[DNS] Got " << "inMsgIP: " << inMsgIP << ", sdfsfilename: " << sdfsfilename; + cout << ", localfilename: " << localfilename << ", pos: " << nodePosition << endl; + //this->localFilelist[sdfsfilename] = localfilename; + // update fileList, client itself is one of the replicas + updateFileList(sdfsfilename, nodePosition); + hashRing->debugHashRing(); + int closestNode = hashRing->locateClosestNode(sdfsfilename); + int pred = hashRing->getPredecessor(closestNode); + int succ = hashRing->getSuccessor(closestNode); + if (hashRing->getValue(closestNode).compare(inMsgIP)==0) { + closestNode = hashRing->getRandomNode(tuple<int, int, int>(closestNode, pred, succ)); + cout << "[DNS] we need one more node " << closestNode << endl; + } + if (hashRing->getValue(pred).compare(inMsgIP)==0) { + pred = hashRing->getRandomNode(tuple<int, int, int>(closestNode, pred, succ)); + cout << "[DNS] we need one more node " << pred << endl; + } + if (hashRing->getValue(succ).compare(inMsgIP)==0) { + succ = hashRing->getRandomNode(tuple<int, int, int>(closestNode, pred, succ)); + cout << "[DNS] we need one more node " << succ << endl; + } + cout << "[DNS] we have nodes [" << closestNode << " (closestNode), "; + cout << pred << " (pred), " << succ << " (succ)], reply " << closestNode << endl; + pendingRequests[sdfsfilename] = tuple<int, int, int>(closestNode, pred, succ); + pendingRequestSent[sdfsfilename] = tuple<int, int, int>(true, false, false); + pendingSenderRequests[sdfsfilename] = tuple<string, string, string>(inMsgIP, "", ""); + Messages outMsg(DNSANS, to_string(closestNode) + "::" + localfilename + "::" + sdfsfilename); + this->tcpServent->sendMessage(inMsgIP, TCPPORT, outMsg.toString()); + } + } + + break; + } + case DNSANS:{ + // Read the answer and send a PUT msg to dest + vector<string> inMsg = splitString(msg.payload, "::"); + if(inMsg.size() >= 3){ + int nodePosition = stoi(inMsg[0]); + // since we do not keep files in hashRing, the value itself is IPaddress, not NODE:IP_Address + string nodeIP = hashRing->getValue(nodePosition); + //cout << "nodeIP " << nodeIP << endl; + + cout << "[DNSANS] " << "we will put sdfsfilename: " << inMsg[2] << " to nodeIP: " << nodeIP; + cout << " using localfilename: " << inMsg[1] << endl; + + string sendMsg = nodeIP+"::"+inMsg[1]+"::"+inMsg[2]+"::"; + this->tcpServent->pendSendMessages.push(sendMsg); + //this->tcpServent->sendFile(nodeIP, TCPPORT, inMsg[1], inMsg[2], ""); + } + break; + } + case REREPLICATEGET: { + vector<string> inMsg = splitString(msg.payload, "::"); + if (inMsg.size() >= 3) { + int nodePosition = stoi(inMsg[0]); + // since we do not keep files in hashRing, the value itself is IPaddress, not NODE:IP_Address + string nodeIP = hashRing->getValue(nodePosition); + string sdfsfilename = inMsg[1]; + string remoteLocalfilename = inMsg[2]; + string localfilename = this->localFilelist[sdfsfilename]; + cout << "[REREPLICATEGET] Got a request of sdfsfilename " << sdfsfilename << " to nodeIP " << nodeIP << endl; + cout << "[REREPLICATEGET] Put localfilename " << localfilename << " to nodeIP " << nodeIP << endl; + string sendMsg = nodeIP+"::"+localfilename+"::"+sdfsfilename+"::"+remoteLocalfilename; + this->tcpServent->pendSendMessages.push(sendMsg); + //this->tcpServent->sendFile(nodeIP, TCPPORT, localfilename, sdfsfilename, remoteLocalfilename); + } + break; + } + case REREPLICATE:{ + // Read the answer and send a PUT msg to dest + vector<string> inMsg = splitString(msg.payload, "::"); + if (inMsg.size() >= 2) { + int nodePosition = stoi(inMsg[0]); + // since we do not keep files in hashRing, the value itself is IPaddress, not NODE:IP_Address + string nodeIP = hashRing->getValue(nodePosition); + string sdfsfilename = inMsg[1]; + string localfilename = this->localFilelist[sdfsfilename]; + cout << "[REREPLICATE] Got a request of sdfsfilename " << sdfsfilename << " to nodeIP " << nodeIP << endl; + cout << "[REREPLICATE] Put localfilename " << localfilename << " to nodeIP " << nodeIP << endl; + string sendMsg = nodeIP+"::"+localfilename+"::"+sdfsfilename+"::"; + this->tcpServent->pendSendMessages.push(sendMsg); + //this->tcpServent->sendFile(nodeIP, TCPPORT, localfilename, sdfsfilename, ""); + } + break; + } + case GETNULL: { + vector<string> inMsg = splitString(msg.payload, "::"); + if (inMsg.size() >= 1) { + cout << "[GETNULL] " << inMsg[0] << endl; + } + break; + } + + case ACK:{ + vector<string> inMsg = splitString(msg.payload, "::"); + if (inMsg.size() >= 3) { + string nodePosition = inMsg[0]; + string sdfsfilename = inMsg[1]; + string localfilename = inMsg[2]; + localFilelist[sdfsfilename] = localfilename; + + Messages outMsg(LEADERACK, this->nodeInformation.ip + "::" + to_string(this->hashRingPosition) + "::" + msg.payload); + cout << "[ACK] Done replicated sdfsfilename " << sdfsfilename; + cout << " on node " << nodePosition << ", and ACK back to the leader" << endl; + this->tcpServent->sendMessage(leaderIP, TCPPORT, outMsg.toString()); + } + + break; + } + case LEADERACK:{ + if(isLeader){ + //TODO: tick the list off + vector<string> inMsg = splitString(msg.payload, "::"); + if(inMsg.size() >= 4){ + string inMsgIP = inMsg[0]; + int inMsgnodePosition = stoi(inMsg[1]); + int nodePosition = stoi(inMsg[2]); + string sdfsfilename = inMsg[3]; + string replicatedNodeIP = hashRing->getValue(nodePosition); + + cout << "[LEADERACK] Got ACK inMsgIP: " << inMsgIP << " sdfsfilename: " << sdfsfilename << " done on " << replicatedNodeIP << endl; + string closestNodeIP = ""; + + // update fileList + updateFileList(sdfsfilename, inMsgnodePosition); + updateFileList(sdfsfilename, nodePosition); + + vector<int> temp; + cout << "pendingRequests: "; + if (get<0>(pendingRequests[sdfsfilename]) == nodePosition) { + closestNodeIP = hashRing->getValue(get<0>(pendingRequests[sdfsfilename])); + temp.push_back(-1); + } else { + temp.push_back(get<0>(pendingRequests[sdfsfilename])); + } + cout << temp[0] << " (sent: " << get<0>(pendingRequestSent[sdfsfilename]); + cout << ", from " << get<0>(pendingSenderRequests[sdfsfilename]) << "), "; + if (get<1>(pendingRequests[sdfsfilename]) == nodePosition) { + temp.push_back(-1); + } else { + temp.push_back(get<1>(pendingRequests[sdfsfilename])); + } + cout << temp[1] << " (sent: " << get<1>(pendingRequestSent[sdfsfilename]); + cout << ", from " << get<1>(pendingSenderRequests[sdfsfilename]) << "), "; + if (get<2>(pendingRequests[sdfsfilename]) == nodePosition) { + temp.push_back(-1); + } else { + temp.push_back(get<2>(pendingRequests[sdfsfilename])); + } + cout << temp[2] << " (sent:" << get<2>(pendingRequestSent[sdfsfilename]); + cout << ", from " << get<2>(pendingSenderRequests[sdfsfilename]) << ")" << endl; + pendingRequests[sdfsfilename] = tuple<int, int, int>(temp[0], temp[1], temp[2]); + + if(get<1>(pendingRequests[sdfsfilename]) == -1 && get<2>(pendingRequests[sdfsfilename])== -1){ + pendingRequests.erase(sdfsfilename); + pendingRequestSent.erase(sdfsfilename); + pendingSenderRequests.erase(sdfsfilename); + cout << "[LEADERACK] 3 or more Replicated files are done" << endl; + isBlackout = false; + break; + } + if((get<1>(pendingRequests[sdfsfilename])!=-1) && (!get<1>(pendingRequestSent[sdfsfilename]))){ + Messages outMsg(REREPLICATE, to_string(get<1>(pendingRequests[sdfsfilename])) + "::" + sdfsfilename); + // cout << "Sending out rereplicate to " << inMsgIP << "with message " << outMsg.toString() << endl; + cout << "[LEADERACK] Ask node incoming " << inMsgIP << " to replicate on pos "; + cout << to_string(get<1>(pendingRequests[sdfsfilename])) << endl; + this->tcpServent->sendMessage(inMsgIP, TCPPORT, outMsg.toString()); + pendingRequestSent[sdfsfilename] = tuple<int, int, int>(get<0>(pendingRequestSent[sdfsfilename]), true, get<2>(pendingRequestSent[sdfsfilename])); + pendingSenderRequests[sdfsfilename] = tuple<string, string, string>(get<0>(pendingSenderRequests[sdfsfilename]), inMsgIP, get<2>(pendingSenderRequests[sdfsfilename])); + } + if((get<2>(pendingRequests[sdfsfilename]) != -1) && (!get<2>(pendingRequestSent[sdfsfilename]))){ + Messages outMsg(REREPLICATE, to_string(get<2>(pendingRequests[sdfsfilename])) + "::" + sdfsfilename); + // cout << "Sending out rereplicate to " << closestNodeIP << "with message " << outMsg.toString() << endl; + cout << "[LEADERACK] Ask node closest " << closestNodeIP << " to replicate on pos "; + cout << to_string(get<2>(pendingRequests[sdfsfilename])) << endl; + this->tcpServent->sendMessage(closestNodeIP, TCPPORT, outMsg.toString()); + pendingRequestSent[sdfsfilename] = tuple<int, int, int>(get<0>(pendingRequestSent[sdfsfilename]), get<1>(pendingRequestSent[sdfsfilename]), true); + pendingSenderRequests[sdfsfilename] = tuple<string, string, string>(get<0>(pendingSenderRequests[sdfsfilename]), get<1>(pendingSenderRequests[sdfsfilename]), inMsgIP); + } + } + } + break; + } + default: + break; + } + qCopy.pop(); + } +} + +/** + * Store the given filename in your sdfs filename, discard the original name, and + * give it a new name. The hashing will be done based on this sdfs filename. + * + * Can be called by any node, this one will be called by sender + * +*/ +// int Node::putFileSender(string filename, string sdfsfilename){ +// tcpServent->sendFile(leaderIP, leaderPort, filename); +// return 0; + +// } + +// int Node::putFileMaster(string sdfsfilename){ +// return 0; +// } + +// int Node::putFileReeiver(string sdfsfilename){ +// return 0; + +// } + +void Node::listLocalFiles(){ + cout << "sdfsfilename ---> localfilename" << endl; + for (auto& element: localFilelist) { + cout << element.first << " ---> " << element.second << endl; + } +} + +void Node::debugSDFSFileList() { + cout << "sdfsfilename ---> positions,..." << endl; + for (auto& element: fileList) { + cout << element.first << " ---> "; + for (uint i=0; i<element.second.size(); i++) { + cout << element.second[i]; + if (i == element.second.size()-1) { + continue; + } else { + cout << ", "; + } + } + cout << endl; + } +} + +void Node::listSDFSFileList(string sdfsfilename) { + bool found = false; + vector<int> foundPositions; + for (auto& element: fileList) { + if(element.first.compare(sdfsfilename)==0) { // found sdfsfilename + found = true; + foundPositions = element.second; + break; + } + } + if (found) { + if (foundPositions.size() > 0) { + cout << "sdfsfilename " << sdfsfilename << " is stored at..." << endl; + cout << "=========" << endl; + for (uint i=0; i<foundPositions.size(); i++) { + string storedIP = hashRing->getValue(foundPositions[i]); + cout << storedIP << " at " << foundPositions[i] << endl; + } + } else { + cout << "sdfsfilename " << sdfsfilename << " is stored at..." << endl; + cout << "=== Current list is empty ===" << endl; + } + } else { + cout << "sdfsfilename " << sdfsfilename << " is not existed" << endl; + } +} + +string Node::encapsulateFileList() +{ + string enMeg = ""; + if (checkLeaderExist() && isLeader) { + for (auto& element: fileList) { + string positions = ""; + string sdfsfilename = element.first; + + for (uint i=0; i<element.second.size(); i++) { + positions += to_string(element.second[i]); + if (i != element.second.size()-1) { + positions += ","; + } + } + //cout << "sdfsfilename " << sdfsfilename << endl; + //cout << "positions " << positions << endl; + char *cstr = new char[sdfsfilename.length()+positions.length()+7]; + size_t len = sdfsfilename.length()+3; + cstr[0] = len & 0xff; + cstr[1] = (len >> 8) & 0xff; + if (cstr[1] == 0) { // avoid null + cstr[1] = 0xff; + } + //printf("cstr[0] %x, cstr[1] %x\n", cstr[0], cstr[1]); + cstr[2] = FILENAME; + for (uint i=0; i<sdfsfilename.length(); i++) { + cstr[i+3] = sdfsfilename.c_str()[i]; + } + size_t len2 = positions.length()+3; + cstr[sdfsfilename.length()+3] = len2 & 0xff; + cstr[sdfsfilename.length()+4] = (len2 >> 8) & 0xff; + if (cstr[sdfsfilename.length()+4] == 0) { // avoid null + cstr[sdfsfilename.length()+4] = 0xff; + } + //printf("cstr[3] %x, cstr[4] %x\n", cstr[0], cstr[1]); + cstr[sdfsfilename.length()+5] = FILEPOSITIONS; + //printf("cstr[%lu] %d\n", sdfsfilename.length()+2, cstr[sdfsfilename.length()+2]); + for (uint i=0; i<positions.length(); i++) { + cstr[sdfsfilename.length()+6+i] = positions.c_str()[i]; + } + cstr[sdfsfilename.length()+positions.length()+6] = '\0'; + //printf("cstrFile %s\n", cstr); + string enMegFile(cstr); + //cout << "enMegFile " << enMegFile << endl; + enMeg += enMegFile; + } + //cout << "encapsulateFileList " << enMeg << endl; + } + return enMeg; +} + +string Node::encapsulateMessage(map<PayloadType,string> payloads) +{ + string enMeg = ""; + //cout << "payloads.size " << payloads.size() << endl; + for (auto& element: payloads) { + PayloadType type = element.first; + string message = element.second; + //cout << "message " << message << endl; + //cout << "message.length " << message.length() << endl; + //cout << "type " << type << endl; + + char *cstr = new char[message.length()+4]; + size_t len = message.length()+3; + cstr[0] = len & 0xff; + cstr[1] = (len >> 8) & 0xff; + if (cstr[1] == 0) { // avoid null + cstr[1] = 0xff; + } + //printf("cstr[0] %x, cstr[1] %x\n", cstr[0], cstr[1]); + cstr[2] = type; + //printf("cstr[2] %x\n", cstr[2]); + for (uint i=0; i<message.length(); i++) { + cstr[i+3] = message.c_str()[i]; + } + cstr[message.length()+3] = '\0'; + //printf("cstrMsg %s\n", cstr); + string enMegPart(cstr); + //cout << "enMegPart " << enMegPart << endl; + enMeg += enMegPart; + } + //cout << "encapsulateMessage " << enMeg << endl; + return enMeg; +} + +void Node::decapsulateFileList(string payload) +{ + int size = payload.length(); + uint pos = 0; + fileList.clear(); + string lastFilename = ""; + while (size > 0) { + size_t length; + if ((payload.c_str()[1+pos] & 0xff) == 0xff) { + length = 0; + } else { + length = (payload.c_str()[1+pos]) & 0xff; + length = length << 8; + } + length += (payload.c_str()[0+pos]) & 0xff; + PayloadType type = static_cast<PayloadType>(payload.c_str()[2+pos]); + //printf(" len %lu, type %d\n", length, type); + char cstr[length]; + bzero(cstr, sizeof(cstr)); + for (uint i=3; i<length; i++) { + cstr[i-3] = payload.c_str()[pos+i]; + } + string deMegPart(cstr); + switch (type) { + case FILENAME: { + //cout << "FILENAME " << deMegPart << endl; + lastFilename = deMegPart; + break; + } + case FILEPOSITIONS: { + //cout << "FILEPOSITIONS " << deMegPart << endl; + vector<string> temp = splitString(deMegPart, ","); + vector<int> positions; + for (uint i=0; i<temp.size(); i++) { + if (temp[i].compare("")!=0) { + positions.push_back(stoi(temp[i])); + } + } + fileList[lastFilename] = positions; + break; + } + default: + break; + } + size -= length; + pos += length; + } + + // check with local file list + if (!isLeader) { + vector<string> fileToDelete; + for (auto& element: localFilelist) { + //cout << "sdfsfilename " << element.first << endl; + if (fileList[element.first].size() == 0) { + fileToDelete.push_back(element.first); + } + } + for (uint i=0; i<fileToDelete.size(); i++) { + localFilelist.erase(fileToDelete[i]); + cout << "[DELETE] sdfsfilename " << fileToDelete[i] << endl; + } + } +} + +string Node::decapsulateMessage(string payload) +{ + int size = payload.length(); + uint pos = 0; + //cout << "payload " << payload << endl; + //cout << "size " << size << endl; + string deMeg = ""; + while (size > 0) { + size_t length; + if ((payload.c_str()[1+pos] & 0xff) == 0xff) { + length = 0; + } else { + length = (payload.c_str()[1+pos]) & 0xff; + length = length << 8; + } + length += (payload.c_str()[0+pos] & 0xff); + //printf("lengthMeg %x %x %lu\n", payload.c_str()[0+pos], payload.c_str()[1+pos], length); + + PayloadType type = static_cast<PayloadType>(payload.c_str()[2+pos]); + //printf(" len %lu, type %d\n", length, type); + char cstr[length]; + bzero(cstr, sizeof(cstr)); + for (uint i=3; i<length; i++) { + cstr[i-3] = payload.c_str()[pos+i]; + } + //printf("cstr %s\n", cstr); + string deMegPart(cstr); + //cout << "deMegPart " << deMegPart << endl; + if (type == REGULAR) { + deMeg = deMegPart; + } else if (type == FILEPAIR) { + if (checkLeaderExist() && !isLeader) { + //cout << "FILEPAIR " << deMegPart << endl; + decapsulateFileList(deMegPart); + } + } + //cout << "size1 " << size << endl; + size -= length; + pos += length; + //cout << "size2 " << size << endl; + } + //cout << "deMeg " << deMeg << endl; + return deMeg; +} + +// piggyback fileList in heartbeat +string Node::populateSDFSFileList(MessageType type, string mem_list_to_send) +{ + Messages msg(type, mem_list_to_send); + //cout << "populateSDFSFileList " << msg.toString() << endl; + map<PayloadType,string> payloads; + payloads[REGULAR] = msg.toString(); + if (isLeader) { // Only the leader includes the fileList + payloads[FILEPAIR] = encapsulateFileList(); + } + string enMeg = encapsulateMessage(payloads); + return enMeg; +} + +void Node::findNodesWithFile(string sdfsfilename){ + /*tuple<int, int, int> nodes = fileList[sdfsfilename]; + cout << hashRing->getValue(get<0>(nodes)) << endl; + cout << hashRing->getValue(get<1>(nodes)) << endl; + cout << hashRing->getValue(get<2>(nodes)) << endl;*/ +} + diff --git a/src/RandomGenerator.cpp b/src/RandomGenerator.cpp new file mode 100644 index 0000000000000000000000000000000000000000..b70b6ebd47fbc3aaa43e542400936a2457b8920f --- /dev/null +++ b/src/RandomGenerator.cpp @@ -0,0 +1,69 @@ +#include "../inc/Node.h" + +vector<tuple<string,string, string>> Node::getRandomNodesToGossipTo() +{ + //make an index list, then math.random to get index + // from list (remember to exclude the node itself) + vector<int> indexList; + vector<string> IPList; + vector<string> portList; + vector<string> timestampList; + vector<tuple<string, string, string>> selectedNodesInfo; + //int i = 0; + for(auto& element: this->membershipList){ + tuple<string, string, string> keyPair = element.first; + tuple<int, int, int> valueTuple = element.second; + //cout << "run " << keyPair.first << "/" << keyPair.second << endl; + //add check to make sure we don't add any failed nodes to gossip to + if(get<0>(keyPair).compare(this->nodeInformation.ip) != 0 && get<2>(valueTuple) != 1){ + IPList.push_back(get<0>(keyPair)); + portList.push_back(get<1>(keyPair)); + timestampList.push_back(get<2>(keyPair)); + indexList.push_back(IPList.size()-1); // bug fix + } + //i++; + } + // on one to gossip, return an empty vector + if (IPList.size() == 0) { + return selectedNodesInfo; + } + + /*for (uint j=0; j<indexList.size(); j++) { + cout << indexList[j] << ":" << IPList[j] << "/" << portList[j] << endl; + }*/ + + switch (this->runningMode) { + case GOSSIP: { + srand(time(NULL)); + int nodesSelected = 0; + // N_b is a predefined number + if(IPList.size() <= N_b){ + for (uint j=0; j<indexList.size(); j++) { + //int chosenIndex = indexList[j]; + //cout << "put " << IPList[j] << "/" << portList[j] << endl; + selectedNodesInfo.push_back(make_tuple(IPList[j], portList[j], timestampList[j])); + } + } + else{ + while (nodesSelected < N_b) { + int randomNum = rand() % indexList.size(); + int chosenIndex = indexList[randomNum]; + selectedNodesInfo.push_back(make_tuple(IPList[chosenIndex], portList[chosenIndex], timestampList[chosenIndex])); + indexList.erase(indexList.begin() + randomNum); + nodesSelected++; + } + } + break; + } + default: { + // All2All + for (uint j=0; j<indexList.size(); j++) { + //int chosenIndex = indexList[j]; + //cout << "put " << IPList[j] << "/" << portList[j] << endl; + selectedNodesInfo.push_back(make_tuple(IPList[j], portList[j], timestampList[j])); + } + break; + } + } + return selectedNodesInfo; +} \ No newline at end of file diff --git a/src/TcpSocket.cpp b/src/TcpSocket.cpp new file mode 100644 index 0000000000000000000000000000000000000000..2860bf143f5ef33934be26692d5874d7628269e7 --- /dev/null +++ b/src/TcpSocket.cpp @@ -0,0 +1,418 @@ +#include "../inc/TcpSocket.h" +#include "../inc/UdpSocket.h" +#include "../inc/Messages.h" +#include "../inc/FileObject.h" + +void sigchld_handler(int s) +{ + while(waitpid(-1, NULL, WNOHANG) > 0); +} + +TcpSocket::TcpSocket() +{ + +} + +void TcpSocket::bindServer(string port) +{ + int sockfd, new_fd; // listen on sock_fd, new connection on new_fd + struct addrinfo hints, *servinfo, *p; + struct sockaddr_storage their_addr; // connector's address information + socklen_t sin_size; + struct sigaction sa; + int yes = 1; + char s[INET6_ADDRSTRLEN]; + int rv; + char buf[DEFAULT_TCP_BLKSIZE]; + int numbytes; + string delimiter = "::"; + + memset(&hints, 0, sizeof hints); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_PASSIVE; // use my IP + + if ((rv = getaddrinfo(NULL, port.c_str(), &hints, &servinfo)) != 0) { + fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv)); + return; + } + + // loop through all the results and bind to the first we can + for(p = servinfo; p != NULL; p = p->ai_next) { + if ((sockfd = socket(p->ai_family, p->ai_socktype, + p->ai_protocol)) == -1) { + perror("server: socket"); + continue; + } + + if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes, + sizeof(int)) == -1) { + perror("setsockopt"); + exit(1); + } + + if (bind(sockfd, p->ai_addr, p->ai_addrlen) == -1) { + close(sockfd); + perror("server: bind"); + continue; + } + + break; + } + + if (p == NULL) { + fprintf(stderr, "server: failed to bind\n"); + return; + } + + freeaddrinfo(servinfo); // all done with this structure + + if (listen(sockfd, BACKLOG) == -1) { + perror("listen"); + exit(1); + } + + sa.sa_handler = sigchld_handler; // reap all dead processes + sigemptyset(&sa.sa_mask); + sa.sa_flags = SA_RESTART; + if (sigaction(SIGCHLD, &sa, NULL) == -1) { + perror("sigaction"); + exit(1); + } + + //printf("server: waiting for connections...\n"); + + while(1) { // main accept() loop + sin_size = sizeof their_addr; + new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &sin_size); + if (new_fd == -1) { + perror("accept"); + continue; + } + + inet_ntop(their_addr.ss_family, + get_in_addr((struct sockaddr *)&their_addr), + s, sizeof s); + //printf("server: got connection from %s\n", s); + + bzero(buf, sizeof(buf)); + MessageType type = JOIN; // for default case here + string payload = ""; + if ((numbytes = recv(new_fd, buf, DEFAULT_TCP_BLKSIZE, 0)) > 0) { + //buf[numbytes] = '\0'; + //printf("Got %s\n", buf); + + string payloadMessage(buf); + Messages msg(payloadMessage); + //printf("message type: %d\n", msg.type); + type = msg.type; + payload = msg.payload; + } + + switch (type) { + case ELECTION: + case ELECTIONACK: { + //cout << "election id is " << payload << endl; + string payloadMessage(buf); + //cout << "payloadMessage " << payloadMessage << endl; + qMessages.push(payloadMessage); + //cout << "qMessages size 1 " << qMessages.size() << endl; + break; + } + case PUT: { + FILE *fp; + int filesize = 0; + int byteReceived = 0; + string sdfsfilename = ""; + string incomingChecksum = ""; + string remoteLocalname = ""; + string overwriteFilename = ""; + // format: size,checksum,sdfsfilename + vector<string> fields = splitString(payload, ","); + if (fields.size() >= 5) { + filesize = stoi(fields[0]); + incomingChecksum = fields[1]; + sdfsfilename = fields[2]; + remoteLocalname = fields[3]; + overwriteFilename = fields[4]; + } + cout << "file is " << sdfsfilename << " with size " << filesize << " and checksum " << incomingChecksum << endl; + + time_t fileTimestamp; + time(&fileTimestamp); + string localfilename = sdfsfilename+"_"+to_string(fileTimestamp); + if (overwriteFilename.compare("") != 0) { + localfilename = overwriteFilename; + cout << "it's GET with filename " << overwriteFilename << endl; + } + cout << "backup filename " << localfilename << endl; + fp = fopen(localfilename.c_str(), "wb"); + if (fp == NULL) { + cout << "file error" << endl; + close(new_fd); + exit(0); + } + + bzero(buf, sizeof(buf)); + while ((numbytes=recv(new_fd, buf, DEFAULT_TCP_BLKSIZE, 0)) > 0) { + //printf("Got %d\n", numbytes); + fwrite(buf, sizeof(char), numbytes, fp); + byteReceived += numbytes; + if (byteReceived >= filesize) { + break; + } + bzero(buf, sizeof(buf)); + } + cout << "we have all the file, finishing this connections" << endl; + fclose(fp); + + FileObject f(localfilename); + if(incomingChecksum.compare(f.checksum) != 0){ + cout << "[ERROR] FILE CORRUPTED" << endl; + // TODO: Handel file corruption here + } else { + string returnIP(s); + Messages putack(PUTACK, returnIP + "::" + sdfsfilename + "::" + localfilename+"::"+remoteLocalname); + regMessages.push(putack.toString()); + } + + break; + } + case DNSANS: + case ACK: + case PUTACK: + case LEADERACK: + case REREPLICATE: + case REREPLICATEGET: + case DNSGET: + case DELETE: + case GETNULL: + case DNS:{ + string payloadMessage(buf); + cout << "Type: " << type << " payloadMessage: " << payloadMessage << endl; + regMessages.push(payloadMessage); + break; + } + default: + break; + } + close(new_fd); + } +} + +string TcpSocket::getFileMetadata(int size, string checksum, + string sdfsfilename, string localfilename, string remoteLocalfilename) +{ + // format: size,checksum,sdfsfilename + string msg = to_string(size) + "," + checksum + "," + sdfsfilename+","+localfilename+","+remoteLocalfilename; + return msg; +} + +void TcpSocket::sendFile(string ip, string port, + string localfilename, string sdfsfilename, string remoteLocalfilename) +{ + int sockfd, numbytes; + char buf[DEFAULT_TCP_BLKSIZE]; + struct addrinfo hints, *servinfo, *p; + int rv; + char s[INET6_ADDRSTRLEN]; + FILE *fp; + int size = 0; + + bzero(buf, sizeof(buf)); + + memset(&hints, 0, sizeof hints); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + + if ((rv = getaddrinfo(ip.c_str(), port.c_str(), &hints, &servinfo)) != 0) { + fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv)); + return; + } + + // loop through all the results and connect to the first we can + for (p = servinfo; p != NULL; p = p->ai_next) { + if ((sockfd = socket(p->ai_family, p->ai_socktype, + p->ai_protocol)) == -1) { + perror("client: socket"); + continue; + } + + if (connect(sockfd, p->ai_addr, p->ai_addrlen) == -1) { + close(sockfd); + perror("client: connect"); + continue; + } + + break; + } + + if (p == NULL) { + fprintf(stderr, "client: failed to connect\n"); + return; + } + + inet_ntop(p->ai_family, get_in_addr((struct sockaddr *)p->ai_addr), + s, sizeof s); + printf("client: connecting to %s\n", s); + + freeaddrinfo(servinfo); // all done with this structure + + // read file + fp = fopen(localfilename.c_str(), "rb"); + if (fp == NULL) { + printf("Could not open file to send."); + return; + } + fseek(fp, 0, SEEK_END); + size = ftell(fp); + fseek(fp, 0, SEEK_SET); + + // send bytes and filename first + FileObject f(localfilename); + Messages msg(PUT, getFileMetadata(size, f.checksum, sdfsfilename, localfilename, remoteLocalfilename)); + string payload = msg.toString(); + + if (send(sockfd, payload.c_str(), strlen(payload.c_str()), 0) == -1) { + perror("send"); + } + sleep(1); + + while (!feof(fp) && size > 0) { + if (size < DEFAULT_TCP_BLKSIZE) { + bzero(buf, sizeof(buf)); + numbytes = fread(buf, sizeof(char), size, fp); + //printf("11 numbytes %d, size %d\n", numbytes, size); + if (send(sockfd, buf, numbytes, 0) == -1) { + perror("send"); + } + } else { + bzero(buf, sizeof(buf)); + numbytes = fread(buf, sizeof(char), DEFAULT_TCP_BLKSIZE, fp); + //printf("22 numbytes %d, size %d\n", numbytes, size); + size -= numbytes; + //printf("33 numbytes %d, size %d\n", numbytes, size); + if (send(sockfd, buf, numbytes, 0) == -1) { + perror("send"); + } + } + + } + fclose(fp); + close(sockfd); +} + +void TcpSocket::sendMessage(string ip, string port, string message) +{ + int sockfd; + struct addrinfo hints, *servinfo, *p; + int rv; + char s[INET6_ADDRSTRLEN]; + + memset(&hints, 0, sizeof hints); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + + if ((rv = getaddrinfo(ip.c_str(), port.c_str(), &hints, &servinfo)) != 0) { + fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv)); + return; + } + + // loop through all the results and connect to the first we can + for (p = servinfo; p != NULL; p = p->ai_next) { + if ((sockfd = socket(p->ai_family, p->ai_socktype, + p->ai_protocol)) == -1) { + perror("client: socket"); + continue; + } + + if (connect(sockfd, p->ai_addr, p->ai_addrlen) == -1) { + close(sockfd); + perror("client: connect"); + continue; + } + + break; + } + + if (p == NULL) { + fprintf(stderr, "client: failed to connect\n"); + return; + } + + inet_ntop(p->ai_family, get_in_addr((struct sockaddr *)p->ai_addr), + s, sizeof s); + //printf("client: connecting to %s\n", s); + + freeaddrinfo(servinfo); // all done with this structure + + if (send(sockfd, message.c_str(), strlen(message.c_str()), 0) == -1) { + perror("send"); + } + + close(sockfd); +} + +// copy from Node.cpp +vector<string> TcpSocket::splitString(string s, string delimiter){ + vector<string> result; + size_t pos_start = 0, pos_end, delim_len = delimiter.length(); + string token; + + while ((pos_end = s.find (delimiter, pos_start)) != string::npos) { + token = s.substr (pos_start, pos_end - pos_start); + pos_start = pos_end + delim_len; + result.push_back (token); + } + + result.push_back (s.substr (pos_start)); + return result; +} + +// void *runTcpServer(void *tcpSocket) +// { +// TcpSocket* tcp; +// tcp = (TcpSocket*) tcpSocket; +// tcp->bindServer(TCPPORT); +// pthread_exit(NULL); +// } + +// void *runTcpClient(void *tcpSocket) +// { +// TcpSocket* tcp; +// tcp = (TcpSocket*) tcpSocket; +// sleep(1); +// // testing election +// Messages msg(ELECTION, "my_id"); +// tcp->sendMessage("127.0.0.1", TCPPORT, msg.toString()); + +// // testing to send file +// for (int i=0; i<2; i++) { +// sleep(1); +// tcp->sendFile("127.0.0.1", TCPPORT, "file_example_MP3_700KB.mp3"); +// } +// pthread_exit(NULL); +// } + +/*int main(int argc, char *argv[]) +{ + TcpSocket *tcpSocket = new TcpSocket(); + + pthread_t threads[2]; + + int rc; + + if ((rc = pthread_create(&threads[0], NULL, runTcpServer, (void *)tcpSocket)) != 0) { + cout << "Error:unable to create thread," << rc << endl; + exit(-1); + } + + if ((rc = pthread_create(&threads[1], NULL, runTcpClient, (void *)tcpSocket)) != 0) { + cout << "Error:unable to create thread," << rc << endl; + exit(-1); + } + + pthread_exit(NULL); + + return 0; +}*/ \ No newline at end of file diff --git a/src/Threads.cpp b/src/Threads.cpp new file mode 100644 index 0000000000000000000000000000000000000000..c6d0196d4ff51c0a2d53c21c3933e60b4eff3580 --- /dev/null +++ b/src/Threads.cpp @@ -0,0 +1,154 @@ +#include "../inc/Node.h" + +/** + * + * runUdpServer: Enqueue each heartbeat it receives + * + **/ +void *runUdpServer(void *udpSocket) +{ + // acquire UDP object + UdpSocket* udp; + udp = (UdpSocket*) udpSocket; + udp->bindServer(PORT); + pthread_exit(NULL); +} + +/** + * + * runTcpServer: Enqueue each request it receives + * + **/ +void *runTcpServer(void *tcpSocket) +{ + TcpSocket* tcp; + tcp = (TcpSocket*) tcpSocket; + tcp->bindServer(TCPPORT); + pthread_exit(NULL); +} + +vector<string> splitString(string s, string delimiter){ + vector<string> result; + size_t pos_start = 0, pos_end, delim_len = delimiter.length(); + string token; + + while ((pos_end = s.find (delimiter, pos_start)) != string::npos) { + token = s.substr (pos_start, pos_end - pos_start); + pos_start = pos_end + delim_len; + result.push_back (token); + } + + result.push_back (s.substr (pos_start)); + return result; +} + +void *runTcpSender(void *tcpSocket) +{ + TcpSocket* tcp; + tcp = (TcpSocket*) tcpSocket; + while (1) { + while (!tcp->pendSendMessages.empty()) { + vector<string> msgSplit = splitString(tcp->pendSendMessages.front(), "::"); + if (msgSplit.size() >= 4) { + string nodeIP = msgSplit[0]; + string localfilename = msgSplit[1]; + string sdfsfilename = msgSplit[2]; + string remoteLocalfilename = msgSplit[3]; + cout << "[DOSEND] nodeIP " << nodeIP << ", localfilename " << localfilename; + cout << ", sdfsfilename " << sdfsfilename << ", remoteLocalfilename " << remoteLocalfilename << endl; + tcp->sendFile(nodeIP, TCPPORT, localfilename, sdfsfilename, remoteLocalfilename); + } + tcp->pendSendMessages.pop(); + } + } + pthread_exit(NULL); +} + +void testMessages(UdpSocket* udp) +{ + sleep(2); + for (int j = 0; j < 4; j++) { + udp->sendMessage("127.0.0.1", PORT, "test message "+to_string(j)); + } + sleep(1); +} + +/** + * + * runSenderThread: + * 1. handle messages in queue + * 2. merge membership list + * 3. prepare to send heartbeating + * 4. do gossiping + * + **/ +void *runSenderThread(void *node) +{ + // acquire node object + Node *nodeOwn = (Node *) node; + + nodeOwn->activeRunning = true; + + // step: joining to the group -> just heartbeating to introducer + Member introducer(INTRODUCER, PORT); + nodeOwn->joinSystem(introducer); + + while (nodeOwn->activeRunning) { + + // 1. deepcopy and handle queue, and + // 2. merge membership list + nodeOwn->listenToHeartbeats(); + + // Volunteerily leave + if(nodeOwn->activeRunning == false){ + pthread_exit(NULL); + } + + //add failure detection in between listening and sending out heartbeats + nodeOwn->failureDetection(); + + // keep heartbeating + nodeOwn->localTimestamp++; + nodeOwn->heartbeatCounter++; + nodeOwn->updateNodeHeartbeatAndTime(); + + // 3. prepare to send heartbeating, and + // 4. do gossiping + nodeOwn->heartbeatToNode(); + + // 5. check for regular TCP messages + nodeOwn->processRegMessages(); + + // 6. check leader (If hashRing is sent via heartbeat, then we have a leader) + if (!nodeOwn->checkLeaderExist()) { // If no leader + nodeOwn->processTcpMessages(); + if (nodeOwn->findWillBeLeader()) { + //cout << "Try to propose to be leader" << endl; + if (nodeOwn->localTimestamp-nodeOwn->electedTime > T_election) { // when entering to stable state + if (nodeOwn->localTimestamp-nodeOwn->proposedTime > T_election) { + nodeOwn->proposeToBeLeader(); + nodeOwn->proposedTime = nodeOwn->localTimestamp; + } + } + } + } + + // for debugging + //nodeOwn->debugMembershipList(); + time_t endTimestamp; + time(&endTimestamp); + double diff = difftime(endTimestamp, nodeOwn->startTimestamp); + nodeOwn->computeAndPrintBW(diff); +#ifdef LOG_VERBOSE + cout << endl; +#endif + if (nodeOwn->prepareToSwitch) { + cout << "[SWITCH] I am going to swtich my mode in " << T_switch << "s" << endl; + nodeOwn->SwitchMyMode(); + } else { + usleep(T_period); + } + } + + pthread_exit(NULL); +} \ No newline at end of file diff --git a/src/UdpSocket.cpp b/src/UdpSocket.cpp new file mode 100644 index 0000000000000000000000000000000000000000..5d17e06e0fae9230e1c32892747b8c1da00a255c --- /dev/null +++ b/src/UdpSocket.cpp @@ -0,0 +1,183 @@ +#include "../inc/UdpSocket.h" + +UdpSocket::UdpSocket(){ + byteSent = 0; + byteReceived = 0; +} + +void *get_in_addr(struct sockaddr *sa) +{ + if (sa->sa_family == AF_INET) { + return &(((struct sockaddr_in*)sa)->sin_addr); + } + + return &(((struct sockaddr_in6*)sa)->sin6_addr); +} + +void UdpSocket::bindServer(string port) +{ + int sockfd; + struct addrinfo hints, *servinfo, *p; + int rv; + int numbytes; + struct sockaddr_storage their_addr; + char buf[MAXBUFLEN]; + socklen_t addr_len; + //char s[INET6_ADDRSTRLEN]; + + memset(&hints, 0, sizeof hints); + + hints.ai_family = AF_UNSPEC; // set to AF_INET to force IPv4 + hints.ai_socktype = SOCK_DGRAM; + hints.ai_flags = AI_PASSIVE; // use my IP + + if ((rv = getaddrinfo(NULL, port.c_str(), &hints, &servinfo)) != 0) { + fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv)); + return; + } + + // loop through all the results and bind to the first we can + for(p = servinfo; p != NULL; p = p->ai_next) { + if ((sockfd = socket(p->ai_family, p->ai_socktype, + p->ai_protocol)) == -1) { + perror("bindServer: socket"); + continue; + } + + if (bind(sockfd, p->ai_addr, p->ai_addrlen) < 0) { + close(sockfd); + perror("bindServer: bind"); + continue; + } + break; + } + + if (p == NULL) { + fprintf(stderr, "bindServer: failed to bind socket\n"); + return; + } + + freeaddrinfo(servinfo); + + //cout << "bindServer: waiting to recvfrom... " << endl; + + addr_len = sizeof(their_addr); + bzero(buf, sizeof(buf)); + while ((numbytes = recvfrom(sockfd, buf, MAXBUFLEN-1 , 0, + (struct sockaddr *)&their_addr, &addr_len)) > 0) { + this->byteReceived += numbytes; + //cout << "bindServer: from " << inet_ntop(their_addr.ss_family, + // get_in_addr((struct sockaddr *)&their_addr), + // s, sizeof(s)); + //cout << "bindServer: packet is " << numbytes << " bytes long" << endl; + buf[numbytes] = '\0'; + //cout << ": " << buf << endl; + + // put into queue + qMessages.push(buf); + + bzero(buf, sizeof(buf)); + } + + close(sockfd); +} + +void UdpSocket::sendMessage(string ip, string port, string message) +{ + int sockfd; + struct addrinfo hints, *servinfo, *p; + int rv; + int numbytes; + int lucky_number; + + memset(&hints, 0, sizeof hints); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_DGRAM; + + if ((rv = getaddrinfo(ip.c_str(), port.c_str(), &hints, &servinfo)) != 0) { + fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv)); + return; + } + + // loop through all the results and make a socket + for(p = servinfo; p != NULL; p = p->ai_next) { + if ((sockfd = socket(p->ai_family, p->ai_socktype, + p->ai_protocol)) == -1) { + perror("sendMessage: socket"); + continue; + } + break; + } + + if (p == NULL) { + fprintf(stderr, "sendMessage: failed to bind socket\n"); + return; + } + + // Simulate package loss + srand(time(NULL)); + lucky_number = rand() % 100 + 1; + + this->byteSent += strlen(message.c_str()); + if(lucky_number > LOSS_RATE){ + numbytes = sendto(sockfd, message.c_str(), strlen(message.c_str()), 0, p->ai_addr, p->ai_addrlen); + } + + if (numbytes == -1) { + perror("sendMessage: sendto"); + exit(1); + } + + freeaddrinfo(servinfo); + + //cout << "sendMessage: sent " << numbytes << " bytes to " << ip << endl; + close(sockfd); +} + +// void *runServer(void *udpSocket) +// { +// UdpSocket* udp; +// udp = (UdpSocket*) udpSocket; +// udp->bindServer("4950"); +// pthread_exit(NULL); +// } + +// void *runClient(void *udpSocket) +// { +// UdpSocket* udp; +// udp = (UdpSocket*) udpSocket; +// for (int i = 0; i < 3; i++) { +// sleep(2); +// udp->sendMessage("127.0.0.1", "4950", "test message"); +// } +// pthread_exit(NULL); +// } + +/*int main(int argc, char *argv[]) +{ + UdpSocket *udpSocket = new UdpSocket(); + + if (strcmp(argv[1], "client") == 0) { + udpSocket.send_message("127.0.0.1", "4950", "test message"); + } else { + udpSocket.bind_server("4950"); + } + + pthread_t threads[NUM_THREADS]; + + int rc; + + if ((rc = pthread_create(&threads[0], NULL, runServer, (void *)udpSocket)) != 0) { + cout << "Error:unable to create thread," << rc << endl; + exit(-1); + } + + if ((rc = pthread_create(&threads[1], NULL, runClient, (void *)udpSocket)) != 0) { + cout << "Error:unable to create thread," << rc << endl; + exit(-1); + } + + pthread_exit(NULL); + + return 0; +}*/ \ No newline at end of file diff --git a/src/main.cpp b/src/main.cpp new file mode 100644 index 0000000000000000000000000000000000000000..3e20e044c054c5df62f12d8b09fdb0aefb2653a1 --- /dev/null +++ b/src/main.cpp @@ -0,0 +1,220 @@ +#include "../inc/Node.h" + +int main(int argc, char *argv[]) +{ + pthread_t threads[4]; + int rc; + Node *node; + + cout << "Mode: " << ALL2ALL << "->All-to-All, "; + cout << GOSSIP << "->Gossip-style" << endl; + if (argc < 2) { + node = new Node(); + } else { + ModeType mode = ALL2ALL; + if (atoi(argv[1]) == 1) { + mode = GOSSIP; + } + node = new Node(mode); + } + cout << "Running mode: " << node->runningMode << endl; + cout << endl; + + char host[100] = {0}; + struct hostent *hp; + + if (gethostname(host, sizeof(host)) < 0) { + cout << "error: gethostname" << endl; + return 0; + } + + if ((hp = gethostbyname(host)) == NULL) { + cout << "error: gethostbyname" << endl; + return 0; + } + + if (hp->h_addr_list[0] == NULL) { + cout << "error: no ip" << endl; + return 0; + } + //cout << "hostname " << hp->h_name << endl; + Member own(inet_ntoa(*(struct in_addr*)hp->h_addr_list[0]), PORT, + node->localTimestamp, node->heartbeatCounter); + node->nodeInformation = own; + cout << "[NEW] Starting Node at " << node->nodeInformation.ip << "/"; + cout << node->nodeInformation.port << "..." << endl; + + int *ret; + string s; + string cmd; + bool joined = false; + + // listening server can run first regardless of running time commands + if ((rc = pthread_create(&threads[0], NULL, runUdpServer, (void *)node->udpServent)) != 0) { + cout << "Error:unable to create thread," << rc << endl; + exit(-1); + } + + if ((rc = pthread_create(&threads[2], NULL, runTcpServer, (void *)node->tcpServent)) != 0) { + cout << "Error:unable to create thread," << rc << endl; + exit(-1); + } + + if ((rc = pthread_create(&threads[4], NULL, runTcpSender, (void *)node->tcpServent)) != 0) { + cout << "Error:unable to create thread," << rc << endl; + exit(-1); + } + + node->localFilelist.clear(); // for testing + /*node->localFilelist["sdfsfilename1"] = "localfilename1"; + node->localFilelist["sdfsfilename2"] = "localfilename2";*/ + + while(1){ + string s; + getline (cin, s); + vector<string> cmdLineInput; + string delimiter = " "; + size_t pos_start = 0, pos_end, delim_len = delimiter.length(); + string token; + while ((pos_end = s.find (delimiter, pos_start)) != string::npos) { + token = s.substr (pos_start, pos_end - pos_start); + pos_start = pos_end + delim_len; + cmdLineInput.push_back (token); + } + cmdLineInput.push_back (s.substr (pos_start)); + cmd = cmdLineInput[0]; + // Deal with multiple cmd input + if(cmd == "join"){ + node->startActive(); + if ((rc = pthread_create(&threads[1], NULL, runSenderThread, (void *)node)) != 0) { + cout << "Error:unable to create thread," << rc << endl; + exit(-1); + } + joined = true; + } else if(cmd == "leave" && joined){ + node->activeRunning = false; + node->membershipList.clear(); + node->restartElection(); // clean up leader info + pthread_join(threads[1], (void **)&ret); + + string message = "["+to_string(node->localTimestamp)+"] node "+node->nodeInformation.ip+"/"+node->nodeInformation.port+" is left"; + cout << "[LEAVE]" << message.c_str() << endl; + node->logWriter->printTheLog(LEAVE, message); + sleep(2); // wait for logging + + joined = false; + } else if(cmd == "id"){ + cout << "ID: (" << node->nodeInformation.ip << ", " << node->nodeInformation.port << ")" << endl; + } else if(cmd == "member"){ + node->debugMembershipList(); + } else if(cmd == "switch") { + if(joined){ + node->requestSwitchingMode(); + } + } else if(cmd == "mode") { + cout << "In " << node->runningMode << " mode" << endl; + } else if(cmd == "exit"){ + cout << "exiting..." << endl; + break; + } else if (cmd == "put" && joined){ // MP2 op1 + if(cmdLineInput.size() < 3){ + cout << "USAGE: put filename sdfsfilename" << endl; + continue; + } + if (!node->isBlackout) { + string localfilename = cmdLineInput[1]; + string sdfsfilename = cmdLineInput[2]; + Messages outMsg(DNS, node->nodeInformation.ip + "::" + to_string(node->hashRingPosition) + "::" + sdfsfilename + "::" + localfilename); + cout << "[PUT] Got localfilename: " << localfilename << " with sdfsfilename: " << sdfsfilename << endl; + if (access(localfilename.c_str(), F_OK) != -1) { + node->tcpServent->sendMessage(node->leaderIP, TCPPORT, outMsg.toString()); + } else { + cout << "[PUT] The file " << localfilename << " is not existed" << endl; + } + + } else { + cout << "[BLACKOUT] Leader cannot accept the request" << endl; + } + } else if (cmd == "get" && joined){ // MP2 op2 + if(cmdLineInput.size() < 3){ + cout << "USAGE: get sdfsfilename filename" << endl; + continue; + } + string sdfsfilename = cmdLineInput[1]; + string localfilename = cmdLineInput[2]; + if (node->localFilelist.find(sdfsfilename) != node->localFilelist.end()) { + // found + cout << "[GET] You have sdfsfilename " << sdfsfilename << " as " << node->localFilelist[sdfsfilename] << endl; + continue; + } + if (node->fileList.find(sdfsfilename) == node->fileList.end()) { + // not found + cout << "[GET] Get sdfsfilename " << sdfsfilename << " failed" << endl; + continue; + } else { + if (node->fileList[sdfsfilename].size()==1 && node->isBlackout) { + // in 1st pass, the Leader is backup, wait for a minute + cout << "[GET] Get sdfsfilename " << sdfsfilename << " failed" << endl; + continue; + } + } + + Messages outMsg(DNSGET, node->nodeInformation.ip + "::" + to_string(node->hashRingPosition) + "::" + sdfsfilename + "::" + localfilename); + cout << "[GET] Got sdfsfilename: " << sdfsfilename << " with localfilename: " << localfilename << endl; + node->tcpServent->sendMessage(node->leaderIP, TCPPORT, outMsg.toString()); + } else if (cmd == "delete" && joined){ // MP2 op3 + if(cmdLineInput.size() < 2){ + cout << "USAGE: delete sdfsfilename" << endl; + continue; + } + if (!node->isBlackout) { + string sdfsfilename = cmdLineInput[1]; + Messages outMsg(DELETE, node->nodeInformation.ip + "::" + sdfsfilename); + cout << "[DELETE] Got sdfsfilename: " << sdfsfilename << endl; + node->tcpServent->sendMessage(node->leaderIP, TCPPORT, outMsg.toString()); + } else { + cout << "[BLACKOUT] Leader cannot accept the request" << endl; + } + } else if (cmd == "ls" && joined){ // MP2 op4 + if(cmdLineInput.size() < 2){ + cout << "USAGE: ls sdfsfilename" << endl; + continue; + } + if (!node->isBlackout) { + string sdfsfilename = cmdLineInput[1]; + node->listSDFSFileList(sdfsfilename); + } else { + cout << "[BLACKOUT] Leader cannot accept the request" << endl; + } + } else if (cmd == "store"){ // MP2 op5 + node->listLocalFiles(); + } else if (cmd == "lsall"){ + node->debugSDFSFileList(); + } else { + cout << "[join] join to a group via fixed introducer" << endl; + cout << "[leave] leave the group" << endl; + cout << "[id] print id (IP/PORT)" << endl; + cout << "[member] print all membership list" << endl; + cout << "[switch] switch to other mode (All-to-All to Gossip, and vice versa)" << endl; + cout << "[mode] show in 0/1 [All-to-All/Gossip] modes" << endl; + cout << "[exit] terminate process" << endl; + cout << " === New since MP2 === " << endl; + cout << "[put] localfilename sdfsfilename" << endl; + cout << "[get] sdfsfilename localfilename" << endl; + cout << "[delete] sdfsfilename" << endl; + cout << "[ls] list all machine (VM) addresses where this file is currently being stored" << endl; + cout << "[lsall] list all sdfsfilenames with positions" << endl; + cout << "[store] list all files currently being stored at this machine" << endl << endl; + } // More command line interface if wanted + } + + pthread_kill(threads[0], SIGUSR1); + pthread_kill(threads[4], SIGUSR1); + if(joined){ + pthread_kill(threads[1], SIGUSR1); + } + + pthread_exit(NULL); + + return 1; +} \ No newline at end of file