bevy_tasks/
single_threaded_task_pool.rs

1use std::sync::Arc;
2use std::{cell::RefCell, future::Future, marker::PhantomData, mem, rc::Rc};
3
4thread_local! {
5    static LOCAL_EXECUTOR: async_executor::LocalExecutor<'static> = const { async_executor::LocalExecutor::new() };
6}
7
8/// Used to create a [`TaskPool`].
9#[derive(Debug, Default, Clone)]
10pub struct TaskPoolBuilder {}
11
12/// This is a dummy struct for wasm support to provide the same api as with the multithreaded
13/// task pool. In the case of the multithreaded task pool this struct is used to spawn
14/// tasks on a specific thread. But the wasm task pool just calls
15/// `wasm_bindgen_futures::spawn_local` for spawning which just runs tasks on the main thread
16/// and so the [`ThreadExecutor`] does nothing.
17#[derive(Default)]
18pub struct ThreadExecutor<'a>(PhantomData<&'a ()>);
19impl<'a> ThreadExecutor<'a> {
20    /// Creates a new `ThreadExecutor`
21    pub fn new() -> Self {
22        Self::default()
23    }
24}
25
26impl TaskPoolBuilder {
27    /// Creates a new `TaskPoolBuilder` instance
28    pub fn new() -> Self {
29        Self::default()
30    }
31
32    /// No op on the single threaded task pool
33    pub fn num_threads(self, _num_threads: usize) -> Self {
34        self
35    }
36
37    /// No op on the single threaded task pool
38    pub fn stack_size(self, _stack_size: usize) -> Self {
39        self
40    }
41
42    /// No op on the single threaded task pool
43    pub fn thread_name(self, _thread_name: String) -> Self {
44        self
45    }
46
47    /// Creates a new [`TaskPool`]
48    pub fn build(self) -> TaskPool {
49        TaskPool::new_internal()
50    }
51}
52
53/// A thread pool for executing tasks. Tasks are futures that are being automatically driven by
54/// the pool on threads owned by the pool. In this case - main thread only.
55#[derive(Debug, Default, Clone)]
56pub struct TaskPool {}
57
58impl TaskPool {
59    /// Just create a new `ThreadExecutor` for wasm
60    pub fn get_thread_executor() -> Arc<ThreadExecutor<'static>> {
61        Arc::new(ThreadExecutor::new())
62    }
63
64    /// Create a `TaskPool` with the default configuration.
65    pub fn new() -> Self {
66        TaskPoolBuilder::new().build()
67    }
68
69    #[allow(unused_variables)]
70    fn new_internal() -> Self {
71        Self {}
72    }
73
74    /// Return the number of threads owned by the task pool
75    pub fn thread_num(&self) -> usize {
76        1
77    }
78
79    /// Allows spawning non-`'static` futures on the thread pool. The function takes a callback,
80    /// passing a scope object into it. The scope object provided to the callback can be used
81    /// to spawn tasks. This function will await the completion of all tasks before returning.
82    ///
83    /// This is similar to `rayon::scope` and `crossbeam::scope`
84    pub fn scope<'env, F, T>(&self, f: F) -> Vec<T>
85    where
86        F: for<'scope> FnOnce(&'env mut Scope<'scope, 'env, T>),
87        T: Send + 'static,
88    {
89        self.scope_with_executor(false, None, f)
90    }
91
92    /// Allows spawning non-`'static` futures on the thread pool. The function takes a callback,
93    /// passing a scope object into it. The scope object provided to the callback can be used
94    /// to spawn tasks. This function will await the completion of all tasks before returning.
95    ///
96    /// This is similar to `rayon::scope` and `crossbeam::scope`
97    #[allow(unsafe_code)]
98    pub fn scope_with_executor<'env, F, T>(
99        &self,
100        _tick_task_pool_executor: bool,
101        _thread_executor: Option<&ThreadExecutor>,
102        f: F,
103    ) -> Vec<T>
104    where
105        F: for<'scope> FnOnce(&'env mut Scope<'scope, 'env, T>),
106        T: Send + 'static,
107    {
108        // SAFETY: This safety comment applies to all references transmuted to 'env.
109        // Any futures spawned with these references need to return before this function completes.
110        // This is guaranteed because we drive all the futures spawned onto the Scope
111        // to completion in this function. However, rust has no way of knowing this so we
112        // transmute the lifetimes to 'env here to appease the compiler as it is unable to validate safety.
113        // Any usages of the references passed into `Scope` must be accessed through
114        // the transmuted reference for the rest of this function.
115
116        let executor = &async_executor::LocalExecutor::new();
117        // SAFETY: As above, all futures must complete in this function so we can change the lifetime
118        let executor: &'env async_executor::LocalExecutor<'env> =
119            unsafe { mem::transmute(executor) };
120
121        let results: RefCell<Vec<Rc<RefCell<Option<T>>>>> = RefCell::new(Vec::new());
122        // SAFETY: As above, all futures must complete in this function so we can change the lifetime
123        let results: &'env RefCell<Vec<Rc<RefCell<Option<T>>>>> =
124            unsafe { mem::transmute(&results) };
125
126        let mut scope = Scope {
127            executor,
128            results,
129            scope: PhantomData,
130            env: PhantomData,
131        };
132
133        // SAFETY: As above, all futures must complete in this function so we can change the lifetime
134        let scope_ref: &'env mut Scope<'_, 'env, T> = unsafe { mem::transmute(&mut scope) };
135
136        f(scope_ref);
137
138        // Loop until all tasks are done
139        while executor.try_tick() {}
140
141        let results = scope.results.borrow();
142        results
143            .iter()
144            .map(|result| result.borrow_mut().take().unwrap())
145            .collect()
146    }
147
148    /// Spawns a static future onto the thread pool. The returned Task is a future. It can also be
149    /// cancelled and "detached" allowing it to continue running without having to be polled by the
150    /// end-user.
151    ///
152    /// If the provided future is non-`Send`, [`TaskPool::spawn_local`] should be used instead.
153    pub fn spawn<T>(&self, future: impl Future<Output = T> + 'static) -> FakeTask
154    where
155        T: 'static,
156    {
157        #[cfg(target_arch = "wasm32")]
158        wasm_bindgen_futures::spawn_local(async move {
159            future.await;
160        });
161
162        #[cfg(not(target_arch = "wasm32"))]
163        {
164            LOCAL_EXECUTOR.with(|executor| {
165                let _task = executor.spawn(future);
166                // Loop until all tasks are done
167                while executor.try_tick() {}
168            });
169        }
170
171        FakeTask
172    }
173
174    /// Spawns a static future on the JS event loop. This is exactly the same as [`TaskPool::spawn`].
175    pub fn spawn_local<T>(&self, future: impl Future<Output = T> + 'static) -> FakeTask
176    where
177        T: 'static,
178    {
179        self.spawn(future)
180    }
181
182    /// Runs a function with the local executor. Typically used to tick
183    /// the local executor on the main thread as it needs to share time with
184    /// other things.
185    ///
186    /// ```
187    /// use bevy_tasks::TaskPool;
188    ///
189    /// TaskPool::new().with_local_executor(|local_executor| {
190    ///     local_executor.try_tick();
191    /// });
192    /// ```
193    pub fn with_local_executor<F, R>(&self, f: F) -> R
194    where
195        F: FnOnce(&async_executor::LocalExecutor) -> R,
196    {
197        LOCAL_EXECUTOR.with(f)
198    }
199}
200
201/// An empty task used in single-threaded contexts.
202///
203/// This does nothing and is therefore safe, and recommended, to ignore.
204#[derive(Debug)]
205pub struct FakeTask;
206
207impl FakeTask {
208    /// No op on the single threaded task pool
209    pub fn detach(self) {}
210}
211
212/// A `TaskPool` scope for running one or more non-`'static` futures.
213///
214/// For more information, see [`TaskPool::scope`].
215#[derive(Debug)]
216pub struct Scope<'scope, 'env: 'scope, T> {
217    executor: &'scope async_executor::LocalExecutor<'scope>,
218    // Vector to gather results of all futures spawned during scope run
219    results: &'env RefCell<Vec<Rc<RefCell<Option<T>>>>>,
220
221    // make `Scope` invariant over 'scope and 'env
222    scope: PhantomData<&'scope mut &'scope ()>,
223    env: PhantomData<&'env mut &'env ()>,
224}
225
226impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> {
227    /// Spawns a scoped future onto the executor. The scope *must* outlive
228    /// the provided future. The results of the future will be returned as a part of
229    /// [`TaskPool::scope`]'s return value.
230    ///
231    /// On the single threaded task pool, it just calls [`Scope::spawn_on_scope`].
232    ///
233    /// For more information, see [`TaskPool::scope`].
234    pub fn spawn<Fut: Future<Output = T> + 'scope>(&self, f: Fut) {
235        self.spawn_on_scope(f);
236    }
237
238    /// Spawns a scoped future onto the executor. The scope *must* outlive
239    /// the provided future. The results of the future will be returned as a part of
240    /// [`TaskPool::scope`]'s return value.
241    ///
242    /// On the single threaded task pool, it just calls [`Scope::spawn_on_scope`].
243    ///
244    /// For more information, see [`TaskPool::scope`].
245    pub fn spawn_on_external<Fut: Future<Output = T> + 'scope>(&self, f: Fut) {
246        self.spawn_on_scope(f);
247    }
248
249    /// Spawns a scoped future that runs on the thread the scope called from. The
250    /// scope *must* outlive the provided future. The results of the future will be
251    /// returned as a part of [`TaskPool::scope`]'s return value.
252    ///
253    /// For more information, see [`TaskPool::scope`].
254    pub fn spawn_on_scope<Fut: Future<Output = T> + 'scope>(&self, f: Fut) {
255        let result = Rc::new(RefCell::new(None));
256        self.results.borrow_mut().push(result.clone());
257        let f = async move {
258            let temp_result = f.await;
259            result.borrow_mut().replace(temp_result);
260        };
261        self.executor.spawn(f).detach();
262    }
263}