Source code for EventManager.internal.thread_helper

import queue
import threading


[docs] class ThreadHelper: """ A helper class to manage threads in the EventManager. """ def __init__(self): """ Initializes the ThreadHelper. """ self.__event_thread: threading.Thread self.__processing_thread: threading.Thread self.__event_thread_event: threading.Event = threading.Event() self.__processing_thread_event: threading.Event = threading.Event() @property def event_thread(self): """ Returns the event thread. :return: """ return self.__event_thread @property def processing_thread(self): """ Returns the processing thread. :return: """ return self.__processing_thread
[docs] def start_event_thread(self, runnable: callable): """ Starts the event thread with the given runnable function. :param runnable: :return: """ self.__event_thread_event.clear() self.__event_thread = threading.Thread(target=lambda : runnable(self.__event_thread_event)) self.__event_thread.start()
[docs] def start_processing_thread(self, runnable: callable): """ Starts the event thread with the given runnable function. :param runnable: :return: """ self.__processing_thread_event.clear() self.__processing_thread = threading.Thread(target=lambda : runnable(self.__processing_thread_event)) self.__processing_thread.start()
[docs] def stop_thread(self, thread_name: str, thread: threading.Thread, q: queue.Queue, remaining_item_processor: callable): """ Stops the thread and processes remaining items in the queue. :param thread: :param q: :param remaining_item_processor: :return: """ # Check to which thread the thread_name corresponds if thread_name == "event": self.__event_thread_event.set() elif thread_name == "process": self.__processing_thread_event.set() thread.join(timeout=1) # Drain remaining items from the queue while not q.empty(): try: event = q.get_nowait() if event is not None: remaining_item_processor(event) except queue.Empty: break