Source code for masterpiece.supervisor
"""
supervisor.py
This module implements a SupervisorThread, a dedicated supervisory
monitor for a multi-threaded application. The design pattern is:
• Each worker thread receives a shared error_queue.
• When a worker experiences an unhandled exception, it pushes a tuple:
(thread, exception, traceback_string)
into the error_queue instead of silently dying.
• The SupervisorThread runs as a daemon and continuously consumes
error reports from this queue.
• For each crashed worker, the supervisor logs the failure, stops
the thread safely, and invokes custom recreate() logic to spawn
and start a replacement thread.
This architecture creates a fault-tolerant micro-supervisor system
similar to Erlang/Elixir OTP supervisors: threads never die silently,
and the system can self-heal by restarting failed components.
"""
import threading
from multiprocessing import Event
from queue import Queue, Empty
from threading import Thread
from typing import Tuple
from typing import cast

from masterpiece.masterpiecethread import MasterPieceThread
[docs]
class SupervisorThread(Thread):
    """Supervisor thread that monitors worker thread failures.

    This thread listens on a shared error queue. Worker threads push
    ``(thread, exception, traceback_string)`` tuples to this queue when they
    encounter unhandled exceptions. For each report the supervisor:

    1. Logs the crash event.
    2. Stops and joins the crashed thread.
    3. Recreates the worker using its custom ``recreate()`` method.
    4. Starts the replacement thread.

    A failure while handling one report is printed and the supervisor keeps
    running. Every report taken from the queue is acknowledged with
    ``task_done()`` (when the queue provides it), so ``error_queue.join()``
    returns once all reports have been handled.

    The supervisor runs as a daemon thread so it does not block
    application shutdown.

    Attributes:
        q (Queue[Tuple[Thread, BaseException, str]]):
            A thread-safe queue containing crash reports from workers.
    """

    def __init__(
        self,
        error_queue: Queue[Tuple[Thread, BaseException, str]],
        poll_interval: float = 1.0,
    ) -> None:
        """
        Args:
            error_queue: Shared queue for crash reports from worker threads.
            poll_interval: Seconds to wait for a report before re-checking
                whether stop() was requested. Must be positive.

        Raises:
            ValueError: If ``poll_interval`` is not positive.
        """
        if poll_interval <= 0:
            raise ValueError(f"poll_interval must be positive, got {poll_interval!r}")
        super().__init__(daemon=True)
        self.q: Queue[Tuple[Thread, BaseException, str]] = error_queue
        self._poll_interval: float = poll_interval
        # Signalling between threads of one process only needs a
        # threading.Event; multiprocessing.Event is built on OS-level
        # semaphores intended for cross-process use.
        self._stop_event: threading.Event = threading.Event()

    def stop(self) -> None:
        """Request the supervisor to exit its loop cleanly.

        The loop notices the request within ``poll_interval`` seconds.
        """
        self._stop_event.set()

    def run(self) -> None:
        """Main supervisor loop: handle crash reports until stop() is called."""
        while not self._stop_event.is_set():
            try:
                # Block until a crash report is available; time out so the
                # stop flag is re-checked even when no reports arrive.
                item = self.q.get(timeout=self._poll_interval)
            except Empty:
                continue
            try:
                self._handle_report(item)
            except Exception as handler_exc:
                # Top-level boundary: a single bad report (malformed tuple,
                # failing stop()/join()/warning()) must not kill the supervisor.
                print(f"[Supervisor] Error while handling crash report {item!r}: {handler_exc}")
            finally:
                self._mark_done()

    def _handle_report(self, item: Tuple[Thread, BaseException, str]) -> None:
        """Stop, join and recreate the worker described by one crash report."""
        thread, exc, tb = item
        # Only MasterPieceThread instances know how to recreate themselves.
        if not isinstance(thread, MasterPieceThread):
            print(f"[Supervisor] Thread {thread} is not a MasterPieceThread, cannot recreate")
            return
        print(f"[Supervisor] ERROR: {thread.name} crashed: {exc}\n{tb}")
        # Wait for the crashed worker to exit before starting a replacement.
        thread.stop()
        thread.join()
        try:
            replacement = thread.recreate()
            replacement.start()
        except Exception as recreate_exc:
            print(f"[Supervisor] Failed to recreate thread {thread.name}: {recreate_exc}")
            return
        print(f"[Supervisor] Restarted thread {replacement.name}")
        # Kept outside the recreate try-block so a logging failure is not
        # misreported as a failed recreation.
        replacement.warning(
            f"Thread {thread.name} crashed with exception: {exc}. Restarted as {replacement.name}."
        )

    def _mark_done(self) -> None:
        """Acknowledge one consumed report so ``q.join()`` callers are released."""
        # SimpleQueue / multiprocessing.Queue have no task_done(); tolerate them.
        task_done = getattr(self.q, "task_done", None)
        if task_done is not None:
            task_done()