1.4.2 Non-MicroPython Device Version

1.4.2 Non-MicroPython Device Version#

Open In Colab

This notebook serves as an alternative to the hardware/software communication notebook for devices that support full Python rather than MicroPython. If you’re using hardware that can run a complete operating system (e.g., Raspberry Pi 4/5, Linux, Windows, macOS, etc.), you can install the paho-mqtt Python library directly instead of using the mqtt_as MicroPython library.

This approach is particularly useful when you have more powerful hardware that isn’t restricted to MicroPython’s limitations. This notebook can be used alongside the companion notebook for a complete MQTT communication setup.

Note: These examples don’t consider multiple devices or simultaneous requests for simplicity. For more advanced scenarios with experiment tracking, see the self-driving-lab-demo which uses experiment_id for distinguishing between different operations.

# Install required packages
# Only install paho-mqtt if we are running in Colab
import sys
IN_COLAB = 'google.colab' in sys.modules
if IN_COLAB:
    %pip install paho-mqtt

MQTT Device Simulation#

The code below simulates an MQTT device that can receive LED control commands and send temperature sensor data. Before running this code, make sure to:

  1. Replace the credentials (currently showing public test credentials) with your own HiveMQ Cloud credentials if desired

  2. Set your unique device ID in the COURSE_ID variable

  3. Ensure your device ID matches what you’re using in the companion notebook

CAUTION: The credentials shown below are public test credentials for educational purposes only. In production environments, never hardcode credentials directly in your code or expose them in plain text. Instead, use environment variables, secure configuration files, or cloud secret management services.

import paho.mqtt.client as mqtt
import ssl
import time
import random
import json

# Replace with your actual HiveMQ Cloud credentials and device ID
# (or import from your own my_secrets.py)
HIVEMQ_HOST = "248cc294c37642359297f75b7b023374.s2.eu.hivemq.cloud"  # Replace with your HiveMQ host
HIVEMQ_USERNAME = "sgbaird"  # Replace with your username
HIVEMQ_PASSWORD = "D.Pq5gYtejYbU#L"  # Replace with your password
COURSE_ID = "<your_id_here>"  # Make sure this matches your companion notebook!

command_topic = f"{COURSE_ID}/onboard_led"
sensor_data_topic = f"{COURSE_ID}/onboard_temp"

# Simulate LED state (in real hardware, this would be the actual LED)
led_state = False


def read_temperature():
    """Simulate temperature reading (replace with actual sensor code)."""
    return round(25 + random.uniform(-2.0, 2.0), 1)

def on_connect(client, userdata, flags, rc, properties=None):
    """Callback for when the client receives a CONNACK response from the server."""
    if rc == 0:
        print("Connected to MQTT broker")
        # Subscribing in on_connect() means that if we lose the connection and
        # reconnect then subscriptions will be renewed.
        client.subscribe(command_topic, qos=1)
        print(f"Subscribed to command topic: {command_topic}")
    else:
        print(f"Failed to connect. Return code={rc}")

def on_message(client, userdata, msg):
    """Handle incoming MQTT messages."""
    # Global variable for LED state simulation
    # In real hardware, the state would be maintained by the device
    global led_state
    
    try:
        payload_str = msg.payload.decode().strip().lower()
        print(f"Received command on {msg.topic}: {payload_str}")

        if msg.topic == command_topic:
            # Process LED commands
            if payload_str == "on":
                led_state = True
            elif payload_str == "off":
                led_state = False
            elif payload_str == "toggle":
                led_state = not led_state
            else:
                print(f"Unknown command: {payload_str}")
            
            print(f"LED state: {led_state}")
            
            # Send temperature reading as response
            temperature = read_temperature()
            client.publish(sensor_data_topic, str(temperature), qos=1)
            print(f"Published temperature: {temperature}°C")
            
    except Exception as exc:
        print(f"Exception in on_message: {exc}")

print("Setting up MQTT Client")
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, protocol=mqtt.MQTTv5)

# Enable TLS for secure connection
client.tls_set(tls_version=mqtt.ssl.PROTOCOL_TLS_CLIENT)

# Set username and password
client.username_pw_set(HIVEMQ_USERNAME, HIVEMQ_PASSWORD)

# Attach callbacks
client.on_connect = on_connect
client.on_message = on_message

# Connect to HiveMQ Cloud on port 8883
port = 8883
print(f"Connecting to {HIVEMQ_HOST}:{port}...")
client.connect(HIVEMQ_HOST, port)

# Start the MQTT network loop in a separate thread
client.loop_start()
print("Waiting for commands...")
print(f"Send commands to topic: {command_topic}")
print(f"Temperature data will be published to: {sensor_data_topic}")

start_time = time.time()
try:
    # Keep the script running to handle incoming messages
    while True:
        elapsed = round(time.time() - start_time)
        if elapsed % 10 == 0:  # Print status every 10 seconds
            print(f"Running... Elapsed: {elapsed}s")
        time.sleep(1)
except KeyboardInterrupt:
    print("\nInterrupted by user.")
finally:
    # Gracefully stop
    print("Disconnecting...")
    client.loop_stop()
    client.disconnect()
    print("Disconnected.")
Setting up MQTT Client
Connecting to 248cc294c37642359297f75b7b023374.s2.eu.hivemq.cloud:8883...
Waiting for commands...
Send commands to topic: <your_id_here>/onboard_led
Temperature data will be published to: <your_id_here>/onboard_temp
Running... Elapsed: 0s
Connected to MQTT broker
Subscribed to command topic: <your_id_here>/onboard_led
Running... Elapsed: 10s
Running... Elapsed: 20s
Running... Elapsed: 30s
Running... Elapsed: 40s
Running... Elapsed: 50s
Interrupted by user.
Disconnecting...
Disconnected.

Usage Instructions#

  1. Configure credentials: Replace the placeholder values for HIVEMQ_HOST, HIVEMQ_USERNAME, HIVEMQ_PASSWORD, and COURSE_ID with your actual values

  2. Run the device simulation: Execute the code cell above to start the MQTT device simulation

  3. Send commands: Use the companion notebook to send LED commands ("on", "off", or "toggle") to the command topic

  4. Monitor responses: The device will respond with temperature readings on the sensor data topic

Valid LED commands:

  • "on" - Turn LED on

  • "off" - Turn LED off

  • "toggle" - Switch LED state

The device simulation will continue running until you interrupt it (Ctrl+C in terminal or stop the cell in Jupyter/Colab).