Threading and Execution

Understanding how modules are executed, how data flows between them, and which threading guarantees apply is essential for writing correct and performant modules. This page explains the execution model for each module type and the configuration options that control how links between modules are dispatched.


Sync and async methods

The start, stop, and _run methods may all be implemented as either regular sync or async methods - the framework detects the implementation type at runtime and dispatches accordingly.

# Synchronous _run:
def _run(self, data: models.Data) -> models.Data:
    data.fields['result'] = compute(data.fields['value'])
    return data

# Asynchronous _run:
async def _run(self, data: models.Data) -> models.Data:
    data.fields['result'] = await fetch(data.fields['url'])
    return data

The same applies to start and stop:

async def start(self):
    self._client = await create_client(self.configuration.host)
    while self.active:
        data = await self._client.read()
        self._call_links(data)

async def stop(self):
    await self._client.disconnect()

Info
Each async method is executed on a persistent thread-local event loop. The loop is created once per worker thread on first use and reused for all subsequent calls on that thread. This avoids the overhead of creating a new event loop on every data item, which matters for high-frequency pipelines.
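The dispatch described above can be sketched roughly as follows. This is a minimal illustration, not the framework's actual code: the names _local, get_thread_loop and call_method are hypothetical, and the sketch assumes that detection is done with inspect.iscoroutinefunction and that the loop is stored in a threading.local.

```python
import asyncio
import inspect
import threading

# Hypothetical per-thread storage; the framework's real attribute names are unknown.
_local = threading.local()


def get_thread_loop() -> asyncio.AbstractEventLoop:
    """Return the calling thread's event loop, creating it on first use."""
    loop = getattr(_local, "loop", None)
    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        _local.loop = loop
    return loop


def call_method(method, *args):
    """Call a sync or async method and return its result."""
    if inspect.iscoroutinefunction(method):
        # Async implementation: run it to completion on this thread's loop.
        return get_thread_loop().run_until_complete(method(*args))
    # Sync implementation: call it directly.
    return method(*args)


def sync_run(value):
    return value + 1


async def async_run(value):
    await asyncio.sleep(0)
    return value + 1


sync_result = call_method(sync_run, 1)
async_result = call_method(async_run, 1)
# Repeated calls on the same thread reuse the same loop object.
same_loop = get_thread_loop() is get_thread_loop()
```

Because the loop lives in thread-local storage, every worker thread gets its own loop and no loop is shared between threads.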
Warning
The _run implementation should not block for a long time. Because run(data) is executed on the calling module's worker thread, a blocking _run also blocks that worker thread and can cause data to accumulate in the calling module. For long-running or I/O-heavy work, prefer an asynchronous implementation or move the blocking work off the worker thread.

Input modules

Input modules are the source of data in the pipeline. The framework calls start in a dedicated daemon thread.

def start(self):
    self._client = connect(self.configuration.host)
    while self.active:
        data = self._client.read()
        self._call_links(data)

def stop(self):
    self._client.disconnect()

If start raises an exception, the framework retries it at a configurable interval (config.RETRY_INTERVAL) for as long as the module is active. If start returns without raising, the framework assumes the module handles its own retry logic internally and does not call start again.
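The retry behaviour can be pictured with the following sketch. It is an approximation under stated assumptions: run_start_with_retry and FlakyInput are hypothetical names, and RETRY_INTERVAL is a local stand-in for config.RETRY_INTERVAL.

```python
import threading
import time

RETRY_INTERVAL = 0.01  # stand-in for config.RETRY_INTERVAL


class FlakyInput:
    """Hypothetical input module whose start fails twice before succeeding."""

    def __init__(self):
        self.active = True
        self.attempts = 0

    def start(self):
        self.attempts += 1
        if self.attempts < 3:
            raise ConnectionError("source not reachable yet")


def run_start_with_retry(module, retry_interval=RETRY_INTERVAL):
    """Call start until it returns normally or the module is deactivated."""
    while module.active:
        try:
            module.start()
        except Exception:
            # start raised: wait, then try again while the module is active.
            time.sleep(retry_interval)
        else:
            # start returned without raising: do not call it again.
            break


module = FlakyInput()
thread = threading.Thread(target=run_start_with_retry, args=(module,), daemon=True)
thread.start()
thread.join(timeout=5)
```

In this sketch start is attempted three times: two failures followed by one normal return, after which the loop ends.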

Variable modules and tag modules follow the same pattern. Tag modules additionally receive an incoming data object via run(data) - they enrich it with key-value pairs returned by _run() and then forward it.


Processor modules

Processor modules receive data via run(data), which is called on the calling worker thread. The base implementation validates the incoming data, calls _run(data), and forwards the returned data object to all downstream links in a single step.

If thread_safe=True is passed to super().__init__, the base implementation queues incoming data objects instead of processing them immediately. A dedicated queue worker thread is started lazily on the first call and processes items one at a time. This is useful when _run is not safe to call concurrently from multiple threads - for example, when it maintains mutable internal state.

def __init__(self, configuration: Configuration):
    # thread_safe=True: _run is only ever called by one thread at a time.
    super().__init__(configuration=configuration, thread_safe=True)
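The queueing behaviour can be illustrated with a small stand-alone sketch. QueuedRunner is a hypothetical helper, not a framework class; it only shows the general pattern of a lazily started worker thread that drains a queue one item at a time.

```python
import queue
import threading


class QueuedRunner:
    """Hypothetical helper: serialises calls to a handler via one worker thread."""

    def __init__(self, handler):
        self._handler = handler
        self._queue = queue.Queue()
        self._worker = None
        self._lock = threading.Lock()

    def run(self, data):
        # Start the worker lazily, on the first call only.
        with self._lock:
            if self._worker is None:
                self._worker = threading.Thread(target=self._drain, daemon=True)
                self._worker.start()
        self._queue.put(data)

    def _drain(self):
        # Only this thread ever calls the handler, one item at a time.
        while True:
            data = self._queue.get()
            try:
                self._handler(data)
            finally:
                self._queue.task_done()

    def join(self):
        """Block until every queued item has been handled."""
        self._queue.join()


seen = []
runner = QueuedRunner(seen.append)
# Several producer threads call run concurrently.
producers = [threading.Thread(target=runner.run, args=(i,)) for i in range(5)]
for producer in producers:
    producer.start()
for producer in producers:
    producer.join()
runner.join()
```

Callers return as soon as the item is queued, so a slow handler delays the queue but not the upstream threads.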

Output modules

Output modules receive data via run(data). The base implementation always queues the incoming data object and processes it in a dedicated queue worker thread - there is no non-queued mode. The queue worker is started lazily on the first call. This guarantees that _run is called sequentially from a single thread, regardless of how many upstream modules are forwarding data concurrently.


Links

Links define which modules receive a copy of the data object after the current module has processed it. They are configured in the module configuration and resolved at runtime. Links can be added or removed while the pipeline is running - the framework detects changes on every _call_links call and starts or stops worker threads accordingly.

Each linked module gets a deep copy of the data object, so mutations in one downstream module do not affect others.
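The effect of the per-link copy can be shown with a minimal sketch. The function call_links below is a hypothetical stand-in for the framework's _call_links, links are modelled as plain callables, and a nested dict stands in for the data object.

```python
import copy


def call_links(data, links):
    """Give every link its own deep copy of the data object."""
    for link in links:
        link(copy.deepcopy(data))


received = []


def mutating_link(data):
    # Changes its private copy only.
    data["fields"]["value"] = -1
    received.append(data)


def reading_link(data):
    received.append(data)


original = {"fields": {"value": 42}}
call_links(original, [mutating_link, reading_link])
```

After the call, the original object and the copy seen by reading_link still hold 42, while the copy held by mutating_link holds -1.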

Worker count per link

The worker_count_per_link configuration parameter controls how many persistent worker threads are created per linked module. Multiple workers allow the linked module to be called concurrently for different data objects. Workers are assigned in round-robin order.

Value         Behaviour
1 (default)   One persistent worker thread per linked module. Data objects are processed in FIFO order.
N > 1         N worker threads per linked module. Data objects are distributed round-robin. Use when the linked module is thread-safe and you want to increase throughput.
0             Spawn mode. A fresh daemon thread is created for every single data object. Use only when extremely low latency matters more than resource efficiency.
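Round-robin distribution over N persistent workers can be sketched as follows. RoundRobinLink is a hypothetical class written for illustration; it assumes one queue per worker and that dispatch is called from a single thread.

```python
import itertools
import queue
import threading


class RoundRobinLink:
    """Hypothetical sketch: N persistent workers fed in round-robin order."""

    def __init__(self, handler, worker_count):
        self._queues = [queue.Queue() for _ in range(worker_count)]
        self._next = itertools.cycle(self._queues)
        for index, q in enumerate(self._queues):
            threading.Thread(
                target=self._work,
                args=(q, handler),
                name=f"worker-{index}",
                daemon=True,
            ).start()

    @staticmethod
    def _work(q, handler):
        while True:
            item = q.get()
            try:
                handler(item)
            finally:
                q.task_done()

    def dispatch(self, data):
        # Not thread-safe: this sketch assumes a single dispatching thread.
        next(self._next).put(data)

    def join(self):
        for q in self._queues:
            q.join()


processed = []


def handler(item):
    # Record which worker thread handled which item.
    processed.append((threading.current_thread().name, item))


link = RoundRobinLink(handler, worker_count=2)
for item in range(4):
    link.dispatch(item)
link.join()
```

With two workers and four items, the first worker receives items 0 and 2 and the second receives items 1 and 3. Order is preserved within each worker, but not across workers.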

Forward latest data only

Setting forward_latest_data_only=True in the configuration switches the worker into latest-only mode. If a new data object arrives while the worker is still processing the previous one, the pending object is replaced by the newer arrival. Only the most recent object is processed once the worker becomes free.

This is useful for high-frequency sensors where processing a backlog of outdated readings is wasteful - only the current value matters. It has no effect in spawn mode (worker_count_per_link=0).
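Latest-only mode behaves like a mailbox with a single slot. The sketch below is an illustration of that idea, not the framework's implementation; LatestOnlySlot is a hypothetical name.

```python
import threading


class LatestOnlySlot:
    """Hypothetical single-slot mailbox: a newer item replaces a pending one."""

    def __init__(self):
        self._cond = threading.Condition()
        self._item = None
        self._has_item = False

    def put(self, item):
        with self._cond:
            # Overwrite whatever is still pending.
            self._item = item
            self._has_item = True
            self._cond.notify()

    def get(self):
        with self._cond:
            # Block until an item is available, then take it.
            while not self._has_item:
                self._cond.wait()
            item, self._item = self._item, None
            self._has_item = False
            return item


slot = LatestOnlySlot()
# Three readings arrive while the worker is busy and has not called get yet.
slot.put(1)
slot.put(2)
slot.put(3)
latest = slot.get()
```

Here the worker only ever sees the reading 3; readings 1 and 2 are dropped without being processed.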

Info
Worker threads are daemon threads - they do not prevent the process from exiting. When a module is stopped, all its worker threads are shut down gracefully before the stop routine completes.