Skip to content
Snippets Groups Projects
index.js 14.22 KiB
var express = require("express");
var http = require("http");
var path = require("path");
var fs = require("fs");
var request = require("request");
const process = require("process");
var bodyParser = require("body-parser");
const dgram = require("dgram");
const fetch = require("node-fetch");

const resolver = require("./topology");


// Express application. Two global body parsers are installed, in this order:
// JSON bodies first, then URL-encoded form bodies (extended syntax enabled).
const app = express();
[bodyParser.json(), bodyParser.urlencoded({ extended: true })].forEach(
  parser => app.use(parser)
);

// Separate JSON parser instance that routes can mount explicitly.
const jsonParser = bodyParser.json();

// Lookup table from VM number (as a string key) to that VM's IP address.
// Used throughout this file to build "http://<ip>:3000/..." URLs and to
// address UDP heartbeat messages.
// NOTE(review): there is no entry for "7" — lookups for VM 7 yield undefined;
// confirm whether that machine is intentionally excluded.
const machineToIps = {
  "0": "192.17.151.88",
  "1": "172.22.156.15",
  "2": "172.22.158.15",
  "3": "172.22.154.16",
  "4": "172.22.156.16",
  "5": "172.22.158.16",
  "6": "172.22.154.17",
  "8": "172.22.158.17",
  "9": "172.22.154.18",
  "10": "172.22.156.18"
};

// VM number currently treated as master; update() recomputes it as the
// smallest number in `clients`.
let master = 1;

// Replica map: stored (versioned) file name -> array of VM numbers holding it.
let files = {};

// VM numbers of the current group members.
let clients = [1];

// Version map: uploaded file name -> array of stored versioned names,
// appended in upload order by the /upload handler.
let versions = {};

// Helper functions

/**
 * Streams a local file to another VM over HTTP.
 *
 * The file is POSTed to `http://<ip>:3000/<upload_status>/<fileName>`. When
 * the HTTP transfer fails, the first member of `clients` that is not yet
 * listed in `files[fileName]` replaces the failed VM in that list and the
 * upload is retried against it through the "reupload" route.
 *
 * @param {number|string} VMNumber - Target VM; key into `machineToIps`.
 * @param {string} localFileName - Path of the file to read on this machine.
 * @param {string} fileName - Name the file is stored under on the target
 *   (also the key into `files`).
 * @param {string} upload_status - Route prefix on the target, e.g. "upload"
 *   or "reupload".
 */
function uploadToVM(VMNumber, localFileName, fileName, upload_status) {
  console.log("Sending file to " + VMNumber);
  var target =
    "http://" +
    machineToIps[String(VMNumber)] +
    ":3000/" +
    upload_status +
    "/" +
    fileName;
  var rs = fs.createReadStream(localFileName);
  var ws = request.post(target);

  // A stream that emits "error" with no listener throws, which would take the
  // whole process down when the local copy is missing or unreadable.
  rs.on("error", function(err) {
    console.error(
      "Cannot read " +
        localFileName +
        " to send to VM " +
        String(VMNumber) +
        ": " +
        err
    );
    // Abort instead of ending, so the target does not store a truncated file.
    ws.abort();
  });

  ws.on("error", function(err) {
    console.error("Cannot send file to VM " + String(VMNumber) + ": " + err);
    var replicas = files[fileName];
    if (!Array.isArray(replicas)) {
      // Nothing is tracked under this name, so there is no list to repair.
      return;
    }
    var replacement = clients.find(function(candidate) {
      return !replicas.includes(candidate);
    });
    if (replacement === undefined) {
      return;
    }
    files[fileName] = replicas.filter(e => e !== VMNumber);
    files[fileName].push(replacement);
    uploadToVM(replacement, localFileName, fileName, "reupload");
  });

  // pipe() already applies backpressure, so no manual "drain" handling is needed.
  rs.pipe(ws);
}

/**
 * Re-synchronises group state.
 *
 * Resets the last-seen timestamp of every current client to "now",
 * recomputes `master` as the smallest VM number in `clients`, and POSTs the
 * current `files`, `clients` and `versions` to `/update/` on every client
 * other than the master.
 */
function update() {
  clients_with_timestamps = {};
  const now = Date.now();
  clients.forEach(client => {
    if (client != null) {
      clients_with_timestamps[client] = now;
    }
  });

  // Math.min() of an empty list is Infinity; keep the previous master instead.
  if (clients.length === 0) {
    return;
  }
  master = Math.min(...clients);

  clients
    .filter(e => e !== master)
    .forEach(client => {
      const options = {
        method: "POST",
        body: {
          data: "update",
          files,
          clients,
          versions
        },
        json: true,
        url: "http://" + machineToIps[String(client)] + ":3000/update/"
      };
      request(options, function onUpdateSent(err) {
        // Best effort: an unreachable peer must not stop the others from
        // being updated, but the failure should at least be visible.
        if (err) {
          console.error("Cannot send update to VM " + client + ": " + err);
        }
      });
    });
}

// API endpoints

/**
 * POST /reupload/:filename
 *
 * Stores the request body as `files/<basename of :filename>` next to this
 * script. Replies 200 once the file is fully written, 500 if writing fails.
 */
app.post("/reupload/:filename", function(req, res) {
  console.log("Writting the file to VM " + process.env.VM);
  var filename = path.basename(req.params.filename);
  filename = path.resolve(__dirname, "files/" + filename);
  var dst = fs.createWriteStream(filename);

  // Without a response the sender's connection stays open indefinitely.
  dst.on("finish", function() {
    res.sendStatus(200);
  });
  dst.on("error", function(err) {
    console.error("Cannot write " + filename + ": " + err);
    if (!res.headersSent) {
      res.sendStatus(500);
    }
  });

  // pipe() handles backpressure; no manual "drain" handling is required.
  req.pipe(dst);
});

/**
 * POST /upload/:filename
 *
 * On the master: registers a new version of the file (stored name is the
 * file name followed by the version count), assigns up to four replica VMs
 * from `clients`, broadcasts the new metadata, writes the body to
 * `files/<versioned name>` and, once the local write has finished, forwards
 * the file to every other replica.
 *
 * On any other VM: stores the body as `files/<basename of :filename>`.
 *
 * Replies 200 once the local write has finished, 500 if it fails.
 */
app.post("/upload/:filename", function(req, res) {
  console.log("\nPutting " + req.params.filename);
  var filename = path.basename(req.params.filename);

  // Pipes the request body into `destination`, runs `afterWrite` (if given)
  // once everything is flushed, and then answers the caller.
  function storeBody(destination, afterWrite) {
    var dst = fs.createWriteStream(destination);
    dst.on("finish", function() {
      if (afterWrite) {
        afterWrite();
      }
      res.sendStatus(200);
    });
    dst.on("error", function(err) {
      console.error("Cannot write " + destination + ": " + err);
      if (!res.headersSent) {
        res.sendStatus(500);
      }
    });
    req.pipe(dst);
  }

  if (String(master) !== String(process.env.VM)) {
    console.log("Writting the file to VM " + process.env.VM);
    storeBody(path.resolve(__dirname, "files/" + filename));
    return;
  }

  var fname = String(filename);
  // Own-property check so names such as "constructor" are not mistaken for
  // already-known files.
  if (!Object.prototype.hasOwnProperty.call(versions, fname)) {
    versions[fname] = [];
  }
  var name = fname + String(versions[fname].length + 1);
  versions[fname].push(name);

  const numberOfFiles = Object.keys(files).length;
  const numberOfClients = clients.length;

  // Four consecutive clients starting at an offset that rotates with the
  // number of stored files; duplicates collapse when fewer than four exist.
  files[name] = [
    ...new Set([
      clients[(numberOfFiles + 0) % numberOfClients],
      clients[(numberOfFiles + 1) % numberOfClients],
      clients[(numberOfFiles + 2) % numberOfClients],
      clients[(numberOfFiles + 3) % numberOfClients]
    ])
  ];

  update();

  if (files[name].indexOf(Number(process.env.VM)) >= 0) {
    console.log("Writting the file to VM " + process.env.VM);
  }

  console.log(files);

  var localPath = path.resolve(__dirname, "files/" + name);

  // Replicate only after the local copy is completely on disk; starting on the
  // request's "close" event can read a file that is still being flushed.
  storeBody(localPath, function() {
    (files[name] || []).forEach(function(vm) {
      if (String(vm) !== String(process.env.VM)) {
        uploadToVM(vm, localPath, name, "upload");
      }
    });
  });
});

/**
 * Streams `files/<versionName>` (relative to the working directory) to
 * `target` with an HTTP POST and reports the outcome on `res`:
 * 404 when the file is not present locally, 200 when it has been read to the
 * end, 500 on a read failure and 502 when the POST to `target` fails.
 */
function streamVersionToUser(versionName, target, res) {
  var localPath = process.cwd() + "/files/" + versionName;
  if (!fs.existsSync(localPath)) {
    res.status(404).send("File does not exist");
    return;
  }

  var rs = fs.createReadStream(localPath);
  var ws = request.post(target);

  rs.on("error", function(err) {
    console.error("Cannot read " + localPath + ": " + err);
    ws.abort();
    if (!res.headersSent) {
      res.sendStatus(500);
    }
  });

  rs.on("end", function() {
    console.log("Downloaded to user");
    if (!res.headersSent) {
      res.sendStatus(200);
    }
  });

  ws.on("error", function(err) {
    console.error("Cannot send file to " + target + ": " + err);
    if (!res.headersSent) {
      res.sendStatus(502);
    }
  });

  // pipe() handles backpressure; no manual "drain" handling is required.
  rs.pipe(ws);
}

/**
 * POST /get/:sdfsfilename/:localfilename
 *
 * Sends the latest stored version of :sdfsfilename to the user machine
 * (hard-coded address 172.16.244.74, route /upload/:localfilename).
 *
 * A non-master VM always serves from its own disk. The master serves from
 * its own disk when it holds a replica (or no replica is recorded);
 * otherwise it forwards the request to the first recorded replica and relays
 * that VM's answer. Replies 404 when the file is unknown.
 */
app.post("/get/:sdfsfilename/:localfilename", function(req, res) {
  console.log("\nGetting " + req.params.sdfsfilename);
  var list = versions[String(req.params.sdfsfilename)];
  if (!Array.isArray(list) || list.length === 0) {
    res.status(404).send("File does not exist");
    return;
  }
  var latestVersion = String(list[list.length - 1]);
  console.log(latestVersion);

  var userTarget =
    "http://172.16.244.74:3000/upload/" + req.params.localfilename;

  if (String(master) !== String(process.env.VM)) {
    streamVersionToUser(latestVersion, userTarget, res);
    return;
  }

  var replicas = files[latestVersion] || [];
  if (replicas.length === 0 || replicas.indexOf(Number(process.env.VM)) >= 0) {
    console.log("Fetching from VM " + String(process.env.VM));
    streamVersionToUser(latestVersion, userTarget, res);
    return;
  }

  console.log("Fetching from VM " + String(replicas[0]));
  var forwardTarget =
    "http://" +
    machineToIps[String(replicas[0])] +
    ":3000/get/" +
    req.params.sdfsfilename +
    "/" +
    req.params.localfilename;

  // Relay the replica's answer so the caller is not left waiting.
  request.post(forwardTarget, function(err, response, body) {
    if (err) {
      console.error("Cannot forward get to " + forwardTarget + ": " + err);
      res.sendStatus(502);
      return;
    }
    res.status(response.statusCode).send(body);
  });
});

/**
 * POST /delete/:sdfsfilename
 *
 * Removes every stored version of :sdfsfilename from the replica map
 * (`files`) and drops its entry from `versions`. Replies 200 on success and
 * 404 when the file is unknown.
 */
app.post("/delete/:sdfsfilename", function(req, res) {
  const sdfsName = String(req.params.sdfsfilename);
  console.log("\nDeleting " + sdfsName);

  const storedVersions = versions[sdfsName];
  if (!Array.isArray(storedVersions)) {
    res.status(404).send("File does not exist");
    return;
  }

  // Iterate exactly over the recorded versions (the former loop ran one step
  // past the end and touched files["undefined"]).
  storedVersions.forEach(versionName => {
    delete files[versionName];
  });
  delete versions[sdfsName];
  console.log(files);

  res.sendStatus(200);
});

/**
 * POST /ls/:sdfsfilename
 *
 * Replies with one "\n<version>: <vm>, <vm>, ..." entry per stored version of
 * the file, entries joined by ", ". Unknown files yield an empty reply.
 */
app.post("/ls/:sdfsfilename", (req, res) => {
  console.log('The file "' + req.params.sdfsfilename + '" is on VMs: ');
  const sdfsName = String(req.params.sdfsfilename);

  const entries =
    sdfsName in versions
      ? versions[sdfsName].map(
          versionName => "\n" + versionName + ": " + files[versionName].join(", ")
        )
      : [];

  // Identical entries are reported once.
  const uniqueEntries = Array.from(new Set(entries));
  res.send(uniqueEntries.join(", "));
});

/**
 * POST /update/
 *
 * With body `{ data: "master" }`: pushes this node's `files`, `clients` and
 * `versions` to every client other than the master.
 *
 * With any other body: adopts the `files`, `versions` and `clients` carried
 * in the body. When the membership actually differs, `master` is recomputed
 * as the smallest client number and all last-seen timestamps are restarted.
 *
 * Replies 200, or 400 when a state update lacks files/versions/clients.
 */
app.post("/update/", jsonParser, function(req, res) {
  if (req.body.data === "master") {
    clients
      .filter(e => e !== master)
      .forEach(client => {
        console.log(client);
        let url = "http://" + machineToIps[String(client)] + ":3000/update/";
        console.log(url);

        var postData = {
          data: "update",
          files,
          clients,
          versions
        };

        var options = {
          method: "POST",
          body: postData,
          json: true,
          url: url
        };
        console.log(options);
        request(options, function optionalCallback(err, httpResponse, body) {
          if (err) {
            return console.error("upload failed:", err);
          }
          console.log("Updated " + client);
        });
      });
    res.sendStatus(200);
    return;
  }

  const incomingFiles = req.body.files;
  const incomingVersions = req.body.versions;
  const incomingClients = req.body.clients;

  // Refuse partial payloads: adopting them would replace the state with
  // undefined and break every later request.
  if (
    incomingFiles === null ||
    typeof incomingFiles !== "object" ||
    incomingVersions === null ||
    typeof incomingVersions !== "object" ||
    !Array.isArray(incomingClients)
  ) {
    res.status(400).send("Malformed update");
    return;
  }

  files = incomingFiles;
  versions = incomingVersions;

  // Arrays must be compared element-wise; `!=` on two arrays compares
  // references and is always true for a freshly parsed body.
  const membershipChanged =
    incomingClients.length !== clients.length ||
    incomingClients.some((client, index) => client !== clients[index]);

  if (membershipChanged) {
    console.log("Updating clients " + incomingClients);
    clients = incomingClients;
    if (clients.length > 0) {
      // Same rule update() uses, so every node agrees on who the master is.
      master = Math.min(...clients);
    }
    const now = Date.now();
    clients_with_timestamps = {};
    clients.forEach(client => {
      clients_with_timestamps[client] = now;
    });
  }

  // Always answer, otherwise the sender's request never completes.
  res.sendStatus(200);
});

/**
 * POST /store/:VMNumber
 *
 * Replies with the comma-separated names of all stored files whose replica
 * list contains the given VM number.
 */
app.post("/store/:VMNumber", function(req, res) {
  const vmNumber = Number(req.params.VMNumber);

  // Kept local: assigning to an undeclared name would create a global shared
  // by all concurrent requests.
  const filesOnVM = Object.keys(files).filter(file =>
    files[file].includes(vmNumber)
  );

  console.log(
    "\nThe files on VM  " +
      req.params.VMNumber +
      " include " +
      filesOnVM.join(", ")
  );
  res.send(filesOnVM.join(", "));
});

/**
 * POST /getVersions/:sdfsfilename/:numversions
 *
 * Replies with the names of the newest :numversions stored versions of the
 * file, newest first, joined by ", ". Replies 404 when the file is unknown.
 */
app.post("/getVersions/:sdfsfilename/:numversions", function(req, res) {
  const storedVersions = versions[String(req.params.sdfsfilename)];
  if (!Array.isArray(storedVersions)) {
    res.status(404).send("File does not exist");
    return;
  }

  const count = Number(req.params.numversions);
  // reverse() works in place, so reverse a copy: the stored list must stay in
  // upload order because its last element is treated as the latest version.
  const newestFirst = storedVersions.slice().reverse();
  res.send(newestFirst.slice(0, count).join(", "));
});

/**
 * POST /join/:number
 *
 * Adds the given VM number to the membership list (duplicates are dropped),
 * broadcasts the new state and replies with the comma-separated membership.
 */
app.post("/join/:number", (req, res) => {
  const joiningVM = Number(req.params.number);
  clients.push(joiningVM);
  clients = Array.from(new Set(clients));

  const membership = clients.join(", ");
  console.log("Current set of clients " + membership);
  update();
  res.send(clients.join(", "));
});

/**
 * POST /leave/:number
 *
 * Removes the given VM from the membership list and broadcasts the change.
 * Every file that had a replica on the departed VM loses that replica and,
 * when another client without a copy exists, is uploaded to the first such
 * client. The resulting state is broadcast again and the remaining
 * membership is returned as a comma-separated list.
 */
app.post("/leave/:number", function(req, res) {
  const leavingVM = Number(req.params.number);
  clients = clients.filter(e => e !== leavingVM);
  update();

  Object.keys(files).forEach(file => {
    // Files that never lived on the departed VM lost nothing.
    if (!files[file].includes(leavingVM)) {
      return;
    }

    // Drop the departed VM unconditionally so it is never listed as a
    // replica again, even when no spare client is available.
    files[file] = files[file].filter(f => f !== leavingVM);

    const spare = clients.find(candidate => !files[file].includes(candidate));
    if (spare === undefined) {
      return;
    }

    console.log("Rereplicating to VM " + spare);
    files[file].push(spare);
    uploadToVM(spare, process.cwd() + "/files/" + file, file, "upload");
  });
  update();

  res.send(clients.join(", "));
});

/**
 * POST /list/:number
 *
 * Replies with the current membership as a comma-separated list.
 */
app.post("/list/:number", (req, res) => {
  const membership = clients.join(", ");
  res.send(membership);
});

/**
 * GET /master
 *
 * Liveness probe: always answers "Alive".
 */
app.get("/master", (req, res) => {
  const reply = "Alive";
  res.send(reply);
});

// Tuples waiting to be dispatched to a bolt (filled by /spout and by bolt
// replies that are not finished yet).
const queue = [];
// Final states collected from bolt replies whose action is "END".
const results = [];

/**
 * POST /bolt
 *
 * Logs the incoming tuple's state and replies with whatever the topology
 * resolver returns for the request body.
 */
app.post("/bolt", (req, res) => {
  const tuple = req.body;
  console.log("bolt state: " + tuple.state);
  const outcome = resolver(tuple);
  res.send(outcome);
});

/**
 * POST /spout
 *
 * Enqueues the request body for later dispatch and acknowledges it.
 */
app.post("/spout", (req, res) => {
  const tuple = req.body;
  queue.push(tuple);
  res.send("got it");
});

/**
 * POST /sink
 *
 * Placeholder endpoint: ignores the request body and sends a fixed reply.
 */
app.post("/sink", (req, res) => {
  const reply = "IM A SINK";
  res.send(reply);
});


// Counter used to spread queued tuples over the clients in turn.
let round_robin = 0;

// Every 500 ms: take one tuple off the queue and POST it to the /bolt
// endpoint of the next client. A reply whose action is "END" contributes its
// state to `results`; any other reply is queued again for further processing.
setInterval(function sync() {
  if (queue.length === 0 || clients.length === 0) {
    return;
  }

  const body = queue.pop();
  const targetVM = clients[round_robin % clients.length];
  const url = "http://" + machineToIps[targetVM] + ":3000/bolt/";
  round_robin += 1;

  fetch(url, {
    method: "post",
    body: JSON.stringify(body),
    headers: { "Content-Type": "application/json" }
  })
    .then(res => res.json())
    .then(json => {
      if (json.action !== "END") {
        queue.push(json);
      } else {
        results.push(json.state);
        console.log(json.state);
      }
    })
    .catch(err => {
      // An unreachable bolt or an unparsable reply must not surface as an
      // unhandled promise rejection; the tuple is dropped and reported.
      console.error("Cannot process tuple on " + url + ": " + err);
    });
}, 500);

// Start the HTTP server on port 3000, bound to this VM's address as found in
// machineToIps under the VM environment variable.
const listenHost = machineToIps[process.env.VM];
http.createServer(app).listen(3000, listenHost, () => {
  console.log("Express server listening on port 3000");
});

// SWIM protocol to catch failures

// UDP socket used for the heartbeat ("syn"/"ack") exchange on port 20000.
const socket = dgram.createSocket({ type: "udp4", reuseAddr: true });
socket.bind(20000);

// Last time (ms since epoch) each VM was heard from, keyed by VM number.
// Declared explicitly so it is a module variable rather than an implicit
// global; it is reassigned wherever the membership is refreshed.
let clients_with_timestamps = {};

// Announce the VM number, its configured IP, the bound UDP port and the
// process id once the heartbeat socket is ready.
socket.on("listening", () => {
  const { port } = socket.address();
  const vm = process.env.VM;
  const ip = machineToIps[String(vm)];
  console.log(`VM ${vm} UDP socket listening on ${ip}:${port} pid: ${process.pid}`);
});

/**
 * Sends a UDP datagram to port 20000 of `host` through the shared socket.
 *
 * @param {string} host - Destination address. When missing, nothing is sent:
 *   dgram would otherwise fall back to the local machine.
 * @param {Buffer} message - Payload to send.
 */
function sendMessage(host, message) {
  if (!host) {
    console.error("Cannot send message: unknown destination host");
    return;
  }
  socket.send(message, 0, message.length, 20000, host, function(err) {
    if (err) {
      console.error("Cannot send message to " + host + ": " + err);
    }
  });
}

// Heartbeat protocol. Messages look like "<kind> <vm number>":
//   "ack <n>" records that VM n was just heard from;
//   "syn <n>" is answered with "ack <own VM number>" sent to VM n.
// Anything else is ignored.
socket.on("message", (message, rinfo) => {
  const text = message.toString();
  const senderVM = text.slice(4);

  if (text.startsWith("ack")) {
    clients_with_timestamps[senderVM] = new Date().getTime();
    return;
  }

  if (text.startsWith("syn")) {
    const reply = Buffer.from(`ack ` + process.env.VM);
    sendMessage(machineToIps[senderVM], reply);
  }
});

/**
 * Returns the VM numbers this node monitors: up to four consecutive entries
 * of `clients`, starting at index (own VM number mod clients.length), without
 * duplicates and without this node itself.
 *
 * The own VM number comes from the environment as a string, so it is
 * converted first; otherwise `+` concatenates ("1" + 1 === "11") and the
 * comparison against the numeric entries of `clients` never matches.
 */
function getMonitoredVMs() {
  if (clients.length === 0) {
    return [];
  }
  const self = Number(process.env.VM);
  const picked = [];
  for (let offset = 0; offset < 4; offset++) {
    picked.push(clients[(self + offset) % clients.length]);
  }
  return [...new Set(picked)].filter(vm => vm !== undefined && vm !== self);
}

// Every 500 ms: send a "syn" heartbeat to each monitored VM.
setInterval(function sync() {
  getMonitoredVMs().forEach(vm => {
    sendMessage(machineToIps[vm], Buffer.from(`syn ` + process.env.VM));
  });
}, 500);

// Every 5 s: declare a monitored VM dead when it was last heard from more
// than 5 s ago, remove it from the membership, and move the replicas it held
// to clients that do not have a copy yet.
setInterval(function sync() {
  const monitored = getMonitoredVMs();
  console.log("pinging " + monitored);

  monitored.forEach(client => {
    const lastSeen = clients_with_timestamps[client];
    // 0 and -1 are never treated as expired; a VM with no recorded timestamp
    // yields NaN below and is therefore not declared dead either.
    if (lastSeen === 0 || lastSeen === -1) {
      return;
    }
    if (!(new Date().getTime() - lastSeen > 5000)) {
      return;
    }

    console.log(client + " is dead");
    clients = clients.filter(e => e !== client);
    console.log("Clients left alive include " + clients.join(", "));
    update();

    let replicasChanged = false;
    Object.keys(files).forEach(file => {
      // Only files that had a replica on the dead VM lost anything.
      if (!files[file].includes(client)) {
        return;
      }

      // Drop the dead VM unconditionally so it is never listed again, even
      // when no spare client is available.
      files[file] = files[file].filter(f => f !== client);
      replicasChanged = true;

      const spare = clients.find(candidate => !files[file].includes(candidate));
      if (spare === undefined) {
        return;
      }

      console.log("Rereplicating to VM " + spare);
      files[file].push(spare);
      // The remote name is the bare file name: the /reupload/:filename route
      // does not match a name containing "files/".
      uploadToVM(spare, "files/" + file, file, "reupload");
    });

    if (replicasChanged) {
      update();
    }
  });
}, 5000);