Source code for EventManager.processors.regexprocessor

import re

from EventManager.filehandlers.config.regex_entry import RegexEntry


class RegexProcessor:
    """Apply configured regex substitutions to serialized event strings.

    Every entry in ``regex_entries`` provides three attributes that are read
    here: ``field_name``, ``regex`` and ``replacement``. For each entry the
    processor builds a format-specific pattern around ``regex`` and a
    format-specific replacement around ``replacement``, then substitutes
    every match in the event with ``re.sub``.
    """

    def __init__(self, regex_entries: "list[RegexEntry] | None" = None) -> None:
        """Store the entries to apply.

        Args:
            regex_entries: Entries applied in order to every event. ``None``
                (the default) results in a fresh empty list, so instances
                never share a default list.
        """
        # The annotation is a string so it is not evaluated at definition time.
        self.regex_entries = regex_entries if regex_entries is not None else []

    def process_kv(self, event: str) -> str:
        """Return ``event`` with all entries applied as ``name="value"`` pairs."""
        return self._process_event(event, "KV")

    def process_json(self, event: str) -> str:
        """Return ``event`` with all entries applied as ``"name":"value"`` pairs.

        All whitespace is removed from the event first (including any
        whitespace inside string values), so the compact ``"name":"value"``
        shape can match.
        """
        event = re.sub(r"\s+", "", event)
        return self._process_event(event, "JSON")

    def process_xml(self, event: str) -> str:
        """Return ``event`` with all entries applied as ``<name>value</name>``."""
        return self._process_event(event, "XML")

    def _process_event(self, event: str, format: str) -> str:
        """Apply every configured entry to ``event`` for the given format.

        Args:
            event: The serialized event text.
            format: One of ``"KV"``, ``"JSON"`` or ``"XML"``.

        Returns:
            The event after all substitutions, applied in entry order.
        """
        for entry in self.regex_entries:
            # The field name is literal text, so escape it before it becomes
            # part of the pattern; otherwise "user.name" would also match
            # "userXname". ``entry.regex`` is inserted unchanged.
            regex = self._process_regex(
                format, re.escape(entry.field_name), entry.regex
            )
            # In a replacement template only the backslash is special, so
            # doubling it keeps the field name literal in the output.
            replacement = self._process_regex(
                format,
                entry.field_name.replace("\\", "\\\\"),
                entry.replacement,
            )
            event = re.sub(regex, replacement, event)
        return event

    def _process_regex(self, format: str, field_name: str, value: str) -> str:
        """Wrap ``value`` in the field syntax of ``format``.

        Args:
            format: ``"KV"``, ``"JSON"`` or ``"XML"``.
            field_name: Name placed in the key/tag position, inserted as-is.
            value: Text placed in the value position, inserted as-is.

        Returns:
            ``field_name="value"`` for KV, ``"field_name":"value"`` for JSON,
            ``<field_name>value</field_name>`` for XML, and the literal
            ``N\\A`` (N, backslash, A) for any other format.
        """
        if format == "KV":
            return f'{field_name}="{value}"'
        elif format == "JSON":
            return f'"{field_name}":"{value}"'
        elif format == "XML":
            return f'<{field_name}>{value}</{field_name}>'
        else:
            # NOTE(review): the public process_* methods never reach this
            # branch. The value looks like it was meant to be "N/A"; as a
            # re.sub replacement, "\A" is presumably rejected as a bad
            # escape - confirm before relying on this fallback.
            return "N\\A"