From f8e23a0e03bd7917f3d26123824db4c760f8f4ad Mon Sep 17 00:00:00 2001 From: afederici <ajf5@illinois.edu> Date: Mon, 16 Nov 2020 19:17:10 -0600 Subject: [PATCH] edited version --- .gitignore | 3 + Makefile | 4 +- inc/FileObject.h | 6 +- inc/HashRing.h | 22 +- inc/Introducer.h | 30 --- inc/Logger.h | 6 +- inc/MessageTypes.h | 64 ++--- inc/Messages.h | 5 +- inc/Node.h | 121 +++++----- inc/TcpSocket.h | 53 ++-- inc/UdpSocket.h | 17 +- inc/Utils.h | 39 +++ src/FileObject.cpp | 2 +- src/HashRing.cpp | 88 ++----- src/Introducer.cpp | 38 --- src/Logger.cpp | 19 +- src/Messages.cpp | 6 - src/Node.cpp | 524 ++++++++++------------------------------ src/RandomGenerator.cpp | 69 ------ src/TcpSocket.cpp | 387 +++++++++-------------------- src/TestBench.cpp | 164 +++++++++++++ src/Threads.cpp | 89 ++----- src/UdpSocket.cpp | 71 +----- src/Utils.cpp | 48 ++++ src/main.cpp | 70 ++---- 25 files changed, 694 insertions(+), 1251 deletions(-) delete mode 100644 inc/Introducer.h create mode 100644 inc/Utils.h delete mode 100644 src/Introducer.cpp delete mode 100644 src/RandomGenerator.cpp create mode 100644 src/TestBench.cpp create mode 100644 src/Utils.cpp diff --git a/.gitignore b/.gitignore index 259148f..ba59743 100644 --- a/.gitignore +++ b/.gitignore @@ -30,3 +30,6 @@ *.exe *.out *.app + +Node +Node.dsym diff --git a/Makefile b/Makefile index dce2206..08ff933 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ APP := Node INC_DIR := ./inc CFLAGS := -g -Wall -std=c++11 -I$(INC_DIR) LIBS := -lpthread -SRC_FILES := src/Node.cpp src/Messages.cpp src/Member.cpp src/UdpSocket.cpp src/Threads.cpp src/RandomGenerator.cpp src/Logger.cpp src/TcpSocket.cpp src/main.cpp src/HashRing.cpp src/FileObject.cpp +SRC_FILES := src/Node.cpp src/Messages.cpp src/Member.cpp src/UdpSocket.cpp src/Threads.cpp src/Utils.cpp src/Logger.cpp src/TcpSocket.cpp src/TestBench.cpp src/main.cpp src/HashRing.cpp src/FileObject.cpp .PHONY: clean @@ -12,4 +12,4 @@ app: $(CXX) -o $(APP) $(SRC_FILES) $(CFLAGS) $(LIBS) clean: - $(RM) -f $(APP) *.o + $(RM) -f $(APP) *.o diff --git a/inc/FileObject.h b/inc/FileObject.h index 7bbbdd4..9267f77 100644 --- a/inc/FileObject.h +++ b/inc/FileObject.h @@ -1,7 +1,7 @@ #ifndef FILEOBJECT_H #define FILEOBJECT_H -#include <iostream> +#include <iostream> #include <string> #include <stdio.h> #include <stdlib.h> @@ -18,8 +18,8 @@ public: int positionOnHashring; FileObject(string fileName); string toString(); - string getChecksum(); + string getChecksum(); //hash the file contents using iterator over whole file int getPositionOnHashring(); }; -#endif //FILEOBJECT_H \ No newline at end of file +#endif //FILEOBJECT_H diff --git a/inc/HashRing.h b/inc/HashRing.h index 7e0ff96..fe49e83 100644 --- a/inc/HashRing.h +++ b/inc/HashRing.h @@ -1,39 +1,37 @@ #ifndef HASHRING_H #define HASHRING_H -#include <iostream> +#include <iostream> #include <string> #include <stdio.h> #include <stdlib.h> #include <map> #include <vector> #include <algorithm> +#include "Member.h" using namespace std; #define HASHMODULO 360 +int hashingId(Member nodeMember, string joinTime); + +//Class used for ring leader election algorithm as well as file replication by using successors class HashRing { public: - vector<int> nodePositions; //allow for quick finding of node positions on the ring. + vector<int> nodePositions; //allow for quick finding of node positions on the ring. map<int, string> ring; - map<int, int> fileToClosestNode; // match up each file to its closest node -> may not need to use this, since we have the locateClosestNode function which maps file position - //to node position on hash ring. - + HashRing(); - vector<string> splitString(string s, string delimiter); int locateClosestNode(string filename); - string getValue(int key); + string getValue(int key); //lookup using ring map int addNode(string nodeInfo, int position); // Here we may have to change where other files point to - int addFile(string fileInfo, int position); int removeNode(int position); // Here we will have to change where all files that point to this node point to - int removeFile(int position); - int getSuccessor(int nodePosition); //Get position of successor node + int getSuccessor(int nodePosition); //Get position of successor node int getPredecessor(int nodePosition); //Get position of predecessor node void debugHashRing(); void clear(); int getRandomNode(tuple<int, int, int> excludedNodes); - }; -#endif //HASHRING_H \ No newline at end of file +#endif //HASHRING_H diff --git a/inc/Introducer.h b/inc/Introducer.h deleted file mode 100644 index 06ce118..0000000 --- a/inc/Introducer.h +++ /dev/null @@ -1,30 +0,0 @@ -#ifndef INTRODUCER_H -#define INTRODUCER_H - -#include <iostream> -#include <string> -#include <vector> -#include <map> -#include "Messages.h" -#include "Member.h" - -using namespace std; - -#define T_timeout 5 -#define T_cleanup 5 - -#define INTRODUCER "172.22.94.78" -#define PORT "6000" - -class Introducer { -public: - map<pair<string, string>, pair<int, int>> membershipList; - Member nodeInformation; - - Introducer(); - int heartbeatToNode(Member destination); - int joinSystem(Member introdcuer); - int listenForHeartbeat(); -}; - -#endif //INTRODUCER_H \ No newline at end of file diff --git a/inc/Logger.h b/inc/Logger.h index e6d189a..4f4cb8b 100644 --- a/inc/Logger.h +++ b/inc/Logger.h @@ -1,7 +1,7 @@ #ifndef LOGGER_H #define LOGGER_H -#include <iostream> +#include <iostream> #include <string> #include <fstream> #include <stdio.h> @@ -15,13 +15,11 @@ using namespace std; class Logger{ public: string filename; - //ofstream loggingFile; Logger(); Logger(string fileName); int printTheLog(LogType type, string s); - //int closeFile(); private: int writeToFile(string messages); }; -#endif //LOGGER_H \ No newline at end of file +#endif //LOGGER_H diff --git a/inc/MessageTypes.h b/inc/MessageTypes.h index 4410644..2697d2e 100644 --- a/inc/MessageTypes.h +++ b/inc/MessageTypes.h @@ -3,42 +3,44 @@ enum MessageType { ACK, - JOIN, - LEADERHEARTBEAT, - LEADERPENDING, - HEARTBEAT, - SWREQ, - SWRESP, - JOINRESPONSE, - JOINREJECT, - ELECTION, - ELECTIONACK, - PUT, - PUTACK, + JOIN, //used in joinSystem to join + LEADERHEARTBEAT, //send with heartbeat that this node is the leader + LEADERPENDING, //send instead of heartbeat if its currently a blackout + HEARTBEAT, //non-leader heartbeat + SWREQ, //tell other nodes to switch mode + SWRESP, //ack the switch request + JOINRESPONSE, //introducer sends memberhsip list + JOINREJECT, //if there is a hash collision introducer rejects the join (try new port?) + ELECTION, //used to ELECT self as leader + ELECTIONACK, //after this message makes it back to proposed leader election is over + PUT, //metadata followed by the actual data, add PUTACK into msg queue + PUTACK, //add file to localFileList and send an ACK to sender of PUT LEADERACK, - DNS, - DNSANS, - DNSGET, - DELETE, - GETNULL, + DNS, //sent to leader on PUT request, update pending requests and send out DNSANS + DNSANS, //PUT file to whatever node got sent by DNS + DNSGET, //sent to leader on GET request, sends out REPLICATEGET to selected node with file + DELETE, //leader handles deletion and disseminates info + GETNULL, //sent if user tries to get a file from leader and it has 0 copies in the system REREPLICATE, - REREPLICATEGET}; + REREPLICATEGET +}; enum PayloadType { - REGULAR=97, - FILEPAIR, - FILENAME, - FILEPOSITIONS}; + REGULAR=97, //start of actual message (membershipList) + FILEPAIR, //start of filelist + FILENAME, //start of filename + FILEPOSITIONS //start of comma seperated file positions string +}; enum LogType { - JOINGROUP, - UPDATE, - FAIL, - LEAVE, - REMOVE, - GOSSIPTO, - GOSSIPFROM, - BANDWIDTH, + JOINGROUP, + UPDATE, + FAIL, + LEAVE, + REMOVE, + GOSSIPTO, + GOSSIPFROM, + BANDWIDTH, MEMBERS}; -#endif //MESSAGESTYPES_H \ No newline at end of file +#endif //MESSAGESTYPES_H diff --git a/inc/Messages.h b/inc/Messages.h index ee1ddb6..6f2c310 100644 --- a/inc/Messages.h +++ b/inc/Messages.h @@ -1,7 +1,7 @@ #ifndef MESSAGES_H #define MESSAGES_H -#include <iostream> +#include <iostream> #include <string> #include "MessageTypes.h" @@ -11,10 +11,9 @@ class Messages { public: MessageType type; string payload; - Messages(string payloadMessage); Messages(MessageType messageType, string payloadMessage); string toString(); }; -#endif //MESSAGES_H \ No newline at end of file +#endif //MESSAGES_H diff --git a/inc/Node.h b/inc/Node.h index 63931fa..3d359d6 100644 --- a/inc/Node.h +++ b/inc/Node.h @@ -1,7 +1,7 @@ #ifndef NODE_H #define NODE_H -#include <iostream> +#include <iostream> #include <string> #include <vector> #include <map> @@ -16,15 +16,14 @@ #include "TcpSocket.h" #include "Logger.h" #include "HashRing.h" +#include "Utils.h" +#include "HashRing.h" using namespace std; -#define INTRODUCER "172.22.94.78" // VM1 -//#define INTRODUCER "172.22.158.81" // VM9 +#define INTRODUCER "fa20-cs425-g02-01.cs.illinois.edu" // VM1 #define PORT "6000" -//#define LOG_VERBOSE 1 - #define LOGGING_FILE_NAME "logs.txt" // --- parameters (stay tuned) --- @@ -32,11 +31,9 @@ using namespace std; #define T_timeout 15 // in T_period #define T_cleanup 15 // in T_period #define N_b 5 // how many nodes GOSSIP want to use - #define T_election 15 // in T_period -// ------ - #define T_switch 3 // in seconds +// void *runUdpServer(void *udpSocket); void *runTcpServer(void *tcpSocket); @@ -54,83 +51,73 @@ public: int heartbeatCounter; time_t startTimestamp; string joinTimestamp; - // unsigned long byteSent; - // unsigned long byteReceived; - ModeType runningMode; Logger* logWriter; bool activeRunning; bool prepareToSwitch; bool isLeader; - bool isBlackout; + bool isBlackout; //true while DNS being handled int leaderPosition; // -1 for no leader int hashRingPosition; - int proposedTime; - int electedTime; - string possibleSuccessorIP; + int proposedTime; //time when proposed to be leader with first ELECTION msg + int electedTime; //keeps track of time of last ELECTIONACK + string possibleSuccessorIP; //successor in hashRing string leaderIP; string leaderPort; HashRing *hashRing; map<string, vector<int>> fileList; //vector of node positions on a hashring for a given file where we can find that file stored map<string, string> localFilelist; // Map sdfsfilename -> localfilename - map<string, tuple<int, int, int>> pendingRequests; - map<string, tuple<string, string, string>> pendingSenderRequests; - map<string, tuple<bool, bool, bool>> pendingRequestSent; + map<string, tuple<int, int, int>> pendingRequests; //? + map<string, tuple<string, string, string>> pendingSenderRequests; //? + map<string, tuple<bool, bool, bool>> pendingRequestSent; //? + - Node(); Node(ModeType mode); - int getPositionOnHashring(); - int heartbeatToNode(); - int joinSystem(Member introdcuer); - int listenToHeartbeats(); - int failureDetection(); - void updateNodeHeartbeatAndTime(); - void computeAndPrintBW(double diff); - int requestSwitchingMode(); - int SwitchMyMode(); - void debugMembershipList(); - void startActive(); - // Added below Since MP2 - bool checkHashNodeCollision(int checkPosition); - bool checkLeaderExist(); - bool findWillBeLeader(); - void proposeToBeLeader(); - void processElection(Messages messages); - void processTcpMessages(); - void processRegMessages(); - void restartElection(); - void setUpLeader(string message, bool pending); - string encapsulateFileList(); - void decapsulateFileList(string payload); - string encapsulateMessage(map<PayloadType,string> payloads); - string decapsulateMessage(string payload); - bool isInVector(vector<int> v, int i); - int updateHashRing(); - // leader related functions here - void leaderCreateHashRing(); - int putFileSender(string filename, string sdfsfilename); - int putFileMaster(string sdfsfilename); - int putFileReeiver(string sdfsfilename); - void listLocalFiles(); - void findNodesWithFile(string sdfsfilename); - void debugSDFSFileList(); - void listSDFSFileList(string sdfsfilename); - string populateSDFSFileList(MessageType type, string mem_list_to_send); - void updateFileList(string sdfsfilename, int nodePosition); - void checkFileListConsistency(); - + int heartbeatToNode(); //send out memList to targets + int joinSystem(Member introdcuer); //send memList with JOIN message to introducer + int listenToHeartbeats(); //process UDP message queue + int failureDetection(); //check for failures and update pendingRequests if sender failed. Also check for leader failure + string updateNodeHeartbeatAndTime(); //update this nodes heartbeat and time in membershipList + int requestSwitchingMode(); //send out SWREQ to switch modes + int SwitchMyMode(); //actually switch running mode and print debug messages + void startActive(); //initialize, get ready to join system + bool checkHashNodeCollision(int checkPosition); //check if any member from memberlist hashes to checkPosition + + void tcpElectionProcessor(); //process TCP queue for election messages + void handleTcpMessage(); //process TCP queue for all other messages + string encapsulateFileList(); //LEADER ONLY: encode fileList into byte format + void decapsulateFileList(string payload); //update file positions and remove files that have 0 copies + string encapsulateMessage(map<PayloadType,string> payloads); //Combines membership list heartbeating with fileList info (master sends this only) + string decapsulateMessage(string payload); //handle file list and return regular part of the message + void listLocalFiles(); //iterate over localFileList + void listSDFSFileList(string sdfsfilename); //print all locations of sdfsfilename from fileList + string populateSDFSFileList(MessageType type, string mem_list_to_send); //piggyback fileList to membershipList if coming from master + void updateFileList(string sdfsfilename, int nodePosition); //LEADER ONLY: add new position for file in fileList + void checkFileListConsistency(); //LEADER ONLY: have leader decide where to replicate files with not enough copies + + bool checkLeaderExist(); //return if leader exists + bool findWillBeLeader(); //min on hashring should be leader, also update node's successor while checking the ring + void leaderCreateHashRing(); //leader creates new hashring from its membership list + void proposeToBeLeader(); //TCP ELECTION msg to successor + void electionMessageHandler(Messages messages); //process TCP ELECTION messages using ring leader election algo + void restartElection(); //reset leader info + void setUpLeader(string message, bool pending); //setup leader private: - string populateMembershipMessage(); - string populateIntroducerMembershipMessage(); - void readMessage(string message); - void processHeartbeat(string message); - vector<string> splitString(string s, string delimiter); - vector<tuple<string,string, string>> getRandomNodesToGossipTo(); - int hashingId(Member nodeMember, string joinTime); + string populateMembershipMessage(); //membershipList to string based on mode type + string populateIntroducerMembershipMessage(); //entire membership list to string + void handleUdpMessage(string message); //handle each UDP message + int getPositionOnHashring(); //returns hashring position + int updateHashRing(); //go through membershipList and add nodes to hashRing if not already there + void processHeartbeat(string message); //update local membership list from a heartbeat message + vector<tuple<string,string, string>> getRandomNodesToGossipTo(); //pick targets for multicast }; -#endif //NODE_H \ No newline at end of file +void computeAndPrintBW(Node * n, double diff); //print bandwidth +void debugSDFSFileList(Node * n); //debug function +void debugMembershipList(Node * n); //debug function + +#endif //NODE_H diff --git a/inc/TcpSocket.h b/inc/TcpSocket.h index 958e9f6..0634131 100644 --- a/inc/TcpSocket.h +++ b/inc/TcpSocket.h @@ -1,10 +1,15 @@ #ifndef TCPSOCKET_H #define TCPSOCKET_H -#include <iostream> +#include <sys/wait.h> +#include <signal.h> + +#include <iostream> #include <string> +#include <map> +#include <utility> #include <queue> - +#include <string> #include <stdio.h> #include <stdlib.h> #include <unistd.h> @@ -13,36 +18,46 @@ #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> -#include <netdb.h> #include <arpa/inet.h> -#include <sys/wait.h> -#include <signal.h> - -// #include "Messages.h" -// #include "UdpSocket.h" - -using namespace std; +#include <netdb.h> +#include <ctype.h> +#include <stdbool.h> +#include <unistd.h> +#include <errno.h> +#include "MessageTypes.h" +#include "Messages.h" +#include "Utils.h" +#ifdef __linux__ +#include <bits/stdc++.h> +#endif + +using std::string; +using std::queue; +using std::to_string; +using std::map; +using std::make_tuple; +using std::get; #define DEFAULT_TCP_BLKSIZE (128 * 1024) #define BACKLOG 10 - #define TCPPORT "4950" -// void *runTcpServer(void *tcpSocket); -// void *runTcpClient(void *tcpSocket); - class TcpSocket { public: - queue<string> qMessages; - queue<string> regMessages; - queue<string> pendSendMessages; + //tcp server directly handles PUTs. If put received, request from + //receiving server for the data and store it, ends with adding PUTACK to + //regMessages queue.. Other requests also put into one of the queueus. + queue<string> qMessages; //election messages added to this queue + queue<string> regMessages;//other messages added here + queue<string> pendSendMessages;//keeps messages for the tcp client to send void bindServer(string port); void sendFile(string ip, string port, string localfilename, string sdfsfilename, string remoteLocalfilename); void sendMessage(string ip, string port, string message); + int messageHandler(int sockfd, string payloadMessage, string returnID); + int createConnection(string ip, string port); TcpSocket(); private: string getFileMetadata(int size, string checksum, string sdfsfilename, string localfilename, string remoteLocalfilename); - vector<string> splitString(string s, string delimiter); }; -#endif //TCPSOCKET_H \ No newline at end of file +#endif //TCPSOCKET_H diff --git a/inc/UdpSocket.h b/inc/UdpSocket.h index 17402e0..bae3bf0 100644 --- a/inc/UdpSocket.h +++ b/inc/UdpSocket.h @@ -1,10 +1,9 @@ #ifndef UDPSOCKET_H #define UDPSOCKET_H - -#include <iostream> +#include <iostream> #include <string> #include <queue> - +#include <utility> #include <stdio.h> #include <stdlib.h> #include <unistd.h> @@ -15,23 +14,29 @@ #include <netinet/in.h> #include <arpa/inet.h> #include <netdb.h> +#ifdef __linux__ +#include <bits/stdc++.h> +#endif -using namespace std; +using std::string; +using std::queue; +using std::get; #define MAXBUFLEN (128 * 1024) #define LOSS_RATE 0 void *get_in_addr(struct sockaddr *sa); +// Add every message to a queue on receive, process this in the UDP thread class UdpSocket { public: unsigned long byteSent; unsigned long byteReceived; queue<string> qMessages; - + void bindServer(string port); void sendMessage(string ip, string port, string message); UdpSocket(); }; -#endif //UDPSOCKET_H \ No newline at end of file +#endif //UDPSOCKET_H diff --git a/inc/Utils.h b/inc/Utils.h new file mode 100644 index 0000000..26d1eaf --- /dev/null +++ b/inc/Utils.h @@ -0,0 +1,39 @@ +#ifndef UTILS_H +#define UTILS_H +#include <iostream> +#include <string> +#include <utility> +#include <vector> +#include <netdb.h> +#include <unistd.h> +#include <sys/socket.h> +#include <arpa/inet.h> +#ifdef __linux__ +#include <bits/stdc++.h> +#endif + +using std::string; +using std::vector; +using std::get; +using std::tuple_element; + +static pthread_mutex_t thread_counter_lock = PTHREAD_MUTEX_INITIALIZER; +static int thread_counter = 0; + +vector<string> splitString(string s, string delimiter); +string getIP(); +string getIP(const char * host); +int new_thread_id(); +bool isInVector(vector<int> v, int i); +//adapted from https://stackoverflow.com/questions/23030267/custom-sorting-a-vector-of-tuples +template<int M, template<typename> class F = std::less> +struct TupleCompare +{ + template<typename T> + bool operator()(T const &t1, T const &t2) + { + return F<typename tuple_element<M, T>::type>()(std::get<M>(t1), std::get<M>(t2)); + } +}; + +#endif //UDPSOCKET_H diff --git a/src/FileObject.cpp b/src/FileObject.cpp index 1097041..1f29665 100644 --- a/src/FileObject.cpp +++ b/src/FileObject.cpp @@ -8,7 +8,7 @@ FileObject::FileObject(string fileName){ string FileObject::getChecksum(){ ifstream fileStream(fileName); string fileContent((istreambuf_iterator<char>(fileStream)), - (istreambuf_iterator<char>())); + (istreambuf_iterator<char>())); size_t hashResult = hash<string>{}(fileContent); return to_string(hashResult); } diff --git a/src/HashRing.cpp b/src/HashRing.cpp index ea42b6d..23e58f1 100644 --- a/src/HashRing.cpp +++ b/src/HashRing.cpp @@ -1,9 +1,6 @@ #include "../inc/HashRing.h" - -HashRing::HashRing(){ - -} +HashRing::HashRing(){} string HashRing::getValue(int key){ if(key == -1){ @@ -18,79 +15,31 @@ void HashRing::clear() nodePositions.clear(); } +int hashingId(Member nodeMember, string joinTime) +{ + string toBeHashed = "NODE::" + nodeMember.ip + "::" + nodeMember.port + "::" + joinTime; + int ringPosition = hash<string>{}(toBeHashed) % HASHMODULO; + return ringPosition; +} + void HashRing::debugHashRing() { cout << "Current Ring: " << endl; - for (auto& element: ring) { + for (auto& element: ring) { int position = element.first; string object = element.second; cout << object << " at " << position << endl; } } -vector<string> HashRing::splitString(string s, string delimiter){ - vector<string> result; - size_t pos_start = 0, pos_end, delim_len = delimiter.length(); - string token; - - while ((pos_end = s.find (delimiter, pos_start)) != string::npos) { - token = s.substr (pos_start, pos_end - pos_start); - pos_start = pos_end + delim_len; - result.push_back (token); - } - - result.push_back (s.substr (pos_start)); - return result; -} - -int HashRing::addFile(string fileInfo, int position){ - ring[position] = fileInfo; - fileToClosestNode[position] = locateClosestNode(fileInfo); - //TODO: deal with hash collisions? - return 0; -} - int HashRing::addNode(string nodeInfo, int position){ ring[position] = nodeInfo; - nodePositions.push_back(position); //Add node position on hash ring to list of node positions + nodePositions.push_back(position); sort(nodePositions.begin(), nodePositions.end()); - //int insertPosition = -1; - //for(uint i = 0; i < nodePositions.size(); i++){ - // if(nodePositions[i] == position){ - // insertPosition = i; - // break; - // } - //} - /* commenting out this code since we are making hash ring node only - //Case where the newly inserted node is not at beginning of list - if(insertPosition != 0){ - for(map<int, int>::iterator it = fileToClosestNode.begin(); it != fileToClosestNode.end(); it++){ - if(it -> first <= nodePositions[insertPosition] && it -> first > nodePositions[insertPosition - 1]){ - it -> second = locateClosestNode(it -> first); - } - } - } - //If newly inserted not is at beginning of list, just need to look at files with indexes either less than that node's index, - //or at files with index greater than the highest node index - else{ - for(map<int, int>::iterator it = fileToClosestNode.begin(); it != fileToClosestNode.end(); it++){ - if(it -> first <= nodePositions[insertPosition] || it -> first > nodePositions[nodePositions.size() -1]){ - it -> second = locateClosestNode(it -> first); - } - } - } - */ return 0; //TODO: deal with hash collisions? } -int HashRing::removeFile(int position){ - ring.erase(position); - fileToClosestNode.erase(position); - return 0; - -} - int HashRing::removeNode(int position){ ring.erase(position); for(uint i = 0; i < nodePositions.size(); i++){ @@ -98,20 +47,11 @@ int HashRing::removeNode(int position){ nodePositions.erase(nodePositions.begin() + i); } } - //find all files that point to this node, and tell them to now point to their next closest node instead. - /* comment out this code since we are making hash ring node only at this point. - for(map<int, int>::iterator it = fileToClosestNode.begin(); it != fileToClosestNode.end(); it ++){ - if(it ->second == position){ - it ->second = locateClosestNode(it ->first); - } - }*/ - return 0; } int HashRing::locateClosestNode(string filename){ int filePosition; - //Hash filename to get file position on hash ring, then use that to find closest node to it filePosition = hash<string>{}(filename) % HASHMODULO; for(int i : nodePositions){ if(i >= filePosition){ @@ -119,7 +59,7 @@ int HashRing::locateClosestNode(string filename){ } } //If we cannnot find a Node at a entry on the hash circle greater than or equal to our file position, we wrap back around and take - // the first node's position as where that file should go. + // the first node's position as where that file should go. return nodePositions[0]; } @@ -162,10 +102,10 @@ int HashRing::getSuccessor(int nodePosition){ //exclude the three positions in the tuple, //return node position in hash ring of a random node that is not one of the -//three nodes in this tuple. +//three nodes in this tuple. int HashRing::getRandomNode(tuple<int, int, int> excludedNodes){ if(nodePositions.size() >= 4){ - //get random node + //get random node // return that random node vector<int> indicesToPickFrom; for(unsigned int i = 0; i < nodePositions.size(); i++){ @@ -177,4 +117,4 @@ int HashRing::getRandomNode(tuple<int, int, int> excludedNodes){ return nodePositions[indicesToPickFrom[randomSelection]]; } return -1; -} \ No newline at end of file +} diff --git a/src/Introducer.cpp b/src/Introducer.cpp deleted file mode 100644 index 1e948d3..0000000 --- a/src/Introducer.cpp +++ /dev/null @@ -1,38 +0,0 @@ -#include "../inc/Introducer.h" -#include <ctime> -#include <chrono> -Introducer::Introducer() -{ - -} -/** - * - * HeartbeatToNode: Sends a string version of the membership list to the receiving node. The receiving node will convert the string to - * a <string, long> map where the key is the Addresss (IP + PORT) and value is the heartbeat counter. We then compare the Member. - * - **/ -int Introducer::heartbeatToNode(Member destination){ - //The string we send will be seperated line by line --> IP,PORT,HeartbeatCounter - string mem_list_to_send = ""; - //Case 1 need to add destination to membership list - - - //Case 2: destination already exists in the membership list of introducer, just a normal heartbeat - for(auto& element: this->membershipList){ - pair<string, string> keyPair = element.first; - pair<int, int> valuePair = element.second; - mem_list_to_send += keyPair.first + "," + keyPair.second + ","; - mem_list_to_send += to_string(valuePair.first) + "\n"; - } - //Now we have messages ready to send, need to invoke UDP client to send - return 0; -} - -int Introducer::joinSystem(Member introducer){ - Messages join(JOIN, "IP: " + this->nodeInformation.ip + " PortNum: " + this->nodeInformation.port); - return 0; -} - -int Introducer::listenForHeartbeat(){ - -} diff --git a/src/Logger.cpp b/src/Logger.cpp index cc24aad..5ef2995 100644 --- a/src/Logger.cpp +++ b/src/Logger.cpp @@ -1,16 +1,8 @@ #include "../inc/Logger.h" -Logger::Logger() -{ - filename = LOGFILE; - //loggingFile.open(filename); -} +Logger::Logger() { filename = LOGFILE; } -Logger::Logger(string fileName) -{ - filename = fileName; - //loggingFile.open(filename); -} +Logger::Logger(string fileName) { filename = fileName; } int Logger::writeToFile(string messages) { @@ -72,12 +64,5 @@ int Logger::printTheLog(LogType type, string s) default: break; } - //loggingFile << s; return 0; } - -/*int Logger::closeFile() -{ - loggingFile.close(); - return 0; -}*/ \ No newline at end of file diff --git a/src/Messages.cpp b/src/Messages.cpp index 360efad..02c9b62 100644 --- a/src/Messages.cpp +++ b/src/Messages.cpp @@ -8,9 +8,6 @@ Messages::Messages(string payloadMessage) string token; while ((pos = payloadMessage.find(delimiter)) != string::npos) { token = payloadMessage.substr(0, pos); -//#ifdef LOG_VERBOSE -// cout << line << "/" << token << endl; -//#endif switch (line) { case 0: { type = static_cast<MessageType>(stoi(token)); @@ -22,9 +19,6 @@ Messages::Messages(string payloadMessage) payloadMessage.erase(0, pos + delimiter.length()); line++; } -//#ifdef LOG_VERBOSE -// cout << payloadMessage << endl; -//#endif payload = payloadMessage; } diff --git a/src/Node.cpp b/src/Node.cpp index 33423c8..523e9d6 100644 --- a/src/Node.cpp +++ b/src/Node.cpp @@ -1,20 +1,11 @@ #include "../inc/Node.h" - -//add another function to Node class for failure detection -//call function before sender (heartbeat) after listenForHeartbeat - -Node::Node() -{ - // create a udp object +Node::Node(){ udpServent = new UdpSocket(); tcpServent = new TcpSocket(); hashRing = new HashRing(); localTimestamp = 0; heartbeatCounter = 0; - //time(&startTimestamp); - // byteSent = 0; - // byteReceived = 0; runningMode = ALL2ALL; activeRunning = false; prepareToSwitch = false; @@ -25,162 +16,85 @@ Node::Node() joinTimestamp = ""; possibleSuccessorIP = ""; leaderIP = ""; - leaderPort = ""; - isBlackout = true; -} - -Node::Node(ModeType mode) -{ - // create a udp object - udpServent = new UdpSocket(); - tcpServent = new TcpSocket(); - hashRing = new HashRing(); - localTimestamp = 0; - heartbeatCounter = 0; - //time(&startTimestamp); - // byteSent = 0; - // byteReceived = 0; - runningMode = mode; - activeRunning = false; - prepareToSwitch = false; - logWriter = new Logger(LOGGING_FILE_NAME); - leaderPosition = -1; - proposedTime = 0; - electedTime = 0; - joinTimestamp = ""; - possibleSuccessorIP = ""; - leaderIP = ""; leaderPort = ""; isBlackout = true; } +Node::Node(ModeType mode) : Node() { runningMode = mode; } + void Node::startActive() { membershipList.clear(); restartElection(); // inserting its own into the list time(&startTimestamp); - string startTime = ctime(&startTimestamp); - startTime = startTime.substr(0, startTime.find("\n")); - tuple<string,string,string> mapKey(nodeInformation.ip, nodeInformation.port, startTime); - tuple<int, int, int> valueTuple(nodeInformation.heartbeatCounter, nodeInformation.timestamp, 0); - membershipList[mapKey] = valueTuple; - - debugMembershipList(); + string startTime = updateNodeHeartbeatAndTime(); + debugMembershipList(this); joinTimestamp = startTime; // for hashRing getPositionOnHashring(); // update its hashRingPosition } -void Node::computeAndPrintBW(double diff) -{ -#ifdef LOG_VERBOSE - cout << "total " << udpServent->byteSent << " bytes sent" << endl; - cout << "total " << udpServent->byteReceived << " bytes received" << endl; - printf("elasped time is %.2f s\n", diff); -#endif - if (diff > 0) { - double bandwidth = udpServent->byteSent/diff; - string message = "["+to_string(this->localTimestamp)+"] B/W usage: "+to_string(bandwidth)+" bytes/s"; -#ifdef LOG_VERBOSE - printf("%s\n", message.c_str()); -#endif - this->logWriter->printTheLog(BANDWIDTH, message); - } -} - -void Node::updateNodeHeartbeatAndTime() +string Node::updateNodeHeartbeatAndTime() { string startTime = ctime(&startTimestamp); startTime = startTime.substr(0, startTime.find("\n")); tuple<string, string, string> keyTuple(nodeInformation.ip, nodeInformation.port,startTime); tuple<int, int, int> valueTuple(heartbeatCounter, localTimestamp, 0); this->membershipList[keyTuple] = valueTuple; + return startTime; } string Node::populateMembershipMessage() { //The string we send will be seperated line by line --> IP,PORT,HeartbeatCounter,FailFlag string mem_list_to_send = ""; - //Assume destination already exists in the membership list of this node, just a normal heartbeat switch (this->runningMode) { case GOSSIP: - for (auto& element: this->membershipList) { - tuple<string, string, string> keyTuple = element.first; - tuple<int, int, int> valueTuple = element.second; - mem_list_to_send += get<0>(keyTuple) + "," + get<1>(keyTuple) + "," + get<2>(keyTuple) + ","; - mem_list_to_send += to_string(get<0>(valueTuple)) + "," + to_string(get<2>(valueTuple)) + "\n"; - } - break; - + return populateIntroducerMembershipMessage(); // code re-use default: string startTime = ctime(&startTimestamp); startTime = startTime.substr(0, startTime.find("\n")); mem_list_to_send += nodeInformation.ip + "," + nodeInformation.port + "," + startTime + ","; mem_list_to_send += to_string(heartbeatCounter) + "," + to_string(0) + "\n"; - break; + return mem_list_to_send; } - return mem_list_to_send; } -string Node::populateIntroducerMembershipMessage(){ +string Node::populateIntroducerMembershipMessage() { string mem_list_to_send = ""; for (auto& element: this->membershipList) { tuple<string, string, string> keyTuple = element.first; tuple<int, int, int> valueTuple = element.second; - mem_list_to_send += get<0>(keyTuple) + "," + get<1>(keyTuple) + "," + get<2>(keyTuple) + ","; + mem_list_to_send += get<0>(keyTuple) + "," + get<1>(keyTuple) + "," + get<2>(keyTuple) + ","; mem_list_to_send += to_string(get<0>(valueTuple)) + "," + to_string(get<2>(valueTuple)) + "\n"; } return mem_list_to_send; } -/** - * - * HeartbeatToNode: Sends a string version of the membership list to the receiving node. The receiving node will convert the string to - * a <string, long> map where the key is the Addresss (IP + PORT) and value is the heartbeat counter. We then compare the Member. - * - **/ int Node::heartbeatToNode() { - // 3. prepare to send heartbeating, and + string msg; string mem_list_to_send = populateMembershipMessage(); vector<tuple<string,string,string>> targetNodes = getRandomNodesToGossipTo(); - - //Now we have messages ready to send, need to invoke UDP client to send #ifdef LOG_VERBOSE cout << "pick " << targetNodes.size() << " of " << this->membershipList.size()-1; cout << " members" << endl; #endif - - // 4. do gossiping for (uint i=0; i<targetNodes.size(); i++) { - //cout << targetNodes[i].first << "/" << targetNodes[i].second << endl; Member destination(get<0>(targetNodes[i]), get<1>(targetNodes[i])); - string message = "["+to_string(this->localTimestamp)+"] node "+destination.ip+"/"+destination.port+"/"+get<2>(targetNodes[i]); #ifdef LOG_VERBOSE cout << "[Gossip]" << message.c_str() << endl; #endif this->logWriter->printTheLog(GOSSIPTO, message); - - //cout << mem_list_to_send.size() << " Bytes sent..." << endl; - // byteSent += mem_list_to_send.size(); if (isLeader) { - //Messages msg(LEADERHEARTBEAT, mem_list_to_send); - if (isBlackout) { - string msg = populateSDFSFileList(LEADERPENDING, mem_list_to_send); - udpServent->sendMessage(destination.ip, destination.port, msg); - } else { - string msg = populateSDFSFileList(LEADERHEARTBEAT, mem_list_to_send); - udpServent->sendMessage(destination.ip, destination.port, msg); - } - } else { - //Messages msg(HEARTBEAT, mem_list_to_send); - string msg = populateSDFSFileList(HEARTBEAT, mem_list_to_send); - udpServent->sendMessage(destination.ip, destination.port, msg); + if (isBlackout) msg = populateSDFSFileList(LEADERPENDING, mem_list_to_send); + else msg = populateSDFSFileList(LEADERHEARTBEAT, mem_list_to_send); } + else msg = populateSDFSFileList(HEARTBEAT, mem_list_to_send); + udpServent->sendMessage(destination.ip, destination.port, msg); } - return 0; } @@ -197,27 +111,27 @@ int Node::failureDetection(){ cout << "checking " << get<0>(keyTuple) << "/" << get<1>(keyTuple) << "/" << get<2>(keyTuple) << endl; #endif if ((get<0>(keyTuple).compare(nodeInformation.ip) == 0) && (get<1>(keyTuple).compare(nodeInformation.port) == 0)) { - // do not check itself #ifdef LOG_VERBOSE - cout << "skip it" << endl; -#endif + cout << "do not check itself" << endl; +#endif continue; } + //node has not failed if(get<2>(valueTuple) == 0){ + //timeout passed, set as failed if(localTimestamp - get<1>(valueTuple) > T_timeout){ //cout << "Got " << get<0>(keyTuple) << "/" << get<1>(keyTuple) << "/" << get<2>(keyTuple) << endl; //cout << "local time " << localTimestamp << " vs. " << get<1>(valueTuple) << endl; get<1>(this->membershipList[keyTuple]) = localTimestamp; get<2>(this->membershipList[keyTuple]) = 1; - string message = "["+to_string(this->localTimestamp)+"] node "+get<0>(keyTuple)+"/"+get<1>(keyTuple)+"/"+get<2>(keyTuple)+": Local Failure"; cout << "[FAIL]" << message.c_str() << endl; this->logWriter->printTheLog(FAIL, message); if(isLeader){ - // clearn up fileList Member deletedNode(get<0>(keyTuple), get<1>(keyTuple)); int deletedNodePostion = hashingId(deletedNode, get<2>(keyTuple)); hashRing->removeNode(deletedNodePostion); + //for each file, remove the deleted node from its location vector for (auto& element: fileList) { vector<int> newEntry; for(unsigned int i = 0; i < element.second.size(); i++){ @@ -234,7 +148,8 @@ int Node::failureDetection(){ tuple<string,string,string> sender = senders.second; if ((get<0>(keyTuple).compare(get<0>(sender))==0) && get<0>(pendingRequestSent[sdfsfilename]) && - (get<0>(pendingRequests[sdfsfilename])!=-1)) { + (get<0>(pendingRequests[sdfsfilename])!=-1)) + { // it sent out, and is not finished, cannot help // we lost the data from the client cout << "[PUT] client itself fails, we cannot help, remove request" << endl; @@ -245,7 +160,8 @@ int Node::failureDetection(){ } if ((get<0>(keyTuple).compare(get<1>(sender))==0) && get<1>(pendingRequestSent[sdfsfilename]) && - (get<1>(pendingRequests[sdfsfilename])!=-1)) { + (get<1>(pendingRequests[sdfsfilename])!=-1)) + { // the sender fails during 2nd pass // replace the sent cout << "[PUT] One of the sender " << get<0>(keyTuple) << " failed, try again" << endl; @@ -258,29 +174,24 @@ int Node::failureDetection(){ } if ((get<0>(keyTuple).compare(get<2>(sender))==0) && get<2>(pendingRequestSent[sdfsfilename]) && - (get<2>(pendingRequests[sdfsfilename])!=-1)) { + (get<2>(pendingRequests[sdfsfilename])!=-1)) + { // it sent out, but replicates are failed // restart again cout << "[PUT/REREPLICATE] The sender " << get<0>(keyTuple) << " failed, try again" << endl; pendingRequests.erase(sdfsfilename); } } - + } - + } } + //check for cleanup on already failed nodes else{ if(localTimestamp - get<1>(valueTuple) > T_cleanup){ - // core dumped happened here; bug fix auto iter = this->membershipList.find(keyTuple); if (iter != this->membershipList.end()) { - //cout << "Got " << get<0>(iter->first) << "/" << get<1>(iter->first) << "/" << get<2>(iter->first); - //cout << " with " << to_string(get<0>(iter->second)) << "/"; - //cout << to_string(get<1>(iter->second)) << "/"; - //cout << to_string(get<2>(iter->second)) << endl; - //cout << this->membershipList[keyTuple] - //this->membershipList.erase(iter); removedVec.push_back(keyTuple); } } @@ -299,12 +210,10 @@ int Node::failureDetection(){ } this->membershipList.erase(iter); - string message = "["+to_string(this->localTimestamp)+"] node "+get<0>(removedVec[i])+"/"+get<1>(removedVec[i])+"/"+get<2>(removedVec[i])+": REMOVED FROM LOCAL MEMBERSHIP LIST"; cout << "[REMOVE]" << message.c_str() << endl; this->logWriter->printTheLog(REMOVE, message); - - //this->debugMembershipList(); + //debugMembershipList(this); } } if (this->membershipList.size()==1 || leaderRemoved) { // Only me or leader failed, restart leader election @@ -315,13 +224,10 @@ int Node::failureDetection(){ return 0; } - -int Node::joinSystem(Member introducer) +int Node::joinSystem(Member introducer) { string mem_list_to_send = populateMembershipMessage(); - //Messages msg(JOIN, mem_list_to_send); string msg = populateSDFSFileList(JOIN, mem_list_to_send); - string message = "["+to_string(this->localTimestamp)+"] sent a request to "+introducer.ip+"/"+introducer.port+", I am "+nodeInformation.ip+"/"+nodeInformation.port; cout << "[JOIN]" << message.c_str() << endl; this->logWriter->printTheLog(JOINGROUP, message); @@ -332,23 +238,18 @@ int Node::joinSystem(Member introducer) int Node::requestSwitchingMode() { string message = nodeInformation.ip+","+nodeInformation.port; - //Messages msg(SWREQ, message); string msg = populateSDFSFileList(SWREQ, message); for(auto& element: this->membershipList) { tuple<string,string,string> keyTuple = element.first; - //tuple<int, int, int> valueTuple = element.second; cout << "[SWITCH] sent a request to " << get<0>(keyTuple) << "/" << get<1>(keyTuple) << endl; udpServent->sendMessage(get<0>(keyTuple), get<1>(keyTuple), msg); } return 0; } -int Node::SwitchMyMode() -{ - // wait for a while - sleep(T_switch); - // empty all messages - udpServent->qMessages = queue<string>(); +int Node::SwitchMyMode() { + sleep(T_switch); // wait for a while + udpServent->qMessages = queue<string>(); // empty all messages switch (this->runningMode) { case GOSSIP: { this->runningMode = ALL2ALL; @@ -363,76 +264,30 @@ int Node::SwitchMyMode() default: break; } - // finishing up - prepareToSwitch = false; + prepareToSwitch = false; // finishing up return 0; } - -int Node::listenToHeartbeats() -{ - //look in queue for any strings --> if non empty, we have received a message and need to check the membership list +int Node::listenToHeartbeats() { // 1. deepcopy and handle queue queue<string> qCopy(udpServent->qMessages); udpServent->qMessages = queue<string>(); - int size = qCopy.size(); //cout << "Got " << size << " messages in the queue" << endl; //cout << "checking queue size " << nodeOwn->udpServent->qMessages.size() << endl; - - // 2. merge membership list for (int j = 0; j < size; j++) { //cout << qCopy.front() << endl; - readMessage(qCopy.front()); - - // Volunteerily leave - if(this->activeRunning == false){ - return 0; - } - // byteReceived += qCopy.front().size(); + handleUdpMessage(qCopy.front()); + if(this->activeRunning == false) return 0; qCopy.pop(); } - return 0; } -void Node::debugMembershipList() -{ - cout << "Membership list [" << this->membershipList.size() << "]:" << endl; - if (isLeader) { - cout << "[T] IP/Port/JoinedTime:Heartbeat/LocalTimestamp/FailFlag" << endl; - } else { - cout << "[T] IP/Port/JoinedTime:Heartbeat/LocalTimestamp/FailFlag" << endl; - } - string message = ""; - - for (auto& element: this->membershipList) { - tuple<string,string,string> keyTuple = element.first; - tuple<int, int, int> valueTuple = element.second; - - if (nodeInformation.ip.compare(get<0>(keyTuple))==0) { // Myself - if (isLeader) { - message += "[L/M] "; - } else { - message += "[M] "; - } - } else if (leaderIP.compare(get<0>(keyTuple))==0) { - message += "[L] "; - } else { - if (isLeader) { - message += " "; - } else { - message += " "; - } - } - - message += get<0>(keyTuple)+"/"+get<1>(keyTuple)+"/"+get<2>(keyTuple); - message += ": "+to_string(get<0>(valueTuple))+"/"+to_string(get<1>(valueTuple))+"/"+to_string(get<2>(valueTuple))+"\n"; - } - cout << message.c_str() << endl; - this->logWriter->printTheLog(MEMBERS, message); -} - +/* + * Take a hearbeat message, if the member doesn't exist add it, update hashring, and disseminate out memberList + * If it exists, check for failure, and if there is update fail flag, otherwise try ot update heartbeat +*/ void Node::processHeartbeat(string message) { bool changed = false; vector<string> incomingMembershipList = splitString(message, "\n"); @@ -446,10 +301,7 @@ void Node::processHeartbeat(string message) { } membershipListEntry.clear(); membershipListEntry = splitString(list_entry, ","); - if (membershipListEntry.size() != 5) { - // circumvent craching - continue; - } + if (membershipListEntry.size() != 5) continue; int incomingHeartbeatCounter = stoi(membershipListEntry[3]); int failFlag = stoi(membershipListEntry[4]); @@ -459,17 +311,14 @@ void Node::processHeartbeat(string message) { // Volunteerily leave if you are marked as failed if(failFlag == 1){ this->activeRunning = false; - string message = "["+to_string(this->localTimestamp)+"] node "+this->nodeInformation.ip+"/"+this->nodeInformation.port+" is left"; cout << "[VOLUNTARY LEAVE]" << message.c_str() << endl; this->logWriter->printTheLog(LEAVE, message); return; } - - // do not check itself heartbeat #ifdef LOG_VERBOSE - cout << "skip it" << endl; -#endif + cout << "do not check itself heartbeat" << endl; +#endif continue; } @@ -501,7 +350,6 @@ void Node::processHeartbeat(string message) { if(incomingHeartbeatCounter > currentHeartbeatCounter){ get<0>(this->membershipList[mapKey]) = incomingHeartbeatCounter; get<1>(this->membershipList[mapKey]) = localTimestamp; - // get<2>(this->membershipList[mapKey]) = failFlag; string message = "["+to_string(this->localTimestamp)+"] node "+get<0>(mapKey)+"/"+get<1>(mapKey)+"/"+get<2>(mapKey)+" from "+to_string(currentHeartbeatCounter)+" to "+to_string(incomingHeartbeatCounter); #ifdef LOG_VERBOSE cout << "[UPDATE]" << message.c_str() << endl; @@ -526,43 +374,11 @@ void Node::processHeartbeat(string message) { break; } } - } else { - // do nothing - } + } } } - // If membership list changed in all-to-all, full membership list will be sent - if(changed && this->runningMode == ALL2ALL){ - string mem_list_to_send = populateIntroducerMembershipMessage(); - vector<tuple<string,string,string>> targetNodes = getRandomNodesToGossipTo(); - - for (uint i=0; i<targetNodes.size(); i++) { - Member destination(get<0>(targetNodes[i]), get<1>(targetNodes[i])); - - string message = "["+to_string(this->localTimestamp)+"] node "+destination.ip+"/"+destination.port+"/"+get<2>(targetNodes[i]); -#ifdef LOG_VERBOSE - cout << "[Gossip]" << message.c_str() << endl; -#endif - this->logWriter->printTheLog(GOSSIPTO, message); - - if (isLeader) { - //Messages msg(LEADERHEARTBEAT, mem_list_to_send); - if (isBlackout) { - string msg = populateSDFSFileList(LEADERPENDING, mem_list_to_send); - udpServent->sendMessage(destination.ip, destination.port, msg); - } else { - string msg = populateSDFSFileList(LEADERHEARTBEAT, mem_list_to_send); - udpServent->sendMessage(destination.ip, destination.port, msg); - } - } else { - //Messages msg(HEARTBEAT, mem_list_to_send); - string msg = populateSDFSFileList(HEARTBEAT, mem_list_to_send); - udpServent->sendMessage(destination.ip, destination.port, msg); - } - - } - } + if(changed && this->runningMode == ALL2ALL) heartbeatToNode(); } void Node::setUpLeader(string message, bool pending) @@ -594,25 +410,20 @@ void Node::setUpLeader(string message, bool pending) /** * given a string message which contains a membership list, we will take the string, split it by returns, and then split it by commas, to then compare the heartbeat counters * of each IP,PORT,timestamp tuple with the membership list of the receiving Node. - * * Found help on how to do string processing part of this at https://stackoverflow.com/questions/14265581/parse-split-a-string-in-c-using-string-delimiter-standard-c */ -void Node::readMessage(string message){ - - // decapsulate with specific messages - //cout << "readMessage " << message << endl; +void Node::handleUdpMessage(string message){ + //cout << "handleUdpMessage " << message << endl; string deMeg = decapsulateMessage(message); bool pending = true; - //cout << "readMessage deMeg " << deMeg << endl; - + //cout << "handleUdpMessage deMeg " << deMeg << endl; Messages msg(deMeg); switch (msg.type) { case LEADERHEARTBEAT: // Note: not for Gossip-style, only All-to-All //cout << "LEADERHEARTBEAT: " << msg.payload << endl; pending = false; - case LEADERPENDING: - setUpLeader(msg.payload, pending); - case HEARTBEAT: + case LEADERPENDING: setUpLeader(msg.payload, pending); + case HEARTBEAT: case JOINRESPONSE:{ processHeartbeat(msg.payload); break; @@ -624,13 +435,11 @@ void Node::readMessage(string message){ Member member(fields[0], fields[1]); int checkPosition = hashingId(member, fields[2]); if (checkHashNodeCollision(checkPosition)) { - //Messages response(JOINREJECT, ""); string response = populateSDFSFileList(JOINREJECT, ""); udpServent->sendMessage(fields[0], fields[1], response); } else { string introducerMembershipList; introducerMembershipList = populateIntroducerMembershipMessage(); - //Messages response(JOINRESPONSE, introducerMembershipList); string response = populateSDFSFileList(JOINRESPONSE, introducerMembershipList); udpServent->sendMessage(fields[0], fields[1], response); } @@ -646,7 +455,6 @@ void Node::readMessage(string message){ //Messages msgReply(SWRESP, messageReply); string msgReply = populateSDFSFileList(SWRESP, messageReply); udpServent->sendMessage(fields[0], fields[1], msgReply); - prepareToSwitch = true; } break; @@ -669,29 +477,7 @@ void Node::readMessage(string message){ default: break; } - //debugMembershipList(); -} - -vector<string> Node::splitString(string s, string delimiter){ - vector<string> result; - size_t pos_start = 0, pos_end, delim_len = delimiter.length(); - string token; - - while ((pos_end = s.find (delimiter, pos_start)) != string::npos) { - token = s.substr (pos_start, pos_end - pos_start); - pos_start = pos_end + delim_len; - result.push_back (token); - } - - result.push_back (s.substr (pos_start)); - return result; -} - -int Node::hashingId(Member nodeMember, string joinTime) -{ - string toBeHashed = "NODE::" + nodeMember.ip + "::" + nodeMember.port + "::" + joinTime; - int ringPosition = hash<string>{}(toBeHashed) % HASHMODULO; - return ringPosition; + //debugMembershipList(this); } int Node::getPositionOnHashring(){ @@ -778,7 +564,7 @@ bool Node::findWillBeLeader() //cout << "[ELECTION] My Possible Successor is " << ipAddresses[index] << endl; possibleSuccessorIP = ipAddresses[index]; } - + return beLeader; } @@ -792,38 +578,32 @@ void Node::restartElection() // haven't tested yet leaderPort = ""; } -void Node::leaderCreateHashRing() -{ - // The leader or notes creates hashRing +void Node::leaderCreateHashRing() { hashRing->clear(); for (auto& element: this->membershipList) { // update hashRing tuple<string, string, string> keyTuple = element.first; Member member(get<0>(keyTuple), get<1>(keyTuple)); int pos = hashingId(member, get<2>(keyTuple)); - //hashRing->addNode("NODE::"+get<0>(keyTuple), pos); // since we don't store file, remove NODE hashRing->addNode(get<0>(keyTuple), pos); } - //hashRing->debugHashRing(); } -void Node::proposeToBeLeader() -{ - // Start election +void Node::proposeToBeLeader() { Messages msg(ELECTION, to_string(hashRingPosition)); cout << "[ELECTION] Propose to be leader, send to " << possibleSuccessorIP << endl; tcpServent->sendMessage(possibleSuccessorIP, TCPPORT, msg.toString()); } -void Node::processElection(Messages messages) +void Node::electionMessageHandler(Messages messages) { switch (messages.type) { case ELECTION: { // check id int currentId = stoi(messages.payload); - if (hashRingPosition > currentId) { + if (hashRingPosition > currentId) { //incoming is smaller, just forward cout << "[ELECTION] Got Election, agree on voting: " << messages.payload << endl; tcpServent->sendMessage(possibleSuccessorIP, TCPPORT, messages.toString()); - } else if (hashRingPosition < currentId) { + } else if (hashRingPosition < currentId) { //incoming is biger, replace and send it cout << "[ELECTION] Got Election, against this voting " << messages.payload; cout << ", and using my id " << hashRingPosition << endl; @@ -860,26 +640,19 @@ void Node::processElection(Messages messages) } } -void Node::processTcpMessages() +void Node::tcpElectionProcessor() { queue<string> qCopy(tcpServent->qMessages); tcpServent->qMessages = queue<string>(); - - int size = qCopy.size(); //cout << "Got " << size << " TCP messages" << endl; - - for (int j=0; j<size; j++) { + for (int j=0; j<qCopy.size(); j++) { //cout << qCopy.front() << endl; Messages msg(qCopy.front()); //cout << "Has " << msg.type << " with " << msg.payload << endl; switch (msg.type) { case ELECTION: - case ELECTIONACK: { - processElection(msg); - break; - } - default: - break; + case ELECTIONACK: electionMessageHandler(msg); + default: break; } qCopy.pop(); } @@ -903,10 +676,8 @@ void Node::updateFileList(string sdfsfilename, int nodePosition) } } -//Can only be called when we are the leader node, since we are going to be calling REREPLICATE here, and checking the global file list. -//Called in ProcessRegMessages before we do anything else, since we want to check the global file list consistency before we process the other messages -//In the Queue. void Node::checkFileListConsistency(){ + if (!isLeader) return; if (membershipList.size() < 4) { cout << "[ERROR] The number of members are too small, we need at least 4" << endl; return; @@ -926,12 +697,6 @@ void Node::checkFileListConsistency(){ string nodeInfo = hashRing->getValue(nodesToCheck[i]); Messages outMsg(DNSGET, nodeInfo + "::" + to_string(nodesToCheck[i]) + "::" + element.first + "::"); tuple<int, int, int> request = pendingRequests[element.first]; - if(get<0>(request) == 0 && get<1>(request) == 0 && get<2>(request) == 0){ - pendingRequests[element.first] = tuple<int, int, int>(-1, -1, nodesToCheck[i]); - pendingRequestSent[element.first] = tuple<int, int, int>(true, true, true); - tcpServent->sendMessage(leaderIP, TCPPORT, outMsg.toString()); - break; - } if(get<0>(request) != -1 || get<1>(request) != -1 || get<2>(request) != -1){ cout << "on put " << get<0>(request) << "/" << get<1>(request) << "/" << get<2>(request) << endl; break; @@ -947,31 +712,51 @@ void Node::checkFileListConsistency(){ } -bool Node::isInVector(vector<int> v, int i){ - for(int element: v){ - if(element == i){ - return true; - } - } - return false; +vector<tuple<string,string, string>> Node::getRandomNodesToGossipTo() +{ + vector<tuple<string, string, string>> availableNodesInfo; + vector<tuple<string, string, string>> selectedNodesInfo; + vector<int> indexList; + int availableNodes = 0; + for(auto& element: this->membershipList){ + tuple<string, string, string> keyPair = element.first; + tuple<int, int, int> valueTuple = element.second; + //dont gossip to self or failed nodes + if(get<0>(keyPair).compare(this->nodeInformation.ip) && (get<2>(valueTuple) != 1)){ + availableNodesInfo.push_back(keyPair); + indexList.push_back(availableNodes++); + } + } + switch (this->runningMode) { + case GOSSIP: { + srand(time(NULL)); + // N_b is a predefined number + if (availableNodes <= N_b) return availableNodesInfo; + int nodeCount = 0; + while (nodeCount < N_b) { + int randomNum = rand() % availableNodes; + selectedNodesInfo.push_back(availableNodesInfo[indexList[randomNum]]); + indexList.erase(indexList.begin() + randomNum); + availableNodes--; + nodeCount++; + } + return selectedNodesInfo; + } + //ALL2ALL + default: { + return availableNodesInfo; + } + } } - -void Node::processRegMessages() +void Node::handleTcpMessage() { - //Before we do anything here, we should have the leader check to see if the file list is consistent or not. - //We do this by: - //1. Checking the files in the filelist, making sure each one has 4 entries. If not, then we need to rereplicate. - // We can initiate a PUT, put pending request, setting as -1, -1, and then last one as target node that we want to replicate to (new node to replace the one that failed) - if(isLeader){ - checkFileListConsistency(); - } + //Before we do anything here, we should have the leader check to see if the file list is consistent or not. + checkFileListConsistency(); queue<string> qCopy(tcpServent->regMessages); tcpServent->regMessages = queue<string>(); - int size = qCopy.size(); //cout << "Got " << size << " TCP messages" << endl; - for (int j=0; j<size; j++) { // cout << qCopy.front() << endl; vector<string> msgSplit = splitString(qCopy.front(), "::"); @@ -981,11 +766,8 @@ void Node::processRegMessages() } string payload = ""; for(uint k = 1; k < msgSplit.size(); k++){ - if(k == msgSplit.size() - 1){ - payload += msgSplit[k]; - } else { - payload += msgSplit[k] + "::"; - } + if(k == msgSplit.size() - 1) payload += msgSplit[k]; + else payload += msgSplit[k] + "::"; } MessageType msgType = static_cast<MessageType>(stoi(msgSplit[0])); Messages msg(msgType, payload); @@ -994,13 +776,9 @@ void Node::processRegMessages() case PUTACK: { vector<string> inMsg = splitString(msg.payload, "::"); if(inMsg.size() >= 4){ - string inMsgIP = inMsg[0]; - string sdfsfilename = inMsg[1]; - string localfilename = inMsg[2]; - string remoteLocalname = inMsg[3]; - + string inMsgIP = inMsg[0], sdfsfilename = inMsg[1]; + string localfilename = inMsg[2], remoteLocalname = inMsg[3]; cout << "[PUTACK] " << "inMsgIP: " << inMsgIP << " sdfsfilename: " << sdfsfilename << " localfilename: " << localfilename << endl; - localFilelist[sdfsfilename] = localfilename; Messages outMsg(ACK, to_string(this->hashRingPosition)+"::"+sdfsfilename+"::"+remoteLocalname); this->tcpServent->sendMessage(inMsgIP, TCPPORT, outMsg.toString()); @@ -1011,9 +789,7 @@ void Node::processRegMessages() if (isLeader) { vector<string> inMsg = splitString(msg.payload, "::"); if(inMsg.size() >= 2){ - string inMsgIP = inMsg[0]; - string sdfsfilename = inMsg[1]; - + string inMsgIP = inMsg[0], sdfsfilename = inMsg[1]; cout << "[DELETE] " << "inMsgIP: " << inMsgIP << " sdfsfilename: " << sdfsfilename << endl; localFilelist.erase(sdfsfilename); fileList.erase(sdfsfilename); @@ -1024,16 +800,13 @@ void Node::processRegMessages() } case DNSGET: { if(isLeader){ - // Do replicating to the node - //isBlackout = true; vector<string> inMsg = splitString(msg.payload, "::"); cout << "msg.payload " << msg.payload << endl; if(inMsg.size() >= 4){ string inMsgIP = inMsg[0]; int nodePosition = stoi(inMsg[1]); int selectedNodePosition = nodePosition; - string sdfsfilename = inMsg[2]; - string localfilename = inMsg[3]; + string sdfsfilename = inMsg[2], localfilename = inMsg[3]; cout << "[DNSGET] Got " << "inMsgIP: " << inMsgIP << ", sdfsfilename: " << sdfsfilename << ", localfilename: " << localfilename << endl; vector<int> positions = fileList[sdfsfilename]; if (positions.size() == 0) { @@ -1042,7 +815,6 @@ void Node::processRegMessages() fileList.erase(sdfsfilename); Messages outMsg(GETNULL, sdfsfilename+": the file is not available::"); this->tcpServent->sendMessage(inMsgIP, TCPPORT, outMsg.toString()); - //isBlackout = false; break; } cout << "[DNSGET] we have "; @@ -1080,7 +852,6 @@ void Node::processRegMessages() cout << "[DNS] Got " << "inMsgIP: " << inMsgIP << ", sdfsfilename: " << sdfsfilename; cout << ", localfilename: " << localfilename << ", pos: " << nodePosition << endl; - //this->localFilelist[sdfsfilename] = localfilename; // update fileList, client itself is one of the replicas updateFileList(sdfsfilename, nodePosition); hashRing->debugHashRing(); @@ -1089,7 +860,7 @@ void Node::processRegMessages() int succ = hashRing->getSuccessor(closestNode); if (hashRing->getValue(closestNode).compare(inMsgIP)==0) { closestNode = hashRing->getRandomNode(tuple<int, int, int>(closestNode, pred, succ)); - cout << "[DNS] we need one more node " << closestNode << endl; + cout << "[DNS] we need one more node " << closestNode << endl; } if (hashRing->getValue(pred).compare(inMsgIP)==0) { pred = hashRing->getRandomNode(tuple<int, int, int>(closestNode, pred, succ)); @@ -1119,13 +890,10 @@ void Node::processRegMessages() // since we do not keep files in hashRing, the value itself is IPaddress, not NODE:IP_Address string nodeIP = hashRing->getValue(nodePosition); //cout << "nodeIP " << nodeIP << endl; - cout << "[DNSANS] " << "we will put sdfsfilename: " << inMsg[2] << " to nodeIP: " << nodeIP; cout << " using localfilename: " << inMsg[1] << endl; - string sendMsg = nodeIP+"::"+inMsg[1]+"::"+inMsg[2]+"::"; this->tcpServent->pendSendMessages.push(sendMsg); - //this->tcpServent->sendFile(nodeIP, TCPPORT, inMsg[1], inMsg[2], ""); } break; } @@ -1178,13 +946,12 @@ void Node::processRegMessages() string sdfsfilename = inMsg[1]; string localfilename = inMsg[2]; localFilelist[sdfsfilename] = localfilename; - Messages outMsg(LEADERACK, this->nodeInformation.ip + "::" + to_string(this->hashRingPosition) + "::" + msg.payload); cout << "[ACK] Done replicated sdfsfilename " << sdfsfilename; cout << " on node " << nodePosition << ", and ACK back to the leader" << endl; this->tcpServent->sendMessage(leaderIP, TCPPORT, outMsg.toString()); } - + break; } case LEADERACK:{ @@ -1200,7 +967,7 @@ void Node::processRegMessages() cout << "[LEADERACK] Got ACK inMsgIP: " << inMsgIP << " sdfsfilename: " << sdfsfilename << " done on " << replicatedNodeIP << endl; string closestNodeIP = ""; - + // update fileList updateFileList(sdfsfilename, inMsgnodePosition); updateFileList(sdfsfilename, nodePosition); @@ -1249,14 +1016,14 @@ void Node::processRegMessages() pendingSenderRequests[sdfsfilename] = tuple<string, string, string>(get<0>(pendingSenderRequests[sdfsfilename]), inMsgIP, get<2>(pendingSenderRequests[sdfsfilename])); } if((get<2>(pendingRequests[sdfsfilename]) != -1) && (!get<2>(pendingRequestSent[sdfsfilename]))){ - Messages outMsg(REREPLICATE, to_string(get<2>(pendingRequests[sdfsfilename])) + "::" + sdfsfilename); + Messages outMsg(REREPLICATE, to_string(get<2>(pendingRequests[sdfsfilename])) + "::" + sdfsfilename); // cout << "Sending out rereplicate to " << closestNodeIP << "with message " << outMsg.toString() << endl; cout << "[LEADERACK] Ask node closest " << closestNodeIP << " to replicate on pos "; cout << to_string(get<2>(pendingRequests[sdfsfilename])) << endl; this->tcpServent->sendMessage(closestNodeIP, TCPPORT, outMsg.toString()); pendingRequestSent[sdfsfilename] = tuple<int, int, int>(get<0>(pendingRequestSent[sdfsfilename]), get<1>(pendingRequestSent[sdfsfilename]), true); pendingSenderRequests[sdfsfilename] = tuple<string, string, string>(get<0>(pendingSenderRequests[sdfsfilename]), get<1>(pendingSenderRequests[sdfsfilename]), inMsgIP); - } + } } } break; @@ -1268,28 +1035,6 @@ void Node::processRegMessages() } } -/** - * Store the given filename in your sdfs filename, discard the original name, and - * give it a new name. The hashing will be done based on this sdfs filename. - * - * Can be called by any node, this one will be called by sender - * -*/ -// int Node::putFileSender(string filename, string sdfsfilename){ -// tcpServent->sendFile(leaderIP, leaderPort, filename); -// return 0; - -// } - -// int Node::putFileMaster(string sdfsfilename){ -// return 0; -// } - -// int Node::putFileReeiver(string sdfsfilename){ -// return 0; - -// } - void Node::listLocalFiles(){ cout << "sdfsfilename ---> localfilename" << endl; for (auto& element: localFilelist) { @@ -1297,22 +1042,6 @@ void Node::listLocalFiles(){ } } -void Node::debugSDFSFileList() { - cout << "sdfsfilename ---> positions,..." << endl; - for (auto& element: fileList) { - cout << element.first << " ---> "; - for (uint i=0; i<element.second.size(); i++) { - cout << element.second[i]; - if (i == element.second.size()-1) { - continue; - } else { - cout << ", "; - } - } - cout << endl; - } -} - void Node::listSDFSFileList(string sdfsfilename) { bool found = false; vector<int> foundPositions; @@ -1324,22 +1053,25 @@ void Node::listSDFSFileList(string sdfsfilename) { } } if (found) { + cout << "sdfsfilename " << sdfsfilename << " is stored at..." << endl; if (foundPositions.size() > 0) { - cout << "sdfsfilename " << sdfsfilename << " is stored at..." << endl; - cout << "=========" << endl; for (uint i=0; i<foundPositions.size(); i++) { string storedIP = hashRing->getValue(foundPositions[i]); cout << storedIP << " at " << foundPositions[i] << endl; } - } else { - cout << "sdfsfilename " << sdfsfilename << " is stored at..." << endl; - cout << "=== Current list is empty ===" << endl; - } + } else { cout << "=== Current list is empty ===" << endl; } } else { cout << "sdfsfilename " << sdfsfilename << " is not existed" << endl; } } +/* + * Leader sends out fileList in the following string format: + * first 2 bytes are filename len, FILENAME msg type, filename itself, + * 2 bytes for the number of positions the file has, FILEPOSITION msg type, + * and a string of a commas seperated list of positions following that, ending in null byte. + * All files are encapsulated in this way and joined to make one string +*/ string Node::encapsulateFileList() { string enMeg = ""; @@ -1391,6 +1123,7 @@ string Node::encapsulateFileList() return enMeg; } +//(len, PayloadType, message, \0) encoding where len is 2 bytes. string Node::encapsulateMessage(map<PayloadType,string> payloads) { string enMeg = ""; @@ -1401,7 +1134,6 @@ string Node::encapsulateMessage(map<PayloadType,string> payloads) //cout << "message " << message << endl; //cout << "message.length " << message.length() << endl; //cout << "type " << type << endl; - char *cstr = new char[message.length()+4]; size_t len = message.length()+3; cstr[0] = len & 0xff; @@ -1547,11 +1279,3 @@ string Node::populateSDFSFileList(MessageType type, string mem_list_to_send) string enMeg = encapsulateMessage(payloads); return enMeg; } - -void Node::findNodesWithFile(string sdfsfilename){ - /*tuple<int, int, int> nodes = fileList[sdfsfilename]; - cout << hashRing->getValue(get<0>(nodes)) << endl; - cout << hashRing->getValue(get<1>(nodes)) << endl; - cout << hashRing->getValue(get<2>(nodes)) << endl;*/ -} - diff --git a/src/RandomGenerator.cpp b/src/RandomGenerator.cpp deleted file mode 100644 index b70b6eb..0000000 --- a/src/RandomGenerator.cpp +++ /dev/null @@ -1,69 +0,0 @@ -#include "../inc/Node.h" - -vector<tuple<string,string, string>> Node::getRandomNodesToGossipTo() -{ - //make an index list, then math.random to get index - // from list (remember to exclude the node itself) - vector<int> indexList; - vector<string> IPList; - vector<string> portList; - vector<string> timestampList; - vector<tuple<string, string, string>> selectedNodesInfo; - //int i = 0; - for(auto& element: this->membershipList){ - tuple<string, string, string> keyPair = element.first; - tuple<int, int, int> valueTuple = element.second; - //cout << "run " << keyPair.first << "/" << keyPair.second << endl; - //add check to make sure we don't add any failed nodes to gossip to - if(get<0>(keyPair).compare(this->nodeInformation.ip) != 0 && get<2>(valueTuple) != 1){ - IPList.push_back(get<0>(keyPair)); - portList.push_back(get<1>(keyPair)); - timestampList.push_back(get<2>(keyPair)); - indexList.push_back(IPList.size()-1); // bug fix - } - //i++; - } - // on one to gossip, return an empty vector - if (IPList.size() == 0) { - return selectedNodesInfo; - } - - /*for (uint j=0; j<indexList.size(); j++) { - cout << indexList[j] << ":" << IPList[j] << "/" << portList[j] << endl; - }*/ - - switch (this->runningMode) { - case GOSSIP: { - srand(time(NULL)); - int nodesSelected = 0; - // N_b is a predefined number - if(IPList.size() <= N_b){ - for (uint j=0; j<indexList.size(); j++) { - //int chosenIndex = indexList[j]; - //cout << "put " << IPList[j] << "/" << portList[j] << endl; - selectedNodesInfo.push_back(make_tuple(IPList[j], portList[j], timestampList[j])); - } - } - else{ - while (nodesSelected < N_b) { - int randomNum = rand() % indexList.size(); - int chosenIndex = indexList[randomNum]; - selectedNodesInfo.push_back(make_tuple(IPList[chosenIndex], portList[chosenIndex], timestampList[chosenIndex])); - indexList.erase(indexList.begin() + randomNum); - nodesSelected++; - } - } - break; - } - default: { - // All2All - for (uint j=0; j<indexList.size(); j++) { - //int chosenIndex = indexList[j]; - //cout << "put " << IPList[j] << "/" << portList[j] << endl; - selectedNodesInfo.push_back(make_tuple(IPList[j], portList[j], timestampList[j])); - } - break; - } - } - return selectedNodesInfo; -} \ No newline at end of file diff --git a/src/TcpSocket.cpp b/src/TcpSocket.cpp index 2860bf1..a833e09 100644 --- a/src/TcpSocket.cpp +++ b/src/TcpSocket.cpp @@ -3,15 +3,10 @@ #include "../inc/Messages.h" #include "../inc/FileObject.h" -void sigchld_handler(int s) -{ - while(waitpid(-1, NULL, WNOHANG) > 0); -} - -TcpSocket::TcpSocket() -{ +// just loop through cleaning up threads +void sigchld_handler(int s){ while(waitpid(-1, NULL, WNOHANG) > 0); } -} +TcpSocket::TcpSocket(){} void TcpSocket::bindServer(string port) { @@ -20,13 +15,9 @@ void TcpSocket::bindServer(string port) struct sockaddr_storage their_addr; // connector's address information socklen_t sin_size; struct sigaction sa; - int yes = 1; - char s[INET6_ADDRSTRLEN]; - int rv; + int yes = 1, rv = 0, numbytes = 0; char buf[DEFAULT_TCP_BLKSIZE]; - int numbytes; string delimiter = "::"; - memset(&hints, 0, sizeof hints); hints.ai_family = AF_UNSPEC; hints.ai_socktype = SOCK_STREAM; @@ -37,26 +28,22 @@ void TcpSocket::bindServer(string port) return; } - // loop through all the results and bind to the first we can for(p = servinfo; p != NULL; p = p->ai_next) { if ((sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1) { perror("server: socket"); continue; } - if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1) { perror("setsockopt"); exit(1); } - - if (bind(sockfd, p->ai_addr, p->ai_addrlen) == -1) { + if (::bind(sockfd, p->ai_addr, p->ai_addrlen) == -1) { close(sockfd); perror("server: bind"); continue; } - break; } @@ -79,128 +66,23 @@ void TcpSocket::bindServer(string port) perror("sigaction"); exit(1); } - - //printf("server: waiting for connections...\n"); - while(1) { // main accept() loop sin_size = sizeof their_addr; + char remoteIP[INET6_ADDRSTRLEN]; new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &sin_size); - if (new_fd == -1) { - perror("accept"); - continue; - } - - inet_ntop(their_addr.ss_family, - get_in_addr((struct sockaddr *)&their_addr), - s, sizeof s); - //printf("server: got connection from %s\n", s); - + if (new_fd == -1) { perror("accept"); continue; } + inet_ntop(their_addr.ss_family,get_in_addr((struct sockaddr *)&their_addr),remoteIP, sizeof(remoteIP)); bzero(buf, sizeof(buf)); - MessageType type = JOIN; // for default case here - string payload = ""; if ((numbytes = recv(new_fd, buf, DEFAULT_TCP_BLKSIZE, 0)) > 0) { - //buf[numbytes] = '\0'; - //printf("Got %s\n", buf); - string payloadMessage(buf); - Messages msg(payloadMessage); - //printf("message type: %d\n", msg.type); - type = msg.type; - payload = msg.payload; - } - - switch (type) { - case ELECTION: - case ELECTIONACK: { - //cout << "election id is " << payload << endl; - string payloadMessage(buf); - //cout << "payloadMessage " << payloadMessage << endl; - qMessages.push(payloadMessage); - //cout << "qMessages size 1 " << qMessages.size() << endl; - break; - } - case PUT: { - FILE *fp; - int filesize = 0; - int byteReceived = 0; - string sdfsfilename = ""; - string incomingChecksum = ""; - string remoteLocalname = ""; - string overwriteFilename = ""; - // format: size,checksum,sdfsfilename - vector<string> fields = splitString(payload, ","); - if (fields.size() >= 5) { - filesize = stoi(fields[0]); - incomingChecksum = fields[1]; - sdfsfilename = fields[2]; - remoteLocalname = fields[3]; - overwriteFilename = fields[4]; - } - cout << "file is " << sdfsfilename << " with size " << filesize << " and checksum " << incomingChecksum << endl; - - time_t fileTimestamp; - time(&fileTimestamp); - string localfilename = sdfsfilename+"_"+to_string(fileTimestamp); - if (overwriteFilename.compare("") != 0) { - localfilename = overwriteFilename; - cout << "it's GET with filename " << overwriteFilename << endl; - } - cout << "backup filename " << localfilename << endl; - fp = fopen(localfilename.c_str(), "wb"); - if (fp == NULL) { - cout << "file error" << endl; - close(new_fd); - exit(0); - } - - bzero(buf, sizeof(buf)); - while ((numbytes=recv(new_fd, buf, DEFAULT_TCP_BLKSIZE, 0)) > 0) { - //printf("Got %d\n", numbytes); - fwrite(buf, sizeof(char), numbytes, fp); - byteReceived += numbytes; - if (byteReceived >= filesize) { - break; - } - bzero(buf, sizeof(buf)); - } - cout << "we have all the file, finishing this connections" << endl; - fclose(fp); - - FileObject f(localfilename); - if(incomingChecksum.compare(f.checksum) != 0){ - cout << "[ERROR] FILE CORRUPTED" << endl; - // TODO: Handel file corruption here - } else { - string returnIP(s); - Messages putack(PUTACK, returnIP + "::" + sdfsfilename + "::" + localfilename+"::"+remoteLocalname); - regMessages.push(putack.toString()); - } - - break; - } - case DNSANS: - case ACK: - case PUTACK: - case LEADERACK: - case REREPLICATE: - case REREPLICATEGET: - case DNSGET: - case DELETE: - case GETNULL: - case DNS:{ - string payloadMessage(buf); - cout << "Type: " << type << " payloadMessage: " << payloadMessage << endl; - regMessages.push(payloadMessage); - break; - } - default: - break; + string returnIP(remoteIP); + if (messageHandler(new_fd, payloadMessage, returnIP)) continue; } close(new_fd); } } -string TcpSocket::getFileMetadata(int size, string checksum, +string TcpSocket::getFileMetadata(int size, string checksum, string sdfsfilename, string localfilename, string remoteLocalfilename) { // format: size,checksum,sdfsfilename @@ -208,57 +90,15 @@ string TcpSocket::getFileMetadata(int size, string checksum, return msg; } -void TcpSocket::sendFile(string ip, string port, +void TcpSocket::sendFile(string ip, string port, string localfilename, string sdfsfilename, string remoteLocalfilename) { - int sockfd, numbytes; + int numbytes, sockfd; char buf[DEFAULT_TCP_BLKSIZE]; - struct addrinfo hints, *servinfo, *p; - int rv; - char s[INET6_ADDRSTRLEN]; FILE *fp; - int size = 0; - + int size = 0, sendSize = 0; bzero(buf, sizeof(buf)); - - memset(&hints, 0, sizeof hints); - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - - if ((rv = getaddrinfo(ip.c_str(), port.c_str(), &hints, &servinfo)) != 0) { - fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv)); - return; - } - - // loop through all the results and connect to the first we can - for (p = servinfo; p != NULL; p = p->ai_next) { - if ((sockfd = socket(p->ai_family, p->ai_socktype, - p->ai_protocol)) == -1) { - perror("client: socket"); - continue; - } - - if (connect(sockfd, p->ai_addr, p->ai_addrlen) == -1) { - close(sockfd); - perror("client: connect"); - continue; - } - - break; - } - - if (p == NULL) { - fprintf(stderr, "client: failed to connect\n"); - return; - } - - inet_ntop(p->ai_family, get_in_addr((struct sockaddr *)p->ai_addr), - s, sizeof s); - printf("client: connecting to %s\n", s); - - freeaddrinfo(servinfo); // all done with this structure - - // read file + if ((sockfd = createConnection(ip, port)) == -1) return; fp = fopen(localfilename.c_str(), "rb"); if (fp == NULL) { printf("Could not open file to send."); @@ -272,31 +112,18 @@ void TcpSocket::sendFile(string ip, string port, FileObject f(localfilename); Messages msg(PUT, getFileMetadata(size, f.checksum, sdfsfilename, localfilename, remoteLocalfilename)); string payload = msg.toString(); - if (send(sockfd, payload.c_str(), strlen(payload.c_str()), 0) == -1) { perror("send"); } sleep(1); - while (!feof(fp) && size > 0) { - if (size < DEFAULT_TCP_BLKSIZE) { - bzero(buf, sizeof(buf)); - numbytes = fread(buf, sizeof(char), size, fp); - //printf("11 numbytes %d, size %d\n", numbytes, size); - if (send(sockfd, buf, numbytes, 0) == -1) { - perror("send"); - } - } else { - bzero(buf, sizeof(buf)); - numbytes = fread(buf, sizeof(char), DEFAULT_TCP_BLKSIZE, fp); - //printf("22 numbytes %d, size %d\n", numbytes, size); - size -= numbytes; - //printf("33 numbytes %d, size %d\n", numbytes, size); - if (send(sockfd, buf, numbytes, 0) == -1) { - perror("send"); - } + sendSize = (size < DEFAULT_TCP_BLKSIZE) ? size : DEFAULT_TCP_BLKSIZE; + bzero(buf, sizeof(buf)); + numbytes = fread(buf, sizeof(char), sendSize, fp); + size -= numbytes; + if (send(sockfd, buf, numbytes, 0) == -1) { + perror("send"); } - } fclose(fp); close(sockfd); @@ -305,114 +132,122 @@ void TcpSocket::sendFile(string ip, string port, void TcpSocket::sendMessage(string ip, string port, string message) { int sockfd; - struct addrinfo hints, *servinfo, *p; - int rv; - char s[INET6_ADDRSTRLEN]; + if ((sockfd = createConnection(ip, port)) == -1) return; + if (send(sockfd, message.c_str(), strlen(message.c_str()), 0) == -1) { + perror("send"); + } + close(sockfd); +} +int TcpSocket::createConnection(string ip, string port){ + int sockfd, rv; + struct addrinfo hints, *servinfo, *p; memset(&hints, 0, sizeof hints); hints.ai_family = AF_UNSPEC; hints.ai_socktype = SOCK_STREAM; - if ((rv = getaddrinfo(ip.c_str(), port.c_str(), &hints, &servinfo)) != 0) { fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv)); - return; + return -1; } - - // loop through all the results and connect to the first we can for (p = servinfo; p != NULL; p = p->ai_next) { if ((sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1) { perror("client: socket"); continue; } - if (connect(sockfd, p->ai_addr, p->ai_addrlen) == -1) { close(sockfd); perror("client: connect"); continue; } - break; } - if (p == NULL) { fprintf(stderr, "client: failed to connect\n"); - return; + return -1; } - - inet_ntop(p->ai_family, get_in_addr((struct sockaddr *)p->ai_addr), - s, sizeof s); - //printf("client: connecting to %s\n", s); - - freeaddrinfo(servinfo); // all done with this structure - - if (send(sockfd, message.c_str(), strlen(message.c_str()), 0) == -1) { - perror("send"); - } - - close(sockfd); + freeaddrinfo(servinfo); + return sockfd; } -// copy from Node.cpp -vector<string> TcpSocket::splitString(string s, string delimiter){ - vector<string> result; - size_t pos_start = 0, pos_end, delim_len = delimiter.length(); - string token; - - while ((pos_end = s.find (delimiter, pos_start)) != string::npos) { - token = s.substr (pos_start, pos_end - pos_start); - pos_start = pos_end + delim_len; - result.push_back (token); - } - - result.push_back (s.substr (pos_start)); - return result; -} - -// void *runTcpServer(void *tcpSocket) -// { -// TcpSocket* tcp; -// tcp = (TcpSocket*) tcpSocket; -// tcp->bindServer(TCPPORT); -// pthread_exit(NULL); -// } - -// void *runTcpClient(void *tcpSocket) -// { -// TcpSocket* tcp; -// tcp = (TcpSocket*) tcpSocket; -// sleep(1); -// // testing election -// Messages msg(ELECTION, "my_id"); -// tcp->sendMessage("127.0.0.1", TCPPORT, msg.toString()); - -// // testing to send file -// for (int i=0; i<2; i++) { -// sleep(1); -// tcp->sendFile("127.0.0.1", TCPPORT, "file_example_MP3_700KB.mp3"); -// } -// pthread_exit(NULL); -// } - -/*int main(int argc, char *argv[]) -{ - TcpSocket *tcpSocket = new TcpSocket(); - - pthread_t threads[2]; - - int rc; +int TcpSocket::messageHandler(int sockfd, string payloadMessage, string returnIP){ + char buf[DEFAULT_TCP_BLKSIZE]; + int numbytes = 0; + Messages msg(payloadMessage); + switch (msg.type) { + case ELECTION: + case ELECTIONACK: { + qMessages.push(payloadMessage); + break; + } + case PUT: { + FILE *fp; + int filesize = 0, byteReceived = 0; + string sdfsfilename = "", incomingChecksum = "", remoteLocalname = "", overwriteFilename = ""; + // format: size,checksum,sdfsfilename + vector<string> fields = splitString(msg.payload, ","); + if (fields.size() >= 5) { + filesize = stoi(fields[0]); + incomingChecksum = fields[1]; + sdfsfilename = fields[2]; + remoteLocalname = fields[3]; + overwriteFilename = fields[4]; + } + cout << "file is " << sdfsfilename << " with size " << filesize << " and checksum " << incomingChecksum << endl; + time_t fileTimestamp; + time(&fileTimestamp); + string localfilename = sdfsfilename+"_"+to_string(fileTimestamp); + if (overwriteFilename.compare("") != 0) { + localfilename = overwriteFilename; + cout << "it's GET with filename " << overwriteFilename << endl; + } + cout << "backup filename " << localfilename << endl; + fp = fopen(localfilename.c_str(), "wb"); + if (fp == NULL) { + cout << "file error" << endl; + close(sockfd); + return 1; + } - if ((rc = pthread_create(&threads[0], NULL, runTcpServer, (void *)tcpSocket)) != 0) { - cout << "Error:unable to create thread," << rc << endl; - exit(-1); - } + bzero(buf, sizeof(buf)); + while ((numbytes=recv(sockfd, buf, DEFAULT_TCP_BLKSIZE, 0)) > 0) { + fwrite(buf, sizeof(char), numbytes, fp); + byteReceived += numbytes; + if (byteReceived >= filesize) { + break; + } + bzero(buf, sizeof(buf)); + } + cout << "we have all the file, finishing this connections" << endl; + fclose(fp); + + FileObject f(localfilename); + if(incomingChecksum.compare(f.checksum) != 0){ + cout << "[ERROR] FILE CORRUPTED" << endl; + // TODO: Handel file corruption here + } else { + Messages putack(PUTACK, returnIP + "::" + sdfsfilename + "::" + localfilename+"::"+remoteLocalname); + regMessages.push(putack.toString()); + } - if ((rc = pthread_create(&threads[1], NULL, runTcpClient, (void *)tcpSocket)) != 0) { - cout << "Error:unable to create thread," << rc << endl; - exit(-1); + break; + } + case DNSANS: + case ACK: + case PUTACK: + case LEADERACK: + case REREPLICATE: + case REREPLICATEGET: + case DNSGET: + case DELETE: + case GETNULL: + case DNS:{ + cout << "Type: " << msg.type << " payloadMessage: " << payloadMessage << endl; + regMessages.push(payloadMessage); //handle from queue + break; + } + default: + break; } - - pthread_exit(NULL); - return 0; -}*/ \ No newline at end of file +} diff --git a/src/TestBench.cpp b/src/TestBench.cpp new file mode 100644 index 0000000..75b080c --- /dev/null +++ b/src/TestBench.cpp @@ -0,0 +1,164 @@ +#include "../inc/Node.h" + +void computeAndPrintBW(Node * n, double diff) +{ +#ifdef LOG_VERBOSE + cout << "total " << n->udpServent->byteSent << " bytes sent" << endl; + cout << "total " << n->udpServent->byteReceived << " bytes received" << endl; + printf("elasped time is %.2f s\n", diff); +#endif + if (diff > 0) { + double bandwidth = n->udpServent->byteSent/diff; + string message = "["+to_string(n->localTimestamp)+"] B/W usage: "+to_string(bandwidth)+" bytes/s"; +#ifdef LOG_VERBOSE + printf("%s\n", message.c_str()); +#endif + n->logWriter->printTheLog(BANDWIDTH, message); + } +} + +void debugMembershipList(Node * n) +{ + cout << "Membership list [" << n->membershipList.size() << "]:" << endl; + if (n->isLeader) { + cout << "[T] IP/Port/JoinedTime:Heartbeat/LocalTimestamp/FailFlag" << endl; + } else { + cout << "[T] IP/Port/JoinedTime:Heartbeat/LocalTimestamp/FailFlag" << endl; + } + string message = ""; + + for (auto& element: n->membershipList) { + tuple<string,string,string> keyTuple = element.first; + tuple<int, int, int> valueTuple = element.second; + + if (n->nodeInformation.ip.compare(get<0>(keyTuple))==0) { // Myself + if (n->isLeader) { + message += "[L/M] "; + } else { + message += "[M] "; + } + } else if (n->leaderIP.compare(get<0>(keyTuple))==0) { + message += "[L] "; + } else { + if (n->isLeader) { + message += " "; + } else { + message += " "; + } + } + + message += get<0>(keyTuple)+"/"+get<1>(keyTuple)+"/"+get<2>(keyTuple); + message += ": "+to_string(get<0>(valueTuple))+"/"+to_string(get<1>(valueTuple))+"/"+to_string(get<2>(valueTuple))+"\n"; + } + cout << message.c_str() << endl; + n->logWriter->printTheLog(MEMBERS, message); +} + +void debugSDFSFileList(Node * n) { + cout << "sdfsfilename ---> positions,..." << endl; + for (auto& element: n->fileList) { + cout << element.first << " ---> "; + for (uint i=0; i<element.second.size(); i++) { + cout << element.second[i]; + if (i == element.second.size()-1) { + continue; + } else { + cout << ", "; + } + } + cout << endl; + } +} + +void *runServerTEST(void *udpSocket) +{ + UdpSocket* udp; + udp = (UdpSocket*) udpSocket; + udp->bindServer("4950"); + pthread_exit(NULL); +} + +void *runClientTEST(void *udpSocket) +{ + UdpSocket* udp; + udp = (UdpSocket*) udpSocket; + for (int i = 0; i < 3; i++) { + sleep(2); + udp->sendMessage("127.0.0.1", "4950", "test message"); + } + pthread_exit(NULL); +} + +void testMessages(UdpSocket* udp) +{ + sleep(2); + for (int j = 0; j < 4; j++) { + udp->sendMessage("127.0.0.1", PORT, "test message "+to_string(j)); + } + sleep(1); +} + +void *runTcpServerTEST(void *tcpSocket) +{ + TcpSocket* tcp; + tcp = (TcpSocket*) tcpSocket; + tcp->bindServer(TCPPORT); + pthread_exit(NULL); +} + +void *runTcpClientTEST(void *tcpSocket) +{ + TcpSocket* tcp; + tcp = (TcpSocket*) tcpSocket; + sleep(1); + // testing election + Messages msg(ELECTION, "my_id"); + tcp->sendMessage("127.0.0.1", TCPPORT, msg.toString()); + + // testing to send file + for (int i=0; i<2; i++) { + sleep(1); + tcp->sendFile("127.0.0.1", TCPPORT, "file_example_MP3_700KB.mp3", "file_example_MP3_700KB.mp3", "file_example_MP3_700KB.mp3"); + } + pthread_exit(NULL); +} + +void *get_in_addr(struct sockaddr *sa) +{ + if (sa->sa_family == AF_INET) { + return &(((struct sockaddr_in*)sa)->sin_addr); + } + return &(((struct sockaddr_in6*)sa)->sin6_addr); +} +/* +int main(int argc, char *argv[]) +{ + int isTcp = 0; + if (argc > 1 && (strcmp(argv[1], "tcp") == 0)) isTcp = 1; + UdpSocket *udpSocket = new UdpSocket(); + TcpSocket *tcpSocket = new TcpSocket(); + pthread_t threads[2]; + int rc; + void * socket = (isTcp) ? (void*)tcpSocket : (void*)udpSocket; + void* (*serverFunc)(void*) = (isTcp) ? runTcpServerTEST : runServerTEST; + void* (*clientFunc)(void*) = (isTcp) ? runTcpClientTEST : runClientTEST; + if ((rc = pthread_create(&threads[0], NULL, serverFunc, socket)) != 0) { + cout << "Error:unable to create thread," << rc << endl; + exit(-1); + } + if ((rc = pthread_create(&threads[1], NULL, clientFunc, socket)) != 0) { + cout << "Error:unable to create thread," << rc << endl; + exit(-1); + } + + if (!isTcp){ + if (strcmp(argv[1], "client") == 0) { + udpSocket->sendMessage("127.0.0.1", "4950", "test message"); + } else { + udpSocket->bindServer("4950"); + } + } + pthread_exit(NULL); + return 0; +} +*/ diff --git a/src/Threads.cpp b/src/Threads.cpp index c6d0196..ede4363 100644 --- a/src/Threads.cpp +++ b/src/Threads.cpp @@ -1,24 +1,13 @@ #include "../inc/Node.h" -/** - * - * runUdpServer: Enqueue each heartbeat it receives - * - **/ -void *runUdpServer(void *udpSocket) +void *runUdpServer(void *udpSocket) { - // acquire UDP object UdpSocket* udp; udp = (UdpSocket*) udpSocket; udp->bindServer(PORT); pthread_exit(NULL); } -/** - * - * runTcpServer: Enqueue each request it receives - * - **/ void *runTcpServer(void *tcpSocket) { TcpSocket* tcp; @@ -27,21 +16,6 @@ void *runTcpServer(void *tcpSocket) pthread_exit(NULL); } -vector<string> splitString(string s, string delimiter){ - vector<string> result; - size_t pos_start = 0, pos_end, delim_len = delimiter.length(); - string token; - - while ((pos_end = s.find (delimiter, pos_start)) != string::npos) { - token = s.substr (pos_start, pos_end - pos_start); - pos_start = pos_end + delim_len; - result.push_back (token); - } - - result.push_back (s.substr (pos_start)); - return result; -} - void *runTcpSender(void *tcpSocket) { TcpSocket* tcp; @@ -50,10 +24,8 @@ void *runTcpSender(void *tcpSocket) while (!tcp->pendSendMessages.empty()) { vector<string> msgSplit = splitString(tcp->pendSendMessages.front(), "::"); if (msgSplit.size() >= 4) { - string nodeIP = msgSplit[0]; - string localfilename = msgSplit[1]; - string sdfsfilename = msgSplit[2]; - string remoteLocalfilename = msgSplit[3]; + string nodeIP = msgSplit[0], localfilename = msgSplit[1]; + string sdfsfilename = msgSplit[2], remoteLocalfilename = msgSplit[3]; cout << "[DOSEND] nodeIP " << nodeIP << ", localfilename " << localfilename; cout << ", sdfsfilename " << sdfsfilename << ", remoteLocalfilename " << remoteLocalfilename << endl; tcp->sendFile(nodeIP, TCPPORT, localfilename, sdfsfilename, remoteLocalfilename); @@ -64,41 +36,22 @@ void *runTcpSender(void *tcpSocket) pthread_exit(NULL); } -void testMessages(UdpSocket* udp) -{ - sleep(2); - for (int j = 0; j < 4; j++) { - udp->sendMessage("127.0.0.1", PORT, "test message "+to_string(j)); - } - sleep(1); -} - -/** - * - * runSenderThread: - * 1. handle messages in queue - * 2. merge membership list - * 3. prepare to send heartbeating - * 4. do gossiping - * - **/ -void *runSenderThread(void *node) +void *runSenderThread(void *node) { // acquire node object Node *nodeOwn = (Node *) node; - nodeOwn->activeRunning = true; - // step: joining to the group -> just heartbeating to introducer - Member introducer(INTRODUCER, PORT); + // heartbeat to introducer to join the system + Member introducer(getIP(INTRODUCER), PORT); nodeOwn->joinSystem(introducer); while (nodeOwn->activeRunning) { - + // 1. deepcopy and handle queue, and // 2. merge membership list nodeOwn->listenToHeartbeats(); - + // Volunteerily leave if(nodeOwn->activeRunning == false){ pthread_exit(NULL); @@ -106,26 +59,25 @@ void *runSenderThread(void *node) //add failure detection in between listening and sending out heartbeats nodeOwn->failureDetection(); - + // keep heartbeating nodeOwn->localTimestamp++; nodeOwn->heartbeatCounter++; nodeOwn->updateNodeHeartbeatAndTime(); - - // 3. prepare to send heartbeating, and + + // 3. prepare to send heartbeating, and // 4. do gossiping nodeOwn->heartbeatToNode(); // 5. check for regular TCP messages - nodeOwn->processRegMessages(); + nodeOwn->handleTcpMessage(); // 6. check leader (If hashRing is sent via heartbeat, then we have a leader) if (!nodeOwn->checkLeaderExist()) { // If no leader - nodeOwn->processTcpMessages(); + nodeOwn->tcpElectionProcessor(); if (nodeOwn->findWillBeLeader()) { - //cout << "Try to propose to be leader" << endl; - if (nodeOwn->localTimestamp-nodeOwn->electedTime > T_election) { // when entering to stable state - if (nodeOwn->localTimestamp-nodeOwn->proposedTime > T_election) { + if (nodeOwn->localTimestamp - nodeOwn->electedTime > T_election) { // when entering to stable state + if (nodeOwn->localTimestamp - nodeOwn->proposedTime > T_election) { nodeOwn->proposeToBeLeader(); nodeOwn->proposedTime = nodeOwn->localTimestamp; } @@ -133,15 +85,11 @@ void *runSenderThread(void *node) } } - // for debugging - //nodeOwn->debugMembershipList(); + // 7. bandwidth and mode switch handled (optional) time_t endTimestamp; time(&endTimestamp); double diff = difftime(endTimestamp, nodeOwn->startTimestamp); - nodeOwn->computeAndPrintBW(diff); -#ifdef LOG_VERBOSE - cout << endl; -#endif + computeAndPrintBW(nodeOwn, diff); if (nodeOwn->prepareToSwitch) { cout << "[SWITCH] I am going to swtich my mode in " << T_switch << "s" << endl; nodeOwn->SwitchMyMode(); @@ -149,6 +97,5 @@ void *runSenderThread(void *node) usleep(T_period); } } - pthread_exit(NULL); -} \ No newline at end of file +} diff --git a/src/UdpSocket.cpp b/src/UdpSocket.cpp index 5d17e06..c7997ab 100644 --- a/src/UdpSocket.cpp +++ b/src/UdpSocket.cpp @@ -5,30 +5,19 @@ UdpSocket::UdpSocket(){ byteReceived = 0; } -void *get_in_addr(struct sockaddr *sa) -{ - if (sa->sa_family == AF_INET) { - return &(((struct sockaddr_in*)sa)->sin_addr); - } - - return &(((struct sockaddr_in6*)sa)->sin6_addr); -} - void UdpSocket::bindServer(string port) { int sockfd; struct addrinfo hints, *servinfo, *p; int rv; - int numbytes; + int numbytes = 0; struct sockaddr_storage their_addr; char buf[MAXBUFLEN]; socklen_t addr_len; - //char s[INET6_ADDRSTRLEN]; - memset(&hints, 0, sizeof hints); hints.ai_family = AF_UNSPEC; // set to AF_INET to force IPv4 - hints.ai_socktype = SOCK_DGRAM; + hints.ai_socktype = SOCK_DGRAM; // UDP hints.ai_flags = AI_PASSIVE; // use my IP if ((rv = getaddrinfo(NULL, port.c_str(), &hints, &servinfo)) != 0) { @@ -44,7 +33,7 @@ void UdpSocket::bindServer(string port) continue; } - if (bind(sockfd, p->ai_addr, p->ai_addrlen) < 0) { + if (::bind(sockfd, p->ai_addr, p->ai_addrlen) < 0) { close(sockfd); perror("bindServer: bind"); continue; @@ -72,10 +61,7 @@ void UdpSocket::bindServer(string port) //cout << "bindServer: packet is " << numbytes << " bytes long" << endl; buf[numbytes] = '\0'; //cout << ": " << buf << endl; - - // put into queue qMessages.push(buf); - bzero(buf, sizeof(buf)); } @@ -87,7 +73,7 @@ void UdpSocket::sendMessage(string ip, string port, string message) int sockfd; struct addrinfo hints, *servinfo, *p; int rv; - int numbytes; + int numbytes = 0; int lucky_number; memset(&hints, 0, sizeof hints); @@ -129,55 +115,6 @@ void UdpSocket::sendMessage(string ip, string port, string message) } freeaddrinfo(servinfo); - //cout << "sendMessage: sent " << numbytes << " bytes to " << ip << endl; close(sockfd); } - -// void *runServer(void *udpSocket) -// { -// UdpSocket* udp; -// udp = (UdpSocket*) udpSocket; -// udp->bindServer("4950"); -// pthread_exit(NULL); -// } - -// void *runClient(void *udpSocket) -// { -// UdpSocket* udp; -// udp = (UdpSocket*) udpSocket; -// for (int i = 0; i < 3; i++) { -// sleep(2); -// udp->sendMessage("127.0.0.1", "4950", "test message"); -// } -// pthread_exit(NULL); -// } - -/*int main(int argc, char *argv[]) -{ - UdpSocket *udpSocket = new UdpSocket(); - - if (strcmp(argv[1], "client") == 0) { - udpSocket.send_message("127.0.0.1", "4950", "test message"); - } else { - udpSocket.bind_server("4950"); - } - - pthread_t threads[NUM_THREADS]; - - int rc; - - if ((rc = pthread_create(&threads[0], NULL, runServer, (void *)udpSocket)) != 0) { - cout << "Error:unable to create thread," << rc << endl; - exit(-1); - } - - if ((rc = pthread_create(&threads[1], NULL, runClient, (void *)udpSocket)) != 0) { - cout << "Error:unable to create thread," << rc << endl; - exit(-1); - } - - pthread_exit(NULL); - - return 0; -}*/ \ No newline at end of file diff --git a/src/Utils.cpp b/src/Utils.cpp new file mode 100644 index 0000000..e427357 --- /dev/null +++ b/src/Utils.cpp @@ -0,0 +1,48 @@ +#include "../inc/Utils.h" + +vector<string> splitString(string s, string delimiter){ + vector<string> result; + size_t pos_start = 0, pos_end, delim_len = delimiter.length(); + string token; + while ((pos_end = s.find (delimiter, pos_start)) != string::npos) { + token = s.substr (pos_start, pos_end - pos_start); + pos_start = pos_end + delim_len; + result.push_back(token); + } + result.push_back (s.substr (pos_start)); + return result; +} + +string getIP(){ + char host[100] = {0}; + if (gethostname(host, sizeof(host)) < 0) { + perror("error: gethostname"); + } + return getIP(host); +} + +string getIP(const char * host){ + struct hostent *hp; + if (!(hp = gethostbyname(host)) || (hp->h_addr_list[0] == NULL)) { + perror("error: no ip"); + exit(1); + } + return inet_ntoa(*(struct in_addr*)hp->h_addr_list[0]); +} + +int new_thread_id() { + int rv; + pthread_mutex_lock(&thread_counter_lock); + rv = ++thread_counter; + pthread_mutex_unlock(&thread_counter_lock); + return rv; +} + +bool isInVector(vector<int> v, int i){ + for(int element: v){ + if(element == i){ + return true; + } + } + return false; +} diff --git a/src/main.cpp b/src/main.cpp index 3e20e04..1e15562 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1,49 +1,23 @@ #include "../inc/Node.h" -int main(int argc, char *argv[]) +int main(int argc, char *argv[]) { - pthread_t threads[4]; + pthread_t threads[5]; int rc; Node *node; - cout << "Mode: " << ALL2ALL << "->All-to-All, "; cout << GOSSIP << "->Gossip-style" << endl; - if (argc < 2) { - node = new Node(); - } else { + if (argc < 2) node = new Node(); + else { ModeType mode = ALL2ALL; - if (atoi(argv[1]) == 1) { - mode = GOSSIP; - } + if (atoi(argv[1]) == 1) mode = GOSSIP; node = new Node(mode); } cout << "Running mode: " << node->runningMode << endl; - cout << endl; - - char host[100] = {0}; - struct hostent *hp; - - if (gethostname(host, sizeof(host)) < 0) { - cout << "error: gethostname" << endl; - return 0; - } - - if ((hp = gethostbyname(host)) == NULL) { - cout << "error: gethostbyname" << endl; - return 0; - } - - if (hp->h_addr_list[0] == NULL) { - cout << "error: no ip" << endl; - return 0; - } - //cout << "hostname " << hp->h_name << endl; - Member own(inet_ntoa(*(struct in_addr*)hp->h_addr_list[0]), PORT, - node->localTimestamp, node->heartbeatCounter); + Member own(getIP(), PORT, node->localTimestamp, node->heartbeatCounter); node->nodeInformation = own; cout << "[NEW] Starting Node at " << node->nodeInformation.ip << "/"; cout << node->nodeInformation.port << "..." << endl; - int *ret; string s; string cmd; @@ -54,17 +28,14 @@ int main(int argc, char *argv[]) cout << "Error:unable to create thread," << rc << endl; exit(-1); } - if ((rc = pthread_create(&threads[2], NULL, runTcpServer, (void *)node->tcpServent)) != 0) { cout << "Error:unable to create thread," << rc << endl; exit(-1); } - if ((rc = pthread_create(&threads[4], NULL, runTcpSender, (void *)node->tcpServent)) != 0) { cout << "Error:unable to create thread," << rc << endl; exit(-1); } - node->localFilelist.clear(); // for testing /*node->localFilelist["sdfsfilename1"] = "localfilename1"; node->localFilelist["sdfsfilename2"] = "localfilename2";*/ @@ -96,26 +67,21 @@ int main(int argc, char *argv[]) node->membershipList.clear(); node->restartElection(); // clean up leader info pthread_join(threads[1], (void **)&ret); - string message = "["+to_string(node->localTimestamp)+"] node "+node->nodeInformation.ip+"/"+node->nodeInformation.port+" is left"; cout << "[LEAVE]" << message.c_str() << endl; node->logWriter->printTheLog(LEAVE, message); sleep(2); // wait for logging - joined = false; } else if(cmd == "id"){ cout << "ID: (" << node->nodeInformation.ip << ", " << node->nodeInformation.port << ")" << endl; } else if(cmd == "member"){ - node->debugMembershipList(); + debugMembershipList(node); } else if(cmd == "switch") { - if(joined){ - node->requestSwitchingMode(); - } + if(joined) node->requestSwitchingMode(); } else if(cmd == "mode") { cout << "In " << node->runningMode << " mode" << endl; } else if(cmd == "exit"){ - cout << "exiting..." << endl; - break; + cout << "exiting..." << endl; break; } else if (cmd == "put" && joined){ // MP2 op1 if(cmdLineInput.size() < 3){ cout << "USAGE: put filename sdfsfilename" << endl; @@ -125,13 +91,12 @@ int main(int argc, char *argv[]) string localfilename = cmdLineInput[1]; string sdfsfilename = cmdLineInput[2]; Messages outMsg(DNS, node->nodeInformation.ip + "::" + to_string(node->hashRingPosition) + "::" + sdfsfilename + "::" + localfilename); - cout << "[PUT] Got localfilename: " << localfilename << " with sdfsfilename: " << sdfsfilename << endl; + cout << "[PUT] Got localfilename: " << localfilename << " with sdfsfilename: " << sdfsfilename << endl; if (access(localfilename.c_str(), F_OK) != -1) { node->tcpServent->sendMessage(node->leaderIP, TCPPORT, outMsg.toString()); } else { cout << "[PUT] The file " << localfilename << " is not existed" << endl; } - } else { cout << "[BLACKOUT] Leader cannot accept the request" << endl; } @@ -143,24 +108,21 @@ int main(int argc, char *argv[]) string sdfsfilename = cmdLineInput[1]; string localfilename = cmdLineInput[2]; if (node->localFilelist.find(sdfsfilename) != node->localFilelist.end()) { - // found cout << "[GET] You have sdfsfilename " << sdfsfilename << " as " << node->localFilelist[sdfsfilename] << endl; continue; } if (node->fileList.find(sdfsfilename) == node->fileList.end()) { - // not found cout << "[GET] Get sdfsfilename " << sdfsfilename << " failed" << endl; continue; } else { if (node->fileList[sdfsfilename].size()==1 && node->isBlackout) { - // in 1st pass, the Leader is backup, wait for a minute cout << "[GET] Get sdfsfilename " << sdfsfilename << " failed" << endl; continue; } } Messages outMsg(DNSGET, node->nodeInformation.ip + "::" + to_string(node->hashRingPosition) + "::" + sdfsfilename + "::" + localfilename); - cout << "[GET] Got sdfsfilename: " << sdfsfilename << " with localfilename: " << localfilename << endl; + cout << "[GET] Got sdfsfilename: " << sdfsfilename << " with localfilename: " << localfilename << endl; node->tcpServent->sendMessage(node->leaderIP, TCPPORT, outMsg.toString()); } else if (cmd == "delete" && joined){ // MP2 op3 if(cmdLineInput.size() < 2){ @@ -170,7 +132,7 @@ int main(int argc, char *argv[]) if (!node->isBlackout) { string sdfsfilename = cmdLineInput[1]; Messages outMsg(DELETE, node->nodeInformation.ip + "::" + sdfsfilename); - cout << "[DELETE] Got sdfsfilename: " << sdfsfilename << endl; + cout << "[DELETE] Got sdfsfilename: " << sdfsfilename << endl; node->tcpServent->sendMessage(node->leaderIP, TCPPORT, outMsg.toString()); } else { cout << "[BLACKOUT] Leader cannot accept the request" << endl; @@ -189,7 +151,7 @@ int main(int argc, char *argv[]) } else if (cmd == "store"){ // MP2 op5 node->listLocalFiles(); } else if (cmd == "lsall"){ - node->debugSDFSFileList(); + debugSDFSFileList(node); } else { cout << "[join] join to a group via fixed introducer" << endl; cout << "[leave] leave the group" << endl; @@ -205,7 +167,7 @@ int main(int argc, char *argv[]) cout << "[ls] list all machine (VM) addresses where this file is currently being stored" << endl; cout << "[lsall] list all sdfsfilenames with positions" << endl; cout << "[store] list all files currently being stored at this machine" << endl << endl; - } // More command line interface if wanted + } // More command line interface if wanted } pthread_kill(threads[0], SIGUSR1); @@ -213,8 +175,6 @@ int main(int argc, char *argv[]) if(joined){ pthread_kill(threads[1], SIGUSR1); } - pthread_exit(NULL); - return 1; -} \ No newline at end of file +} -- GitLab