Source code for EventManager.processors.sampleprocessor
from threading import Lock
from EventManager.processors.Processor import Processor
[docs]
class SampleProcessor(Processor):
"""
The SampleProcessor class implements the Processor interface and provides functionality to sample events.
It drops the first N-1 events and keeps every N-th event, where N is the sample size.
"""
def __init__(self, sample_size: int):
self._sample_size = sample_size - 1
self._sample_count = 0
self._lock = Lock()
[docs]
def process_kv(self, event: str) -> str:
return self._process_event(event)
[docs]
def process_json(self, event: str) -> str:
return self._process_event(event)
[docs]
def process_xml(self, event: str) -> str:
return self._process_event(event)
def _process_event(self, event: str) -> str:
with self._lock:
if self._sample_count < self._sample_size:
self._sample_count += 1
return "" # Drop the event
else:
self._sample_count = 0
return event # Keep the event