Files
offlineimap3-mirror/offlineimap/threadutil.py
2024-04-28 17:07:51 +08:00

215 lines
7.0 KiB
Python

# Copyright (C) 2002-2016 John Goerzen & contributors
# Thread support module
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
from threading import Lock, Thread, BoundedSemaphore
from queue import Queue, Empty
import traceback
from offlineimap.ui import getglobalui
STOP_MONITOR = 'STOP_MONITOR'
# General utilities
def semaphorereset(semaphore, originalstate):
"""Block until `semaphore` gets back to its original state, ie all acquired
resources have been released."""
for i in range(originalstate):
semaphore.acquire()
# Now release these.
for i in range(originalstate):
semaphore.release()
class accountThreads:
"""Store the list of all threads in the software so it can be used to find out
what's running and what's not."""
def __init__(self):
self.lock = Lock()
self.list = []
def add(self, thread):
with self.lock:
self.list.append(thread)
def remove(self, thread):
with self.lock:
self.list.remove(thread)
def pop(self):
with self.lock:
if len(self.list) < 1:
return None
return self.list.pop()
def wait(self):
while True:
thread = self.pop()
if thread is None:
break
thread.join()
######################################################################
# Exit-notify threads
######################################################################
exitedThreads = Queue()
def monitor():
"""An infinite "monitoring" loop watching for finished ExitNotifyThread's.
This one is supposed to run in the main thread.
"""
global exitedThreads
ui = getglobalui()
while True:
# Loop forever and call 'callback' for each thread that exited
try:
# We need a timeout in the get() call, so that ctrl-c can throw a
# SIGINT (http://bugs.python.org/issue1360). A timeout with empty
# Queue will raise `Empty`.
#
# ExitNotifyThread add themselves to the exitedThreads queue once
# they are done (normally or with exception).
thread = exitedThreads.get(True, 60)
# Request to abort when callback returns True.
if thread.exit_exception is not None:
if isinstance(thread.exit_exception, SystemExit):
# Bring a SystemExit into the main thread.
# Do not send it back to UI layer right now.
# Maybe later send it to ui.terminate?
raise SystemExit
ui.threadException(thread) # Expected to terminate the program.
# Should never hit this line.
raise AssertionError("thread has 'exit_exception' set to"
" '%s' [%s] but this value is unexpected"
" and the ui did not stop the program." %
(repr(thread.exit_exception), type(thread.exit_exception)))
# Only the monitor thread has this exit message set.
elif thread.exit_message == STOP_MONITOR:
break # Exit the loop here.
else:
ui.threadExited(thread)
except Empty:
pass
class ExitNotifyThread(Thread):
"""This class is designed to alert a "monitor" to the fact that a
thread has exited and to provide for the ability for it to find out
why. All instances are made daemon threads (.daemon=True, so we
bail out when the mainloop dies.
The thread can set instance variables self.exit_message for a human
readable reason of the thread exit.
There is one instance of this class at runtime. The main thread waits for
the monitor to end."""
def __init__(self, *args, **kwargs):
super(ExitNotifyThread, self).__init__(*args, **kwargs)
# These are all child threads that are supposed to go away when
# the main thread is killed.
self.daemon = True
self.exit_message = None
self._exit_exc = None
self._exit_stacktrace = None
def run(self):
"""Allow profiling of a run and store exceptions."""
global exitedThreads
try:
Thread.run(self)
except Exception as e:
# Thread exited with Exception, store it
tb = traceback.format_exc()
self.set_exit_exception(e, tb)
exitedThreads.put(self, True)
def set_exit_exception(self, exc, st=None):
"""Sets Exception and stacktrace of a thread, so that other
threads can query its exit status"""
self._exit_exc = exc
self._exit_stacktrace = st
@property
def exit_exception(self):
"""Returns the cause of the exit, one of:
Exception() -- the thread aborted with this exception
None -- normal termination."""
return self._exit_exc
@property
def exit_stacktrace(self):
"""Returns a string representing the stack trace if set"""
return self._exit_stacktrace
######################################################################
# Instance-limited threads
######################################################################
limitedNamespaces = {}
def initInstanceLimit(limitNamespace, instancemax):
"""Initialize the instance-limited thread implementation.
Run up to intancemax threads for the given limitNamespace. This allows to
honor maxsyncaccounts and maxconnections."""
global limitedNamespaces
if limitNamespace not in limitedNamespaces:
limitedNamespaces[limitNamespace] = BoundedSemaphore(instancemax)
class InstanceLimitedThread(ExitNotifyThread):
def __init__(self, limitNamespace, *args, **kwargs):
self.limitNamespace = limitNamespace
super(InstanceLimitedThread, self).__init__(*args, **kwargs)
def start(self):
global limitedNamespaces
# Will block until the semaphore has free slots.
limitedNamespaces[self.limitNamespace].acquire()
ExitNotifyThread.start(self)
def run(self):
global limitedNamespaces
try:
ExitNotifyThread.run(self)
finally:
if limitedNamespaces and limitedNamespaces[self.limitNamespace]:
limitedNamespaces[self.limitNamespace].release()