Skip to content
Snippets Groups Projects
Commit 0c26b2eb authored by grizz9233's avatar grizz9233
Browse files

DistributedGroupMembership

parents
No related branches found
No related tags found
No related merge requests found
all: main
main: main.o
g++ -std=c++11 -pthread main.o -o main
main.o: main.cpp server.h config.h member.h
g++ -std=c++11 -pthread -c main.cpp
clean:
rm *.o
rm main
File added
//
// config.cpp
// DistributedGroupMembership
//
// Created by Tangrizzly on 2019/9/22.
// Copyright © 2019 Tangrizzly. All rights reserved.
//
#ifndef config_h
#define config_h
#include <string>
#include <unordered_set>
using namespace std;
unordered_set<string> introducers { // 1 4 5 8 9
"172.22.154.15", // 4
"172.22.156.15", // 8
"172.22.152.20", // 1
"172.22.154.16", // 5
"172.22.156.16", // 9
"172.22.152.21", // 2
"172.22.154.17", // 6
"172.22.156.17", // 10
"172.22.152.22", // 3
"172.22.154.18" // 7
};
#define LOG_PATH_FORMAT "../log/groupMembership%d.log"
#define PREDECESSOR 3 // heartbeat monitored
#define SUCCESSOR 3 // heartbeat receiver
#define PORT 8083
// TODO: reset time
#define HEARTBEAT_FREQUENCY 500000 // us = 0.500000s
#define CHECK_FREQUENCY 600000 // us = 0.600000s
#define T_FAIL 1600 // ms
#define BUFFER_SIZE 2048
#define T_OUT 100000 // us
#define HOSTNAME_SIZE 64
#endif /* config_h */
//
// main.cpp
// DistributedGroupMembership
//
// Created by Tangrizzly on 2019/9/22.
// Copyright © 2019 Tangrizzly. All rights reserved.
//
#include <iostream>
#include <unistd.h>
#include <limits.h>
#include <stdio.h>
#include <fstream>
#include <map>
#include "server.h"
int main(int argc, const char * argv[]) {
bool debug = false;
if (argc >= 2 && (strcmp(argv[1], "d") == 0)) {
cout << "debug" << endl;
debug = true;
}
Server server(debug);
thread receive_msg(&Server::receive, &server);
thread send_hb(&Server::send_heartbeat, &server);
thread check_crsh(&Server::check_crash, &server);
string str;
while (getline(cin, str)) {
if (str == "leave" && server.state()) {
server.leave();
check_crsh.join();
receive_msg.join();
send_hb.join();
} else if (str == "join" && !server.state()) {
server.init_membershiplist();
receive_msg = thread(&Server::receive, &server);
send_hb = thread(&Server::send_heartbeat, &server);
check_crsh = thread(&Server::check_crash, &server);
}
}
return 0;
}
//
// member.h
// DistributedGroupMembership
//
// Created by Tangrizzly on 2019/9/27.
// Copyright © 2019 Tangrizzly. All rights reserved.
//
#ifndef member_h
#define member_h
#include <chrono>
class Member {
private:
long long joinstmp;
long long hbstmp;
public:
long long getCurrtime() {
return chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now().time_since_epoch()).count();
}
Member(long long _joinstmp):joinstmp(_joinstmp) {
hbstmp = getCurrtime();
}
Member() {
joinstmp = getCurrtime();
}
long long getJoinstmp() {
return joinstmp;
}
long long getHbstmp() {
return hbstmp;
}
void refreshHbstmp() {
hbstmp = getCurrtime();
}
};
#endif /* member_h */
//
// machine.h
// DistributedGroupMembership
//
// Created by Tangrizzly on 2019/9/27.
// Copyright © 2019 Tangrizzly. All rights reserved.
//
#ifndef server_h
#define server_h
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <stdio.h>
#include <unistd.h>
#include <mutex>
#include <thread>
#include <sstream>
#include <cstring>
#include <netdb.h>
#include <vector>
#include <unistd.h>
#include "config.h"
#include "member.h"
class Server {
private:
mutex mux;
// ostream* ofs;
string localIp;
map<string, Member> memberships;
enum GossipType {heartbeat, join, fail, intro, intro_rsp};
bool running;
int recvsockfd;
public:
void init_membershiplist() {
memberships[localIp] = Member();
int sockfd;
sockaddr_in addr;
socklen_t addrlen = sizeof(addr);
memset((char(*)) &addr, 0, sizeof(addr));
if ((sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
perror("Socket failed");
return;
}
addr.sin_family = AF_INET;
addr.sin_port = htons(PORT);
for (unordered_set<string>::iterator it = introducers.begin(); it != introducers.end(); it++) {
if (*it == localIp) {
continue;
}
if (inet_aton((*it).c_str(), &addr.sin_addr) == 0) {
perror("Assign ip failed");
continue;
}
string msg = to_string(intro) + ';';
if (sendto(sockfd, msg.c_str(), msg.length(), 0, (sockaddr*) &addr, sizeof(addr)) < 0) {
perror("Send introduction failed");
continue;
}
timeval tv;
tv.tv_sec = 0;
tv.tv_usec = T_OUT;
if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) {
perror("Setsockopt failed");
continue;
}
// receive membership list
char buffer[BUFFER_SIZE];
ssize_t buflen;
if ((buflen = recvfrom(sockfd, buffer, BUFFER_SIZE, 0, (sockaddr*) &addr, &addrlen)) < 0) {
perror("Receive failed");
cout << *it + ": Timeout reached." << endl;
continue;
}
buffer[buflen] = '\0';
cout << "Get introducer respond: " << *it << endl;
msg = string(buffer);
cout << "Received message \"" << msg << "\"" << endl;
istringstream iss(msg);
string mode;
getline(iss, mode, ';');
if (static_cast<GossipType>(stoi(mode)) != intro_rsp) {
continue;
}
string hostname, joinstmp;
while (getline(iss, hostname, ';')) {
getline(iss, joinstmp, ';');
memberships[hostname] = Member(stoll(joinstmp.c_str()));
}
print_membershipList();
break;
}
close(sockfd);
running = true;
}
Server(bool debug) {
// get localIp
char localhost_c[HOSTNAME_SIZE];
gethostname(localhost_c, HOSTNAME_SIZE);
int machine_id = stoi(string(localhost_c).substr(15, 2));
char log_path_c[32];
sprintf(log_path_c, LOG_PATH_FORMAT, machine_id);
// fout = fout->open(string(log_path_c), ofstream::out);
char *IPbuffer;
hostent *host_entry;
host_entry = gethostbyname(localhost_c);
// To convert an Internet network
// address into ASCII string
IPbuffer = inet_ntoa(*((in_addr*) host_entry->h_addr_list[0]));
localIp = string(IPbuffer);
cout << "Host IP: " << localIp << endl;
// TODO: add Exception
init_membershiplist();
}
bool state() {
return running;
}
void updateNeighbors() {
map<string, Member>::iterator it = memberships.find(localIp);
for (int i = 0; i < PREDECESSOR; i++) {
if (it == memberships.begin()) {
it = memberships.end();
}
--it;
if (it->first == localIp) {
break;
}
it->second.refreshHbstmp();
}
}
void print_membershipList() {
cout << "Membership list: ";
cout << memberships.size() << endl;
for (auto &it: memberships) {
cout << it.first << ',' << it.second.getJoinstmp() << endl;
}
}
void leave() {
running = false;
shutdown(recvsockfd, SHUT_RDWR);
string msg = to_string(fail) + ';' + localIp + ';';
mux.lock();
gossip(msg);
memberships.clear();
print_membershipList();
mux.unlock();
}
void gossip(string msg) {
int sockfd;
sockaddr_in addr;
memset((char*) &addr, 0, sizeof(addr));
if ((sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
perror("Socket failed.");
return;
}
addr.sin_family = AF_INET;
addr.sin_port = htons(PORT);
map<string, Member>::iterator it = memberships.find(localIp);
for (int i = 0; i < SUCCESSOR; i++) {
if (++it == memberships.end()) {
it = memberships.begin();
}
if (it->first == localIp) {
break;
}
if (inet_aton((it->first).c_str(), &addr.sin_addr) < 0) {
perror("Assign ip failed");
continue;
}
if (sendto(sockfd, msg.c_str(), msg.length(), 0, (sockaddr*) &addr, sizeof(addr)) < 0) {
perror("Send heartbeat failed");
continue;
}
}
close(sockfd);
}
/*
receive join, fail/leave, heartbeat, and self-introduction message
join format: GossipType.join;new_join_node_hostname;join_timestamp
fail/leave format: GossipType.fail;failed_node_hostname
heartbeat format: GossipType.heartbeat;
self-intro format: GossipType.intro;
*/
void receive() {
cout << "Setting up thread to receive messages." << endl;
sockaddr_in addr, cltaddr;
socklen_t cltaddrlen = sizeof(cltaddr);
memset((char*) &addr, 0, sizeof(addr));
if ((recvsockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
perror("Socket failed");
return;
}
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = INADDR_ANY;
addr.sin_port = htons(PORT);
if (::bind(recvsockfd, (sockaddr*) &addr, sizeof(addr)) < 0) {
perror("Bind failed");
return;
}
char buffer[BUFFER_SIZE];
while (running) {
ssize_t buflen = recvfrom(recvsockfd, buffer, BUFFER_SIZE, 0, (sockaddr*) &cltaddr, &cltaddrlen);
if (buflen <= 0) {
break;
}
buffer[buflen] = '\0';
string msg = string(buffer);
istringstream iss(msg);
string line;
getline(iss, line, ';');
GossipType mode = static_cast<GossipType>(stoi(line));
string hostname = inet_ntoa(cltaddr.sin_addr);
mux.lock();
if (mode == heartbeat) {
// cout << "Received message \"" << msg << "\": heartbeat from" << hostname << endl;
if (memberships.find(hostname) != memberships.end()) {
memberships[hostname].refreshHbstmp();
}
} else if (mode == join) {
getline(iss, line, ';'); // get hostname
cout << "Received message \"" << msg << "\": join " << line << endl;
if (memberships.find(line) == memberships.end()) {
memberships[hostname].refreshHbstmp();
cout << "New node joined." << endl;
string jointime;
getline(iss, jointime, ';');
memberships[line] = Member(stoll(jointime));
updateNeighbors();
gossip(msg);
print_membershipList();
}
} else if (mode == fail) {
getline(iss, line, ';'); // get hostname
cout << "Received message \"" << msg << "\": fail " << line << endl;
if (memberships.find(line) != memberships.end()) {
if (line == localIp) {
cout << "False positive detection." << endl;
} else {
cout << "Old node failed." << endl;
gossip(msg);
memberships.erase(line);
updateNeighbors();
print_membershipList();
}
}
} else if (mode == intro) {
cout << "Received message \"" << msg << "\": intro from " << hostname << endl;
// if (memberships.find(hostname) == memberships.end()) {
cout << "Detected new node." << endl;
memberships[hostname] = Member();
updateNeighbors();
string msg = to_string(intro_rsp) + ';';
for (auto &it: memberships) {
msg += it.first + ';' + to_string(it.second.getJoinstmp()) + ';';
}
cout << "Send back msg: \"" << msg << "\"" << endl;
if (sendto(recvsockfd, msg.c_str(), msg.length(), 0, (sockaddr*) &cltaddr, cltaddrlen) < 0) {
perror("Failed to send membershiplist");
}
gossip(to_string(join) + ';' + hostname + ';' + to_string(memberships[hostname].getJoinstmp()) + ';');
print_membershipList();
// }
} else {
cout << "Unknown mode: " << mode << endl;
}
mux.unlock();
}
cout << "Stop receiving." << endl;
close(recvsockfd);
}
// thread: send heartbeat
void send_heartbeat() {
cout << "Setting up thread to send heartbeat." << endl;
while (running) {
string msg = to_string(heartbeat) + ';';
mux.lock();
gossip(msg);
mux.unlock();
usleep(HEARTBEAT_FREQUENCY);
}
cout << "Stop sending heartbeats." << endl;
}
// thread: check crash
void check_crash() {
cout << "Setting up thread to check crash." << endl;
while (running) {
mux.lock();
vector<string> toBeDeleted;
map<string, Member>::iterator it = memberships.find(localIp);
for (int i = 0; i < PREDECESSOR; i++) {
if (it == memberships.begin()) {
it = memberships.end();
}
--it;
if (it->first == localIp) {
break;
}
long long curTime = it->second.getCurrtime();
if (curTime - it->second.getHbstmp() > T_FAIL) {
cout << "Detected node failed: " << (it->first) << endl;
// gossip failed
toBeDeleted.push_back(it->first);
}
}
if (toBeDeleted.size() > 0) {
for (string& s: toBeDeleted) {
memberships.erase(s);
}
updateNeighbors();
print_membershipList();
for (string& ip: toBeDeleted) {
string msg = to_string(fail) + ';' + ip + ';';
gossip(msg);
}
}
mux.unlock();
usleep(CHECK_FREQUENCY);
}
cout << "Stop checking crash." << endl;
}
};
#endif /* server_h */
File added
package main
import (
"net"
"bufio"
"os"
"log"
"fmt"
"io"
"bytes"
"strconv"
"os/exec"
)
var IPS = []string {
"fa19-cs425-g05-01.cs.illinois.edu",
"fa19-cs425-g05-02.cs.illinois.edu",
"fa19-cs425-g05-03.cs.illinois.edu",
"fa19-cs425-g05-04.cs.illinois.edu",
"fa19-cs425-g05-05.cs.illinois.edu",
"fa19-cs425-g05-06.cs.illinois.edu",
"fa19-cs425-g05-07.cs.illinois.edu",
"fa19-cs425-g05-08.cs.illinois.edu",
"fa19-cs425-g05-09.cs.illinois.edu",
"fa19-cs425-g05-10.cs.illinois.edu",
// hostname list: machine01, machine02, ...
}
func getGrepPattern() string {
// read grep pattern from stdin
reader := bufio.NewReader(os.Stdin)
fmt.Print("Grep Pattern: ")
grepPattern, err := reader.ReadString('\n')
if err != nil {
log.Fatal(err, "\n")
}
return grepPattern
}
func request(grepPattern string, counter *Counter, host string, response chan string, id int) {
var ip int
fmt.Sscanf(host, "fa19-cs425-g05-%d.cs.illinois.edu", &ip)
if (ip != id) {
conn, err := net.Dial("tcp", host)
if err != nil {
log.Println(err)
if (counter.check()) {
close(response)
}
return
}
// send grep pattern
fmt.Fprintf(conn, grepPattern + "\n")
// receive grep result
log.Printf("Requesting grep result from remote host: %s\n", host)
var buf bytes.Buffer
io.Copy(&buf, conn)
response <- buf.String()
} else {
log.Printf("Requesting grep result from local host: %s\n", host)
pattern := string(grepPattern[:len(grepPattern) - 1])
pattern = "grep -HnE " + pattern + " vm" + strconv.Itoa(id) + ".log"
log.Print("Pattern: ", pattern, "\n")
cmd := exec.Command("/bin/bash", "-c", pattern)
// cmd.Env = append(os.Environ(), "LC_ALL=C")
stdoutStderr, err := cmd.CombinedOutput()
if err != nil {
log.Print(err, "\n")
} else {
response <- string(stdoutStderr)
}
}
if (counter.check()) {
close(response)
}
}
func grepRequest(grepPattern string, port string, id int) chan string {
counter := &Counter{val: 0, cap: len(IPS)}
response := make(chan string, len(IPS))
for _, hostname := range IPS {
go request(grepPattern, counter, hostname + ":" + port, response, id)
}
return response
}
func client(port *string, debug *bool) {
hostname, _ := os.Hostname()
var id int
fmt.Sscanf(hostname, "fa19-cs425-g05-%d.cs.illinois.edu", &id)
if !(*debug) {
// set up client log
f, err := os.OpenFile(fmt.Sprintf("./client.%d.log", id), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
log.Printf("Error opening file: %v", err)
}
defer f.Close()
log.SetOutput(f)
}
grepPattern := getGrepPattern()
response := grepRequest(grepPattern, *port, id)
for resp := range response {
fmt.Print(resp)
}
}
package main
import (
"sync"
)
type Counter struct {
val int
cap int
mux sync.Mutex
}
func (c *Counter) check() bool {
c.mux.Lock()
defer c.mux.Unlock()
c.val++
if c.val == c.cap {
return true
}
return false
}
\ No newline at end of file
package main
import (
"net"
"bufio"
"os/exec"
"os"
"log"
"fmt"
"strconv"
)
func grepResponse(conn *net.Conn, id int) {
pattern, err := bufio.NewReader(*conn).ReadString('\n')
pattern = string(pattern[:len(pattern) - 1])
if err != nil {
log.Print(err, "\n")
return
}
log.Print("Grep Pattern Received: ", pattern, "\n")
pattern = "grep -HnE " + pattern + " vm" + strconv.Itoa(id) + ".log"
cmd := exec.Command("/bin/bash", "-c", pattern)
stdoutStderr, err := cmd.CombinedOutput()
if err != nil {
log.Print(err, "\n")
} else {
// send new string back to client
(*conn).Write(stdoutStderr)
}
(*conn).Close()
}
func server(port *string, debug *bool) {
// create machine log
hostname, _ := os.Hostname()
var id int
fmt.Sscanf(hostname, "fa19-cs425-g05-%d.cs.illinois.edu", &id)
if !(*debug) {
// set up server log
f, err := os.OpenFile(fmt.Sprintf("./server.%d.log", id), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
log.Printf("error opening file: %v", err)
}
defer f.Close()
log.SetOutput(f)
}
log.Print("Launching server on ", *port)
ln, err := net.Listen("tcp", ":" + *port)
if err != nil {
log.Fatal(err, "\n")
}
for {
conn, err := ln.Accept()
if err != nil {
log.Print(err, "\n")
continue
}
log.Println("Serving ", conn.RemoteAddr().String())
go grepResponse(&conn, id)
}
}
\ No newline at end of file
package main
import (
"flag"
"strings"
)
// https://opensource.com/article/18/5/building-concurrent-tcp-server-go
// https://www.thepolyglotdeveloper.com/2017/05/network-sockets-with-the-go-programming-language
func main() {
mode := flag.String("m", "server", "Mode: server | client | createlog")
port := flag.String("p", "8081", "Server listenning Port")
debug := flag.Bool("d", false, "Debug: Print Log to Stdout or File")
flag.Parse()
if strings.ToLower(*mode) == "server" {
server(port, debug)
} else if strings.ToLower(*mode) == "createlog" {
createLog()
} else {
client(port, debug)
}
}
\ No newline at end of file
# Distributed System Services
[TOC]
## Distributed Log Querier
A Go-based multi-threading program to query log files among distributed systems for debugging distributed services.
### Usage
1. Copy the folder `DistributedLogQuerier` into your `$GOPATH`
2. Install the tcplog package on each machine by `go install tcplog`
3. Set up server process on each machine by `tcplog`
4. Grep distributed log from any machine by `tcplog -m="client"`
5. For debug mode, `-d` can be append to the tcplog command
### Design
For server, we use a for loop to keep accepting requests. Upon receiving a request, it starts a thread which greps the requested pattern from the log file on the server and sends back the results. For each grep client, we first read the pattern from standard input. Then, we start a thread to connect to each of the 10 remote machines, send the grep pattern, and receive grep results. In particularly, when the requested machine is the local machine, the thread will directly run the grep command locally instead of requesting the remote server. Additionally, we use a counter to count the machines who has sent back the results and who we fail to connect to. Once the counter reaches 10, grep will end.
## Distributed Group Membership
A distributed group membership service, maintaining group membership lists with scalable failure detection and gossip dissemination. It can ensure completeness for 3 simultaneous failures and low bandwidth usage.
### Usage
1. Cd into the folder `DistributedGroupMembership`
2. Compile the c++ code by `make`
3. Run the program on each machine by `./main`
4. To voluntarily leave the group, type `leave`, and to rejoin the group, type `join`
5. To simulate a machine failure, use `Ctrl-C`.
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment