diff --git a/src/blueapi/core/event.py b/src/blueapi/core/event.py index fff9c833d..9308f8ef4 100644 --- a/src/blueapi/core/event.py +++ b/src/blueapi/core/event.py @@ -1,4 +1,5 @@ import itertools +import logging from abc import ABC, abstractmethod from collections.abc import Callable from typing import Generic, TypeVar @@ -9,6 +10,8 @@ #: Subscription token type S = TypeVar("S") +LOGGER = logging.getLogger(__name__) + class EventStream(ABC, Generic[E, S]): """ @@ -75,6 +78,11 @@ def publish(self, event: E, correlation_id: str | None = None) -> None: correlation_id: An optional ID that may be used to correlate this event with other events """ - + errs = [] for callback in list(self._subscriptions.values()): - callback(event, correlation_id) + try: + callback(event, correlation_id) + except Exception as e: + errs.append(e) + if errs: + raise ExceptionGroup(f"Error(s) publishing event: {event}", errs) diff --git a/tests/unit_tests/core/test_event.py b/tests/unit_tests/core/test_event.py index 4f39ebc11..2570d9ec7 100644 --- a/tests/unit_tests/core/test_event.py +++ b/tests/unit_tests/core/test_event.py @@ -2,6 +2,7 @@ from concurrent.futures import Future from dataclasses import dataclass from queue import Queue +from unittest import mock import pytest @@ -76,6 +77,22 @@ def test_correlation_id(publisher: EventPublisher[MyEvent]) -> None: assert f.result(timeout=_TIMEOUT) == correlation_id +def test_callback_exceptions_are_contained(publisher: EventPublisher[MyEvent]): + event = MyEvent("foo") + c_id = "bar" + + # First call should raise exception, next should be fine + handler = mock.Mock(side_effect=[ValueError("Bad Event"), ()]) + publisher.subscribe(handler) + publisher.subscribe(handler) + + with pytest.RaisesGroup(ValueError): + publisher.publish(event, c_id) + + # Both handlers should be called but the exception should still be raised + handler.assert_has_calls([mock.call(event, c_id), mock.call(event, c_id)]) + + def _drain(queue: Queue) -> Iterable: while not queue.empty(): yield queue.get_nowait()