@@ -119,7 +119,7 @@ def __init__(
119
119
topic : str = "" ,
120
120
) -> None :
121
121
# Storage
122
- self ._event_queue = Queue [EventBatch ](maxsize = max_queue_size )
122
+ self ._event_queue = Queue [Optional [ EventBatch ] ](maxsize = max_queue_size )
123
123
self ._buffer = deque [tuple [int , bytes ]](maxlen = buffer_steps )
124
124
125
125
# ZMQ sockets
@@ -151,9 +151,9 @@ def publish(self, events: EventBatch) -> None:
151
151
def shutdown (self ) -> None :
152
152
"""Stop the publisher thread and clean up resources."""
153
153
self ._running = False
154
+ self ._event_queue .put_nowait (None )
154
155
155
156
start = time .time ()
156
-
157
157
pending_items = True
158
158
while pending_items and (time .time () - start < self .SHUTDOWN_TIMEOUT ):
159
159
pending_items = not self ._event_queue .empty ()
@@ -177,8 +177,7 @@ def shutdown(self) -> None:
177
177
if self ._replay is not None :
178
178
self ._replay .close (linger = 0 )
179
179
finally :
180
- # Do not terminate context; other sockets may use it
181
- pass
180
+ pass # Do not terminate context; other sockets may use it
182
181
183
182
def _socket_setup (self ) -> None :
184
183
"""Initialize sockets
@@ -211,7 +210,7 @@ def _publisher_thread(self) -> None:
211
210
212
211
assert self ._pub is not None # narrows type for mypy
213
212
214
- while self ._running or not self ._event_queue .empty () :
213
+ while self ._running or self ._event_queue .qsize () > 0 :
215
214
# --- replay (non-critical) ---------------------------------
216
215
if self ._replay is not None and self ._replay .poll (0 ):
217
216
try :
@@ -222,6 +221,8 @@ def _publisher_thread(self) -> None:
222
221
# --- main queue (critical) ---------------------------------
223
222
try :
224
223
event = self ._event_queue .get (timeout = 0.1 )
224
+ if event is None :
225
+ break # Sentinel received, exit thread
225
226
except queue .Empty :
226
227
continue
227
228
0 commit comments