refactor: DRY up code between wasm and native kernel#9591
Merged
Conversation
…tive and pyodide Both kernels duplicated the same drain-stale-then-handle completion loop and the same control/UI-element fan-out. Extract `drain_stale` and `enqueue_control_request` into kernel_lifecycle so each side becomes a thin caller; drop the now-dead `completion_worker` and `_drain_queue` in complete.py. `Kernel.code_completion` switches to `self.stream` so the worker thread no longer relies on a contextvar set on the kernel thread.
…queuer factory `make_control_enqueuer(control_q, ui_q)` returns the closure both call sites need, dropping `functools.partial` in `create_kernel` and the per-call helper import in `PyodideSession.put_control_request`. The pyodide session binds the enqueuer once at construction time.
`drain_stale(queue)` now returns the newest pending item (or None) instead of threading a `latest` argument through. Callers do a single blocking get and then merge in any newer arrival via walrus. Also trim docstrings and drop the "5 is arbitrary" magic-number comment.
Keyword-only `latest=` makes the call site read as "drain stale behind this already-retrieved request", which is the intent. Reverts the walrus dance at the call sites and restores the `_T` return type.
|
The latest updates on your projects. Learn more about Vercel for GitHub.
|
Contributor
There was a problem hiding this comment.
No issues found across 4 files
Architecture diagram
sequenceDiagram
participant Client as Frontend / WASM
participant PyodideSession as PyodideSession
participant KernelLifecycle as kernel_lifecycle
participant Kernel as Kernel (runtime.py)
participant QMgr as AsyncQueueManager
Note over Client,Kernel: Control Request Flow (WASM)
Client->>PyodideSession: put_control_request(cmd)
PyodideSession->>KernelLifecycle: make_control_enqueuer() closure
KernelLifecycle->>QMgr: control_queue.put_nowait(cmd)
alt cmd is UpdateUIElementCommand or ModelCommand
KernelLifecycle->>QMgr: set_ui_element_queue.put_nowait(cmd)
end
Note over KernelLifecycle,Kernel: Kernel Initialization
KernelLifecycle->>KernelLifecycle: make_control_enqueuer(args.control_queue, args.set_ui_element_queue)
KernelLifecycle->>Kernel: enqueue_control_request=closure
Note over Kernel: Completion Worker (Native / WASM)
Kernel->>Kernel: start_completion_worker(completion_queue)
Kernel->>KernelLifecycle: drain_stale(queue, latest)
loop worker loop
Kernel->>KernelLifecycle: drain_stale(completion_queue, latest=completion_queue.get())
KernelLifecycle->>KernelLifecycle: drain stale entries
KernelLifecycle-->>Kernel: latest request
Kernel->>Kernel: code_completion(request, docstrings_limit=80)
Kernel->>Kernel: complete(request, graph, globals, globals_lock, stream)
end
Note over PyodideSession: Completion Listener (WASM)
PyodideSession->>PyodideSession: listen_completion()
PyodideSession->>KernelLifecycle: drain_stale(completion_queue, latest=await completion_queue.get())
KernelLifecycle->>KernelLifecycle: drain stale entries
KernelLifecycle-->>PyodideSession: latest request
PyodideSession->>Kernel: code_completion(request, docstrings_limit=5)
Contributor
There was a problem hiding this comment.
Pull request overview
This PR refactors kernel runtime code to share common queue-handling helpers between the native (threaded) kernel and the Pyodide (asyncio) kernel, reducing duplication around control-message enqueueing and completion-request draining.
Changes:
- Move “drain stale completion requests” logic into a shared
drain_stale()helper and use it from both native and Pyodide completion listeners. - Replace ad-hoc control enqueue closures with a shared
make_control_enqueuer()factory used by bothcreate_kernel()andPyodideSession. - Simplify completion worker wiring by routing native completions through
Kernel.code_completion().
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| marimo/_runtime/runtime.py | Replaces the completion worker thread target with a loop using shared stale-draining + Kernel.code_completion(). |
| marimo/_runtime/kernel_lifecycle.py | Introduces shared helpers drain_stale() and make_control_enqueuer() and uses the factory in create_kernel(). |
| marimo/_runtime/complete.py | Removes now-redundant completion worker + drain helper in favor of the shared lifecycle helper. |
| marimo/_pyodide/pyodide_session.py | Uses shared control enqueuer and shared stale-draining in the Pyodide completion listener. |
…op empty() check Address PR feedback: - drain_stale no longer consults queue.empty() — multiprocessing.Queue.empty() is documented as unreliable. Drain via get_nowait() until Empty. - Add unit tests for drain_stale on both asyncio and threading queues. - Add unit tests for make_control_enqueuer covering plain, UI-element, and model commands.
dmadisetti
approved these changes
May 20, 2026
|
🚀 Development release published. You may be able to view the changes at https://marimo.app?v=0.23.7-dev61 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Share queue combination and share drain logic.