// index.js 14.22 KiB
var express = require("express");
var http = require("http");
var path = require("path");
var fs = require("fs");
var request = require("request");
const process = require("process");
var bodyParser = require("body-parser");
const dgram = require("dgram");
const fetch = require("node-fetch");
const resolver = require("./topology");
// Express application; every route in this file is registered on this instance.
var app = express();
app.use(bodyParser.json()); // support json encoded bodies
app.use(bodyParser.urlencoded({ extended: true })); // support encoded bodies
// Standalone JSON body parser, passed explicitly as middleware to POST /update/.
var jsonParser = bodyParser.json();
// Maps a VM number (string key) to the IP address used to reach that VM.
// NOTE(review): there is no entry for "7" — confirm whether that is intentional.
const machineToIps = {
"0": "192.17.151.88",
"1": "172.22.156.15",
"2": "172.22.158.15",
"3": "172.22.154.16",
"4": "172.22.156.16",
"5": "172.22.158.16",
"6": "172.22.154.17",
"8": "172.22.158.17",
"9": "172.22.154.18",
"10": "172.22.156.18"
};
// VM number treated as master; update() recomputes it as the smallest entry of `clients`.
let master = 1;
// Maps a versioned file name to the array of VM numbers recorded as holding it.
let files = {};
// VM numbers currently considered members of the group.
let clients = [1];
// Maps a base file name to the array of its versioned names, in the order POST /upload pushed them.
let versions = {};
// Helper functions
/**
 * Streams a local file to another VM with an HTTP POST to
 * `http://<ip>:3000/<upload_status>/<fileName>`.
 *
 * If the HTTP request fails and `files[fileName]` is a known replica list,
 * the failed VM is replaced in that list by the first member of `clients`
 * that does not hold the file yet, and the transfer is retried against that
 * member using the "reupload" route.
 *
 * @param {number|string} VMNumber - Target VM; looked up in `machineToIps`.
 * @param {string} localFileName - Path of the local file to read.
 * @param {string} fileName - Name used in the target URL and as key into `files`.
 * @param {string} upload_status - Route prefix on the target ("upload" or "reupload").
 */
function uploadToVM(VMNumber, localFileName, fileName, upload_status) {
  console.log("Sending file to " + VMNumber);
  const target =
    "http://" +
    machineToIps[String(VMNumber)] +
    ":3000/" +
    upload_status +
    "/" +
    fileName;

  const rs = fs.createReadStream(localFileName);
  const ws = request.post(target);

  // An unreadable local file cannot be fixed by picking another target, so
  // just report it and cancel the outgoing request instead of crashing on an
  // unhandled 'error' event.
  rs.on("error", function(err) {
    console.error("Cannot read " + localFileName + ": " + err);
    if (typeof ws.abort === "function") {
      ws.abort();
    }
  });

  let failedOver = false;
  ws.on("error", function(err) {
    console.error("Cannot send file to VM " + String(VMNumber) + ": " + err);
    if (failedOver) {
      return; // only fail over once per transfer
    }
    failedOver = true;

    // Stop feeding the broken request and release the file descriptor.
    rs.unpipe(ws);
    rs.destroy();

    // The file may be unknown (or deleted meanwhile); nothing to re-assign then.
    const holders = files[fileName];
    if (!Array.isArray(holders)) {
      return;
    }
    const replacement = clients.find(candidate => !holders.includes(candidate));
    if (replacement === undefined) {
      return; // every known client already holds the file
    }
    files[fileName] = holders.filter(e => e !== VMNumber);
    files[fileName].push(replacement);
    uploadToVM(replacement, localFileName, fileName, "reupload");
  });

  // pipe() already applies backpressure; no manual drain/resume is needed.
  rs.pipe(ws);
}
/**
 * Refreshes local membership bookkeeping and pushes the current metadata
 * (`files`, `clients`, `versions`) to every client except the master.
 *
 * Side effects:
 *  - replaces the file-level `clients_with_timestamps` map with a fresh one
 *    in which every current client is stamped with the current time;
 *  - sets `master` to the smallest VM number in `clients`;
 *  - sends one POST /update/ per non-master client.
 */
function update() {
  // `clients_with_timestamps` is the file-level map read by the failure detector.
  clients_with_timestamps = {};
  clients.forEach(client => {
    if (client != undefined) {
      clients_with_timestamps[client] = Date.now();
    }
  });

  // Math.min() of an empty list is Infinity; keep the previous master rather
  // than electing a VM that does not exist.
  if (clients.length === 0) {
    return;
  }
  master = Math.min(...clients);

  const postData = {
    data: "update",
    files,
    clients,
    versions
  };
  clients
    .filter(e => e !== master)
    .forEach(client => {
      const options = {
        method: "POST",
        body: postData,
        json: true,
        url: "http://" + machineToIps[String(client)] + ":3000/update/"
      };
      request(options, function(err) {
        // Best effort: an unreachable client is reported, not fatal.
        if (err) {
          console.error("Cannot push update to VM " + client + ": " + err);
        }
      });
    });
}
// API endpoints
// POST /reupload/:filename
// Stores the request body as files/<basename of :filename> next to this script.
// Responds 200 once the file has been fully written, or 500 if writing fails,
// so the sender's connection is not left hanging.
app.post("/reupload/:filename", function(req, res) {
  console.log("Writting the file to VM " + process.env.VM);
  // basename() strips any directory part so the write stays inside files/.
  const baseName = path.basename(req.params.filename);
  const destination = path.resolve(__dirname, "files/" + baseName);
  const dst = fs.createWriteStream(destination);

  dst.on("error", function(err) {
    console.error("Cannot write " + destination + ": " + err);
    if (!res.headersSent) {
      res.status(500).send("Cannot store file");
    }
  });
  dst.on("finish", function() {
    if (!res.headersSent) {
      res.sendStatus(200);
    }
  });

  // pipe() handles backpressure on its own.
  req.pipe(dst);
});
// POST /upload/:filename
// On the master (master === process.env.VM): registers a new version of the
// file, assigns up to four distinct replica VMs from `clients`, broadcasts the
// metadata via update(), writes the body to files/<versioned name> and, once
// that write has finished, forwards the file to every other replica.
// On any other VM: simply stores the body as files/<basename of :filename>.
// Both paths answer 200 after the local write finished and 500 if it failed.
app.post("/upload/:filename", function(req, res) {
  console.log("\nPutting " + req.params.filename);
  const baseName = String(path.basename(req.params.filename));

  // Writes the request body to `destination`, then runs `onStored`.
  function storeBody(destination, onStored) {
    const dst = fs.createWriteStream(destination);
    dst.on("error", function(err) {
      console.error("Cannot write " + destination + ": " + err);
      if (!res.headersSent) {
        res.status(500).send("Cannot store file");
      }
    });
    dst.on("finish", function() {
      onStored();
      if (!res.headersSent) {
        res.sendStatus(200);
      }
    });
    req.pipe(dst);
  }

  if (String(master) !== String(process.env.VM)) {
    console.log("Writting the file to VM " + process.env.VM);
    storeBody(path.resolve(__dirname, "files/" + baseName), function() {});
    return;
  }

  // Own-property check so names such as "constructor" are treated as new files.
  if (!Object.prototype.hasOwnProperty.call(versions, baseName)) {
    versions[baseName] = [];
  }
  // Versioned name is the base name followed by the 1-based version number.
  const name = baseName + String(versions[baseName].length + 1);
  versions[baseName].push(name);

  // Pick up to four consecutive clients, starting at an offset derived from
  // the number of files already known; the Set removes wrap-around duplicates.
  const numberOfFiles = Object.keys(files).length;
  const numberOfClients = clients.length;
  const replicas = [];
  for (let offset = 0; numberOfClients > 0 && offset < 4; offset++) {
    replicas.push(clients[(numberOfFiles + offset) % numberOfClients]);
  }
  files[name] = [...new Set(replicas)];
  update();

  if (files[name].indexOf(Number(process.env.VM)) >= 0) {
    console.log("Writting the file to VM " + process.env.VM);
  }
  console.log(files);

  const localPath = path.resolve(__dirname, "files/" + name);
  storeBody(localPath, function() {
    // Replicate only after the local copy is complete, so replicas never
    // read a partially written file. The entry may have been deleted meanwhile.
    (files[name] || []).forEach(function(vm) {
      if (String(vm) !== String(process.env.VM)) {
        uploadToVM(vm, localPath, name, "upload");
      }
    });
  });
});
/**
 * Streams the locally stored copy of `latestVersion` (files/<latestVersion>
 * under the current working directory) to the user's machine by POSTing it to
 * http://172.16.244.74:3000/upload/<localfilename>.
 *
 * Responds 404 if the local copy does not exist, 200 when the file has been
 * read to the end, and 502 if reading or sending fails first.
 *
 * @param {string} latestVersion - Versioned file name to send.
 * @param {string} localfilename - Name the user wants the file stored under.
 * @param {object} res - Express response of the originating request.
 * @param {string} [sourceVM] - When given, logged as the VM serving the file.
 */
function sendStoredVersionToUser(latestVersion, localfilename, res, sourceVM) {
  const target = "http://172.16.244.74:3000/upload/" + localfilename;
  const localPath = process.cwd() + "/files/" + latestVersion;
  if (!fs.existsSync(localPath)) {
    res.status(404).send("File does not exist");
    return;
  }
  if (sourceVM !== undefined) {
    console.log("Fetching from VM " + sourceVM);
  }

  const rs = fs.createReadStream(localPath);
  const ws = request.post(target);
  rs.on("end", function() {
    console.log("Downloaded to user");
    if (!res.headersSent) {
      res.sendStatus(200);
    }
  });
  rs.on("error", function(err) {
    console.error("Cannot read " + localPath + ": " + err);
    if (!res.headersSent) {
      res.status(502).send("Cannot read file");
    }
  });
  ws.on("error", function(err) {
    console.error("Cannot send file to " + target + ": " + err);
    if (!res.headersSent) {
      res.status(502).send("Cannot send file to user");
    }
  });
  rs.pipe(ws);
}

// POST /get/:sdfsfilename/:localfilename
// Delivers the latest version of :sdfsfilename to the user.
//  - Unknown file: 404.
//  - Non-master VM: serves its local copy.
//  - Master that holds the file (or has no replica list for it): serves its
//    local copy.
//  - Master that does not hold the file: forwards the request to the first
//    VM in the replica list and relays that VM's status code.
app.post("/get/:sdfsfilename/:localfilename", function(req, res) {
  console.log("\nGetting " + req.params.sdfsfilename);
  const list = versions[String(req.params.sdfsfilename)];
  if (!Array.isArray(list) || list.length === 0) {
    res.status(404).send("File does not exist");
    return;
  }
  const latestVersion = String(list[list.length - 1]);
  console.log(latestVersion);

  if (String(master) !== String(process.env.VM)) {
    sendStoredVersionToUser(latestVersion, req.params.localfilename, res);
    return;
  }

  const holders = files[latestVersion] || [];
  if (holders.indexOf(Number(process.env.VM)) >= 0 || holders.length === 0) {
    sendStoredVersionToUser(
      latestVersion,
      req.params.localfilename,
      res,
      String(process.env.VM)
    );
    return;
  }

  console.log("Fetching from VM " + String(holders[0]));
  const target =
    "http://" +
    machineToIps[String(holders[0])] +
    ":3000/get/" +
    req.params.sdfsfilename +
    "/" +
    req.params.localfilename;
  request.post(target, function(err, response) {
    if (err) {
      console.error("Cannot forward get to " + target + ": " + err);
      res.status(502).send("Cannot reach VM " + holders[0]);
      return;
    }
    res.sendStatus(response.statusCode);
  });
});
// POST /delete/:sdfsfilename
// Removes every version of :sdfsfilename from the in-memory `files` and
// `versions` maps. Responds 404 when the file is unknown, 200 otherwise.
// Only metadata is touched here; nothing is removed from disk.
app.post("/delete/:sdfsfilename", function(req, res) {
  const sdfsName = String(req.params.sdfsfilename);
  console.log("\nDeleting " + req.params.sdfsfilename);

  const list = versions[sdfsName];
  if (!Array.isArray(list)) {
    res.status(404).send("File does not exist");
    return;
  }

  list.forEach(function(versionedName) {
    delete files[versionedName];
  });
  delete versions[sdfsName];

  console.log(files);
  res.sendStatus(200);
});
// POST /ls/:sdfsfilename
// Replies with one "\n<version>: <vm>, <vm>, ..." entry per stored version of
// the file, joined by ", ". An unknown file produces an empty reply.
app.post("/ls/:sdfsfilename", (req, res) => {
  console.log('The file "' + req.params.sdfsfilename + '" is on VMs: ');

  const sdfsName = String(req.params.sdfsfilename);
  const storedVersions = sdfsName in versions ? versions[sdfsName] : [];
  const entries = storedVersions.map(
    versionedName => "\n" + versionedName + ": " + files[versionedName].join(", ")
  );

  // Duplicated entries are collapsed before replying.
  const uniqueEntries = Array.from(new Set(entries));
  res.send(uniqueEntries.join(", "));
});
// POST /update/
// Body { data: "master" }: pushes this VM's `files`, `clients` and `versions`
// to every client except `master`.
// Any other body: treated as a metadata push; replaces the local `files` and
// `versions`, and — only if the member list actually differs — replaces
// `clients` and restarts the per-client timestamps.
// Responds 200, or 400 when a metadata push is malformed.
app.post("/update/", jsonParser, function(req, res) {
  if (req.body.data === "master") {
    const postData = {
      data: "update",
      files,
      clients,
      versions
    };
    clients
      .filter(e => e !== master)
      .forEach(client => {
        console.log(client);
        const url = "http://" + machineToIps[String(client)] + ":3000/update/";
        console.log(url);
        const options = {
          method: "POST",
          body: postData,
          json: true,
          url: url
        };
        console.log(options);
        request(options, function(err) {
          if (err) {
            return console.error("upload failed:", err);
          }
          console.log("Updated " + client);
        });
      });
    res.sendStatus(200);
    return;
  }

  const incomingFiles = req.body.files;
  const incomingVersions = req.body.versions;
  const incomingClients = req.body.clients;
  const isPlainMap = value => typeof value === "object" && value !== null;
  // Reject incomplete pushes instead of overwriting the state with undefined.
  if (
    !isPlainMap(incomingFiles) ||
    !isPlainMap(incomingVersions) ||
    !Array.isArray(incomingClients)
  ) {
    res.status(400).send("Malformed update");
    return;
  }

  files = incomingFiles;
  versions = incomingVersions;

  // Arrays are compared element by element; comparing the references would
  // always report a change because the body is freshly parsed.
  const membershipChanged =
    clients.length !== incomingClients.length ||
    clients.some((client, index) => client !== incomingClients[index]);
  if (membershipChanged) {
    console.log("Updating clients " + incomingClients);
    clients = incomingClients;
    // File-level map read by the failure detector.
    clients_with_timestamps = {};
    clients.forEach(client => {
      clients_with_timestamps[client] = Date.now();
    });
  }
  res.sendStatus(200);
});
// POST /store/:VMNumber
// Replies with the comma-separated names of all versioned files whose replica
// list in `files` contains the given VM number.
app.post("/store/:VMNumber", function(req, res) {
  const vmNumber = Number(req.params.VMNumber);
  // Local constant: the result is per request and must not leak as a global.
  const filesOnVM = Object.keys(files).filter(file =>
    files[file].includes(vmNumber)
  );
  const listing = filesOnVM.join(", ");
  console.log(
    "\nThe files on VM " + req.params.VMNumber + " include " + listing
  );
  res.send(listing);
});
// POST /getVersions/:sdfsfilename/:numversions
// Replies with the newest :numversions versioned names of the file, newest
// first, joined by ", ". Responds 404 for an unknown file and 400 when
// :numversions is not a non-negative number.
app.post("/getVersions/:sdfsfilename/:numversions", function(req, res) {
  const list = versions[String(req.params.sdfsfilename)];
  if (!Array.isArray(list)) {
    res.status(404).send("File does not exist");
    return;
  }

  const count = Number(req.params.numversions);
  if (Number.isNaN(count) || count < 0) {
    res.status(400).send("Invalid number of versions");
    return;
  }

  // reverse() works in place, so reverse a copy: the stored list must stay in
  // upload order because other handlers treat its last element as the latest.
  const newestFirst = list.slice().reverse();
  res.send(newestFirst.slice(0, count).join(", "));
});
// POST /join/:number
// Adds the given VM number to `clients` (without duplicates), broadcasts the
// new state through update() and replies with the resulting member list.
app.post("/join/:number", (req, res) => {
  const joiningVM = Number(req.params.number);
  clients.push(joiningVM);
  clients = Array.from(new Set(clients));

  const announcement = "Current set of clients " + clients.join(", ");
  console.log(announcement);

  update();
  res.send(clients.join(", "));
});
// POST /leave/:number
// Removes the given VM from `clients`. For every file that VM was recorded as
// holding, the VM is dropped from the replica list and, if some remaining
// client does not hold the file yet, that client is added and sent the file
// from this VM's local files/ directory. The final state is then broadcast
// through update() and the remaining member list is returned.
app.post("/leave/:number", function(req, res) {
  const leavingVM = Number(req.params.number);
  clients = clients.filter(e => e !== leavingVM);

  Object.keys(files).forEach(file => {
    // Files the leaving VM never held keep their replica set untouched.
    if (!files[file].includes(leavingVM)) {
      return;
    }
    files[file] = files[file].filter(f => f !== leavingVM);

    const replacement = clients.find(
      candidate => !files[file].includes(candidate)
    );
    if (replacement === undefined) {
      return; // every remaining client already holds this file
    }
    console.log("Rereplicating to VM " + replacement);
    files[file].push(replacement);
    uploadToVM(replacement, process.cwd() + "/files/" + file, file, "upload");
  });

  // Broadcast once, after both membership and replica lists are final.
  update();
  res.send(clients.join(", "));
});
// POST /list/:number
// Replies with the current member list; the :number parameter is not used.
app.post("/list/:number", (req, res) => {
  const roster = clients.join(", ");
  res.send(roster);
});
// GET /master
// Liveness probe: always answers "Alive".
app.get("/master", (req, res) => {
  res.send("Alive");
});
// Pending request bodies waiting to be dispatched to a /bolt endpoint.
queue = [];
// States collected from finished items.
results = [];
// POST /bolt
// Logs the incoming state and replies with whatever resolver() returns for
// the request body.
app.post("/bolt", (req, res) => {
  const payload = req.body;
  console.log("bolt state: " + payload.state);
  const outcome = resolver(payload);
  res.send(outcome);
});
// POST /spout
// Enqueues the request body for later dispatch and acknowledges it.
app.post("/spout", (req, res) => {
  const payload = req.body;
  queue.push(payload);
  res.send("got it");
});
// POST /sink
// Placeholder endpoint that only acknowledges the request.
app.post("/sink", (req, res) => {
  res.send("IM A SINK");
});
// Counter used to rotate through `clients` when dispatching queued items.
let round_robin = 0;
// Every 500 ms: take one item from `queue`, POST it as JSON to the /bolt
// endpoint of the next client in rotation, and either re-enqueue the reply
// (action !== "END") or record its state in `results`.
setInterval(function sync() {
  // Nothing to send, or nobody to send it to (avoids a NaN index).
  if (queue.length === 0 || clients.length === 0) {
    return;
  }
  const body = queue.pop();
  const targetVM = clients[round_robin % clients.length];
  const url = "http://" + machineToIps[targetVM] + ":3000/bolt/";
  round_robin += 1;
  fetch(url, {
    method: "post",
    body: JSON.stringify(body),
    headers: { "Content-Type": "application/json" }
  })
    .then(res => res.json())
    .then(json => {
      if (json.action !== "END") {
        queue.push(json);
      } else {
        results.push(json.state);
        console.log(json.state);
      }
    })
    .catch(err => {
      // Report instead of leaving an unhandled rejection; the item is not retried.
      console.error("Cannot dispatch to VM " + targetVM + ": " + err);
    });
}, 500);
// Start the HTTP API on port 3000, bound to this VM's address from machineToIps.
const server = http.createServer(app);
const bindAddress = machineToIps[process.env.VM];
server.listen(3000, bindAddress, () => {
  console.log("Express server listening on port 3000");
});
// SWIM protocol to catch failures
// UDP socket on port 20000 used for the syn/ack heartbeat messages.
const socket = dgram.createSocket({ type: "udp4", reuseAddr: true });
socket.bind(20000);
// Last time (ms since epoch) each client was heard from, keyed by VM number.
clients_with_timestamps = {};
socket.on("listening", () => {
  const { port } = socket.address();
  const ownIp = machineToIps[String(process.env.VM)];
  console.log(
    `VM ${process.env.VM} UDP socket listening on ${ownIp}:${port} pid: ${process.pid}`
  );
});
/**
 * Sends a UDP datagram to port 20000 of `host` through the shared socket.
 *
 * @param {string} host - Destination IP address. When it is missing (for
 *   example a VM number with no entry in the address table) nothing is sent,
 *   because dgram would otherwise fall back to its default address.
 * @param {Buffer} message - Payload to send in full.
 */
function sendMessage(host, message) {
  if (!host) {
    console.error("Cannot send UDP message: unknown host");
    return;
  }
  socket.send(message, 0, message.length, 20000, host, function(err) {
    // Heartbeats are best effort, but failures should still be visible.
    if (err) {
      console.error("Cannot send UDP message to " + host + ": " + err);
    }
  });
}
// Heartbeat handler. Messages have the form "<kind> <VM number>":
//  - "ack N": record the current time as the last time VM N was heard from;
//  - "syn N": answer VM N with "ack <own VM number>".
socket.on("message", (message, rinfo) => {
  const text = message.toString();
  const senderVM = text.slice(4);

  if (text.startsWith("ack")) {
    clients_with_timestamps[senderVM] = new Date().getTime();
    return;
  }
  if (text.startsWith("syn")) {
    const reply = Buffer.from("ack " + process.env.VM);
    sendMessage(machineToIps[senderVM], reply);
  }
});
/**
 * Picks the VMs this node monitors: up to four consecutive entries of
 * `clients`, starting at index (own VM number mod clients.length), without
 * duplicates and without this VM itself.
 *
 * Both timers below use this one function so that the set of VMs being
 * pinged and the set being checked for silence are always the same.
 *
 * @returns {number[]} VM numbers to ping / check; empty when there are no
 *   clients or process.env.VM is not a number.
 */
function selectPingTargets() {
  // process.env values are strings; convert once so "+" adds instead of
  // concatenating and so the self-comparison below can match.
  const ownVM = Number(process.env.VM);
  if (clients.length === 0 || Number.isNaN(ownVM)) {
    return [];
  }
  const picked = [];
  for (let offset = 0; offset < 4; offset++) {
    picked.push(clients[(ownVM + offset) % clients.length]);
  }
  return [...new Set(picked)].filter(vm => vm !== ownVM);
}

// Every 500 ms: send "syn <own VM number>" to each monitored VM.
setInterval(function sync() {
  selectPingTargets().forEach(vm => {
    sendMessage(machineToIps[vm], Buffer.from("syn " + process.env.VM));
  });
}, 500);

// Every 5 s: declare dead any monitored VM whose last recorded timestamp is
// more than 5 s old, remove it from `clients`, hand each file it held to a
// client that does not hold it yet, and broadcast the new state.
setInterval(function sync() {
  const VMsToPing = selectPingTargets();
  console.log("pinging " + VMsToPing);
  VMsToPing.forEach(client => {
    const lastSeen = clients_with_timestamps[client];
    // 0 and -1 are treated as "do not check"; a missing timestamp yields NaN
    // below and therefore never counts as expired.
    if (lastSeen === 0 || lastSeen === -1) {
      return;
    }
    if (!(Date.now() - lastSeen > 5000)) {
      return;
    }

    console.log(client + " is dead");
    clients = clients.filter(e => e !== Number(client));
    console.log("Clients left alive include " + clients.join(", "));

    Object.keys(files).forEach(file => {
      // Only files the dead VM was holding lose a replica.
      if (!files[file].includes(Number(client))) {
        return;
      }
      files[file] = files[file].filter(f => f !== Number(client));

      const replacement = clients.find(
        candidate => !files[file].includes(candidate)
      );
      if (replacement === undefined) {
        return; // every remaining client already holds this file
      }
      console.log("Rereplicating to VM " + replacement);
      files[file].push(replacement);
      // Third argument is the file's key in `files` (it is also used in the
      // target URL), so it must be the bare name, not the local path.
      uploadToVM(replacement, "files/" + file, file, "reupload");
    });

    // Broadcast once, after membership and replica lists are final.
    update();
  });
}, 5000);