Commit 16241a64 authored by stangao2

init

certificates
gen-cert
################################################### Connecting to AWS
import boto3
import json
################################################### Create random name for things
import random
import string
################################################### Parameters for Thing
thingArn = 'arn:aws:iot:us-west-2:626635433575:thinggroup/tg2'
thingId = 'generated'
thingName = ''.join([random.choice(string.ascii_letters + string.digits) for n in range(15)])
defaultPolicyName = 'policy1'
###################################################
def createThing():
    global thingClient, thingArn, thingId
    # Register a new Thing in AWS IoT under the randomly generated name.
    thingResponse = thingClient.create_thing(
        thingName=thingName
    )
    data = json.loads(json.dumps(thingResponse, sort_keys=False, indent=4))
    for element in data:
        if element == 'thingArn':
            thingArn = data['thingArn']
        elif element == 'thingId':
            thingId = data['thingId']
    createCertificate()

def createCertificate():
    global thingClient
    # Create an X.509 certificate and key pair, and activate the certificate.
    certResponse = thingClient.create_keys_and_certificate(
        setAsActive=True
    )
    data = json.loads(json.dumps(certResponse, sort_keys=False, indent=4))
    for element in data:
        if element == 'certificateArn':
            certificateArn = data['certificateArn']
        elif element == 'keyPair':
            PublicKey = data['keyPair']['PublicKey']
            PrivateKey = data['keyPair']['PrivateKey']
        elif element == 'certificatePem':
            certificatePem = data['certificatePem']
        elif element == 'certificateId':
            certificateId = data['certificateId']

    # Persist the credentials so the device-side MQTT client can use them.
    with open('public.key', 'w') as outfile:
        outfile.write(PublicKey)
    with open('private.key', 'w') as outfile:
        outfile.write(PrivateKey)
    with open('cert.pem', 'w') as outfile:
        outfile.write(certificatePem)

    # Attach the IoT policy to the certificate, then bind the certificate to the Thing.
    response = thingClient.attach_policy(
        policyName=defaultPolicyName,
        target=certificateArn
    )
    response = thingClient.attach_thing_principal(
        thingName=thingName,
        principal=certificateArn
    )
thingClient = boto3.client('iot')
createThing()
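The vehicle emulator later in this commit reads per-vehicle credentials from ./gen-cert/v{N}/cert.pem and ./gen-cert/v{N}/private.key. Below is a minimal sketch, not part of the original script, of producing that layout; it assumes five vehicles (matching device_st/device_end in the emulator) and skips the thing/policy attachment already shown above.

import os
import boto3

# Sketch only: one certificate per simulated vehicle, written into the
# ./gen-cert/v{N}/ layout expected by certificate_formatter/key_formatter.
iot = boto3.client('iot')
for i in range(5):
    cert = iot.create_keys_and_certificate(setAsActive=True)
    os.makedirs(f"gen-cert/v{i}", exist_ok=True)
    with open(f"gen-cert/v{i}/cert.pem", 'w') as f:
        f.write(cert['certificatePem'])
    with open(f"gen-cert/v{i}/private.key", 'w') as f:
        f.write(cert['keyPair']['PrivateKey'])
    with open(f"gen-cert/v{i}/public.key", 'w') as f:
        f.write(cert['keyPair']['PublicKey'])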
{
  "component": {
    "FirehosePublisher": {
      "author": "a",
      "version": "1.0.6",
      "build": {
        "build_system": "zip"
      },
      "publish": {
        "bucket": "aws-greengrass-component-artifacts",
        "region": "us-west-2"
      }
    }
  },
  "gdk_version": "1.0.0"
}
import sys
import json
import time
import logging
import boto3
import backoff
import threading
from awsgreengrasspubsubsdk.pubsub_client import AwsGreengrassPubSubSdkClient
class Config:
    def __init__(self):
        self.delivery_stream_name = "PUT-S3-oP8iQ"
        self.region = "us-west-2"
class FirehoseClient:
    """
    AWS Firehose client to send records and monitor metrics.

    Attributes:
        config (object): Configuration object with delivery stream name and region.
        delivery_stream_name (str): Name of the Firehose delivery stream.
        firehose (boto3.client): Boto3 Firehose client.
    """

    def __init__(self, config):
        """
        Initialize the FirehoseClient.

        Args:
            config (object): Configuration object with delivery stream name and region.
        """
        self.config = config
        self.delivery_stream_name = config.delivery_stream_name
        self.region = config.region
        self.firehose = boto3.client("firehose", region_name=self.region)
        self.total_records_sent = 0

    @backoff.on_exception(
        backoff.expo, Exception, max_tries=5, jitter=backoff.full_jitter
    )
    def put_record_batch(self, data: list, batch_size: int = 500):
        """
        Put records in batches to Firehose with backoff and retry.

        Args:
            data (list): List of data records to be sent to Firehose.
            batch_size (int): Number of records to send in each batch. Default is 500.

        This method attempts to send records in batches to the Firehose delivery stream.
        It retries with exponential backoff in case of exceptions.
        """
        for i in range(0, len(data), batch_size):
            batch = data[i : i + batch_size]
            # Log batch size and running record total for debugging.
            self.total_records_sent += len(batch)
            logger.info(f"Sending batch of {len(batch)} records to Firehose. Total records sent: {self.total_records_sent}")
            record_dicts = [{"Data": json.dumps(record)} for record in batch]
            try:
                response = self.firehose.put_record_batch(
                    DeliveryStreamName=self.delivery_stream_name, Records=record_dicts
                )
            except Exception as e:
                logger.error(f"Failed to send batch of {len(batch)} records. Error: {e}")
                # Re-raise so the backoff decorator actually retries the batch.
                raise
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class MyAwsGreengrassV2Component():
    def __init__(self):
        self.lock = threading.Lock()  # Lock to prevent concurrent access to the data lists
        # Initialize the PubSub SDK client.
        base_topic = "GGHW"  # Only used if no topic is provided to client.publish_message; not used in this simple example.
        self.client = AwsGreengrassPubSubSdkClient(base_topic=base_topic, default_message_handler=self.message_handler)
        self.client.activate_mqtt_pubsub()
        self.client.activate_ipc_pubsub()
        config = Config()
        self.firehose_client = FirehoseClient(config)  # Initialize Firehose client
        self.current_list = 1
        self.firehose_data_list_1 = []  # Data lists that buffer records between sends
        self.firehose_data_list_2 = []
        self.firehose_last_send_time = time.time()
        self.firehose_send_interval = 10  # Send data to Firehose every 10 seconds at most
        # Expose the client methods as class methods
        self.subscribe_to_topic = self.client.subscribe_to_topic

    def message_handler(self, protocol, topic, message_id, status, route, message_payload):
        logger.info(f"Received message on {topic}: {message_payload}")
        # Use the lock so two threads aren't accessing the data lists at the same time.
        with self.lock:
            if self.current_list == 1:
                self.firehose_data_list_1.append(message_payload)
            else:
                self.firehose_data_list_2.append(message_payload)
if __name__ == "__main__":
    # Define topics
    subscribe_topic = "emiTest/out"

    # Initialize client
    client = MyAwsGreengrassV2Component()
    logger.debug("initialized")

    # Subscribe to topic
    client.subscribe_to_topic("ipc_mqtt", subscribe_topic)

    firehose_send_interval = 5  # Send data to Firehose every 5 seconds at most
    try:
        while True:
            # Send to Firehose at the specified time interval,
            # and flip the list that gets appended to on message receipt.
            time.sleep(firehose_send_interval)
            logger.debug("attempted send")
            with client.lock:
                list_to_send = client.current_list
                if list_to_send == 1:
                    client.current_list = 2
                else:
                    client.current_list = 1
            # Actually send the data and reset the list.
            if list_to_send == 1:
                client.firehose_client.put_record_batch(client.firehose_data_list_1)
                client.firehose_data_list_1 = []
            else:
                client.firehose_client.put_record_batch(client.firehose_data_list_2)
                client.firehose_data_list_2 = []
    except Exception as e:
        logger.error(f"Error running the component: {e}")
{
  "RecipeFormatVersion": "2020-01-25",
  "ComponentName": "FirehosePublisher",
  "ComponentVersion": "1.0.6",
  "ComponentDescription": "a",
  "ComponentPublisher": "a",
  "ComponentDependencies": {
    "aws.greengrass.TokenExchangeService": {
      "VersionRequirement": "^2.0.0",
      "DependencyType": "HARD"
    }
  },
  "ComponentConfiguration": {
    "DefaultConfiguration": {
      "accessControl": {
        "aws.greengrass.ipc.pubsub": {
          "FirehosePublisher:pubsub:1": {
            "policyDescription": "Allows access to publish/subscribe to all topics through ipc",
            "operations": [
              "aws.greengrass#PublishToTopic",
              "aws.greengrass#SubscribeToTopic"
            ],
            "resources": [
              "*"
            ]
          }
        },
        "aws.greengrass.ipc.mqttproxy": {
          "FirehosePublisher:mqttproxy:1": {
            "policyDescription": "Allows access to publish/subscribe to all topics through mqtt",
            "operations": [
              "aws.greengrass#PublishToIoTCore",
              "aws.greengrass#SubscribeToIoTCore"
            ],
            "resources": [
              "*"
            ]
          }
        }
      }
    }
  },
  "Manifests": [
    {
      "Platform": {
        "os": "linux"
      },
      "Artifacts": [
        {
          "URI": "s3://aws-greengrass-component-artifacts/FirehosePublisher/src.zip",
          "Unarchive": "ZIP"
        }
      ],
      "Lifecycle": {
        "Install": {
          "Script": "python3 -m pip install awsgreengrasspubsubsdk && python3 -m pip install boto3 && python3 -m pip install backoff"
        },
        "Run": {
          "Script": "python3 -u {artifacts:decompressedPath}/src/main.py",
          "RequiresPrivilege": "false"
        }
      }
    }
  ]
}
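Once the FirehosePublisher component has been built and published (its artifact URI and version must match the recipe above), it can be deployed to the thing group referenced by the gen-cert script. A minimal sketch using boto3's greengrassv2 client; the thing-group ARN is the one hard-coded in gen-cert, and the deployment name is an illustrative choice.

import boto3

# Sketch only: deploy the published component version to the tg2 thing group.
greengrass = boto3.client("greengrassv2", region_name="us-west-2")
response = greengrass.create_deployment(
    targetArn="arn:aws:iot:us-west-2:626635433575:thinggroup/tg2",
    deploymentName="firehose-publisher-deployment",
    components={
        "FirehosePublisher": {"componentVersion": "1.0.6"}
    },
)
print(response["deploymentId"])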
{
  "component": {
    "emiTest": {
      "author": "a",
      "version": "0.0.5",
      "build": {
        "build_system": "zip"
      },
      "publish": {
        "bucket": "aws-greengrass-component-artifacts",
        "region": "us-west-2"
      }
    }
  },
  "gdk_version": "1.0.0"
}
import sys
import json
import time
import logging
from awsgreengrasspubsubsdk.pubsub_client import AwsGreengrassPubSubSdkClient
from awsgreengrasspubsubsdk.message_formatter import PubSubMessageFormatter
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class MyAwsGreengrassV2Component():
    def __init__(self):
        # Initialize the PubSub SDK client.
        base_topic = "GGHW"  # Only used if no topic is provided to client.publish_message; not used in this simple example.
        self.client = AwsGreengrassPubSubSdkClient(base_topic=base_topic, default_message_handler=self.message_handler)
        self.client.activate_mqtt_pubsub()
        self.client.activate_ipc_pubsub()
        # Message formatter for a consistent messaging API (starting point for routes etc.)
        self.message_formatter = PubSubMessageFormatter()
        # Expose the client methods as class methods
        self.subscribe_to_topic = self.client.subscribe_to_topic
        # Per-vehicle running maximum of CO2 readings.
        self.emissions = dict()

    def publish_message(self, message, topic=None):
        sdk_message = self.message_formatter.get_message(message=message)
        self.client.publish_message('ipc_mqtt', sdk_message, topic=topic)  # Publish using both the MQTT and IPC protocols
        return sdk_message

    def message_handler(self, protocol, topic, message_id, status, route, message_payload):
        logger.info(f"Received message on {topic}: {message_payload}")
        payload = json.loads(message_payload)
        vehicle_id = payload['vehicle_id']
        co2 = payload['vehicle_CO2']
        # Track the maximum CO2 value seen so far for this vehicle.
        if vehicle_id not in self.emissions or co2 > self.emissions[vehicle_id]:
            self.emissions[vehicle_id] = co2
        message = {"max_emission": self.emissions[vehicle_id]}
        sdk_message = self.publish_message(message, topic="emiTest/out")
if __name__ == "__main__":
    # Define topics
    subscribe_topic = "emiTest/in"
    publish_topic = "emiTest/out"

    # Initialize client
    client = MyAwsGreengrassV2Component()

    # Subscribe to topic.
    # NOTE: since we are using the SDK here, the client requires messages to be JSON formatted
    # as shown below, where the contents of "message" is the actual payload/custom data.
    # {
    #   "sdk_version": "0.1.4",
    #   "message_id": "20240930180857201116",
    #   "status": 200,
    #   "route": "default_message_handler",
    #   "message": { "user_msg": "Hello World, from IoT CONSOLE" }
    # }
    client.subscribe_to_topic("ipc_mqtt", subscribe_topic)

    try:
        while True:
            message = {"user_msg": "greengrasscore is still alive"}
            sdk_message = client.publish_message(message, topic=publish_topic)  # Heartbeat published over MQTT and IPC
            logger.info(f"Published sdk message to {publish_topic}: {sdk_message}")
            time.sleep(120)
    except Exception as e:
        logger.error(f"Error running the component: {e}")
import logging
log = logging.getLogger(__name__)
class MessageHandler():
    def __init__(self, publish_message, message_formatter):
        self.publish_message = publish_message
        self.message_formatter = message_formatter

    def emission_request(self, protocol, topic, message_id, status, route, message):
        try:
            log.info('emission request received on protocol: {} - topic: {}'.format(protocol, topic))
            log.info('message received: {}'.format(message))
            msg = {"text": "aaaa"}
            response = self.message_formatter.get_message(message_id=message_id, route=route, message=msg)
            self.publish_message(protocol=protocol, message=response)
        except Exception as e:
            error_message = 'Exception in emission request: {}'.format(e)
            log.error(error_message)
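This handler class is not imported by the emiTest main.py above. A minimal sketch of how it could be bound to the component's existing SDK client and formatter; the component argument is an assumption for illustration.

# Sketch only: emission_request calls publish_message(protocol=..., message=...),
# so it expects the SDK client's publish_message rather than the component's wrapper.
def build_emission_handler(component):
    return MessageHandler(component.client.publish_message, component.message_formatter)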
{
  "RecipeFormatVersion": "2020-01-25",
  "ComponentName": "emiTest",
  "ComponentVersion": "0.0.5",
  "ComponentDescription": "a",
  "ComponentPublisher": "a",
  "ComponentConfiguration": {
    "DefaultConfiguration": {
      "accessControl": {
        "aws.greengrass.ipc.pubsub": {
          "emiTest:pubsub:1": {
            "policyDescription": "Allows access to publish/subscribe to all topics through ipc",
            "operations": [
              "aws.greengrass#PublishToTopic",
              "aws.greengrass#SubscribeToTopic"
            ],
            "resources": [
              "*"
            ]
          }
        },
        "aws.greengrass.ipc.mqttproxy": {
          "emiTest:mqttproxy:1": {
            "policyDescription": "Allows access to publish/subscribe to all topics through mqtt",
            "operations": [
              "aws.greengrass#PublishToIoTCore",
              "aws.greengrass#SubscribeToIoTCore"
            ],
            "resources": [
              "*"
            ]
          }
        }
      }
    }
  },
  "Manifests": [
    {
      "Platform": {
        "os": "linux"
      },
      "Artifacts": [
        {
          "URI": "s3://aws-greengrass-component-artifacts/emiTest/src.zip",
          "Unarchive": "ZIP"
        }
      ],
      "Lifecycle": {
        "Install": {
          "Script": "python3 -m pip install awsgreengrasspubsubsdk && python3 -m pip install numpy"
        },
        "Run": {
          "Script": "python3 -u {artifacts:decompressedPath}/src/main.py '{configuration:/PubSubTopics}'",
          "RequiresPrivilege": "false"
        }
      }
    }
  ]
}
-----BEGIN CERTIFICATE-----
MIIDQTCCAimgAwIBAgITBmyfz5m/jAo54vB4ikPmljZbyjANBgkqhkiG9w0BAQsF
ADA5MQswCQYDVQQGEwJVUzEPMA0GA1UEChMGQW1hem9uMRkwFwYDVQQDExBBbWF6
b24gUm9vdCBDQSAxMB4XDTE1MDUyNjAwMDAwMFoXDTM4MDExNzAwMDAwMFowOTEL
MAkGA1UEBhMCVVMxDzANBgNVBAoTBkFtYXpvbjEZMBcGA1UEAxMQQW1hem9uIFJv
b3QgQ0EgMTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBALJ4gHHKeNXj
ca9HgFB0fW7Y14h29Jlo91ghYPl0hAEvrAIthtOgQ3pOsqTQNroBvo3bSMgHFzZM
9O6II8c+6zf1tRn4SWiw3te5djgdYZ6k/oI2peVKVuRF4fn9tBb6dNqcmzU5L/qw
IFAGbHrQgLKm+a/sRxmPUDgH3KKHOVj4utWp+UhnMJbulHheb4mjUcAwhmahRWa6
VOujw5H5SNz/0egwLX0tdHA114gk957EWW67c4cX8jJGKLhD+rcdqsq08p8kDi1L
93FcXmn/6pUCyziKrlA4b9v7LWIbxcceVOF34GfID5yHI9Y/QCB/IIDEgEw+OyQm
jgSubJrIqg0CAwEAAaNCMEAwDwYDVR0TAQH/BAUwAwEB/zAOBgNVHQ8BAf8EBAMC
AYYwHQYDVR0OBBYEFIQYzIU07LwMlJQuCFmcx7IQTgoIMA0GCSqGSIb3DQEBCwUA
A4IBAQCY8jdaQZChGsV2USggNiMOruYou6r4lK5IpDB/G/wkjUu0yKGX9rbxenDI
U5PMCCjjmCXPI6T53iHTfIUJrU6adTrCC2qJeHZERxhlbI1Bjjt/msv0tadQ1wUs
N+gDS63pYaACbvXy8MWy7Vu33PqUXHeeE6V/Uq2V8viTO96LXFvKWlJbYK8U90vv
o/ufQJVtMVT8QtPHRh8jrdkPSHCa2XV4cdFyQzR1bldZwgJcJmApzyMZFo6IQ6XU
5MsI+yMRQ+hDKXJioaldXgjUkK642M4UwtBV8ob2xJNDd2ZhwLnoQdeXeGADbkpy
rqXRfboQnoZsG4q5WTP468SQvvG5
-----END CERTIFICATE-----
# Import SDK packages
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
import time
import json
import pandas as pd
import numpy as np
#TODO 1: modify the following parameters
#Starting and end index, modify this
device_st = 0
device_end = 5
#Path to the dataset, modify this
data_path = "./data/vehicle{}.csv"
#Path to your certificates, modify this
certificate_formatter = "./gen-cert/v{}/cert.pem"
key_formatter = "./gen-cert/v{}/private.key"
class MQTTClient:
    def __init__(self, device_id, cert, key):
        # For certificate based connection
        self.device_id = str(device_id)
        self.state = 0
        self.client = AWSIoTMQTTClient(self.device_id)
        #TODO 2: modify your broker address
        self.client.configureEndpoint("a2208lrv5avpgj-ats.iot.us-west-2.amazonaws.com", 8883)
        self.client.configureCredentials("./keys/AmazonRootCA1.pem", key, cert)
        self.client.configureOfflinePublishQueueing(-1)  # Infinite offline publish queueing
        self.client.configureDrainingFrequency(2)  # Draining: 2 Hz
        self.client.configureConnectDisconnectTimeout(10)  # 10 sec
        self.client.configureMQTTOperationTimeout(5)  # 5 sec
        self.client.onMessage = self.customOnMessage
        self.client.subscribe("emiTest/out/veh{}".format(device_id), 0, None)
        print('subscribed to: ', "emiTest/out/veh{}".format(device_id))

    def customOnMessage(self, message):
        #TODO 3: fill in the function to show your received message
        print("client {} received payload {}".format(self.device_id, message.payload))

    # Suback callback
    def customSubackCallback(self, mid, data):
        # You don't need to write anything here
        pass

    # Puback callback
    def customPubackCallback(self, mid):
        # You don't need to write anything here
        pass

    def publish(self, topic):
        # Load the vehicle's emission data
        df = pd.read_csv(data_path.format(self.device_id))
        for index, row in df.iterrows():
            # Create a JSON payload from the row data
            payload = json.dumps(row.to_dict())
            # Wrap it in the SDK message format expected by the Greengrass component.
            message = {
                "sdk_version": "0.1.4",
                "message_id": "1234124",
                "status": 200,
                "route": "",
                "message": payload
            }
            message = json.dumps(message)
            # Publish the payload to the specified topic
            print(f"Publishing to {topic}")
            self.client.publishAsync(topic, message, 0, ackCallback=self.customPubackCallback)
            # Sleep briefly to simulate real-time data publishing
            time.sleep(0.1)
print("Loading vehicle data...")
data = []
for i in range(device_st, device_end):
    a = pd.read_csv(data_path.format(i))
    data.append(a)

print("Initializing MQTTClients...")
clients = []
for device_id in range(device_st, device_end):
    client = MQTTClient(device_id, certificate_formatter.format(device_id), key_formatter.format(device_id))
    client.client.connect()
    clients.append(client)

while True:
    print("send now?")
    x = input()
    if x == "s":
        for i, c in enumerate(clients):
            topic = "emiTest/in"
            c.publish(topic=topic)
    elif x == "d":
        for c in clients:
            c.client.disconnect()
        print("All devices disconnected")
        exit()
    else:
        print("wrong key pressed")
    time.sleep(3)
import json
import logging
import sys
import greengrasssdk
# Logging
logger = logging.getLogger(__name__)
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
# SDK Client
client = greengrasssdk.client("iot-data")
def lambda_handler(event, context):
    # TODO1: Get your data
    # TODO2: Calculate max CO2 emission
    maxCounter = 0.0
    for record in event:
        CO2_val = float(record['vehicle_CO2'])
        vehicle_stat = record['vehicle_id']
        if CO2_val > maxCounter:
            maxCounter = CO2_val

    # TODO3: Return the result
    # Publish the maximum CO2 value seen in this batch of records for the vehicle.
    client.publish(
        topic="iot/Vehicle_" + vehicle_stat,
        queueFullPolicy="AllOrException",
        payload=json.dumps({"max_CO2": maxCounter}),
    )
    return
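For reference, a sketch of the event shape lambda_handler expects: an iterable of records that each carry at least vehicle_id and vehicle_CO2, mirroring the columns read from data/vehicle{N}.csv. The values below are illustrative only.

# Illustrative only: invoking the handler locally would still require a Greengrass
# core for client.publish, so this constant just documents the expected input shape.
EXAMPLE_EVENT = [
    {"vehicle_id": "0", "vehicle_CO2": 2.5},
    {"vehicle_id": "0", "vehicle_CO2": 3.1},
]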