# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from .base import BaseHandler
class BufferHandler(BaseHandler):
    """Handler that maintains a circular buffer of messages based on the
    size and actions specified by a user.

    While buffering is enabled, messages whose ``action`` is one of
    ``buffered_actions`` are stored instead of being passed to ``inner``.
    All other messages, and any message carrying a ``bypass_mozlog_buffer``
    key, are forwarded to ``inner`` immediately.

    The handler registers callbacks for the ``"buffer"`` message topic:
    ``"on"`` / ``"off"`` toggle buffering, ``"flush"`` emits the retained
    messages in arrival order and then empties the buffer, and ``"clear"``
    discards them without emitting.

    :param inner: The underlying handler used to emit messages.
    :param message_limit: The maximum number of messages to retain for
                          context. If None, the buffer will grow without
                          limit. A limit of 0 retains nothing.
    :param buffered_actions: The set of actions to include in the buffer
                             rather than log directly. Defaults to
                             ``["log", "test_status"]``.
    """

    def __init__(self, inner, message_limit=100, buffered_actions=None):
        BaseHandler.__init__(self, inner)
        self.inner = inner
        self.message_limit = message_limit
        if buffered_actions is None:
            buffered_actions = ["log", "test_status"]
        self.buffered_actions = set(buffered_actions)
        self._buffering = True

        # Index of the next slot to overwrite in the bounded buffer. It is
        # always defined (also for the unbounded buffer, where it stays 0) so
        # that flushing can slice around it in either mode.
        self._buffer_pos = 0
        if self.message_limit is not None:
            # Fixed-size ring; None marks a slot that holds no message yet.
            self._buffer = [None] * self.message_limit
        else:
            self._buffer = []

        # NOTE(review): message_handler is presumably set up by BaseHandler.
        self.message_handler.register_message_handlers(
            "buffer",
            {
                "on": self._enable_buffering,
                "off": self._disable_buffering,
                "flush": self._flush_buffered,
                "clear": self._clear_buffer,
            },
        )

    def __call__(self, data):
        """Buffer ``data`` or forward it to the inner handler.

        :param data: Message dict; must contain an ``"action"`` key. If it
                     contains ``"bypass_mozlog_buffer"``, that key is removed
                     from the dict and the message is forwarded unbuffered.
        """
        action = data["action"]
        if "bypass_mozlog_buffer" in data:
            # Strip the marker so the inner handler never sees it.
            data.pop("bypass_mozlog_buffer")
            self.inner(data)
            return
        if not self._buffering or action not in self.buffered_actions:
            self.inner(data)
            return

        self._add_message(data)

    def _add_message(self, data):
        """Store ``data``, overwriting the oldest entry once the ring is full."""
        if self.message_limit is None:
            self._buffer.append(data)
            return

        capacity = len(self._buffer)
        if capacity == 0:
            # A zero-capacity buffer retains nothing; indexing into it (or
            # taking a modulo by zero) would otherwise raise.
            return
        self._buffer[self._buffer_pos] = data
        self._buffer_pos = (self._buffer_pos + 1) % capacity

    def _enable_buffering(self):
        """Start holding back messages for the buffered actions."""
        self._buffering = True

    def _disable_buffering(self):
        """Stop buffering; subsequent messages go straight to ``inner``."""
        self._buffering = False

    def _clear_buffer(self):
        """Clear the buffer of unwanted messages.

        :returns: The number of messages that were discarded.
        """
        current_size = sum(1 for msg in self._buffer if msg is not None)
        if self.message_limit is not None:
            self._buffer = [None] * self.message_limit
        else:
            self._buffer = []
        # Restart at the beginning so the index is valid for the new list.
        self._buffer_pos = 0
        return current_size

    def _flush_buffered(self):
        """Logs the contents of the current buffer, oldest message first.

        :returns: The number of messages that were in the buffer.
        """
        pos = self._buffer_pos
        # In the ring, the oldest entries sit at and after the write position,
        # followed by the entries before it. For the unbounded buffer pos is 0
        # and this is simply the whole list.
        for msg in self._buffer[pos:] + self._buffer[:pos]:
            if msg is not None:
                self.inner(msg)
        return self._clear_buffer()