Skip to content
Snippets Groups Projects
Commit 51821176 authored by arshiam2's avatar arshiam2
Browse files

update

parent 516307e5
No related branches found
No related tags found
No related merge requests found
No preview for this file type
function count(oldState, newState) {
if (oldState != undefined) {
return oldState + 1
}
return 1
}
module.exports = count;
\ No newline at end of file
...@@ -7,8 +7,20 @@ const process = require("process"); ...@@ -7,8 +7,20 @@ const process = require("process");
var bodyParser = require("body-parser"); var bodyParser = require("body-parser");
const dgram = require("dgram"); const dgram = require("dgram");
const fetch = require("node-fetch"); const fetch = require("node-fetch");
const uuidv4 = require("uuid/v4");
const resolver = require("./topology"); const topology1 = require("./topology");
const topology2 = require("./topology2");
const topology3 = require("./topology3");
const count = require("./aggregators");
// var WebSocket = require("ws");
// ws = new WebSocket("ws://172.16.170.86:8080");
const topologies = [topology1, topology2, topology3];
let resolver = topologies[0];
var app = express(); var app = express();
app.use(bodyParser.json()); // support json encoded bodies app.use(bodyParser.json()); // support json encoded bodies
...@@ -396,33 +408,56 @@ app.get("/master", function(req, res) { ...@@ -396,33 +408,56 @@ app.get("/master", function(req, res) {
queue = []; queue = [];
results = []; results = [];
streams = {};
join = {};
app.post("/bolt", function(req, res) { app.post("/bolt", function(req, res) {
console.log("bolt state: " + req.body.state); console.log("bolt state: " + req.body.action + " " + req.body.state);
res.send(resolver(req.body)); res.send({ data: resolver(req.body) });
}); });
app.post("/spout/:name", function(req, res) { app.post("/spout", function(req, res) {
console.log("name " + argv.name)
console.log("spout got: " + req.body.state); console.log("spout got: " + req.body.state);
console.log(req.body);
queue.push(req.body); queue.push(req.body);
res.send("got it"); res.send("got it");
}); });
app.post("/setTopology/:state", function(req, res) {
// console.log("spout got: " + req.body.state);
// queue.push(req.body);
console.log(req.params.state % topologies.length);
console.log(topologies[req.params.state % topologies.length]);
// topology = topologies[req.params.state % topologies.length]
resolver = topologies[req.params.state % topologies.length];
res.send("Set to " + req.params.state);
});
app.get("/sink", function(req, res) { app.get("/sink", function(req, res) {
if (Object.keys(join).length > 0) {
queue = [];
results = Object.keys(join).map(key => [key, join[key]]);
join = {};
}
if (results.length > 0) { if (results.length > 0) {
res.send(results.pop()); res.send(results.pop());
} else { } else {
res.send("DONE"); res.send("EMPTY");
} }
}); });
app.post("/getSecond", function(req, res) { app.post("/getSecond", function(req, res) {
res.send(Math.min(...clients.filter(e => String(e) !== String(process.env.VM)))); res.send(
Math.min(...clients.filter(e => String(e) !== String(process.env.VM)))
);
}); });
round_robin = 0; round_robin = 0;
running_commands = {};
setInterval(function sync() { setInterval(function sync() {
if (queue.length > 0) { if (queue.length > 0) {
const body = queue.pop(); const body = queue.pop();
...@@ -431,6 +466,16 @@ setInterval(function sync() { ...@@ -431,6 +466,16 @@ setInterval(function sync() {
machineToIps[clients[round_robin % clients.length]] + machineToIps[clients[round_robin % clients.length]] +
":3000/bolt/"; ":3000/bolt/";
round_robin += 1; round_robin += 1;
let job = uuidv4();
console.log({ job: body });
running_commands[job] = body;
let done = false
setInterval(function sync() {
if (!done) {
queue.push(body)
}
// console.log(Object.keys(running_commands).length);
}, 500);
fetch(url, { fetch(url, {
method: "post", method: "post",
body: JSON.stringify(body), body: JSON.stringify(body),
...@@ -438,15 +483,31 @@ setInterval(function sync() { ...@@ -438,15 +483,31 @@ setInterval(function sync() {
}) })
.then(res => res.json()) .then(res => res.json())
.then(json => { .then(json => {
if (json.action !== "END") { // delete running_commands[job];
queue.push(json); done = true
} else { json["data"].forEach(x => {
results.push(json.state); if (x.action == "REDUCE_BY_KEY") {
console.log("result: " + json.state); console.log(x);
} if ((x.aggregator = "count")) {
if (join[x.state] == undefined) {
join[x.state] = count(undefined, x.state);
} else {
join[x.state] = count(join[x.state], x.state);
}
}
} else if (x.action == "END") {
console.log(x);
// ws.onopen = function(evt) {
// ws.send(x);
// };
results.push(x);
} else {
queue.push(x);
}
});
}); });
} }
}, 250); }, 10);
http.createServer(app).listen(3000, machineToIps[process.env.VM], function() { http.createServer(app).listen(3000, machineToIps[process.env.VM], function() {
console.log("Express server listening on port 3000"); console.log("Express server listening on port 3000");
......
...@@ -209,6 +209,11 @@ ...@@ -209,6 +209,11 @@
"resolved": "https://registry.npmjs.org/statuses/-/statuses-1.4.0.tgz", "resolved": "https://registry.npmjs.org/statuses/-/statuses-1.4.0.tgz",
"integrity": "sha512-zhSCtt8v2NDrRlPQpCNtw/heZLtfUDqxBM1udqikb/Hbk52LK4nQSwr10u77iopCW5LsyHpuXS0GnEc48mLeew==" "integrity": "sha512-zhSCtt8v2NDrRlPQpCNtw/heZLtfUDqxBM1udqikb/Hbk52LK4nQSwr10u77iopCW5LsyHpuXS0GnEc48mLeew=="
}, },
"streamjs": {
"version": "1.6.4",
"resolved": "https://registry.npmjs.org/streamjs/-/streamjs-1.6.4.tgz",
"integrity": "sha1-ehjXRWBDUcmYfXpNkPaNL+EX0GU="
},
"type-is": { "type-is": {
"version": "1.6.16", "version": "1.6.16",
"resolved": "https://registry.npmjs.org/type-is/-/type-is-1.6.16.tgz", "resolved": "https://registry.npmjs.org/type-is/-/type-is-1.6.16.tgz",
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
"node-fetch": "^2.2.0", "node-fetch": "^2.2.0",
"path": "^0.12.7", "path": "^0.12.7",
"request": "^2.88.0", "request": "^2.88.0",
"send": "^0.16.2" "send": "^0.16.2",
"streamjs": "^1.6.4"
} }
} }
1
3
6
8
5
\ No newline at end of file
function map_add_hi(string) { function map_add_hi(string) {
return { action: "HI", state: "HI " + string }; let new_states = [];
let words = string.split(" ").forEach(word => {
new_states.push({ action: "LOWER", state: word });
});
return new_states;
}
function test() {
console.log("YOOOO")
}
function map_lower(string) {
return [{ action: "REDUCE_BY_KEY", state: string.toLowerCase() , aggregator: "count"}];
}
function resolver(data) {
switch (data.action) {
case "START":
return map_add_hi(data.state);
case "LOWER":
return map_lower(data.state);
default:
return { action: data.action, state: "NONE" };
} }
}
function map_add_bye(string) {
return { action: "BYE", state: "BYE " + string }; module.exports = resolver;
}
function map_add_end(string) {
return { action: "END", state: "END " + string };
}
function resolver(data) {
switch (data.action) {
case "START":
return map_add_hi(data.state);
case "HI":
return map_add_bye(data.state);
case "BYE":
return map_add_end(data.state);
default:
return { action: data.action, state: "NONE" };
}
}
module.exports = resolver;
\ No newline at end of file
function map_filter_by_score(string) {
if (Number(string.score) > 3) {
return [{ action: "END", state: string.movie }];
}
return [];
}
function resolver(data) {
switch (data.action) {
case "START":
return map_filter_by_score(data.state);
default:
return { action: data.action, state: "NONE" };
}
}
module.exports = resolver;
const fs = require("fs");
let digits = [];
var readStream = fs.createReadStream("randomNumbers.txt", "utf8");
readStream
.on("data", function(chunk) {
digits = [...digits, ...chunk.split("\n")];
})
.on("end", function() {
console.log("done");
});
function map_add_hi(string) {
let name = string[0];
const data = digits.map(digit => {
return { action: "END", state: [name, digit] };
});
return data;
}
function resolver(data) {
switch (data.action) {
case "START":
return map_add_hi(data.state);
default:
return { action: data.action, state: "NONE" };
}
}
module.exports = resolver;
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment