diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml new file mode 100644 index 0000000000000000000000000000000000000000..106db24ebf96fa388e880a31c1ff5ebee4b98767 --- /dev/null +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -0,0 +1,17 @@ +<component name="InspectionProjectProfileManager"> + <profile version="1.0"> + <option name="myName" value="Project Default" /> + <inspection_tool class="DuplicatedCode" enabled="true" level="WEAK WARNING" enabled_by_default="true"> + <Languages> + <language minSize="123" name="Python" /> + </Languages> + </inspection_tool> + <inspection_tool class="PyPep8NamingInspection" enabled="true" level="WEAK WARNING" enabled_by_default="true"> + <option name="ignoredErrors"> + <list> + <option value="N806" /> + </list> + </option> + </inspection_tool> + </profile> +</component> \ No newline at end of file diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000000000000000000000000000000000000..105ce2da2d6447d11dfe32bfb846c3d5b199fc99 --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ +<component name="InspectionProjectProfileManager"> + <settings> + <option name="USE_PROJECT_PROFILE" value="false" /> + <version value="1.0" /> + </settings> +</component> \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000000000000000000000000000000000000..a6218fed0aeb0cbb03b46a9064efeeda11861bf6 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,7 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project version="4"> + <component name="Black"> + <option name="sdkName" value="Python 3.11" /> + </component> + <component name="ProjectRootManager" version="2" project-jdk-name="Python 3.11" project-jdk-type="Python SDK" /> +</project> \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000000000000000000000000000000000000..35eb1ddfbbc029bcab630581847471d7f238ec53 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project version="4"> + <component name="VcsDirectoryMappings"> + <mapping directory="" vcs="Git" /> + </component> +</project> \ No newline at end of file diff --git a/clean_shadows.py b/clean_shadows.py new file mode 100644 index 0000000000000000000000000000000000000000..62b81c9e71c8d892cb8d67138f1a019885588c80 --- /dev/null +++ b/clean_shadows.py @@ -0,0 +1,26 @@ +import json +import boto3 + +# Create IoT client +iot = boto3.client("iot-data", region_name='us-east-2') # Uses AWS region by default + + +def refresh_shadow_max_CO2(vehicle_id): + try: + payload = { + "state": { + "reported": { + "max_CO2": 0 + } + } + } + iot.update_thing_shadow( + thingName=f"Vehicle_{vehicle_id}", + payload=json.dumps(payload) + ) + except Exception as e: + print(f"Error updating shadow for {vehicle_id}: {e}") + + +for i in range(5): + refresh_shadow_max_CO2(i) diff --git a/data_visualization.ipynb b/data_visualization.ipynb index 129a4fe959e87d215d70ab5f85e0fa2e40b20e93..04c6ae5e5087530011824122dd7134bc575f5cf2 100644 --- a/data_visualization.ipynb +++ b/data_visualization.ipynb @@ -5,8 +5,8 @@ "id": "cb73c637-68cf-4617-940b-eb8cf781fb0c", "metadata": { "ExecuteTime": { - "end_time": "2025-04-12T16:55:30.041336Z", - "start_time": "2025-04-12T16:55:25.351914Z" + "end_time": "2025-04-12T20:18:56.854916Z", + "start_time": "2025-04-12T20:18:55.844145Z" } }, "source": [ @@ -17,46 +17,31 @@ "name": "stdout", "output_type": "stream", "text": [ - "Collecting pyathena\r\n", - " Downloading pyathena-3.12.2-py3-none-any.whl.metadata (6.3 kB)\r\n", - "Collecting boto3>=1.26.4 (from pyathena)\r\n", - " Downloading boto3-1.37.33-py3-none-any.whl.metadata (6.7 kB)\r\n", - "Collecting botocore>=1.29.4 (from pyathena)\r\n", - " Downloading botocore-1.37.33-py3-none-any.whl.metadata (5.7 kB)\r\n", + "Requirement already satisfied: pyathena in /opt/homebrew/lib/python3.11/site-packages (3.12.2)\r\n", + "Requirement already satisfied: boto3>=1.26.4 in /opt/homebrew/lib/python3.11/site-packages (from pyathena) (1.37.33)\r\n", + "Requirement already satisfied: botocore>=1.29.4 in /opt/homebrew/lib/python3.11/site-packages (from pyathena) (1.37.33)\r\n", "Requirement already satisfied: fsspec in /opt/homebrew/lib/python3.11/site-packages (from pyathena) (2023.10.0)\r\n", "Requirement already satisfied: python-dateutil in /opt/homebrew/lib/python3.11/site-packages (from pyathena) (2.8.2)\r\n", - "Collecting tenacity>=4.1.0 (from pyathena)\r\n", - " Downloading tenacity-9.1.2-py3-none-any.whl.metadata (1.2 kB)\r\n", - "Collecting jmespath<2.0.0,>=0.7.1 (from boto3>=1.26.4->pyathena)\r\n", - " Using cached jmespath-1.0.1-py3-none-any.whl.metadata (7.6 kB)\r\n", - "Collecting s3transfer<0.12.0,>=0.11.0 (from boto3>=1.26.4->pyathena)\r\n", - " Using cached s3transfer-0.11.4-py3-none-any.whl.metadata (1.7 kB)\r\n", + "Requirement already satisfied: tenacity>=4.1.0 in /opt/homebrew/lib/python3.11/site-packages (from pyathena) (9.1.2)\r\n", + "Requirement already satisfied: jmespath<2.0.0,>=0.7.1 in /opt/homebrew/lib/python3.11/site-packages (from boto3>=1.26.4->pyathena) (1.0.1)\r\n", + "Requirement already satisfied: s3transfer<0.12.0,>=0.11.0 in /opt/homebrew/lib/python3.11/site-packages (from boto3>=1.26.4->pyathena) (0.11.4)\r\n", "Requirement already satisfied: urllib3!=2.2.0,<3,>=1.25.4 in /opt/homebrew/lib/python3.11/site-packages (from botocore>=1.29.4->pyathena) (2.0.7)\r\n", "Requirement already satisfied: six>=1.5 in /opt/homebrew/lib/python3.11/site-packages (from python-dateutil->pyathena) (1.16.0)\r\n", - "Downloading pyathena-3.12.2-py3-none-any.whl (75 kB)\r\n", - "Downloading boto3-1.37.33-py3-none-any.whl (139 kB)\r\n", - "Downloading botocore-1.37.33-py3-none-any.whl (13.5 MB)\r\n", - "\u001B[2K \u001B[90mâ”â”â”â”â”â”â”â”â”â”â”â”â”â”â”â”â”â”â”â”â”â”â”â”â”â”â”â”â”â”â”â”â”â”â”â”â”â”â”â”\u001B[0m \u001B[32m13.5/13.5 MB\u001B[0m \u001B[31m8.2 MB/s\u001B[0m eta \u001B[36m0:00:00\u001B[0ma \u001B[36m0:00:01\u001B[0m\r\n", - "\u001B[?25hDownloading tenacity-9.1.2-py3-none-any.whl (28 kB)\r\n", - "Using cached jmespath-1.0.1-py3-none-any.whl (20 kB)\r\n", - "Using cached s3transfer-0.11.4-py3-none-any.whl (84 kB)\r\n", - "Installing collected packages: tenacity, jmespath, botocore, s3transfer, boto3, pyathena\r\n", - "Successfully installed boto3-1.37.33 botocore-1.37.33 jmespath-1.0.1 pyathena-3.12.2 s3transfer-0.11.4 tenacity-9.1.2\r\n", "\r\n", "\u001B[1m[\u001B[0m\u001B[34;49mnotice\u001B[0m\u001B[1;39;49m]\u001B[0m\u001B[39;49m A new release of pip is available: \u001B[0m\u001B[31;49m24.3.1\u001B[0m\u001B[39;49m -> \u001B[0m\u001B[32;49m25.0.1\u001B[0m\r\n", "\u001B[1m[\u001B[0m\u001B[34;49mnotice\u001B[0m\u001B[1;39;49m]\u001B[0m\u001B[39;49m To update, run: \u001B[0m\u001B[32;49mpython3.11 -m pip install --upgrade pip\u001B[0m\r\n" ] } ], - "execution_count": 1 + "execution_count": 5 }, { "cell_type": "code", "id": "9dac8beb-e37a-4a84-b1b7-79df228ce9f8", "metadata": { "ExecuteTime": { - "end_time": "2025-04-12T20:09:13.798458Z", - "start_time": "2025-04-12T20:09:13.572226Z" + "end_time": "2025-04-12T20:18:59.210911Z", + "start_time": "2025-04-12T20:18:58.898265Z" } }, "source": [ @@ -68,19 +53,19 @@ "conn = connect(s3_staging_dir='s3://step3.3/', region_name='us-east-2')" ], "outputs": [], - "execution_count": 3 + "execution_count": 6 }, { "metadata": { "ExecuteTime": { - "end_time": "2025-04-12T20:09:15.490473Z", - "start_time": "2025-04-12T20:09:14.555450Z" + "end_time": "2025-04-12T20:21:44.107387Z", + "start_time": "2025-04-12T20:21:43.691481Z" } }, "cell_type": "code", "source": [ "# Query Athena\n", - "query = 'SELECT device_id, data.vehicle_CO2 FROM vehicledatabase.lab4data_mylesai2' # change this!\n", + "query = 'SELECT device_id, data.vehicle_CO2 FROM cs437iot_lab4bucket' \n", "df = pd.read_sql(query, conn)\n", "print(\"Column names in the DataFrame:\", df.columns)" ], @@ -90,7 +75,7 @@ "name": "stderr", "output_type": "stream", "text": [ - "/var/folders/jm/rgjqkd2s5j5d309wnmpsvddr0000gn/T/ipykernel_75159/1256295774.py:3: UserWarning: pandas only supports SQLAlchemy connectable (engine/connection) or database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 objects are not tested. Please consider using SQLAlchemy.\n", + "/var/folders/jm/rgjqkd2s5j5d309wnmpsvddr0000gn/T/ipykernel_75159/3995870251.py:3: UserWarning: pandas only supports SQLAlchemy connectable (engine/connection) or database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 objects are not tested. Please consider using SQLAlchemy.\n", " df = pd.read_sql(query, conn)\n", "Failed to execute query.\n", "Traceback (most recent call last):\n", @@ -130,7 +115,7 @@ }, { "ename": "DatabaseError", - "evalue": "Execution failed on sql: SELECT device_id, data.vehicle_CO2 FROM vehicledatabase.lab4data_mylesai2\nAn error occurred (AccessDeniedException) when calling the StartQueryExecution operation: You are not authorized to perform: athena:StartQueryExecution on the resource. After your AWS administrator or you have updated your permissions, please try again.\nunable to rollback", + "evalue": "Execution failed on sql: SELECT device_id, data.vehicle_CO2 FROM cs437iot_lab4bucket\nAn error occurred (AccessDeniedException) when calling the StartQueryExecution operation: You are not authorized to perform: athena:StartQueryExecution on the resource. After your AWS administrator or you have updated your permissions, please try again.\nunable to rollback", "output_type": "error", "traceback": [ "\u001B[0;31m---------------------------------------------------------------------------\u001B[0m", @@ -160,15 +145,15 @@ "\u001B[0;31mNotSupportedError\u001B[0m: ", "\nThe above exception was the direct cause of the following exception:\n", "\u001B[0;31mDatabaseError\u001B[0m Traceback (most recent call last)", - "Cell \u001B[0;32mIn[4], line 3\u001B[0m\n\u001B[1;32m 1\u001B[0m \u001B[38;5;66;03m# Query Athena\u001B[39;00m\n\u001B[1;32m 2\u001B[0m query \u001B[38;5;241m=\u001B[39m \u001B[38;5;124m'\u001B[39m\u001B[38;5;124mSELECT device_id, data.vehicle_CO2 FROM vehicledatabase.lab4data_mylesai2\u001B[39m\u001B[38;5;124m'\u001B[39m\n\u001B[0;32m----> 3\u001B[0m df \u001B[38;5;241m=\u001B[39m \u001B[43mpd\u001B[49m\u001B[38;5;241;43m.\u001B[39;49m\u001B[43mread_sql\u001B[49m\u001B[43m(\u001B[49m\u001B[43mquery\u001B[49m\u001B[43m,\u001B[49m\u001B[43m \u001B[49m\u001B[43mconn\u001B[49m\u001B[43m)\u001B[49m\n\u001B[1;32m 4\u001B[0m \u001B[38;5;28mprint\u001B[39m(\u001B[38;5;124m\"\u001B[39m\u001B[38;5;124mColumn names in the DataFrame:\u001B[39m\u001B[38;5;124m\"\u001B[39m, df\u001B[38;5;241m.\u001B[39mcolumns)\n", + "Cell \u001B[0;32mIn[7], line 3\u001B[0m\n\u001B[1;32m 1\u001B[0m \u001B[38;5;66;03m# Query Athena\u001B[39;00m\n\u001B[1;32m 2\u001B[0m query \u001B[38;5;241m=\u001B[39m \u001B[38;5;124m'\u001B[39m\u001B[38;5;124mSELECT device_id, data.vehicle_CO2 FROM cs437iot_lab4bucket\u001B[39m\u001B[38;5;124m'\u001B[39m \n\u001B[0;32m----> 3\u001B[0m df \u001B[38;5;241m=\u001B[39m \u001B[43mpd\u001B[49m\u001B[38;5;241;43m.\u001B[39;49m\u001B[43mread_sql\u001B[49m\u001B[43m(\u001B[49m\u001B[43mquery\u001B[49m\u001B[43m,\u001B[49m\u001B[43m \u001B[49m\u001B[43mconn\u001B[49m\u001B[43m)\u001B[49m\n\u001B[1;32m 4\u001B[0m \u001B[38;5;28mprint\u001B[39m(\u001B[38;5;124m\"\u001B[39m\u001B[38;5;124mColumn names in the DataFrame:\u001B[39m\u001B[38;5;124m\"\u001B[39m, df\u001B[38;5;241m.\u001B[39mcolumns)\n", "File \u001B[0;32m/opt/homebrew/lib/python3.11/site-packages/pandas/io/sql.py:654\u001B[0m, in \u001B[0;36mread_sql\u001B[0;34m(sql, con, index_col, coerce_float, params, parse_dates, columns, chunksize, dtype_backend, dtype)\u001B[0m\n\u001B[1;32m 652\u001B[0m \u001B[38;5;28;01mwith\u001B[39;00m pandasSQL_builder(con) \u001B[38;5;28;01mas\u001B[39;00m pandas_sql:\n\u001B[1;32m 653\u001B[0m \u001B[38;5;28;01mif\u001B[39;00m \u001B[38;5;28misinstance\u001B[39m(pandas_sql, SQLiteDatabase):\n\u001B[0;32m--> 654\u001B[0m \u001B[38;5;28;01mreturn\u001B[39;00m \u001B[43mpandas_sql\u001B[49m\u001B[38;5;241;43m.\u001B[39;49m\u001B[43mread_query\u001B[49m\u001B[43m(\u001B[49m\n\u001B[1;32m 655\u001B[0m \u001B[43m \u001B[49m\u001B[43msql\u001B[49m\u001B[43m,\u001B[49m\n\u001B[1;32m 656\u001B[0m \u001B[43m \u001B[49m\u001B[43mindex_col\u001B[49m\u001B[38;5;241;43m=\u001B[39;49m\u001B[43mindex_col\u001B[49m\u001B[43m,\u001B[49m\n\u001B[1;32m 657\u001B[0m \u001B[43m \u001B[49m\u001B[43mparams\u001B[49m\u001B[38;5;241;43m=\u001B[39;49m\u001B[43mparams\u001B[49m\u001B[43m,\u001B[49m\n\u001B[1;32m 658\u001B[0m \u001B[43m \u001B[49m\u001B[43mcoerce_float\u001B[49m\u001B[38;5;241;43m=\u001B[39;49m\u001B[43mcoerce_float\u001B[49m\u001B[43m,\u001B[49m\n\u001B[1;32m 659\u001B[0m \u001B[43m \u001B[49m\u001B[43mparse_dates\u001B[49m\u001B[38;5;241;43m=\u001B[39;49m\u001B[43mparse_dates\u001B[49m\u001B[43m,\u001B[49m\n\u001B[1;32m 660\u001B[0m \u001B[43m \u001B[49m\u001B[43mchunksize\u001B[49m\u001B[38;5;241;43m=\u001B[39;49m\u001B[43mchunksize\u001B[49m\u001B[43m,\u001B[49m\n\u001B[1;32m 661\u001B[0m \u001B[43m \u001B[49m\u001B[43mdtype_backend\u001B[49m\u001B[38;5;241;43m=\u001B[39;49m\u001B[43mdtype_backend\u001B[49m\u001B[43m,\u001B[49m\n\u001B[1;32m 662\u001B[0m \u001B[43m \u001B[49m\u001B[43mdtype\u001B[49m\u001B[38;5;241;43m=\u001B[39;49m\u001B[43mdtype\u001B[49m\u001B[43m,\u001B[49m\n\u001B[1;32m 663\u001B[0m \u001B[43m \u001B[49m\u001B[43m)\u001B[49m\n\u001B[1;32m 665\u001B[0m \u001B[38;5;28;01mtry\u001B[39;00m:\n\u001B[1;32m 666\u001B[0m _is_table_name \u001B[38;5;241m=\u001B[39m pandas_sql\u001B[38;5;241m.\u001B[39mhas_table(sql)\n", "File \u001B[0;32m/opt/homebrew/lib/python3.11/site-packages/pandas/io/sql.py:2326\u001B[0m, in \u001B[0;36mSQLiteDatabase.read_query\u001B[0;34m(self, sql, index_col, coerce_float, parse_dates, params, chunksize, dtype, dtype_backend)\u001B[0m\n\u001B[1;32m 2315\u001B[0m \u001B[38;5;28;01mdef\u001B[39;00m \u001B[38;5;21mread_query\u001B[39m(\n\u001B[1;32m 2316\u001B[0m \u001B[38;5;28mself\u001B[39m,\n\u001B[1;32m 2317\u001B[0m sql,\n\u001B[0;32m (...)\u001B[0m\n\u001B[1;32m 2324\u001B[0m dtype_backend: DtypeBackend \u001B[38;5;241m|\u001B[39m Literal[\u001B[38;5;124m\"\u001B[39m\u001B[38;5;124mnumpy\u001B[39m\u001B[38;5;124m\"\u001B[39m] \u001B[38;5;241m=\u001B[39m \u001B[38;5;124m\"\u001B[39m\u001B[38;5;124mnumpy\u001B[39m\u001B[38;5;124m\"\u001B[39m,\n\u001B[1;32m 2325\u001B[0m ) \u001B[38;5;241m-\u001B[39m\u001B[38;5;241m>\u001B[39m DataFrame \u001B[38;5;241m|\u001B[39m Iterator[DataFrame]:\n\u001B[0;32m-> 2326\u001B[0m cursor \u001B[38;5;241m=\u001B[39m \u001B[38;5;28;43mself\u001B[39;49m\u001B[38;5;241;43m.\u001B[39;49m\u001B[43mexecute\u001B[49m\u001B[43m(\u001B[49m\u001B[43msql\u001B[49m\u001B[43m,\u001B[49m\u001B[43m \u001B[49m\u001B[43mparams\u001B[49m\u001B[43m)\u001B[49m\n\u001B[1;32m 2327\u001B[0m columns \u001B[38;5;241m=\u001B[39m [col_desc[\u001B[38;5;241m0\u001B[39m] \u001B[38;5;28;01mfor\u001B[39;00m col_desc \u001B[38;5;129;01min\u001B[39;00m cursor\u001B[38;5;241m.\u001B[39mdescription]\n\u001B[1;32m 2329\u001B[0m \u001B[38;5;28;01mif\u001B[39;00m chunksize \u001B[38;5;129;01mis\u001B[39;00m \u001B[38;5;129;01mnot\u001B[39;00m \u001B[38;5;28;01mNone\u001B[39;00m:\n", "File \u001B[0;32m/opt/homebrew/lib/python3.11/site-packages/pandas/io/sql.py:2271\u001B[0m, in \u001B[0;36mSQLiteDatabase.execute\u001B[0;34m(self, sql, params)\u001B[0m\n\u001B[1;32m 2267\u001B[0m \u001B[38;5;28;01mexcept\u001B[39;00m \u001B[38;5;167;01mException\u001B[39;00m \u001B[38;5;28;01mas\u001B[39;00m inner_exc: \u001B[38;5;66;03m# pragma: no cover\u001B[39;00m\n\u001B[1;32m 2268\u001B[0m ex \u001B[38;5;241m=\u001B[39m DatabaseError(\n\u001B[1;32m 2269\u001B[0m \u001B[38;5;124mf\u001B[39m\u001B[38;5;124m\"\u001B[39m\u001B[38;5;124mExecution failed on sql: \u001B[39m\u001B[38;5;132;01m{\u001B[39;00msql\u001B[38;5;132;01m}\u001B[39;00m\u001B[38;5;130;01m\\n\u001B[39;00m\u001B[38;5;132;01m{\u001B[39;00mexc\u001B[38;5;132;01m}\u001B[39;00m\u001B[38;5;130;01m\\n\u001B[39;00m\u001B[38;5;124munable to rollback\u001B[39m\u001B[38;5;124m\"\u001B[39m\n\u001B[1;32m 2270\u001B[0m )\n\u001B[0;32m-> 2271\u001B[0m \u001B[38;5;28;01mraise\u001B[39;00m ex \u001B[38;5;28;01mfrom\u001B[39;00m \u001B[38;5;21;01minner_exc\u001B[39;00m\n\u001B[1;32m 2273\u001B[0m ex \u001B[38;5;241m=\u001B[39m DatabaseError(\u001B[38;5;124mf\u001B[39m\u001B[38;5;124m\"\u001B[39m\u001B[38;5;124mExecution failed on sql \u001B[39m\u001B[38;5;124m'\u001B[39m\u001B[38;5;132;01m{\u001B[39;00msql\u001B[38;5;132;01m}\u001B[39;00m\u001B[38;5;124m'\u001B[39m\u001B[38;5;124m: \u001B[39m\u001B[38;5;132;01m{\u001B[39;00mexc\u001B[38;5;132;01m}\u001B[39;00m\u001B[38;5;124m\"\u001B[39m)\n\u001B[1;32m 2274\u001B[0m \u001B[38;5;28;01mraise\u001B[39;00m ex \u001B[38;5;28;01mfrom\u001B[39;00m \u001B[38;5;21;01mexc\u001B[39;00m\n", - "\u001B[0;31mDatabaseError\u001B[0m: Execution failed on sql: SELECT device_id, data.vehicle_CO2 FROM vehicledatabase.lab4data_mylesai2\nAn error occurred (AccessDeniedException) when calling the StartQueryExecution operation: You are not authorized to perform: athena:StartQueryExecution on the resource. After your AWS administrator or you have updated your permissions, please try again.\nunable to rollback" + "\u001B[0;31mDatabaseError\u001B[0m: Execution failed on sql: SELECT device_id, data.vehicle_CO2 FROM cs437iot_lab4bucket\nAn error occurred (AccessDeniedException) when calling the StartQueryExecution operation: You are not authorized to perform: athena:StartQueryExecution on the resource. After your AWS administrator or you have updated your permissions, please try again.\nunable to rollback" ] } ], - "execution_count": 4 + "execution_count": 7 }, { "metadata": {}, diff --git a/lab4_emulator_client.py b/lab4_emulator_client.py index 1f90d71bcc8fc4b93c5e25558bd0077888e6f873..c54daa2521570c7ec55da02f38f7965308316f55 100644 --- a/lab4_emulator_client.py +++ b/lab4_emulator_client.py @@ -8,7 +8,7 @@ import numpy as np #TODO 1: modify the following parameters #Starting and end index, modify this device_st = 0 -device_end = 1 +device_end = 5 #Path to your certificates, modify this certificate_formatter = "certs/Vehicle_{}_certificate.pem.crt" @@ -19,6 +19,9 @@ key_formatter = "certs/Vehicle_{}_private.pem.key" data_path = "data/vehicle{}.csv" +def maxCO2Callback(client, userdata, message): + print(f"Max CO2 level from {message.topic} is {message.payload.decode()}") + class MQTTClient: def __init__(self, device_id, cert, key): # For certificate based connection @@ -33,14 +36,15 @@ class MQTTClient: self.client.configureConnectDisconnectTimeout(10) # 10 sec self.client.configureMQTTOperationTimeout(5) # 5 sec self.client.onMessage = self.customOnMessage + self.client.connect() + self.client.subscribe(topic=f"iot/Vehicle_{device_id}", QoS=1, callback=maxCO2Callback) def customOnMessage(self, message): - #TODO 3: fill in the function to show your received message - print("client {} received payload {} from topic {}".format(self.device_id, "undefined", "undefined")) + # print("client {} received payload {} from topic {}".format(self.device_id, "undefined", "undefined")) + pass # Suback callback def customSubackCallback(self, mid, data): - #You don't need to write anything here pass # Puback callback @@ -52,16 +56,11 @@ class MQTTClient: def publish(self, topic="vehicle/emission/data"): # Load the vehicle's emission data df = pd.read_csv(data_path.format(self.device_id)) - for index, row in df.iterrows(): - # Create a JSON payload from the row data - payload = json.dumps(row.to_dict()) - - # Publish the payload to the specified topic - print(f"Publishing: {payload} to {topic}") - self.client.publishAsync(topic, payload, 0, ackCallback=self.customPubackCallback) - - # Sleep to simulate real-time data publishing - time.sleep(10) + row = df.iloc[self.state] + payload = json.dumps(row.to_dict()) + print(f"Publishing: {payload} to {topic}") + self.client.publishAsync(topic, payload, 0, ackCallback=self.customPubackCallback) + self.state += 1 print("Loading vehicle data...") data = [] @@ -90,6 +89,4 @@ while True: print("All devices disconnected") exit() else: - print("wrong key pressed") - - time.sleep(3) + print("wrong key pressed") \ No newline at end of file