From 2201090f44b7746af8d02ed7e8d28815ce5589ae Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Fri, 12 Dec 2025 15:42:48 +0000 Subject: [PATCH 1/3] Ignore (but log) errors from event stream callbacks If a callback raises an exception, it shouldn't prevent other callbacks receiving the same event. It should also not raise the exception at the call site that published the event. --- src/blueapi/core/event.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/blueapi/core/event.py b/src/blueapi/core/event.py index fff9c833d5..f6103cd0d6 100644 --- a/src/blueapi/core/event.py +++ b/src/blueapi/core/event.py @@ -1,4 +1,5 @@ import itertools +import logging from abc import ABC, abstractmethod from collections.abc import Callable from typing import Generic, TypeVar @@ -9,6 +10,8 @@ #: Subscription token type S = TypeVar("S") +LOGGER = logging.getLogger(__name__) + class EventStream(ABC, Generic[E, S]): """ @@ -77,4 +80,10 @@ def publish(self, event: E, correlation_id: str | None = None) -> None: """ for callback in list(self._subscriptions.values()): - callback(event, correlation_id) + try: + callback(event, correlation_id) + except Exception as e: + LOGGER.error( + f"Failed to send event {event} with {correlation_id=}", + exc_info=e, + ) From 59a454fb87a77fcb7fd27bd6f648cdd511971556 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Fri, 12 Dec 2025 16:41:35 +0000 Subject: [PATCH 2/3] Test event callback exception handling --- tests/unit_tests/core/test_event.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/tests/unit_tests/core/test_event.py b/tests/unit_tests/core/test_event.py index 4f39ebc11f..24ccb5c856 100644 --- a/tests/unit_tests/core/test_event.py +++ b/tests/unit_tests/core/test_event.py @@ -2,6 +2,7 @@ from concurrent.futures import Future from dataclasses import dataclass from queue import Queue +from unittest import mock import pytest @@ -76,6 +77,21 @@ def test_correlation_id(publisher: EventPublisher[MyEvent]) -> None: assert f.result(timeout=_TIMEOUT) == correlation_id +def test_callback_exceptions_are_contained(publisher: EventPublisher[MyEvent]): + event = MyEvent("foo") + c_id = "bar" + + # First call should raise exception, next should be fine + handler = mock.Mock(side_effect=[ValueError("Bad Event"), ()]) + publisher.subscribe(handler) + publisher.subscribe(handler) + + publisher.publish(event, c_id) + + # Both handlers should be called and the exception should not be raised from publish + handler.assert_has_calls([mock.call(event, c_id), mock.call(event, c_id)]) + + def _drain(queue: Queue) -> Iterable: while not queue.empty(): yield queue.get_nowait() From 7d6cd80ef8e1bb272a8c3c232f1465f8e2e9a864 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Tue, 13 Jan 2026 14:07:10 +0000 Subject: [PATCH 3/3] Wrap callback exceptions into single group --- src/blueapi/core/event.py | 9 ++++----- tests/unit_tests/core/test_event.py | 5 +++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/blueapi/core/event.py b/src/blueapi/core/event.py index f6103cd0d6..9308f8ef4f 100644 --- a/src/blueapi/core/event.py +++ b/src/blueapi/core/event.py @@ -78,12 +78,11 @@ def publish(self, event: E, correlation_id: str | None = None) -> None: correlation_id: An optional ID that may be used to correlate this event with other events """ - + errs = [] for callback in list(self._subscriptions.values()): try: callback(event, correlation_id) except Exception as e: - LOGGER.error( - f"Failed to send event {event} with {correlation_id=}", - exc_info=e, - ) + errs.append(e) + if errs: + raise ExceptionGroup(f"Error(s) publishing event: {event}", errs) diff --git a/tests/unit_tests/core/test_event.py b/tests/unit_tests/core/test_event.py index 24ccb5c856..2570d9ec7f 100644 --- a/tests/unit_tests/core/test_event.py +++ b/tests/unit_tests/core/test_event.py @@ -86,9 +86,10 @@ def test_callback_exceptions_are_contained(publisher: EventPublisher[MyEvent]): publisher.subscribe(handler) publisher.subscribe(handler) - publisher.publish(event, c_id) + with pytest.RaisesGroup(ValueError): + publisher.publish(event, c_id) - # Both handlers should be called and the exception should not be raised from publish + # Both handlers should be called but the exception should still be raised handler.assert_has_calls([mock.call(event, c_id), mock.call(event, c_id)])