Skip to content
Snippets Groups Projects
Commit 07e9153a authored by Arpitha Raghunandan's avatar Arpitha Raghunandan
Browse files

Add handle write quorum

parent 24a6daa4
No related branches found
No related tags found
No related merge requests found
...@@ -21,6 +21,8 @@ enum class MessageType { ...@@ -21,6 +21,8 @@ enum class MessageType {
DeleteFile = 12, DeleteFile = 12,
Get = 13, Get = 13,
GetResponse = 14, GetResponse = 14,
Write = 15,
WriteAck = 16,
}; };
class Message { class Message {
......
...@@ -38,6 +38,9 @@ public: ...@@ -38,6 +38,9 @@ public:
void HandleGet(const std::string& sdfsfilename); void HandleGet(const std::string& sdfsfilename);
void HandleGetRequest(const std::string& file_name, const std::string& source); void HandleGetRequest(const std::string& file_name, const std::string& source);
void HandleGetResponse(const std::string& sdfsfileinfo); void HandleGetResponse(const std::string& sdfsfileinfo);
void HandleWrite(const std::string& source);
void HandleWriteAck();
private: private:
void LogMemberJoinIntroducer(const Member& member); void LogMemberJoinIntroducer(const Member& member);
...@@ -46,7 +49,7 @@ private: ...@@ -46,7 +49,7 @@ private:
void LogMemberLeave(const Member& member); void LogMemberLeave(const Member& member);
void LogMemberLeaveForwarded(const Member& member); void LogMemberLeaveForwarded(const Member& member);
void LogMemberFailure(const Member& member); void LogMemberFailure(const Member& member);
void LogPut(const std::set<int>& replicas, const std::string& sdfsfile); void LogPut(const std::set<Member>& replicas, const std::string& sdfsfile);
void LogReplicate(const std::string& file_name); void LogReplicate(const std::string& file_name);
void LogMemberAddFile(const std::string& file_name); void LogMemberAddFile(const std::string& file_name);
void LogFileDelete(const std::vector<Member>& replicas, const std::string& file_name); void LogFileDelete(const std::vector<Member>& replicas, const std::string& file_name);
...@@ -80,6 +83,10 @@ private: ...@@ -80,6 +83,10 @@ private:
std::string current_local_file_name_ = ""; std::string current_local_file_name_ = "";
std::map<std::string, std::string> current_sdfs_version_file_info_; std::map<std::string, std::string> current_sdfs_version_file_info_;
int requested_num_versions_ = 1; int requested_num_versions_ = 1;
std::string write_file_info_ = "";
std::string write_sdfsfilename_ = "";
std::vector<Member> write_replica_members_;
int current_num_write_acks_ = 0;
std::chrono::time_point<std::chrono::steady_clock> start_time_; std::chrono::time_point<std::chrono::steady_clock> start_time_;
}; };
......
...@@ -285,6 +285,20 @@ void Node::MessageHandler() { ...@@ -285,6 +285,20 @@ void Node::MessageHandler() {
break; break;
} }
case MessageType::Write: {
if (is_member_) {
HandleWrite(sender);
}
break;
}
case MessageType::WriteAck: {
if (is_member_) {
HandleWriteAck();
}
break;
}
default: { default: {
LogMisc("ERROR", "Unknown MessageType '" + std::to_string(static_cast<unsigned int>(received_message.GetType())) + "' found, exiting message handler"); LogMisc("ERROR", "Unknown MessageType '" + std::to_string(static_cast<unsigned int>(received_message.GetType())) + "' found, exiting message handler");
exit(1); exit(1);
...@@ -501,23 +515,21 @@ void Node::HandlePut(const std::string& localfilename, const std::string& sdfsfi ...@@ -501,23 +515,21 @@ void Node::HandlePut(const std::string& localfilename, const std::string& sdfsfi
} }
} }
// Send Replicate to each replica // TODO: send write
Message replica_outbound_message{MessageType::Replicate, {}, file_info}; Message write_outbound_message{MessageType::Write, {}, ""};
std::string replica_serialized_outbound_message = Message::Serialize(replica_outbound_message); std::string write_serialized_outbound_message = Message::Serialize(write_outbound_message);
for (const Member& member : replicas_members) { for (const Member& member : replicas_members) {
Socket::TrySendMessage(replica_serialized_outbound_message, cs425::PORT, member.address, loss_rate_); Socket::TrySendMessage(write_serialized_outbound_message, cs425::PORT, member.address, loss_rate_);
} }
// TODO::ReplicaAcks write_file_info_ = file_info;
write_replica_members_ = replicas_members;
write_sdfsfilename_ = sdfsfilename;
Message addfile_outbound_message{MessageType::AddNewFile, replicas_members, sdfsfilename}; // TODO: receive write, send write ack
std::string addfile_serialized_outbound_message = Message::Serialize(addfile_outbound_message); // TODO: receive write ack, on W acks send replicate
for (const Member& successor : GetSuccessors()) {
Socket::TrySendMessage(addfile_serialized_outbound_message, cs425::PORT, successor.address, loss_rate_);
}
LogPut(replicas, sdfsfilename); // Send Replicate to each replica
} }
void Node::HandleReplicate(const std::string& file_info, const std::string& source) { void Node::HandleReplicate(const std::string& file_info, const std::string& source) {
...@@ -555,8 +567,6 @@ void Node::HandleReplicate(const std::string& file_info, const std::string& sour ...@@ -555,8 +567,6 @@ void Node::HandleReplicate(const std::string& file_info, const std::string& sour
return; return;
} }
// TODO: Send Replicate Ack to source
self_.files_present += file_name + ","; self_.files_present += file_name + ",";
files_.insert(file_name); files_.insert(file_name);
std::cout << "sender: " << source << std::endl; std::cout << "sender: " << source << std::endl;
...@@ -634,8 +644,6 @@ void Node::HandleFileDelete(const std::string& sdfsfilename) { ...@@ -634,8 +644,6 @@ void Node::HandleFileDelete(const std::string& sdfsfilename) {
Socket::TrySendMessage(serialized_outbound_message, cs425::PORT, member.address, loss_rate_); Socket::TrySendMessage(serialized_outbound_message, cs425::PORT, member.address, loss_rate_);
} }
// TODO: Delete Ack
// Forward to successors // Forward to successors
Message deletefile_outbound_message{MessageType::DeleteFile, members, sdfsfilename}; Message deletefile_outbound_message{MessageType::DeleteFile, members, sdfsfilename};
std::string deletefile_serialized_outbound_message = Message::Serialize(deletefile_outbound_message); std::string deletefile_serialized_outbound_message = Message::Serialize(deletefile_outbound_message);
...@@ -659,8 +667,6 @@ void Node::HandleDelete(const std::string& sdfsfilename) { ...@@ -659,8 +667,6 @@ void Node::HandleDelete(const std::string& sdfsfilename) {
files_.erase(file_name); files_.erase(file_name);
// TODO: Delete ACK
LogDelete(file_name); LogDelete(file_name);
} }
...@@ -830,6 +836,43 @@ void Node::HandleGetResponse(const std::string& sdfsfileinfo) { ...@@ -830,6 +836,43 @@ void Node::HandleGetResponse(const std::string& sdfsfileinfo) {
current_local_file_name_ = ""; current_local_file_name_ = "";
} }
} }
void Node::HandleWrite(const std::string& source) {
Message outbound_message{MessageType::WriteAck, {}, ""};
std::string serialized_outbound_message = Message::Serialize(outbound_message);
Socket::TrySendMessage(serialized_outbound_message, cs425::PORT, source, loss_rate_);
}
void Node::HandleWriteAck() {
if (write_sdfsfilename_ == "") {
return;
}
current_num_write_acks_++;
if (current_num_write_acks_ < std::min(cs425::W, (int)members_.size())) {
return;
}
Message replica_outbound_message{MessageType::Replicate, {}, write_file_info_};
std::string replica_serialized_outbound_message = Message::Serialize(replica_outbound_message);
for (const Member& member : write_replica_members_) {
Socket::TrySendMessage(replica_serialized_outbound_message, cs425::PORT, member.address, loss_rate_);
}
Message addfile_outbound_message{MessageType::AddNewFile, write_replica_members_, write_sdfsfilename_};
std::string addfile_serialized_outbound_message = Message::Serialize(addfile_outbound_message);
for (const Member& successor : GetSuccessors()) {
Socket::TrySendMessage(addfile_serialized_outbound_message, cs425::PORT, successor.address, loss_rate_);
}
std::set<Member> replica_members(write_replica_members_.begin(), write_replica_members_.end());
LogPut(replica_members, write_sdfsfilename_);
write_replica_members_.clear();
write_sdfsfilename_ = "";
write_file_info_ = "";
current_num_write_acks_ = 0;
}
void Node::LogMemberJoinIntroducer(const Member& member) { void Node::LogMemberJoinIntroducer(const Member& member) {
std::cout << "---------------------------------------------------------------" << std::endl; std::cout << "---------------------------------------------------------------" << std::endl;
...@@ -880,10 +923,10 @@ void Node::LogMemberFailure(const Member& member) { ...@@ -880,10 +923,10 @@ void Node::LogMemberFailure(const Member& member) {
PrintMembers(); PrintMembers();
} }
void Node::LogPut(const std::set<int>& replicas, const std::string& sdfsfile) { void Node::LogPut(const std::set<Member>& replicas, const std::string& sdfsfile) {
std::cout << "---------------------------------------------------------------" << std::endl; std::cout << "---------------------------------------------------------------" << std::endl;
for (int replica : replicas) { for (Member replica : replicas) {
std::cout << "[PUT] - " << members_[replica].id << " - Time = " << GetElapsedSeconds() << "s" << std::endl; std::cout << "[PUT] - " << replica.id << " - Time = " << GetElapsedSeconds() << "s" << std::endl;
} }
std::cout << "Putting file: " << sdfsfile << std::endl; std::cout << "Putting file: " << sdfsfile << std::endl;
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment