Examples

Basic Robot Controller

Here’s a simple example showing how to create a robot controller:

from luckyrobots import LuckyRobots, Node, Reset, Step
import numpy as np
import asyncio

class MyController(Node):
    def __init__(self):
        super().__init__("my_controller")

    async def _setup_async(self):
        # Create service clients for reset and step
        self.reset_client = self.create_client(Reset, "/reset")
        self.step_client = self.create_client(Step, "/step")

    async def run_robot(self):
        # Reset the environment
        await self.reset_client.call(Reset.Request())
        print("Environment reset!")

        # Run 10 steps with random actions
        for i in range(10):
            # Sample random action (6 values for so100 robot)
            action = np.random.uniform(-1, 1, size=6)

            # Send action to robot
            response = await self.step_client.call(
                Step.Request(actuator_values=action.tolist())
            )

            print(f"Step {i+1}: Action sent, got observation")
            await asyncio.sleep(0.1)  # Small delay

# Setup and run
controller = MyController()
luckyrobots = LuckyRobots()
luckyrobots.register_node(controller)

# Start simulation
luckyrobots.start(
    scene="kitchen",
    robot="so100",
    task="pickandplace"
)

Using Robot Configuration

Access robot-specific settings for proper action limits:

from luckyrobots import LuckyRobots, Node, Reset, Step
import numpy as np

class ConfiguredController(Node):
    def __init__(self, robot_name="so100"):
        super().__init__("configured_controller")
        self.robot_config = LuckyRobots.get_robot_config(robot_name)

    async def _setup_async(self):
        self.reset_client = self.create_client(Reset, "/reset")
        self.step_client = self.create_client(Step, "/step")

    def sample_valid_action(self):
        """Sample action within robot's actual limits"""
        limits = self.robot_config["action_space"]["actuator_limits"]
        lower = [limit["lower"] for limit in limits]
        upper = [limit["upper"] for limit in limits]
        return np.random.uniform(lower, upper)

    async def control_loop(self):
        await self.reset_client.call(Reset.Request())

        for step in range(20):
            action = self.sample_valid_action()
            await self.step_client.call(
                Step.Request(actuator_values=action.tolist())
            )
            print(f"Step {step}: Valid action within limits")

Accessing Observations

Get sensor data from the robot:

class ObservationController(Node):
    async def _setup_async(self):
        self.reset_client = self.create_client(Reset, "/reset")
        self.step_client = self.create_client(Step, "/step")

    async def observe_robot(self):
        # Reset and get initial observation
        reset_response = await self.reset_client.call(Reset.Request())
        observation = reset_response.observation

        # Print joint states
        joint_states = observation.observation_state
        print(f"Joint positions: {joint_states}")

        # Check for cameras
        if observation.observation_cameras:
            print(f"Found {len(observation.observation_cameras)} cameras")
            for camera in observation.observation_cameras:
                print(f"Camera: {camera.camera_name}")

Command Line Usage

Run the included controller example with different options:

# Basic usage
python controller.py

# Specify robot and scene
python controller.py --robot so100 --scene kitchen --task pickandplace

# Show camera feed
python controller.py --show-camera

# Custom rate
python controller.py --rate 30

# Custom host
python controller.py --host 192.168.1.100 --port 3001

Simple Complete Example

Put it all together:

from luckyrobots import LuckyRobots, Node, Reset, Step, run_coroutine
import numpy as np
import asyncio

class SimpleRobot(Node):
    async def _setup_async(self):
        self.reset_client = self.create_client(Reset, "/reset")
        self.step_client = self.create_client(Step, "/step")

    async def move_robot(self):
        # Reset
        await self.reset_client.call(Reset.Request())

        # Move for 5 steps
        for i in range(5):
            action = [0.1, 0.0, 0.0, 0.0, 0.0, 1.0]  # Simple action
            await self.step_client.call(Step.Request(actuator_values=action))
            await asyncio.sleep(0.5)

        print("Robot movement complete!")

def main():
    robot = SimpleRobot()
    luckyrobots = LuckyRobots()
    luckyrobots.register_node(robot)
    luckyrobots.start(scene="kitchen", robot="so100", task="pickandplace")

    # Run the robot
    run_coroutine(robot.move_robot())

if __name__ == "__main__":
    main()

Example showing how to access camera data from observations:

import cv2
from luckyrobots import LuckyRobots, Node, Reset, Step

class CameraController(Node):
    async def _setup_async(self):
        self.reset_client = self.create_client(Reset, "/reset")
        self.step_client = self.create_client(Step, "/step")

    async def process_cameras(self, observation):
        """Process camera data from observation"""
        if observation.observation_cameras:
            for camera in observation.observation_cameras:
                print(f"Camera: {camera.camera_name}")
                print(f"Image shape: {camera.shape}")

                # Display image (if image_data is processed)
                if hasattr(camera, 'image_data') and camera.image_data is not None:
                    cv2.imshow(camera.camera_name, camera.image_data)
                    cv2.waitKey(1)

    async def run_with_cameras(self):
        reset_response = await self.reset_client.call(Reset.Request())
        await self.process_cameras(reset_response.observation)

        for i in range(50):
            action = [0.1, 0.0, 0.0, 0.0, 0.0, 1.0]  # Simple action
            step_response = await self.step_client.call(
                Step.Request(actuator_values=action)
            )
            await self.process_cameras(step_response.observation)

Command Line Interface

The included controller example supports command line arguments:

# Basic usage
python controller.py --robot so100 --scene kitchen --task pickandplace

# With camera display
python controller.py --show-camera --rate 30

# Custom host/port
python controller.py --host 192.168.1.100 --port 3001

Service and Publisher Examples

Creating custom services and publishers:

from luckyrobots import Node

class ServiceNode(Node):
    async def _setup_async(self):
        # Create a custom service
        await self.create_service(
            MyServiceType,
            "my_service",
            self.handle_my_service
        )

        # Create a publisher
        self.my_publisher = self.create_publisher(
            MyMessageType,
            "my_topic"
        )

        # Create a subscriber
        self.my_subscriber = self.create_subscription(
            MyMessageType,
            "other_topic",
            self.handle_message
        )

    async def handle_my_service(self, request):
        # Process service request
        return MyServiceType.Response(success=True)

    def handle_message(self, message):
        # Process received message
        print(f"Received: {message}")

    def publish_data(self, data):
        # Publish a message
        message = MyMessageType(data=data)
        self.my_publisher.publish(message)