Source code for EventManager.internal.managerbase

import queue
import threading
from typing import TYPE_CHECKING

from EventManager.filehandlers.config.output_entry import OutputEntry
from EventManager.filehandlers.config.processor_entry import ProcessorEntry
from EventManager.formatters.event_formatter import EventFormatter
from EventManager.internal.event_metadata_builder import EventMetaDataBuilder
from EventManager.internal.processor_helper import ProcessorHelper
from EventManager.internal.thread_helper import ThreadHelper

if TYPE_CHECKING:
    from EventManager.internal.output_helper import OutputHelper


class ManagerBase:
    """
    Base class for the EventManager and InternalEventManager.

    It holds the LogHandler, the processor and output helpers, and two
    queues: one for raw events waiting to be processed and one for processed
    events waiting to be written to the outputs. Worker functions that drain
    those queues are handed to the ThreadHelper.
    """

    def __init__(self, log_handler=None, config_path: str = None):
        """
        Initializes the ManagerBase with either a LogHandler or a config path.

        :param log_handler: An existing LogHandler instance. Takes precedence
            over ``config_path`` when both are given.
        :param config_path: A path to a config file to create a LogHandler.
        :raises ValueError: If neither ``log_handler`` nor ``config_path`` is
            provided.
        """
        # Fail fast with a clear message; without this check the missing
        # ``_log_handler`` attribute only surfaces as an AttributeError below.
        if not log_handler and not config_path:
            raise ValueError(
                "ManagerBase requires either a log_handler or a config_path."
            )

        # Function-scope imports are kept from the original code; presumably
        # they avoid a circular import -- TODO confirm.
        from EventManager.filehandlers.log_handler import LogHandler
        from EventManager.internal.output_helper import OutputHelper

        self._event_queue: queue.Queue = queue.Queue()
        self._processing_queue: queue.Queue = queue.Queue()
        self._thread_helper: 'ThreadHelper' = ThreadHelper()

        if log_handler:
            self._log_handler: 'LogHandler' = log_handler
        else:
            self._log_handler: 'LogHandler' = LogHandler(config_path)

        self._processor_helper: 'ProcessorHelper' = ProcessorHelper(self._log_handler)
        self._output_helper: 'OutputHelper' = OutputHelper(self._log_handler)

    @property
    def log_handler(self):
        """
        Returns the LogHandler instance.
        """
        return self._log_handler

    def _process_and_enqueue(self, event):
        """
        Runs one raw event through the processors and queues the result.

        Results that are ``None``, empty or whitespace-only are dropped, so
        the live processing thread and the shutdown drain apply the same rule.

        :param event: The raw event taken from the processing queue.
        """
        processed = self._processor_helper.process_event(event)
        if processed and processed.strip():
            self.write_event_to_queue(processed)

    def _dispatch_output(self, event, internal_event_manager=None):
        """
        Sends one processed event to the outputs.

        ``internal_event_manager`` is only passed on when it is truthy, which
        mirrors how the event thread has always called ``output_event``.

        :param event: The processed event to output.
        :param internal_event_manager: Optional InternalEventManager instance.
        """
        if internal_event_manager:
            self.output_event(event, internal_event_manager)
        else:
            self.output_event(event)

    def _initiate_threads(self, internal_event_manager=None):
        """
        Initializes the threads for processing events and outputting results.

        :param internal_event_manager: Optional InternalEventManager instance.
            When provided it is forwarded to ``output_event`` together with
            each event; otherwise ``output_event`` is called with the event
            only.
        """
        self.__initialise_processor_thread_and_outputs()

        def event_thread(stop_event: threading.Event):
            while not stop_event.is_set():
                try:
                    # Short timeout so the stop flag is re-checked regularly.
                    event = self._event_queue.get(timeout=0.1)
                except queue.Empty:
                    continue
                self._dispatch_output(event, internal_event_manager)

        self._thread_helper.start_event_thread(event_thread)

    def __initialise_processor_thread_and_outputs(self):
        """
        Initializes the processors, the outputs and the processing thread.
        """
        self._processor_helper.initialise_processors()
        self._output_helper.initialise_outputs(self)

        def processing_thread(stop_event: threading.Event):
            while not stop_event.is_set():
                try:
                    # Short timeout so the stop flag is re-checked regularly.
                    event = self._processing_queue.get(timeout=0.1)
                except queue.Empty:
                    continue
                self._process_and_enqueue(event)

        self._thread_helper.start_processing_thread(processing_thread)

    def _stop_all_threads(self, internal_event_manager=None):
        """
        Stops all threads gracefully and processes remaining events.

        The processing side is stopped first so that anything it still
        produces lands in the event queue before the output side is drained.

        :param internal_event_manager: Optional InternalEventManager instance
            used to report errors; without it errors are printed.
        """

        def report(message: str):
            if internal_event_manager:
                internal_event_manager.log_error(message)
            else:
                print(message)

        def process_remaining_event(event):
            try:
                self._process_and_enqueue(event)
            except Exception as e:
                report(f"Error processing remaining events: {str(e)}")

        self._thread_helper.stop_thread(
            "process",
            self._thread_helper.processing_thread,
            self._processing_queue,
            process_remaining_event,
        )

        def output_remaining_event(event):
            try:
                self._dispatch_output(event, internal_event_manager)
            except Exception as e:
                report(f"Error writing remaining events: {str(e)}")

        # The original passed ``processing_thread`` here as well, which looks
        # like a copy-paste slip. NOTE(review): ThreadHelper is not visible
        # from this file, so fall back to the previous value if it exposes no
        # ``event_thread`` attribute -- confirm the attribute name.
        event_worker = getattr(
            self._thread_helper,
            "event_thread",
            self._thread_helper.processing_thread,
        )
        self._thread_helper.stop_thread(
            "event",
            event_worker,
            self._event_queue,
            output_remaining_event,
        )

    def write_event_to_queue(self, event):
        """
        Adds processed event to the event queue.
        """
        self._event_queue.put(event)

    def write_event_to_processing_queue(self, event):
        """
        Adds raw event to the processing queue.
        """
        self._processing_queue.put(event)

    def output_event(self, event: str, internal_event_manager=None):
        """
        Passes the event to the output destinations.

        :param event: The processed event to output.
        :param internal_event_manager: Optional InternalEventManager instance,
            handed through to the output helper unchanged.
        """
        self._output_helper.output_event(event, internal_event_manager)

    def log_message(self, level: str, *messages):
        """
        Formats a log message and queues it for processing.

        :param level: Log level (e.g., INFO, ERROR).
        :param messages: A single message (Exception or str), or multiple
            KeyValueWrapper instances.
        """
        meta_data = EventMetaDataBuilder.build_metadata(level, self._log_handler)
        event_format = self._log_handler.config.event.event_format

        # Unknown or missing format names fall back to the default formatter.
        formatter = {
            "kv": EventFormatter.KEY_VALUE,
            "csv": EventFormatter.CSV,
            "xml": EventFormatter.XML,
            "json": EventFormatter.JSON,
        }.get(event_format, EventFormatter.DEFAULT)

        if len(messages) == 1 and isinstance(messages[0], (str, Exception)):
            # Handle single message string or exception
            message = messages[0]
            if isinstance(message, Exception):
                # An exception raised without arguments has empty ``args``;
                # use its string form instead of failing with IndexError.
                formatted = message.args[0] if message.args else str(message)
            else:
                formatted = str(message)
            event = formatter.format_message(meta_data, formatted)
        else:
            # Handle structured key-value messages
            event = formatter.format(meta_data, *messages)

        self.write_event_to_processing_queue(event)

    def add_output(self, output_entry: 'OutputEntry') -> bool:
        """
        Adds a new output destination based on the provided OutputEntry.

        :param output_entry: The OutputEntry instance containing the output
            configuration.
        :return: True if the output was added successfully, False otherwise.
        """
        return self._output_helper.add_output(output_entry)

    def remove_output(self, output):
        """
        Removes an output destination.

        :param output: Either an OutputEntry object or a class name as a
            string; both forms are handed to the output helper as-is.
        :return: True if the output was removed successfully, False otherwise.
        """
        return self._output_helper.remove_output(output)

    def add_processor(self, processor):
        """
        Adds a processor to the processing queue.

        :param processor: The processor to be added.
        """
        self._processor_helper.add_processor(processor)

    def remove_processor(self, processor: 'ProcessorEntry' = None, processor_name: str = None):
        """
        Removes a processor from the processing queue.

        :param processor: The processor to be removed. Takes precedence over
            ``processor_name`` when it is truthy.
        :param processor_name: The name of the processor to be removed.
        """
        processor = processor or processor_name
        self._processor_helper.remove_processor(processor)

    def _cast_exception_stack_trace_to_string(self) -> str:
        """
        Converts the stack trace of the exception currently being handled to a string.

        :return: The stack trace as a string.
        """
        import traceback

        return traceback.format_exc()

    def _are_info_logs_enabled(self) -> bool:
        """
        Checks if information or debugging logs are enabled.

        :return: True if information or debugging logs are enabled, False otherwise.
        """
        event_config = self._log_handler.config.event
        return event_config.informational_mode or event_config.debugging_mode