Source code for rafcontpp.model.interruptable_thread

# Copyright (C) 2019 DLR
#
# All rights reserved. This program and the accompanying materials are made
# available under the terms of the 3-Clause BSD License which accompanies this
# distribution, and is available at
# https://opensource.org/licenses/BSD-3-Clause
#
# Contributors:
# Christoph Suerig <christoph.suerig@dlr.de>

# Don't connect with the Copyright comment above!
# Version 12.07.2019


import threading

# A lock to synchronize planning thread map accesses.
interruptable_threads_lock = threading.Lock()
# a dictionary of all running interruptable threads.
interruptable_threads = {}


[docs]def current_thread(): """ current thread returns the current interruptable thread, it was called from. :return: InterruptableThread: The current interruptable thread, or None if current thread is not interruptable. """ current_thread_id = threading.current_thread().ident current_thread = None with interruptable_threads_lock: if current_thread_id in interruptable_threads: current_thread = interruptable_threads[current_thread_id] return current_thread
[docs]class InterruptableThread(threading.Thread): """ It's a usual thread, but its interruptable. """ def __init__(self, group=None, target=None, name=None, args=(), kwargs={}): """ A thread that is interruptable. """ threading.Thread.__init__(self, group, target, name, args, kwargs) self.__interrupted_flag = threading.Event()
[docs] def run(self): # add new thread to list of all interruptable threads with interruptable_threads_lock: interruptable_threads[self.ident] = self #execute threads run method super(InterruptableThread, self).run() #remove thread from list of all interruptable threads with interruptable_threads_lock: elem = interruptable_threads.pop(self.ident, None)
[docs] def interrupt(self): """ Sets the Threads interrupted flag. """ self.__interrupted_flag.set()
[docs] def is_interrupted(self): """ :return: Boolean: True if the Interrupted Flag is set. False otherwise. """ return self.__interrupted_flag.is_set()