Skip to content
Snippets Groups Projects
Node.cpp 68.54 KiB
#include "../inc/Node.h"

Node::Node(){
	udpServent = new UdpSocket();
	tcpServent = new TcpSocket();
	hashRing = new HashRing();
	workerRing = new HashRing();
	localTimestamp = 0;
	heartbeatCounter = 0;
	runningMode = ALL2ALL;
	activeRunning = false;
	prepareToSwitch = false;
	logWriter = new Logger(LOGGING_FILE_NAME);
	leaderPosition = -1;
	proposedTime = 0;
	electedTime = 0;
	isJuicePhase = false;
	isMaplePhase = false;
	joinTimestamp = "";
	exe = "";
	sdfsPre = "";
	possibleSuccessorIP = "";
	leaderIP = "";
	leaderPort = "";
	maplejuiceClear = false;
	isBlackout = true;
	struct sigaction sa;
	sa.sa_handler = sigchld_handler; // reap all dead processes
	sigemptyset(&sa.sa_mask);
	sa.sa_flags = SA_RESTART;
	if (sigaction(SIGCHLD, &sa, NULL) == -1) {
		perror("sigaction");
		exit(1);
	}
}

Node::Node(ModeType mode) : Node() { runningMode = mode; }

void Node::startActive()
{
	queue<string> empty;
    swap( maplejuiceQ , empty );
	resetMapleJuice();
	restartElection();
	// inserting its own into the list
	time(&startTimestamp);
	sentLocally.clear();
	string startTime = updateNodeHeartbeatAndTime();
	debugMembershipList(this);
	joinTimestamp = startTime; // for hashRing
	getPositionOnHashring(); // update its hashRingPosition
}

string Node::updateNodeHeartbeatAndTime()
{
	string startTime = ctime(&startTimestamp);
	startTime = startTime.substr(0, startTime.find("\n"));
	tuple<string, string, string> keyTuple(nodeInformation.ip, nodeInformation.port,startTime);
	tuple<int, int, int> valueTuple(heartbeatCounter, localTimestamp, 0);
	this->membershipList[keyTuple] = valueTuple;
	return startTime;
}

string Node::populateMembershipMessage()
{
	//The string we send will be seperated line by line --> IP,PORT,HeartbeatCounter,FailFlag
	string mem_list_to_send = "";
	switch (this->runningMode)
	{
		case GOSSIP:
			return populateIntroducerMembershipMessage(); // code re-use
		default:
			string startTime = ctime(&startTimestamp);
			startTime = startTime.substr(0, startTime.find("\n"));
			mem_list_to_send += nodeInformation.ip + "," + nodeInformation.port + "," + startTime + ",";
			mem_list_to_send += to_string(heartbeatCounter) + "," + to_string(0) + "\n";
			return mem_list_to_send;
	}
}

string Node::populateIntroducerMembershipMessage() {
	string mem_list_to_send = "";
	for (auto& element: this->membershipList) {
		tuple<string, string, string> keyTuple = element.first;
		tuple<int, int, int> valueTuple = element.second;
		mem_list_to_send += get<0>(keyTuple) + "," + get<1>(keyTuple) + "," + get<2>(keyTuple) + ",";
		mem_list_to_send += to_string(get<0>(valueTuple)) + "," + to_string(get<2>(valueTuple)) + "\n";
	}
	return mem_list_to_send;
}

int Node::heartbeatToNode()
{
	string msg;
	string mem_list_to_send = populateMembershipMessage();
	vector<tuple<string,string,string>> targetNodes = getRandomNodesToGossipTo();
#ifdef LOG_VERBOSE
	cout << "pick " << targetNodes.size() << " of " << this->membershipList.size()-1;
	cout << " members" << endl;
#endif
	for (uint i=0; i<targetNodes.size(); i++) {
		Member destination(get<0>(targetNodes[i]), get<1>(targetNodes[i]));
		string message = "["+to_string(this->localTimestamp)+"] node "+destination.ip+"/"+destination.port+"/"+get<2>(targetNodes[i]);
#ifdef LOG_VERBOSE
		cout << "[Gossip]" << message.c_str() << endl;
#endif
		this->logWriter->printTheLog(GOSSIPTO, message);
		if (isLeader) {
			if (isBlackout) msg = populateSDFSFileList(LEADERPENDING, mem_list_to_send);
			else msg = populateSDFSFileList(LEADERHEARTBEAT, mem_list_to_send);
		}
		else msg = populateSDFSFileList(HEARTBEAT, mem_list_to_send);
		udpServent->sendMessage(destination.ip, destination.port, msg);
	}
	return 0;
}

int Node::failureDetection(){
	//1. check local membership list for any timestamps whose curr_time - timestamp > T_fail
	//2. If yes, mark node as local failure, update fail flag to 1 and update timestamp to current time
	//3. for already failed nodes, check to see if curr_time - time stamp > T_cleanup
	//4. If yes, remove node from membership list
	vector<tuple<string,string,string>> removedVec;
	vector<string> messagesToSend;
	for(auto& element: this->membershipList){
		tuple<string,string,string> keyTuple = element.first;
		tuple<int, int, int> valueTuple = element.second;
#ifdef LOG_VERBOSE
		cout << "checking " << get<0>(keyTuple) << "/" << get<1>(keyTuple) << "/" << get<2>(keyTuple) << endl;
#endif
		if ((get<0>(keyTuple).compare(nodeInformation.ip) == 0) && (get<1>(keyTuple).compare(nodeInformation.port) == 0)) {
#ifdef LOG_VERBOSE
			cout << "do not check itself" << endl;
#endif
			continue;
		}
		//node has not failed
		if(get<2>(valueTuple) == 0){
			if(localTimestamp - get<1>(valueTuple) > T_timeout){
				//cout << "Got " << get<0>(keyTuple) << "/" << get<1>(keyTuple) << "/" << get<2>(keyTuple) << endl;
				//cout << "local time " << localTimestamp << " vs. " << get<1>(valueTuple) << endl;
				get<1>(this->membershipList[keyTuple]) = localTimestamp;
				get<2>(this->membershipList[keyTuple]) = 1;

				string message = "["+to_string(this->localTimestamp)+"] node "+get<0>(keyTuple)+"/"+get<1>(keyTuple)+"/"+get<2>(keyTuple)+": Local Failure";
				cout << "[FAIL]" << message.c_str() << endl;
				this->logWriter->printTheLog(FAIL, message);
				if(isLeader){
					// clearn up fileList
					Member deletedNode(get<0>(keyTuple), get<1>(keyTuple));
					int deletedNodePostion = hashingId(deletedNode, get<2>(keyTuple));
					hashRing->removeNode(deletedNodePostion);
					for (auto& element: fileList) {
						vector<int> newEntry;
						for(unsigned int i = 0; i < element.second.size(); i++){
							if(element.second[i] != deletedNodePostion){
								newEntry.push_back(element.second[i]);
							}
						}
						fileList[element.first] = newEntry;
					}

					// chech if the failure is the sender in pending requests
					for (auto& senders: pendingSenderRequests) {
						string sdfsfilename = senders.first;
						tuple<string,string,string> sender = senders.second;
						if ((get<0>(keyTuple).compare(get<0>(sender))==0) &&
							get<0>(pendingRequestSent[sdfsfilename]) &&
							(get<0>(pendingRequests[sdfsfilename])!=-1)) {
							// it sent out, and is not finished, cannot help
							// we lost the data from the client
							cout << "[REREPLICATE] client itself fails, we cannot help, remove request" << endl;
							isBlackout = false;
							pendingRequests.erase(sdfsfilename);
							pendingRequestSent.erase(sdfsfilename);
							continue;
						}
						if ((get<0>(keyTuple).compare(get<1>(sender))==0) &&
							get<1>(pendingRequestSent[sdfsfilename]) &&
							(get<1>(pendingRequests[sdfsfilename])!=-1)) {
							// the sender fails during 2nd pass
							// replace the sent
							cout << "[REREPLICATE] One of the sender " << get<0>(keyTuple) << " failed, try again" << endl;
							if (get<2>(pendingRequests[sdfsfilename])!=-1) {
								tuple<int, int, int>(get<0>(pendingRequestSent[sdfsfilename]), false, get<2>(pendingRequestSent[sdfsfilename]));
							} else {
								pendingRequests.erase(sdfsfilename);
							}
							continue;
						}
						if ((get<0>(keyTuple).compare(get<2>(sender))==0) &&
							get<2>(pendingRequestSent[sdfsfilename]) &&
							(get<2>(pendingRequests[sdfsfilename])!=-1)) {
							// it sent out, but replicates are failed
							// restart again
							cout << "[REREPLICATE] The sender " << get<0>(keyTuple) << " failed, try again" << endl;
							pendingRequests.erase(sdfsfilename);
						}
					}

					//////////////////////////////////////////////////////
					//1) remove from HashRing
					//2) if processing, reassign
					//2a) if no extra nodes, assign to successor, else add new node to ring
					//3) if a sender, reassign replica holders as new senders for each thing sent
					if (workerRing->size() > 0 && workerRing->getValue(deletedNodePostion).compare("No node found") != 0){
						set<int> currentWorkers;
						for (auto &w: workerRing->nodePositions) currentWorkers.insert(w);
						int nextId;
						string nextIp = "";
						int newSize = workerRing->nodePositions.size()-1;
						int available = hashRing->nodePositions.size()-1; //cant include master node
						if (newSize >= available){
							nextId = workerRing->getSuccessor(deletedNodePostion);
							nextIp = workerRing->getValue(nextId);
						} else {
							for (auto &w : hashRing->nodePositions){
								if (currentWorkers.count(w) == 0){
									nextId = w;
									nextIp = hashRing->getValue(nextId);
									Messages startMsg(PHASESTART, "filling in for failed worker");
									tcpServent->sendMessage(nextIp, TCPPORT, startMsg.toString());
									break;
								}
							}
						}
						workerRing->removeNode(deletedNodePostion);
						workerRing->addNode(nextIp, nextId);
						string workToDo = "", mapleS = "";
						for (auto &e : mapleSending){
							if (get<0>(e.second).compare(get<0>(keyTuple))) continue;
							vector<int> temp = randItems(1, fileList[get<0>(e.first)]);
							string tempIp = hashRing->getValue(temp[0]);
							auto task = make_tuple(get<0>(e.first), get<1>(e.first));
							mapleSending[e.first] = make_tuple(tempIp, 0);
							string processor = "";
							for (auto &worker : workerTasks){
								if (worker.second.count(task) > 0) processor = worker.first;
							}
							mapleS = tempIp + "::" + processor + "::" + exe + "::" + get<0>(e.first) + "::" + get<1>(e.first);
							if (get<1>(e.second) == 0) messagesToSend.push_back(mapleS);
						}

						if (workerTasks.find(get<0>(keyTuple)) != workerTasks.end()){
							auto vecCopy(workerTasks[get<0>(keyTuple)]);
							for (auto el : vecCopy) {
								workerProcessing[nextIp].push_back(make_tuple(get<0>(el), get<1>(el)));
								workerTasks[nextIp].insert(make_tuple(get<0>(el), get<1>(el)));
								if (!isJuicePhase){
									string sender = get<0>(mapleSending[make_tuple(get<0>(el), get<1>(el))]);
									mapleS = sender + "::" + nextIp + "::" + exe + "::" + get<0>(el) + "::" + get<1>(el);
									messagesToSend.push_back(mapleS);
								}
								else workToDo += get<0>(el);
							}
							if (isJuicePhase) messagesToSend.push_back("JUICE::" + nextIp + "::" + exe + "::" + sdfsOut + "::" + workToDo);
						}
						//come back
						workerProcessing.erase(get<0>(keyTuple));
						workerTasks.erase(get<0>(keyTuple));
						struct dirent *entry = nullptr;
					    DIR *dp = nullptr;
						string match = "tmp-" + get<0>(keyTuple) + "-";
					    if ((dp = opendir(".")) == nullptr) { cout << "tmp directory error " << endl;}
					    while ((entry = readdir(dp))){
					        if (strncmp(entry->d_name, match.c_str(), match.size())){
					            remove(entry->d_name);
							}
						}
						closedir(dp);
					}
				}
			}
		} else { //check for cleanup on already failed nodes
			if(localTimestamp - get<1>(valueTuple) > T_cleanup){
				auto iter = this->membershipList.find(keyTuple);
				if (iter != this->membershipList.end()) {
					removedVec.push_back(keyTuple);
				}
			}
		}
	}
	set<string> aliveIps;
	for (auto alive : hashRing->nodePositions) aliveIps.insert(hashRing->getValue(alive));
	for (auto pendMsg : messagesToSend){
		string scopy(pendMsg);
		vector<string> s = splitString(pendMsg, "::");
		if (aliveIps.count(s[0]) == 0 && s[0].compare("JUICE")) continue;
		if (aliveIps.count(s[1]) == 0) continue;
		cout << "[FIX] " << scopy << endl;
		tcpServent->mapleMessages.push(scopy);
	}

	// O(c*n) operation, but it ensures safety
	bool leaderRemoved = false;
	for (uint i=0; i<removedVec.size(); i++) {
		auto iter = this->membershipList.find(removedVec[i]);
		if (iter != this->membershipList.end()) {

			if (leaderIP.compare(get<0>(removedVec[i]))==0) { // this is the leader
				leaderRemoved = true;
				cout << "[ELECTION] leader " << leaderIP << " is removed" << endl;
			}

			this->membershipList.erase(iter);
			string message = "["+to_string(this->localTimestamp)+"] node "+get<0>(removedVec[i])+"/"+get<1>(removedVec[i])+"/"+get<2>(removedVec[i])+": REMOVED FROM LOCAL MEMBERSHIP LIST";
			cout << "[REMOVE]" << message.c_str() << endl;
			this->logWriter->printTheLog(REMOVE, message);
			//debugMembershipList(this);
		}
	}
	if (this->membershipList.size()==1 || leaderRemoved) { // Only me or leader failed, restart leader election
		if (checkLeaderExist()) { // restart if we have a leader
			restartElection();
		}
	}
	return 0;
}

int Node::joinSystem(Member introducer)
{
	string mem_list_to_send = populateMembershipMessage();
	string msg = populateSDFSFileList(JOIN, mem_list_to_send);
	string message = "["+to_string(this->localTimestamp)+"] sent a request to "+introducer.ip+"/"+introducer.port+", I am "+nodeInformation.ip+"/"+nodeInformation.port;
	cout << "[JOIN]" << message.c_str() << endl;
	this->logWriter->printTheLog(JOINGROUP, message);
	udpServent->sendMessage(introducer.ip, introducer.port, msg);
	return 0;
}

int Node::requestSwitchingMode()
{
	string message = nodeInformation.ip+","+nodeInformation.port;
	string msg = populateSDFSFileList(SWREQ, message);
	for(auto& element: this->membershipList) {
		tuple<string,string,string> keyTuple = element.first;
		cout << "[SWITCH] sent a request to " << get<0>(keyTuple) << "/" << get<1>(keyTuple) << endl;
		udpServent->sendMessage(get<0>(keyTuple), get<1>(keyTuple), msg);
	}
	return 0;
}

int Node::SwitchMyMode() {
	sleep(T_switch); // wait for a while
	udpServent->qMessages = queue<string>(); // empty all messages
	switch (this->runningMode) {
		case GOSSIP: {
			this->runningMode = ALL2ALL;
			cout << "[SWITCH] === from gossip to all-to-all ===" << endl;
			break;
		}
		case ALL2ALL: {
			this->runningMode = GOSSIP;
			cout << "[SWITCH] === from all-to-all to gossip ===" << endl;
			break;
		}
		default:
			break;
	}
	prepareToSwitch = false; // finishing up
	return 0;
}

int Node::listenToHeartbeats() {
	// 1. deepcopy and handle queue
	queue<string> qCopy(udpServent->qMessages);
	udpServent->qMessages = queue<string>();
	size_t size = qCopy.size();
	//cout << "Got " << size << " messages in the queue" << endl;
	//cout << "checking queue size " << nodeOwn->udpServent->qMessages.size() << endl;
	for (size_t j = 0; j < size; j++) {
		//cout << qCopy.front() << endl;
		handleUdpMessage(qCopy.front());
		if(this->activeRunning == false) return 0;
		qCopy.pop();
	}
	return 0;
}

/*
 * Take a hearbeat message, if the member doesn't exist add it, update hashring, and disseminate out memberList
 * If it exists, check for failure, and if there is update fail flag, otherwise try ot update heartbeat
*/
void Node::processHeartbeat(string message) {
	bool changed = false;
	vector<string> incomingMembershipList = splitString(message, "\n");
	vector<string> membershipListEntry;
	for(string list_entry: incomingMembershipList){
#ifdef LOG_VERBOSE
		cout << "handling with " << list_entry << endl;
#endif
		if (list_entry.size() == 0) {
			continue;
		}
		membershipListEntry.clear();
		membershipListEntry = splitString(list_entry, ",");
		if (membershipListEntry.size() != 5) continue;

		int incomingHeartbeatCounter = stoi(membershipListEntry[3]);
		int failFlag = stoi(membershipListEntry[4]);
		tuple<string,string,string> mapKey(membershipListEntry[0], membershipListEntry[1], membershipListEntry[2]);

		if ((get<0>(mapKey).compare(nodeInformation.ip) == 0) && (get<1>(mapKey).compare(nodeInformation.port) == 0)) {
			// Volunteerily leave if you are marked as failed
			if(failFlag == 1){
				this->activeRunning = false;
				string message = "["+to_string(this->localTimestamp)+"] node "+this->nodeInformation.ip+"/"+this->nodeInformation.port+" is left";
				cout << "[VOLUNTARY LEAVE]" << message.c_str() << endl;
				this->logWriter->printTheLog(LEAVE, message);
				return;
			}
#ifdef LOG_VERBOSE
			cout << "do not check itself heartbeat" << endl;
#endif
			continue;
		}

		map<tuple<string,string,string>, tuple<int, int, int>>::iterator it;
		it = this->membershipList.find(mapKey);
		if (it == this->membershipList.end() && failFlag == 0) {
			tuple<int, int, int> valueTuple(incomingHeartbeatCounter, localTimestamp, failFlag);
			this->membershipList[mapKey] = valueTuple;
			updateHashRing();
			string message = "["+to_string(this->localTimestamp)+"] new node "+get<0>(mapKey)+"/"+get<1>(mapKey)+"/"+get<2>(mapKey)+" is joined";
			cout << "[JOIN]" << message.c_str() << endl;
			this->logWriter->printTheLog(JOINGROUP, message);
			changed = true;
		} else if(it != this->membershipList.end()) {
			// update heartbeat count and local timestamp if fail flag of node is not equal to 1. If it equals 1, we ignore it.
			if(get<2>(this->membershipList[mapKey]) != 1){
				//if incoming membership list has node with fail flag = 1, but fail flag in local membership list = 0, we have to set fail flag = 1 in local
				switch (this->runningMode) {
					case GOSSIP: {
						if(failFlag == 1){
							get<2>(this->membershipList[mapKey]) = 1;
							get<1>(this->membershipList[mapKey]) = localTimestamp;
							string message = "["+to_string(this->localTimestamp)+"] node "+get<0>(mapKey)+"/"+get<1>(mapKey)+"/"+get<2>(mapKey)+": Disseminated Failure";
							cout << "[FAIL]" << message.c_str() << endl;
							this->logWriter->printTheLog(FAIL, message);
						}
						else{
							int currentHeartbeatCounter = get<0>(this->membershipList[mapKey]);
							if(incomingHeartbeatCounter > currentHeartbeatCounter){
								get<0>(this->membershipList[mapKey]) = incomingHeartbeatCounter;
								get<1>(this->membershipList[mapKey]) = localTimestamp;
								string message = "["+to_string(this->localTimestamp)+"] node "+get<0>(mapKey)+"/"+get<1>(mapKey)+"/"+get<2>(mapKey)+" from "+to_string(currentHeartbeatCounter)+" to "+to_string(incomingHeartbeatCounter);
#ifdef LOG_VERBOSE
								cout << "[UPDATE]" << message.c_str() << endl;
#endif
								this->logWriter->printTheLog(UPDATE, message);
							}
						}
						break;
					}
					default: { // ALL2ALL doesn't disseminate
						int currentHeartbeatCounter = get<0>(this->membershipList[mapKey]);
						if(incomingHeartbeatCounter > currentHeartbeatCounter){
							get<0>(this->membershipList[mapKey]) = incomingHeartbeatCounter;
							get<1>(this->membershipList[mapKey]) = localTimestamp;
							get<2>(this->membershipList[mapKey]) = failFlag;
							string message = "["+to_string(this->localTimestamp)+"] node "+get<0>(mapKey)+"/"+get<1>(mapKey)+"/"+get<2>(mapKey)+" from "+to_string(currentHeartbeatCounter)+" to "+to_string(incomingHeartbeatCounter);
#ifdef LOG_VERBOSE
							cout << "[UPDATE]" << message.c_str() << endl;
#endif
							this->logWriter->printTheLog(UPDATE, message);
						}
						break;
					}
				}
			}
		}
	}
	// If membership list changed in all-to-all, full membership list will be sent
	if(changed && this->runningMode == ALL2ALL) heartbeatToNode();
}

void Node::setUpLeader(string message, bool pending)
{
	string msg(message);
	vector<string> fields = splitString(msg, ",");
	if(fields.size() >= 3){
		Member leader(fields[0], fields[1]);
		leaderPosition = hashingId(leader, fields[2]);
		leaderIP = fields[0];
		leaderPort = fields[1];
	}
	leaderCreateHashRing(); // local copy of hashRing on each node

	if (pending != isBlackout) {
		if (isBlackout) {
			//cout << "[BLACKOUT] Leader is ready now" << endl;
		} else {
			//cout << "[BLACKOUT] Leader is busy now" << endl;
		}
	}
	if (pending) {
		isBlackout = true;
	} else {
		isBlackout = false;
	}
}

/**
 * given a string message which contains a membership list, we will take the string, split it by returns, and then split it by commas, to then compare the heartbeat counters
 * of each IP,PORT,timestamp tuple with the membership list of the receiving Node.
 * Found help on how to do string processing part of this at https://stackoverflow.com/questions/14265581/parse-split-a-string-in-c-using-string-delimiter-standard-c
 */
void Node::handleUdpMessage(string message){
	//cout << "handleUdpMessage " << message << endl;
	string deMeg = decapsulateMessage(message);
	bool pending = true;
	//cout << "handleUdpMessage deMeg " << deMeg << endl;
	Messages msg(deMeg);
	switch (msg.type) {
		case LEADERHEARTBEAT: // Note: not for Gossip-style, only All-to-All
			//cout << "LEADERHEARTBEAT: " << msg.payload << endl;
			pending = false;
		case LEADERPENDING: setUpLeader(msg.payload, pending);
		case HEARTBEAT:
		case JOINRESPONSE:{
			processHeartbeat(msg.payload);
			break;
		}
		case JOIN:{
			// introducer checks collision here
			vector<string> fields = splitString(msg.payload, ",");
			if(fields.size() >= 3){
				Member member(fields[0], fields[1]);
				int checkPosition = hashingId(member, fields[2]);
				if (checkHashNodeCollision(checkPosition)) {
					string response = populateSDFSFileList(JOINREJECT, "");
					udpServent->sendMessage(fields[0], fields[1], response);
				} else {
					string introducerMembershipList;
					introducerMembershipList = populateIntroducerMembershipMessage();
					string response = populateSDFSFileList(JOINRESPONSE, introducerMembershipList);
					udpServent->sendMessage(fields[0], fields[1], response);
				}
			}
			break;
		}
		case SWREQ: {
			// got a request, send an ack back
			vector<string> fields = splitString(msg.payload, ",");
			if (fields.size() == 2) {
				cout << "[SWITCH] got a request from "+fields[0]+"/"+fields[1] << endl;
				string messageReply = nodeInformation.ip+","+nodeInformation.port;
				//Messages msgReply(SWRESP, messageReply);
				string msgReply = populateSDFSFileList(SWRESP, messageReply);
				udpServent->sendMessage(fields[0], fields[1], msgReply);
				prepareToSwitch = true;
			}
			break;
		}
		case SWRESP: {
			// got an ack
			vector<string> fields = splitString(msg.payload, ",");
			if (fields.size() == 2) {
				cout << "[SWITCH] got an ack from "+fields[0]+"/"+fields[1] << endl;
			}
			break;
		}
		case JOINREJECT: {
			cout << "[JOINREJECT] There is a collision, and I have to leave..." << endl;
			this->activeRunning = false;
			pthread_exit(NULL);
			break;
		}
		default:
			break;
	}
	//debugMembershipList(this);
}

int Node::getPositionOnHashring(){
	hashRingPosition = hashingId(nodeInformation, joinTimestamp);
	cout << "[ELECTION] This node is at hash position: " << hashRingPosition << endl;
	return 0;
}

int Node::updateHashRing(){
	bool needToUpdate = true;
	for(auto& it: membershipList){
		needToUpdate = true;
		string ip = get<0>(it.first);
		for(int i: hashRing->nodePositions){
			if(ip.compare(hashRing->getValue(i)) == 0){
				needToUpdate = false;
				break;
			}
		}
		if(needToUpdate){
			Member toBeInserted(ip, get<1>(it.first));
			int hashPosition = hashingId(toBeInserted,  get<2>(it.first));
			hashRing->addNode(ip, hashPosition);
		}
	}
	return 0;
}

bool Node::checkLeaderExist()
{
	return leaderPosition != -1;
}

bool Node::checkHashNodeCollision(int checkPosition)
{
	// if True, the position is full
	for (auto& element: this->membershipList) {
		tuple<string, string, string> keyTuple = element.first;
		Member member(get<0>(keyTuple), get<1>(keyTuple));
		if (nodeInformation.ip.compare(member.ip)==0) { // myself, skip it
			continue;
		}
		int pos = hashingId(member, get<2>(keyTuple));
		if (pos == checkPosition) {
			return true;
		}
	}
	return false;
}

bool Node::findWillBeLeader()
{
	bool beLeader = true;
	vector<int> positions;
    vector<string> ipAddresses;
	if (membershipList.size() > 1) { // only 1 member does not need the leader
		for (auto& element: this->membershipList) {
			tuple<string, string, string> keyTuple = element.first;
			Member member(get<0>(keyTuple), get<1>(keyTuple));
			int pos = hashingId(member, get<2>(keyTuple));
			if (pos < hashRingPosition) {
				//cout << get<0>(keyTuple) << " with id " << pos << " is smaller" << endl;
				beLeader = false;
			}
			if (nodeInformation.ip.compare(get<0>(keyTuple))!=0) {
				int posNext = (pos + (HASHMODULO-hashRingPosition)) % HASHMODULO;
				positions.push_back(posNext);
				ipAddresses.push_back(get<0>(keyTuple));
			}
		}
	} else {
		beLeader = false;
	}

	if (positions.size() > 0) {
		int index = 0;
		int possibleSuccessor = positions[index];
		for (uint i=1; i<positions.size(); i++) {
			if (positions[i] < possibleSuccessor) {
				possibleSuccessor = positions[i];
				index = i;
			}
		}
		//cout << "[ELECTION] My Possible Successor is " << ipAddresses[index] << endl;
		possibleSuccessorIP = ipAddresses[index];
	}

	return beLeader;
}

void Node::restartElection() // haven't tested yet
{
	cout << "[ELECTION] No leader now, restart election..." << endl;
	electedTime = localTimestamp;
	isLeader = false;
	leaderPosition = -1;
	leaderIP = "";
	leaderPort = "";
}

void Node::leaderCreateHashRing() {
	hashRing->clear();
	for (auto& element: this->membershipList) { // update hashRing
		tuple<string, string, string> keyTuple = element.first;
		Member member(get<0>(keyTuple), get<1>(keyTuple));
		int pos = hashingId(member, get<2>(keyTuple));
		hashRing->addNode(get<0>(keyTuple), pos);
	}
}

void Node::proposeToBeLeader() {
	Messages msg(ELECTION, to_string(hashRingPosition));
	cout << "[ELECTION] Propose to be leader, send to " << possibleSuccessorIP << endl;
 	tcpServent->sendMessage(possibleSuccessorIP, TCPPORT, msg.toString());
}

void Node::electionMessageHandler(Messages messages)
{
	switch (messages.type) {
		case ELECTION: { // check id
			int currentId = stoi(messages.payload);
			if (hashRingPosition > currentId) {
				//incoming is smaller, just forward
				cout << "[ELECTION] Got Election, agree on voting: " << messages.payload << endl;
				tcpServent->sendMessage(possibleSuccessorIP, TCPPORT, messages.toString());
			} else if (hashRingPosition < currentId) {
				//incoming is biger, replace and send it
				cout << "[ELECTION] Got Election, against this voting " << messages.payload;
				cout << ", and using my id " << hashRingPosition << endl;
				Messages msg(ELECTION, to_string(hashRingPosition));
				tcpServent->sendMessage(possibleSuccessorIP, TCPPORT, msg.toString());
			} else { // finish 1st pass
				cout << "[ELECTION] Got Election, everyone voted on me and start acking" << endl;
				Messages msg(ELECTIONACK, to_string(hashRingPosition));
				tcpServent->sendMessage(possibleSuccessorIP, TCPPORT, msg.toString());
			}
			break;
		}
		case ELECTIONACK: {
			int currentId = stoi(messages.payload);
			if (hashRingPosition == currentId) { // finish 2 pass
				cout << "[ELECTION] I am the leader now" << endl;
				isBlackout = false;
				leaderPosition = hashRingPosition;
				isLeader = true;
				leaderIP = nodeInformation.ip;
				leaderPort = nodeInformation.port;
				leaderCreateHashRing();
			} else {
				// Not me, just forward
				cout << "[ELECTION] Pass ACK " << messages.payload << endl;
				tcpServent->sendMessage(possibleSuccessorIP, TCPPORT, messages.toString());
			}
			electedTime = localTimestamp; // update elected time
			cout << "[ELECTION] Elected at Local Time " << electedTime << endl;
			break;
		}
		default:
			break;
	}
}

void Node::tcpElectionProcessor()
{
	queue<string> qCopy(tcpServent->qMessages);
	tcpServent->qMessages = queue<string>();
	//cout << "Got " << size << " TCP messages" << endl;
	for (size_t j=0; j<qCopy.size(); j++) {
		//cout << qCopy.front() << endl;
		Messages msg(qCopy.front());
		//cout << "Has " << msg.type << " with " << msg.payload << endl;
		switch (msg.type) {
			case ELECTION:
			case ELECTIONACK: electionMessageHandler(msg);
			default: break;
		}
		qCopy.pop();
	}
}

void Node::updateFileList(string sdfsfilename, int nodePosition)
{
	if (isLeader) {
		vector<int> positions = fileList[sdfsfilename];
		bool existed = false;
		for (uint i=0; i<positions.size(); i++) {
			if (positions[i] == nodePosition) {
				existed = true;
			}
		}
		if (!existed) {
			positions.push_back(nodePosition);
		}
		vector<int> storedPositionsCopy(positions);
		fileList[sdfsfilename] = storedPositionsCopy;
	}
}
//Can only be called when we are the leader node, since we are going to be calling REREPLICATE here, and checking the global file list.
//Called in ProcessRegMessages before we do anything else, since we want to check the global file list consistency before we process the other messages
//In the Queue.v
void Node::checkFileListConsistency(){
	if (membershipList.size() < 4) {
		cout << "[ERROR] The number of members are too small, we need at least 4" << endl;
		return;
	}
	for (auto& element: fileList) {
		if(element.second.size() < 4){
			//Need to rereplicate --> do this one at a time
			//First check the closest node, successor and predecessor
			int closestNodePostion = hashRing->locateClosestNode(element.first);
			int pred = hashRing->getPredecessor(closestNodePostion);
			int succ = hashRing->getSuccessor(closestNodePostion);
			int randomNode = hashRing->getRandomNode(tuple<int, int, int>(closestNodePostion, pred, succ));
			vector<int> nodesToCheck = {closestNodePostion, pred, succ, randomNode};
			for(unsigned int i = 0; i < nodesToCheck.size(); i++){
				if (!isInVector(element.second, nodesToCheck[i]))
				{
					string nodeInfo = hashRing->getValue(nodesToCheck[i]);
					Messages outMsg(DNSGET, nodeInfo + "::" + to_string(nodesToCheck[i]) + "::" + element.first + "::");
					tuple<int, int, int> request = pendingRequests[element.first];
					if(get<0>(request) == 0 && get<1>(request) == 0 && get<2>(request) == 0){
						pendingRequests[element.first] = tuple<int, int, int>(-1, -1, nodesToCheck[i]);
						pendingRequestSent[element.first] = tuple<int, int, int>(true, true, true);
						tcpServent->regMessages.push(outMsg.toString());
						break;
					}
					if(get<0>(request) != -1 || get<1>(request) != -1 || get<2>(request) != -1){
						cout << "on put " << get<0>(request) << "/" << get<1>(request) << "/" << get<2>(request) << endl;
						break;
					}
					pendingRequests[element.first] = tuple<int, int, int>(-1, -1, nodesToCheck[i]);
					pendingRequestSent[element.first] = tuple<int, int, int>(true, true, true);
					tcpServent->regMessages.push(outMsg.toString());
					break;
				}
			}
		}
	}
}

vector<tuple<string,string, string>> Node::getRandomNodesToGossipTo()
{
    vector<tuple<string, string, string>> availableNodesInfo;
    vector<tuple<string, string, string>> selectedNodesInfo;
    vector<int> indexList;
    int availableNodes = 0;
    for(auto& element: this->membershipList){
        tuple<string, string, string> keyPair = element.first;
        tuple<int, int, int> valueTuple = element.second;
        //dont gossip to self or failed nodes
        if(get<0>(keyPair).compare(this->nodeInformation.ip) && (get<2>(valueTuple) != 1)){
            availableNodesInfo.push_back(keyPair);
            indexList.push_back(availableNodes++);
        }
    }
    switch (this->runningMode) {
        case GOSSIP: {
            return randItems(N_b, availableNodesInfo);
        }
        //ALL2ALL
        default: {
            return availableNodesInfo;
        }
    }
}

void Node::handleMaplejuiceQ(){
	if (!maplejuiceQ.empty() && !workerRing->size() && !isBlackout){
		for (auto &e: fileList) {
			if (e.second.size() < 4) return; //make sure replication is good first
		}
		string msgCopy(maplejuiceQ.front());
		cout << "[QUEUE] sending next maple/juice to be processed " << msgCopy << endl;
		tcpServent->regMessages.push(msgCopy);
		maplejuiceQ.pop();
	}
}

void Node::handleTcpMessage()
{
	//Before we do anything here, we should have the leader check to see if the file list is consistent or not.
	if(isLeader){
		if (!isMaplePhase && !isJuicePhase) checkFileListConsistency();
	}
	queue<string> qCopy(tcpServent->regMessages);
	tcpServent->regMessages = queue<string>();
	int size = qCopy.size();
	//cout << "Got " << size << " TCP messages" << endl;
	for (int j=0; j<size; j++) {
		// cout << qCopy.front() << endl;
		vector<string> msgSplit = splitString(qCopy.front(), "::");
		if (msgSplit.size() < 1){
			qCopy.pop();
			continue;
		}
		string payload = "";
		for(uint k = 1; k < msgSplit.size(); k++){
			if(k == msgSplit.size() - 1) payload += msgSplit[k];
			else payload += msgSplit[k] + "::";
		}
		MessageType msgType = static_cast<MessageType>(stoi(msgSplit[0]));
		Messages msg(msgType, payload);
		vector<string> inMsg = splitString(msg.payload, "::");
		//set<int> blackoutOk {PUTACK, DNSGET, DNS, DNSANS, REREPLICATE, REREPLICATEGET, GETNULL, ACK, LEADERACK};
		//cout << "[TCP] Has " << msg.type << " with " << msg.payload << endl;
		switch (msg.type) {
			case JUICESTART: {
				if (workerRing->size() || isBlackout) {
					maplejuiceQ.push(msg.toString());
					cout << "[JUICE] maple or juice in progress" << endl;
					break;
				}
				cout << "[JUICE] Debug: " << msg.toString() << endl;
				if (inMsg.size() < 6) { cout << "[JUICE] message too short " << to_string(inMsg.size()) << endl; break; }
				//juice_exe num_juices sdfs_intermediate_dir sdfs_out_file delete={0,1} hash_or_range={0,1}
				cout << "[JUICE] Leader starting new Juice phase" << endl;
				isJuicePhase = true;
				isMaplePhase = false;
				string includedDebug = "";
				sdfsOut = inMsg[3], sdfsPre = inMsg[2] + "-";
				exe = inMsg[0];
				maplejuiceClear = (stoi(inMsg[4])) ? true : false;
				int workers = stoi(inMsg[1]);
				int isRangePartition = stoi(inMsg[5]);
				int ringSize = hashRing->nodePositions.size();
				if (workers > ringSize-1) workers = ringSize-1;
				vector<string> directory;
				vector<tuple<string,string,string>> aliveNodes;
				//cout << "[DIRECTORY] - " << sdfsPre;
				for (auto &e: fileList){
					if (strncmp(e.first.c_str(), sdfsPre.c_str(), sdfsPre.size()) == 0){
						//cout << e.first << ", ";
						directory.push_back(e.first);
					}
				}
				//cout << endl;
				sort(directory.begin(), directory.end());
				for (auto &e : membershipList) if (get<0>(e.first).compare(nodeInformation.ip)) aliveNodes.push_back(e.first);
				vector<tuple<string,string,string>> juiceNodes = randItems(workers, aliveNodes);
				for (auto &e : juiceNodes) {
					Member m(get<0>(e), get<1>(e));
					workerRing->addNode(get<0>(e), hashingId(m, get<2>(e)));
					if (includedDebug.size()) includedDebug += " | ";
					includedDebug += get<0>(e);
					Messages startMsg(PHASESTART, "start juice");
					tcpServent->sendMessage(get<0>(e), TCPPORT, startMsg.toString());
				}
				cout << "[JUICE] " << includedDebug << " are the worker nodes" << endl;
				if (isRangePartition){
					int rangeSplit = (int) (round(double(directory.size()) / double(workers)));
					int workerAssigned = 0;
					int fileIndex = 0;
					for (auto &e: directory){
						string processor = get<0>(juiceNodes[workerAssigned]);
						workerProcessing[processor].push_back(make_tuple(e, "0")); //dont care about line #
						fileIndex++;
						if (fileIndex >= ((workerAssigned+1)*rangeSplit)) { workerAssigned++; }
					}
				}
				else {
					for (auto &e: directory){
						string processor = workerRing->getValue(workerRing->locateClosestNode(e));
						workerProcessing[processor].push_back(make_tuple(e, "0")); //dont care about line #
					}
				}
				for (auto &work : workerProcessing) {
					string msg = exe + "::" + sdfsOut + "::";
					int comma = 0;
					for (auto &f : work.second){
						if (comma) msg += ",";
						comma = 1;
						msg += get<0>(f);
					}
					Messages outMsg(JUICE, msg);
					tcpServent->sendMessage(work.first, TCPPORT, outMsg.toString());
				}
				break;
			}
			case JUICE: {
				auto piecesOfWork = splitString(inMsg[2], ",");
				string retry = "";
				int hasEverything = 1;
				string header = "";
				time_t fileTimestamp;
				time(&fileTimestamp);
				string match = "tmp-", output = "tmp-" + inMsg[1] + "-" + to_string(fileTimestamp);
				for (string s : piecesOfWork){
					if (localFilelist.find(s) == localFilelist.end())
					{
						Messages outMsg(DNSGET, nodeInformation.ip + "::" + to_string(hashRingPosition) + "::" + s + "::" + s);
						tcpServent->sendMessage(leaderIP, TCPPORT, outMsg.toString());
						hasEverything = 0;
					}
				}
				if (hasEverything){
					for (string s : piecesOfWork){
						cout << "[JUICE] " << s << " has arrived" << endl;
						string execName = EXEC_CMD + inMsg[0];
						string fileName = localFilelist[s];
						if (runExecutable(execName, fileName) < 0) { cout << "[EXEC] ERROR" << endl; break;}
						string scopy(s);
						auto splitter = splitString(scopy, "-");
						string strToMerge = "tmp-" + splitter[1];
						if (header.size()) header += ",";
						header += splitter[1];
						ofstream keyFile;
						keyFile.open(output, ios::app | ios::out);
						ifstream toMerge(strToMerge);
						if (!toMerge.is_open() || !keyFile.is_open()) {
							cout << "bad file permissions for " << strToMerge << " and/or " << output << endl;
							if (keyFile.is_open()) keyFile.close();
							break;
						}
						keyFile << toMerge.rdbuf();
						keyFile.close();
						remove(strToMerge.c_str());
					}
					FILE *fp = fopen(output.c_str(), "rb");
					fseek(fp, 0, SEEK_END);
					int size = ftell(fp);
					fseek(fp, 0, SEEK_SET);
					fclose(fp);
					cout << "[TEST] merging " << to_string(size) << " bytes" << endl;
					string mergeMsg = leaderIP + "::" + to_string(JUICEACK) + "::" + inMsg[1] + "::" + header + "::"+ output + "," + to_string(size) + "::";
					tcpServent->mergeMessages.push(mergeMsg);
				} else{
					tcpServent->regMessages.push(msg.toString());
				}
				break;
			}

			case JUICEACK: {
				if (!isLeader) break;
				vector<string> completedJuices = splitString(inMsg[1], ",");
				for (string &task : completedJuices){
					cout << "[JUICEACK] task: " << task << " status: ...";
					string matchStr = sdfsPre + task;
					auto element = make_tuple(matchStr, "0");
					auto it = find(workerProcessing[inMsg[0]].begin(), workerProcessing[inMsg[0]].end(), element);
					if (it != workerProcessing[inMsg[0]].end()) {
						cout << "completed." << endl;
						workerProcessing[inMsg[0]].erase(it);
					}
				}
				cout << endl;
				if (!workerProcessing[inMsg[0]].size()) workerProcessing.erase(inMsg[0]);
				if (!workerProcessing.size()) {
					cout <<"[JUICEACK] replicate final results " << endl;
					Messages outMsg(DNS, nodeInformation.ip + "::" + to_string(hashRingPosition) + "::" + sdfsOut + "::" + sdfsOut + "::" + "-1" + "::" + "-1" + "::");
					tcpServent->regMessages.push(outMsg.toString());
					if (maplejuiceClear){
						cout << "[JUICEACK] clearing files.... " << endl;
						for (auto &f : fileList){
							if (strncmp(f.first.c_str(), sdfsPre.c_str(), sdfsPre.size()) == 0){
								Messages outMsg(DELETE, nodeInformation.ip + "::" + f.first);
								tcpServent->regMessages.push(outMsg.toString());
							}
						}
					}
					cout << "[JUICE] ------------ complete ---------- " << endl;
					resetMapleJuice();
				}
				break;
			}
			case PHASESTART: {
				//TODO: reset things when you reach the end of MAPLESTART OR JUICESTART
				//that if the work dictionaries are empty, theres nothing to do and just end the phase
				//right now youll just stay stuck in the phase
				cout << "[PHASESTART] go " << inMsg[0] << endl;
				resetMapleJuice();
				break;
			}
			case MAPLESTART: {
				//leader only function
				if (workerRing->size() || isBlackout) {
					maplejuiceQ.push(msg.toString());
					cout << "[MAPLE] maple or juice in progress" << endl;
					break;
				}
				cout << "[MAPLE] Leader starting new Maple phase" << endl;
				if (inMsg.size() < 4) break;
				mapleKeys.clear();
				exe = inMsg[0];
				string num_maples = inMsg[1], sdfs_dir = inMsg[3] + "-", s = "";
				sdfsPre = inMsg[2] + "-";
				isMaplePhase = true;
				isJuicePhase = false;
				int workers = stoi(num_maples), ringSize = hashRing->nodePositions.size();
				if (workers > ringSize-1) workers = ringSize-1;
				int total_lines = 0, start = 0, id = 0;
				vector<tuple<string,int>> directory;
				vector<tuple<string,string,string>> aliveNodes;
				//3 workers and a master is a condition set for correct working of the program.
				//This assumption is similarly made in other places based on the design specification of 3 simul fails
				if (ringSize <= 3){
					cout << "[ERROR] Not enough nodes for Maple. Need 4 minimum (3 workers, 1 leader)" << endl;
					break;
				}
				cout << "[DIRECTORY] " << sdfs_dir;
				for (auto &e: fileSizes){
					if (strncmp(e.first.c_str(), sdfs_dir.c_str(), sdfs_dir.size()) == 0){
						cout << " , " << e.first << " | " << to_string(get<1>(e.second));
						directory.push_back(make_tuple(e.first, get<1>(e.second)));
						total_lines += get<1>(e.second);
					}
				}
				cout << endl << "[MAPLE] need to process " << to_string(total_lines) << endl;
				for (auto &e : membershipList) if (get<0>(e.first).compare(nodeInformation.ip)) aliveNodes.push_back(e.first);
				vector<tuple<string,string,string>> mapleNodes = randItems(workers, aliveNodes);
				string includedDebug = "";
				for (auto &e : mapleNodes) {
					Member m(get<0>(e), get<1>(e));
					workerRing->addNode(get<0>(e), hashingId(m, get<2>(e)));
					if (includedDebug.size()) includedDebug += " | ";
					includedDebug += get<0>(e);
				}
				vector<string> messagesToSend; //used so we get our full assignments before scheduling
				cout << "[MAPLE] " << includedDebug << " are the worker nodes" << endl;
				for (auto &e: directory){
					start = 0;
					string file = get<0>(e);
					int lines = get<1>(e);
					cout << "[MAPLE] file: " << file << " - " << to_string(lines) << endl;
					while (start < lines){
						s = file + "::" + to_string(start);
						id = workerRing->locateClosestNode(s);
						srand(time(NULL));
						vector<int> temp = randItems(1, fileList[file]);
						string sender = hashRing->getValue(temp[0]); //because files are part of sdfs anyone can be the sender
						string processor = workerRing->getValue(id); //processor is a maple worker
						workerProcessing[processor].push_back(make_tuple(file, to_string(start)));
						workerTasks[processor].insert(make_tuple(file, to_string(start)));
						cout << "[MAPLE] assign file " << file << " at " << to_string(start) << " to " << processor << endl;
						mapleSending[make_tuple(file, to_string(start))] = make_tuple(sender, 0);
						string maplemsg = sender + "::" + processor + "::" + exe + "::" + s;
						//sender, processor, exec, file, start
						messagesToSend.push_back(maplemsg);
						start = start + T_maples;
					}
				}
				for (auto mapleMsg : messagesToSend) tcpServent->mapleMessages.push(mapleMsg);
				break;
			}

			case CHUNK: {
				//processor, exec, sdfs, start
				int end = stoi(inMsg[3]) + T_maples, start = stoi(inMsg[3]);
				string starts = inMsg[3] + "," + to_string(end);

				string localfile = "", sdfsFile = inMsg[2];
				if (localFilelist.find(inMsg[2]) != localFilelist.end()) localfile = localFilelist[inMsg[2]];
				else localfile = inMsg[2];

				vector<string> unDirectory = splitString(sdfsFile, "-"); //get rid of timestamp
				string fileDest = "tmp-"+inMsg[3]+"-"+sdfsFile.substr(unDirectory[0].size()+1);

				int lineCounter = -1, numbytes = 0, readLines = 0;
				ifstream file(localfile);
				string str;
				while (getline(file, str) && (lineCounter < end - 1))
				{
					lineCounter++;
					if (lineCounter < start) continue;
					numbytes += (str.size());
					readLines++;
				}
				file.clear();  // clear fail and eof bits
				file.seekg(0); // back to the start!

				string toSend = localfile + "," + to_string(numbytes);
				string header = sdfsFile + "," + inMsg[1] + "," + inMsg[3];
				//processor, return type, file dest, header, toSend, starts
				string sendMsg = inMsg[0] + "::" + to_string(CHUNKACK) + "::" + fileDest + "::" + header + "::" + toSend + "::" + starts;
				this->tcpServent->mergeMessages.push(sendMsg);
				break;
			}

			case CHUNKACK: {
				cout << "[CHUNKACK] receiving the put worked!" << endl;
				//IP, exec, start, temp, sdfs file
				if (!isLeader) {
					//forward to know that the file was put okay
					if (tcpServent->sendMessage(leaderIP, TCPPORT, msg.toString())){
						cout << "[CHUNKACK] bad send 1" << endl;
						tcpServent->regMessages.push(msg.toString());
						break;
					}
					string ackStr = nodeInformation.ip + "::" + inMsg[4] + "::" + inMsg[2]; //IP, file, chunk
					cout << "[CHUNKACK] sending maple ack " << ackStr << endl;
					Messages ackMsg(MAPLEACK, ackStr);
					if (tcpServent->sendMessage(leaderIP, TCPPORT, ackMsg.toString())){
						cout << "[CHUNKACK] bad send 2" << endl;
						tcpServent->regMessages.push(msg.toString());
						break;
					}
					string execName = EXEC_CMD + inMsg[1];
					if (runExecutable(execName, inMsg[3]) < 0) { cout << "[EXEC] ERROR" << endl; break;}
					break;
				}
				cout << "[CHUNKACK] leader confirming " << inMsg[4] << "::" << inMsg[2] << " was received" << endl;
				mapleSending[make_tuple(inMsg[4], inMsg[2])] = make_tuple(inMsg[0], 1);
				break;
			}

			case MAPLEACK: {
				if (isLeader){
					vector<tuple<string,string>> temp;
					cout << "[MAPLEACK] " << inMsg[0] << " processed " << inMsg[1] << "," << inMsg[2] << " | remaining: ";
					for (auto &e : workerTasks[inMsg[0]]){
						if (!get<0>(e).compare(inMsg[1]) && !get<1>(e).compare(inMsg[2])){
								temp.push_back(e);
						}
						else cout << "(" << get<0>(e) << "," << get<1>(e) << ") | ";
					}
					cout << endl;
					for (auto &e : temp) workerTasks[inMsg[0]].erase(e);
					if (!workerTasks[inMsg[0]].size()) {
						Messages outMsg(STARTMERGE, "");
						this->tcpServent->sendMessage(inMsg[0], TCPPORT, outMsg.toString());
					}
				}
				break;
			}

			case STARTMERGE: {
				if (!isLeader){
					auto entireDir = splitString(tcpServent->getDirMetadata(), ",");
					vector<string> unsentDir;
					int idx = 0, range = entireDir.size();
					while (idx < range){
						if(sentLocally.count(entireDir[idx]) == 0) {
							unsentDir.push_back(entireDir[idx]);
							unsentDir.push_back(entireDir[idx+1]);
						}
						idx += 2;
					}
					int filesHere = unsentDir.size() / 2;
					Messages outMsg(STARTMERGE, nodeInformation.ip + "::" + to_string(filesHere));
					this->tcpServent->sendMessage(leaderIP, TCPPORT, outMsg.toString());
					if (filesHere == 0) break;
					string toProcess = "";
					int index = 0, dirRange = unsentDir.size(), partition = 10;
					while (index < dirRange){
						while ((index < dirRange) && (partition > 0)){
							if (toProcess.size()) toProcess += ",";
							toProcess += (unsentDir[index] + "," + unsentDir[index+1]);
							sentLocally.insert(unsentDir[index]);
							index += 2;
							partition--;
						}
						cout << "[STARTMERGE] " << toProcess << endl;
						string sendMsg = leaderIP + "::" + to_string(MAPLEACK) + "::" + "::" + "::" + toProcess + "::";
						this->tcpServent->mergeMessages.push(sendMsg);
						partition = 10;
						toProcess = "";
					}
				}
				else{
					filesAtWorker[inMsg[0]] = stoi(inMsg[1]);
					Messages alreadyDone(MERGECOMPLETE, inMsg[0] + "::");
					if (filesAtWorker[inMsg[0]] == 0) tcpServent->regMessages.push(alreadyDone.toString());
				}
				break;
			}

			case MERGECOMPLETE: {
				struct dirent *entry = nullptr;
			    DIR *dp = nullptr;
				string match = "tmp-" + inMsg[0] + "-";
				auto mergedFiles = splitString(inMsg[1], ",");
				set<string> mergeSet;
				string toInsert = "";
				for (auto &e: mergedFiles) mergeSet.insert(match + e);
				filesAtWorker[inMsg[0]] -= mergedFiles.size();
			    if ((dp = opendir(".")) == nullptr) { cout << "tmp directory error " << endl;}
				//cout << "[MERGECOMPLETE] processing files matching " << match << " and replacing with prefix: " << sdfsPre << ". Files left: " << to_string(filesAtWorker[inMsg[0]]) << endl;
			    while ((entry = readdir(dp))){
			        if (mergeSet.count(entry->d_name)){
			            string entryName(entry->d_name);
						toInsert = entryName.substr(match.size());
						mapleKeys.insert(toInsert);
						string mapleOutput = sdfsPre + toInsert;
						ofstream keyFile;
						keyFile.open(mapleOutput, ios::app);
						ifstream toMerge(entry->d_name);
						if (!toMerge.is_open() || !keyFile.is_open()) {
							cout << "bad file permissions for " << entry->d_name << " and/or " << mapleOutput << endl;
							mapleKeys.erase(toInsert);
							break;
						}
						keyFile << toMerge.rdbuf();
						keyFile.close();
						remove(entry->d_name);
						mergeSet.erase(entry->d_name);
			        }
			    }
				if (filesAtWorker[inMsg[0]] <= 0){
					int oldSize = workerTasks.size();
					workerTasks.erase(inMsg[0]);
					cout << "[WORKERS] " << to_string(oldSize) << " -> " << to_string(workerTasks.size()) << " remaining" << endl;
				}
				//Done with maple phase
				if (!workerTasks.size()) {
					auto it = mapleKeys.begin();
					while (it != mapleKeys.end()){
						string file = sdfsPre + (*it);
						it++;
						updateFileList(file, hashRingPosition);
						fileSizes[file] = make_tuple(-1, -1);
					}
					cout << "[MAPLE] ------------ complete ---------- " << endl;
					resetMapleJuice();
				}
				break;
			}

			case MERGEFAIL: {
				cout << "[MERGEFAIL] retrying!!!!!!!!! at " << nodeInformation.ip << endl;
				Messages outMsg(STARTMERGE, "");
				this->tcpServent->sendMessage(inMsg[0], TCPPORT, outMsg.toString());
				break;
			}

			case PUTACK: {
				if(inMsg.size() >= 4){
					string inMsgIP = inMsg[0], sdfsfilename = inMsg[1];
					string localfilename = inMsg[2], remoteLocalname = inMsg[3];
					//cout << "[PUTACK] " << "inMsgIP: " << inMsgIP << " sdfsfilename: " << sdfsfilename << " localfilename: " << localfilename << endl;
					localFilelist[sdfsfilename] = localfilename;
					Messages outMsg(ACK, to_string(this->hashRingPosition)+"::"+sdfsfilename+"::"+remoteLocalname);
					this->tcpServent->sendMessage(inMsgIP, TCPPORT, outMsg.toString());
				}
				break;
			}
			case DELETE: {
				if (isLeader) {
					if(inMsg.size() >= 2){
						string inMsgIP = inMsg[0], sdfsfilename = inMsg[1];
						//cout << "[DELETE] " << "inMsgIP: " << inMsgIP << " sdfsfilename: " << sdfsfilename << endl;
						if (localFilelist.find(sdfsfilename) != localFilelist.end()) remove(localFilelist[sdfsfilename].c_str());
						localFilelist.erase(sdfsfilename);
						fileList.erase(sdfsfilename);
						fileSizes.erase(sdfsfilename);
						// This is TCP, so we don't need to ACK
					}
				}
				break;
			}
			case DNSGET: {
				if(isLeader){
					//cout << "msg.payload " << msg.payload << endl;
					if(inMsg.size() >= 4){
						string inMsgIP = inMsg[0];
						int nodePosition = stoi(inMsg[1]);
						int selectedNodePosition = nodePosition;
						string sdfsfilename = inMsg[2], localfilename = inMsg[3];
						//cout << "[DNSGET] Got " << "inMsgIP: " << inMsgIP << ", sdfsfilename: " << sdfsfilename << ", localfilename: " << localfilename << endl;
						vector<int> positions = fileList[sdfsfilename];
						if (positions.size() == 0) {
							// the file is not available
							//cout << "[DNSGET] sdfsfilename " << sdfsfilename << " is not available" << endl;
							fileList.erase(sdfsfilename);
							fileSizes.erase(sdfsfilename);
							Messages outMsg(GETNULL, sdfsfilename+": the file is not available::");
							this->tcpServent->sendMessage(inMsgIP, TCPPORT, outMsg.toString());
							break;
						}
						unsigned seed = chrono::system_clock::now().time_since_epoch().count();
  						shuffle (positions.begin(), positions.end(), default_random_engine(seed));
						for (uint i=0; i<positions.size(); i++) { // pick any node other than the requested node
							//cout << positions[i] << " ";
							if (positions[i]!=nodePosition && positions[i]!=hashRingPosition) {
								selectedNodePosition = positions[i];
							}
						}
						if (selectedNodePosition==nodePosition) selectedNodePosition = hashRingPosition;
						//cout << endl;
						//cout << "[DNSGET] we picks " << selectedNodePosition << endl;
						pendingRequests[sdfsfilename] = tuple<int, int, int>(-1, -1, nodePosition);
						pendingRequestSent[sdfsfilename] = tuple<int, int, int>(true, true, true);
						string nodeIP = hashRing->getValue(selectedNodePosition);
						pendingSenderRequests[sdfsfilename] = tuple<string, string, string>("", "", nodeIP);
						Messages outMsg(REREPLICATEGET, to_string(nodePosition) + "::" + sdfsfilename+ "::" +localfilename);
						//cout << "[DNSGET] Ask node " << nodeIP << " to replicate on pos ";
						//cout << to_string(nodePosition) << endl;
						this->tcpServent->sendMessage(nodeIP, TCPPORT, outMsg.toString());

					}
				}
				break;
			}
			case DNS: {
				// TODO: finish DNS functionality here, send out DNSANS
				if(isLeader){
					// Check hashring, get positions and send out DNS ANS
					isBlackout = true;
					if(inMsg.size() >= 7){
						string inMsgIP = inMsg[0];
						int nodePosition = stoi(inMsg[1]);
						string sdfsfilename = inMsg[2];
						string localfilename = inMsg[3];
						long int size = stol(inMsg[4]);
						int lines = stoi(inMsg[5]);
						string overwriteFilename = inMsg[6];
						//cout << "[DNS] Got " << "inMsgIP: " << inMsgIP << ", sdfsfilename: " << sdfsfilename;
						//cout << ", localfilename: " << localfilename << ", pos: " << nodePosition << endl;
						// update fileList, client itself is one of the replicas
						updateFileList(sdfsfilename, nodePosition);
						fileSizes[sdfsfilename] = make_tuple(size, lines);
						//hashRing->debugHashRing();
						int closestNode = hashRing->locateClosestNode(sdfsfilename);
						int pred = hashRing->getPredecessor(closestNode);
						int succ = hashRing->getSuccessor(closestNode);
						if (hashRing->getValue(closestNode).compare(inMsgIP)==0) {
							closestNode = hashRing->getRandomNode(tuple<int, int, int>(closestNode, pred, succ));
							//cout << "[DNS] we need one more node " << closestNode << endl;
						}
						if (hashRing->getValue(pred).compare(inMsgIP)==0) {
							pred = hashRing->getRandomNode(tuple<int, int, int>(closestNode, pred, succ));
							//cout << "[DNS] we need one more node " << pred << endl;
						}
						if (hashRing->getValue(succ).compare(inMsgIP)==0) {
							succ = hashRing->getRandomNode(tuple<int, int, int>(closestNode, pred, succ));
							//cout << "[DNS] we need one more node " << succ << endl;
						}
						//cout << "[DNS] we have nodes [" << closestNode << " (closestNode), ";
						//cout << pred << " (pred), " << succ << " (succ)], reply " << closestNode << endl;
						pendingRequests[sdfsfilename] = tuple<int, int, int>(closestNode, pred, succ);
						pendingRequestSent[sdfsfilename] = tuple<int, int, int>(true, false, false);
						pendingSenderRequests[sdfsfilename] = tuple<string, string, string>(inMsgIP, "", "");
						Messages outMsg(DNSANS, to_string(closestNode) + "::" + localfilename + "::" + sdfsfilename + "::" + overwriteFilename);
						this->tcpServent->sendMessage(inMsgIP, TCPPORT, outMsg.toString());
					}
				}

				break;
			}
			case DNSANS:{
				// Read the answer and send a PUT msg to dest
				if(inMsg.size() >= 4){
					int nodePosition = stoi(inMsg[0]);
					// since we do not keep files in hashRing, the value itself is IPaddress, not NODE:IP_Address
					string nodeIP = hashRing->getValue(nodePosition);
					//cout << "nodeIP " << nodeIP << endl;
					//cout << "[DNSANS] " << "we will put sdfsfilename: " << inMsg[2] << " to nodeIP: " << nodeIP << " using localfilename: " << inMsg[1] << endl;
					string sendMsg = nodeIP+"::"+inMsg[1]+"::"+inMsg[2]+"::"+inMsg[3];
					this->tcpServent->pendSendMessages.push(sendMsg);
				}
				break;
			}
			case REREPLICATEGET: {
				if (inMsg.size() >= 3) {
					int nodePosition = stoi(inMsg[0]);
					// since we do not keep files in hashRing, the value itself is IPaddress, not NODE:IP_Address
					string nodeIP = hashRing->getValue(nodePosition);
					string sdfsfilename = inMsg[1];
					string remoteLocalfilename = inMsg[2];
					string localfilename = this->localFilelist[sdfsfilename];
					//cout << "[REREPLICATEGET] Got a request of sdfsfilename " << sdfsfilename << " to nodeIP " << nodeIP << endl;
					//cout << "[REREPLICATEGET] Put localfilename " << localfilename << " to nodeIP " << nodeIP << endl;
					string sendMsg = nodeIP+"::"+localfilename+"::"+sdfsfilename+"::"+remoteLocalfilename;
					this->tcpServent->pendSendMessages.push(sendMsg);
				}
				break;
			}
			case REREPLICATE:{
				// Read the answer and send a PUT msg to dest
				if (inMsg.size() >= 2) {
					int nodePosition = stoi(inMsg[0]);
					// since we do not keep files in hashRing, the value itself is IPaddress, not NODE:IP_Address
					string nodeIP = hashRing->getValue(nodePosition);
					string sdfsfilename = inMsg[1];
					string localfilename = this->localFilelist[sdfsfilename];
					//cout << "[REREPLICATE] Got a request of sdfsfilename " << sdfsfilename << " to nodeIP " << nodeIP << endl;
					//cout << "[REREPLICATE] Put localfilename " << localfilename << " to nodeIP " << nodeIP << endl;
					string sendMsg = nodeIP+"::"+localfilename+"::"+sdfsfilename+"::";
					this->tcpServent->pendSendMessages.push(sendMsg);
				}
				break;
			}
			case GETNULL: {
				if (inMsg.size() >= 1) {
					cout << "[GETNULL] " << inMsg[0] << endl;
				}
				break;
			}

			case ACK:{
				if (inMsg.size() >= 3) {
					string nodePosition = inMsg[0];
					string sdfsfilename = inMsg[1];
					string localfilename = inMsg[2];
					localFilelist[sdfsfilename] = localfilename;
					Messages outMsg(LEADERACK, this->nodeInformation.ip + "::" + to_string(this->hashRingPosition) + "::" + msg.payload);
					//cout << "[ACK] Done replicated sdfsfilename " << sdfsfilename;
					//cout << " on node " << nodePosition << ", and ACK back to the leader" << endl;
					this->tcpServent->sendMessage(leaderIP, TCPPORT, outMsg.toString());
				}

				break;
			}
			case LEADERACK:{
				if(isLeader){
					if(inMsg.size() >= 4){
						string inMsgIP = inMsg[0];
						int inMsgnodePosition = stoi(inMsg[1]);
						int nodePosition = stoi(inMsg[2]);
						string sdfsfilename = inMsg[3];
						string replicatedNodeIP = hashRing->getValue(nodePosition);

						//cout << "[LEADERACK] Got ACK inMsgIP: " << inMsgIP << " sdfsfilename: " << sdfsfilename << " done on " << replicatedNodeIP << endl;
						string closestNodeIP = "";

						// update fileList
						updateFileList(sdfsfilename, inMsgnodePosition);
						updateFileList(sdfsfilename, nodePosition);

						vector<int> temp;
						//cout << "pendingRequests: ";
						if (get<0>(pendingRequests[sdfsfilename]) == nodePosition) {
							closestNodeIP = hashRing->getValue(get<0>(pendingRequests[sdfsfilename]));
							temp.push_back(-1);
						} else {
							temp.push_back(get<0>(pendingRequests[sdfsfilename]));
						}
						//cout << temp[0] << " (sent: " << get<0>(pendingRequestSent[sdfsfilename]);
						//cout << ", from " << get<0>(pendingSenderRequests[sdfsfilename]) << "), ";
						if (get<1>(pendingRequests[sdfsfilename]) == nodePosition) {
							temp.push_back(-1);
						} else {
							temp.push_back(get<1>(pendingRequests[sdfsfilename]));
						}
						//cout << temp[1] << " (sent: " << get<1>(pendingRequestSent[sdfsfilename]);
						//cout << ", from " << get<1>(pendingSenderRequests[sdfsfilename]) << "), ";
						if (get<2>(pendingRequests[sdfsfilename]) == nodePosition) {
							temp.push_back(-1);
						} else {
							temp.push_back(get<2>(pendingRequests[sdfsfilename]));
						}
						//cout << temp[2] << " (sent:" << get<2>(pendingRequestSent[sdfsfilename]);
						//cout << ", from " << get<2>(pendingSenderRequests[sdfsfilename]) << ")" << endl;
						pendingRequests[sdfsfilename] = tuple<int, int, int>(temp[0], temp[1], temp[2]);

						if(get<1>(pendingRequests[sdfsfilename]) == -1 && get<2>(pendingRequests[sdfsfilename])== -1){
							pendingRequests.erase(sdfsfilename);
							pendingRequestSent.erase(sdfsfilename);
							pendingSenderRequests.erase(sdfsfilename);
							//cout << "[LEADERACK] 3 or more Replicated files are done" << endl;
							isBlackout = false;
							break;
						}
						if((get<1>(pendingRequests[sdfsfilename])!=-1) && (!get<1>(pendingRequestSent[sdfsfilename]))){
							Messages outMsg(REREPLICATE, to_string(get<1>(pendingRequests[sdfsfilename])) + "::" + sdfsfilename);
							// cout << "Sending out rereplicate to " << inMsgIP << "with message " << outMsg.toString() << endl;
							//cout << "[LEADERACK] Ask node incoming " << inMsgIP << " to replicate on pos ";
							//cout << to_string(get<1>(pendingRequests[sdfsfilename])) << endl;
							this->tcpServent->sendMessage(inMsgIP, TCPPORT, outMsg.toString());
							pendingRequestSent[sdfsfilename] = tuple<int, int, int>(get<0>(pendingRequestSent[sdfsfilename]), true, get<2>(pendingRequestSent[sdfsfilename]));
							pendingSenderRequests[sdfsfilename] = tuple<string, string, string>(get<0>(pendingSenderRequests[sdfsfilename]), inMsgIP, get<2>(pendingSenderRequests[sdfsfilename]));
						}
						if((get<2>(pendingRequests[sdfsfilename]) != -1) && (!get<2>(pendingRequestSent[sdfsfilename]))){
							Messages outMsg(REREPLICATE, to_string(get<2>(pendingRequests[sdfsfilename])) + "::" + sdfsfilename);
							// cout << "Sending out rereplicate to " << closestNodeIP << "with message " << outMsg.toString() << endl;
							//cout << "[LEADERACK] Ask node closest " << closestNodeIP << " to replicate on pos ";
							//cout << to_string(get<2>(pendingRequests[sdfsfilename])) << endl;
							this->tcpServent->sendMessage(closestNodeIP, TCPPORT, outMsg.toString());
							pendingRequestSent[sdfsfilename] = tuple<int, int, int>(get<0>(pendingRequestSent[sdfsfilename]), get<1>(pendingRequestSent[sdfsfilename]), true);
							pendingSenderRequests[sdfsfilename] = tuple<string, string, string>(get<0>(pendingSenderRequests[sdfsfilename]), get<1>(pendingSenderRequests[sdfsfilename]), inMsgIP);
						}
					}
				}
				break;
			}
			default:
				break;
		}
		qCopy.pop();
	}
}

void Node::listLocalFiles(){
	cout << "sdfsfilename ---> localfilename" << endl;
	for (auto& element: localFilelist) {
		cout << element.first << " ---> " << element.second << endl;
	}
}

void Node::listSDFSFileList(string sdfsfilename) {
	bool found = false;
	vector<int> foundPositions;
	for (auto& element: fileList) {
		if(element.first.compare(sdfsfilename)==0) { // found sdfsfilename
			found = true;
			foundPositions = element.second;
			break;
		}
	}
	if (found) {
		cout << "sdfsfilename " << sdfsfilename << " is stored at..." << endl;
		if (foundPositions.size() > 0) {
			for (uint i=0; i<foundPositions.size(); i++) {
				string storedIP = hashRing->getValue(foundPositions[i]);
				cout << storedIP << " at " << foundPositions[i] << endl;
			}
		} else { cout << "=== Current list is empty ===" << endl; }
	} else {
		cout << "sdfsfilename " << sdfsfilename << " is not existed" << endl;
	}
}

/*
 * Leader sends out fileList in the following string format:
 * first 2 bytes are filename len, FILENAME msg type, filename itself,
 * 2 bytes for the number of positions the file has, FILEPOSITION msg type,
 * and a string of a commas seperated list of positions following that, ending in null byte.
 * All files are encapsulated in this way and joined to make one string
*/
string Node::encapsulateFileList()
{
	string enMeg = "";
	if (checkLeaderExist() && isLeader) {
		for (auto& element: fileList) {
			string positions = "";
			string sdfsfilename = element.first;

			for (uint i=0; i<element.second.size(); i++) {
				positions += to_string(element.second[i]);
				if (i != element.second.size()-1) {
					positions += ",";
				}
			}
			//cout << "sdfsfilename " << sdfsfilename << endl;
			//cout << "positions " << positions << endl;
			string size = to_string(get<0>(fileSizes[sdfsfilename])) + "," + to_string(get<1>(fileSizes[sdfsfilename]));
			char *cstr = new char[sdfsfilename.length()+positions.length()+size.length()+3+3+3+1];
			size_t len = sdfsfilename.length()+3;
			int index = 0;
			cstr[index++] = len & 0xff;
			cstr[index] = (len >> 8) & 0xff;
			if (cstr[index] == 0) { // avoid null
				cstr[index] = 0xff;
			}
			index++;
			//printf("cstr[0] %x, cstr[1] %x\n", cstr[0], cstr[1]);
			cstr[index++] = FILENAME;
			for (uint i=0; i<sdfsfilename.length(); i++) {
				cstr[index+i] = sdfsfilename.c_str()[i];
			}
			index += sdfsfilename.length();
			size_t len2 = positions.length()+3;
			cstr[index++] = len2 & 0xff;
			cstr[index] = (len2 >> 8) & 0xff;
			if (cstr[index] == 0) { // avoid null
				cstr[index] = 0xff;
			}
			index++;
			//printf("cstr[3] %x, cstr[4] %x\n", cstr[0], cstr[1]);
			cstr[index++] = FILEPOSITIONS;
			//printf("cstr[%lu] %d\n", sdfsfilename.length()+2, cstr[sdfsfilename.length()+2]);
			for (uint i=0; i<positions.length(); i++) {
				cstr[index+i] = positions.c_str()[i];
			}
			index += positions.length();
			size_t len3 = size.length()+3;
			cstr[index++] = len3 & 0xff;
			cstr[index] = (len3 >> 8) & 0xff;
			if (cstr[index] == 0) { // avoid null
				cstr[index] = 0xff;
			}
			index++;
			cstr[index++] = FILESIZE;
			for (uint i=0; i<size.length(); i++) {
				cstr[index+i] = size.c_str()[i];
			}
			index += size.length();
			cstr[index] = '\0';
			//printf("cstrFile %s\n", cstr);
			string enMegFile(cstr);
			//cout << "enMegFile " << enMegFile << endl;
			enMeg += enMegFile;
		}
		//cout << "encapsulateFileList " << enMeg << endl;
	}
	return enMeg;
}

//(len, PayloadType, message, \0) encoding where len is 2 bytes.
string Node::encapsulateMessage(map<PayloadType,string> payloads)
{
	string enMeg = "";
	//cout << "payloads.size " << payloads.size() << endl;
	for (auto& element: payloads) {
		PayloadType type = element.first;
		string message = element.second;
		//cout << "message " << message << endl;
		//cout << "message.length " << message.length() << endl;
		//cout << "type " << type << endl;
		char *cstr = new char[message.length()+4];
		size_t len = message.length()+3;
		cstr[0] = len & 0xff;
		cstr[1] = (len >> 8) & 0xff;
		if (cstr[1] == 0) { // avoid null
			cstr[1] = 0xff;
		}
		//printf("cstr[0] %x, cstr[1] %x\n", cstr[0], cstr[1]);
		cstr[2] = type;
		//printf("cstr[2] %x\n", cstr[2]);
		for (uint i=0; i<message.length(); i++) {
			cstr[i+3] = message.c_str()[i];
		}
		cstr[message.length()+3] = '\0';
		//printf("cstrMsg %s\n", cstr);
		string enMegPart(cstr);
		//cout << "enMegPart " << enMegPart << endl;
		enMeg += enMegPart;
	}
	//cout << "encapsulateMessage " << enMeg << endl;
	return enMeg;
}

void Node::decapsulateFileList(string payload)
{
	int size = payload.length();
	uint pos = 0;
	fileList.clear();
	fileSizes.clear();
	string lastFilename = "";
	while (size > 0) {
		size_t length;
		if ((payload.c_str()[1+pos] & 0xff) == 0xff) {
			length = 0;
		} else {
			length = (payload.c_str()[1+pos]) & 0xff;
			length = length << 8;
		}
		length += (payload.c_str()[0+pos]) & 0xff;
		PayloadType type = static_cast<PayloadType>(payload.c_str()[2+pos]);
		//printf(" len %lu, type %d\n", length, type);
		char cstr[length];
		bzero(cstr, sizeof(cstr));
		for (uint i=3; i<length; i++) {
			cstr[i-3] = payload.c_str()[pos+i];
		}
		string deMegPart(cstr);
		switch (type) {
			case FILENAME: {
				//cout << "FILENAME " << deMegPart << endl;
				lastFilename = deMegPart;
				break;
			}
			case FILEPOSITIONS: {
				//cout << "FILEPOSITIONS " << deMegPart << endl;
				vector<string> temp = splitString(deMegPart, ",");
				vector<int> positions;
				for (uint i=0; i<temp.size(); i++) {
					if (temp[i].compare("")!=0) {
						positions.push_back(stoi(temp[i]));
					}
				}
				fileList[lastFilename] = positions;
				break;
			}
			case FILESIZE: {
				vector<string> temp = splitString(deMegPart, ",");
				fileSizes[lastFilename] = make_tuple(stol(temp[0]),stoi(temp[1]));
				break;
			}
			default:
				break;
		}
		size -= length;
		pos += length;
	}

	// check with local file list
	if (!isLeader) {
		vector<string> fileToDelete;
		for (auto& element: localFilelist) {
			//cout << "sdfsfilename " << element.first << endl;
			if (fileList[element.first].size() == 0) {
				fileToDelete.push_back(element.first);
			}
		}
		for (uint i=0; i<fileToDelete.size(); i++) {
			if (localFilelist.find(fileToDelete[i]) != localFilelist.end()) remove(localFilelist[fileToDelete[i]].c_str());
			localFilelist.erase(fileToDelete[i]);
			//cout << "[DELETE] sdfsfilename " << fileToDelete[i] << endl;
		}
	}
}

string Node::decapsulateMessage(string payload)
{
	int size = payload.length();
	uint pos = 0;
	//cout << "payload " << payload << endl;
	//cout << "size " << size << endl;
	string deMeg = "";
	while (size > 0) {
		size_t length;
		if ((payload.c_str()[1+pos] & 0xff) == 0xff) {
			length = 0;
		} else {
			length = (payload.c_str()[1+pos]) & 0xff;
			length = length << 8;
		}
		length += (payload.c_str()[0+pos] & 0xff);
		//printf("lengthMeg %x %x %lu\n", payload.c_str()[0+pos], payload.c_str()[1+pos], length);

		PayloadType type = static_cast<PayloadType>(payload.c_str()[2+pos]);
		//printf(" len %lu, type %d\n", length, type);
		char cstr[length];
		bzero(cstr, sizeof(cstr));
		for (uint i=3; i<length; i++) {
			cstr[i-3] = payload.c_str()[pos+i];
		}
		//printf("cstr %s\n", cstr);
		string deMegPart(cstr);
		//cout << "deMegPart " << deMegPart << endl;
		if (type == REGULAR) {
			deMeg = deMegPart;
		} else if (type == FILEPAIR) {
			if (checkLeaderExist() && !isLeader) {
				//cout << "FILEPAIR " << deMegPart << endl;
				decapsulateFileList(deMegPart);
			}
		}
		//cout << "size1 " << size << endl;
		size -= length;
		pos += length;
		//cout << "size2 " << size << endl;
	}
	//cout << "deMeg " << deMeg << endl;
	return deMeg;
}

// piggyback fileList in heartbeat
string Node::populateSDFSFileList(MessageType type, string mem_list_to_send)
{
	Messages msg(type, mem_list_to_send);
	//cout << "populateSDFSFileList " << msg.toString() << endl;
	map<PayloadType,string> payloads;
	payloads[REGULAR] = msg.toString();
	if (isLeader) { // Only the leader includes the fileList
		payloads[FILEPAIR] = encapsulateFileList();
	}
	string enMeg = encapsulateMessage(payloads);
	return enMeg;
}

void Node::resetMapleJuice(){
	cleanupTmpFiles("tmp-");
	workerProcessing.clear();
	sentLocally.clear();
	workerTasks.clear();
	mapleSending.clear();
	filesAtWorker.clear();
	workerRing->clear();
	isJuicePhase = false;
	isMaplePhase = false;
}