agents.manual_agent

Manual Agent implementation. Allows keyboard control of the agent via the terminal.

  1"""
  2Manual Agent implementation.
  3Allows keyboard control of the agent via the terminal.
  4"""
  5
  6import asyncio
  7import json
  8import sys
  9import termios
 10import tty
 11from typing import Any, List, Optional
 12
 13try:
 14    from base_agent import BaseAgent
 15except ImportError:
 16    from agents.base_agent import BaseAgent
 17
 18
 19def getch() -> str:
 20    """Reads a single character from the standard input (Linux/macOS).
 21
 22    Returns:
 23        str: The character read, in lowercase.
 24    """
 25    fd = sys.stdin.fileno()
 26    old_settings = termios.tcgetattr(fd)
 27    try:
 28        tty.setraw(sys.stdin.fileno())
 29        ch = sys.stdin.read(1)
 30    finally:
 31        termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
 32    return ch.lower()
 33
 34
 35class ManualAgent(BaseAgent):
 36    """An agent controlled manually via the terminal using W, A, S, D keys instantly."""
 37
 38    def __init__(self, server_uri: str = "ws://localhost:8765") -> None:
 39        """Initialize the manual agent.
 40
 41        Args:
 42            server_uri (str): URI of the simulation server.
 43        """
 44        super().__init__(server_uri)
 45        self.key_mapping = {"w": "N", "s": "S", "d": "E", "a": "W"}
 46
 47    async def get_manual_action(self) -> Optional[str]:
 48        """Prompts the user for a valid WASD input.
 49
 50        Returns:
 51            Optional[str]: The chosen direction or None.
 52        """
 53        if not self.current_state:
 54            return None
 55
 56        if self.current_state.get("objective_reached"):
 57            return None
 58
 59        valid_actions: List[str] = self.current_state.get("valid_actions", [])
 60
 61        print(f"\n--- Agent at {self.current_state.get('position')} ---")
 62        print(f"Valid directions: {valid_actions}")
 63        print("Press W/A/S/D to move... ", end="", flush=True)
 64
 65        while True:
 66            # Run the blocking terminal read in a background thread
 67            user_input = await asyncio.to_thread(getch)
 68
 69            # Catch Ctrl+C (ASCII character 3) for clean exits in raw mode
 70            if user_input == "\x03":
 71                print("\nExiting...")
 72                sys.exit(0)
 73
 74            if user_input in self.key_mapping:
 75                action = self.key_mapping[user_input]
 76
 77                if action in valid_actions:
 78                    print(action)
 79                    return action
 80                else:
 81                    print(
 82                        f"\rObstacle at {action}. Try again (W/A/S/D)... ",
 83                        end="",
 84                        flush=True,
 85                    )
 86
 87    async def deliberate_maze(self) -> Optional[str]:
 88        """Logic for maps where 'target' is defined.
 89
 90        Returns:
 91            Optional[str]: The chosen direction or None.
 92        """
 93        return await self.get_manual_action()
 94
 95    async def deliberate_room(self) -> Optional[str]:
 96        """Logic for room clearing (no target).
 97
 98        Returns:
 99            Optional[str]: The chosen direction or None.
100        """
101        return await self.get_manual_action()
102
103    async def send_telemetry(self, websocket: Any) -> None:
104        """Send blank telemetry to satisfy the UI requirement.
105
106        Args:
107            websocket (Any): The current WebSocket connection.
108        """
109        payload = {
110            "action": "telemetry",
111            "data": {
112                "visited": [],
113                "current_probs": {"N": 0.0, "S": 0.0, "E": 0.0, "W": 0.0},
114            },
115        }
116        await websocket.send(json.dumps(payload))
117
118
119if __name__ == "__main__":
120    agent = ManualAgent()
121    print("Starting Manual Agent...")
122    try:
123        asyncio.run(agent.run())
124    except KeyboardInterrupt:
125        print("\nAgent shut down manually.")
def getch() -> str:
20def getch() -> str:
21    """Reads a single character from the standard input (Linux/macOS).
22
23    Returns:
24        str: The character read, in lowercase.
25    """
26    fd = sys.stdin.fileno()
27    old_settings = termios.tcgetattr(fd)
28    try:
29        tty.setraw(sys.stdin.fileno())
30        ch = sys.stdin.read(1)
31    finally:
32        termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
33    return ch.lower()

Reads a single character from the standard input (Linux/macOS).

Returns: str: The character read, in lowercase.

class ManualAgent(agents.base_agent.BaseAgent):
 36class ManualAgent(BaseAgent):
 37    """An agent controlled manually via the terminal using W, A, S, D keys instantly."""
 38
 39    def __init__(self, server_uri: str = "ws://localhost:8765") -> None:
 40        """Initialize the manual agent.
 41
 42        Args:
 43            server_uri (str): URI of the simulation server.
 44        """
 45        super().__init__(server_uri)
 46        self.key_mapping = {"w": "N", "s": "S", "d": "E", "a": "W"}
 47
 48    async def get_manual_action(self) -> Optional[str]:
 49        """Prompts the user for a valid WASD input.
 50
 51        Returns:
 52            Optional[str]: The chosen direction or None.
 53        """
 54        if not self.current_state:
 55            return None
 56
 57        if self.current_state.get("objective_reached"):
 58            return None
 59
 60        valid_actions: List[str] = self.current_state.get("valid_actions", [])
 61
 62        print(f"\n--- Agent at {self.current_state.get('position')} ---")
 63        print(f"Valid directions: {valid_actions}")
 64        print("Press W/A/S/D to move... ", end="", flush=True)
 65
 66        while True:
 67            # Run the blocking terminal read in a background thread
 68            user_input = await asyncio.to_thread(getch)
 69
 70            # Catch Ctrl+C (ASCII character 3) for clean exits in raw mode
 71            if user_input == "\x03":
 72                print("\nExiting...")
 73                sys.exit(0)
 74
 75            if user_input in self.key_mapping:
 76                action = self.key_mapping[user_input]
 77
 78                if action in valid_actions:
 79                    print(action)
 80                    return action
 81                else:
 82                    print(
 83                        f"\rObstacle at {action}. Try again (W/A/S/D)... ",
 84                        end="",
 85                        flush=True,
 86                    )
 87
 88    async def deliberate_maze(self) -> Optional[str]:
 89        """Logic for maps where 'target' is defined.
 90
 91        Returns:
 92            Optional[str]: The chosen direction or None.
 93        """
 94        return await self.get_manual_action()
 95
 96    async def deliberate_room(self) -> Optional[str]:
 97        """Logic for room clearing (no target).
 98
 99        Returns:
100            Optional[str]: The chosen direction or None.
101        """
102        return await self.get_manual_action()
103
104    async def send_telemetry(self, websocket: Any) -> None:
105        """Send blank telemetry to satisfy the UI requirement.
106
107        Args:
108            websocket (Any): The current WebSocket connection.
109        """
110        payload = {
111            "action": "telemetry",
112            "data": {
113                "visited": [],
114                "current_probs": {"N": 0.0, "S": 0.0, "E": 0.0, "W": 0.0},
115            },
116        }
117        await websocket.send(json.dumps(payload))

An agent controlled manually via the terminal using W, A, S, D keys instantly.

ManualAgent(server_uri: str = 'ws://localhost:8765')
39    def __init__(self, server_uri: str = "ws://localhost:8765") -> None:
40        """Initialize the manual agent.
41
42        Args:
43            server_uri (str): URI of the simulation server.
44        """
45        super().__init__(server_uri)
46        self.key_mapping = {"w": "N", "s": "S", "d": "E", "a": "W"}

Initialize the manual agent.

Args: server_uri (str): URI of the simulation server.

key_mapping
async def get_manual_action(self) -> Optional[str]:
48    async def get_manual_action(self) -> Optional[str]:
49        """Prompts the user for a valid WASD input.
50
51        Returns:
52            Optional[str]: The chosen direction or None.
53        """
54        if not self.current_state:
55            return None
56
57        if self.current_state.get("objective_reached"):
58            return None
59
60        valid_actions: List[str] = self.current_state.get("valid_actions", [])
61
62        print(f"\n--- Agent at {self.current_state.get('position')} ---")
63        print(f"Valid directions: {valid_actions}")
64        print("Press W/A/S/D to move... ", end="", flush=True)
65
66        while True:
67            # Run the blocking terminal read in a background thread
68            user_input = await asyncio.to_thread(getch)
69
70            # Catch Ctrl+C (ASCII character 3) for clean exits in raw mode
71            if user_input == "\x03":
72                print("\nExiting...")
73                sys.exit(0)
74
75            if user_input in self.key_mapping:
76                action = self.key_mapping[user_input]
77
78                if action in valid_actions:
79                    print(action)
80                    return action
81                else:
82                    print(
83                        f"\rObstacle at {action}. Try again (W/A/S/D)... ",
84                        end="",
85                        flush=True,
86                    )

Prompts the user for a valid WASD input.

Returns: Optional[str]: The chosen direction or None.

async def deliberate_maze(self) -> Optional[str]:
88    async def deliberate_maze(self) -> Optional[str]:
89        """Logic for maps where 'target' is defined.
90
91        Returns:
92            Optional[str]: The chosen direction or None.
93        """
94        return await self.get_manual_action()

Logic for maps where 'target' is defined.

Returns: Optional[str]: The chosen direction or None.

async def deliberate_room(self) -> Optional[str]:
 96    async def deliberate_room(self) -> Optional[str]:
 97        """Logic for room clearing (no target).
 98
 99        Returns:
100            Optional[str]: The chosen direction or None.
101        """
102        return await self.get_manual_action()

Logic for room clearing (no target).

Returns: Optional[str]: The chosen direction or None.

async def send_telemetry(self, websocket: Any) -> None:
104    async def send_telemetry(self, websocket: Any) -> None:
105        """Send blank telemetry to satisfy the UI requirement.
106
107        Args:
108            websocket (Any): The current WebSocket connection.
109        """
110        payload = {
111            "action": "telemetry",
112            "data": {
113                "visited": [],
114                "current_probs": {"N": 0.0, "S": 0.0, "E": 0.0, "W": 0.0},
115            },
116        }
117        await websocket.send(json.dumps(payload))

Send blank telemetry to satisfy the UI requirement.

Args: websocket (Any): The current WebSocket connection.