Skip to content

Visualisation Utilities

hades.visualisation.websockets

output events or event results to websockets for live visualisation using custom frontend TS/JS code or other clients

EventContext

Bases: BaseModel

full details of an event notification's result

Source code in hades/visualisation/websockets.py
class EventContext(BaseModel):
    """full details of an event notification's result"""

    source_process: ProcessDetails
    target_process: ProcessDetails
    event: EventWithType
    target_process_response: NotificationResponse
    causing_event: EventWithType | None = None

HadesWS

Bases: Hades

Hades with a websocket server bundled. Waits for at least one client to connect before starting the simulation

Source code in hades/visualisation/websockets.py
class HadesWS(Hades):
    """Hades with a websocket server bundled. Waits for at least one client to connect before starting the simulation"""

    def __init__(
        self,
        random_pomegranate_seed: str | None = "hades",
        max_queue_size: int = 0,
        batch_event_notification_timeout: int | None = 60 * 5,
        record_results: bool = True,
        record_event_history: bool = True,
        use_no_ack_cache: bool = False,
        track_causing_events: bool = True,
        ws_server_host: str = "localhost",
        ws_server_port: int = 8765,
        ws_server=None,
    ) -> None:
        super().__init__(
            random_pomegranate_seed,
            max_queue_size,
            batch_event_notification_timeout,
            record_results,
            record_event_history,
            use_no_ack_cache,
            track_causing_events,
        )
        self._ws_server_host = ws_server_host
        self._ws_server_port = ws_server_port
        self._ws_server = ws_server
        self._ws_clients: set[Any] = set()

    async def ws_server(self, websocket):
        """received client messages"""
        self._ws_clients.add(websocket)
        while True:
            try:
                message = await websocket.recv()
                _logger.debug("received message: %s from ws client", message)
            except ConnectionClosed:
                break

    async def _handle_event_results(
        self,
        results: list[NotificationResponse | BaseException],
        event_source_targets: list[EventSourceTargetCause],
    ):
        """asynchronously rebroadcast all event results to all ws clients"""
        rebroadcast_ws_events = []
        for result, (event, source_process, target_process, causing_event) in zip(results, event_source_targets):
            if isinstance(result, NotificationResponse):
                for client in self._ws_clients:
                    rebroadcast_ws_events.append(
                        asyncio.wait_for(
                            client.send(
                                EventContext(
                                    source_process=ProcessDetails.model_construct(
                                        process_name=source_process.process_name,
                                        instance_identifier=source_process.instance_identifier,
                                    ),
                                    target_process=ProcessDetails.model_construct(
                                        process_name=target_process.process_name,
                                        instance_identifier=target_process.instance_identifier,
                                    ),
                                    event=EventWithType(event_type=event.name, event_contents=event),
                                    target_process_response=result,
                                    causing_event=(
                                        None
                                        if causing_event is None
                                        else EventWithType(event_type=causing_event.name, event_contents=causing_event)
                                    ),
                                ).model_dump_json()
                            ),
                            timeout=self._batch_event_notification_timeout,
                        )
                    )
        await asyncio.gather(*rebroadcast_ws_events)
        await super()._handle_event_results(results, event_source_targets)

    async def run(self, until: int | None = None):
        """start a server if none is injected and wait for a client connection"""
        # Start WebSocket server
        if self._ws_server is None:
            self._ws_server = await websockets.serve(self.ws_server, self._ws_server_host, self._ws_server_port)  # type: ignore

        # Wait for at least one client to connect
        while not self._ws_clients:
            await asyncio.sleep(1)  # Wait for 1 second before checking again
        await super().run(until=until)
        self._ws_server.close()
        await self._ws_server.wait_closed()

run(until=None) async

start a server if none is injected and wait for a client connection

Source code in hades/visualisation/websockets.py
async def run(self, until: int | None = None):
    """start a server if none is injected and wait for a client connection"""
    # Start WebSocket server
    if self._ws_server is None:
        self._ws_server = await websockets.serve(self.ws_server, self._ws_server_host, self._ws_server_port)  # type: ignore

    # Wait for at least one client to connect
    while not self._ws_clients:
        await asyncio.sleep(1)  # Wait for 1 second before checking again
    await super().run(until=until)
    self._ws_server.close()
    await self._ws_server.wait_closed()

ws_server(websocket) async

received client messages

Source code in hades/visualisation/websockets.py
async def ws_server(self, websocket):
    """received client messages"""
    self._ws_clients.add(websocket)
    while True:
        try:
            message = await websocket.recv()
            _logger.debug("received message: %s from ws client", message)
        except ConnectionClosed:
            break

WebSocketProcess

Bases: Process

simple process which sends all the events it receives as JSON to a websockets server this could be used for visualisation or monitoring

Source code in hades/visualisation/websockets.py
class WebSocketProcess(Process):
    """simple process which sends all the events it receives as JSON to a websockets server
    this could be used for visualisation or monitoring
    """

    def __init__(self, websocket_connection) -> None:
        self._connection = websocket_connection
        super().__init__()

    async def _send_event(self, event: Event):
        await self._connection.send(
            EventWithType.model_construct(event_type=event.name, event_contents=event).model_dump_json()
        )

    async def notify(self, event: Event):
        match event:
            case Event() as e:
                await self._send_event(event)
                return NotificationResponse.ACK
        return NotificationResponse.NO_ACK

hades.visualisation.networkx

to_digraph(underworld, allowed_responses=None)

build from hades's event_results object the full set of connections as a networkx digraph

Source code in hades/visualisation/networkx.py
def to_digraph(underworld: Hades, allowed_responses: set[NotificationResponse] | None = None) -> MultiDiGraph:
    """build from hades's event_results object the full set of connections as a networkx digraph"""
    if allowed_responses is None:
        allowed_responses = {NotificationResponse.ACK}
    graph = MultiDiGraph()
    added_edges: set[tuple[str, str, str]] = set()
    for (event, source_process_name, source_process_instance_id, _), event_notifications in sorted(
        underworld.event_results.items(), key=lambda x: x[0][0].t
    ):
        source_node = f"{source_process_name} - {source_process_instance_id}"
        graph.add_node(source_node)
        for (target_process_name, target_process_instance_id), notification_response in event_notifications.items():
            target_node = f"{target_process_name} - {target_process_instance_id}"
            graph.add_node(target_node)
            if notification_response in allowed_responses and (source_node, target_node, event.name) not in added_edges:
                graph.add_edge(source_node, target_node, label=event.name)
                added_edges.add((source_node, target_node, event.name))
    return graph

write_mermaid(G)

output a networkx digraph as a simple mermaid graph

Source code in hades/visualisation/networkx.py
def write_mermaid(G: MultiDiGraph) -> str:
    """output a networkx digraph as a simple mermaid graph"""
    lines = []
    for u_node, v_node, edge in sorted(G.edges(data=True), key=lambda x: x[0] + x[1] + x[2]["label"]):
        lines.append(f"{u_node.replace(' ', '')}({u_node}) -- {edge['label']} --> {v_node.replace(' ', '')}({v_node})")
    mermaid_lines = "\n".join(lines)
    return f"graph LR\n{mermaid_lines}"