Threading und Ausführung

Zu verstehen, wie Module ausgeführt werden, wie Daten zwischen ihnen fließen und welche Threading-Garantien gelten, ist entscheidend, um korrekte und performante Module zu schreiben. Diese Seite erklärt das Ausführungsmodell für jeden Modultyp sowie die Konfigurationsoptionen, die steuern, wie Verknüpfungen zwischen Modulen ausgeführt werden.


Synchrone und asynchrone Methoden

Die Methoden start, stop und _run können sowohl synchron als auch asynchron implementiert werden - das Framework erkennt den Implementierungstyp zur Laufzeit und ruft die Methode entsprechend auf.

# Synchronous _run:
def _run(self, data: models.Data) -> models.Data:
    data.fields['result'] = compute(data.fields['value'])
    return data

# Asynchronous _run:
async def _run(self, data: models.Data) -> models.Data:
    data.fields['result'] = await fetch(data.fields['url'])
    return data

Dasselbe gilt für start und stop:

async def start(self):
    self._client = await create_client(self.configuration.host)
    while self.active:
        data = await self._client.read()
        self._call_links(data)

async def stop(self):
    await self._client.disconnect()
Info
Jede asynchrone Methode wird auf einem persistenten thread-lokalen Event-Loop ausgeführt. Der Loop wird bei der ersten Verwendung einmal pro Worker-Thread erstellt und für alle nachfolgenden Aufrufe auf diesem Thread wiederverwendet. Dadurch wird der Aufwand vermieden, für jedes Datenelement einen neuen Event-Loop zu erstellen, was insbesondere bei hochfrequenten Pipelines wichtig ist.
Warnung
Die Implementierung von _run sollte nicht über längere Zeit blockieren. Da run(data) auf dem Worker-Thread des aufrufenden Moduls ausgeführt wird, blockiert ein blockierendes _run ebenfalls diesen Worker-Thread und kann dazu führen, dass sich Daten im aufrufenden Modul aufstauen. Für langlaufende oder I/O-intensive Arbeiten sollte eine asynchrone Implementierung bevorzugt oder die blockierende Arbeit aus dem Worker-Thread ausgelagert werden.

Eingangsmodule

Eingangsmodule sind die Quelle der Daten in der Pipeline. Das Framework ruft start in einem dedizierten Daemon-Thread auf.

def start(self):
    self._client = connect(self.configuration.host)
    while self.active:
        data = self._client.read()
        self._call_links(data)

def stop(self):
    self._client.disconnect()

Wenn start eine Exception auslöst, versucht das Framework den Aufruf in einem konfigurierbaren Intervall (config.RETRY_INTERVAL) erneut, solange das Modul aktiv ist. Wenn start ohne Exception zurückkehrt, geht das Framework davon aus, dass das Modul seine eigene Retry-Logik intern behandelt, und ruft start nicht erneut auf.

Variablenmodule und Tag-Module folgen demselben Muster. Tag-Module erhalten zusätzlich ein eingehendes Datenobjekt über run(data) - sie erweitern dieses um die von _run() zurückgegebenen Schlüssel-Wert-Paare und leiten es anschließend weiter.


Verarbeitungsmodule

Verarbeitungsmodule erhalten Daten über run(data), das auf dem Worker-Thread des aufrufenden Moduls ausgeführt wird. Die Basisimplementierung validiert die eingehenden Daten, ruft _run(data) auf und leitet das zurückgegebene Datenobjekt in einem einzigen Schritt an alle nachgelagerten Verknüpfungen weiter.

Wenn thread_safe=True an super().__init__ übergeben wird, reiht die Basisimplementierung eingehende Datenobjekte in eine Warteschlange ein, anstatt sie sofort zu verarbeiten. Ein dedizierter Queue-Worker-Thread wird beim ersten Aufruf lazy gestartet und verarbeitet die Elemente nacheinander. Dies ist nützlich, wenn _run nicht sicher gleichzeitig von mehreren Threads aufgerufen werden kann - zum Beispiel, wenn ein interner Zustand verwaltet wird.

def __init__(self, configuration: Configuration):
    # thread_safe=True: _run is only ever called by one thread at a time.
    super().__init__(configuration=configuration, thread_safe=True)

Ausgangsmodule

Ausgangsmodule erhalten Daten über run(data). Die Basisimplementierung reiht das eingehende Datenobjekt immer in eine Warteschlange ein und verarbeitet es in einem dedizierten Queue-Worker-Thread - es gibt keinen Modus ohne Warteschlange. Der Queue-Worker wird beim ersten Aufruf lazy gestartet. Dadurch ist garantiert, dass _run sequentiell von einem einzelnen Thread aufgerufen wird, unabhängig davon, wie viele vorgelagerte Module gleichzeitig Daten weiterleiten.


Links

Links definieren, welche Module eine Kopie des Datenobjekts erhalten, nachdem das aktuelle Modul es verarbeitet hat. Sie werden in der Modulkonfiguration festgelegt und zur Laufzeit aufgelöst. Verknüpfungen können während des laufenden Betriebs der Pipeline hinzugefügt oder entfernt werden - das Framework erkennt Änderungen bei jedem Aufruf von _call_links und startet oder beendet die entsprechenden Worker-Threads entsprechend.

Jedes verknüpfte Modul erhält eine tiefe Kopie des Datenobjekts, sodass Änderungen in einem nachgelagerten Modul keine Auswirkungen auf andere haben.

Worker-Anzahl pro Link

Der Konfigurationsparameter worker_count_per_link steuert, wie viele persistente Worker-Threads pro verknüpftem Modul erstellt werden. Mehrere Worker ermöglichen es, das verknüpfte Modul parallel für verschiedene Datenobjekte aufzurufen. Die Worker werden im Round-Robin-Verfahren zugewiesen.

Wert Verhalten
1 (Standard) Ein persistenter Worker-Thread pro verknüpftem Modul. Datenobjekte werden in FIFO-Reihenfolge verarbeitet.
N > 1 N Worker-Threads pro verknüpftem Modul. Datenobjekte werden im Round-Robin-Verfahren verteilt. Verwenden, wenn das verknüpfte Modul thread-sicher ist und der Durchsatz erhöht werden soll.
0 Spawn-Modus. Für jedes einzelne Datenobjekt wird ein neuer Daemon-Thread erstellt. Nur verwenden, wenn extrem niedrige Latenz wichtiger ist als Ressourceneffizienz.

Nur die neuesten Daten weiterleiten

Das Setzen von forward_latest_data_only=True in der Konfiguration schaltet den Worker in den Nur-neueste-Daten-Modus. Wenn ein neues Datenobjekt eintrifft, während der Worker noch das vorherige verarbeitet, wird das ausstehende Objekt durch das neuere ersetzt. Sobald der Worker wieder frei ist, wird nur das aktuellste Objekt verarbeitet.

Dies ist nützlich für hochfrequente Sensoren, bei denen die Verarbeitung eines Rückstaus veralteter Messwerte ineffizient ist - nur der aktuelle Wert ist relevant. Im Spawn-Modus (worker_count_per_link=0) hat dies keine Wirkung.

Info
Worker-Threads sind Daemon-Threads - sie verhindern nicht, dass der Prozess beendet wird. Wenn ein Modul gestoppt wird, werden alle seine Worker-Threads ordnungsgemäß beendet, bevor die Stop-Routine abgeschlossen ist.