Source code for EventManager.processors.enrichingprocessor

import getpass
import json
import os
import platform
import socket
import subprocess
from xml.sax.saxutils import escape

import psutil

from EventManager.processors.Processor import Processor


class EnrichingProcessor(Processor):
    """
    Processor that appends host and runtime metadata to serialized events.

    All supported enrichment values are collected once, when the processor is
    constructed; each ``process_*`` call then appends the selected fields to
    the event in the matching wire format (key-value, JSON or XML).
    """

    # Names of the enrichment fields appended to every event.
    __enriching_fields: list = ["hostname", "ip"]
    # Maps every supported field name to its collected value (filled in __init__).
    __enrichments: dict = {}

    def __init__(self, enriching_fields: list):
        """
        Initialize the EnrichingProcessor and collect the enrichment values.

        :param enriching_fields: List of field names to append to each event.
            ``None`` selects the default fields ``["hostname", "ip"]``.
        """
        self.__enriching_fields = (
            enriching_fields if enriching_fields is not None else ["hostname", "ip"]
        )
        self.__enrichments = {
            "hostname": self.__get_hostname(),
            "ip": self.__get_ip_address(),
            "osName": self.__get_os(),
            "osVersion": self.__get_os_version(),
            "javaVersion": self.__get_java_version(),
            "userName": self.__get_user_name(),
            "availableProcessors": self.__get_available_processors(),
            "freeMemory": self.__get_free_memory(),
            "totalMemory": self.__get_total_memory(),
        }

    def process_kv(self, event: str) -> str:
        """
        Enrich a key-value formatted event.

        :param event: The serialized key-value event.
        :return: The event with the enrichment fields appended.
        """
        return self.__enrich_kv_event(event)

    def process_json(self, event: str) -> str:
        """
        Enrich a JSON formatted event.

        :param event: The serialized JSON object.
        :return: The event with the enrichment fields appended.
        """
        return self.__enrich_json_event(event)

    def process_xml(self, event: str) -> str:
        """
        Enrich an XML formatted event.

        :param event: The serialized XML event, ending with ``</event>``.
        :return: The event with the enrichment fields appended.
        """
        return self.__enrich_xml_event(event)

    def __enrich_kv_event(self, event):
        """
        Append each selected field as `` name="value"`` to the event.

        :param event: The serialized key-value event.
        :return: The enriched event string.
        """
        suffix = "".join(
            f' {field}="{self.__get_value(field)}"'
            for field in self.__enriching_fields
        )
        return event + suffix

    def __enrich_json_event(self, event):
        """
        Insert each selected field as a string member before the closing brace.

        :param event: The serialized JSON object; its last character is
            taken to be the closing ``}``.
        :return: The enriched JSON string.
        """
        head = event[:-1]  # drop the closing brace; it is re-added below
        # json.dumps quotes the value and escapes quotes, backslashes and
        # control characters so the result stays valid JSON.
        members = ",".join(
            f'"{field}":{json.dumps(str(self.__get_value(field)), ensure_ascii=False)}'
            for field in self.__enriching_fields
        )
        # No separator when there is nothing to add, or when the event is an
        # empty object ("{}"), which would otherwise become "{,...}".
        needs_comma = bool(members) and not head.rstrip().endswith("{")
        separator = "," if needs_comma else ""
        return f"{head}{separator}{members}}}"

    def __enrich_xml_event(self, event):
        """
        Insert each selected field as a child element before ``</event>``.

        :param event: The serialized XML event; its last 8 characters are
            taken to be the closing ``</event>`` tag.
        :return: The enriched XML string.
        """
        head = event[:-8]  # len("</event>") == 8
        # escape() protects "&", "<" and ">" inside the element text.
        elements = "".join(
            f"<{field}>{escape(str(self.__get_value(field)))}</{field}>"
            for field in self.__enriching_fields
        )
        return f"{head}{elements}</event>"

    def __get_value(self, field: str) -> object:
        """
        Get the value of a specific field.

        :param field: The field name to retrieve the value for.
        :return: The collected value (a string or an integer).
        :raises ValueError: If the field is not a supported enrichment.
        """
        try:
            return self.__enrichments[field]
        except KeyError:
            raise ValueError(f"Field {field} not found in enrichments.") from None

    def __get_hostname(self) -> str:
        """
        Get the hostname of the machine.

        :return: Hostname as a string.
        """
        return socket.gethostname()

    def __get_ip_address(self) -> str:
        """
        Get the IP address of the machine.

        :return: IP address as a string; ``"127.0.0.1"`` when the hostname
            cannot be resolved.
        """
        try:
            return socket.gethostbyname(socket.gethostname())
        except OSError:
            # socket.gaierror (an OSError) is raised when the hostname has no
            # resolvable address; do not let that break construction.
            return "127.0.0.1"

    def __get_os(self) -> str:
        """
        Get the operating system name.

        :return: Operating system name as reported by ``os.name``.
        """
        return os.name

    def __get_os_version(self) -> str:
        """
        Get the operating system version.

        :return: Operating system version as a string.
        """
        # platform.version() works on every platform; os.uname() is Unix-only.
        return platform.version()

    def __get_java_version(self) -> str:
        """
        Get the Java version.

        :return: Java version as a string, or an empty string when no Java
            runtime can be queried.
        """
        try:
            completed = subprocess.run(
                ["java", "-version"],
                capture_output=True,
                text=True,
                check=False,
                timeout=10,
            )
        except (OSError, subprocess.SubprocessError):
            # Java is not installed, not executable, or did not answer in time.
            return ""
        # "java -version" writes its banner to stderr, not stdout.
        banner = completed.stderr or completed.stdout or ""
        for line in banner.splitlines():
            _, marker, rest = line.partition('version "')
            if marker:
                # e.g. 'openjdk version "17.0.9" 2023-10-17' -> '17.0.9'
                return rest.partition('"')[0]
        # Unknown banner format: fall back to its first non-empty line.
        return next(
            (line.strip() for line in banner.splitlines() if line.strip()), ""
        )

    def __get_user_name(self) -> str:
        """
        Get the username of the current user.

        :return: Username as a string, or an empty string when it cannot be
            determined.
        """
        try:
            # Unlike os.getlogin(), getpass.getuser() does not need a
            # controlling terminal, so it also works in services/containers.
            return getpass.getuser()
        except (KeyError, OSError, ImportError):
            return ""

    def __get_available_processors(self) -> int:
        """
        Get the number of available processors.

        :return: Number of available processors as an integer.
        """
        return os.cpu_count()

    def __get_free_memory(self) -> int:
        """
        Get the amount of free memory.

        :return: Free memory in bytes as an integer.
        """
        return psutil.virtual_memory().available

    def __get_total_memory(self) -> int:
        """
        Get the total memory.

        :return: Total memory in bytes as an integer.
        """
        return psutil.virtual_memory().total