Skip to content
Snippets Groups Projects
Commit d0b99554 authored by strider's avatar strider
Browse files

Lab source code

parent eac76d8b
No related branches found
No related tags found
No related merge requests found
......@@ -127,3 +127,7 @@ dmypy.json
# Pyre type checker
.pyre/
# Idea
.idea/*
import json
import logging
import sys
import greengrasssdk
# Logging
logger = logging.getLogger(__name__)
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
# SDK Client
client = greengrasssdk.client("iot-data")
# c02 tracker
max_co2 = {}
def lambda_handler(event, context):
logger.info("Received message!")
logger.info(event)
global max_co2
if "reset" in event:
max_co2 = {}
return
# TODO1: Get your data
if "vehicle_CO2" not in event or "device_id" not in event:
logger.info("No Data!")
return
logger.info(event["vehicle_CO2"])
# TODO2: Calculate max CO2 emission
co2 = event["vehicle_CO2"]
device = event["device_id"]
max_co2[device] = max(max_co2.get(device, 0), co2)
data = {
"device_id": device,
"max_co2": max_co2[device],
"timestep_time": event["timestep_time"]
}
# TODO3: Return the result to device
client.publish(
topic=f"emissions/co2/{device}",
payload=json.dumps("Max Emission: {}".format(max_co2[device])),
)
client.publish(
topic=f"emissions/co2/all",
payload=json.dumps(data),
)
return
\ No newline at end of file
# Import SDK packages
import os
import sys
import uuid
from AWSIoTPythonSDK.core.greengrass.discovery.providers import DiscoveryInfoProvider
from AWSIoTPythonSDK.core.protocol.connection.cores import ProgressiveBackOffCore
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
import time
import json
import pandas as pd
# TODO 1: modify the following parameters
# Starting and end index, modify this
total_devices = 2
# Path to the dataset, modify this
data_path = "data2/vehicle{}.csv"
# Path to your certificates, modify this
certificate_formatter = "./certificates/device_{}/device_{}-certificate.pem.crt"
key_formatter = "./certificates/device_{}/device_{}-private.pem.key"
endpoint = "a2lbnjtkvt1dl2-ats.iot.us-east-1.amazonaws.com"
rootCa = "certificates/AmazonRootCA1.pem"
GROUP_CA_PATH = "./groupCA/"
clients = []
device_state = {}
def exit_handler():
print("Disconnecting!")
for client in clients:
client.client.disconnect()
class MQTTClient:
def __init__(self, device_id, cert, key, groupCA, core_host, core_port):
# For certificate based connection
self.device_id = device_id
self.name = f"device_{device_id}"
self.state = 0
self.client = AWSIoTMQTTClient(self.name)
# Configure group core endpoints
self.client.configureEndpoint(core_host, core_port)
# Set files downloaded for each device
self.client.configureCredentials(groupCA, key, cert)
self.client.configureOfflinePublishQueueing(-1) # Infinite offline Publish queueing
self.client.configureDrainingFrequency(2) # Draining: 2 Hz
self.client.configureConnectDisconnectTimeout(10) # 10 sec
self.client.configureMQTTOperationTimeout(10) # 5 sec
self.client.onMessage = self.customOnMessage
#self.client.subscribe(f"emissions/co2/{self.name}", 0, self.customOnMessage)
def customOnMessage(self, message):
# TODO3: fill in the function to show your received message
print("{} received payload {} from topic {}".format(self.device_id, message.payload, message.topic))
# Suback callback
def customSubackCallback(self, mid, data):
# You don't need to write anything here
pass
# Puback callback
def customPubackCallback(self, mid):
# You don't need to write anything here
pass
def publish(self, Payload="payload"):
# TODO4: fill in this function for your publish
self.client.subscribeAsync(f"emissions/co2/{self.name}", 0, ackCallback=self.customSubackCallback)
self.client.publishAsync("emissions/co2/trigger", Payload, 0, ackCallback=self.customPubackCallback)
def discover_core(d_id):
# Progressive back off core
backOffCore = ProgressiveBackOffCore()
# Discover GGCs
discoveryInfoProvider = DiscoveryInfoProvider()
discoveryInfoProvider.configureEndpoint(endpoint)
discoveryInfoProvider.configureCredentials(rootCa, certificate_formatter.format(d_id, d_id),
key_formatter.format(d_id, d_id))
discoveryInfoProvider.configureTimeout(10) # 10 sec
discovered = False
groupCA = None
coreInfo = None
try:
discoveryInfo = discoveryInfoProvider.discover(f"device_{d_id}")
caList = discoveryInfo.getAllCas()
coreList = discoveryInfo.getAllCores()
# We only pick the first ca and core info
groupId, ca = caList[0]
coreInfo = coreList[0]
print("Discovered GGC: %s from Group: %s" % (coreInfo.coreThingArn, groupId))
print("Now we persist the connectivity/identity information...")
groupCA = GROUP_CA_PATH + groupId + "_CA_" + str(uuid.uuid4()) + ".crt"
if not os.path.exists(GROUP_CA_PATH):
os.makedirs(GROUP_CA_PATH)
groupCAFile = open(groupCA, "w")
groupCAFile.write(ca)
groupCAFile.close()
discovered = True
print("Now proceed to the connecting flow...")
except Exception as e:
print("Error in discovery!")
print("Type: %s" % str(type(e)))
print("Error message: %s" % str(e))
print("Backing off...\n")
backOffCore.backOff()
# Stop program if GG Core is not discovered
if not discovered:
# With print_discover_resp_only flag, we only woud like to check if the API get called correctly.
print("Discovery failed. Exiting...\n")
sys.exit(-1)
return groupCA, coreInfo
print("Loading vehicle data...")
data = []
for i in range(total_devices):
a = pd.read_csv(data_path.format(i))
data.append(a)
device_state[i] = 0
print("Initializing MQTTClients...")
for device_id in range(total_devices):
groupCA, coreInfo = discover_core(device_id)
core_host = "192.168.86.57"
core_port = 8883
print("Trying to connect to core at %s:%d" % (core_host, core_port))
client = MQTTClient(device_id, certificate_formatter.format(device_id, device_id),
key_formatter.format(device_id, device_id), groupCA, core_host, core_port)
client.client.connect()
clients.append(client)
# Reset lambda data
clients[0].publish(json.dumps({'reset': True}))
while True:
# Loop through each device
for client in clients:
# Send next row of data
next_row = data[client.device_id].iloc[[device_state[client.device_id]]].to_dict('r')[0]
next_row['device_id'] = client.name
device_state[client.device_id] = device_state[client.device_id] + 1
client.publish(json.dumps(next_row))
# Sleep for a couple of seconds
time.sleep(3)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment