Puffern
Die integrierte Pufferfunktion verhindert Datenverluste aufgrund von Verbindungs- oder Netzwerkproblemen. Daten, die nicht an das dedizierte Ausgabesystem gesendet werden konnten, werden gepuffert und erneut gesendet, wenn die Verbindung wiederhergestellt ist.
Erstellen eines Pufferausgangsmoduls
Wenn Sie ein Ausgangsmodul erstellen möchten, das als Puffer fungieren kann, müssen Sie die folgenden beiden Methoden implementieren:
-
store_buffer_data(module_id:str, data)
-
get_buffer_data(module_id:str)
Diese Methoden werden von gepufferten Ausgangsmodulen aufgerufen, wenn etwas schiefgeht und sie ihre Daten speichern müssen. Die Aufgabe von store_buffer_data
besteht darin, die Datenobjekte für die angegebene module_id
zu speichern, bis sie von get_buffer_data
angefordert werden. Die module_id
ist die ID des Ausgangmoduls, das Daten puffern möchte. Die module_id
wird mit den Schlüsselwörtern _buffer
oder _bin
erweitert. Datenobjekte von Modulen mit der Endung _bin
werden niemals wieder angefordert. Die Erweiterung _bin
wird hinzugefügt, wenn Daten aus unbekannten Gründen (z. B. ungültige Datenobjekte) nicht gespeichert werden konnten.
Hinweis: Die Methoden können von separaten Threads aufgerufen werden. Wenn Ihr Zielsystem nicht thread-sicher ist, müssen Sie diese Funktionalität selbst implementieren, z. B. durch die Verwendung von Queues.
outputs.buffers.ram_1
.
Erstellung eines gepufferten Ausgangsmoduls
Rufen Sie die Methode self._buffer(data=data, invalid=True/False)
auf, wenn die ursprüngliche Prozedur aufgrund von Verbindungsproblemen fehlschlägt. Wenn das Problem am Format des Datenobjekts liegt, können Sie angeben, dass es sich um ein ungültiges Datenobjekt handelt. In diesem Fall wird das Datenobjekt in den Puffer gespeichert, jedoch niemals erneut versucht dieses zu speichern.
Die gepufferten Datenobjekte werden automatisch erneut versucht. Sie müssen sich darüber keine Gedanken machen.
Beispiel
def _run(self, data: models.Data):
"""
This method is called when new data needs to be processed.
:param data: The data object to be processed.
"""
try:
response = self.session.post(url=url, json=json_data)
response.raise_for_status()
except (requests.exceptions.HTTPError, ConnectionError, requests.exceptions.Timeout) as e:
# Probably a connection error, try to buffer...
self._buffer(data=data, invalid=False)
except Exception as e:
# Unknown error, send to buffer, but mark as invalid...
self._buffer(data=data, invalid=True)
Introduction
Template