Skip to content
Snippets Groups Projects
TcpSocket.cpp 13.07 KiB
#include "../inc/TcpSocket.h"

void *get_in_addr(struct sockaddr *sa)
{
	if (sa->sa_family == AF_INET) {
		return &(((struct sockaddr_in*)sa)->sin_addr);
	}
	return &(((struct sockaddr_in6*)sa)->sin6_addr);
}

TcpSocket::TcpSocket(){}

void TcpSocket::bindServer(string port)
{
	int sockfd, new_fd;  // listen on sock_fd, new connection on new_fd
	struct addrinfo hints, *servinfo, *p;
	struct sockaddr_storage their_addr; // connector's address information
	socklen_t sin_size;
	struct sigaction sa;
	int yes = 1, rv = 0, numbytes = 0;
	char buf[DEFAULT_TCP_BLKSIZE];
	string delimiter = "::";
	memset(&hints, 0, sizeof hints);
	hints.ai_family = AF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_PASSIVE; // use my IP

	if ((rv = getaddrinfo(NULL, port.c_str(), &hints, &servinfo)) != 0) {
		fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv));
		return;
	}

	for(p = servinfo; p != NULL; p = p->ai_next) {
		if ((sockfd = socket(p->ai_family, p->ai_socktype,
				p->ai_protocol)) == -1) {
			perror("server: socket");
			continue;
		}
		if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes,
				sizeof(int)) == -1) {
			perror("setsockopt");
			exit(1);
		}
		if (::bind(sockfd, p->ai_addr, p->ai_addrlen) == -1) {
			close(sockfd);
			perror("server: bind");
			continue;
		}
		break;
	}

	if (p == NULL)  {
		fprintf(stderr, "server: failed to bind\n");
		return;
	}

	freeaddrinfo(servinfo); // all done with this structure

	if (listen(sockfd, BACKLOG) == -1) {
		perror("listen");
		exit(1);
	}

	sa.sa_handler = sigchld_handler; // reap all dead processes
	sigemptyset(&sa.sa_mask);
	sa.sa_flags = SA_RESTART;
	if (sigaction(SIGCHLD, &sa, NULL) == -1) {
		perror("sigaction");
		exit(1);
	}
	while(1) {  // main accept() loop
		sin_size = sizeof their_addr;
		char remoteIP[INET6_ADDRSTRLEN];
		new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &sin_size);
		if (new_fd == -1) { perror("accept"); continue; }
		inet_ntop(their_addr.ss_family,get_in_addr((struct sockaddr *)&their_addr),remoteIP, sizeof(remoteIP));
		bzero(buf, sizeof(buf));
		if ((numbytes = recv(new_fd, buf, DEFAULT_TCP_BLKSIZE, 0)) > 0) {
			string payloadMessage(buf);
			string returnIP(remoteIP);
			if (messageHandler(new_fd, payloadMessage, returnIP)) continue;
		}
		close(new_fd);
	}
}

string TcpSocket::getDirMetadata()
{
	struct dirent *entry = nullptr;
    DIR *dp = nullptr;
	FILE * fp;
    string match = "tmp-";
    int matchLen = match.size();
	vector<string> split;
	int size = 0;
	string msg;
    if ((dp = opendir(".")) == nullptr) { cout << "tmp directory error " << endl;}
    while ((entry = readdir(dp))){
        if (strncmp(entry->d_name, match.c_str(), matchLen) == 0){
			split.clear();
            split = splitString(entry->d_name, "-");
			if (split.size() > 2) continue;
			fp = fopen(entry->d_name, "rb");
			if (fp == NULL) {
				printf("Could not open file to send.");
				continue;
			}
			fseek(fp, 0, SEEK_END);
			size = ftell(fp);
			fseek(fp, 0, SEEK_SET);
			if (msg.size()) msg += ",";
			msg += entry->d_name;
			msg += ",";
			msg += to_string(size);
			fclose(fp);
        }
    }
	closedir(dp);
	return msg;
}

void TcpSocket::mergeFiles(string ip, string port, string handler, string filedest, string header, string toSend, string starts) {
	FILE * fp;
	int sockfd = -1, index = 0;
	if (!toSend.size()) return;
	vector<string> toProcess = splitString(toSend, ",");
	vector<string> toStart = splitString(starts, ",");
	int dirSize = (int)(toProcess.size()) - 1;
	int mode = stoi(handler);
	string payload = handler + "," + filedest + "," + header + "," + toSend;
	payload = to_string(payload.size()) + "," + payload;
	//cout << "[PUT] payload: " << payload << ", range: " << starts << " sending to " << ip << endl;
	Messages msg(MERGE, payload);
	if ((sockfd = createConnection(ip, port)) == -1) return;
	if (send(sockfd, msg.toString().c_str(), strlen(msg.toString().c_str()), 0) == -1) {
		//perror("send");
	}
	while (index < dirSize){
		if (mode == CHUNKACK) sendLines(sockfd, toProcess[index], stoi(toStart[index]), stoi(toStart[index+1]));
		else{
			fp = fopen(toProcess[index].c_str(), "rb");
			if (fp == NULL) printf("Could not open file to send %s.", toProcess[index].c_str());
			else {
				sendFile(sockfd, fp, stoi(toProcess[index+1]));
				fclose(fp);
			}
		}
		index += 2;
	}
	close(sockfd);
}

void TcpSocket::sendFile(int sockfd, FILE * fp, int size) {
	int numbytes, sendSize;
	char buf[DEFAULT_TCP_BLKSIZE];
	bzero(buf, sizeof(buf));
	while (!feof(fp) && size > 0) {
		sendSize = (size < DEFAULT_TCP_BLKSIZE) ? size : DEFAULT_TCP_BLKSIZE;
		bzero(buf, sizeof(buf));
		numbytes = fread(buf, sizeof(char), sendSize, fp);
		size -= numbytes;
		if (send(sockfd, buf, numbytes, 0) == -1) {
			//perror("send");
		}
	}
}

void TcpSocket::sendLines(int sockfd, string filename, int start, int end)
{
	ifstream file(filename.c_str());
	if(file.fail()) printf("Could not open file to send %s.", filename.c_str());
	int lineCounter = -1;
	string str;
    while (getline(file, str) && (lineCounter < end - 1))
    {
		lineCounter++;
        if (lineCounter < start) continue;
		if (lineCounter == start) //cout << "[CHUNK] starting to send at line " << to_string(lineCounter) << endl;
		str += '\n';
		//if (lineCounter == (end-1)) str += '\n';
		if (send(sockfd, str.c_str(), strlen(str.c_str()), 0) == -1) {
			//perror("send");
		}
    }
}

int TcpSocket::sendMessage(string ip, string port, string message)
{
	int sockfd;
	if ((sockfd = createConnection(ip, port)) == -1) return -1;
	if (send(sockfd, message.c_str(), strlen(message.c_str()), 0) == -1) {
		perror("send"); return -1;
	}
	close(sockfd);
	return 0;
}

int TcpSocket::createConnection(string ip, string port){
	int sockfd, rv;
	struct addrinfo hints, *servinfo, *p;
	memset(&hints, 0, sizeof hints);
	hints.ai_family = AF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	if ((rv = getaddrinfo(ip.c_str(), port.c_str(), &hints, &servinfo)) != 0) {
		fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv));
		return -1;
	}
	for (p = servinfo; p != NULL; p = p->ai_next) {
		if ((sockfd = socket(p->ai_family, p->ai_socktype,
				p->ai_protocol)) == -1) {
			//perror("client: socket");
			continue;
		}
		if (connect(sockfd, p->ai_addr, p->ai_addrlen) == -1) {
			close(sockfd);
			//perror("client: connect");
			continue;
		}
		break;
	}
	if (p == NULL) {
		//fprintf(stderr, "client: failed to connect\n");
		return -1;
	}
	freeaddrinfo(servinfo);
	return sockfd;
}

int TcpSocket::messageHandler(int sockfd, string payloadMessage, string returnIP){
	char buf[DEFAULT_TCP_BLKSIZE];
	int numbytes = 0, filesize = 0;
	FILE *fp;
	Messages msg(payloadMessage);
	switch (msg.type) {
		case ELECTION:
		case ELECTIONACK: {
			qMessages.push(payloadMessage);
			break;
		}
		case MERGE: {
			vector<string> metainfo = splitString(msg.payload, ",");
			//correction if merge information made it into the header
			string payload = msg.payload.substr(metainfo[0].size() + 1, stoi(metainfo[0])), extra = "";
			try { extra = msg.payload.substr(metainfo[0].size() + 1 + stoi(metainfo[0])); }
			catch (const out_of_range&) { extra = ""; }
			vector<string> filesAndSizes = splitString(payload, ",");
			int returnType = stoi(filesAndSizes[0]), processedCounter = 0;
			string returnTypeString = "", remoteLocalname = "", execfilename = "", mode = "wb";
			int dirSize = (int)filesAndSizes.size() - 1, fail = 0;
			int index = 3;
			int bytesLeft = 0, offset = strlen(extra.c_str()), buffersize = DEFAULT_TCP_BLKSIZE;
			vector<string> format;
			char c;
			string preProcessed = "";
			string filedest = filesAndSizes[1], processed = "", filename = "", sdfsfilename = "";

			if (returnType == MAPLEACK) returnTypeString = "MAPLEMERGE";
			if (returnType == JUICEACK) {
				returnTypeString = "JUICEMERGE";
				mode = "ab";
				index--;
				while (index < (dirSize - 1)){
					if (preProcessed.size()) preProcessed += ",";
					preProcessed += filesAndSizes[index];
					index++;
				}
			}
			if (returnType == PUTACK) {
				returnTypeString = "PUT";
				sdfsfilename = filesAndSizes[2];
				remoteLocalname = filesAndSizes[3];
				//cout << "[PUT] sdfs: " << sdfsfilename << ", remoteLocal: " << remoteLocalname << endl;
				index = 4;
			}
			if (returnType == CHUNKACK) {
				returnTypeString = "CHUNK";
				sdfsfilename = filesAndSizes[2]; //sdfs file
				execfilename = filesAndSizes[3]; //exec file name
				//cout << "[CHUNK] exec: " << execfilename << ", start: " << filesAndSizes[4] << ", sdfs: " << sdfsfilename << endl;
				index = 5;
			}
			//cout << "[MERGE] type:" << returnTypeString << ", payload: " << payload << endl;
			//cout << "[MERGE] type:" << returnTypeString << ", start index: " << to_string(index) << ", correction to extra -> " << extra << endl;
			while (index < dirSize){
				format.clear();
				string scopy(filesAndSizes[index]);
				format = splitString(scopy, "-"); //cut the tmp off
				filename = (filedest.size()) ? filedest : "tmp-" + returnIP + "-" + format[1];
				//cout << "[MERGE] index:" << to_string(index) << " , dest:" << filename << " , size:" << filesAndSizes[index+1] << endl;
				numbytes = 0;
				filesize = stoi(filesAndSizes[index+1]);
				bytesLeft = filesize;
				buffersize = DEFAULT_TCP_BLKSIZE;
				buffersize = (bytesLeft < buffersize) ? bytesLeft : DEFAULT_TCP_BLKSIZE;
				fp = fopen(filename.c_str(), mode.c_str());
				bzero(buf, sizeof(buf));
				if (extra.size()) {
					offset = strlen(extra.c_str());
					offset = (offset <= buffersize) ? offset : buffersize;
					memcpy(buf, extra.c_str(), offset);
				}
				//cout << "		bytesleft:" << to_string(bytesLeft) << ", offset: " << to_string(offset) << endl;
				while ((((numbytes=recv(sockfd, buf + offset, buffersize - offset, 0)) > 0) || (offset > 0)) && (bytesLeft > 0)) {
					bytesLeft -= numbytes;
					bytesLeft -= offset;
					if (bytesLeft >= 0) fwrite(buf, sizeof(char), numbytes + offset, fp);
					buffersize = (bytesLeft < buffersize) ? bytesLeft : DEFAULT_TCP_BLKSIZE;
					bzero(buf, sizeof(buf));
					if (offset > 0){
						try { extra = extra.substr(offset); }
						catch ( const out_of_range&) { extra = ""; }
						offset = strlen(extra.c_str());
						offset = (offset <= buffersize) ? offset : buffersize;
						memcpy(buf, extra.c_str(), offset);
					}
					//cout << "		bytesleft:" << to_string(bytesLeft) << ", offset: " << to_string(offset) << ", numbytes: " << to_string(numbytes) << endl;
				}
				fclose(fp);
				////bad if corrupt
				if (bytesLeft) {
					cout <<"[MERGE] file corruption! bytesLeft: " << to_string(bytesLeft);
					fail = 1;
					while ((numbytes <= 0) && (bytesLeft > 0) && (offset > 0)){
						try { extra = extra.substr(bytesLeft); }
						catch ( const out_of_range&) { extra = ""; }
						offset = extra.size();
						buffersize = (bytesLeft < buffersize) ? bytesLeft : DEFAULT_TCP_BLKSIZE;
						offset = (offset <= buffersize) ? offset : buffersize;
						bytesLeft -= offset;
						cout << ". error fix, move offet ahead: " << to_string(offset);
					}
					if (returnType != JUICEACK) remove(filename.c_str());
					else {
						int removal = (filesize - bytesLeft);
						fp = fopen(filename.c_str(), "rb");
						fseek(fp, 0, SEEK_END);
						int size = ftell(fp) - removal;
						fseek(fp, 0, SEEK_SET);
						FILE * copyFile = fopen("tmp-rewrite-corrupt-file", "ab");
						cout <<" | removing " << to_string(removal) << " bytes";
						c = fgetc(fp);
					    while (c != EOF && (size > 0))
					    {
					        fputc(c, copyFile);
					        c = fgetc(fp);
							size--;
					    }
						fclose(fp);
						remove(filename.c_str());
						fclose(copyFile);
						rename("tmp-rewrite-corrupt-file", filename.c_str());
					}
					cout << endl;
				}
				else {
					if (processed.size()) processed += ",";
					//return list of processed keys. Manipulate this in JUICE ack to account for directories
					processed += format[1];
					processedCounter++;
				}
				index += 2;
			}
			if (fail && (returnType == MAPLEACK)) { Messages ack(MERGEFAIL, returnIP + "::"); regMessages.push(ack.toString()); }
			else if (returnType == MAPLEACK){ Messages ack(MERGECOMPLETE, returnIP + "::" + processed); regMessages.push(ack.toString()); }
			else if (returnType == JUICEACK){ Messages ack(JUICEACK, returnIP + "::" + preProcessed); regMessages.push(ack.toString()); }
			else if (returnType == CHUNKACK){ Messages ack(CHUNKACK, returnIP + "::" + execfilename + "::" + filesAndSizes[4] + "::" + filename + "::" + sdfsfilename); regMessages.push(ack.toString()); }
			else if (returnType == PUTACK){ Messages ack(PUTACK, returnIP + "::" + sdfsfilename + "::" + filename+ "::" + remoteLocalname); regMessages.push(ack.toString()); }
			else { cout << "[MERGE bad return type " << to_string(returnType) << endl;}
			break;
		}
		case DNSANS:
		case ACK:
		case PUTACK:
		case LEADERACK:
		case REREPLICATE:
		case REREPLICATEGET:
		case DNSGET:
		case DELETE:
		case GETNULL:
		case MAPLESTART:
		case JUICESTART:
		case PHASESTART:
		case MAPLEACK:
		case CHUNK:
		case CHUNKACK:
		case JUICE:
		case JUICEACK:
		case STARTMERGE:
		case MERGECOMPLETE:
		case MERGEFAIL:
		case DNS:{
			//cout << "["<< messageTypes[msg.type] << "] payloadMessage: " << payloadMessage << endl;
			regMessages.push(payloadMessage); //handle from queue
			break;
		}
		default:
			break;
	}
	return 0;
}