Hanabi Developer Hub

Add a Python worker

Move heavy or native work into a sealed server worker — bytes in, files out.

[Figure: Inputs (files, options) → Worker (sealed container, no network by default) → Outputs (files). Caption: A worker is bytes in, files out.]

Most modules are pure UI. A module that needs heavy or native work (image processing, spreadsheet generation, calling an approved API) ships an optional worker: a small Python program that the platform runs for it off the browser, in a sealed container.

This guide is the author's contract. For why the sandbox is shaped this way (the threat model, isolation backends, resource tiers), see worker-sandbox-design.md.

Two kinds of worker

| | Trusted (first-party) | Sandboxed (third-party) |
|---|---|---|
| Who | Modules shipped with the platform | Modules from outside developers |
| Runs | In-process, with full WorkerContext (db, VFS, Office/COM) | In an isolated sandbox: bytes in, bytes out, nothing else |
| Gated by | Being first-party | The admin-approved worker.execute permission plus an enabled sandbox backend |

This guide covers the sandboxed contract — what an external module author writes. It is deliberately tiny: your worker can touch nothing but its own input bytes and its declared libraries, which is exactly what makes it safe to run.

The contract: one run function

Your worker entrypoint (entrypoints.worker, e.g. worker/main.py) defines a single callable named `run`:

python
def run(inputs: dict[str, bytes], options: dict) -> dict[str, bytes]:
    ...
  • `inputs` — the job's declared input files as {filename: bytes}. The platform reads them from the module's folders; your worker never sees a path, a file id, a session, the database, or another user.
  • `options` — the job's options object (whatever the UI passed to createJob({...})), as a plain dict.
  • returns — the output files. Each becomes a real file in the module's `Exports` folder. Return any of:
      • a dict { "name.ext": b"...bytes..." }
      • a list of ("name.ext", data) tuples
      • a list of { "name": "...", "data": ... } dicts

In every form, data may be bytes or str (a str is encoded as UTF-8 automatically). Names are reduced to a bare leaf: any directory components are dropped.

python
# worker/main.py — uppercase every input file
def run(inputs, options):
    suffix = options.get("suffix", "UPPER")
    return {
        f"{suffix}-{name}": text.decode("utf-8", "replace").upper()
        for name, text in inputs.items()
    }
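
The three accepted return shapes can be pictured as collapsing to one `{leaf_name: bytes}` mapping. The sketch below is only an illustration for local testing: `normalize_outputs` is a hypothetical helper, not a platform function, and how the platform handles name collisions or empty names is not specified here.

```python
from pathlib import PurePosixPath


def normalize_outputs(result):
    """Hypothetical helper: coerce any accepted return shape to {leaf_name: bytes}."""
    if isinstance(result, dict):
        pairs = list(result.items())
    else:
        pairs = []
        for item in result:
            if isinstance(item, dict):
                # {"name": ..., "data": ...} form
                pairs.append((item["name"], item["data"]))
            else:
                # ("name.ext", data) tuple form
                name, data = item
                pairs.append((name, data))

    out = {}
    for name, data in pairs:
        # Keep only the final path component (assumed meaning of "bare leaf").
        leaf = PurePosixPath(str(name).replace("\\", "/")).name
        if isinstance(data, str):
            data = data.encode("utf-8")
        out[leaf] = bytes(data)
    return out
```

Running a worker's return value through a helper like this in unit tests makes it easy to compare outputs regardless of which shape the worker chose.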

Progress: make run a generator

To report live progress, make `run` a generator: `yield` progress events and `return` the outputs. Each {"pct": int, "message": str} dict you yield streams to the UI as a HANABI_JOB_PROGRESS event, delivered to the onProgress callback of createJob(..., onProgress). Emitting progress requires the jobs.emit_progress permission.

python
def run(inputs, options):
    names = list(inputs)
    out = {}
    for i, name in enumerate(names):
        yield {"pct": int(100 * i / max(1, len(names))), "message": f"Converting {name}"}
        out[f"thumb-{name}.png"] = make_thumbnail(inputs[name])   # your library work
    yield {"pct": 100, "message": "Done"}
    return out
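
When testing a generator worker locally, it helps to see how a caller can separate yielded progress events from the final return value. The following is a minimal sketch, not the platform's runner: `drive` and `demo_run` are hypothetical names, and it relies only on standard Python generator semantics (the `return` value of a generator arrives as `StopIteration.value`).

```python
import inspect


def drive(run, inputs, options, on_progress):
    """Hypothetical local driver: forward yielded events, return the outputs."""
    result = run(inputs, options)
    if not inspect.isgenerator(result):
        # Plain (non-generator) run: the call already returned the outputs.
        return result
    while True:
        try:
            event = next(result)
        except StopIteration as stop:
            # The generator's `return out` value is carried on the exception.
            return stop.value
        on_progress(event)


def demo_run(inputs, options):
    names = list(inputs)
    out = {}
    for i, name in enumerate(names):
        yield {"pct": int(100 * i / max(1, len(names))), "message": f"Processing {name}"}
        out[f"copy-{name}"] = inputs[name]
    yield {"pct": 100, "message": "Done"}
    return out


events = []
outputs = drive(demo_run, {"a.txt": b"x", "b.txt": b"y"}, {}, events.append)
```

The same driver accepts a non-generator `run`, so one test harness can cover both styles.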

Large files: the streaming contract (A3)

run(inputs, options) hands you every input as bytes — simple, but it loads each file fully into memory, so it's bounded by your tier's memory. For big media (multi-GB transcodes) define run_streaming instead:

python
import subprocess
from pathlib import Path

def run_streaming(in_dir, out_dir, options, ctx=None):
    # Read inputs FROM in_dir and write outputs TO out_dir by path; never load a whole file into memory.
    src = next(p for p in Path(in_dir).iterdir() if p.is_file())
    # -vn drops the video stream, so only the audio track is written to out_dir/audio.mp3.
    subprocess.run(["ffmpeg", "-i", str(src), "-vn", str(Path(out_dir, "audio.mp3"))], check=True)
  • The platform streams each input file straight into in_dir (it never reads a multi-GB input into the host's memory either), and collects whatever files you leave in out_dir as the job's outputs.
  • run_streaming can yield {"pct", "message"} for progress just like run, and it accepts the same optional ctx (here as the fourth parameter) for fetch.
  • Prefer run_streaming whenever a tool already reads/writes files (ffmpeg, imagemagick, ghostscript). Use plain run for small in-memory transforms. A worker defines one or the other; if both exist, run_streaming wins. Full example: examples/modules/media-toolkit-worker.
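
A streaming worker can be exercised locally without ffmpeg by using a pure-Python transform. The sketch below is an assumption-laden illustration: `run_locally` is a hypothetical test harness (not how the platform stages files), and the gzip worker merely stands in for a real tool that reads and writes by path.

```python
import gzip
import shutil
import tempfile
from pathlib import Path


def run_streaming(in_dir, out_dir, options, ctx=None):
    # Compress each input in 1 MiB chunks so memory use stays flat
    # regardless of file size.
    for src in sorted(Path(in_dir).iterdir()):
        if not src.is_file():
            continue
        dst = Path(out_dir) / f"{src.name}.gz"
        with src.open("rb") as fin, gzip.open(dst, "wb") as fout:
            shutil.copyfileobj(fin, fout, length=1024 * 1024)


def run_locally(worker, files, options=None):
    """Hypothetical harness: stage inputs in a temp dir, collect out_dir."""
    with tempfile.TemporaryDirectory() as in_dir, tempfile.TemporaryDirectory() as out_dir:
        for name, data in files.items():
            Path(in_dir, name).write_bytes(data)
        worker(in_dir, out_dir, options or {})
        # Reading outputs back into memory is fine for small test fixtures.
        return {p.name: p.read_bytes() for p in Path(out_dir).iterdir() if p.is_file()}


outputs = run_locally(run_streaming, {"big.log": b"line\n" * 1000})
```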

Network: the optional ctx argument

A worker approved for network.fetch can declare a third parameter to receive a host-mediated fetch capability:

python
def run(inputs, options, ctx):
    res = ctx.fetch("https://api.example.com/v1/rates", method="GET")
    # res -> { "status": int, "headers": {...}, "body": bytes }
    return {"rates.json": res["body"]}
  • ctx.fetch(url, method="GET", headers=None, body=None) reaches only the http(s) origins listed in your manifest's dependencies.services, and only after the host enforces that allowlist. The worker itself has no network socket (the sandbox runs --network none); the request is brokered by the host.
  • Redirects to non-allowed origins are not followed; private/loopback targets are blocked; per-request size and per-job count caps apply.
  • A 2-argument run(inputs, options) is called unchanged — ctx is opt-in.
  • Without the approved network.fetch permission, ctx.fetch raises a clean "network is not approved" error.
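
Because the real ctx exists only inside the platform, a network-using worker can be unit-tested against a stand-in. This is a sketch under stated assumptions: `FakeCtx` is hypothetical, it mirrors only the `fetch(url, method, headers, body)` signature and the `{status, headers, body}` response shape described above, and `PermissionError` is a placeholder for whatever error the host actually raises.

```python
from urllib.parse import urlsplit


class FakeCtx:
    """Hypothetical stand-in for ctx: canned responses, never opens a socket."""

    def __init__(self, allowed_origins, responses):
        self.allowed = set(allowed_origins)
        self.responses = responses
        self.calls = []

    def fetch(self, url, method="GET", headers=None, body=None):
        parts = urlsplit(url)
        origin = f"{parts.scheme}://{parts.netloc}"
        if origin not in self.allowed:
            # Assumed error type; the platform's real error is not specified here.
            raise PermissionError(f"origin not allowed: {origin}")
        self.calls.append((method, url))
        return {"status": 200, "headers": {}, "body": self.responses[url]}


def run(inputs, options, ctx):
    res = ctx.fetch("https://api.example.com/v1/rates", method="GET")
    if res["status"] != 200:
        raise RuntimeError(f"rates request failed with status {res['status']}")
    return {"rates.json": res["body"]}


ctx = FakeCtx(
    ["https://api.example.com"],
    {"https://api.example.com/v1/rates": b'{"USD": 1.0}'},
)
outputs = run({}, {}, ctx)
```

Checking the status before using the body is a worker-side choice, not something the contract above requires.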

Native tools + broad network (admin-HIGH)

Two default-deny capabilities unlock heavier workers; an admin grants each one per module at review (and sets its quota), on top of worker.execute:

  • `worker.native` runs your worker on the native-tools image: ffmpeg, imagemagick (convert), libvips (vips/vipsthumbnail), and ghostscript are on PATH. Shell out to them for transcode / render / compress. The run flags are unchanged (--network none, dropped caps, read-only rootfs), so a native tool can only transform your input bytes.
  • `network.fetch.broad` lifts the declared-origin allowlist: ctx.fetch may reach any public origin the admin policy allows (deny/allow domain lists), with a per-job byte budget + rate limit. Private/loopback stays blocked and every request is audited. Your worker code is identical — just ctx.fetch(url).
python
import subprocess, tempfile
from pathlib import Path

def run(inputs, options, ctx=None):
    name, data = next(iter(inputs.items()))
    with tempfile.TemporaryDirectory() as tmp:
        src = Path(tmp) / name
        src.write_bytes(data)  # stage the input on disk so ffmpeg can read it by path
        out = Path(tmp) / "audio.mp3"
        subprocess.run(["ffmpeg", "-y", "-i", str(src), "-vn", "-b:a", "192k", str(out)], check=True)
        return {"audio.mp3": out.read_bytes()}

A worker can also be scheduled (jobs.schedule) or run as a persistent service (worker.service, admin-HIGH, off by default). Full example: examples/modules/media-toolkit-worker.

Declare it in the manifest

jsonc
{
  "entrypoints": { "ui": "ui/index.html", "worker": "worker/main.py" },
  "permissions": [
    "files.read", "files.write", "jobs.create", "jobs.emit_progress",
    "worker.execute"               // admin-approved; lets the platform run your worker
    // "network.fetch"             // add only if you use ctx.fetch
  ],
  "dependencies": {
    "runtimes": [{ "name": "python", "version": ">=3.11", "required": true }],
    "python": ["pillow>=10.0"],    // your worker's pip requirements
    "services": []                 // http(s) origins for ctx.fetch (if any)
  }
}
  • `worker.execute` is admin-approved, granted at publishing review. Changing the worker code or its declared dependencies.python re-triggers manual review (same as a network.fetch origin change).
  • The UI triggers a run with createJob(options, inputRefs, onProgress, onOutput) (see capability-api.md); the platform marshals the input refs into your inputs and writes your returned outputs to Exports.
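
Before submitting for review, a quick self-check can catch manifests whose permissions and dependencies disagree. The sketch below is a hypothetical lint, not a platform tool: `lint_manifest` and its rules are assumptions derived from the prose above (for example, that network.fetch.broad makes an empty dependencies.services acceptable), and the manifest is written as a Python dict because the jsonc comments would not parse with the standard json module.

```python
def lint_manifest(manifest):
    """Hypothetical pre-review check; returns a list of human-readable problems."""
    problems = []
    perms = set(manifest.get("permissions", []))
    deps = manifest.get("dependencies", {})
    services = deps.get("services", [])

    if manifest.get("entrypoints", {}).get("worker") and "worker.execute" not in perms:
        problems.append("worker entrypoint declared without worker.execute")
    if services and "network.fetch" not in perms:
        problems.append("dependencies.services listed without network.fetch")
    # Assumption: only the broad permission makes an empty allowlist meaningful.
    if "network.fetch" in perms and not services and "network.fetch.broad" not in perms:
        problems.append("network.fetch requested but dependencies.services is empty")
    return problems


manifest = {
    "entrypoints": {"ui": "ui/index.html", "worker": "worker/main.py"},
    "permissions": ["files.read", "files.write", "jobs.create",
                    "jobs.emit_progress", "worker.execute"],
    "dependencies": {
        "runtimes": [{"name": "python", "version": ">=3.11", "required": True}],
        "python": ["pillow>=10.0"],
        "services": [],
    },
}
problems = lint_manifest(manifest)
```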

Resource tiers & the sandbox

Each approved worker is assigned a resource tier that bounds one run: memory, CPU seconds, wall-clock timeout, output bytes and file count, and a process/thread ceiling. Defaults:

  • small: 256 MB memory / 10 s CPU / 30 s wall / 25 MB out
  • standard: 512 MB memory / 60 s CPU / 180 s wall / 100 MB out

A job that exceeds its tier is killed and reported as failed, so it can never starve the host. Jobs are admitted through a priority-ordered pool with a global concurrency cap. Details: `worker-sandbox-design.md` §6.
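
A worker's tests can check its sample outputs against the default tier figures quoted above. This is a rough sketch: `Tier`, `TIERS`, and `fits_output_budget` are hypothetical names, the numbers are copied from the defaults in this section, and treating 1 MB as 1024 × 1024 bytes is an assumption (the platform's exact accounting is not stated here).

```python
from dataclasses import dataclass

MB = 1024 * 1024  # assumption: binary megabytes


@dataclass(frozen=True)
class Tier:
    memory_mb: int
    cpu_seconds: int
    wall_seconds: int
    output_mb: int


# Default figures as quoted in this guide; an operator may configure others.
TIERS = {
    "small": Tier(memory_mb=256, cpu_seconds=10, wall_seconds=30, output_mb=25),
    "standard": Tier(memory_mb=512, cpu_seconds=60, wall_seconds=180, output_mb=100),
}


def fits_output_budget(outputs, tier_name):
    """Return True if the combined output size is within the tier's output cap."""
    total = sum(len(data) for data in outputs.values())
    return total <= TIERS[tier_name].output_mb * MB
```

A check like this only covers output bytes; memory, CPU, and wall-clock limits are enforced by the sandbox at run time.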

The sandbox ships off by default (HANABI_WORKER_SANDBOX_BACKEND=disabled), so third-party workers don't execute until an operator enables a backend. The container backend (the real isolation boundary) needs Docker on the host — see worker-sandbox-docker-setup.md.

Authoring rules of thumb

  • Pure transform. Read inputs, compute, return outputs. No file paths, no network except ctx.fetch, no reading the environment or host filesystem — none of it is reachable, and relying on it fails in the sandbox.
  • Deterministic. Same inputs + options → same outputs. Keep a sample under tests/sample-inputs/ (the packager requires one; reviewers run it).
  • Stay inside your tier. Stream large work and bail early on bad input rather than allocating without bound.
  • Declare honestly. Every pip package in dependencies.python; every origin in dependencies.services. Undeclared imports/origins won't be available.
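
The determinism rule above can be checked mechanically by running the worker several times on the same sample and comparing output digests. This is a minimal sketch for a plain (non-generator) run that returns a dict; `digest_outputs` and `is_deterministic` are hypothetical helpers, not part of the packager or the review tooling.

```python
import hashlib


def digest_outputs(outputs):
    """Stable SHA-256 over sorted (name, length, data) records."""
    h = hashlib.sha256()
    for name in sorted(outputs):
        data = outputs[name]
        if isinstance(data, str):
            data = data.encode("utf-8")
        h.update(name.encode("utf-8") + b"\0")
        h.update(len(data).to_bytes(8, "big"))  # length prefix avoids ambiguity
        h.update(data)
    return h.hexdigest()


def is_deterministic(run, inputs, options, repeats=3):
    """Call run repeatedly on copies of the same inputs and compare digests."""
    digests = {digest_outputs(run(dict(inputs), dict(options))) for _ in range(repeats)}
    return len(digests) == 1


def upper(inputs, options):
    suffix = options.get("suffix", "UPPER")
    return {
        f"{suffix}-{name}": text.decode("utf-8", "replace").upper()
        for name, text in inputs.items()
    }


stable = is_deterministic(upper, {"a.txt": b"hello"}, {})
```

Typical sources of nondeterminism this would flag include timestamps in output names, random identifiers, and unordered iteration that leaks into file contents.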

See also #