From 5182117633971e4944fcac68555107aedf1b85ef Mon Sep 17 00:00:00 2001 From: arshiam2 <arshiam2@illinois.edu> Date: Sun, 2 Dec 2018 23:18:37 -0600 Subject: [PATCH] update --- .DS_Store | Bin 6148 -> 6148 bytes sfds/aggregators.js | 8 ++++ sfds/index.js | 89 ++++++++++++++++++++++++++++++++++------- sfds/package-lock.json | 5 +++ sfds/package.json | 3 +- sfds/randomNumbers.txt | 5 +++ sfds/topology.js | 53 ++++++++++++------------ sfds/topology2.js | 17 ++++++++ sfds/topology3.js | 30 ++++++++++++++ 9 files changed, 169 insertions(+), 41 deletions(-) create mode 100644 sfds/aggregators.js create mode 100644 sfds/randomNumbers.txt create mode 100644 sfds/topology2.js create mode 100644 sfds/topology3.js diff --git a/.DS_Store b/.DS_Store index c4fc845f8720a2e9f226dd0ad0f7a688068246f2..03b1cfbc02eefc432c19558ff432f9c0b23cad1b 100644 GIT binary patch delta 47 zcmZoMXfc@J&&aYdU^gQp%Vr)Xb4EcHhGK>^h7^Y4l;Y%^r2PDx&5q2QSvIqC{N)D# DG!hM) delta 32 ocmZoMXfc@J&&a$nU^gQp^JX3<bH>dn%!^qjHYjdp=lIJH0H4bV#{d8T diff --git a/sfds/aggregators.js b/sfds/aggregators.js new file mode 100644 index 0000000..ed535b4 --- /dev/null +++ b/sfds/aggregators.js @@ -0,0 +1,8 @@ +function count(oldState, newState) { + if (oldState != undefined) { + return oldState + 1 + } + return 1 +} + +module.exports = count; \ No newline at end of file diff --git a/sfds/index.js b/sfds/index.js index 98fde87..792ef4a 100644 --- a/sfds/index.js +++ b/sfds/index.js @@ -7,8 +7,20 @@ const process = require("process"); var bodyParser = require("body-parser"); const dgram = require("dgram"); const fetch = require("node-fetch"); +const uuidv4 = require("uuid/v4"); -const resolver = require("./topology"); +const topology1 = require("./topology"); +const topology2 = require("./topology2"); +const topology3 = require("./topology3"); + +const count = require("./aggregators"); + +// var WebSocket = require("ws"); +// ws = new WebSocket("ws://172.16.170.86:8080"); + +const topologies = [topology1, topology2, topology3]; + +let resolver = topologies[0]; var app = express(); app.use(bodyParser.json()); // support json encoded bodies @@ -396,33 +408,56 @@ app.get("/master", function(req, res) { queue = []; results = []; +streams = {}; +join = {}; app.post("/bolt", function(req, res) { - console.log("bolt state: " + req.body.state); - res.send(resolver(req.body)); + console.log("bolt state: " + req.body.action + " " + req.body.state); + res.send({ data: resolver(req.body) }); }); -app.post("/spout/:name", function(req, res) { - console.log("name " + argv.name) +app.post("/spout", function(req, res) { console.log("spout got: " + req.body.state); + console.log(req.body); queue.push(req.body); res.send("got it"); }); +app.post("/setTopology/:state", function(req, res) { + // console.log("spout got: " + req.body.state); + // queue.push(req.body); + console.log(req.params.state % topologies.length); + console.log(topologies[req.params.state % topologies.length]); + // topology = topologies[req.params.state % topologies.length] + resolver = topologies[req.params.state % topologies.length]; + res.send("Set to " + req.params.state); +}); + app.get("/sink", function(req, res) { + if (Object.keys(join).length > 0) { + queue = []; + results = Object.keys(join).map(key => [key, join[key]]); + join = {}; + } if (results.length > 0) { res.send(results.pop()); } else { - res.send("DONE"); + res.send("EMPTY"); } }); app.post("/getSecond", function(req, res) { - res.send(Math.min(...clients.filter(e => String(e) !== String(process.env.VM)))); + res.send( + Math.min(...clients.filter(e => String(e) !== String(process.env.VM))) + ); }); round_robin = 0; +running_commands = {}; + + + setInterval(function sync() { if (queue.length > 0) { const body = queue.pop(); @@ -431,6 +466,16 @@ setInterval(function sync() { machineToIps[clients[round_robin % clients.length]] + ":3000/bolt/"; round_robin += 1; + let job = uuidv4(); + console.log({ job: body }); + running_commands[job] = body; + let done = false + setInterval(function sync() { + if (!done) { + queue.push(body) + } + // console.log(Object.keys(running_commands).length); + }, 500); fetch(url, { method: "post", body: JSON.stringify(body), @@ -438,15 +483,31 @@ setInterval(function sync() { }) .then(res => res.json()) .then(json => { - if (json.action !== "END") { - queue.push(json); - } else { - results.push(json.state); - console.log("result: " + json.state); - } + // delete running_commands[job]; + done = true + json["data"].forEach(x => { + if (x.action == "REDUCE_BY_KEY") { + console.log(x); + if ((x.aggregator = "count")) { + if (join[x.state] == undefined) { + join[x.state] = count(undefined, x.state); + } else { + join[x.state] = count(join[x.state], x.state); + } + } + } else if (x.action == "END") { + console.log(x); + // ws.onopen = function(evt) { + // ws.send(x); + // }; + results.push(x); + } else { + queue.push(x); + } + }); }); } -}, 250); +}, 10); http.createServer(app).listen(3000, machineToIps[process.env.VM], function() { console.log("Express server listening on port 3000"); diff --git a/sfds/package-lock.json b/sfds/package-lock.json index 529126d..816c77a 100644 --- a/sfds/package-lock.json +++ b/sfds/package-lock.json @@ -209,6 +209,11 @@ "resolved": "https://registry.npmjs.org/statuses/-/statuses-1.4.0.tgz", "integrity": "sha512-zhSCtt8v2NDrRlPQpCNtw/heZLtfUDqxBM1udqikb/Hbk52LK4nQSwr10u77iopCW5LsyHpuXS0GnEc48mLeew==" }, + "streamjs": { + "version": "1.6.4", + "resolved": "https://registry.npmjs.org/streamjs/-/streamjs-1.6.4.tgz", + "integrity": "sha1-ehjXRWBDUcmYfXpNkPaNL+EX0GU=" + }, "type-is": { "version": "1.6.16", "resolved": "https://registry.npmjs.org/type-is/-/type-is-1.6.16.tgz", diff --git a/sfds/package.json b/sfds/package.json index 979792c..0c82050 100644 --- a/sfds/package.json +++ b/sfds/package.json @@ -14,6 +14,7 @@ "node-fetch": "^2.2.0", "path": "^0.12.7", "request": "^2.88.0", - "send": "^0.16.2" + "send": "^0.16.2", + "streamjs": "^1.6.4" } } diff --git a/sfds/randomNumbers.txt b/sfds/randomNumbers.txt new file mode 100644 index 0000000..6e11f21 --- /dev/null +++ b/sfds/randomNumbers.txt @@ -0,0 +1,5 @@ +1 +3 +6 +8 +5 \ No newline at end of file diff --git a/sfds/topology.js b/sfds/topology.js index 09b4330..6452f3d 100644 --- a/sfds/topology.js +++ b/sfds/topology.js @@ -1,28 +1,29 @@ function map_add_hi(string) { - return { action: "HI", state: "HI " + string }; + let new_states = []; + let words = string.split(" ").forEach(word => { + new_states.push({ action: "LOWER", state: word }); + }); + return new_states; +} + + +function test() { + console.log("YOOOO") +} + +function map_lower(string) { + return [{ action: "REDUCE_BY_KEY", state: string.toLowerCase() , aggregator: "count"}]; +} + +function resolver(data) { + switch (data.action) { + case "START": + return map_add_hi(data.state); + case "LOWER": + return map_lower(data.state); + default: + return { action: data.action, state: "NONE" }; } - - function map_add_bye(string) { - return { action: "BYE", state: "BYE " + string }; - } - - - function map_add_end(string) { - return { action: "END", state: "END " + string }; - } - - function resolver(data) { - switch (data.action) { - case "START": - return map_add_hi(data.state); - case "HI": - return map_add_bye(data.state); - case "BYE": - return map_add_end(data.state); - default: - return { action: data.action, state: "NONE" }; - } - } - - module.exports = resolver; - \ No newline at end of file +} + +module.exports = resolver; diff --git a/sfds/topology2.js b/sfds/topology2.js new file mode 100644 index 0000000..203c2d7 --- /dev/null +++ b/sfds/topology2.js @@ -0,0 +1,17 @@ +function map_filter_by_score(string) { + if (Number(string.score) > 3) { + return [{ action: "END", state: string.movie }]; + } + return []; +} + +function resolver(data) { + switch (data.action) { + case "START": + return map_filter_by_score(data.state); + default: + return { action: data.action, state: "NONE" }; + } +} + +module.exports = resolver; diff --git a/sfds/topology3.js b/sfds/topology3.js new file mode 100644 index 0000000..a084b04 --- /dev/null +++ b/sfds/topology3.js @@ -0,0 +1,30 @@ +const fs = require("fs"); + +let digits = []; +var readStream = fs.createReadStream("randomNumbers.txt", "utf8"); +readStream + .on("data", function(chunk) { + digits = [...digits, ...chunk.split("\n")]; + }) + .on("end", function() { + console.log("done"); + }); + +function map_add_hi(string) { + let name = string[0]; + const data = digits.map(digit => { + return { action: "END", state: [name, digit] }; + }); + return data; +} + +function resolver(data) { + switch (data.action) { + case "START": + return map_add_hi(data.state); + default: + return { action: data.action, state: "NONE" }; + } +} + +module.exports = resolver; -- GitLab