Buffering

The built-in buffering functionality prevents data loss due to connection or network problems. Data that could not be sent to the dedicated output system is buffered and resent when the connection is re-established.

Creation of a Buffering Output Module

To create an output module that can act as a buffer, implement the following two methods:

  • store_buffer_data(module_id:str, data)
  • get_buffer_data(module_id:str)

These methods are called by buffered output modules when something goes wrong and they need to store their data. store_buffer_data must keep the data objects for the given module_id until they are requested by get_buffer_data. The module_id is the ID of the output module whose data is being buffered, extended with the suffix _buffer or _bin. Data objects stored under an ID ending in _bin are never requested again. The _bin suffix is used when data could not be delivered for unknown reasons (e.g. invalid data objects).
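A minimal in-memory sketch of the two methods follows. It is not the real outputs.buffers.ram_1; the class name RamBuffer and the example IDs are hypothetical, and it assumes that get_buffer_data returns everything stored for an ID and removes it, so each object is handed out only once.

```python
from collections import defaultdict
from typing import Any, Dict, List


class RamBuffer:
    """Hypothetical in-memory buffer module (not outputs.buffers.ram_1).

    Assumption: get_buffer_data returns all data stored for a module_id
    and removes it from the buffer.
    """

    def __init__(self) -> None:
        # One list of buffered data objects per suffixed module_id.
        self._store: Dict[str, List[Any]] = defaultdict(list)

    def store_buffer_data(self, module_id: str, data: Any) -> None:
        # Keep the data until it is requested via get_buffer_data.
        self._store[module_id].append(data)

    def get_buffer_data(self, module_id: str) -> List[Any]:
        # Hand out everything stored for this ID and forget it.
        # IDs ending in "_bin" are never requested, so their data just stays here.
        return self._store.pop(module_id, [])


buf = RamBuffer()
buf.store_buffer_data("http_out_buffer", {"value": 1})
buf.store_buffer_data("http_out_bin", {"value": "garbage"})
print(buf.get_buffer_data("http_out_buffer"))  # [{'value': 1}]
print(buf.get_buffer_data("http_out_buffer"))  # []
```

A plain dict of lists is enough here because nothing is persisted; a real buffer module would typically write to disk or a database so buffered data survives a restart.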

Note: These methods may be called from separate threads. If the system you store the data in is not thread-safe, you must handle synchronization yourself, e.g. by using queues.
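One way to make a buffer safe for concurrent calls is sketched below, assuming get_buffer_data should return and remove everything currently stored for an ID. The class name is hypothetical. It keeps one queue.Queue per module_id (queue.Queue is itself thread-safe) and uses a threading.Lock only to guard creation of those queues.

```python
import queue
import threading
from typing import Any, Dict, List


class ThreadSafeRamBuffer:
    """Hypothetical buffer module whose methods may be called from many threads."""

    def __init__(self) -> None:
        self._queues: Dict[str, "queue.Queue[Any]"] = {}
        self._lock = threading.Lock()

    def _queue_for(self, module_id: str) -> "queue.Queue[Any]":
        # Create queues under the lock so two threads never create
        # separate queues for the same module_id.
        with self._lock:
            if module_id not in self._queues:
                self._queues[module_id] = queue.Queue()
            return self._queues[module_id]

    def store_buffer_data(self, module_id: str, data: Any) -> None:
        self._queue_for(module_id).put(data)

    def get_buffer_data(self, module_id: str) -> List[Any]:
        # Drain whatever is queued right now, without blocking.
        q = self._queue_for(module_id)
        items: List[Any] = []
        while True:
            try:
                items.append(q.get_nowait())
            except queue.Empty:
                return items


buf = ThreadSafeRamBuffer()


def producer(worker: int) -> None:
    # Each worker thread buffers 100 data objects.
    for i in range(100):
        buf.store_buffer_data("out_buffer", (worker, i))


threads = [threading.Thread(target=producer, args=(w,)) for w in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(buf.get_buffer_data("out_buffer")))  # 400
```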

Info
A simple and good example of a buffer module is outputs.buffers.ram_1.

Creation of a Buffered Output Module

If the original procedure fails due to connection problems, call self._buffer(data=data, invalid=False). If it failed because of the format of the data object, call self._buffer(data=data, invalid=True) instead to mark it as an invalid data object. Invalid data is still stored in the buffer, but it is never retried.
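The sketch below shows one plausible way the invalid flag could map onto the _buffer and _bin suffixes. OutputModuleSketch, RecordingBuffer, FlakyOutput and _send are hypothetical stand-ins, not the framework's classes, and the real _buffer implementation may differ. To stay self-contained it raises Python's built-in ConnectionError instead of using an HTTP library.

```python
from typing import Any, List, Tuple


class RecordingBuffer:
    """Stand-in buffer module that records every store_buffer_data call."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, Any]] = []

    def store_buffer_data(self, module_id: str, data: Any) -> None:
        self.calls.append((module_id, data))


class OutputModuleSketch:
    """Hypothetical base class; the framework's own _buffer may differ."""

    def __init__(self, module_id: str, buffer: RecordingBuffer) -> None:
        self.module_id = module_id
        self.buffer = buffer

    def _buffer(self, data: Any, invalid: bool) -> None:
        # Invalid data goes to "<id>_bin" (never retried),
        # everything else to "<id>_buffer" (retried later).
        suffix = "_bin" if invalid else "_buffer"
        self.buffer.store_buffer_data(self.module_id + suffix, data)


class FlakyOutput(OutputModuleSketch):
    """Toy output whose (hypothetical) send step fails depending on the data."""

    def _send(self, data: Any) -> None:
        if data == "offline":
            raise ConnectionError("target unreachable")
        if not isinstance(data, dict):
            raise ValueError("unsupported data format")

    def _run(self, data: Any) -> None:
        try:
            self._send(data)
        except (ConnectionError, TimeoutError):
            # Connection problem: buffer for a later retry.
            self._buffer(data=data, invalid=False)
        except Exception:
            # Anything else is treated as invalid data: stored, never retried.
            self._buffer(data=data, invalid=True)


buf = RecordingBuffer()
out = FlakyOutput("http_out", buf)
out._run({"value": 1})  # succeeds, nothing is buffered
out._run("offline")     # connection problem
out._run(42)            # invalid format
print(buf.calls)  # [('http_out_buffer', 'offline'), ('http_out_bin', 42)]
```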

Buffered data objects that are not marked as invalid are retried automatically; the output module does not need any retry logic of its own.
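To illustrate what such an automatic retry pass might do, here is a hypothetical sketch; the function retry_buffered and its parameters are assumptions, not the framework's API. It reads only "<id>_buffer", never "<id>_bin", puts items that still hit a connection error back into the buffer, and (as a further assumption) moves items that fail for any other reason to "<id>_bin".

```python
from typing import Any, Callable, Dict, List


def retry_buffered(
    module_id: str,
    get_buffer_data: Callable[[str], List[Any]],
    store_buffer_data: Callable[[str, Any], None],
    send: Callable[[Any], None],
) -> int:
    """Resend buffered data for one output module; return how many succeeded."""
    sent = 0
    for data in get_buffer_data(module_id + "_buffer"):
        try:
            send(data)
            sent += 1
        except (ConnectionError, TimeoutError):
            # Still unreachable: keep the item for the next retry pass.
            store_buffer_data(module_id + "_buffer", data)
        except Exception:
            # Unexpected failure: move it to "_bin" so it is not retried forever.
            store_buffer_data(module_id + "_bin", data)
    return sent


store: Dict[str, List[Any]] = {"out_buffer": ["a", "b"], "out_bin": ["bad"]}


def get(module_id: str) -> List[Any]:
    return store.pop(module_id, [])


def put(module_id: str, data: Any) -> None:
    store.setdefault(module_id, []).append(data)


delivered: List[Any] = []


def send(data: Any) -> None:
    if data == "b":
        raise ConnectionError("still offline")
    delivered.append(data)


print(retry_buffered("out", get, put, send))  # 1
print(delivered)  # ['a']
print(store)  # {'out_bin': ['bad'], 'out_buffer': ['b']}
```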

Example
import requests

def _run(self, data: models.Data):
    """
    This method is called when new data needs to be processed.
    :param data: The data object to be processed.
    """
    # url and json_data are placeholders: derive them from the module's
    # configuration and the data object. self.session is a requests.Session.
    try:
        response = self.session.post(url=url, json=json_data)
        response.raise_for_status()
    except (requests.exceptions.HTTPError,
            requests.exceptions.ConnectionError,
            requests.exceptions.Timeout):
        # Probably a connection problem: buffer the data so it is retried later.
        self._buffer(data=data, invalid=False)
    except Exception:
        # Unknown error: buffer the data, but mark it as invalid (never retried).
        self._buffer(data=data, invalid=True)