API Reference

Core Classes

LuckyRobots

LuckyEngineClient

class luckyrobots.LuckyEngineClient(host: str = '127.0.0.1', port: int = 50051, timeout: float = 5.0, *, robot_name: str | None = None)

Client for connecting to the LuckyEngine gRPC server.

Provides access to gRPC services for RL training:
  • AgentService – stepping, resets

  • SceneService – simulation mode control

  • MujocoService – health checks, joint state

Usage:

    from luckyrobots import LuckyEngineClient

    client = LuckyEngineClient(host="127.0.0.1", port=50051)
    client.connect()
    client.wait_for_server()

    schema = client.get_agent_schema()
    obs = client.step(actions=[0.0] * 12)

    client.close()

__init__(host: str = '127.0.0.1', port: int = 50051, timeout: float = 5.0, *, robot_name: str | None = None) -> None

Initialize the LuckyEngine gRPC client.

Parameters:
  • host – gRPC server host address.

  • port – gRPC server port.

  • timeout – Default timeout for RPC calls in seconds.

  • robot_name – Default robot name for calls that require it.

connect() -> None

Connect to the LuckyEngine gRPC server.

Raises:

GrpcConnectionError – If connection fails.

close() -> None

Close the gRPC channel.
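Because connect() and close() come as a pair, it can be convenient to wrap them so the channel is closed even when the code in between raises. The following is a minimal sketch, not part of the library: `connected` is a hypothetical helper that only relies on the connect() and close() methods documented above.

```python
from contextlib import contextmanager


@contextmanager
def connected(client):
    """Connect `client`, yield it, and close the channel on exit.

    `client` is expected to expose connect() and close() as documented
    for LuckyEngineClient; `connected` itself is a hypothetical helper.
    """
    client.connect()  # documented to raise GrpcConnectionError on failure
    try:
        yield client
    finally:
        client.close()  # runs even if the body raised


# Intended usage (requires the luckyrobots package and a running server):
#
#     from luckyrobots import LuckyEngineClient
#     with connected(LuckyEngineClient()) as client:
#         client.wait_for_server()
```

If connect() itself raises, close() is not called, since no channel was successfully opened in that case.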

is_connected() -> bool

Check if the client is connected.

health_check(timeout: float | None = None) -> bool

Perform a health check by calling GetMujocoInfo.

Parameters:

timeout – Timeout in seconds (uses default if None).

Returns:

True if server responds, False otherwise.

wait_for_server(timeout: float = 30.0, poll_interval: float = 0.5) -> bool

Wait for the gRPC server to become available.

Parameters:
  • timeout – Maximum time to wait in seconds.

  • poll_interval – Time between connection attempts.

Returns:

True if the server became available, False if the timeout elapsed first.
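Since wait_for_server() signals a timeout through its return value rather than an exception, callers may want to turn a False result into an error. A small sketch of that pattern follows; `connect_and_wait` and its `startup_timeout` parameter are hypothetical names, and only connect(), wait_for_server(), and close() are taken from this reference.

```python
def connect_and_wait(client, startup_timeout=30.0, poll_interval=0.5):
    """Connect, then block until the server answers or the timeout elapses.

    Hypothetical helper: raises TimeoutError instead of returning False.
    """
    client.connect()
    ready = client.wait_for_server(
        timeout=startup_timeout, poll_interval=poll_interval
    )
    if not ready:
        client.close()  # do not leave a half-initialized channel open
        raise TimeoutError(
            f"LuckyEngine server not available after {startup_timeout} s"
        )
    return client
```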

property pb: Any

Access protobuf modules grouped by domain (e.g., client.pb.scene).

property robot_name: str | None

Default robot name used by calls that accept an optional robot_name.

set_robot_name(robot_name: str) -> None

Set the default robot name used by calls that accept an optional robot_name.

property scene: Any

SceneService stub.

property mujoco: Any

MujocoService stub.

property agent: Any

AgentService stub.

property debug: Any

DebugService stub.

configure_cameras(cameras: list[dict]) -> None

Configure cameras to capture on every Step RPC.

Parameters:

cameras – List of camera configs. Each dict has the keys:
  • name – Camera entity name in the scene.

  • width – Desired image width (0 = native resolution).

  • height – Desired image height (0 = native resolution).
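As an illustration of the expected list shape, the sketch below builds camera config dicts with the three documented keys. The helper `camera_config` and the camera names "front_cam" and "wrist_cam" are made up for this example; real names depend on the entities in the scene.

```python
def camera_config(name, width=0, height=0):
    """Build one camera config dict; 0 means native resolution."""
    if width < 0 or height < 0:
        raise ValueError("width and height must be non-negative")
    return {"name": name, "width": width, "height": height}


cameras = [
    camera_config("front_cam"),            # hypothetical name, native size
    camera_config("wrist_cam", 320, 240),  # hypothetical name, resized
]

# With a connected client, the list would be passed as:
#     client.configure_cameras(cameras)
```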

get_joint_state(robot_name: str = '', timeout: float | None = None)

Get current joint state (positions and velocities).

Parameters:
  • robot_name – Robot entity name (uses default if empty).

  • timeout – RPC timeout in seconds.

Returns:

GetJointStateResponse with state.positions (qpos) and state.velocities (qvel).
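A short sketch of reading the response, assuming only the state.positions and state.velocities fields named above. `joint_snapshot` is a hypothetical helper, and copying into plain lists is a choice made here so the values can outlive the response object.

```python
def joint_snapshot(client, robot_name=""):
    """Return (qpos, qvel) as plain Python lists.

    An empty robot_name lets the client fall back to its default robot.
    """
    response = client.get_joint_state(robot_name=robot_name)
    positions = list(response.state.positions)    # qpos
    velocities = list(response.state.velocities)  # qvel
    return positions, velocities
```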

get_mujoco_info(robot_name: str = '', timeout: float | None = None)

Get MuJoCo model information (joint names, limits, etc.).

get_agent_schema(agent_name: str = '', timeout: float | None = None)

Get agent schema (observation/action sizes and names).

The schema is cached for subsequent step() calls to enable named access to observation values.

Parameters:
  • agent_name – Agent name (empty = default agent).

  • timeout – RPC timeout in seconds.

Returns:

GetAgentSchemaResponse with schema containing observation_names, action_names, observation_size, and action_size.
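The schema fields can be used to size an action vector and to label a raw observation vector by name. The sketch below assumes the response exposes them as `response.schema.action_size` and `response.schema.observation_names`, as described above; `zero_action` and `label_observation` are hypothetical helpers, and `values` is any plain sequence of floats.

```python
def zero_action(schema_response):
    """Return an all-zero action vector of the size the agent expects."""
    return [0.0] * schema_response.schema.action_size


def label_observation(schema_response, values):
    """Pair each observation value with its name from the schema."""
    names = list(schema_response.schema.observation_names)
    if len(names) != len(values):
        raise ValueError(
            f"expected {len(names)} observation values, got {len(values)}"
        )
    return dict(zip(names, values))
```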

reset_agent(agent_name: str = '', randomization_cfg: Any | None = None, timeout: float | None = None)

Reset a specific agent.

Parameters:
  • agent_name – Agent logical name. Empty string means default agent.

  • randomization_cfg – Optional simulation contract config for this reset.

  • timeout – Timeout in seconds (uses default if None).

Returns:

ResetAgentResponse with success and message fields.
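Because a failed reset is reported through the success field rather than an exception, training code may want to check it explicitly. A minimal sketch, where `reset_or_raise` is a hypothetical helper that relies only on the success and message fields named above:

```python
def reset_or_raise(client, agent_name=""):
    """Reset the agent and raise if the server reports failure."""
    response = client.reset_agent(agent_name=agent_name)
    if not response.success:
        raise RuntimeError(f"reset_agent failed: {response.message}")
    return response
```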

step(actions: list[float], agent_name: str = '', step_timeout_s: float = 0.0, timeout: float | None = None) -> ObservationResponse

Synchronous RL step: apply action, wait for physics, return observation.

Parameters:
  • actions – Action vector to apply for this step.

  • agent_name – Agent name (empty = default agent).

  • step_timeout_s – Server-side timeout for waiting for the physics step (seconds). 0 means use server default.

  • timeout – RPC timeout in seconds.

Returns:

ObservationResponse with observation after physics step.
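The apply-wait-observe cycle lends itself to a simple rollout loop. The sketch below is one possible shape, not the library's own training loop: `rollout` and `policy` are hypothetical, where `policy` is any callable that maps the returned observation object to the next action list. The first action is a zero vector sized from the agent schema.

```python
def rollout(client, policy, num_steps, agent_name=""):
    """Step the agent `num_steps` times and collect the observations."""
    schema = client.get_agent_schema(agent_name=agent_name).schema
    actions = [0.0] * schema.action_size  # neutral first action
    observations = []
    for _ in range(num_steps):
        # Blocks until the physics step has completed on the server.
        obs = client.step(actions=actions, agent_name=agent_name)
        observations.append(obs)
        actions = policy(obs)  # choose the next action from the observation
    return observations
```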

report_progress(*, run_id: str = '', task_name: str = '', policy_name: str = '', phase: str = '', current_episode: int = 0, total_episodes: int = 0, current_step: int = 0, max_steps: int = 0, elapsed_s: float = 0.0, status_text: str = '', finished: bool = False) -> None

Report evaluation/training progress to the engine for UI display.

Fire-and-forget: errors are logged but never raised.
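As a sketch of how this might be called once per episode, the helper below fills in a subset of the documented keyword arguments. `report_episode` is hypothetical, the phase value "eval" is only an illustrative string, and elapsed time is measured here with time.monotonic().

```python
import time


def report_episode(client, run_id, episode, total_episodes, started_at):
    """Send a per-episode progress update to the engine UI.

    `started_at` is a time.monotonic() timestamp taken when the run began.
    """
    client.report_progress(
        run_id=run_id,
        phase="eval",  # illustrative value, not a documented constant
        current_episode=episode,
        total_episodes=total_episodes,
        elapsed_s=time.monotonic() - started_at,
        finished=episode >= total_episodes,
    )
```

All arguments are passed by keyword, which the signature requires.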

set_simulation_mode(mode: str = 'fast', timeout: float | None = None)

Set simulation timing mode.

Parameters:
  • mode – One of "realtime", "deterministic", or "fast":
      - realtime: Physics runs at 1x wall-clock speed.
      - deterministic: Physics runs at a fixed rate.
      - fast: Physics runs as fast as possible (for RL training).

  • timeout – RPC timeout in seconds.

Returns:

SetSimulationModeResponse with success and current mode.
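Since the mode is passed as a free-form string, a typo can be caught on the client side before the RPC is sent. A minimal sketch follows; `VALID_MODES` and `set_mode_checked` are hypothetical, and the check of `response.success` assumes the success field mentioned above is exposed under that attribute name.

```python
VALID_MODES = ("realtime", "deterministic", "fast")


def set_mode_checked(client, mode="fast"):
    """Validate `mode` locally, then ask the server to switch to it."""
    if mode not in VALID_MODES:
        raise ValueError(f"unknown mode {mode!r}; expected one of {VALID_MODES}")
    response = client.set_simulation_mode(mode=mode)
    if not response.success:  # assumed attribute name
        raise RuntimeError(f"server refused simulation mode {mode!r}")
    return response
```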

benchmark(duration_seconds: float = 5.0, method: str = 'step', print_results: bool = False) -> BenchmarkResult

Benchmark a client method by calling it in a tight loop.

Parameters:
  • duration_seconds – How long to run the benchmark.

  • method – Method to benchmark. Currently supports “step”.

  • print_results – Print results to stdout.

Returns:

BenchmarkResult with timing statistics.

Raises:

ValueError – If method is not recognized.
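To illustrate what timing a call in a tight loop means in general, here is a generic sketch built on time.perf_counter(). It is not the library's implementation and does not reproduce BenchmarkResult, whose fields are not described here; `time_calls` and the keys of the returned dict are hypothetical.

```python
import time


def time_calls(fn, duration_seconds=5.0):
    """Call `fn` repeatedly for about `duration_seconds` and time each call."""
    latencies = []
    deadline = time.perf_counter() + duration_seconds
    while True:
        start = time.perf_counter()
        fn()
        latencies.append(time.perf_counter() - start)
        if time.perf_counter() >= deadline:
            break  # always runs at least one call before stopping
    return {
        "calls": len(latencies),
        "mean_s": sum(latencies) / len(latencies),
        "max_s": max(latencies),
    }


# With a connected client, a step could be timed as:
#     stats = time_calls(lambda: client.step(actions=[0.0] * 12), 5.0)
```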

GrpcConnectionError

class luckyrobots.GrpcConnectionError(message: str)

Raised when gRPC connection fails.

__init__(message: str)

Models

Engine Management

Engine Lifecycle

Engine lifecycle management for LuckyEngine.