Skip to content
Snippets Groups Projects
Commit 9cad1036 authored by ajf5's avatar ajf5
Browse files

logging

parent c1d3a325
No related branches found
No related tags found
No related merge requests found
No preview for this file type
......@@ -16,6 +16,7 @@
using namespace std;
string getMostRecentFile(string readfile);
void cleanupTmpFiles();
class FileObject {
public:
......
......@@ -33,9 +33,8 @@ vector<string> splitString(string s, string delimiter);
string getIP();
string getIP(const char * host);
int new_thread_id();
void handlePipe(int file, string prefix);
void handlePipe(int file);
bool isInVector(vector<int> v, int i);
void sigchld_handler(int s);
//adapted from https://stackoverflow.com/questions/23030267/custom-sorting-a-vector-of-tuples
......
......@@ -37,3 +37,16 @@ string getMostRecentFile(string readfile){
sort(fileVersions.begin(), fileVersions.end());
return fileVersions[fileVersions.size()-1];
}
void cleanupTmpFiles(){
struct dirent *entry = nullptr;
DIR *dp = nullptr;
string match = "tmp-";
int matchLen = match.size();
if ((dp = opendir(".")) == nullptr) { cout << "tmp directory error " << endl;}
while ((entry = readdir(dp))){
if (strncmp(entry->d_name, match.c_str(), matchLen) == 0){
remove(entry->d_name);
}
}
}
......@@ -747,7 +747,7 @@ void Node::checkFileListConsistency(){
Messages outMsg(DNSGET, nodeInfo + "::" + to_string(nodesToCheck[i]) + "::" + element.first + "::");
tuple<int, int, int> request = pendingRequests[element.first];
if(get<0>(request) != -1 || get<1>(request) != -1 || get<2>(request) != -1){
cout << "on put " << get<0>(request) << "/" << get<1>(request) << "/" << get<2>(request) << endl;
//cout << "on put " << get<0>(request) << "/" << get<1>(request) << "/" << get<2>(request) << endl;
break;
}
pendingRequests[element.first] = tuple<int, int, int>(-1, -1, nodesToCheck[i]);
......@@ -823,6 +823,7 @@ void Node::handleTcpMessage()
//currently running something, dont start a new phase
if (mapleProcessing.size()) {tcpServent->regMessages.push(msg.toString()); cout << "[MAPLE] already mapling" << endl; break;}
cout << "[MAPLE] Leader starting new Maple phase" << endl;
cleanupTmpFiles();
if (inMsg.size() >= 4){
string mapleExe = inMsg[0], num_maples = inMsg[1], sdfsPre = inMsg[2], sdfs_dir = inMsg[3] + "-";
int workers = stoi(num_maples);
......@@ -830,17 +831,16 @@ void Node::handleTcpMessage()
if (workers > hashRing->nodePositions.size()-2) workers = hashRing->nodePositions.size()-2;
int total_lines = 0;
vector<tuple<string,int>> directory;
cout << "[DIRECTORY] " << sdfs_dir << endl;
cout << "[DIRECTORY] " << sdfs_dir;
for (auto &e: fileSizes){
cout << e.first << " | " << to_string(get<1>(e.second));
cout << e.first << " | " << to_string(get<1>(e.second)) << " ";
if (strncmp(e.first.c_str(), sdfs_dir.c_str(), sdfs_dir.size()) == 0){
cout << " was a match ";
directory.push_back(make_tuple(e.first, get<1>(e.second)));
total_lines += get<1>(e.second);
}
cout << endl;
}
cout << "[MAPLE] need to process " << to_string(total_lines) << endl;
cout << endl << "[MAPLE] need to process " << to_string(total_lines) << endl;
vector<tuple<string,string,string>> aliveNodes;
for (auto &e : membershipList) if (get<0>(e.first).compare(nodeInformation.ip)) aliveNodes.push_back(e.first);
vector<tuple<string,string,string>> mapleNodes = randItems(workers, aliveNodes);
......@@ -892,7 +892,7 @@ void Node::handleTcpMessage()
pid_t pid = fork();
if (pid){ //parent process, DONT need to waitpid because of signal handler set up
close(dataPipe[1]);
handlePipe(dataPipe[0], sdfsPre);
handlePipe(dataPipe[0]);
} else if (pid < 0) {
fprintf (stderr, "Fork failed.\n"); break;
} else { //child process
......@@ -1038,8 +1038,8 @@ void Node::handleTcpMessage()
int lines = stoi(inMsg[5]);
string overwriteFilename = inMsg[6];
string overwrite = inMsg[7];
cout << "[DNS] Got " << "inMsgIP: " << inMsgIP << ", sdfsfilename: " << sdfsfilename;
cout << ", localfilename: " << localfilename << ", pos: " << nodePosition << endl;
//cout << "[DNS] Got " << "inMsgIP: " << inMsgIP << ", sdfsfilename: " << sdfsfilename;
//cout << ", localfilename: " << localfilename << ", pos: " << nodePosition << endl;
// update fileList, client itself is one of the replicas
updateFileList(sdfsfilename, nodePosition);
fileSizes[sdfsfilename] = make_tuple(size, lines);
......@@ -1078,8 +1078,8 @@ void Node::handleTcpMessage()
// since we do not keep files in hashRing, the value itself is IPaddress, not NODE:IP_Address
string nodeIP = hashRing->getValue(nodePosition);
//cout << "nodeIP " << nodeIP << endl;
cout << "[DNSANS] " << "we will put sdfsfilename: " << inMsg[2] << " to nodeIP: " << nodeIP;
cout << " using localfilename: " << inMsg[1] << endl;
//cout << "[DNSANS] " << "we will put sdfsfilename: " << inMsg[2] << " to nodeIP: " << nodeIP;
//cout << " using localfilename: " << inMsg[1] << endl;
string sendMsg = nodeIP+"::"+inMsg[1]+"::"+inMsg[2]+"::"+inMsg[3]+"::"+inMsg[4];
this->tcpServent->pendSendMessages.push(sendMsg);
}
......
......@@ -293,7 +293,7 @@ int TcpSocket::messageHandler(int sockfd, string payloadMessage, string returnIP
case CHUNK:
case CHUNKACK:
case DNS:{
cout << "["<< messageTypes[msg.type] << "] payloadMessage: " << payloadMessage << endl;
//cout << "["<< messageTypes[msg.type] << "] payloadMessage: " << payloadMessage << endl;
regMessages.push(payloadMessage); //handle from queue
break;
}
......
......@@ -50,9 +50,9 @@ bool isInVector(vector<int> v, int i){
return false;
}
void handlePipe(int file, string prefix) {
void handlePipe(int file) {
size_t bufSize = 1024;
cout << "[PIPE] sleeping for data. " << " Prefix: " << prefix << endl;
cout << "[PIPE] sleeping for data. " << endl;
sleep(5);
FILE *stream = fdopen(file, "r"); FILE *tmp;
char str[bufSize];
......@@ -62,7 +62,7 @@ void handlePipe(int file, string prefix) {
lines++;
std::string key(strtok(str, delim));
std::string val(strtok(NULL, delim));
string keyFile = "tmp-" + prefix + "-" + key;
string keyFile = "tmp-" + key;
string write = key + "," + val + "\n";
tmp = fopen(keyFile.c_str(), "ab");
fwrite(write.c_str(),sizeof(char),write.size(),tmp);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment