Puffern

Die integrierte Pufferfunktion verhindert Datenverluste aufgrund von Verbindungs- oder Netzwerkproblemen. Daten, die nicht an das dedizierte Ausgabesystem gesendet werden konnten, werden gepuffert und erneut gesendet, wenn die Verbindung wiederhergestellt ist.

Erstellen eines Pufferausgangsmoduls

Wenn Sie ein Ausgangsmodul erstellen möchten, das als Puffer fungieren kann, müssen Sie die folgenden beiden Methoden implementieren:

  • store_buffer_data(module_id:str, data)
  • get_buffer_data(module_id:str)

Diese Methoden werden von gepufferten Ausgangsmodulen aufgerufen, wenn etwas schiefgeht und sie ihre Daten speichern müssen. Die Aufgabe von store_buffer_data besteht darin, die Datenobjekte für die angegebene module_id zu speichern, bis sie von get_buffer_data angefordert werden. Die module_id ist die ID des Ausgangmoduls, das Daten puffern möchte. Die module_id wird mit den Schlüsselwörtern _buffer oder _bin erweitert. Datenobjekte von Modulen mit der Endung _bin werden niemals wieder angefordert. Die Erweiterung _bin wird hinzugefügt, wenn Daten aus unbekannten Gründen (z. B. ungültige Datenobjekte) nicht gespeichert werden konnten.

Hinweis: Die Methoden können von separaten Threads aufgerufen werden. Wenn Ihr Zielsystem nicht thread-sicher ist, müssen Sie diese Funktionalität selbst implementieren, z. B. durch die Verwendung von Queues.

Info
Ein einfaches und gutes Beispiel für ein Puffermodul ist outputs.buffers.ram_1.

Erstellung eines gepufferten Ausgangsmoduls

Rufen Sie die Methode self._buffer(data=data, invalid=True/False) auf, wenn die ursprüngliche Prozedur aufgrund von Verbindungsproblemen fehlschlägt. Wenn das Problem am Format des Datenobjekts liegt, können Sie angeben, dass es sich um ein ungültiges Datenobjekt handelt. In diesem Fall wird das Datenobjekt in den Puffer gespeichert, jedoch niemals erneut versucht dieses zu speichern.

Die gepufferten Datenobjekte werden automatisch erneut versucht. Sie müssen sich darüber keine Gedanken machen.

Beispiel
def _run(self, data: models.Data):
        """
        This method is called when new data needs to be processed.
        :param data: The data object to be processed.
        """
        try:
            response = self.session.post(url=url, json=json_data)
            response.raise_for_status()
        except (requests.exceptions.HTTPError, ConnectionError, requests.exceptions.Timeout) as e:
            # Probably a connection error, try to buffer...
            self._buffer(data=data, invalid=False)
        except Exception as e:
            # Unknown error, send to buffer, but mark as invalid...
            self._buffer(data=data, invalid=True)