import cv2
import sys
import json
import msgpack
import asyncio
import logging
import secrets
import platform
import signal
import threading
import time
from typing import Dict
import uvicorn
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from websocket import create_connection
from .manager import Manager
from ..message.transporter import MessageType, TransportMessage
from ..message.srv.types import Reset, Step
from ..utils.run_executable import is_luckyworld_running, run_luckyworld_executable
from ..utils.library_dev import library_dev
from ..core.models import ObservationModel
from .node import Node
from ..utils.event_loop import (
get_event_loop,
initialize_event_loop,
)
from ..utils.helpers import (
validate_params,
get_robot_config,
)
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("luckyrobots")
# FastAPI app and manager instances
app = FastAPI()
manager = Manager()
[docs]
class LuckyRobots(Node):
"""Main LuckyRobots node for managing robot communication and control"""
host = "localhost"
port = 3000
robot_client = None
world_client = None
_pending_resets = {}
_pending_steps = {}
_running = False
_nodes: Dict[str, "Node"] = {}
_shutdown_event = threading.Event()
[docs]
def __init__(self, host: str = None, port: int = None):
initialize_event_loop()
self.host = host or self.host
self.port = port or self.port
# Initialize clients and state
self.robot_client = None
self.world_client = None
self._pending_resets = {}
self._pending_steps = {}
self._running = False
self._nodes: Dict[str, Node] = {}
self._shutdown_event = threading.Event()
if not self._is_websocket_server_running():
self._start_websocket_server()
super().__init__("lucky_robots_manager", "", self.host, self.port)
app.lucky_robots = self
def _is_websocket_server_running(self) -> bool:
"""Check if the websocket server is already running"""
try:
ws_url = f"ws://{self.host}:{self.port}/nodes"
ws = create_connection(ws_url, timeout=1)
ws.close()
logger.info(f"WebSocket server running on {self.host}:{self.port}")
return True
except Exception as e:
logger.info(f"WebSocket server not running on {self.host}:{self.port}")
return False
def _start_websocket_server(self) -> None:
"""Start the websocket server in a separate thread using uvicorn"""
def run_server():
logging.getLogger("uvicorn.access").setLevel(logging.WARNING)
uvicorn.run(app, host=self.host, port=self.port, log_level="warning")
server_thread = threading.Thread(target=run_server, daemon=True)
server_thread.start()
logger.info(f"Starting WebSocket server on {self.host}:{self.port}")
# Give the server time to start
time.sleep(0.5)
[docs]
@staticmethod
def set_host(ip_address: str) -> None:
"""Set the host address for the LuckyRobots node"""
LuckyRobots.host = ip_address
[docs]
@staticmethod
def get_robot_config(robot: str = None) -> dict:
"""Get the configuration for the LuckyRobots node"""
return get_robot_config(robot)
[docs]
def register_node(self, node: Node) -> None:
"""Register a node with the LuckyRobots node"""
self._nodes[node.full_name] = node
logger.info(f"Registered node: {node.full_name}")
async def _setup_async(self):
"""Setup the LuckyRobots node asynchronously"""
self.reset_service = await self.create_service(
Reset, "/reset", self.handle_reset
)
self.step_service = await self.create_service(Step, "/step", self.handle_step)
[docs]
def start(
self,
scene: str = "ArmLevel",
robot: str = "so100",
task: str = "pickandplace",
observation_type: str = "pixels_agent_pos",
game_path: str = None,
) -> None:
"""Start the LuckyRobots node"""
if self._running:
logger.warning("LuckyRobots is already running")
return
validate_params(scene, robot, task, observation_type)
self.process_cameras = "pixels" in observation_type
if not is_luckyworld_running() and "--lr-no-executable" not in sys.argv:
logger.warning("LuckyWorld is not running, starting it now...")
run_luckyworld_executable(scene, robot, task, game_path)
library_dev()
self._setup_signal_handlers()
# Start all registered nodes
for node in self._nodes.values():
try:
node.start()
except Exception as e:
logger.error(f"Error starting node {node.full_name}: {e}")
# Start the luckyrobots node
super().start()
self._running = True
def _setup_signal_handlers(self) -> None:
"""Setup signal handlers for the LuckyRobots node to handle Ctrl+C"""
def sigint_handler(signum, frame):
print("\nCtrl+C pressed. Shutting down...")
self.shutdown()
signal.signal(signal.SIGINT, sigint_handler)
def _display_welcome_message(self) -> None:
"""Display the welcome message for the LuckyRobots node in the terminal"""
welcome_art = [
"*" * 60,
" ",
" ",
"▄▄▌ ▄• ▄▌ ▄▄· ▄ •▄ ▄· ▄▌▄▄▄ ▄▄▄▄· ▄▄▄▄▄.▄▄ · ",
"██• █▪██▌▐█ ▌▪█▌▄▌▪▐█▪██▌▀▄ █·▪ ▐█ ▀█▪▪ •██ ▐█ ▀. ",
"██▪ █▌▐█▌██ ▄▄▐▀▀▄·▐█▌▐█▪▐▀▀▄ ▄█▀▄ ▐█▀▀█▄ ▄█▀▄ ▐█.▪▄▀▀▀█▄",
"▐█▌▐▌▐█▄█▌▐███▌▐█.█▌ ▐█▀·.▐█•█▌▐█▌.▐▌██▄▪▐█▐█▌.▐▌ ▐█▌·▐█▄▪▐█",
".▀▀▀ ▀▀▀ ·▀▀▀ ·▀ ▀ ▀ • .▀ ▀ ▀█▄▀▪·▀▀▀▀ ▀█▄▀▪ ▀▀▀ ▀▀▀▀ ",
" ",
" ",
]
for line in welcome_art:
print(line)
if platform.system() == "Darwin":
mac_instructions = [
"*" * 60,
"For macOS users:",
"Please be patient. The application may take up to a minute to open on its first launch.",
"If the application doesn't appear, please follow these steps:",
"1. Open System Settings",
"2. Navigate to Privacy & Security",
"3. Scroll down and click 'Allow' next to the 'luckyrobots' app",
"*" * 60,
]
for line in mac_instructions:
print(line)
final_messages = [
"Lucky Robots application started successfully.",
"To move the robot: Choose a level and tick the HTTP checkbox.",
"To receive camera feed: Choose a level and tick the Capture checkbox.",
"*" * 60,
]
for line in final_messages:
print(line)
[docs]
def wait_for_world_client(self, timeout: float = 60.0) -> bool:
"""Wait for the world client to connect to the websocket server"""
start_time = time.perf_counter()
logger.info(f"Waiting for world client to connect for {timeout} seconds")
while not self.world_client and time.perf_counter() - start_time < timeout:
time.sleep(0.5)
if self.world_client:
logger.info("World client connected successfully")
return True
else:
logger.error("No world client connected after 60 seconds")
self.shutdown()
raise
[docs]
async def handle_reset(self, request: Reset.Request) -> Reset.Response:
"""Handle the reset request by forwarding to the world client"""
if self.world_client is None:
logger.error("No world client connection available")
self.shutdown()
raise
request_id = secrets.token_hex(4)
shared_loop = get_event_loop()
response_future = shared_loop.create_future()
self._pending_resets[request_id] = response_future
seed = getattr(request, "seed", None)
options = getattr(request, "options", None)
request_data = {
"request_type": "reset",
"request_id": request_id,
"seed": seed,
"options": options,
}
try:
await self.world_client.send_text(json.dumps(request_data))
response_data = await asyncio.wait_for(response_future, timeout=30.0)
observation = ObservationModel(**response_data["Observation"])
if self.process_cameras:
observation.process_all_cameras()
return Reset.Response(
success=True,
message="Reset request processed",
request_type=response_data["RequestType"],
request_id=response_data["RequestID"],
time_stamp=response_data["TimeStamp"],
observation=observation,
info=response_data["Info"],
)
except Exception as e:
self._pending_resets.pop(request_id, None)
logger.error(f"Error processing reset request: {e}")
self.shutdown()
raise
[docs]
async def handle_step(self, request: Step.Request) -> Step.Response:
"""Handle the step request by forwarding to the world client"""
if self.world_client is None:
logger.error("No world client connection available")
self.shutdown()
raise
request_id = secrets.token_hex(4)
shared_loop = get_event_loop()
response_future = shared_loop.create_future()
self._pending_steps[request_id] = response_future
self._pending_steps[request_id] = response_future
request_data = {
"request_type": "step",
"request_id": request_id,
"actuator_values": request.actuator_values,
}
try:
await self.world_client.send_text(json.dumps(request_data))
response_data = await asyncio.wait_for(response_future, timeout=30.0)
observation = ObservationModel(**response_data["Observation"])
if self.process_cameras:
observation.process_all_cameras()
return Step.Response(
success=True,
message="Step request processed",
request_type=response_data["RequestType"],
request_id=response_data["RequestID"],
time_stamp=response_data["TimeStamp"],
observation=observation,
info=response_data["Info"],
)
except Exception as e:
self._pending_steps.pop(request_id, None)
logger.error(f"Error processing step request: {e}")
self.shutdown()
raise
[docs]
def spin(self) -> None:
"""Spin the LuckyRobots node to keep it running"""
if not self._running:
logger.warning("LuckyRobots is not running")
return
self._display_welcome_message()
logger.info("LuckyRobots spinning")
try:
self._shutdown_event.wait()
except KeyboardInterrupt:
logger.info("Keyboard interrupt received. Shutting down...")
self.shutdown()
logger.info("LuckyRobots stopped spinning")
def _stop_websocket_server(self) -> None:
"""Stop the WebSocket server if it's running"""
if hasattr(self, "_server") and self._server is not None:
logger.info("Stopping WebSocket server...")
self._server.should_exit = True
if (
hasattr(self, "_server_thread")
and self._server_thread
and self._server_thread.is_alive()
):
self._server_thread.join(timeout=2.0)
if self._server_thread.is_alive():
logger.warning(
"WebSocket server thread did not terminate gracefully"
)
else:
logger.info("WebSocket server stopped")
self._server = None
self._server_thread = None
def _cleanup_camera_windows(self) -> None:
"""Clean up all OpenCV windows and reset tracking"""
try:
# Only cleanup if we're in the main thread to avoid Qt warnings
if threading.current_thread() == threading.main_thread():
cv2.destroyAllWindows()
cv2.waitKey(1)
else:
# If not in main thread, just skip OpenCV cleanup
# The windows will close when the main thread exits anyway
pass
except Exception:
# Ignore any errors during cleanup
pass
[docs]
def shutdown(self) -> None:
"""Shutdown the LuckyRobots node and clean up resources"""
if not self._running:
return
logger.info("Starting LuckyRobots shutdown")
self._running = False
# 1. Shutdown nodes first (they depend on transport)
for node_name, node in self._nodes.items():
try:
node.shutdown()
except Exception as e:
logger.error(f"Error shutting down node {node_name}: {e}")
# 2. Shutdown self (transport layer)
try:
super().shutdown()
except Exception as e:
logger.error(f"Error shutting down LuckyRobots transport: {e}")
# 3. Clean up UI resources
self._cleanup_camera_windows()
# 4. Set shutdown event
self._shutdown_event.set()
logger.info("LuckyRobots shutdown complete")
@app.websocket("/nodes")
async def nodes_endpoint(websocket: WebSocket) -> None:
"""WebSocket endpoint for node communication"""
await websocket.accept()
node_name = None
try:
# Wait for the first message, which should be NODE_ANNOUNCE
message = await websocket.receive_bytes()
message_data = msgpack.unpackb(message)
message = TransportMessage(**message_data)
if message.msg_type != MessageType.NODE_ANNOUNCE:
logger.warning(
f"First message from node should be NODE_ANNOUNCE, got {message.msg_type}"
)
await websocket.close(4000, "First message must be NODE_ANNOUNCE")
return
# Register the node
node_name = message.node_name
await manager.register_node(node_name, websocket)
# Message processing loop
while True:
try:
message = await websocket.receive_bytes()
message_data = msgpack.unpackb(message)
message = TransportMessage(**message_data)
# Process message based on type
handlers = {
MessageType.SUBSCRIBE: lambda: manager.subscribe(
node_name, message.topic_or_service
),
MessageType.UNSUBSCRIBE: lambda: manager.unsubscribe(
node_name, message.topic_or_service
),
MessageType.SERVICE_REGISTER: lambda: manager.register_service(
node_name, message.topic_or_service
),
MessageType.SERVICE_UNREGISTER: lambda: manager.unregister_service(
node_name, message.topic_or_service
),
MessageType.NODE_SHUTDOWN: lambda: None, # Will break the loop
}
if message.msg_type in handlers:
if message.msg_type == MessageType.NODE_SHUTDOWN:
logger.info(f"Node {node_name} shutting down")
break
await handlers[message.msg_type]()
else:
await manager.route_message(message)
except msgpack.UnpackValueError:
logger.error(f"Received invalid msgpack from {node_name}")
except Exception as e:
logger.error(f"Error processing message from {node_name}: {e}")
except WebSocketDisconnect:
logger.info(f"Node {node_name} disconnected")
@app.websocket("/world")
async def world_endpoint(websocket: WebSocket) -> None:
"""WebSocket endpoint for world client communication"""
await websocket.accept()
if hasattr(app, "lucky_robots"):
app.lucky_robots.world_client = websocket
logger.info("World client connected")
lucky_robots = app.lucky_robots
pending_resets = lucky_robots._pending_resets
pending_steps = lucky_robots._pending_steps
try:
while True:
try:
message_bytes = await websocket.receive_bytes()
message_data = msgpack.unpackb(message_bytes)
request_type = message_data.get("RequestType")
request_id = message_data.get("RequestID")
shared_loop = get_event_loop()
if request_type == "reset_response":
future = pending_resets.get(request_id)
shared_loop.call_soon_threadsafe(
lambda: future.set_result(message_data)
if not future.done()
else None
)
shared_loop.call_soon_threadsafe(
lambda: pending_resets.pop(request_id, None)
)
elif request_type == "step_response":
future = pending_steps.get(request_id)
shared_loop.call_soon_threadsafe(
lambda: future.set_result(message_data)
if not future.done()
else None
)
shared_loop.call_soon_threadsafe(
lambda: pending_resets.pop(request_id, None)
if request_type == "reset_response"
else pending_steps.pop(request_id, None)
)
else:
logger.warning(f"Unhandled message type: {request_type}")
except WebSocketDisconnect as e:
logger.info(f"WebSocket disconnected. Code: {e.code}")
break
except Exception as e:
logger.error(f"Message processing error: {type(e).__name__}: {e}")
break
except WebSocketDisconnect:
pass
except Exception as e:
logger.error(f"Critical error in world_endpoint: {type(e).__name__}: {e}")
finally:
if hasattr(app, "lucky_robots") and app.lucky_robots.world_client == websocket:
app.lucky_robots.world_client = None