Source code for EventManager.processors.maskipv4address

import ipaddress
import re

from EventManager.processors.Processor import Processor


[docs] class MaskIPV4Address(Processor): __ip_address_ranges: list def __init__(self, ip_address_ranges: list): """ Initializes the MaskIPV4Address processor with a list of IP address ranges to mask. :param ip_address_ranges: A list of IP address ranges to mask. """ self.__ip_address_ranges = ip_address_ranges
[docs] def process_kv(self, event: str) -> str: return self.__mask_ip_in_event(event, "ip=\"(\\d+\\.\\d+\\.\\d+\\.\\d+)\"")
[docs] def process_json(self, event: str) -> str: return self.__mask_ip_in_event(event, "\"ip\":\\s+\"(\\d+\\.\\d+\\.\\d+\\.\\d+)\"")
[docs] def process_xml(self, event: str) -> str: return self.__mask_ip_in_event(event, "<ip>(\\d+\\.\\d+\\.\\d+\\.\\d+)</ip>")
def __mask_ip_in_event(self, event: str, regex: str) -> str: """ Masks the IP address in the given event string based on the provided regex pattern. :param event: The event string containing the IP address. :param regex: The regex pattern to match the IP address. :return: The event string with the IP address masked. """ # Implement the logic to mask the IP address using the regex pattern pattern = re.compile(regex) def mask_match(match): ip = match.group(1) if self.__is_ip_in_any_cidr_range(ip): return match.group(0).replace(ip, "***.***.***.***") return match.group(0) return pattern.sub(mask_match, event) def __is_ip_in_any_cidr_range(self, ip: str) -> bool: for cidr in self.__ip_address_ranges: if self._is_ip_in_cidr(ip, cidr): return True return False def _is_ip_in_cidr(self, ip: str, cidr: str) -> bool: try: return ipaddress.ip_address(ip) in ipaddress.ip_network(cidr, strict=False) except ValueError: return False