logger.cpp 7.42 KiB
#include <sys/socket.h>
#include <sys/types.h>
#include <iostream>
#include <unistd.h>
#include <string>
#include <strings.h>
#include <netdb.h>
#include <chrono>
#include <vector>
#include <arpa/inet.h>
#include <thread>
#include <atomic>
#include <fstream>
#include <mutex>
#include <time.h>
#include <math.h>
class Metrics{
public:
int counter;
static std::ofstream of;
static std::atomic<double> prev_timestamp;
static std::atomic<double> curr_timestamp;
static std::atomic<long> bytes;
double min_delay;
double max_delay;
double median;
double perc_90;
Metrics();
void write_metrics();
static std::mutex lock;
};
std::ofstream Metrics::of;
std::atomic<double> Metrics::prev_timestamp;
std::atomic<double> Metrics::curr_timestamp;
std::atomic<long> Metrics::bytes;
std::mutex Metrics::lock;
Metrics::Metrics(){
if(!of.is_open()){
of.open("metric_data2.txt");
if(!of.is_open()) std:: cerr << "Failed to open file\n";
}
counter = 0;
min_delay = 10000.0;
max_delay = 0;
perc_90 = 0;
}
void Metrics::write_metrics(){
lock.lock();
double prev_val = prev_timestamp.load();
double curr_val = curr_timestamp.load();
long curr_bytes = bytes.load();
double curr_delay = fabs(curr_val - prev_val);
if(curr_delay){
min_delay = std::min(min_delay, curr_delay);
max_delay = std::max(max_delay, curr_delay);
}
counter++;
perc_90 = ( (perc_90 * counter + curr_delay) * .9) / counter;
of << curr_delay << " " << min_delay << " " << max_delay << " " << (max_delay - min_delay) / 2 << " " << perc_90 << " " << curr_bytes << "\n";
bytes.store(0);
prev_timestamp.store(0);
curr_timestamp.store(0);
lock.unlock();
}
// Parse message for node name
std::string get_name(char* buf){
std::string working;
int curr_idx = 0;
while(buf[curr_idx] != ' '){
if(buf[curr_idx] == '\0') return "";
curr_idx++;
}
while(!isalpha(buf[curr_idx])){
curr_idx++;
}
while(buf[curr_idx] != ' '){
working += buf[curr_idx];
curr_idx++;
}
return working;
}
std::string get_send_timestamp(char* buf){
int curr_idx = 0;
while(buf[curr_idx] != ' '){
if(buf[curr_idx] == '\0') return "";
curr_idx++;
}
std::string out(buf, curr_idx - 2);
return out.substr(6);
}
void metric_thread(){
Metrics met_ob;
met_ob.lock.lock();
met_ob.prev_timestamp.store(0);
met_ob.curr_timestamp.store(0);
met_ob.lock.unlock();
while(met_ob.counter < 40){
std::this_thread::sleep_for(std::chrono::seconds(1));
met_ob.write_metrics();
}
std::cout << "Thread finished\n";
met_ob.of.close();
return;
}
// 1 arg = port num
int main(int argc, char* argv[]){
// Server socket info
int fd;
struct sockaddr_in addr;
// Create TCP socket
if( ( fd = socket(AF_INET, SOCK_STREAM, 0) ) == -1){
std::cerr << "Failed to create socket\n";
return 0;
};
// Clear struct
bzero((char*)&addr, sizeof(addr));
// Validate command line args
if(argc != 2){
std::cerr << "Incorrect arguments. Only argument should be port number.\n";
return 0;
}
char* port_arg = argv[1];
short port_num = (short) atoi(port_arg);
// IP info
std::string IP_str;
char host_buf[256];
struct hostent *host_data; // Local address info
// Get local host name and use to get local host data
gethostname(host_buf, 256);
host_data = gethostbyname(host_buf);
// Extract IP address from host data
struct in_addr IP_ = *((struct in_addr*)(host_data->h_addr_list[0]));
// Fill socket data (IPv4 addr family, IP address, port (network order))
addr.sin_family = AF_INET;
addr.sin_addr = IP_;
addr.sin_port = htons(port_num);
// Print readable IP for user
char* IP_buf = inet_ntoa(IP_);
if(!IP_buf){
std::cerr << "Error fetching IP\n";
return 0;
}
std::cout << IP_buf <<"\n";
// Bind socket to port
if( bind(fd, (sockaddr*) &addr, sizeof(addr)) == -1 ){
std::cerr << "Could not bind socket\n";
return 0;
}
// Listen for connections
if( (listen(fd, 10)) == -1 ){
std::cerr << "Failed to listen\n";
return 0;
}
while(true){
// Create socket struct to receive message from clients
struct sockaddr_in client;
socklen_t client_socklen;
int client_fd = accept(fd, (sockaddr*) &client, &client_socklen);
if(client_fd == -1){
std::cerr << "Failed to accept connection\n";
continue;
}
// Create child process for client socket
// Maintain parent process as server while child procs running
pid_t pid = fork();
if(pid < 0){
std::cerr << "Failed to create new proess\n";
return 0;
}
// Child process
else if(pid == 0){
// Close server socket within child process
std::thread met_thread(metric_thread);
met_thread.detach();
Metrics log_mets;
close(fd);
char buf[400];
std::string name;
while(true){
bzero(buf, 400);
// Await messages
ssize_t read_bytes = recv(client_fd, buf, 400, 0);
uint64_t microseconds_since_epoch = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
// clock_t begin = clock();
float ex_time = (float)microseconds_since_epoch;
std::string as_str(std::to_string(ex_time));
as_str.insert(10, 1, '.');
as_str = as_str.substr(6, as_str.find_last_of(".") - 2);
// If node disconnected, exit child proc
if(read_bytes <= 0){
uint64_t microseconds_since_epoch = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
float ex_time = (float)microseconds_since_epoch;
std::string as_str(std::to_string(ex_time));
as_str.insert(10, 1, '.');
as_str = as_str.substr(0, as_str.find_last_of("."));
std::cout << as_str << " - " << name << " disconnected\n";
exit(0);
}
// if(read_bytes == 0) continue;
if(name.empty()){
name = get_name(buf);
}
// clock_t end = clock();
// double time_dif = end - begin;
// Gathering metrics
// Execute atomically
log_mets.lock.lock();
log_mets.prev_timestamp.store(std::stod(get_send_timestamp(buf)));
log_mets.curr_timestamp.store(std::stod(as_str));
long already_in = log_mets.bytes.load();
log_mets.bytes.store(already_in + read_bytes);
log_mets.lock.unlock();
std::cout << buf << "\n";
}
}
// Parent process
// pid == process id of child
// Child procs will kill themselves when exit called upon disconnection of node
else{
close(client_fd);
}
}
return 1;
}