Process
The entity doing the main body of work within the simulation is the Process. The notify(event) method of every process registered (via hades_instance.register_process()) will be called for each event in the queue for the next time step as a group of asynchronous tasks.
That is to say: events are broadcast and handled asynchronously by the registered processes.
For the reasoning behind this approach, see the design justification.
Pattern matching events¶
The suggested pattern is for a process to match the events that it uses and to respond with an acknowledgement of what was done with each event, e.g.:
from hades import Event, NotificationResponse, Process

# SomeEvent and OtherEvent stand for Event subclasses defined by your application.
class MyProcess(Process):
    def __init__(self, important_identifiers: list[str]):
        super().__init__()
        self._important_identifiers = important_identifiers

    async def notify(self, event: Event):
        match event:
            case SomeEvent(t=t, some_identifier=some_identifier, data=data):
                if some_identifier not in self._important_identifiers:
                    # the event was recognised but is not relevant to this instance
                    return NotificationResponse.ACK_BUT_IGNORED
                # _do_something_with_data is a placeholder for your own logic
                data_for_other_event = self._do_something_with_data(data)
                self.add_event(OtherEvent(t=t + 1, data=data_for_other_event))
                return NotificationResponse.ACK
        # the event is not one this process handles
        return NotificationResponse.NO_ACK
Process Notifications and Asynchronous Handling¶
The Hades Framework's core functionality involves handling and broadcasting events asynchronously. The implementation uses Python's native asyncio library for task scheduling and execution. All events for a given time-step are broadcast to all registered processes and handled independently within their context.
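The broadcast described above can be pictured with plain asyncio. The following is a minimal sketch, not the Hades implementation: EchoProcess, broadcast and main are hypothetical names, and events are simplified to strings. It shows one coroutine per (process, event) pair being run concurrently with asyncio.gather.

```python
import asyncio


class EchoProcess:
    """Hypothetical stand-in for a registered process (not the Hades Process class)."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.seen: list[str] = []

    async def notify(self, event: str) -> str:
        # Yield control once, as a real handler might while awaiting I/O.
        await asyncio.sleep(0)
        self.seen.append(event)
        return f"{self.name} handled {event}"


async def broadcast(processes: list[EchoProcess], events: list[str]) -> list[str]:
    # One coroutine per (event, process) pair; gather runs them concurrently
    # and returns the results in the order the coroutines were passed in.
    handlers = [process.notify(event) for event in events for process in processes]
    return await asyncio.gather(*handlers)


async def main() -> list[str]:
    processes = [EchoProcess("a"), EchoProcess("b")]
    return await broadcast(processes, ["start", "stop"])


if __name__ == "__main__":
    for line in asyncio.run(main()):
        print(line)
```

Every process sees every event, and no handler waits for another to finish before it starts.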
Asynchronous Behaviour and Data Consistency¶
This design is powerful, but it adds complexity when handlers share data.
Consider the scenario where multiple events are sent to a single process, and each event triggers modifications on a shared data resource, for example, a list within the process. Since all the events are handled asynchronously and independently, the order of execution is not guaranteed, and data inconsistency can arise due to race conditions.
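This is a critical aspect of using the Hades Framework: always ensure data consistency when designing your processes. Because asyncio runs coroutines on a single thread, the risk is not thread-safety in the usual sense but interleaving: a handler can be suspended at any await, and another handler may modify the shared data before it resumes.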
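To make the hazard concrete, here is a small sketch in plain asyncio, independent of Hades. Tally and run_handlers are hypothetical names. The racy handler reads shared state, awaits, then writes back a stale value; the second handler performs its read and write with no await in between, so no other handler can interleave.

```python
import asyncio


class Tally:
    """Hypothetical shared state held by a single process."""

    def __init__(self) -> None:
        self.value = 0

    async def racy_add(self) -> None:
        current = self.value      # read the shared value
        await asyncio.sleep(0)    # suspension point: other handlers run here
        self.value = current + 1  # write back a value that may now be stale

    async def atomic_add(self) -> None:
        await asyncio.sleep(0)    # awaiting before touching shared state is harmless
        self.value += 1           # read and write happen with no await in between


async def run_handlers(n: int, racy: bool) -> int:
    tally = Tally()
    handler = tally.racy_add if racy else tally.atomic_add
    # Run n handlers concurrently, as happens for events in the same time step.
    await asyncio.gather(*(handler() for _ in range(n)))
    return tally.value


if __name__ == "__main__":
    print(asyncio.run(run_handlers(10, racy=True)))
    print(asyncio.run(run_handlers(10, racy=False)))
```

With the racy handler, every coroutine reads the initial value before any of them writes, so updates are lost.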
Managing Asynchronous Operations within Processes¶
Here are some recommended ways to handle shared data within processes:
- Use synchronization primitives: Python's asyncio library provides several synchronization primitives like locks (asyncio.Lock()) and semaphores (asyncio.Semaphore()). Use these primitives to ensure only one coroutine within a process modifies the shared resource at any given time.
- Immutable data structures: If possible, use immutable data structures. This eliminates the possibility of shared data being modified concurrently by different coroutines, thus avoiding race conditions.
- Avoid shared state: Whenever possible, avoid using shared state within processes. Design your processes such that they work primarily with local data (from the event).
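The second and third recommendations can be combined, as in the sketch below. It is an illustration under assumptions rather than Hades code: Reading and Aggregator are hypothetical, and the handler is a plain class rather than a Process. The handler works on local values derived from an immutable payload, then publishes its result by rebinding a tuple in a single step with no await in between.

```python
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class Reading:
    """Hypothetical immutable event payload."""

    sensor: str
    value: float


class Aggregator:
    """Hypothetical handler; not a Hades Process."""

    def __init__(self) -> None:
        self.history: tuple[float, ...] = ()

    async def notify(self, reading: Reading) -> float:
        # Work only with local data derived from the event.
        scaled = reading.value * 2
        await asyncio.sleep(0)  # stand-in for awaiting some other operation
        # Publish in one step: rebind to a new tuple, no await between read and write.
        self.history = self.history + (scaled,)
        return scaled


async def main() -> tuple[float, ...]:
    aggregator = Aggregator()
    readings = [Reading("a", 1.0), Reading("b", 2.0), Reading("c", 3.0)]
    await asyncio.gather(*(aggregator.notify(r) for r in readings))
    return aggregator.history


if __name__ == "__main__":
    print(asyncio.run(main()))
```

No update is lost here, although the order of entries in history still depends on which handler finishes first.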
Note
There is no need to worry about this between processes (as they shouldn't share mutable state), only multiple events handled within the same process.
Example of this:
# Copyright 2023 Brit Group Services Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio

from hades import Event, Hades, NotificationResponse, Process


class EventOne(Event):
    pass


class EventTwo(Event):
    pass


class MyProcess(Process):
    def __init__(self):
        super().__init__()
        self._event_data_list = []

    async def notify(self, event: Event):
        match event:
            case EventOne(t=t):
                print("event one arrives first")
                await asyncio.sleep(0.1)  # Simulate a delay
                self._event_data_list.append("One")
                return NotificationResponse.ACK
            case EventTwo(t=t):
                print("event two arrives second")
                await asyncio.sleep(0.05)
                self._event_data_list.append("Two")
                return NotificationResponse.ACK
        return NotificationResponse.NO_ACK


class MyLockingProcess(MyProcess):
    def __init__(self):
        super().__init__()
        self.lock = asyncio.Lock()

    async def notify(self, event: Event):
        async with self.lock:
            return await super().notify(event)


async def test_async_notify(capsys):
    hades = Hades()
    process = MyProcess()
    hades.register_process(process)
    # note these arrive at the same timestep
    hades.add_event(process, EventOne(t=0))
    hades.add_event(process, EventTwo(t=0))
    await hades.run()
    # two happens before one
    assert process._event_data_list == ["Two", "One"]
    # even though one arrives before two
    assert capsys.readouterr().out == """event one arrives first
event two arrives second
"""


async def test_async_notify_with_lock(capsys):
    hades = Hades()
    process = MyLockingProcess()
    hades.register_process(process)
    hades.add_event(process, EventOne(t=0))
    hades.add_event(process, EventTwo(t=0))
    await hades.run()
    # with the lock events happen in the order they arrive.
    assert process._event_data_list == ["One", "Two"]
    assert capsys.readouterr().out == """event one arrives first
event two arrives second
"""
PredefinedEventAdder¶
Bases: Process
Adds some predefined events to Hades, then unregisters itself to avoid any overhead.
Source code in hades/core/process.py
Process¶
Source code in hades/core/process.py
instance_identifier: str (property)¶
A unique identifier for the process. It does not need to be globally unique; however, multiple instances of a given process_name with the same instance identifier will not be allowed across the styx (into hades).
RandomProcess¶
Bases: Process
A process which adds a .random attribute with the given seed.