Threading und Ausführung
Zu verstehen, wie Module ausgeführt werden, wie Daten zwischen ihnen fließen und welche Threading-Garantien gelten, ist entscheidend, um korrekte und performante Module zu schreiben. Diese Seite erklärt das Ausführungsmodell für jeden Modultyp sowie die Konfigurationsoptionen, die steuern, wie Verknüpfungen zwischen Modulen ausgeführt werden.
Synchrone und asynchrone Methoden
Die Methoden start, stop und _run können sowohl synchron als auch asynchron implementiert werden - das Framework erkennt den Implementierungstyp zur Laufzeit und ruft die Methode entsprechend auf.
# Synchronous _run:
def _run(self, data: models.Data) -> models.Data:
data.fields['result'] = compute(data.fields['value'])
return data
# Asynchronous _run:
async def _run(self, data: models.Data) -> models.Data:
data.fields['result'] = await fetch(data.fields['url'])
return data
Dasselbe gilt für start und stop:
async def start(self):
self._client = await create_client(self.configuration.host)
while self.active:
data = await self._client.read()
self._call_links(data)
async def stop(self):
await self._client.disconnect()
_run sollte nicht über längere Zeit blockieren.
Da run(data) auf dem Worker-Thread des aufrufenden Moduls ausgeführt wird, blockiert ein blockierendes _run ebenfalls diesen Worker-Thread und kann dazu führen, dass sich Daten im aufrufenden Modul aufstauen.
Für langlaufende oder I/O-intensive Arbeiten sollte eine asynchrone Implementierung bevorzugt oder die blockierende Arbeit aus dem Worker-Thread ausgelagert werden.
Eingangsmodule
Eingangsmodule sind die Quelle der Daten in der Pipeline.
Das Framework ruft start in einem dedizierten Daemon-Thread auf.
def start(self):
self._client = connect(self.configuration.host)
while self.active:
data = self._client.read()
self._call_links(data)
def stop(self):
self._client.disconnect()
Wenn start eine Exception auslöst, versucht das Framework den Aufruf in einem konfigurierbaren Intervall (config.RETRY_INTERVAL) erneut, solange das Modul aktiv ist.
Wenn start ohne Exception zurückkehrt, geht das Framework davon aus, dass das Modul seine eigene Retry-Logik intern behandelt, und ruft start nicht erneut auf.
Variablenmodule und Tag-Module folgen demselben Muster.
Tag-Module erhalten zusätzlich ein eingehendes Datenobjekt über run(data) - sie erweitern dieses um die von _run() zurückgegebenen Schlüssel-Wert-Paare und leiten es anschließend weiter.
Verarbeitungsmodule
Verarbeitungsmodule erhalten Daten über run(data), das auf dem Worker-Thread des aufrufenden Moduls ausgeführt wird.
Die Basisimplementierung validiert die eingehenden Daten, ruft _run(data) auf und leitet das zurückgegebene Datenobjekt in einem einzigen Schritt an alle nachgelagerten Verknüpfungen weiter.
Wenn thread_safe=True an super().__init__ übergeben wird, reiht die Basisimplementierung eingehende Datenobjekte in eine Warteschlange ein, anstatt sie sofort zu verarbeiten.
Ein dedizierter Queue-Worker-Thread wird beim ersten Aufruf lazy gestartet und verarbeitet die Elemente nacheinander.
Dies ist nützlich, wenn _run nicht sicher gleichzeitig von mehreren Threads aufgerufen werden kann - zum Beispiel, wenn ein interner Zustand verwaltet wird.
def __init__(self, configuration: Configuration):
# thread_safe=True: _run is only ever called by one thread at a time.
super().__init__(configuration=configuration, thread_safe=True)
Ausgangsmodule
Ausgangsmodule erhalten Daten über run(data).
Die Basisimplementierung reiht das eingehende Datenobjekt immer in eine Warteschlange ein und verarbeitet es in einem dedizierten Queue-Worker-Thread - es gibt keinen Modus ohne Warteschlange.
Der Queue-Worker wird beim ersten Aufruf lazy gestartet.
Dadurch ist garantiert, dass _run sequentiell von einem einzelnen Thread aufgerufen wird, unabhängig davon, wie viele vorgelagerte Module gleichzeitig Daten weiterleiten.
Links
Links definieren, welche Module eine Kopie des Datenobjekts erhalten, nachdem das aktuelle Modul es verarbeitet hat.
Sie werden in der Modulkonfiguration festgelegt und zur Laufzeit aufgelöst.
Verknüpfungen können während des laufenden Betriebs der Pipeline hinzugefügt oder entfernt werden - das Framework erkennt Änderungen bei jedem Aufruf von _call_links und startet oder beendet die entsprechenden Worker-Threads entsprechend.
Jedes verknüpfte Modul erhält eine tiefe Kopie des Datenobjekts, sodass Änderungen in einem nachgelagerten Modul keine Auswirkungen auf andere haben.
Worker-Anzahl pro Link
Der Konfigurationsparameter worker_count_per_link steuert, wie viele persistente Worker-Threads pro verknüpftem Modul erstellt werden.
Mehrere Worker ermöglichen es, das verknüpfte Modul parallel für verschiedene Datenobjekte aufzurufen.
Die Worker werden im Round-Robin-Verfahren zugewiesen.
| Wert | Verhalten |
|---|---|
1 (Standard) |
Ein persistenter Worker-Thread pro verknüpftem Modul. Datenobjekte werden in FIFO-Reihenfolge verarbeitet. |
N > 1 |
N Worker-Threads pro verknüpftem Modul. Datenobjekte werden im Round-Robin-Verfahren verteilt. Verwenden, wenn das verknüpfte Modul thread-sicher ist und der Durchsatz erhöht werden soll. |
0 |
Spawn-Modus. Für jedes einzelne Datenobjekt wird ein neuer Daemon-Thread erstellt. Nur verwenden, wenn extrem niedrige Latenz wichtiger ist als Ressourceneffizienz. |
Nur die neuesten Daten weiterleiten
Das Setzen von forward_latest_data_only=True in der Konfiguration schaltet den Worker in den Nur-neueste-Daten-Modus.
Wenn ein neues Datenobjekt eintrifft, während der Worker noch das vorherige verarbeitet, wird das ausstehende Objekt durch das neuere ersetzt.
Sobald der Worker wieder frei ist, wird nur das aktuellste Objekt verarbeitet.
Dies ist nützlich für hochfrequente Sensoren, bei denen die Verarbeitung eines Rückstaus veralteter Messwerte ineffizient ist - nur der aktuelle Wert ist relevant.
Im Spawn-Modus (worker_count_per_link=0) hat dies keine Wirkung.
Introduction
Setup Dev Env