
(3.4-orientation)=
# 🧩 3.4 Mobile Robotics

```{contents}
:depth: 3
```

## 🔰 Tutorial

In this module, you will develop software to:
1. Demonstrate control of a mobile cobot using frameworks such as the Robot Operating System (**ROS**) and **Isaac Sim**
2. Use a workflow orchestration package to manage asynchronous tasks
3. Define asynchrony in the context of hardware control for autonomous laboratories

### Bill of Materials

- [MyCobot 280 Pi: - World's Smallest and Lightest Six-Axis Collaborative Robot](https://shop.elephantrobotics.com/en-ca/collections/mycobot-280/products/mycobot-pi-worlds-smallest-and-lightest-six-axis-collaborative-robot)
  A versatile and compact six-axis robot, ideal for mobile robotics applications.

- [Camera Flange 2.0](https://shop.elephantrobotics.com/en-ca/collections/camera-modules/products/camera-flange-2-0)
  Used for vision-based tasks in mobile robotics, such as object recognition and navigation.

- [Adaptive Gripper](https://shop.elephantrobotics.com/en-ca/collections/grippers/products/adaptive-gripper)
  A flexible gripper designed for precise manipulation and picking tasks in collaborative robotic systems.

- [G-Shape Base 2.0](https://shop.elephantrobotics.com/en-ca/collections/fixed-bases/products/g-shape-base-2-0)
  Provides a sturdy mounting platform for the MyCobot, ensuring stability during robotic operations.

- [LIDAR sensor for obstacle detection]
  Used for real-time obstacle detection and mapping in mobile robotics.

- Printed AprilTags (can be generated and printed from [AprilTag Generation](https://github.com/AprilRobotics/apriltag-generation))

- USB-A to micro USB-B cable:
  Used to connect and power devices such as the Raspberry Pi or peripherals.

- SD Card with Raspbian OS:
  Pre-loaded with the Raspbian OS for use with the Raspberry Pi to facilitate software installations and configurations.

### Required Software

- [Prefect](https://www.prefect.io/)
  A workflow orchestration tool used to manage and coordinate asynchronous tasks in the system.

- [AprilTags Python Library](https://pypi.org/project/apriltag/)
  A computer vision library for identifying and tracking AprilTags, used for spatial referencing and navigation.

- [ROS2 Humble](https://docs.ros.org/en/humble/Installation.html)
  ROS2 (Robot Operating System) is an open-source framework for building robot applications. **Humble Hawksbill** is the currently recommended version due to its stability and long-term support.

- [Ubuntu 22.04 LTS](https://releases.ubuntu.com/22.04/)
  Ubuntu 22.04 LTS is the recommended Linux distribution for running ROS2, as it provides a stable and compatible environment. **Ubuntu 24.04** (the latest version) may encounter compatibility issues with ROS2 and other tools.


### Documentation

- [MyCobot Pi Documentation](https://docs.elephantrobotics.com/docs/gitbook-en/2-serialproduct/2.1-280/2.1.2-PI.html)
  Detailed guide on setting up and operating the MyCobot Pi.

- [Gripper Control via Python](https://docs.elephantrobotics.com/docs/gitbook-en/7-ApplicationBasePython/7.5_gripper.html)
  Guide for controlling the adaptive gripper using Python commands.


#### Notes
These materials provide a comprehensive setup for controlling a mobile cobot, including vision systems, robotic arms, grippers, and obstacle detection sensors. The setup integrates ROS and workflow orchestration using Prefect, enabling asynchronous task execution and complex robot control in various environments, such as autonomous labs or educational settings.

### ROS (Robot Operating System)

First, you will learn how to use ROS to control a mobile cobot. ROS is an open-source framework widely used for building robot applications. ROS provides tools and libraries to help design, simulate, and control robots, making it a crucial component in modern robotics.

#### ROS2 Overview

ROS2 (Robot Operating System 2) is an extension of the original ROS framework, designed to be modular and scalable for building complex robotic systems. It is composed of numerous **software packages**, which offer a wide range of functionalities like hardware abstraction, device control, message-passing, and software development tools.

ROS2 heavily relies on the **Ubuntu Linux** operating system, and it is recommended to use **Ubuntu 22.04 LTS** for stability and compatibility. One of ROS2’s key advantages is its distributed architecture, which allows different components (nodes) to communicate with each other over a network. This distributed nature is essential for enabling flexibility in robot systems development.

#### Key Tools in ROS2

- **RViz**: A visualization tool that allows developers to visualize the robot’s state, environment, and sensor data in real time. RViz is crucial for debugging and developing applications without actual hardware.
- **Gazebo**: A simulation environment that allows for the testing of robots in a virtual environment, providing realistic physics-based simulation for sensors, motors, and environments.

#### Note
When using RViz/Gazebo to visualize complex robot models or environments, rendering can be resource-intensive. If your system does not have a powerful GPU, you may experience slow performance or even crashes. It is recommended to consider using cloud solutions like AWS with NVIDIA NGC to offload rendering tasks and ensure smooth performance for simulations and visualizations.


#### ROS2 Installation Notes

- **Standard Installation**: The recommended way to install ROS2 is by using the Ubuntu package manager (`apt`). Detailed installation steps can be found in the [ROS2 Humble Installation Guide](https://docs.ros.org/en/humble/Installation.html).

- **Using Docker**: ROS2 can also be installed using Docker. This approach is useful for isolating the ROS environment or using different ROS versions on the same system. However, **Docker installations** require additional configuration to enable **network communication** (for example, setting up proper interfaces for node communication). Also, be cautious with **real-time communication** interfaces as they might not work as smoothly as native installations.

- **Dependencies**: ROS2 depends on multiple packages for its full functionality. Ensure that you have installed the required system dependencies (such as `colcon` for building packages and `rosdep` for managing dependencies).

- **Communication Protocol**: ROS2 uses DDS (Data Distribution Service) as its default communication protocol. While this provides scalability and flexibility, ensure that your network is properly configured to allow node communication, especially in multi-machine setups.

- **Environment Setup**: After installation, source your ROS2 setup files (`source /opt/ros/humble/setup.bash`) to ensure that the necessary environment variables are configured properly.

By understanding and configuring these components, you’ll be able to fully leverage ROS2 for your robotic application development, from simulation to hardware deployment.

### Demo

✅ Read the [ROS Noetic Documentation](http://wiki.ros.org/noetic/)

✅ Watch [Getting Started with ROS](https://www.youtube.com/watch?v=ehtUb55Rmmg&list=PLk51HrKSBQ8-jTgD0qgRp1vmQeVSJ5SQC)

✅ Learn about controlling robots with ROS [ROS Navigation Stack](http://wiki.ros.org/navigation)

#### Key ROS2 Terminologies

When working with ROS2, it’s important to familiarize yourself with the following terms and concepts:

- **Node**: A basic computation unit in ROS2. A node is a process that performs computation and communicates with other nodes. Each node typically handles a specific task such as sensor data processing or motor control.

- **Topic**: A channel through which nodes communicate by sending or receiving messages. Data is published to topics, and other nodes can subscribe to those topics to receive the data.

- **Publisher**: A node that publishes messages to a specific topic. For example, a sensor node might publish sensor data like laser scans to a topic.

- **Subscriber**: A node that listens to (or subscribes to) a specific topic. It receives messages published by other nodes and processes them. For example, a motor controller might subscribe to a topic where sensor data is published to adjust the robot’s movement.

- **Message**: A data structure used by nodes to communicate. ROS2 provides predefined message types (such as `std_msgs/String`) but also allows custom message types to be created.

- **Service**: A synchronous communication mechanism in ROS2 where a node can send a request to another node and receive a response. This is used for tasks that require a response, such as starting or stopping a robot's operation.

- **Action**: A mechanism for long-running tasks that require feedback and the possibility of cancellation. It’s used in cases like robot navigation or manipulation, where tasks are ongoing and may need to be canceled before completion.

- **Launch File**: A file that defines how multiple ROS2 nodes are launched together. It allows for complex robot systems with multiple nodes to be started in a coordinated way.

- **Parameter**: A configuration value for nodes that can be set at runtime. Parameters allow you to adjust node behavior without changing the code, such as setting the speed of a robot or the frequency of sensor data.

- **TF (Transform)**: A ROS2 tool for keeping track of coordinate frames over time. It allows nodes to understand the spatial relationships between different parts of the robot and the environment.

### MyCobot Camera AprilTag Microscope Integration

This Python script integrates a MyCobot robotic arm, a camera for AprilTag detection, and a simulated microscope for advanced object analysis. It demonstrates an automated process of object detection, manipulation, and microscopic examination.

#### Features

1. AprilTag Detection: Uses a camera to detect AprilTags in the environment.
2. Robotic Arm Control: Controls a MyCobot robotic arm to move to detected objects.
3. Object Grasping: Enables the robotic arm to grasp detected objects.
4. Microscopic Scanning: Utilizes a simulated microscope to perform detailed scans of the grasped object.

#### Requirements:
- Python 3.x
- OpenCV (cv2)
- numpy
- pymycobot
- pupil_apriltags
- microscope_demo_client (custom module for microscope simulation)

#### Setup

1. Ensure all required libraries are installed.
2. Configure the my_secrets.py file with appropriate MQTT and microscope connection details.
3. Adjust the MyCobot serial port in the script if necessary.

#### Usage

1. Run the following python script.
2. The system will continuously scan for AprilTags using the camera.
3. When an AprilTag (ID 1) is detected:
 - The robotic arm moves to the tag's location.
 - The arm attempts to grasp the object.
 - A microscopic scan is initiated.
4. The microscope performs a scan and displays the captured images.
5. The process ends after completing the microscope scan.

```

import cv2
import numpy as np
from pymycobot.mycobot import MyCobot
from pupil_apriltags import Detector
import time
from microscope_demo_client import MicroscopeDemo
from my_secrets import (
    HIVEMQ_HOST,
    HIVEMQ_PASSWORD,
    HIVEMQ_PORT,
    HIVEMQ_USERNAME,
    MICROSCOPE_NAME,
)

# Initialize MyCobot object
mc = MyCobot("/dev/ttyUSB0", 115200)  # Adjust port as needed

# Initialize camera
cap = cv2.VideoCapture(0)  # Using default camera, may need to adjust index

# Initialize AprilTag detector
at_detector = Detector(
    families="tag36h11",
    nthreads=1,
    quad_decimate=1.0,
    quad_sigma=0.0,
    refine_edges=1,
    decode_sharpening=0.25,
    debug=0
)

# Initialize simulated microscope
microscope = MicroscopeDemo(
    HIVEMQ_HOST, HIVEMQ_PORT, HIVEMQ_USERNAME, HIVEMQ_PASSWORD, MICROSCOPE_NAME
)

def detect_apriltag(frame):
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    tags = at_detector.detect(gray)

    for tag in tags:
        if tag.tag_id == 1:  # Assuming we're looking for tag with ID 1
            return tag.center
    return None

def move_arm_to_target(x, y):
    # Convert camera coordinates to arm coordinates
    # This is a simplified example and needs to be adjusted based on your setup
    arm_x = x * 0.5  # Assumed scale factor
    arm_y = y * 0.5
    mc.send_coords([arm_x, arm_y, 200, 0, 0, 0], 50)
    time.sleep(2)

def grab_object():
    mc.set_gripper_state(0, 50)  # Close gripper
    time.sleep(2)

def microscope_scan():
    microscope.move(0, 0)
    microscope.focus()
    image = microscope.take_image()
    image.show()

    print("Scanning area...")
    scan_images = microscope.scan([2000, 2000], [-2000, -2000])
    for i, img in enumerate(scan_images):
        img.show()
        print(f"Showing scan image {i+1}/{len(scan_images)}")

    microscope.move(0, 0)

def main():
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                print("Unable to capture image")
                break

            target = detect_apriltag(frame)
            if target:
                x, y = target
                print(f"AprilTag detected at position: ({x}, {y})")
                move_arm_to_target(x, y)
                grab_object()
                print("Object grasped")

                # Perform microscope scan
                microscope_scan()
                break

            # Display image (optional)
            cv2.imshow('Frame', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

    finally:
        cap.release()
        cv2.destroyAllWindows()
        microscope.end_connection()

if __name__ == "__main__":
    main()

```

#### Key Functions
- detect_apriltag(frame): Detects AprilTags in a given frame.
- move_arm_to_target(x, y): Moves the robotic arm to specified coordinates.
- grab_object(): Commands the robotic arm to grasp an object.
- microscope_scan(): Performs a microscopic scan of the area.


---

### Isaac Sim

Next, we will enhance the integration of **Isaac Sim** with **ROS** to simulate and control more complex robotics applications. Isaac Sim allows you to create realistic simulations with physics-based interactions, which are essential for testing robotic behaviors before deploying them on real hardware.

#### Updated Demo

✅ Watch [How to Install NVIDIA Isaac Sim](https://www.youtube.com/watch?v=SyrsAd8WbCo&ab_channel=Loupe)

✅ Watch [Getting Started with Isaac Sim](https://www.youtube.com/watch?v=3pWwkuc2Ecw&pp=ygUeR2V0dGluZyBTdGFydGVkIHdpdGggSXNhYWMgU2lt)

✅ Read [Isaac Sim and ROS Integration](https://docs.nvidia.com/isaac/isaac_ros/ros.html)

Note: Isaac Sim runs with high system resource requirements, especially for GPU (Graphics Processing Unit) support requirements. As a result, running Isaac Sim in a virtual machine may encounter some limitations, especially if your Mac does not have a dedicated NVIDIA GPU, or if the virtual machine has limited GPU support.
Possible solutions

A. Install Isaac Sim on a Linux physical machine
If you have a physical Linux machine with an NVIDIA GPU, this is the best option for running Isaac Sim. On this machine, you can install Ubuntu 20.04, ROS, and Isaac Sim and run an efficient simulation. If you do not have such a machine, consider a cloud solution.

B. Using cloud services (e.g. NVIDIA's Omniverse Cloud or AWS EC2)
If you can't provide enough GPU resources locally, using a cloud service is a good alternative. For example:

NVIDIA Omniverse Cloud: this is NVIDIA's cloud-based platform dedicated to its simulation and graphics tools, including Isaac Sim. You can create, test and run Isaac Sim in the cloud without relying on local hardware.

AWS EC2 G4/G5 instances: Amazon offers EC2 instances (e.g., G4, G5 instances) that support NVIDIA GPUs. These instances provide powerful GPU acceleration and allow you to run Isaac Sim in the cloud. in this case, you can connect remotely and do your simulation and development work in the cloud.

C. Try Docker with GPU acceleration enabled
If you are developing on a Linux physical machine (with an NVIDIA GPU), consider using Docker to run Isaac Sim with GPU acceleration enabled.NVIDIA provides GPU acceleration support for Docker, which allows containers to directly access the host's GPU resources.


We will simulate a mycobot moving in an environment with obstacles and using Isaac Sim's advanced features like object detection, visual navigation, and path planning. Here’s a more detailed example that involves these capabilities:

```bash
#!/usr/bin/env python3

import rospy
import tf
from geometry_msgs.msg import Twist, PoseStamped
from sensor_msgs.msg import Image, PointCloud2
from nav_msgs.msg import OccupancyGrid, Path
from std_msgs.msg import Float32MultiArray
from cv_bridge import CvBridge
import cv2
import numpy as np
from pymycobot.mycobot import MyCobot
from object_detection.msg import DetectedObjectsArray
from move_base_msgs.msg import MoveBaseAction, MoveBaseGoal
import actionlib

class MyCobotNavigator:
    def __init__(self):
        rospy.init_node('mycobot_navigator', anonymous=True)

        # Initialize MyCobot
        self.mycobot = MyCobot("/dev/ttyUSB0", 115200)

        # ROS Publishers
        self.cmd_vel_pub = rospy.Publisher('/cmd_vel', Twist, queue_size=10)
        self.goal_pub = rospy.Publisher('/move_base_simple/goal', PoseStamped, queue_size=10)

        # ROS Subscribers
        rospy.Subscriber('/camera/rgb/image_raw', Image, self.image_callback)
        rospy.Subscriber('/camera/depth/points', PointCloud2, self.pointcloud_callback)
        rospy.Subscriber('/map', OccupancyGrid, self.map_callback)
        rospy.Subscriber('/detected_objects', DetectedObjectsArray, self.object_detection_callback)

        # Initialize CV Bridge
        self.bridge = CvBridge()

        # Initialize MoveBase Action Client
        self.move_base_client = actionlib.SimpleActionClient('move_base', MoveBaseAction)
        self.move_base_client.wait_for_server()

        # Initialize TF listener
        self.tf_listener = tf.TransformListener()

        # State variables
        self.current_image = None
        self.current_pointcloud = None
        self.current_map = None
        self.detected_objects = []
        self.navigation_goal = None

    def image_callback(self, msg):
        try:
            self.current_image = self.bridge.imgmsg_to_cv2(msg, "bgr8")
            self.process_image()
        except Exception as e:
            rospy.logerr(f"Error processing image: {e}")

    def pointcloud_callback(self, msg):
        self.current_pointcloud = msg
        self.process_pointcloud()

    def map_callback(self, msg):
        self.current_map = msg
        self.update_navigation_plan()

    def object_detection_callback(self, msg):
        self.detected_objects = msg.objects
        self.process_detected_objects()

    def process_image(self):
        if self.current_image is not None:
            # Perform visual navigation tasks
            # For example, lane detection or visual markers recognition
            gray = cv2.cvtColor(self.current_image, cv2.COLOR_BGR2GRAY)
            edges = cv2.Canny(gray, 50, 150)
            lines = cv2.HoughLinesP(edges, 1, np.pi/180, 100, minLineLength=100, maxLineGap=10)
            if lines is not None:
                for line in lines:
                    x1, y1, x2, y2 = line[0]
                    cv2.line(self.current_image, (x1, y1), (x2, y2), (0, 255, 0), 2)

            cv2.imshow("Navigation View", self.current_image)
            cv2.waitKey(1)

    def process_pointcloud(self):
        if self.current_pointcloud is not None:
            # Process pointcloud for obstacle detection
            # This is a placeholder for actual pointcloud processing
            rospy.loginfo("Processing pointcloud for obstacle detection")

    def process_detected_objects(self):
        for obj in self.detected_objects:
            rospy.loginfo(f"Detected object: {obj.label} at position: {obj.pose.position}")
            # Implement logic to interact with detected objects
            if obj.label == "target_object":
                self.plan_path_to_object(obj.pose)

    def plan_path_to_object(self, object_pose):
        goal = MoveBaseGoal()
        goal.target_pose.header.frame_id = "map"
        goal.target_pose.header.stamp = rospy.Time.now()
        goal.target_pose.pose = object_pose

        self.move_base_client.send_goal(goal)
        rospy.loginfo("Sent goal to move_base")

    def update_navigation_plan(self):
        if self.navigation_goal is not None:
            self.plan_path_to_goal(self.navigation_goal)

    def plan_path_to_goal(self, goal_pose):
        start_pose = PoseStamped()
        start_pose.header.frame_id = "base_link"
        start_pose.header.stamp = rospy.Time.now()

        try:
            transformed_pose = self.tf_listener.transformPose("map", start_pose)
            # Here you would typically call a path planning service
            # For this example, we'll just log the intent
            rospy.loginfo(f"Planning path from {transformed_pose.pose.position} to {goal_pose.pose.position}")
        except (tf.LookupException, tf.ConnectivityException, tf.ExtrapolationException) as e:
            rospy.logerr(f"TF Error: {e}")

    def move_arm_to_target(self, x, y, z):
        # Move MyCobot arm to target position
        self.mycobot.send_coords([x, y, z, 0, 0, 0], 50, 1)  # Adjust speed and mode as needed
        rospy.loginfo(f"Moving arm to coordinates: ({x}, {y}, {z})")

    def run(self):
        rate = rospy.Rate(10)  # 10 Hz
        while not rospy.is_shutdown():
            if self.navigation_goal is not None:
                # Check if we've reached the goal
                if self.move_base_client.get_state() == actionlib.GoalStatus.SUCCEEDED:
                    rospy.loginfo("Reached the goal")
                    self.navigation_goal = None
                    # Perform action with MyCobot arm
                    self.move_arm_to_target(0.2, 0, 0.1)  # Example coordinates
            rate.sleep()

if __name__ == '__main__':
    try:
        navigator = MyCobotNavigator()
        navigator.run()
    except rospy.ROSInterruptException:
        pass
```

**Additional Features in Isaac Sim**:
- **Object Detection**: Integrate Isaac Sim’s built-in computer vision models for detecting objects (e.g., cups, boxes) within the simulated environment.
- **Path Planning**: Use ROS's move_base package for autonomous navigation, allowing the robot to calculate paths around obstacles in real-time.
- **Visual SLAM**: Use visual markers and the robot's onboard camera for Simultaneous Localization and Mapping (SLAM).

---

### Asynchronous Task Execution

In this section, we will further develop the use of **Prefect** for asynchronous task execution in robotics. We will demonstrate how to handle multiple asynchronous tasks, such as moving a robot, capturing sensor data, and running real-time analysis concurrently.

#### Updated Demo

✅ Learn [Asynchronous Task Execution with Prefect](https://docs.prefect.io/core/concepts/tasks.html)

In this enhanced example, we will manage multiple asynchronous tasks, such as controlling the robot's movement while simultaneously monitoring sensors and running a real-time analysis of LIDAR data.

```python
from prefect import task, Flow
import time
import random


@task
def control_robot():
    for _ in range(10):
        print("Moving the robot forward...")
        time.sleep(1)


@task
def monitor_sensors():
    for _ in range(10):
        sensor_value = random.uniform(0.0, 2.0)  # Simulating sensor data
        print(f"LIDAR sensor reading: {sensor_value}")
        time.sleep(1)


@task
def analyze_data():
    for _ in range(10):
        print("Analyzing real-time data...")
        time.sleep(1)


with Flow("Asynchronous Robotics Control") as flow:
    control_robot()
    monitor_sensors()
    analyze_data()

flow.run()
```

This script provides a more complex example of how to integrate MyCobot with Isaac Sim and ROS for object detection, visual navigation and path planning.
1. Initialization: sets up the ROS node, initializes MyCobot, and creates the necessary ROS publishers and subscribers.
2. image processing: receive and process images from analog cameras to perform simple visual navigation tasks (e.g. lane detection).
3. point cloud processing: receives point cloud data for obstacle detection (in this example a placeholder).
4. object detection: processes detected objects and plans paths as needed.
5. navigation: sends a navigation target to the client using the move_base operation and tracks its status.
6. path planning: plan paths based on current position and target position.
7. Robotic arm control: provides a function to move the MyCobot robotic arm to a specified position.
8. Master Loop: Continuously checks the navigation status and executes the robotic arm action when the goal is reached.

To use this script you need to:
1. Ensure that the Isaac Sim integration with ROS is set up correctly.
2. start the necessary ROS nodes, such as move_base, object detection nodes, etc.
3. Adjust the topic names and parameters in the script to match your specific Isaac Sim setup.


---

### Define Asynchrony in the Context of Hardware Control

Asynchrony in robotics refers to executing tasks independently and concurrently. In the context of hardware control, this is critical for managing complex processes in an autonomous laboratory, where sensors must continually gather data, robots must execute movement commands, and the system needs to react in real-time.

For instance, while a robot is moving, it must be able to continuously monitor its surroundings (e.g., using LIDAR or cameras) and adjust its trajectory without pausing or waiting for other tasks to complete.

---
### Task combination and process

### 1. ROS controlled robot (robotic arm connected to Raspberry Pi)

This example shows how to use ROS to control a robotic arm connected to a Raspberry Pi and detect objects around it (e.g. detecting a small medicine bottle).

ROS code example:

- Installation dependencies:
- To run ROS and the robot node on the Raspberry Pi, you need to install rosserial to communicate with the Raspberry Pi.
- Detecting small medicine bottles through the camera requires OpenCV to be installed.

```bash
sudo apt-get install ros-noetic-rosserial
sudo apt-get install ros-noetic-opencv
```

- Robotic Arm Movement Control (ROS Publisher):

```Python
import rospy
from geometry_msgs.msg import Twist

def move_arm():
rospy.init_node('move_robot_arm', anonymous=True)
pub = rospy.Publisher('/robot_arm_controller/command', Twist, queue_size=10)
rate = rospy.Rate(10) # 10 Hz

twist = Twist()
twist.linear.x = 0.5 # Control the robot arm to move forward
twist.angular.z = 0.5 # Control the rotation of the robot arm

while not rospy.is_shutdown():
pub.publish(twist)
rate.sleep()

if __name__ == '__main__':
try:
move_arm()
except rospy.ROSInterruptException:
pass
```

- Medicine bottle detection (OpenCV + ROS Subscriber):

```Python
import rospy
import cv2
from sensor_msgs.msg import Image
from cv_bridge import CvBridge

bridge = CvBridge()

def image_callback(msg):
# Convert ROS image messages to OpenCV images
img = bridge.imgmsg_to_cv2(msg, "bgr8")

# Use OpenCV to detect pill bottles (e.g. by color or shape)
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
ret, thresh = cv2.threshold(gray, 127, 255, cv2.THRESH_BINARY)

# Use OpenCV contour detection for medicine bottles
contours, _ = cv2.findContours(thresh, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)

for contour in contours:
x, y, w, h = cv2.boundingRect(contour)
if w > 30 and h > 30: # Assume the size of the bottle is within this range
cv2.rectangle(img, (x, y), (x + w, y + h), (0, 255, 0), 2)
print("Detected a bottle")

cv2.imshow("Image", img)
cv2.waitKey(1)

rospy.init_node('bottle_detection_node')
sub = rospy.Subscriber('/camera/rgb/image_raw', Image, image_callback)
rospy.spin()
```

Process Description:
- Control the movement of the robot arm through the `Twist` message.
- Use the camera's image stream and detect the bottle position using OpenCV.
- When a medicine bottle is detected, the robot arm can be further operated (such as grasping) as needed.

---

### 2. Combination of Isaac Sim and ROS: Defining work scenarios and sending instructions

Isaac Sim scenario definition and integration with ROS:
In Isaac Sim, you first define a scene (such as a laboratory environment and a mobile robot), and send control commands through ROS.

Isaac Sim setup scene (Python API):
```Python
from omni.isaac.kit import SimulationApp
import omni.isaac.core.utils.prims as prims

# Start the simulation environment
simulation_app = SimulationApp({"headless": False})

# Create robots and scenes
prims.create_prim("/World/Robot", "Robot")
prims.create_prim("/World/Table", "Cube", position=(0, 0, 0))

# Start the simulation
simulation_app.update()

# ROS controls the robot's movement
import rospy
from geometry_msgs.msg import Twist

def move_robot():
rospy.init_node('robot_control_node')
pub = rospy.Publisher('/cmd_vel', Twist, queue_size=10)
rate = rospy.Rate(10)

twist = Twist()
twist.linear.x = 1.0 # Move forward
twist.angular.z = 0.5 # Rotation

while not rospy.is_shutdown():
pub.publish(twist)
rate.sleep()

if __name__ == "__main__":
move_robot()
```

Process Description:
1. Use `Isaac Sim` to define work scenarios, such as test benches and robots.
2. Use ROS to control the robot's movement in Isaac Sim and control the robot's movement by publishing `Twist` messages.
3. The simulator allows you to see the robot’s behavior in the virtual environment in real time.

---

### 3. Asynchronous Task Execution: Digital Pipette Example

Scene description:
Suppose there is an automated laboratory where a digital pipette needs to complete several steps: liquid aspiration, liquid dispensing, cleaning the pipette, and aspiration again.

Task breakdown:
1. Suction of liquid (Task 1)
2. Dispensing Liquid (Task 2)
3. Cleaning the pipette (Task 3)
4. Pipette the liquid again (Task 4)

Code example (ROS + asynchronous task execution):
```Python
import rospy
from std_msgs.msg import String
from threading import Thread

def task1():
rospy.loginfo("Task 1: Aspirate liquid")
rospy.sleep(2) # simulate absorbing time

def task2():
rospy.loginfo("Task 2: Distribute liquid")
rospy.sleep(3)

def task3():
rospy.loginfo("Task 3: Clean the pipette")
rospy.sleep(4)

def task4():
rospy.loginfo("Task 4: Aspirate liquid again")
rospy.sleep(2)

def run_tasks_async():
t1 = Thread(target=task1)
t2 = Thread(target=task2)
t3 = Thread(target=task3)
t4 = Thread(target=task4)

t1.start()
t2.start()
t3.start()
t4.start()

t1.join()
t2.join()
t3.join()
t4.join()

if __name__ == "__main__":
rospy.init_node('async_task_node')
run_tasks_async()
```

Description: The above code demonstrates how to execute asynchronous tasks in parallel through multithreading, which is suitable for task control of automated equipment (such as pipettes).

---

### 4. Prefect workflow orchestration example: integrating all tasks

By orchestrating all previous tasks through Prefect, ROS robot control, Isaac Sim simulation, and asynchronous task execution are integrated into a complete workflow.

Prefect Code Example:

```Python
from prefect import task, Flow
import rospy
from geometry_msgs.msg import Twist
from threading import Thread

@task
def ros_control_robot():
rospy.init_node('robot_control_node')
pub = rospy.Publisher('/cmd_vel', Twist, queue_size=10)
twist = Twist()
twist.linear.x = 1.0
twist.angular.z = 0.5
rospy.loginfo("control robot movement")
pub.publish(twist)
rospy.sleep(3)

@task
def run_simulation():
rospy.loginfo("Start Isaac Sim scene")
# Start the logic of Isaac Sim simulator
rospy.sleep(5)

@task
def async_tasks():
rospy.loginfo("Execute asynchronous pipetting task")
t1 = Thread(target=lambda: rospy.loginfo("Task 1: Aspirate liquid"))
t2 = Thread(target=lambda: rospy.loginfo("Task 2: Distribute liquid"))
t1.start()
t2.start()
t1.join()
t2.join()

with Flow("Mobile Robotics Workflow") as flow:
ros_control_robot()
run_simulation()
async_tasks()

# Execute workflow
flow.run()
```

Process Description:
1. ROS control robot: Issue commands through the `ros_control_robot` task to control the movement of the robot.
2. Isaac Sim simulation: Start the Isaac Sim scenario through `run_simulation` to simulate the robot task.
3. Asynchronous task execution: Execute the asynchronous digital pipette task through `async_tasks`.

Through Prefect workflow orchestration, these tasks can be easily managed and scheduled to achieve complex laboratory automation task flows.

## 🚀 Quiz

::::{tab-set}
:sync-group: category

:::{tab-item} W 2024
:sync: w2024

:::

:::{tab-item} Sp/Su 2024
:sync: sp2024

https://q.utoronto.ca/courses/370069/assignments/1393647
:::

:::{tab-item} Sp/Su 2025
:sync: sp2025

https://q.utoronto.ca/courses/393350/assignments/1499709?display=full_width_with_nav
:::

::::

## 📄 Assignment

::::{tab-set}
:sync-group: category

:::{tab-item} W 2024
:sync: w2024

:::

:::{tab-item} Sp/Su 2024
:sync: sp2024

https://q.utoronto.ca/courses/370069/assignments/1393648
:::

:::{tab-item} Sp/Su 2025
:sync: sp2025

https://q.utoronto.ca/courses/393350/assignments/1499710?display=full_width_with_nav
:::

::::

1. **Control the TurtleBot3**: Create a ROS node that moves the robot in a complex path (e.g., figure-eight pattern) while avoiding obstacles using LIDAR.
2. **Simulate and Test in Isaac Sim**: Deploy the robot in Isaac Sim’s photorealistic environment and simulate tasks like object detection and path planning.
3. **Asynchronous Task Management**: Use **Prefect** to orchestrate tasks like motion control, sensor data monitoring, and real-time analysis. Ensure that these tasks run concurrently.
4. **Define Asynchronous Execution**: Provide an explanation of how asynchrony is essential for controlling autonomous robots in a laboratory setting.
