diff --git a/client.c b/client.c index f4473d62f8f77c03ee7eed72e9b4673e26e9cbe9..effd565d1bf6438cec5074d70931530826755351 100644 --- a/client.c +++ b/client.c @@ -8,6 +8,7 @@ //Users/Stefan/Desktop/CS425/mp2_local/client.c #define _XOPEN_SOURCE 700 +#define _BSD_SOURCE #include "client.h" #include <sys/types.h> @@ -109,20 +110,23 @@ int getSuccessor(int hashNum, int numberOfServers) int clientConnect(char * buffercp, char * filename) { + int numberOfTestedServers = 7; /* Insert IPs into ipList and edit array size accordingly. Use 127.0.0.1 to test on local machines */ - char * ipList[1] = {"127.0.0.1"}; - //char * ipList[7] = {"172.22.150.218", "172.22.150.219", "172.22.150.220", "172.22.150.221", "172.22.150.222", "172.22.150.223", "172.22.150.224"}; + //char * ipList[1] = {"127.0.0.1"}; + char * ipList[7] = {"172.22.150.218", "172.22.150.219", "172.22.150.220", "172.22.150.221", "172.22.150.222", "172.22.150.223", "172.22.150.224"}; int connections = 0; unsigned long numberOfIps = sizeof(ipList)/sizeof(char *); - FILE * fptr = fopen(filename, "w"); + FILE * fptr; printf("buffer = %s\n", buffercp); + buffercp[strlen(buffercp) - 1] = '\0'; - char * buffer = malloc(strlen(buffercp) + 1); + char * buffer = malloc(256); + memset(buffer, '\0', 256); memcpy(buffer, buffercp, strlen(buffercp) + 1); char output[256]; @@ -131,6 +135,7 @@ int clientConnect(char * buffercp, char * filename) if(strstr(buffer, "grep") != NULL) { + fptr = fopen(filename, "w"); for(int i = 0; i < (int) numberOfIps; i++) { @@ -148,7 +153,9 @@ int clientConnect(char * buffercp, char * filename) } connections++; + close(fd); } + fclose(fptr); } else if (strstr(buffer, "put") != NULL) @@ -158,15 +165,15 @@ int clientConnect(char * buffercp, char * filename) char * lfs = strtok(NULL," "); char * dfs = strtok(NULL," "); - dfs[strlen(dfs)-1] = '\0'; printf("put = %s, 2 = %s, 3 = %s, %s\n", pch, lfs, dfs, buffercp); - int hash = computeHashFunction(dfs, 3); + int hash = computeHashFunction(dfs, numberOfTestedServers); int fd = connectToServer(ipList[hash]); printf("Hash number = %d\n", hash); - write(fd, buffercp, strlen(buffercp)); - + write(fd, buffercp, 256); + + sleep(1); FILE * fp = fopen(lfs, "r"); char *output2 = malloc(256); @@ -178,7 +185,7 @@ int clientConnect(char * buffercp, char * filename) write(fd, output2, count); } - + fclose(fp); free(output2); } @@ -189,34 +196,39 @@ int clientConnect(char * buffercp, char * filename) char * lfs = strtok(NULL," "); char * dfs = strtok(NULL," "); - dfs[strlen(dfs)-1] = '\0'; printf("get = %s, 2 = %s, 3 = %s, %s\n", pch, lfs, dfs, buffercp); - int hash = computeHashFunction(lfs, 3); - int fd = connectToServer(ipList[hash]); - write(fd, buffercp, strlen(buffercp)); + int hash = computeHashFunction(lfs, numberOfTestedServers); + printf("Hash = %d\n", hash); FILE * ptr = fopen(dfs, "w"); - int successor = getSuccessor(hash, 3); - int predecessor = getPredecessor(hash, 3); + int successor = getSuccessor(hash, numberOfTestedServers); + int predecessor = getPredecessor(hash, numberOfTestedServers); int i = 0; - int fds[3] = {fd,successor,predecessor}; + int fds[3] = {hash,successor,predecessor}; while (i < 3) { - fd = fds[i]; - while(read(fd, output, 256) > 0) - { - printf("Output: %s", output); - fprintf(ptr, "%s", output); - memset(output, '\0', 256); - i = 3; + + int fd = connectToServer(ipList[fds[i]]); + if(fd >= 0){ + + write(fd, buffercp, 256); + + while(read(fd, output, 256) > 0) + { + //printf("Output: %s", output); + fprintf(ptr, "%s", output); + memset(output, '\0', 256); + i = 3; + } + close(fd); } i++; - close(fd); + } if (i == 3) @@ -233,29 +245,74 @@ int clientConnect(char * buffercp, char * filename) pch = strtok (buffer," "); char * lfs = strtok(NULL," "); - lfs[strlen(lfs)-1] = '\0'; printf("delete = %s, 2 = %s, %s\n", pch, lfs, buffercp); - - int hash = computeHashFunction(lfs, 3); - int fd = connectToServer(ipList[hash]); - - write(fd, buffercp, strlen(buffercp)); + for(int i = 0; i < numberOfIps; i++) + { + int fd = connectToServer(ipList[i]); + if(fd < 0) + continue; + + write(fd, buffercp, 256); + close(fd); + } } else if (strstr(buffer, "store") != NULL) { + char * pch; + pch = strtok (buffer," "); + char * lfs = strtok(NULL," "); + //172.22.150.218 + char * ip = malloc(15); + sprintf(ip, "172.22.150.%s\0", lfs); + printf("Ip = %s\n", ip); + int fd = connectToServer(ip); + int timesRead = 0; + if(fd >= 0) + { + + write(fd, buffercp, 256); + //write(fd, buffer, strlen(buffer)); + + while(read(fd, output, 256) > 0) + { + //printf("Times read = %d\n", ++timesRead); + printf("Files on process %s: %s\n", lfs, output); + //fprintf(fptr, "From IP (%s): %s", ipList[i], output); + memset(output, '\0', 256); + } + close(fd); + } } else if (strstr(buffer, "list")) { + char * pch; + pch = strtok (buffer," "); + char * lfs = strtok(NULL," "); + + for(int i = 0; i < numberOfIps; i++) + { + int fd = connectToServer(ipList[i]); + if(fd < 0) + continue; + write(fd, buffercp, 256); + + while(read(fd, output, 256) > 0) + { + printf("File %s on process : %u\n", lfs, *(unsigned int *)output); + //fprintf(fptr, "From IP (%s): %s", ipList[i], output); + memset(output, '\0', 256); + } + close(fd); + } } free(buffer); free(buffercp); - - fclose(fptr); + return connections; } @@ -297,8 +354,8 @@ int UDPConnect(const char * ip){ int runUnitTest(){ //grep test test.txt - char * buff = malloc(32); - memset(buff, '\0', 32); + char * buff = malloc(256); + memset(buff, '\0', 256); strcpy(buff, "grep test test.txt\n"); int connections = clientConnect(buff, "testlog.txt"); @@ -325,7 +382,5 @@ int main() printf("Returned from func\n"); - printf("# of lines: %d lines\n", countFileLines("log.txt")); - return 0; } \ No newline at end of file diff --git a/logs/a.log b/logs/a.log new file mode 100644 index 0000000000000000000000000000000000000000..9f8b52106154bc06b5530d04e0e898e9fef547e6 Binary files /dev/null and b/logs/a.log differ diff --git a/logs/b.log b/logs/b.log new file mode 100644 index 0000000000000000000000000000000000000000..c6ece2f53d1a555fbdb3102bee1c725d3b77ba8e Binary files /dev/null and b/logs/b.log differ diff --git a/logs/c.log b/logs/c.log new file mode 100644 index 0000000000000000000000000000000000000000..9a6284276b77757730e4deb472e9883cfe9eeb0c Binary files /dev/null and b/logs/c.log differ diff --git a/logs/d.log b/logs/d.log new file mode 100644 index 0000000000000000000000000000000000000000..62cbe5be9ef81d64100b703fcd6f6139165ca6ca Binary files /dev/null and b/logs/d.log differ diff --git a/logs/e.log b/logs/e.log new file mode 100644 index 0000000000000000000000000000000000000000..04e9a64e26cef9a0579a12671251c9e083105a97 Binary files /dev/null and b/logs/e.log differ diff --git a/logs/f.log b/logs/f.log new file mode 100644 index 0000000000000000000000000000000000000000..4d72a43ad2393f2e244a0c4bada88ee766c1af82 Binary files /dev/null and b/logs/f.log differ diff --git a/logs/g.log b/logs/g.log new file mode 100644 index 0000000000000000000000000000000000000000..03e9970562eb42716d27dcd5c2e49f35a98808ca Binary files /dev/null and b/logs/g.log differ diff --git a/logs/h.log b/logs/h.log new file mode 100644 index 0000000000000000000000000000000000000000..a5f0d5d6cd0cd3f3da1b309ac2be5548449c4d28 Binary files /dev/null and b/logs/h.log differ diff --git a/logs/i.log b/logs/i.log new file mode 100644 index 0000000000000000000000000000000000000000..384bd73fa00784b2b2fbf162cee758e8ddf986c9 Binary files /dev/null and b/logs/i.log differ diff --git a/logs/j.log b/logs/j.log new file mode 100644 index 0000000000000000000000000000000000000000..50d1d02a43429194f9676f94d0be9447bbf5bfd2 Binary files /dev/null and b/logs/j.log differ diff --git a/server.c b/server.c index e56743228e976a34239ca31e7ed846bf54f89269..77c01d229377d39ab413d6c9146c01174e09ebe0 100644 --- a/server.c +++ b/server.c @@ -6,7 +6,8 @@ // // -//#define _XOPEN_SOURCE 700 +#define _XOPEN_SOURCE 700 +#define _BSD_SOURCE #include "server.h" #include <sys/types.h> @@ -51,9 +52,11 @@ fileList * fileLists; membershipList * list; unsigned int globalCount; pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER; +pthread_mutex_t fileLock = PTHREAD_MUTEX_INITIALIZER; char filename[56]; int leaveGroup; +int checkFiles; void printMembershipList(){ pthread_mutex_lock(&m); @@ -75,7 +78,7 @@ void buildMembershipList(unsigned int * buffer){ unsigned int numberOfServers = *buffer; unsigned int currentCount = list->numberOfServers; - printf("Current count = %d\n", currentCount); + //printf("Current count = %d\n", currentCount); unsigned int listSize = currentCount; buffer++; @@ -136,6 +139,7 @@ void buildMembershipList(unsigned int * buffer){ node = NULL; list->listInfo[j] = NULL; failedDetected = 1; + checkFiles = 1; currentCount--; @@ -212,6 +216,8 @@ void * buildMembershipListSerialized(){ } unsigned int * buffer = malloc(sizeof(unsigned int) + list->numberOfServers*(sizeof(unsigned int) + sizeof(unsigned int) + sizeof(unsigned int))); + memset(buffer, '\0', sizeof(unsigned int) + list->numberOfServers*(sizeof(unsigned int) + sizeof(unsigned int) + sizeof(unsigned int))); + void * ptr = buffer; *buffer = list->numberOfServers; buffer++; @@ -411,11 +417,6 @@ int createUDPSocketConnection(char * udpport){ return fd; } -void acceptMasterCommands(int fd){ - - -} - int isMemberOfGroup(int id) { @@ -469,7 +470,143 @@ int getSuccessor() ID--; } - return ID; + return ID; +} + +void duplicateFiles(){ + while(1){ + sleep(1); + + if(checkFiles){ + //pthread_mutex_lock(&fileLock); //Mutex lock - Critical Section + + for(int i = 0; i < fileLists->numberOfFiles; i++) + { + char * file = fileLists->files[i]; + int replies = 0; + + for(int j = 0; j < list->numberOfServers; j++) + { + char * buffer = malloc(16); //Adjust to IP + memset(buffer, '\0', 16); + + if(j < list->numberOfServers){ + sprintf(buffer, "172.22.150.%u", *list->listInfo[j]); + } + + int fd = connectToServer(buffer); + + + + char * input = malloc(256); + memset(input, '\0', 256); + sprintf(input, "list %s", file); + + write(fd, input, 256); + + char output[256]; + memset(output, '\0', 256); + + if(read(fd, output, 256) > 0) + { + unsigned int replyingServer = *(unsigned int *)output; + printf("Got reply from replyingServer = %u\n", replyingServer); + if(replyingServer == getSuccessor()) + { + printf("ReplyingServer == Successor\n"); + replies = 3; + checkFiles = 0; + close(fd); + break; + } + else + { + replies++; + } + memset(output, '\0', 256); + } + close(fd); + } + printf("We're out\n"); + if(replies < 3) + { + printf("Got less than 3 replies\n"); + + char forwardedOutput[256]; + char * temp = forwardedOutput; + memset(forwardedOutput, '\0', 256); + strcpy(temp, "put "); + temp = temp + 4; + strcpy(temp, "DNF "); + temp = temp + 4; + strcpy(temp, file); + + if(replies <= 2){ + int successorid = getSuccessor(); + char * successorip = malloc(16); + sprintf(successorip, "172.22.150.%u", successorid); + + int successorfd = connectToServer(successorip); + write(successorfd, forwardedOutput, 256); + + FILE * fpserver = fopen(file, "r"); + + char *output2 = malloc(256); + int count; + size_t length = 0; + printf("Forwarding files\n"); + while((count = getline(&output2, &length, fpserver)) > 0) + { + + write(successorfd, output2, count); + } + fclose(fpserver); + free(output2); + + close(successorfd); + free(successorip); + } + if(replies <= 1) + { + int predecessorid = getPredecessor(); + char * predecessorip = malloc(16); + sprintf(predecessorip, "172.22.150.%u", predecessorid); + + int predecessorfd = connectToServer(predecessorip); + write(predecessorfd, forwardedOutput, 256); + + FILE * fpserver = fopen(file, "r"); + + char *output2 = malloc(256); + int count; + size_t length = 0; + printf("Forwarding files\n"); + while((count = getline(&output2, &length, fpserver)) > 0) + { + + write(predecessorfd, output2, count); + } + fclose(fpserver); + free(output2); + + close(predecessorfd); + free(predecessorip); + + } + + + + + } + else + { + checkFiles = 0; + } + } + + //pthread_mutex_unlock(&fileLock); //Mutex unlock + } + } } void acceptTCPConnections(int fd){ @@ -478,12 +615,12 @@ void acceptTCPConnections(int fd){ while((clientfd = accept(fd, NULL, NULL)) != 0) { printf("Connection made on server!\n"); - char buffer[257]; + char buffer[256]; + memset(buffer, '\0', 256); int len = read(clientfd, buffer, 256); - buffer[len-1] = '\0'; - char buffercp[257]; - memcpy(buffercp, buffer, 257); + char buffercp[256]; + memcpy(buffercp, buffer, 256); printf("Message = \"%s\"\n", buffer); //Opens command in new process and pipes output to fp @@ -523,17 +660,31 @@ void acceptTCPConnections(int fd){ while(read(clientfd, output, 256) > 0) { - printf("Output to server is: %s\n", output); + //printf("Output to server is: %s\n", output); fprintf(fp, "%s", output); memset(output, '\0', 256); } + fclose(fp); - fileLists->files = realloc(fileLists->files, (++fileLists->numberOfFiles)*sizeof(char *)); - char * dfscpy = malloc(sizeof(dfs) + 1); + pthread_mutex_lock(&fileLock); //Mutex lock - Critical Section + + char ** templist = malloc(sizeof(char *)*(fileLists->numberOfFiles+1)); + for(int i = 0; i < fileLists->numberOfFiles; i++) + { + templist[i] = fileLists->files[i]; + } + + char * dfscpy = malloc(strlen(dfs) + 1); dfscpy[sizeof(dfs)] = '\0'; strcpy(dfscpy, dfs); - fileLists->files[fileLists->numberOfFiles -1] = dfscpy; + templist[fileLists->numberOfFiles] = dfscpy; + + free(fileLists->files); + fileLists->files = templist; + fileLists->numberOfFiles++; + + pthread_mutex_unlock(&fileLock); //Mutex unlock if(strstr(lfs, "DNF") == NULL) { int successorID = getSuccessor(); @@ -551,7 +702,7 @@ void acceptTCPConnections(int fd){ char forwardedOutput[256]; char * temp = forwardedOutput; memset(forwardedOutput, '\0', 256); - strcpy(temp, "get "); + strcpy(temp, "put "); temp = temp + 4; strcpy(temp, "DNF "); temp = temp + 4; @@ -563,15 +714,16 @@ void acceptTCPConnections(int fd){ write(arr[i], forwardedOutput, 256); - FILE * fpserver = fopen(lfs, "r"); + FILE * fpserver = fopen(dfs, "r"); char *output2 = malloc(256); int count; size_t length = 0; - + printf("Forwarding files\n"); while((count = getline(&output2, &length, fpserver)) > 0) { - write(fd, output2, count); + + write(arr[i], output2, count); } fclose(fpserver); free(output2); @@ -582,8 +734,8 @@ void acceptTCPConnections(int fd){ close(successorfd); close(predecessorfd); - fclose(fp); } + printf("Number of files on server = %d\n", fileLists->numberOfFiles); } else if(strstr(buffer, "get") != NULL) { @@ -614,19 +766,26 @@ void acceptTCPConnections(int fd){ fclose(fp); } + else if(strstr(buffer, "store") != NULL) { for (int i = 0; i < fileLists->numberOfFiles; i++) { - write(clientfd, fileLists->files[i], strlen(fileLists->files[i])+1); + char * character = malloc(1); + *character = ' '; + printf("i = %d w/ %s\n", i, fileLists->files[i]); + write(clientfd, fileLists->files[i], strlen(fileLists->files[i])); + write(clientfd, character, 1); + free(character); } } + else if(strstr(buffer, "list") != NULL) { char * pch; pch = strtok (buffer," "); - char * lfs = strtok(NULL," "); + char * searchedFile = strtok(NULL," "); unsigned int * tempint = malloc(sizeof(unsigned int)); *tempint = selfID; @@ -634,9 +793,10 @@ void acceptTCPConnections(int fd){ for (int i = 0; i < fileLists->numberOfFiles; i++) { - if(strcmp(lfs, fileLists->files[i]) == 0) + if(strstr(fileLists->files[i], searchedFile) != NULL) { write(clientfd, tempint, sizeof(unsigned int)); + printf("Wrote something out\n"); break; } } @@ -644,7 +804,45 @@ void acceptTCPConnections(int fd){ } else if(strstr(buffer, "delete") != NULL) { + char * pch; + pch = strtok (buffer," "); + char * deletedFile = strtok(NULL," "); + + int didNotFindFile = 1; + + pthread_mutex_lock(&fileLock); + for (int i = 0; i < fileLists->numberOfFiles; i++) + { + + if(strstr(fileLists->files[i],deletedFile) != NULL) + { + free(fileLists->files[i]); + fileLists->files[i] = NULL; + didNotFindFile = 0; + break; + } + } + + if(!didNotFindFile){ + char ** tempList = malloc((fileLists->numberOfFiles - 1)*sizeof(char *)); + int x = 0; + for(int i = 0 ; i < fileLists->numberOfFiles; i++) + { + if(fileLists->files[i] != NULL) + { + tempList[x++] = fileLists->files[i]; + } + } + free(fileLists->files); + fileLists->files = tempList; + fileLists->numberOfFiles--; + } + else + { + printf("Was not able to delete file! Not found!\n"); + } + pthread_mutex_unlock(&fileLock); //Mutex unlock } close(clientfd); @@ -666,7 +864,7 @@ void acceptUDPConnections(int udpfd){ if((received = recvfrom(udpfd, buf, 2048, MSG_DONTWAIT, (struct sockaddr *)&remaddr, &addrlen)) > 0) { buf[received] = '\0'; - printf("Received message\n"); + //printf("Received message\n"); unsigned int * buffer = (unsigned int *)buf; buildMembershipList(buffer); @@ -731,8 +929,8 @@ void sendUDPMessages(const char * port){ for(int i = 0; i < list->numberOfServers; i++) { - char * buffer = malloc(5); //Adjust to IP - memset(buffer, '\0', 5); + char * buffer = malloc(15); //Adjust to IP + memset(buffer, '\0', 15); void * ptr = buildMembershipListSerialized(); if(ptr == NULL) @@ -741,7 +939,7 @@ void sendUDPMessages(const char * port){ pthread_mutex_lock(&m); if(i < list->numberOfServers){ sprintf(buffer, "172.22.150.%u", *list->listInfo[i]); - printf("Sending membership list\n"); + //printf("Sending membership list\n"); int udpClientFd = UDPConnect(buffer, "1030"); @@ -869,8 +1067,8 @@ void * sendUDPMessagesPT(void * ptr) return NULL; } -void * acceptMasterCommandsPT(void * ptr){ - acceptMasterCommands(*(int *)ptr); +void * duplicateFilesPT(void * ptr){ + duplicateFiles(); return NULL; } @@ -968,6 +1166,7 @@ int main(int argc, char ** argv) globalCount = 0; leaveGroup = 0; + checkFiles = 0; int status, leader = 0, result; pthread_t id0, id1, id2, id3, id4, id5; @@ -985,11 +1184,14 @@ int main(int argc, char ** argv) //Initialize fileLists fileLists = malloc(sizeof(fileList)); + fileLists->files = NULL; fileLists->numberOfFiles = 0; - char * idgrab = malloc(3); + char * idgrab = malloc(4); + char * throwaway; + memcpy(idgrab, ip + (strlen(ip) -3), 3); - unsigned int id = strtoul(idgrab, NULL, 10); //Change to IP //Local + unsigned int id = strtoul(idgrab, &throwaway, 10); //Change to IP //Local free(idgrab); //Get ID based on IP @@ -1029,7 +1231,7 @@ int main(int argc, char ** argv) int tcpfd = createTCPSocketConnection("1024"); int udpfd = createUDPSocketConnection("1030"); - int mastertcpfd = createTCPSocketConnection("2000"); + //int mastertcpfd = createTCPSocketConnection("2000"); int * tcp = malloc(sizeof(int)); @@ -1043,7 +1245,7 @@ int main(int argc, char ** argv) pthread_create(&id2, NULL, acceptUDPConnectionsPT, (void *)udp); pthread_create(&id3, NULL, sendUDPMessagesPT, (void *)udpsend); pthread_create(&id4, NULL, acceptUserCommandsPT, NULL); - pthread_create(&id5, NULL, acceptMasterCommandsPT, NULL); + pthread_create(&id5, NULL, duplicateFilesPT, NULL); void * ptr; pthread_join(id0, &ptr); pthread_join(id1, &ptr);