API Reference
Core Classes
LuckyRobots
LuckyEngineClient
- class luckyrobots.LuckyEngineClient(host: str = '127.0.0.1', port: int = 50051, timeout: float = 5.0, *, robot_name: str | None = None)
Client for connecting to the LuckyEngine gRPC server.
Provides access to gRPC services for RL training:
- AgentService: stepping, resets
- SceneService: simulation mode control
- MujocoService: health checks, joint state
- Usage:
from luckyrobots import LuckyEngineClient

client = LuckyEngineClient(host="127.0.0.1", port=50051)
client.connect()
client.wait_for_server()

schema = client.get_agent_schema()
obs = client.step(actions=[0.0] * 12)

client.close()
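Since the usage above ends with an explicit close(), a small context manager can make sure the connection is released even if a call in between raises. The following is a minimal sketch, not part of the library: `engine_session` is a hypothetical helper, and `_StubClient` only stands in for LuckyEngineClient so the snippet runs without a server.

```python
from contextlib import contextmanager


@contextmanager
def engine_session(client, wait_timeout=30.0):
    """Hypothetical helper: connect, wait for the server, always close."""
    client.connect()
    try:
        if not client.wait_for_server(timeout=wait_timeout):
            raise TimeoutError("LuckyEngine server did not become available")
        yield client
    finally:
        # Runs on normal exit and on exceptions alike.
        client.close()


class _StubClient:
    """Stand-in for LuckyEngineClient (assumption, for illustration only)."""

    def __init__(self):
        self.calls = []

    def connect(self):
        self.calls.append("connect")

    def wait_for_server(self, timeout=30.0, poll_interval=0.5):
        self.calls.append("wait_for_server")
        return True

    def close(self):
        self.calls.append("close")


stub = _StubClient()
with engine_session(stub) as c:
    c.calls.append("work")  # placeholder for get_agent_schema() / step() calls
```

With the real client, the same helper would be used as `with engine_session(LuckyEngineClient()) as client: ...`.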
- __init__(host: str = '127.0.0.1', port: int = 50051, timeout: float = 5.0, *, robot_name: str | None = None) -> None
Initialize the LuckyEngine gRPC client.
- Parameters:
host – gRPC server host address.
port – gRPC server port.
timeout – Default timeout for RPC calls in seconds.
robot_name – Default robot name for calls that require it.
- connect() -> None
Connect to the LuckyEngine gRPC server.
- Raises:
GrpcConnectionError – If connection fails.
- health_check(timeout: float | None = None) -> bool
Perform a health check by calling GetMujocoInfo.
- Parameters:
timeout – Timeout in seconds (uses default if None).
- Returns:
True if server responds, False otherwise.
- wait_for_server(timeout: float = 30.0, poll_interval: float = 0.5) -> bool
Wait for the gRPC server to become available.
- Parameters:
timeout – Maximum time to wait in seconds.
poll_interval – Time between connection attempts.
- Returns:
True if server became available, False if timeout.
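The timeout and poll_interval parameters describe a polling loop. The sketch below shows one way such a loop could be written around a health probe; it is an illustration under that assumption, not the library's actual implementation, and `wait_until_healthy` is a hypothetical name.

```python
import time


def wait_until_healthy(health_check, timeout=30.0, poll_interval=0.5):
    """Poll `health_check()` until it returns True or `timeout` elapses."""
    deadline = time.monotonic() + timeout
    while True:
        if health_check():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)


# Simulated server that becomes healthy on the third probe.
_probes = iter([False, False, True])
became_ready = wait_until_healthy(lambda: next(_probes), timeout=1.0, poll_interval=0.01)

# Simulated server that never comes up.
never_ready = wait_until_healthy(lambda: False, timeout=0.05, poll_interval=0.01)
```

With a real client, `client.health_check` could be passed as the probe.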
- property pb: Any
Access protobuf modules grouped by domain (e.g., client.pb.scene).
- property robot_name: str | None
Default robot name used by calls that accept an optional robot_name.
- set_robot_name(robot_name: str) -> None
Set the default robot name used by calls that accept an optional robot_name.
- property scene: Any
SceneService stub.
- property mujoco: Any
MujocoService stub.
- property agent: Any
AgentService stub.
- property debug: Any
DebugService stub.
- configure_cameras(cameras: list[dict]) -> None
Configure cameras to capture on every Step RPC.
- Parameters:
cameras – List of camera configs. Each dict has keys:
- name: Camera entity name in the scene.
- width: Desired image width (0 = native resolution).
- height: Desired image height (0 = native resolution).
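To illustrate the expected shape of the cameras argument, here is a small sketch that builds the list of dicts. `camera_config` is a hypothetical helper, and the entity names are made up for the example.

```python
def camera_config(name, width=0, height=0):
    """Build one camera dict; 0 for width/height requests native resolution."""
    if width < 0 or height < 0:
        raise ValueError("width and height must be non-negative")
    return {"name": name, "width": width, "height": height}


cameras = [
    camera_config("front_cam"),            # hypothetical entity name, native size
    camera_config("wrist_cam", 320, 240),  # hypothetical entity name, resized
]
# With a connected client: client.configure_cameras(cameras)
```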
- get_joint_state(robot_name: str = '', timeout: float | None = None)
Get current joint state (positions and velocities).
- Parameters:
robot_name – Robot entity name (uses default if empty).
timeout – RPC timeout in seconds.
- Returns:
GetJointStateResponse with state.positions (qpos) and state.velocities (qvel).
- get_mujoco_info(robot_name: str = '', timeout: float | None = None)
Get MuJoCo model information (joint names, limits, etc.).
- get_agent_schema(agent_name: str = '', timeout: float | None = None)
Get agent schema (observation/action sizes and names).
The schema is cached for subsequent step() calls to enable named access to observation values.
- Parameters:
agent_name – Agent name (empty = default agent).
timeout – RPC timeout.
- Returns:
GetAgentSchemaResponse with schema containing observation_names, action_names, observation_size, and action_size.
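As an illustration of what named access to observation values can look like, the sketch below pairs observation_names with a flat observation vector. It assumes `schema` is the schema field of the response; the `SimpleNamespace` object and the names in it are stand-ins invented for the example, and `name_observation` is a hypothetical helper rather than a library function.

```python
from types import SimpleNamespace


def name_observation(schema, values):
    """Map a flat observation vector to {name: value} using the schema."""
    if len(values) != schema.observation_size:
        raise ValueError(
            f"expected {schema.observation_size} values, got {len(values)}"
        )
    return dict(zip(schema.observation_names, values))


# Stand-in for `client.get_agent_schema().schema` (field values are made up).
schema = SimpleNamespace(
    observation_names=["base_height", "base_pitch"],
    observation_size=2,
    action_names=["hip", "knee"],
    action_size=2,
)

named = name_observation(schema, [0.31, -0.02])
```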
- reset_agent(agent_name: str = '', randomization_cfg: Any | None = None, timeout: float | None = None)
Reset a specific agent.
- Parameters:
agent_name – Agent logical name. Empty string means default agent.
randomization_cfg – Optional simulation contract config for this reset.
timeout – Timeout in seconds (uses default if None).
- Returns:
ResetAgentResponse with success and message fields.
- step(actions: list[float], agent_name: str = '', step_timeout_s: float = 0.0, timeout: float | None = None) -> ObservationResponse
Synchronous RL step: apply action, wait for physics, return observation.
- Parameters:
actions – Action vector to apply for this step.
agent_name – Agent name (empty = default agent).
step_timeout_s – Server-side timeout for waiting for the physics step (seconds). 0 means use server default.
timeout – RPC timeout in seconds.
- Returns:
ObservationResponse with observation after physics step.
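The reset and step calls combine into a typical episode loop. The sketch below is one possible arrangement, not a prescribed pattern: `run_episode` and the policy callable are hypothetical, and `_StubClient` imitates only the documented method signatures so the code runs without a server. The fields of ObservationResponse are not listed in this section, so the stub returns a placeholder payload.

```python
from types import SimpleNamespace


def run_episode(client, policy, max_steps):
    """Reset the default agent, then step it `max_steps` times."""
    schema = client.get_agent_schema().schema
    reset = client.reset_agent()
    if not reset.success:
        raise RuntimeError(f"reset failed: {reset.message}")

    responses = []
    actions = [0.0] * schema.action_size  # start from a zero action
    for _ in range(max_steps):
        response = client.step(actions=actions)
        responses.append(response)
        actions = policy(response)
    return responses


class _StubClient:
    """Stand-in for LuckyEngineClient (assumption, for illustration only)."""

    def get_agent_schema(self, agent_name="", timeout=None):
        return SimpleNamespace(schema=SimpleNamespace(action_size=3))

    def reset_agent(self, agent_name="", randomization_cfg=None, timeout=None):
        return SimpleNamespace(success=True, message="")

    def step(self, actions, agent_name="", step_timeout_s=0.0, timeout=None):
        # Placeholder payload: echoes the actions it received.
        return SimpleNamespace(received=list(actions))


responses = run_episode(_StubClient(), policy=lambda resp: [0.1, 0.1, 0.1], max_steps=4)
```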
- report_progress(*, run_id: str = '', task_name: str = '', policy_name: str = '', phase: str = '', current_episode: int = 0, total_episodes: int = 0, current_step: int = 0, max_steps: int = 0, elapsed_s: float = 0.0, status_text: str = '', finished: bool = False) -> None
Report evaluation/training progress to the engine for UI display.
Fire-and-forget: errors are logged but never raised.
- set_simulation_mode(mode: str = 'fast', timeout: float | None = None)
Set simulation timing mode.
- Parameters:
mode – "realtime", "deterministic", or "fast":
- realtime: Physics runs at 1x wall-clock speed.
- deterministic: Physics runs at a fixed rate.
- fast: Physics runs as fast as possible (for RL training).
timeout – RPC timeout in seconds.
- Returns:
SetSimulationModeResponse with success and current mode.
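Because the mode is passed as a plain string, it can help to validate it on the client side before making the RPC. The following is a small sketch of such a check; `checked_mode` is a hypothetical helper, and whether the server itself rejects unknown modes is not stated here.

```python
VALID_MODES = ("realtime", "deterministic", "fast")


def checked_mode(mode):
    """Normalize a mode string and reject anything outside VALID_MODES."""
    normalized = mode.strip().lower()
    if normalized not in VALID_MODES:
        raise ValueError(
            f"unknown simulation mode {mode!r}; expected one of {VALID_MODES}"
        )
    return normalized


mode = checked_mode(" Fast ")
# With a connected client: client.set_simulation_mode(mode)
```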
- benchmark(duration_seconds: float = 5.0, method: str = 'step', print_results: bool = False) -> BenchmarkResult
Benchmark a client method by calling it in a tight loop.
- Parameters:
duration_seconds – How long to run the benchmark.
method – Method to benchmark. Currently supports "step".
print_results – Print results to stdout.
- Returns:
BenchmarkResult with timing statistics.
- Raises:
ValueError – If method is not recognized.
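For readers who want to see what calling a method in a tight loop amounts to, here is a generic timing sketch. It is not the library's benchmark implementation, `time_calls` is a hypothetical name, and the returned dict is only a stand-in, since the fields of BenchmarkResult are not documented in this section.

```python
import statistics
import time


def time_calls(fn, duration_seconds=0.05):
    """Call `fn` repeatedly for about `duration_seconds`; return latency stats."""
    latencies = []
    deadline = time.perf_counter() + duration_seconds
    while True:
        start = time.perf_counter()
        fn()
        latencies.append(time.perf_counter() - start)
        if time.perf_counter() >= deadline:
            break  # checked after the call, so at least one sample exists
    return {
        "calls": len(latencies),
        "mean_s": statistics.fmean(latencies),
        "max_s": max(latencies),
    }


# A cheap local function stands in for `client.step(...)`.
stats = time_calls(lambda: sum(range(100)))
```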
GrpcConnectionError
Models
Engine Management
Engine Lifecycle
Engine lifecycle management for LuckyEngine.