1.4.2 Non-MicroPython Device Version
This notebook is an alternative to the hardware/software communication notebook for devices that run full Python rather than MicroPython. If your device runs a complete operating system (for example, a Raspberry Pi 4/5 or a computer running Linux, Windows, or macOS), you can install the paho-mqtt Python library directly instead of using the mqtt_as MicroPython library.
This approach suits more powerful hardware that is not bound by MicroPython's constraints. Use this notebook alongside the companion notebook for a complete MQTT communication setup.
Note: For simplicity, these examples do not handle multiple devices or simultaneous requests. For more advanced scenarios with experiment tracking, see the self-driving-lab-demo, which uses an experiment_id to distinguish between operations.
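As a rough illustration of the experiment-tracking idea in the note above, the sketch below tags each command with an identifier and echoes it back in the response so that replies can be matched to requests. The JSON payload layout and the helper names (make_command, make_response, matches) are assumptions made for this example. Only the experiment_id field name comes from the note, and the self-driving-lab-demo's actual message format may differ.

```python
import json
import uuid


def make_command(command: str) -> str:
    """Build a JSON command payload tagged with a fresh experiment_id (hypothetical layout)."""
    return json.dumps({"command": command, "experiment_id": uuid.uuid4().hex})


def make_response(command_payload: str, temperature: float) -> str:
    """Device side: echo the experiment_id from the command next to the reading."""
    request = json.loads(command_payload)
    return json.dumps(
        {"temperature": temperature, "experiment_id": request["experiment_id"]}
    )


def matches(command_payload: str, response_payload: str) -> bool:
    """Orchestrator side: check that a response belongs to a given command."""
    sent = json.loads(command_payload)
    received = json.loads(response_payload)
    return sent["experiment_id"] == received["experiment_id"]


command = make_command("toggle")
response = make_response(command, 25.3)
print(matches(command, response))  # True: same experiment_id
print(matches(make_command("toggle"), response))  # False: different experiment_id
```

With this kind of tagging, an orchestrator that has several requests in flight can discard responses whose experiment_id it does not recognize.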
# Install required packages
# Only install paho-mqtt if we are running in Colab
import sys
IN_COLAB = 'google.colab' in sys.modules
if IN_COLAB:
    %pip install paho-mqtt
MQTT Device Simulation
The code below simulates an MQTT device that can receive LED control commands and send temperature sensor data. Before running this code, make sure to:
- Replace the credentials (currently showing public test credentials) with your own HiveMQ Cloud credentials if desired
- Set your unique device ID in the COURSE_ID variable
- Ensure your device ID matches what you're using in the companion notebook
CAUTION: The credentials shown below are public test credentials for educational purposes only. In production environments, never hardcode credentials directly in your code or expose them in plain text. Instead, use environment variables, secure configuration files, or cloud secret management services.
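One way to follow this advice is to read the credentials from environment variables instead of writing them into the notebook. The sketch below is a minimal example under assumptions: the environment variable names (HIVEMQ_HOST, HIVEMQ_USERNAME, HIVEMQ_PASSWORD) and the load_credentials helper are chosen for illustration and are not defined by HiveMQ or paho-mqtt.

```python
import os


def load_credentials(env=None):
    """Read broker credentials from environment variables (names are assumptions).

    Raises RuntimeError listing every missing variable so that a
    misconfigured environment fails early rather than at connect time.
    """
    env = os.environ if env is None else env
    names = ("HIVEMQ_HOST", "HIVEMQ_USERNAME", "HIVEMQ_PASSWORD")
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {name: env[name] for name in names}


# Example with an explicit mapping standing in for os.environ
example_env = {
    "HIVEMQ_HOST": "example.invalid",
    "HIVEMQ_USERNAME": "user",
    "HIVEMQ_PASSWORD": "not-a-real-password",
}
credentials = load_credentials(example_env)
print(credentials["HIVEMQ_HOST"])  # example.invalid
```

In a real session, calling load_credentials() with no argument reads from os.environ, and the returned values could be passed to client.username_pw_set and client.connect in place of hardcoded constants.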
import paho.mqtt.client as mqtt
import ssl
import time
import random
import json
# Replace with your actual HiveMQ Cloud credentials and device ID
# (or import from your own my_secrets.py)
HIVEMQ_HOST = "248cc294c37642359297f75b7b023374.s2.eu.hivemq.cloud" # Replace with your HiveMQ host
HIVEMQ_USERNAME = "sgbaird" # Replace with your username
HIVEMQ_PASSWORD = "D.Pq5gYtejYbU#L" # Replace with your password
COURSE_ID = "<your_id_here>" # Make sure this matches your companion notebook!
command_topic = f"{COURSE_ID}/onboard_led"
sensor_data_topic = f"{COURSE_ID}/onboard_temp"
# Simulate LED state (in real hardware, this would be the actual LED)
led_state = False
def read_temperature():
    """Simulate temperature reading (replace with actual sensor code)."""
    return round(25 + random.uniform(-2.0, 2.0), 1)
def on_connect(client, userdata, flags, rc, properties=None):
    """Callback for when the client receives a CONNACK response from the server."""
    if rc == 0:
        print("Connected to MQTT broker")
        # Subscribing in on_connect() means that if we lose the connection and
        # reconnect then subscriptions will be renewed.
        client.subscribe(command_topic, qos=1)
        print(f"Subscribed to command topic: {command_topic}")
    else:
        print(f"Failed to connect. Return code={rc}")
def on_message(client, userdata, msg):
    """Handle incoming MQTT messages."""
    # Global variable for LED state simulation
    # In real hardware, the state would be maintained by the device
    global led_state
    try:
        payload_str = msg.payload.decode().strip().lower()
        print(f"Received command on {msg.topic}: {payload_str}")
        if msg.topic == command_topic:
            # Process LED commands
            if payload_str == "on":
                led_state = True
            elif payload_str == "off":
                led_state = False
            elif payload_str == "toggle":
                led_state = not led_state
            else:
                print(f"Unknown command: {payload_str}")
            print(f"LED state: {led_state}")
            # Send temperature reading as response
            temperature = read_temperature()
            client.publish(sensor_data_topic, str(temperature), qos=1)
            print(f"Published temperature: {temperature}°C")
    except Exception as exc:
        print(f"Exception in on_message: {exc}")
print("Setting up MQTT Client")
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, protocol=mqtt.MQTTv5)
# Enable TLS for secure connection
client.tls_set(tls_version=ssl.PROTOCOL_TLS_CLIENT)
# Set username and password
client.username_pw_set(HIVEMQ_USERNAME, HIVEMQ_PASSWORD)
# Attach callbacks
client.on_connect = on_connect
client.on_message = on_message
# Connect to HiveMQ Cloud on port 8883
port = 8883
print(f"Connecting to {HIVEMQ_HOST}:{port}...")
client.connect(HIVEMQ_HOST, port)
# Start the MQTT network loop in a separate thread
client.loop_start()
print("Waiting for commands...")
print(f"Send commands to topic: {command_topic}")
print(f"Temperature data will be published to: {sensor_data_topic}")
start_time = time.time()
try:
    # Keep the script running to handle incoming messages
    while True:
        elapsed = round(time.time() - start_time)
        if elapsed % 10 == 0:  # Print status every 10 seconds
            print(f"Running... Elapsed: {elapsed}s")
        time.sleep(1)
except KeyboardInterrupt:
    print("\nInterrupted by user.")
finally:
    # Gracefully stop
    print("Disconnecting...")
    client.loop_stop()
    client.disconnect()
    print("Disconnected.")
Setting up MQTT Client
Connecting to 248cc294c37642359297f75b7b023374.s2.eu.hivemq.cloud:8883...
Waiting for commands...
Send commands to topic: <your_id_here>/onboard_led
Temperature data will be published to: <your_id_here>/onboard_temp
Running... Elapsed: 0s
Connected to MQTT broker
Subscribed to command topic: <your_id_here>/onboard_led
Running... Elapsed: 10s
Running... Elapsed: 20s
Running... Elapsed: 30s
Running... Elapsed: 40s
Running... Elapsed: 50s
Interrupted by user.
Disconnecting...
Disconnected.
Usage Instructions
1. Configure credentials: Replace the placeholder values for HIVEMQ_HOST, HIVEMQ_USERNAME, HIVEMQ_PASSWORD, and COURSE_ID with your actual values
2. Run the device simulation: Execute the code cell above to start the MQTT device simulation
3. Send commands: Use the companion notebook to send LED commands ("on", "off", or "toggle") to the command topic
4. Monitor responses: The device will respond with temperature readings on the sensor data topic
Valid LED commands:
- "on" - Turn LED on
- "off" - Turn LED off
- "toggle" - Switch LED state
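The command handling can also be expressed as a small pure function, which makes it easy to test without a broker. This is a sketch rather than the notebook's own code: apply_led_command is a hypothetical helper that mirrors the normalization (strip and lowercase) and the three commands handled in on_message, and it leaves the state unchanged for unknown commands.

```python
def apply_led_command(led_state: bool, payload: bytes) -> bool:
    """Return the new LED state after applying a raw MQTT payload.

    Hypothetical helper mirroring on_message: the payload is decoded,
    stripped, and lowercased; unknown commands leave the state unchanged.
    """
    command = payload.decode().strip().lower()
    if command == "on":
        return True
    if command == "off":
        return False
    if command == "toggle":
        return not led_state
    return led_state


# Walk through a few payloads, including mixed case and an unknown command
state = False
for raw in (b"on", b" TOGGLE ", b"toggle", b"blink", b"Off"):
    state = apply_led_command(state, raw)
    print(raw, "->", state)
```

Because the function takes the current state as an argument instead of relying on a global, the same logic could be reused for more than one simulated LED.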
The device simulation will continue running until you interrupt it (Ctrl+C in terminal or stop the cell in Jupyter/Colab).
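If the companion notebook is not at hand, the simulated device could be exercised with a small sender like the sketch below. It reuses only the paho-mqtt calls that already appear in the device code and the same topic naming scheme. The helper names build_topics and send_led_command are hypothetical, and the companion notebook may implement this differently.

```python
import queue
import ssl


def build_topics(course_id: str) -> tuple:
    """Return (command_topic, sensor_data_topic) using the notebook's naming scheme."""
    return f"{course_id}/onboard_led", f"{course_id}/onboard_temp"


def send_led_command(host, username, password, course_id, command, timeout=10.0):
    """Publish one LED command and wait for the device's temperature reply.

    Hypothetical helper. Returns the reply payload as a string, or None if
    nothing arrives within `timeout` seconds.
    """
    # Imported here so that build_topics works without paho-mqtt installed
    import paho.mqtt.client as mqtt

    command_topic, sensor_data_topic = build_topics(course_id)
    replies = queue.Queue()

    def on_connect(client, userdata, flags, reason_code, properties=None):
        if reason_code == 0:
            # Queue the subscription ahead of the command so the reply is not missed
            client.subscribe(sensor_data_topic, qos=1)
            client.publish(command_topic, command, qos=1)

    def on_message(client, userdata, msg):
        replies.put(msg.payload.decode())

    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, protocol=mqtt.MQTTv5)
    client.tls_set(tls_version=ssl.PROTOCOL_TLS_CLIENT)
    client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(host, 8883)
    client.loop_start()
    try:
        return replies.get(timeout=timeout)
    except queue.Empty:
        return None
    finally:
        client.disconnect()
        client.loop_stop()


print(build_topics("my-device"))  # ('my-device/onboard_led', 'my-device/onboard_temp')
```

A call such as send_led_command(HIVEMQ_HOST, HIVEMQ_USERNAME, HIVEMQ_PASSWORD, COURSE_ID, "toggle") would need to run in a separate process or notebook, since the device simulation cell blocks until it is interrupted. The network call is not executed in the block above.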