Skip to content
Snippets Groups Projects
logger.cpp 7.42 KiB
#include <sys/socket.h>
#include <sys/types.h>
#include <iostream>
#include <unistd.h>
#include <string>
#include <strings.h>
#include <netdb.h>
#include <chrono>
#include <vector>
#include <arpa/inet.h>
#include <thread>
#include <atomic>
#include <fstream>
#include <mutex>
#include <time.h>
#include <math.h>


class Metrics{
    public:
        int counter;

        static std::ofstream of;

        static std::atomic<double> prev_timestamp;
        static std::atomic<double> curr_timestamp;
        static std::atomic<long> bytes;

        double min_delay;
        double max_delay;
        double median;
        double perc_90; 

        Metrics();
        void write_metrics();

        static std::mutex lock;
};

std::ofstream Metrics::of;
std::atomic<double> Metrics::prev_timestamp;
std::atomic<double> Metrics::curr_timestamp;
std::atomic<long> Metrics::bytes;
std::mutex Metrics::lock;

Metrics::Metrics(){
    if(!of.is_open()){
        of.open("metric_data2.txt");
        if(!of.is_open()) std:: cerr << "Failed to open file\n";
    }
    counter = 0;
    min_delay = 10000.0;
    max_delay = 0;
    perc_90 = 0;
}

void Metrics::write_metrics(){
    lock.lock();
    double prev_val = prev_timestamp.load();
    double curr_val = curr_timestamp.load();

    long curr_bytes = bytes.load();
    double curr_delay = fabs(curr_val - prev_val);
    if(curr_delay){
        min_delay = std::min(min_delay, curr_delay);
        max_delay = std::max(max_delay, curr_delay);
    }
    counter++;
    perc_90 = ( (perc_90 * counter + curr_delay) * .9) / counter;
    of << curr_delay << " " << min_delay << " " << max_delay << " " << (max_delay - min_delay) / 2 << " " << perc_90 << " " << curr_bytes << "\n";
    
    bytes.store(0);
    prev_timestamp.store(0);
    curr_timestamp.store(0);
    lock.unlock();
}


// Parse message for node name
std::string get_name(char* buf){
    std::string working; 
    int curr_idx = 0;
    while(buf[curr_idx] != ' '){
        if(buf[curr_idx] == '\0') return ""; 
        curr_idx++;
    }
    while(!isalpha(buf[curr_idx])){
        curr_idx++;
    }
    while(buf[curr_idx] != ' '){
        working += buf[curr_idx];
        curr_idx++;
    }
    return working;
}

std::string get_send_timestamp(char* buf){ 
    int curr_idx = 0;
    while(buf[curr_idx] != ' '){
        if(buf[curr_idx] == '\0') return ""; 
        curr_idx++;
    }
    std::string out(buf, curr_idx - 2);
    return out.substr(6);
}

void metric_thread(){
    Metrics met_ob;
    met_ob.lock.lock();
    met_ob.prev_timestamp.store(0);
    met_ob.curr_timestamp.store(0);
    met_ob.lock.unlock();

    while(met_ob.counter < 40){
        std::this_thread::sleep_for(std::chrono::seconds(1));
        met_ob.write_metrics();
    }
    std::cout << "Thread finished\n";
    met_ob.of.close();
    return;
}

// 1 arg = port num
int main(int argc, char* argv[]){
    // Server socket info
    int fd;
    struct sockaddr_in addr;
    
    // Create TCP socket
    if( ( fd = socket(AF_INET, SOCK_STREAM, 0) ) == -1){
        std::cerr << "Failed to create socket\n";
        return 0;
    };

    // Clear struct
    bzero((char*)&addr, sizeof(addr));

    // Validate command line args
    if(argc != 2){
        std::cerr << "Incorrect arguments. Only argument should be port number.\n";
        return 0;
    }
    char* port_arg = argv[1];
    short port_num = (short) atoi(port_arg);

    // IP info
    std::string IP_str;
    char host_buf[256];
    struct hostent *host_data; // Local address info

    // Get local host name and use to get local host data
    gethostname(host_buf, 256);
    host_data = gethostbyname(host_buf);

    // Extract IP address from host data
    struct in_addr IP_ = *((struct in_addr*)(host_data->h_addr_list[0]));

    // Fill socket data (IPv4 addr family, IP address, port (network order))
    addr.sin_family = AF_INET;
    addr.sin_addr = IP_;
    addr.sin_port = htons(port_num);

    // Print readable IP for user
    char* IP_buf = inet_ntoa(IP_);
    if(!IP_buf){
        std::cerr << "Error fetching IP\n";
        return 0;
    }
    std::cout << IP_buf <<"\n";
    
    // Bind socket to port
    if( bind(fd, (sockaddr*) &addr, sizeof(addr)) == -1 ){
        std::cerr << "Could not bind socket\n";
        return 0;
    }

    // Listen for connections
    if( (listen(fd, 10)) == -1 ){
        std::cerr << "Failed to listen\n";
        return 0;
    }

    while(true){
        // Create socket struct to receive message from clients
        struct sockaddr_in client;
        socklen_t client_socklen;

        int client_fd = accept(fd, (sockaddr*) &client, &client_socklen);

        if(client_fd == -1){
            std::cerr << "Failed to accept connection\n";
            continue;
        }

        // Create child process for client socket
        // Maintain parent process as server while child procs running
        pid_t pid = fork();

        if(pid < 0){
            std::cerr << "Failed to create new proess\n";
            return 0;
        }
        // Child process
        else if(pid == 0){
            // Close server socket within child process
            std::thread met_thread(metric_thread);
            met_thread.detach();

            Metrics log_mets;

            close(fd);
            char buf[400];
            std::string name;
            while(true){
                bzero(buf, 400);
                // Await messages
                ssize_t read_bytes = recv(client_fd, buf, 400, 0);

                uint64_t microseconds_since_epoch = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count();

                // clock_t begin = clock();

                float ex_time = (float)microseconds_since_epoch;
                std::string as_str(std::to_string(ex_time));
                as_str.insert(10, 1, '.');
                as_str = as_str.substr(6, as_str.find_last_of(".") - 2);

                // If node disconnected, exit child proc
                if(read_bytes <= 0){
                    uint64_t microseconds_since_epoch = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
                    float ex_time = (float)microseconds_since_epoch;
                    std::string as_str(std::to_string(ex_time));
                    as_str.insert(10, 1, '.');
                    as_str = as_str.substr(0, as_str.find_last_of("."));
                    std::cout <<  as_str << " - " << name << " disconnected\n";
                    exit(0);
                }

                // if(read_bytes == 0) continue;
                
                if(name.empty()){
                    name = get_name(buf);
                }

                // clock_t end = clock();
                // double time_dif = end - begin;

                // Gathering metrics
                // Execute atomically
                log_mets.lock.lock();
                log_mets.prev_timestamp.store(std::stod(get_send_timestamp(buf)));
                log_mets.curr_timestamp.store(std::stod(as_str));
                long already_in = log_mets.bytes.load();
                log_mets.bytes.store(already_in + read_bytes);
                log_mets.lock.unlock();

                std::cout << buf << "\n";
            }
        }
        
        // Parent process
        // pid == process id of child
        // Child procs will kill themselves when exit called upon disconnection of node
        else{
            close(client_fd);
        }
    }

    return 1;
}