Skip to content
Snippets Groups Projects
main.cpp 6.94 KiB
#include "../inc/Node.h"

int main(int argc, char *argv[])
{
	pthread_t threads[5];
	int rc;
	Node *node;
	cout << "Mode: " << ALL2ALL << "->All-to-All, ";
	cout << GOSSIP << "->Gossip-style" << endl;
	if (argc < 2) node = new Node();
	else {
		ModeType mode = ALL2ALL;
		if (atoi(argv[1]) == 1) mode = GOSSIP;
		node = new Node(mode);
	}
	cout << "Running mode: " << node->runningMode << endl;
	Member own(getIP(), PORT, node->localTimestamp, node->heartbeatCounter);
	node->nodeInformation = own;
	cout << "[NEW] Starting Node at " << node->nodeInformation.ip << "/";
	cout << node->nodeInformation.port << "..." << endl;
	int *ret;
	string s;
	string cmd;
	bool joined = false;

	// listening server can run first regardless of running time commands
	if ((rc = pthread_create(&threads[0], NULL, runUdpServer, (void *)node->udpServent)) != 0) {
		cout << "Error:unable to create thread," << rc << endl;
		exit(-1);
	}
	if ((rc = pthread_create(&threads[2], NULL, runTcpServer, (void *)node->tcpServent)) != 0) {
		cout << "Error:unable to create thread," << rc << endl;
		exit(-1);
	}
	if ((rc = pthread_create(&threads[4], NULL, runTcpSender, (void *)node->tcpServent)) != 0) {
		cout << "Error:unable to create thread," << rc << endl;
		exit(-1);
	}
	node->localFilelist.clear(); // for testing
	/*node->localFilelist["sdfsfilename1"] = "localfilename1";
	node->localFilelist["sdfsfilename2"] = "localfilename2";*/

	while(1){
		string s;
		getline (cin, s);
		vector<string> cmdLineInput;
		string delimiter = " ";
		size_t pos_start = 0, pos_end, delim_len = delimiter.length();
		string token;
		while ((pos_end = s.find (delimiter, pos_start)) != string::npos) {
			token = s.substr (pos_start, pos_end - pos_start);
			pos_start = pos_end + delim_len;
			cmdLineInput.push_back (token);
		}
		cmdLineInput.push_back (s.substr (pos_start));
		cmd = cmdLineInput[0];
		// Deal with multiple cmd input
		if(cmd == "join"){
			node->startActive();
			if ((rc = pthread_create(&threads[1], NULL, runSenderThread, (void *)node)) != 0) {
				cout << "Error:unable to create thread," << rc << endl;
				exit(-1);
			}
			joined = true;
		} else if(cmd == "leave" && joined){
				node->activeRunning = false;
				node->membershipList.clear();
				node->restartElection(); // clean up leader info
				pthread_join(threads[1], (void **)&ret);
				string message = "["+to_string(node->localTimestamp)+"] node "+node->nodeInformation.ip+"/"+node->nodeInformation.port+" is left";
				cout << "[LEAVE]" << message.c_str() << endl;
				node->logWriter->printTheLog(LEAVE, message);
				sleep(2); // wait for logging
				joined = false;
		} else if(cmd == "id"){
			cout << "ID: (" << node->nodeInformation.ip << ", " << node->nodeInformation.port << ")" << endl;
		} else if(cmd == "member"){
			debugMembershipList(node);
		} else if(cmd == "switch") {
			if(joined) node->requestSwitchingMode();
		} else if(cmd == "mode") {
			cout << "In " << node->runningMode << " mode" << endl;
		} else if(cmd == "exit"){
			cout << "exiting..." << endl; break;
		} else if (cmd == "put" && joined){ // MP2 op1
			if(cmdLineInput.size() < 3){
				cout << "USAGE: put filename sdfsfilename" << endl;
				continue;
			}
			if (!node->isBlackout) {
				string localfilename = cmdLineInput[1];
				string sdfsfilename = cmdLineInput[2];
				Messages outMsg(DNS, node->nodeInformation.ip + "::" + to_string(node->hashRingPosition) + "::" + sdfsfilename + "::" + localfilename);
				cout << "[PUT] Got localfilename: " << localfilename << " with sdfsfilename: " << sdfsfilename << endl;
				if (access(localfilename.c_str(), F_OK) != -1) {
					node->tcpServent->sendMessage(node->leaderIP, TCPPORT, outMsg.toString());
				} else {
					cout << "[PUT] The file " << localfilename << " is not existed" << endl;
				}
			} else {
				cout << "[BLACKOUT] Leader cannot accept the request" << endl;
			}
		} else if (cmd == "get" && joined){ // MP2 op2
			if(cmdLineInput.size() < 3){
				cout << "USAGE: get sdfsfilename filename" << endl;
				continue;
			}
			string sdfsfilename = cmdLineInput[1];
			string localfilename = cmdLineInput[2];
			if (node->localFilelist.find(sdfsfilename) != node->localFilelist.end()) {
				cout << "[GET] You have sdfsfilename " << sdfsfilename << " as " << node->localFilelist[sdfsfilename] << endl;
				continue;
			}
			if (node->fileList.find(sdfsfilename) == node->fileList.end()) {
				cout << "[GET] Get sdfsfilename " << sdfsfilename << " failed" << endl;
				continue;
			} else {
				if (node->fileList[sdfsfilename].size()==1 && node->isBlackout) {
					cout << "[GET] Get sdfsfilename " << sdfsfilename << " failed" << endl;
					continue;
				}
			}

			Messages outMsg(DNSGET, node->nodeInformation.ip + "::" + to_string(node->hashRingPosition) + "::" + sdfsfilename + "::" + localfilename);
			cout << "[GET] Got sdfsfilename: " << sdfsfilename << " with localfilename: " << localfilename << endl;
			node->tcpServent->sendMessage(node->leaderIP, TCPPORT, outMsg.toString());
		} else if (cmd == "delete" && joined){ // MP2 op3
			if(cmdLineInput.size() < 2){
				cout << "USAGE: delete sdfsfilename" << endl;
				continue;
			}
			if (!node->isBlackout) {
				string sdfsfilename = cmdLineInput[1];
				Messages outMsg(DELETE, node->nodeInformation.ip + "::" + sdfsfilename);
				cout << "[DELETE] Got sdfsfilename: " << sdfsfilename << endl;
				node->tcpServent->sendMessage(node->leaderIP, TCPPORT, outMsg.toString());
			} else {
				cout << "[BLACKOUT] Leader cannot accept the request" << endl;
			}
		} else if (cmd == "ls" && joined){ // MP2 op4
			if(cmdLineInput.size() < 2){
				cout << "USAGE: ls sdfsfilename" << endl;
				continue;
			}
			if (!node->isBlackout) {
				string sdfsfilename = cmdLineInput[1];
				node->listSDFSFileList(sdfsfilename);
			} else {
				cout << "[BLACKOUT] Leader cannot accept the request" << endl;
			}
		} else if (cmd == "store"){ // MP2 op5
			node->listLocalFiles();
		} else if (cmd == "lsall"){
			debugSDFSFileList(node);
		} else {
			cout << "[join] join to a group via fixed introducer" << endl;
			cout << "[leave] leave the group" << endl;
			cout << "[id] print id (IP/PORT)" << endl;
			cout << "[member] print all membership list" << endl;
			cout << "[switch] switch to other mode (All-to-All to Gossip, and vice versa)" << endl;
			cout << "[mode] show in 0/1 [All-to-All/Gossip] modes" << endl;
			cout << "[exit] terminate process" << endl;
			cout << " === New since MP2 === " << endl;
			cout << "[put] localfilename sdfsfilename" << endl;
			cout << "[get] sdfsfilename localfilename" << endl;
			cout << "[delete] sdfsfilename" << endl;
			cout << "[ls] list all machine (VM) addresses where this file is currently being stored" << endl;
			cout << "[lsall] list all sdfsfilenames with positions" << endl;
			cout << "[store] list all files currently being stored at this machine" << endl << endl;
		} // More command line interface if wanted
	}

	pthread_kill(threads[0], SIGUSR1);
	pthread_kill(threads[4], SIGUSR1);
	if(joined){
		pthread_kill(threads[1], SIGUSR1);
	}
	pthread_exit(NULL);
	return 1;
}