diff --git a/.DS_Store b/.DS_Store
index c4fc845f8720a2e9f226dd0ad0f7a688068246f2..03b1cfbc02eefc432c19558ff432f9c0b23cad1b 100644
Binary files a/.DS_Store and b/.DS_Store differ
diff --git a/sfds/aggregators.js b/sfds/aggregators.js
new file mode 100644
index 0000000000000000000000000000000000000000..ed535b4f2f69801e67d67e55188163a48a6710cc
--- /dev/null
+++ b/sfds/aggregators.js
@@ -0,0 +1,8 @@
+function count(oldState, newState) {
+    if (oldState != undefined) {
+        return oldState + 1
+    }
+    return 1
+}
+
+module.exports = count;
\ No newline at end of file
diff --git a/sfds/index.js b/sfds/index.js
index 98fde875c7731cdfa54ac2a0deaabd416feeb322..792ef4a729050af02e4980df225bfec577d91b84 100644
--- a/sfds/index.js
+++ b/sfds/index.js
@@ -7,8 +7,20 @@ const process = require("process");
 var bodyParser = require("body-parser");
 const dgram = require("dgram");
 const fetch = require("node-fetch");
+const uuidv4 = require("uuid/v4");
 
-const resolver = require("./topology");
+const topology1 = require("./topology");
+const topology2 = require("./topology2");
+const topology3 = require("./topology3");
+
+const count = require("./aggregators");
+
+// var WebSocket = require("ws");
+// ws = new WebSocket("ws://172.16.170.86:8080");
+
+const topologies = [topology1, topology2, topology3];
+
+let resolver = topologies[0];
 
 var app = express();
 app.use(bodyParser.json()); // support json encoded bodies
@@ -396,33 +408,56 @@ app.get("/master", function(req, res) {
 
 queue = [];
 results = [];
+streams = {};
+join = {};
 
 app.post("/bolt", function(req, res) {
-  console.log("bolt state: " + req.body.state);
-  res.send(resolver(req.body));
+  console.log("bolt state: " + req.body.action + " " + req.body.state);
+  res.send({ data: resolver(req.body) });
 });
 
-app.post("/spout/:name", function(req, res) {
-  console.log("name " + argv.name)
+app.post("/spout", function(req, res) {
   console.log("spout got: " + req.body.state);
+  console.log(req.body);
   queue.push(req.body);
   res.send("got it");
 });
 
+app.post("/setTopology/:state", function(req, res) {
+  // console.log("spout got: " + req.body.state);
+  // queue.push(req.body);
+  console.log(req.params.state % topologies.length);
+  console.log(topologies[req.params.state % topologies.length]);
+  // topology = topologies[req.params.state % topologies.length]
+  resolver = topologies[req.params.state % topologies.length];
+  res.send("Set to " + req.params.state);
+});
+
 app.get("/sink", function(req, res) {
+  if (Object.keys(join).length > 0) {
+    queue = [];
+    results = Object.keys(join).map(key => [key, join[key]]);
+    join = {};
+  }
   if (results.length > 0) {
     res.send(results.pop());
   } else {
-    res.send("DONE");
+    res.send("EMPTY");
   }
 });
 
 app.post("/getSecond", function(req, res) {
-  res.send(Math.min(...clients.filter(e => String(e) !== String(process.env.VM))));
+  res.send(
+    Math.min(...clients.filter(e => String(e) !== String(process.env.VM)))
+  );
 });
 
 round_robin = 0;
 
+running_commands = {};
+
+
+
 setInterval(function sync() {
   if (queue.length > 0) {
     const body = queue.pop();
@@ -431,6 +466,16 @@ setInterval(function sync() {
       machineToIps[clients[round_robin % clients.length]] +
       ":3000/bolt/";
     round_robin += 1;
+    let job = uuidv4();
+    console.log({ job: body });
+    running_commands[job] = body;
+    let done = false
+    setInterval(function sync() {
+      if (!done) {
+        queue.push(body)
+      }
+      // console.log(Object.keys(running_commands).length);
+    }, 500);
     fetch(url, {
       method: "post",
       body: JSON.stringify(body),
@@ -438,15 +483,31 @@ setInterval(function sync() {
     })
       .then(res => res.json())
       .then(json => {
-        if (json.action !== "END") {
-          queue.push(json);
-        } else {
-          results.push(json.state);
-          console.log("result: " + json.state);
-        }
+        // delete running_commands[job];
+        done = true
+        json["data"].forEach(x => {
+          if (x.action == "REDUCE_BY_KEY") {
+            console.log(x);
+            if ((x.aggregator = "count")) {
+              if (join[x.state] == undefined) {
+                join[x.state] = count(undefined, x.state);
+              } else {
+                join[x.state] = count(join[x.state], x.state);
+              }
+            }
+          } else if (x.action == "END") {
+            console.log(x);
+            // ws.onopen = function(evt) {
+            //   ws.send(x);
+            // };
+            results.push(x);
+          } else {
+            queue.push(x);
+          }
+        });
       });
   }
-}, 250);
+}, 10);
 
 http.createServer(app).listen(3000, machineToIps[process.env.VM], function() {
   console.log("Express server listening on port 3000");
diff --git a/sfds/package-lock.json b/sfds/package-lock.json
index 529126d77479edc80070765b8b0abff50cffa0a0..816c77a24181dd39d8cb0ec316ce7365715f93a8 100644
--- a/sfds/package-lock.json
+++ b/sfds/package-lock.json
@@ -209,6 +209,11 @@
       "resolved": "https://registry.npmjs.org/statuses/-/statuses-1.4.0.tgz",
       "integrity": "sha512-zhSCtt8v2NDrRlPQpCNtw/heZLtfUDqxBM1udqikb/Hbk52LK4nQSwr10u77iopCW5LsyHpuXS0GnEc48mLeew=="
     },
+    "streamjs": {
+      "version": "1.6.4",
+      "resolved": "https://registry.npmjs.org/streamjs/-/streamjs-1.6.4.tgz",
+      "integrity": "sha1-ehjXRWBDUcmYfXpNkPaNL+EX0GU="
+    },
     "type-is": {
       "version": "1.6.16",
       "resolved": "https://registry.npmjs.org/type-is/-/type-is-1.6.16.tgz",
diff --git a/sfds/package.json b/sfds/package.json
index 979792c4f29d7e0d6ca3f4bf09ae019244332211..0c8205065581dd76deb2e1983c58f84a99ee5f6c 100644
--- a/sfds/package.json
+++ b/sfds/package.json
@@ -14,6 +14,7 @@
     "node-fetch": "^2.2.0",
     "path": "^0.12.7",
     "request": "^2.88.0",
-    "send": "^0.16.2"
+    "send": "^0.16.2",
+    "streamjs": "^1.6.4"
   }
 }
diff --git a/sfds/randomNumbers.txt b/sfds/randomNumbers.txt
new file mode 100644
index 0000000000000000000000000000000000000000..6e11f217987ed8fa5112667d26fd0d11660c1b82
--- /dev/null
+++ b/sfds/randomNumbers.txt
@@ -0,0 +1,5 @@
+1
+3
+6
+8
+5
\ No newline at end of file
diff --git a/sfds/topology.js b/sfds/topology.js
index 09b43304945f5fc16f0972f21c9acee34022824e..6452f3d2976fc8185fee4ecaa438db62c870252a 100644
--- a/sfds/topology.js
+++ b/sfds/topology.js
@@ -1,28 +1,29 @@
 function map_add_hi(string) {
-    return { action: "HI", state: "HI " + string };
+  let new_states = [];
+  let words = string.split(" ").forEach(word => {
+    new_states.push({ action: "LOWER", state: word });
+  });
+  return new_states;
+}
+
+
+function test() {
+  console.log("YOOOO")
+}
+
+function map_lower(string) {
+  return [{ action: "REDUCE_BY_KEY", state: string.toLowerCase() , aggregator: "count"}];
+}
+
+function resolver(data) {
+  switch (data.action) {
+    case "START":
+      return map_add_hi(data.state);
+    case "LOWER":
+      return map_lower(data.state);
+    default:
+      return { action: data.action, state: "NONE" };
   }
-  
-  function map_add_bye(string) {
-    return { action: "BYE", state: "BYE " + string };
-  }
-  
-  
-  function map_add_end(string) {
-      return { action: "END", state: "END " + string };
-    }
-  
-  function resolver(data) {
-    switch (data.action) {
-      case "START":
-        return map_add_hi(data.state);
-      case "HI":
-        return map_add_bye(data.state);
-      case "BYE":
-        return map_add_end(data.state);
-      default:
-        return { action: data.action, state: "NONE" };
-    }
-  }
-  
-  module.exports = resolver;
-  
\ No newline at end of file
+}
+
+module.exports = resolver;
diff --git a/sfds/topology2.js b/sfds/topology2.js
new file mode 100644
index 0000000000000000000000000000000000000000..203c2d7906452c466f45c7c84ce108b6106da0b2
--- /dev/null
+++ b/sfds/topology2.js
@@ -0,0 +1,17 @@
+function map_filter_by_score(string) {
+  if (Number(string.score) > 3) {
+    return [{ action: "END", state: string.movie }];
+  }
+  return [];
+}
+
+function resolver(data) {
+  switch (data.action) {
+    case "START":
+      return map_filter_by_score(data.state);
+    default:
+      return { action: data.action, state: "NONE" };
+  }
+}
+
+module.exports = resolver;
diff --git a/sfds/topology3.js b/sfds/topology3.js
new file mode 100644
index 0000000000000000000000000000000000000000..a084b041de1de41188e00df1a0ad102cf9ac2906
--- /dev/null
+++ b/sfds/topology3.js
@@ -0,0 +1,30 @@
+const fs = require("fs");
+
+let digits = [];
+var readStream = fs.createReadStream("randomNumbers.txt", "utf8");
+readStream
+  .on("data", function(chunk) {
+    digits = [...digits, ...chunk.split("\n")];
+  })
+  .on("end", function() {
+    console.log("done");
+  });
+
+function map_add_hi(string) {
+  let name = string[0];
+  const data = digits.map(digit => {
+    return { action: "END", state: [name, digit] };
+  });
+  return data;
+}
+
+function resolver(data) {
+  switch (data.action) {
+    case "START":
+      return map_add_hi(data.state);
+    default:
+      return { action: data.action, state: "NONE" };
+  }
+}
+
+module.exports = resolver;