Fix add-ons with Python 3.12 by replacing "imp" with "importlib"
[blender-addons.git] / io_scene_fbx / fbx_utils_threading.py
blobbf7631b511dcbba145e7a544c2b02c8fa8aae165
1 # SPDX-FileCopyrightText: 2023 Blender Foundation
3 # SPDX-License-Identifier: GPL-2.0-or-later
5 from contextlib import contextmanager, nullcontext
6 import os
7 from queue import SimpleQueue
9 # Note: `bpy` cannot be imported here because this module is also used by the fbx2json.py and json2fbx.py scripts.
11 # For debugging/profiling purposes, can be modified at runtime to force single-threaded execution.
12 _MULTITHREADING_ENABLED = True
13 # The concurrent.futures module may not work or may not be available on WebAssembly platforms wasm32-emscripten and
14 # wasm32-wasi.
15 try:
16 from concurrent.futures import ThreadPoolExecutor
17 except ModuleNotFoundError:
18 _MULTITHREADING_ENABLED = False
19 ThreadPoolExecutor = None
20 else:
21 try:
22 # The module may be available, but not be fully functional. An error may be raised when attempting to start a
23 # new thread.
24 with ThreadPoolExecutor() as tpe:
25 # Attempt to start a thread by submitting a callable.
26 tpe.submit(lambda: None)
27 except Exception:
28 # Assume that multithreading is not supported and fall back to single-threaded execution.
29 _MULTITHREADING_ENABLED = False
32 def get_cpu_count():
33 """Get the number of cpus assigned to the current process if that information is available on this system.
34 If not available, get the total number of cpus.
35 If the cpu count is indeterminable, it is assumed that there is only 1 cpu available."""
36 sched_getaffinity = getattr(os, "sched_getaffinity", None)
37 if sched_getaffinity is not None:
38 # Return the number of cpus assigned to the current process.
39 return len(sched_getaffinity(0))
40 count = os.cpu_count()
41 return count if count is not None else 1
44 class MultiThreadedTaskConsumer:
45 """Helper class that encapsulates everything needed to run a function on separate threads, with a single-threaded
46 fallback if multithreading is not available.
48 Lower overhead than typical use of ThreadPoolExecutor because no Future objects are returned, which makes this class
49 more suitable to running many smaller tasks.
51 As with any threaded parallelization, because of Python's Global Interpreter Lock, only one thread can execute
52 Python code at a time, so threaded parallelization is only useful when the functions used release the GIL, such as
53 many IO related functions."""
54 # A special task value used to signal task consumer threads to shut down.
55 _SHUT_DOWN_THREADS = object()
57 __slots__ = ("_consumer_function", "_shared_task_queue", "_task_consumer_futures", "_executor",
58 "_max_consumer_threads", "_shutting_down", "_max_queue_per_consumer")
60 def __init__(self, consumer_function, max_consumer_threads, max_queue_per_consumer=5):
61 # It's recommended to use MultiThreadedTaskConsumer.new_cpu_bound_cm() instead of creating new instances
62 # directly.
63 # __init__ should only be called after checking _MULTITHREADING_ENABLED.
64 assert(_MULTITHREADING_ENABLED)
65 # The function that will be called on separate threads to consume tasks.
66 self._consumer_function = consumer_function
67 # All the threads share a single queue. This is a simplistic approach, but it is unlikely to be problematic
68 # unless the main thread is expected to wait a long time for the consumer threads to finish.
69 self._shared_task_queue = SimpleQueue()
70 # Reference to each thread is kept through the returned Future objects. This is used as part of determining when
71 # new threads should be started and is used to be able to receive and handle exceptions from the threads.
72 self._task_consumer_futures = []
73 # Create the executor.
74 self._executor = ThreadPoolExecutor(max_workers=max_consumer_threads)
75 # Technically the max workers of the executor is accessible through its `._max_workers`, but since it's private,
76 # meaning it could be changed without warning, we'll store the max workers/consumers ourselves.
77 self._max_consumer_threads = max_consumer_threads
78 # The maximum task queue size (before another consumer thread is started) increases by this amount with every
79 # additional consumer thread.
80 self._max_queue_per_consumer = max_queue_per_consumer
81 # When shutting down the threads, this is set to True as an extra safeguard to prevent new tasks being
82 # scheduled.
83 self._shutting_down = False
85 @classmethod
86 def new_cpu_bound_cm(cls, consumer_function, other_cpu_bound_threads_in_use=1, hard_max_threads=32):
87 """Return a context manager that, when entered, returns a wrapper around `consumer_function` that schedules
88 `consumer_function` to be run on a separate thread.
90 If the system can't use multithreading, then the context manager's returned function will instead be the input
91 `consumer_function` argument, causing tasks to be run immediately on the calling thread.
93 When exiting the context manager, it waits for all scheduled tasks to complete and prevents the creation of new
94 tasks, similar to calling ThreadPoolExecutor.shutdown(). For these reasons, the wrapped function should only be
95 called from the thread that entered the context manager, otherwise there is no guarantee that all tasks will get
96 scheduled before the context manager exits.
98 Any task that fails with an exception will cause all task consumer threads to stop.
100 The maximum number of threads used matches the number of cpus available up to a maximum of `hard_max_threads`.
101 `hard_max_threads`'s default of 32 matches ThreadPoolExecutor's default behaviour.
103 The maximum number of threads used is decreased by `other_cpu_bound_threads_in_use`. Defaulting to `1`, assuming
104 that the calling thread will also be doing CPU-bound work.
106 Most IO-bound tasks can probably use a ThreadPoolExecutor directly instead because there will typically be fewer
107 tasks and, on average, each individual task will take longer.
108 If needed, `cls.new_cpu_bound_cm(consumer_function, -4)` could be suitable for lots of small IO-bound tasks,
109 because it ensures a minimum of 5 threads, like the default ThreadPoolExecutor."""
110 if _MULTITHREADING_ENABLED:
111 max_threads = get_cpu_count() - other_cpu_bound_threads_in_use
112 max_threads = min(max_threads, hard_max_threads)
113 if max_threads > 0:
114 return cls(consumer_function, max_threads)._wrap_executor_cm()
115 # Fall back to single-threaded.
116 return nullcontext(consumer_function)
118 def _task_consumer_callable(self):
119 """Callable that is run by each task consumer thread.
120 Signals the other task consumer threads to stop when stopped intentionally or when an exception occurs."""
121 try:
122 while True:
123 # Blocks until it can get a task.
124 task_args = self._shared_task_queue.get()
126 if task_args is self._SHUT_DOWN_THREADS:
127 # This special value signals that it's time for all the threads to stop.
128 break
129 else:
130 # Call the task consumer function.
131 self._consumer_function(*task_args)
132 finally:
133 # Either the thread has been told to shut down because it received _SHUT_DOWN_THREADS or an exception has
134 # occurred.
135 # Add _SHUT_DOWN_THREADS to the queue so that the other consumer threads will also shut down.
136 self._shared_task_queue.put(self._SHUT_DOWN_THREADS)
138 def _schedule_task(self, *args):
139 """Task consumer threads are only started as tasks are added.
141 To mitigate starting lots of threads if many tasks are scheduled in quick succession, new threads are only
142 started if the number of queued tasks grows too large.
144 This function is a slight misuse of ThreadPoolExecutor. Normally each task to be scheduled would be submitted
145 through ThreadPoolExecutor.submit, but doing so is noticeably slower for small tasks. We could start new Thread
146 instances manually without using ThreadPoolExecutor, but ThreadPoolExecutor gives us a higher level API for
147 waiting for threads to finish and handling exceptions without having to implement an API using Thread ourselves.
149 if self._shutting_down:
150 # Shouldn't occur through normal usage.
151 raise RuntimeError("Cannot schedule new tasks after shutdown")
152 # Schedule the task by adding it to the task queue.
153 self._shared_task_queue.put(args)
154 # Check if more consumer threads need to be added to account for the rate at which tasks are being scheduled
155 # compared to the rate at which tasks are being consumed.
156 current_consumer_count = len(self._task_consumer_futures)
157 if current_consumer_count < self._max_consumer_threads:
158 # The max queue size increases as new threads are added, otherwise, by the time the next task is added, it's
159 # likely that the queue size will still be over the max, causing another new thread to be added immediately.
160 # Increasing the max queue size whenever a new thread is started gives some time for the new thread to start
161 # up and begin consuming tasks before it's determined that another thread is needed.
162 max_queue_size_for_current_consumers = self._max_queue_per_consumer * current_consumer_count
164 if self._shared_task_queue.qsize() > max_queue_size_for_current_consumers:
165 # Add a new consumer thread because the queue has grown too large.
166 self._task_consumer_futures.append(self._executor.submit(self._task_consumer_callable))
168 @contextmanager
169 def _wrap_executor_cm(self):
170 """Wrap the executor's context manager to instead return self._schedule_task and such that the threads
171 automatically start shutting down before the executor itself starts shutting down."""
172 # .__enter__()
173 # Exiting the context manager of the executor will wait for all threads to finish and prevent new
174 # threads from being created, as if its shutdown() method had been called.
175 with self._executor:
176 try:
177 yield self._schedule_task
178 finally:
179 # .__exit__()
180 self._shutting_down = True
181 # Signal all consumer threads to finish up and shut down so that the executor can shut down.
182 # When this is run on the same thread that schedules new tasks, this guarantees that no more tasks will
183 # be scheduled after the consumer threads start to shut down.
184 self._shared_task_queue.put(self._SHUT_DOWN_THREADS)
186 # Because `self._executor` was entered with a context manager, it will wait for all the consumer threads
187 # to finish even if we propagate an exception from one of the threads here.
188 for future in self._task_consumer_futures:
189 # .exception() waits for the future to finish and returns its raised exception or None.
190 ex = future.exception()
191 if ex is not None:
192 # If one of the threads raised an exception, propagate it to the main thread.
193 # Only the first exception will be propagated if there were multiple.
194 raise ex