Part 9: Threading Model¶
How does QualCoder v2 handle concurrency between the Qt UI and MCP server?
Two Thread Domains¶
graph TB
subgraph Main ["Main Thread (Qt + asyncio)"]
QtLoop["Qt Event Loop"]
Asyncio["asyncio via qasync"]
EventBus["EventBus.publish()"]
SignalEmit["Signal.emit()"]
Timer["QTimer debouncing"]
end
subgraph MCP ["MCP Worker Threads"]
ToThread["asyncio.to_thread()"]
RepoRead["repo.get_all()"]
RepoWrite["repo.save()"]
end
ToThread -->|thread-local conn| RepoRead
RepoWrite -->|commit| RepoRead
MCP -->|QMetaObject.invokeMethod| Main
Pattern 1: Unified Event Loop (qasync)¶
QualCoder merges Qt and asyncio into one event loop using qasync:
# src/main.py
import qasync
class QualCoderApp:
def __init__(self):
self._app = QApplication(sys.argv)
# qasync merges Qt event loop with asyncio
# MCP server runs as asyncio coroutine on the same loop
Why this matters:
- The MCP server (aiohttp) runs on the same loop as Qt
- No separate thread needed for HTTP handling
- Tool calls still use asyncio.to_thread() for blocking DB operations
Pattern 2: Thread-Safe Signal Emission¶
The most critical threading pattern. Background threads cannot call Signal.emit() directly - Qt requires signals to be emitted on the main thread.
# src/shared/infra/signal_bridge/base.py
class BaseSignalBridge(QObject):
def __init__(self):
self._emission_queue = queue.Queue() # Thread-safe
def _emit_threadsafe(self, signal, payload):
if is_main_thread():
signal.emit(payload) # Direct emit
else:
# Queue the emission and schedule processing on main thread
self._emission_queue.put((signal, payload))
QMetaObject.invokeMethod(
self,
"_do_emit",
Qt.ConnectionType.QueuedConnection, # Marshals to main thread
)
@Slot()
def _do_emit(self):
"""Drains queue on main thread."""
while True:
try:
signal, payload = self._emission_queue.get_nowait()
signal.emit(payload) # Safe: we're on main thread
except queue.Empty:
break
The flow:
sequenceDiagram
participant Worker as MCP Worker Thread
participant Queue as queue.Queue
participant Qt as Qt Main Thread
participant Widget as UI Widget
Worker->>Worker: repo.save() + event_bus.publish()
Worker->>Queue: _emission_queue.put((signal, payload))
Worker->>Qt: QMetaObject.invokeMethod("_do_emit", QueuedConnection)
Note over Qt: Qt event loop picks up queued call
Qt->>Queue: get_nowait()
Queue-->>Qt: (signal, payload)
Qt->>Widget: signal.emit(payload)
Widget->>Widget: Update UI
Pattern 3: EventBus with RLock¶
The EventBus uses a reentrant lock and copies handler lists under lock, then invokes without lock:
# src/shared/infra/event_bus.py
class EventBus:
def __init__(self):
self._lock = RLock() # Reentrant: same thread can re-acquire
self._handlers: dict[str, list[Handler]] = {}
def subscribe(self, event_type, handler):
with self._lock:
self._handlers.setdefault(event_type, []).append(handler)
def publish(self, event):
# Copy under lock
with self._lock:
type_handlers = list(self._handlers.get(event_type, []))
# Invoke WITHOUT lock (prevents deadlock if handler subscribes)
for handler in type_handlers:
handler(event)
Why RLock? A handler might publish another event (reentrant), which needs to acquire the lock again on the same thread.
Why copy? If we held the lock during handler invocation, a handler that calls subscribe() would deadlock (even with RLock, if handler is on another thread).
Pattern 4: MCP asyncio.to_thread¶
MCP tool calls run blocking DB operations in worker threads:
# Conceptual flow in MCP server
async def handle_tool_call(tool_name, args):
# asyncio.to_thread() runs in ThreadPoolExecutor
result = await asyncio.to_thread(
server._execute_tool, tool_name, args, None
)
return result
# Inside _execute_tool (runs in worker thread):
def _execute_tool(self, tool_name, args, context):
# repo.get_all() → Session → thread-local connection
codes = self._ctx.coding_context.code_repo.get_all()
return {"codes": codes}
Each worker thread gets its own SQLite connection via SingletonThreadPool (see Part 8).
Pattern 5: QTimer Debouncing (No Threading)¶
For events that arrive frequently (e.g., mutations for version control), use QTimer instead of threads:
# src/contexts/projects/infra/version_control_listener.py
class VersionControlListener:
DEBOUNCE_MS = 500
def __init__(self, event_bus, ...):
self._pending_events = []
self._timer = QTimer()
self._timer.setSingleShot(True)
self._timer.setInterval(self.DEBOUNCE_MS)
self._timer.timeout.connect(self._flush)
def _on_mutation(self, event):
self._pending_events.append(event)
self._timer.start() # Restarts on each event
def _flush(self):
events = tuple(self._pending_events)
self._pending_events.clear()
auto_commit(...)
Why no threading? All events arrive on the main thread (via EventBus), so QTimer is simpler and correct.
Thread Safety Utilities¶
# src/shared/infra/signal_bridge/thread_utils.py
def is_main_thread() -> bool:
app = QCoreApplication.instance()
if app is not None:
return QThread.currentThread() == app.thread()
return threading.current_thread() is threading.main_thread()
class ThreadChecker:
@staticmethod
def assert_main_thread(context=""):
if not is_main_thread():
raise RuntimeError(
f"Expected main thread but on '{get_current_thread_name()}'"
)
Use ThreadChecker.assert_main_thread() in code that must run on the UI thread (e.g., dialog creation).
Summary¶
| Pattern | When to Use | Key Mechanism |
|---|---|---|
| qasync unified loop | Always (app startup) | Merge Qt + asyncio |
| _emit_threadsafe | Signal emission from any thread | queue.Queue + QMetaObject |
| RLock + copy | EventBus publish/subscribe | Copy under lock, invoke without |
| asyncio.to_thread | MCP tool DB operations | ThreadPoolExecutor + thread-local conn |
| QTimer debounce | High-frequency main-thread events | Single-shot timer restart |
Rules¶
- Never call
Signal.emit()from a background thread - Use_emit_threadsafe() - Never share SQLite connections across threads - Session handles this
- Never hold locks during handler invocation - Copy list, release lock, then invoke
- Use daemon threads for I/O - Clean shutdown via
stop_event - Prefer QTimer over threading for main-thread event batching
- Use module-level functions for thread targets to avoid PyO3 issues
Previous: Part 8: Database Connection Lifecycle
Appendices: - Appendix A: Common Patterns & Recipes - Appendix B: When to Create New Patterns