agents.manual_agent
Manual Agent implementation. Allows keyboard control of the agent via the terminal.
"""
Manual Agent implementation.
Allows keyboard control of the agent via the terminal.
"""

import asyncio
import json
import sys
import termios
import tty
from typing import Any, List, Optional

try:
    from base_agent import BaseAgent
except ImportError:
    from agents.base_agent import BaseAgent


def getch() -> str:
    """Reads a single character from the standard input (Linux/macOS).

    The terminal is put into raw mode for the duration of the read so the
    key press is returned without waiting for Enter. The previous terminal
    settings are always restored afterwards.

    Returns:
        str: The character read, in lowercase. An empty string means the
        input stream reached end-of-file.
    """
    fd = sys.stdin.fileno()
    old_settings = termios.tcgetattr(fd)
    try:
        tty.setraw(fd)
        ch = sys.stdin.read(1)
    finally:
        # Restore the saved settings even if the read fails.
        termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
    return ch.lower()


class ManualAgent(BaseAgent):
    """An agent controlled manually via the terminal using the W, A, S, D keys."""

    def __init__(self, server_uri: str = "ws://localhost:8765") -> None:
        """Initialize the manual agent.

        Args:
            server_uri (str): URI of the simulation server.
        """
        super().__init__(server_uri)
        # Keyboard key -> direction code that is checked against valid_actions.
        self.key_mapping = {"w": "N", "s": "S", "d": "E", "a": "W"}

    async def get_manual_action(self) -> Optional[str]:
        """Prompts the user for a valid WASD input.

        Returns:
            Optional[str]: The chosen direction, or None when there is no
            current state, the objective has been reached, or the state
            lists no valid actions.

        Raises:
            SystemExit: When the user presses Ctrl+C or stdin reaches EOF.
        """
        state = self.current_state
        if not state or state.get("objective_reached"):
            return None

        valid_actions: List[str] = state.get("valid_actions", [])

        print(f"\n--- Agent at {state.get('position')} ---")
        print(f"Valid directions: {valid_actions}")

        if not valid_actions:
            # No key could ever be accepted, so prompting would trap the user
            # in the loop below until they press Ctrl+C.
            print("No valid moves available.")
            return None

        print("Press W/A/S/D to move... ", end="", flush=True)

        while True:
            # Run the blocking terminal read in a background thread so the
            # event loop stays responsive.
            user_input = await asyncio.to_thread(getch)

            # Raw mode delivers Ctrl+C as the character "\x03". An empty
            # string means stdin hit EOF; without this check the loop would
            # spin forever on a closed input stream.
            if user_input in ("\x03", ""):
                print("\nExiting...")
                sys.exit(0)

            action = self.key_mapping.get(user_input)
            if action is None:
                # Not one of W/A/S/D: ignore the key and keep waiting.
                continue

            if action in valid_actions:
                print(action)
                return action

            print(
                f"\rObstacle at {action}. Try again (W/A/S/D)... ",
                end="",
                flush=True,
            )

    async def deliberate_maze(self) -> Optional[str]:
        """Logic for maps where 'target' is defined.

        Returns:
            Optional[str]: The chosen direction or None.
        """
        return await self.get_manual_action()

    async def deliberate_room(self) -> Optional[str]:
        """Logic for room clearing (no target).

        Returns:
            Optional[str]: The chosen direction or None.
        """
        return await self.get_manual_action()

    async def send_telemetry(self, websocket: Any) -> None:
        """Send blank telemetry to satisfy the UI requirement.

        Args:
            websocket (Any): The current WebSocket connection.
        """
        payload = {
            "action": "telemetry",
            "data": {
                "visited": [],
                "current_probs": {"N": 0.0, "S": 0.0, "E": 0.0, "W": 0.0},
            },
        }
        await websocket.send(json.dumps(payload))


if __name__ == "__main__":
    agent = ManualAgent()
    print("Starting Manual Agent...")
    try:
        asyncio.run(agent.run())
    except KeyboardInterrupt:
        print("\nAgent shut down manually.")
def getch() -> str:
    """Block until one key is pressed and return it lowercased (Linux/macOS).

    Standard input is switched to raw mode for the duration of the read so
    the key is returned without waiting for Enter. The terminal settings
    that were active beforehand are restored afterwards, even if the read
    raises.

    Returns:
        str: The character read, in lowercase.
    """
    stdin_fd = sys.stdin.fileno()
    saved_attrs = termios.tcgetattr(stdin_fd)
    try:
        tty.setraw(stdin_fd)
        key = sys.stdin.read(1)
    finally:
        # TCSADRAIN applies the restore once queued output has been written.
        termios.tcsetattr(stdin_fd, termios.TCSADRAIN, saved_attrs)
    return key.lower()
Reads a single character from the standard input (Linux/macOS).
Returns: str: The character read, in lowercase.
class ManualAgent(BaseAgent):
    """An agent controlled manually via the terminal using the W, A, S, D keys."""

    def __init__(self, server_uri: str = "ws://localhost:8765") -> None:
        """Initialize the manual agent.

        Args:
            server_uri (str): URI of the simulation server.
        """
        super().__init__(server_uri)
        # Keyboard key -> direction code that is checked against valid_actions.
        self.key_mapping = {"w": "N", "s": "S", "d": "E", "a": "W"}

    async def get_manual_action(self) -> Optional[str]:
        """Prompts the user for a valid WASD input.

        Returns:
            Optional[str]: The chosen direction, or None when there is no
            current state, the objective has been reached, or the state
            lists no valid actions.

        Raises:
            SystemExit: When the user presses Ctrl+C or stdin reaches EOF.
        """
        state = self.current_state
        if not state or state.get("objective_reached"):
            return None

        valid_actions: List[str] = state.get("valid_actions", [])

        print(f"\n--- Agent at {state.get('position')} ---")
        print(f"Valid directions: {valid_actions}")

        if not valid_actions:
            # No key could ever be accepted, so prompting would trap the user
            # in the loop below until they press Ctrl+C.
            print("No valid moves available.")
            return None

        print("Press W/A/S/D to move... ", end="", flush=True)

        while True:
            # Run the blocking terminal read in a background thread so the
            # event loop stays responsive.
            user_input = await asyncio.to_thread(getch)

            # Raw mode delivers Ctrl+C as the character "\x03". An empty
            # string means stdin hit EOF; without this check the loop would
            # spin forever on a closed input stream.
            if user_input in ("\x03", ""):
                print("\nExiting...")
                sys.exit(0)

            action = self.key_mapping.get(user_input)
            if action is None:
                # Not one of W/A/S/D: ignore the key and keep waiting.
                continue

            if action in valid_actions:
                print(action)
                return action

            print(
                f"\rObstacle at {action}. Try again (W/A/S/D)... ",
                end="",
                flush=True,
            )

    async def deliberate_maze(self) -> Optional[str]:
        """Logic for maps where 'target' is defined.

        Returns:
            Optional[str]: The chosen direction or None.
        """
        return await self.get_manual_action()

    async def deliberate_room(self) -> Optional[str]:
        """Logic for room clearing (no target).

        Returns:
            Optional[str]: The chosen direction or None.
        """
        return await self.get_manual_action()

    async def send_telemetry(self, websocket: Any) -> None:
        """Send blank telemetry to satisfy the UI requirement.

        Args:
            websocket (Any): The current WebSocket connection.
        """
        payload = {
            "action": "telemetry",
            "data": {
                "visited": [],
                "current_probs": {"N": 0.0, "S": 0.0, "E": 0.0, "W": 0.0},
            },
        }
        await websocket.send(json.dumps(payload))
An agent controlled manually from the terminal: W, A, S, D key presses take effect instantly, without pressing Enter.
def __init__(self, server_uri: str = "ws://localhost:8765") -> None:
    """Set up the manual agent and its key-to-direction table.

    Args:
        server_uri (str): URI of the simulation server; forwarded to the
            base class initializer.
    """
    super().__init__(server_uri)
    # Keyboard key -> direction code (same pairs and insertion order).
    self.key_mapping = dict(w="N", s="S", d="E", a="W")
Initialize the manual agent.
Args: server_uri (str): URI of the simulation server.
async def get_manual_action(self) -> Optional[str]:
    """Prompts the user for a valid WASD input.

    Keys are read one at a time until one maps to a direction listed in the
    current state's ``valid_actions``. Keys outside W/A/S/D are ignored.

    Returns:
        Optional[str]: The chosen direction, or None when there is no
        current state, the objective has been reached, or the state lists
        no valid actions.

    Raises:
        SystemExit: When the user presses Ctrl+C or stdin reaches EOF.
    """
    state = self.current_state
    if not state or state.get("objective_reached"):
        return None

    valid_actions: List[str] = state.get("valid_actions", [])

    print(f"\n--- Agent at {state.get('position')} ---")
    print(f"Valid directions: {valid_actions}")

    if not valid_actions:
        # No key could ever be accepted, so prompting would trap the user
        # in the loop below until they press Ctrl+C.
        print("No valid moves available.")
        return None

    print("Press W/A/S/D to move... ", end="", flush=True)

    while True:
        # Run the blocking terminal read in a background thread so the
        # event loop stays responsive.
        user_input = await asyncio.to_thread(getch)

        # Raw mode delivers Ctrl+C as the character "\x03". An empty string
        # means stdin hit EOF; without this check the loop would spin
        # forever on a closed input stream.
        if user_input in ("\x03", ""):
            print("\nExiting...")
            sys.exit(0)

        action = self.key_mapping.get(user_input)
        if action is None:
            # Not one of W/A/S/D: ignore the key and keep waiting.
            continue

        if action in valid_actions:
            print(action)
            return action

        print(
            f"\rObstacle at {action}. Try again (W/A/S/D)... ",
            end="",
            flush=True,
        )
Prompts the user for a valid WASD input.
Returns: Optional[str]: The chosen direction or None.
async def deliberate_maze(self) -> Optional[str]:
    """Pick the next move on maps where 'target' is defined.

    The decision is delegated to the manual keyboard prompt.

    Returns:
        Optional[str]: The chosen direction or None.
    """
    chosen_direction = await self.get_manual_action()
    return chosen_direction
Logic for maps where 'target' is defined.
Returns: Optional[str]: The chosen direction or None.
async def deliberate_room(self) -> Optional[str]:
    """Pick the next move for room clearing (no target).

    The decision is delegated to the manual keyboard prompt.

    Returns:
        Optional[str]: The chosen direction or None.
    """
    chosen_direction = await self.get_manual_action()
    return chosen_direction
Logic for room clearing (no target).
Returns: Optional[str]: The chosen direction or None.
async def send_telemetry(self, websocket: Any) -> None:
    """Send a placeholder telemetry message to satisfy the UI requirement.

    The message carries an empty ``visited`` list and a probability of 0.0
    for each of the four directions, serialized as JSON.

    Args:
        websocket (Any): The current WebSocket connection; its ``send``
            coroutine is awaited with the JSON string.
    """
    blank_probs = dict.fromkeys(("N", "S", "E", "W"), 0.0)
    message = json.dumps(
        {
            "action": "telemetry",
            "data": {"visited": [], "current_probs": blank_probs},
        }
    )
    await websocket.send(message)
Send blank telemetry to satisfy the UI requirement.
Args: websocket (Any): The current WebSocket connection.