agents.base_agent

Abstract base class for simulation agents in the SI2 - Maze environment.

  1"""
  2Abstract base class for simulation agents in the SI2 - Maze environment.
  3"""
  4
  5import asyncio
  6import json
  7import logging
  8from typing import Any, Dict, Optional
  9
 10import websockets
 11
 12# Configure standard logging
 13logging.basicConfig(level=logging.INFO, format="%(asctime)s - AGENT - %(levelname)s - %(message)s")
 14
 15
 16class BaseAgent:
 17    """
 18    Abstract base class for simulation agents.
 19    Handles all WebSocket communications, state updates, routing, and UI telemetry.
 20    Subclasses MUST implement the deliberate_maze and deliberate_room methods.
 21    """
 22
 23    def __init__(self, server_uri: str = "ws://localhost:8765") -> None:
 24        """Initialize the agent.
 25
 26        Args:
 27            server_uri (str): URI of the simulation server.
 28        """
 29        self.server_uri: str = server_uri
 30        self.current_state: Optional[Dict[str, Any]] = None
 31        self.step_delay: float = 0.15  # Configurable delay between moves
 32        self.idle_logged: bool = False
 33
 34    async def run(self) -> None:
 35        """Main connection loop.
 36
 37        Do not override this unless modifying network protocols.
 38        """
 39        try:
 40            async with websockets.connect(self.server_uri) as websocket:
 41                await websocket.send(json.dumps({"client": "agent"}))
 42                logging.info(f"Connected to {self.server_uri}")
 43
 44                async for message in websocket:
 45                    if isinstance(message, bytes):
 46                        message = message.decode("utf-8")
 47                    data = json.loads(message)
 48
 49                    if data.get("type") == "error":
 50                        logging.error(f"Server Error: {data.get('message')}")
 51                        continue
 52
 53                    if data.get("type") == "state":
 54                        self.current_state = data
 55
 56                        if self.current_state and self.current_state.get("objective_reached"):
 57                            if not self.idle_logged:
 58                                await self.deliberate()
 59                                await self.send_telemetry(websocket)
 60                                logging.info("Objective reached. Idling...")
 61                                self.idle_logged = True
 62                            continue
 63                        else:
 64                            self.idle_logged = False
 65
 66                        action = await self.deliberate()
 67
 68                        if action:
 69                            await self.send_telemetry(websocket)
 70                            await websocket.send(json.dumps({"action": "move", "direction": action}))
 71                            await asyncio.sleep(self.step_delay)
 72
 73                    elif data.get("type") == "reset":
 74                        self.reset_memory()
 75                        logging.info("Memory wiped due to simulation reset.")
 76
 77        except Exception as e:
 78            logging.error(f"Connection error: {e}")
 79
 80    async def deliberate(self) -> Optional[str]:
 81        """Routes the deliberation based on the presence of a target (Maze vs Room).
 82
 83        Returns:
 84            Optional[str]: The chosen direction ('N', 'S', 'E', 'W') or None.
 85        """
 86        if not self.current_state:
 87            return None
 88
 89        if self.current_state.get("target") is not None:
 90            return await self.deliberate_maze()
 91        else:
 92            return await self.deliberate_room()
 93
 94    async def deliberate_maze(self) -> Optional[str]:
 95        """Logic for maze navigation.
 96
 97        Raises:
 98            NotImplementedError: Subclasses must implement this.
 99
100        Returns:
101            Optional[str]: The chosen direction ('N', 'S', 'E', 'W') or None.
102        """
103        raise NotImplementedError("Subclasses must implement deliberate_maze()")
104
105    async def deliberate_room(self) -> Optional[str]:
106        """Logic for room exploration.
107
108        Raises:
109            NotImplementedError: Subclasses must implement this.
110
111        Returns:
112            Optional[str]: The chosen direction ('N', 'S', 'E', 'W') or None.
113        """
114        raise NotImplementedError("Subclasses must implement deliberate_room()")
115
116    def reset_memory(self) -> None:
117        """Clears internal tracking variables when the simulation resets."""
118        pass
119
120    async def send_telemetry(self, websocket: Any) -> None:
121        """Packages internal memory/probabilities and sends them to the frontend UI.
122
123        Args:
124            websocket (Any): The current WebSocket connection.
125        """
126        pass
class BaseAgent:
 17class BaseAgent:
 18    """
 19    Abstract base class for simulation agents.
 20    Handles all WebSocket communications, state updates, routing, and UI telemetry.
 21    Subclasses MUST implement the deliberate_maze and deliberate_room methods.
 22    """
 23
 24    def __init__(self, server_uri: str = "ws://localhost:8765") -> None:
 25        """Initialize the agent.
 26
 27        Args:
 28            server_uri (str): URI of the simulation server.
 29        """
 30        self.server_uri: str = server_uri
 31        self.current_state: Optional[Dict[str, Any]] = None
 32        self.step_delay: float = 0.15  # Configurable delay between moves
 33        self.idle_logged: bool = False
 34
 35    async def run(self) -> None:
 36        """Main connection loop.
 37
 38        Do not override this unless modifying network protocols.
 39        """
 40        try:
 41            async with websockets.connect(self.server_uri) as websocket:
 42                await websocket.send(json.dumps({"client": "agent"}))
 43                logging.info(f"Connected to {self.server_uri}")
 44
 45                async for message in websocket:
 46                    if isinstance(message, bytes):
 47                        message = message.decode("utf-8")
 48                    data = json.loads(message)
 49
 50                    if data.get("type") == "error":
 51                        logging.error(f"Server Error: {data.get('message')}")
 52                        continue
 53
 54                    if data.get("type") == "state":
 55                        self.current_state = data
 56
 57                        if self.current_state and self.current_state.get("objective_reached"):
 58                            if not self.idle_logged:
 59                                await self.deliberate()
 60                                await self.send_telemetry(websocket)
 61                                logging.info("Objective reached. Idling...")
 62                                self.idle_logged = True
 63                            continue
 64                        else:
 65                            self.idle_logged = False
 66
 67                        action = await self.deliberate()
 68
 69                        if action:
 70                            await self.send_telemetry(websocket)
 71                            await websocket.send(json.dumps({"action": "move", "direction": action}))
 72                            await asyncio.sleep(self.step_delay)
 73
 74                    elif data.get("type") == "reset":
 75                        self.reset_memory()
 76                        logging.info("Memory wiped due to simulation reset.")
 77
 78        except Exception as e:
 79            logging.error(f"Connection error: {e}")
 80
 81    async def deliberate(self) -> Optional[str]:
 82        """Routes the deliberation based on the presence of a target (Maze vs Room).
 83
 84        Returns:
 85            Optional[str]: The chosen direction ('N', 'S', 'E', 'W') or None.
 86        """
 87        if not self.current_state:
 88            return None
 89
 90        if self.current_state.get("target") is not None:
 91            return await self.deliberate_maze()
 92        else:
 93            return await self.deliberate_room()
 94
 95    async def deliberate_maze(self) -> Optional[str]:
 96        """Logic for maze navigation.
 97
 98        Raises:
 99            NotImplementedError: Subclasses must implement this.
100
101        Returns:
102            Optional[str]: The chosen direction ('N', 'S', 'E', 'W') or None.
103        """
104        raise NotImplementedError("Subclasses must implement deliberate_maze()")
105
106    async def deliberate_room(self) -> Optional[str]:
107        """Logic for room exploration.
108
109        Raises:
110            NotImplementedError: Subclasses must implement this.
111
112        Returns:
113            Optional[str]: The chosen direction ('N', 'S', 'E', 'W') or None.
114        """
115        raise NotImplementedError("Subclasses must implement deliberate_room()")
116
117    def reset_memory(self) -> None:
118        """Clears internal tracking variables when the simulation resets."""
119        pass
120
121    async def send_telemetry(self, websocket: Any) -> None:
122        """Packages internal memory/probabilities and sends them to the frontend UI.
123
124        Args:
125            websocket (Any): The current WebSocket connection.
126        """
127        pass

Abstract base class for simulation agents. Handles all WebSocket communications, state updates, routing, and UI telemetry. Subclasses MUST implement the deliberate_maze and deliberate_room methods.

BaseAgent(server_uri: str = 'ws://localhost:8765')
24    def __init__(self, server_uri: str = "ws://localhost:8765") -> None:
25        """Initialize the agent.
26
27        Args:
28            server_uri (str): URI of the simulation server.
29        """
30        self.server_uri: str = server_uri
31        self.current_state: Optional[Dict[str, Any]] = None
32        self.step_delay: float = 0.15  # Configurable delay between moves
33        self.idle_logged: bool = False

Initialize the agent.

Args: server_uri (str): URI of the simulation server.

server_uri: str
current_state: Optional[Dict[str, Any]]
step_delay: float
idle_logged: bool
async def run(self) -> None:
35    async def run(self) -> None:
36        """Main connection loop.
37
38        Do not override this unless modifying network protocols.
39        """
40        try:
41            async with websockets.connect(self.server_uri) as websocket:
42                await websocket.send(json.dumps({"client": "agent"}))
43                logging.info(f"Connected to {self.server_uri}")
44
45                async for message in websocket:
46                    if isinstance(message, bytes):
47                        message = message.decode("utf-8")
48                    data = json.loads(message)
49
50                    if data.get("type") == "error":
51                        logging.error(f"Server Error: {data.get('message')}")
52                        continue
53
54                    if data.get("type") == "state":
55                        self.current_state = data
56
57                        if self.current_state and self.current_state.get("objective_reached"):
58                            if not self.idle_logged:
59                                await self.deliberate()
60                                await self.send_telemetry(websocket)
61                                logging.info("Objective reached. Idling...")
62                                self.idle_logged = True
63                            continue
64                        else:
65                            self.idle_logged = False
66
67                        action = await self.deliberate()
68
69                        if action:
70                            await self.send_telemetry(websocket)
71                            await websocket.send(json.dumps({"action": "move", "direction": action}))
72                            await asyncio.sleep(self.step_delay)
73
74                    elif data.get("type") == "reset":
75                        self.reset_memory()
76                        logging.info("Memory wiped due to simulation reset.")
77
78        except Exception as e:
79            logging.error(f"Connection error: {e}")

Main connection loop.

Do not override this unless modifying network protocols.

async def deliberate(self) -> Optional[str]:
81    async def deliberate(self) -> Optional[str]:
82        """Routes the deliberation based on the presence of a target (Maze vs Room).
83
84        Returns:
85            Optional[str]: The chosen direction ('N', 'S', 'E', 'W') or None.
86        """
87        if not self.current_state:
88            return None
89
90        if self.current_state.get("target") is not None:
91            return await self.deliberate_maze()
92        else:
93            return await self.deliberate_room()

Routes the deliberation based on the presence of a target (Maze vs Room).

Returns: Optional[str]: The chosen direction ('N', 'S', 'E', 'W') or None.

async def deliberate_maze(self) -> Optional[str]:
 95    async def deliberate_maze(self) -> Optional[str]:
 96        """Logic for maze navigation.
 97
 98        Raises:
 99            NotImplementedError: Subclasses must implement this.
100
101        Returns:
102            Optional[str]: The chosen direction ('N', 'S', 'E', 'W') or None.
103        """
104        raise NotImplementedError("Subclasses must implement deliberate_maze()")

Logic for maze navigation.

Raises: NotImplementedError: Subclasses must implement this.

Returns: Optional[str]: The chosen direction ('N', 'S', 'E', 'W') or None.

async def deliberate_room(self) -> Optional[str]:
106    async def deliberate_room(self) -> Optional[str]:
107        """Logic for room exploration.
108
109        Raises:
110            NotImplementedError: Subclasses must implement this.
111
112        Returns:
113            Optional[str]: The chosen direction ('N', 'S', 'E', 'W') or None.
114        """
115        raise NotImplementedError("Subclasses must implement deliberate_room()")

Logic for room exploration.

Raises: NotImplementedError: Subclasses must implement this.

Returns: Optional[str]: The chosen direction ('N', 'S', 'E', 'W') or None.

def reset_memory(self) -> None:
117    def reset_memory(self) -> None:
118        """Clears internal tracking variables when the simulation resets."""
119        pass

Clears internal tracking variables when the simulation resets.

async def send_telemetry(self, websocket: Any) -> None:
121    async def send_telemetry(self, websocket: Any) -> None:
122        """Packages internal memory/probabilities and sends them to the frontend UI.
123
124        Args:
125            websocket (Any): The current WebSocket connection.
126        """
127        pass

Packages internal memory/probabilities and sends them to the frontend UI.

Args: websocket (Any): The current WebSocket connection.