diff --git a/includes/message.hpp b/includes/message.hpp index 5c1e9a88951efdf2aba55f4e02f2d14cf90decb5..e070cb94002297157b1b30d29d5e6777ef721af1 100644 --- a/includes/message.hpp +++ b/includes/message.hpp @@ -21,6 +21,8 @@ enum class MessageType { DeleteFile = 12, Get = 13, GetResponse = 14, + Write = 15, + WriteAck = 16, }; class Message { diff --git a/includes/node.hpp b/includes/node.hpp index e8043858e5fda0108870ac06bf56b30fbe884636..f9aa95bf4267a5266b1e433357367584e13d69dd 100644 --- a/includes/node.hpp +++ b/includes/node.hpp @@ -38,6 +38,9 @@ public: void HandleGet(const std::string& sdfsfilename); void HandleGetRequest(const std::string& file_name, const std::string& source); void HandleGetResponse(const std::string& sdfsfileinfo); + void HandleWrite(const std::string& source); + void HandleWriteAck(); + private: void LogMemberJoinIntroducer(const Member& member); @@ -46,7 +49,7 @@ private: void LogMemberLeave(const Member& member); void LogMemberLeaveForwarded(const Member& member); void LogMemberFailure(const Member& member); - void LogPut(const std::set<int>& replicas, const std::string& sdfsfile); + void LogPut(const std::set<Member>& replicas, const std::string& sdfsfile); void LogReplicate(const std::string& file_name); void LogMemberAddFile(const std::string& file_name); void LogFileDelete(const std::vector<Member>& replicas, const std::string& file_name); @@ -80,6 +83,10 @@ private: std::string current_local_file_name_ = ""; std::map<std::string, std::string> current_sdfs_version_file_info_; int requested_num_versions_ = 1; + std::string write_file_info_ = ""; + std::string write_sdfsfilename_ = ""; + std::vector<Member> write_replica_members_; + int current_num_write_acks_ = 0; std::chrono::time_point<std::chrono::steady_clock> start_time_; }; diff --git a/src/node.cpp b/src/node.cpp index d597bcb73195769aaa92e26adf1055d64f4d4604..e384d08003077d0a2367c79332eeca2186f2b0aa 100644 --- a/src/node.cpp +++ b/src/node.cpp @@ -285,6 +285,20 @@ void Node::MessageHandler() { break; } + case MessageType::Write: { + if (is_member_) { + HandleWrite(sender); + } + + break; + } + case MessageType::WriteAck: { + if (is_member_) { + HandleWriteAck(); + } + + break; + } default: { LogMisc("ERROR", "Unknown MessageType '" + std::to_string(static_cast<unsigned int>(received_message.GetType())) + "' found, exiting message handler"); exit(1); @@ -501,23 +515,21 @@ void Node::HandlePut(const std::string& localfilename, const std::string& sdfsfi } } - // Send Replicate to each replica - Message replica_outbound_message{MessageType::Replicate, {}, file_info}; - std::string replica_serialized_outbound_message = Message::Serialize(replica_outbound_message); - + // TODO: send write + Message write_outbound_message{MessageType::Write, {}, ""}; + std::string write_serialized_outbound_message = Message::Serialize(write_outbound_message); for (const Member& member : replicas_members) { - Socket::TrySendMessage(replica_serialized_outbound_message, cs425::PORT, member.address, loss_rate_); + Socket::TrySendMessage(write_serialized_outbound_message, cs425::PORT, member.address, loss_rate_); } - // TODO::ReplicaAcks + write_file_info_ = file_info; + write_replica_members_ = replicas_members; + write_sdfsfilename_ = sdfsfilename; - Message addfile_outbound_message{MessageType::AddNewFile, replicas_members, sdfsfilename}; - std::string addfile_serialized_outbound_message = Message::Serialize(addfile_outbound_message); - for (const Member& successor : GetSuccessors()) { - Socket::TrySendMessage(addfile_serialized_outbound_message, cs425::PORT, successor.address, loss_rate_); - } + // TODO: receive write, send write ack + // TODO: receive write ack, on W acks send replicate - LogPut(replicas, sdfsfilename); + // Send Replicate to each replica } void Node::HandleReplicate(const std::string& file_info, const std::string& source) { @@ -555,8 +567,6 @@ void Node::HandleReplicate(const std::string& file_info, const std::string& sour return; } - // TODO: Send Replicate Ack to source - self_.files_present += file_name + ","; files_.insert(file_name); std::cout << "sender: " << source << std::endl; @@ -634,8 +644,6 @@ void Node::HandleFileDelete(const std::string& sdfsfilename) { Socket::TrySendMessage(serialized_outbound_message, cs425::PORT, member.address, loss_rate_); } - // TODO: Delete Ack - // Forward to successors Message deletefile_outbound_message{MessageType::DeleteFile, members, sdfsfilename}; std::string deletefile_serialized_outbound_message = Message::Serialize(deletefile_outbound_message); @@ -659,8 +667,6 @@ void Node::HandleDelete(const std::string& sdfsfilename) { files_.erase(file_name); - // TODO: Delete ACK - LogDelete(file_name); } @@ -830,6 +836,43 @@ void Node::HandleGetResponse(const std::string& sdfsfileinfo) { current_local_file_name_ = ""; } } +void Node::HandleWrite(const std::string& source) { + Message outbound_message{MessageType::WriteAck, {}, ""}; + std::string serialized_outbound_message = Message::Serialize(outbound_message); + Socket::TrySendMessage(serialized_outbound_message, cs425::PORT, source, loss_rate_); + +} + +void Node::HandleWriteAck() { + if (write_sdfsfilename_ == "") { + return; + } + + current_num_write_acks_++; + if (current_num_write_acks_ < std::min(cs425::W, (int)members_.size())) { + return; + } + Message replica_outbound_message{MessageType::Replicate, {}, write_file_info_}; + std::string replica_serialized_outbound_message = Message::Serialize(replica_outbound_message); + + for (const Member& member : write_replica_members_) { + Socket::TrySendMessage(replica_serialized_outbound_message, cs425::PORT, member.address, loss_rate_); + } + + Message addfile_outbound_message{MessageType::AddNewFile, write_replica_members_, write_sdfsfilename_}; + std::string addfile_serialized_outbound_message = Message::Serialize(addfile_outbound_message); + for (const Member& successor : GetSuccessors()) { + Socket::TrySendMessage(addfile_serialized_outbound_message, cs425::PORT, successor.address, loss_rate_); + } + + std::set<Member> replica_members(write_replica_members_.begin(), write_replica_members_.end()); + LogPut(replica_members, write_sdfsfilename_); + + write_replica_members_.clear(); + write_sdfsfilename_ = ""; + write_file_info_ = ""; + current_num_write_acks_ = 0; +} void Node::LogMemberJoinIntroducer(const Member& member) { std::cout << "---------------------------------------------------------------" << std::endl; @@ -880,10 +923,10 @@ void Node::LogMemberFailure(const Member& member) { PrintMembers(); } -void Node::LogPut(const std::set<int>& replicas, const std::string& sdfsfile) { +void Node::LogPut(const std::set<Member>& replicas, const std::string& sdfsfile) { std::cout << "---------------------------------------------------------------" << std::endl; - for (int replica : replicas) { - std::cout << "[PUT] - " << members_[replica].id << " - Time = " << GetElapsedSeconds() << "s" << std::endl; + for (Member replica : replicas) { + std::cout << "[PUT] - " << replica.id << " - Time = " << GetElapsedSeconds() << "s" << std::endl; } std::cout << "Putting file: " << sdfsfile << std::endl; }