bevy_asset/processor/
mod.rs

1mod log;
2mod process;
3
4pub use log::*;
5pub use process::*;
6
7use crate::{
8    io::{
9        AssetReaderError, AssetSource, AssetSourceBuilders, AssetSourceEvent, AssetSourceId,
10        AssetSources, AssetWriterError, ErasedAssetReader, ErasedAssetWriter,
11        MissingAssetSourceError,
12    },
13    meta::{
14        get_asset_hash, get_full_asset_hash, AssetAction, AssetActionMinimal, AssetHash, AssetMeta,
15        AssetMetaDyn, AssetMetaMinimal, ProcessedInfo, ProcessedInfoMinimal,
16    },
17    AssetLoadError, AssetMetaCheck, AssetPath, AssetServer, AssetServerMode, DeserializeMetaError,
18    MissingAssetLoaderForExtensionError,
19};
20use bevy_ecs::prelude::*;
21use bevy_tasks::IoTaskPool;
22use bevy_utils::tracing::{debug, error, trace, warn};
23#[cfg(feature = "trace")]
24use bevy_utils::{
25    tracing::{info_span, instrument::Instrument},
26    ConditionalSendFuture,
27};
28use bevy_utils::{HashMap, HashSet};
29use futures_io::ErrorKind;
30use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt};
31use parking_lot::RwLock;
32use std::{
33    collections::VecDeque,
34    path::{Path, PathBuf},
35    sync::Arc,
36};
37use thiserror::Error;
38
39// Needed for doc strings
40#[allow(unused_imports)]
41use crate::io::{AssetReader, AssetWriter};
42
43/// A "background" asset processor that reads asset values from a source [`AssetSource`] (which corresponds to an [`AssetReader`] / [`AssetWriter`] pair),
44/// processes them in some way, and writes them to a destination [`AssetSource`].
45///
46/// This will create .meta files (a human-editable serialized form of [`AssetMeta`]) in the source [`AssetSource`] for assets that
47/// that can be loaded and/or processed. This enables developers to configure how each asset should be loaded and/or processed.
48///
49/// [`AssetProcessor`] can be run in the background while a Bevy App is running. Changes to assets will be automatically detected and hot-reloaded.
50///
51/// Assets will only be re-processed if they have been changed. A hash of each asset source is stored in the metadata of the processed version of the
52/// asset, which is used to determine if the asset source has actually changed.  
53///
54/// A [`ProcessorTransactionLog`] is produced, which uses "write-ahead logging" to make the [`AssetProcessor`] crash and failure resistant. If a failed/unfinished
55/// transaction from a previous run is detected, the affected asset(s) will be re-processed.
56///
57/// [`AssetProcessor`] can be cloned. It is backed by an [`Arc`] so clones will share state. Clones can be freely used in parallel.
58#[derive(Resource, Clone)]
59pub struct AssetProcessor {
60    server: AssetServer,
61    pub(crate) data: Arc<AssetProcessorData>,
62}
63
64pub struct AssetProcessorData {
65    pub(crate) asset_infos: async_lock::RwLock<ProcessorAssetInfos>,
66    log: async_lock::RwLock<Option<ProcessorTransactionLog>>,
67    processors: RwLock<HashMap<&'static str, Arc<dyn ErasedProcessor>>>,
68    /// Default processors for file extensions
69    default_processors: RwLock<HashMap<Box<str>, &'static str>>,
70    state: async_lock::RwLock<ProcessorState>,
71    sources: AssetSources,
72    initialized_sender: async_broadcast::Sender<()>,
73    initialized_receiver: async_broadcast::Receiver<()>,
74    finished_sender: async_broadcast::Sender<()>,
75    finished_receiver: async_broadcast::Receiver<()>,
76}
77
78impl AssetProcessor {
79    /// Creates a new [`AssetProcessor`] instance.
80    pub fn new(source: &mut AssetSourceBuilders) -> Self {
81        let data = Arc::new(AssetProcessorData::new(source.build_sources(true, false)));
82        // The asset processor uses its own asset server with its own id space
83        let mut sources = source.build_sources(false, false);
84        sources.gate_on_processor(data.clone());
85        let server = AssetServer::new_with_meta_check(
86            sources,
87            AssetServerMode::Processed,
88            AssetMetaCheck::Always,
89            false,
90        );
91        Self { server, data }
92    }
93
94    /// The "internal" [`AssetServer`] used by the [`AssetProcessor`]. This is _separate_ from the asset processor used by
95    /// the main App. It has different processor-specific configuration and a different ID space.
96    pub fn server(&self) -> &AssetServer {
97        &self.server
98    }
99
100    async fn set_state(&self, state: ProcessorState) {
101        let mut state_guard = self.data.state.write().await;
102        let last_state = *state_guard;
103        *state_guard = state;
104        if last_state != ProcessorState::Finished && state == ProcessorState::Finished {
105            self.data.finished_sender.broadcast(()).await.unwrap();
106        } else if last_state != ProcessorState::Processing && state == ProcessorState::Processing {
107            self.data.initialized_sender.broadcast(()).await.unwrap();
108        }
109    }
110
111    /// Retrieves the current [`ProcessorState`]
112    pub async fn get_state(&self) -> ProcessorState {
113        *self.data.state.read().await
114    }
115
116    /// Retrieves the [`AssetSource`] for this processor
117    #[inline]
118    pub fn get_source<'a, 'b>(
119        &'a self,
120        id: impl Into<AssetSourceId<'b>>,
121    ) -> Result<&'a AssetSource, MissingAssetSourceError> {
122        self.data.sources.get(id.into())
123    }
124
125    #[inline]
126    pub fn sources(&self) -> &AssetSources {
127        &self.data.sources
128    }
129
130    /// Logs an unrecoverable error. On the next run of the processor, all assets will be regenerated. This should only be used as a last resort.
131    /// Every call to this should be considered with scrutiny and ideally replaced with something more granular.
132    async fn log_unrecoverable(&self) {
133        let mut log = self.data.log.write().await;
134        let log = log.as_mut().unwrap();
135        log.unrecoverable().await.unwrap();
136    }
137
138    /// Logs the start of an asset being processed. If this is not followed at some point in the log by a closing [`AssetProcessor::log_end_processing`],
139    /// in the next run of the processor the asset processing will be considered "incomplete" and it will be reprocessed.
140    async fn log_begin_processing(&self, path: &AssetPath<'_>) {
141        let mut log = self.data.log.write().await;
142        let log = log.as_mut().unwrap();
143        log.begin_processing(path).await.unwrap();
144    }
145
146    /// Logs the end of an asset being successfully processed. See [`AssetProcessor::log_begin_processing`].
147    async fn log_end_processing(&self, path: &AssetPath<'_>) {
148        let mut log = self.data.log.write().await;
149        let log = log.as_mut().unwrap();
150        log.end_processing(path).await.unwrap();
151    }
152
153    /// Starts the processor in a background thread.
154    pub fn start(_processor: Res<Self>) {
155        #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
156        error!("Cannot run AssetProcessor in single threaded mode (or WASM) yet.");
157        #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
158        {
159            let processor = _processor.clone();
160            std::thread::spawn(move || {
161                processor.process_assets();
162                bevy_tasks::block_on(processor.listen_for_source_change_events());
163            });
164        }
165    }
166
167    /// Processes all assets. This will:
168    /// * For each "processed [`AssetSource`]:
169    /// * Scan the [`ProcessorTransactionLog`] and recover from any failures detected
170    /// * Scan the processed [`AssetReader`] to build the current view of already processed assets.
171    /// * Scan the unprocessed [`AssetReader`] and remove any final processed assets that are invalid or no longer exist.
172    /// * For each asset in the unprocessed [`AssetReader`], kick off a new "process job", which will process the asset
173    /// (if the latest version of the asset has not been processed).
174    #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
175    pub fn process_assets(&self) {
176        let start_time = std::time::Instant::now();
177        debug!("Processing Assets");
178        IoTaskPool::get().scope(|scope| {
179            scope.spawn(async move {
180                self.initialize().await.unwrap();
181                for source in self.sources().iter_processed() {
182                    self.process_assets_internal(scope, source, PathBuf::from(""))
183                        .await
184                        .unwrap();
185                }
186            });
187        });
188        // This must happen _after_ the scope resolves or it will happen "too early"
189        // Don't move this into the async scope above! process_assets is a blocking/sync function this is fine
190        bevy_tasks::block_on(self.finish_processing_assets());
191        let end_time = std::time::Instant::now();
192        debug!("Processing finished in {:?}", end_time - start_time);
193    }
194
195    /// Listens for changes to assets in the source [`AssetSource`] and update state accordingly.
196    // PERF: parallelize change event processing
197    pub async fn listen_for_source_change_events(&self) {
198        debug!("Listening for changes to source assets");
199        loop {
200            let mut started_processing = false;
201
202            for source in self.data.sources.iter_processed() {
203                if let Some(receiver) = source.event_receiver() {
204                    for event in receiver.try_iter() {
205                        if !started_processing {
206                            self.set_state(ProcessorState::Processing).await;
207                            started_processing = true;
208                        }
209
210                        self.handle_asset_source_event(source, event).await;
211                    }
212                }
213            }
214
215            if started_processing {
216                self.finish_processing_assets().await;
217            }
218        }
219    }
220
221    async fn handle_asset_source_event(&self, source: &AssetSource, event: AssetSourceEvent) {
222        trace!("{event:?}");
223        match event {
224            AssetSourceEvent::AddedAsset(path)
225            | AssetSourceEvent::AddedMeta(path)
226            | AssetSourceEvent::ModifiedAsset(path)
227            | AssetSourceEvent::ModifiedMeta(path) => {
228                self.process_asset(source, path).await;
229            }
230            AssetSourceEvent::RemovedAsset(path) => {
231                self.handle_removed_asset(source, path).await;
232            }
233            AssetSourceEvent::RemovedMeta(path) => {
234                self.handle_removed_meta(source, path).await;
235            }
236            AssetSourceEvent::AddedFolder(path) => {
237                self.handle_added_folder(source, path).await;
238            }
239            // NOTE: As a heads up for future devs: this event shouldn't be run in parallel with other events that might
240            // touch this folder (ex: the folder might be re-created with new assets). Clean up the old state first.
241            // Currently this event handler is not parallel, but it could be (and likely should be) in the future.
242            AssetSourceEvent::RemovedFolder(path) => {
243                self.handle_removed_folder(source, &path).await;
244            }
245            AssetSourceEvent::RenamedAsset { old, new } => {
246                // If there was a rename event, but the path hasn't changed, this asset might need reprocessing.
247                // Sometimes this event is returned when an asset is moved "back" into the asset folder
248                if old == new {
249                    self.process_asset(source, new).await;
250                } else {
251                    self.handle_renamed_asset(source, old, new).await;
252                }
253            }
254            AssetSourceEvent::RenamedMeta { old, new } => {
255                // If there was a rename event, but the path hasn't changed, this asset meta might need reprocessing.
256                // Sometimes this event is returned when an asset meta is moved "back" into the asset folder
257                if old == new {
258                    self.process_asset(source, new).await;
259                } else {
260                    debug!("Meta renamed from {old:?} to {new:?}");
261                    let mut infos = self.data.asset_infos.write().await;
262                    // Renaming meta should not assume that an asset has also been renamed. Check both old and new assets to see
263                    // if they should be re-imported (and/or have new meta generated)
264                    let new_asset_path = AssetPath::from(new).with_source(source.id());
265                    let old_asset_path = AssetPath::from(old).with_source(source.id());
266                    infos.check_reprocess_queue.push_back(old_asset_path);
267                    infos.check_reprocess_queue.push_back(new_asset_path);
268                }
269            }
270            AssetSourceEvent::RenamedFolder { old, new } => {
271                // If there was a rename event, but the path hasn't changed, this asset folder might need reprocessing.
272                // Sometimes this event is returned when an asset meta is moved "back" into the asset folder
273                if old == new {
274                    self.handle_added_folder(source, new).await;
275                } else {
276                    // PERF: this reprocesses everything in the moved folder. this is not necessary in most cases, but
277                    // requires some nuance when it comes to path handling.
278                    self.handle_removed_folder(source, &old).await;
279                    self.handle_added_folder(source, new).await;
280                }
281            }
282            AssetSourceEvent::RemovedUnknown { path, is_meta } => {
283                let processed_reader = source.processed_reader().unwrap();
284                match processed_reader.is_directory(&path).await {
285                    Ok(is_directory) => {
286                        if is_directory {
287                            self.handle_removed_folder(source, &path).await;
288                        } else if is_meta {
289                            self.handle_removed_meta(source, path).await;
290                        } else {
291                            self.handle_removed_asset(source, path).await;
292                        }
293                    }
294                    Err(err) => {
295                        match err {
296                            AssetReaderError::NotFound(_) => {
297                                // if the path is not found, a processed version does not exist
298                            }
299                            AssetReaderError::Io(err) => {
300                                error!(
301                                    "Path '{}' was removed, but the destination reader could not determine if it \
302                                    was a folder or a file due to the following error: {err}",
303                                    AssetPath::from_path(&path).with_source(source.id())
304                                );
305                            }
306                            AssetReaderError::HttpError(status) => {
307                                error!(
308                                    "Path '{}' was removed, but the destination reader could not determine if it \
309                                    was a folder or a file due to receiving an unexpected HTTP Status {status}",
310                                    AssetPath::from_path(&path).with_source(source.id())
311                                );
312                            }
313                        }
314                    }
315                }
316            }
317        }
318    }
319
320    async fn handle_added_folder(&self, source: &AssetSource, path: PathBuf) {
321        debug!(
322            "Folder {} was added. Attempting to re-process",
323            AssetPath::from_path(&path).with_source(source.id())
324        );
325        #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
326        error!("AddFolder event cannot be handled in single threaded mode (or WASM) yet.");
327        #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
328        IoTaskPool::get().scope(|scope| {
329            scope.spawn(async move {
330                self.process_assets_internal(scope, source, path)
331                    .await
332                    .unwrap();
333            });
334        });
335    }
336
337    /// Responds to a removed meta event by reprocessing the asset at the given path.
338    async fn handle_removed_meta(&self, source: &AssetSource, path: PathBuf) {
339        // If meta was removed, we might need to regenerate it.
340        // Likewise, the user might be manually re-adding the asset.
341        // Therefore, we shouldn't automatically delete the asset ... that is a
342        // user-initiated action.
343        debug!(
344            "Meta for asset {:?} was removed. Attempting to re-process",
345            AssetPath::from_path(&path).with_source(source.id())
346        );
347        self.process_asset(source, path).await;
348    }
349
350    /// Removes all processed assets stored at the given path (respecting transactionality), then removes the folder itself.
351    async fn handle_removed_folder(&self, source: &AssetSource, path: &Path) {
352        debug!("Removing folder {:?} because source was removed", path);
353        let processed_reader = source.processed_reader().unwrap();
354        match processed_reader.read_directory(path).await {
355            Ok(mut path_stream) => {
356                while let Some(child_path) = path_stream.next().await {
357                    self.handle_removed_asset(source, child_path).await;
358                }
359            }
360            Err(err) => match err {
361                AssetReaderError::NotFound(_err) => {
362                    // The processed folder does not exist. No need to update anything
363                }
364                AssetReaderError::HttpError(status) => {
365                    self.log_unrecoverable().await;
366                    error!(
367                        "Unrecoverable Error: Failed to read the processed assets at {path:?} in order to remove assets that no longer exist \
368                        in the source directory. Restart the asset processor to fully reprocess assets. HTTP Status Code {status}"
369                    );
370                }
371                AssetReaderError::Io(err) => {
372                    self.log_unrecoverable().await;
373                    error!(
374                        "Unrecoverable Error: Failed to read the processed assets at {path:?} in order to remove assets that no longer exist \
375                        in the source directory. Restart the asset processor to fully reprocess assets. Error: {err}"
376                    );
377                }
378            },
379        }
380        let processed_writer = source.processed_writer().unwrap();
381        if let Err(err) = processed_writer.remove_directory(path).await {
382            match err {
383                AssetWriterError::Io(err) => {
384                    // we can ignore NotFound because if the "final" file in a folder was removed
385                    // then we automatically clean up this folder
386                    if err.kind() != ErrorKind::NotFound {
387                        let asset_path = AssetPath::from_path(path).with_source(source.id());
388                        error!("Failed to remove destination folder that no longer exists in {asset_path}: {err}");
389                    }
390                }
391            }
392        }
393    }
394
395    /// Removes the processed version of an asset and associated in-memory metadata. This will block until all existing reads/writes to the
396    /// asset have finished, thanks to the `file_transaction_lock`.
397    async fn handle_removed_asset(&self, source: &AssetSource, path: PathBuf) {
398        let asset_path = AssetPath::from(path).with_source(source.id());
399        debug!("Removing processed {asset_path} because source was removed");
400        let mut infos = self.data.asset_infos.write().await;
401        if let Some(info) = infos.get(&asset_path) {
402            // we must wait for uncontested write access to the asset source to ensure existing readers / writers
403            // can finish their operations
404            let _write_lock = info.file_transaction_lock.write();
405            self.remove_processed_asset_and_meta(source, asset_path.path())
406                .await;
407        }
408        infos.remove(&asset_path).await;
409    }
410
411    /// Handles a renamed source asset by moving its processed results to the new location and updating in-memory paths + metadata.
412    /// This will cause direct path dependencies to break.
413    async fn handle_renamed_asset(&self, source: &AssetSource, old: PathBuf, new: PathBuf) {
414        let mut infos = self.data.asset_infos.write().await;
415        let old = AssetPath::from(old).with_source(source.id());
416        let new = AssetPath::from(new).with_source(source.id());
417        let processed_writer = source.processed_writer().unwrap();
418        if let Some(info) = infos.get(&old) {
419            // we must wait for uncontested write access to the asset source to ensure existing readers / writers
420            // can finish their operations
421            let _write_lock = info.file_transaction_lock.write();
422            processed_writer
423                .rename(old.path(), new.path())
424                .await
425                .unwrap();
426            processed_writer
427                .rename_meta(old.path(), new.path())
428                .await
429                .unwrap();
430        }
431        infos.rename(&old, &new).await;
432    }
433
434    async fn finish_processing_assets(&self) {
435        self.try_reprocessing_queued().await;
436        // clean up metadata in asset server
437        self.server.data.infos.write().consume_handle_drop_events();
438        self.set_state(ProcessorState::Finished).await;
439    }
440
441    #[allow(unused)]
442    #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
443    async fn process_assets_internal<'scope>(
444        &'scope self,
445        scope: &'scope bevy_tasks::Scope<'scope, '_, ()>,
446        source: &'scope AssetSource,
447        path: PathBuf,
448    ) -> Result<(), AssetReaderError> {
449        if source.reader().is_directory(&path).await? {
450            let mut path_stream = source.reader().read_directory(&path).await?;
451            while let Some(path) = path_stream.next().await {
452                Box::pin(self.process_assets_internal(scope, source, path)).await?;
453            }
454        } else {
455            // Files without extensions are skipped
456            let processor = self.clone();
457            scope.spawn(async move {
458                processor.process_asset(source, path).await;
459            });
460        }
461        Ok(())
462    }
463
464    async fn try_reprocessing_queued(&self) {
465        loop {
466            let mut check_reprocess_queue =
467                std::mem::take(&mut self.data.asset_infos.write().await.check_reprocess_queue);
468            IoTaskPool::get().scope(|scope| {
469                for path in check_reprocess_queue.drain(..) {
470                    let processor = self.clone();
471                    let source = self.get_source(path.source()).unwrap();
472                    scope.spawn(async move {
473                        processor.process_asset(source, path.into()).await;
474                    });
475                }
476            });
477            let infos = self.data.asset_infos.read().await;
478            if infos.check_reprocess_queue.is_empty() {
479                break;
480            }
481        }
482    }
483
484    /// Register a new asset processor.
485    pub fn register_processor<P: Process>(&self, processor: P) {
486        let mut process_plans = self.data.processors.write();
487        #[cfg(feature = "trace")]
488        let processor = InstrumentedAssetProcessor(processor);
489        process_plans.insert(std::any::type_name::<P>(), Arc::new(processor));
490    }
491
492    /// Set the default processor for the given `extension`. Make sure `P` is registered with [`AssetProcessor::register_processor`].
493    pub fn set_default_processor<P: Process>(&self, extension: &str) {
494        let mut default_processors = self.data.default_processors.write();
495        default_processors.insert(extension.into(), std::any::type_name::<P>());
496    }
497
498    /// Returns the default processor for the given `extension`, if it exists.
499    pub fn get_default_processor(&self, extension: &str) -> Option<Arc<dyn ErasedProcessor>> {
500        let default_processors = self.data.default_processors.read();
501        let key = default_processors.get(extension)?;
502        self.data.processors.read().get(key).cloned()
503    }
504
505    /// Returns the processor with the given `processor_type_name`, if it exists.
506    pub fn get_processor(&self, processor_type_name: &str) -> Option<Arc<dyn ErasedProcessor>> {
507        let processors = self.data.processors.read();
508        processors.get(processor_type_name).cloned()
509    }
510
511    /// Populates the initial view of each asset by scanning the unprocessed and processed asset folders.
512    /// This info will later be used to determine whether or not to re-process an asset
513    ///
514    /// This will validate transactions and recover failed transactions when necessary.
515    #[allow(unused)]
516    async fn initialize(&self) -> Result<(), InitializeError> {
517        self.validate_transaction_log_and_recover().await;
518        let mut asset_infos = self.data.asset_infos.write().await;
519
520        /// Retrieves asset paths recursively. If `clean_empty_folders_writer` is Some, it will be used to clean up empty
521        /// folders when they are discovered.
522        async fn get_asset_paths<'a>(
523            reader: &'a dyn ErasedAssetReader,
524            clean_empty_folders_writer: Option<&'a dyn ErasedAssetWriter>,
525            path: PathBuf,
526            paths: &'a mut Vec<PathBuf>,
527        ) -> Result<bool, AssetReaderError> {
528            if reader.is_directory(&path).await? {
529                let mut path_stream = reader.read_directory(&path).await?;
530                let mut contains_files = false;
531
532                while let Some(child_path) = path_stream.next().await {
533                    contains_files |= Box::pin(get_asset_paths(
534                        reader,
535                        clean_empty_folders_writer,
536                        child_path,
537                        paths,
538                    ))
539                    .await?;
540                }
541                if !contains_files && path.parent().is_some() {
542                    if let Some(writer) = clean_empty_folders_writer {
543                        // it is ok for this to fail as it is just a cleanup job.
544                        let _ = writer.remove_empty_directory(&path).await;
545                    }
546                }
547                Ok(contains_files)
548            } else {
549                paths.push(path);
550                Ok(true)
551            }
552        }
553
554        for source in self.sources().iter_processed() {
555            let Ok(processed_reader) = source.processed_reader() else {
556                continue;
557            };
558            let Ok(processed_writer) = source.processed_writer() else {
559                continue;
560            };
561            let mut unprocessed_paths = Vec::new();
562            get_asset_paths(
563                source.reader(),
564                None,
565                PathBuf::from(""),
566                &mut unprocessed_paths,
567            )
568            .await
569            .map_err(InitializeError::FailedToReadSourcePaths)?;
570
571            let mut processed_paths = Vec::new();
572            get_asset_paths(
573                processed_reader,
574                Some(processed_writer),
575                PathBuf::from(""),
576                &mut processed_paths,
577            )
578            .await
579            .map_err(InitializeError::FailedToReadDestinationPaths)?;
580
581            for path in unprocessed_paths {
582                asset_infos.get_or_insert(AssetPath::from(path).with_source(source.id()));
583            }
584
585            for path in processed_paths {
586                let mut dependencies = Vec::new();
587                let asset_path = AssetPath::from(path).with_source(source.id());
588                if let Some(info) = asset_infos.get_mut(&asset_path) {
589                    match processed_reader.read_meta_bytes(asset_path.path()).await {
590                        Ok(meta_bytes) => {
591                            match ron::de::from_bytes::<ProcessedInfoMinimal>(&meta_bytes) {
592                                Ok(minimal) => {
593                                    trace!(
594                                        "Populated processed info for asset {asset_path} {:?}",
595                                        minimal.processed_info
596                                    );
597
598                                    if let Some(processed_info) = &minimal.processed_info {
599                                        for process_dependency_info in
600                                            &processed_info.process_dependencies
601                                        {
602                                            dependencies.push(process_dependency_info.path.clone());
603                                        }
604                                    }
605                                    info.processed_info = minimal.processed_info;
606                                }
607                                Err(err) => {
608                                    trace!("Removing processed data for {asset_path} because meta could not be parsed: {err}");
609                                    self.remove_processed_asset_and_meta(source, asset_path.path())
610                                        .await;
611                                }
612                            }
613                        }
614                        Err(err) => {
615                            trace!("Removing processed data for {asset_path} because meta failed to load: {err}");
616                            self.remove_processed_asset_and_meta(source, asset_path.path())
617                                .await;
618                        }
619                    }
620                } else {
621                    trace!("Removing processed data for non-existent asset {asset_path}");
622                    self.remove_processed_asset_and_meta(source, asset_path.path())
623                        .await;
624                }
625
626                for dependency in dependencies {
627                    asset_infos.add_dependant(&dependency, asset_path.clone());
628                }
629            }
630        }
631
632        self.set_state(ProcessorState::Processing).await;
633
634        Ok(())
635    }
636
637    /// Removes the processed version of an asset and its metadata, if it exists. This _is not_ transactional like `remove_processed_asset_transactional`, nor
638    /// does it remove existing in-memory metadata.
639    async fn remove_processed_asset_and_meta(&self, source: &AssetSource, path: &Path) {
640        if let Err(err) = source.processed_writer().unwrap().remove(path).await {
641            warn!("Failed to remove non-existent asset {path:?}: {err}");
642        }
643
644        if let Err(err) = source.processed_writer().unwrap().remove_meta(path).await {
645            warn!("Failed to remove non-existent meta {path:?}: {err}");
646        }
647
648        self.clean_empty_processed_ancestor_folders(source, path)
649            .await;
650    }
651
652    async fn clean_empty_processed_ancestor_folders(&self, source: &AssetSource, path: &Path) {
653        // As a safety precaution don't delete absolute paths to avoid deleting folders outside of the destination folder
654        if path.is_absolute() {
655            error!("Attempted to clean up ancestor folders of an absolute path. This is unsafe so the operation was skipped.");
656            return;
657        }
658        while let Some(parent) = path.parent() {
659            if parent == Path::new("") {
660                break;
661            }
662            if source
663                .processed_writer()
664                .unwrap()
665                .remove_empty_directory(parent)
666                .await
667                .is_err()
668            {
669                // if we fail to delete a folder, stop walking up the tree
670                break;
671            }
672        }
673    }
674
675    /// Processes the asset (if it has not already been processed or the asset source has changed).
676    /// If the asset has "process dependencies" (relies on the values of other assets), it will asynchronously await until
677    /// the dependencies have been processed (See [`ProcessorGatedReader`], which is used in the [`AssetProcessor`]'s [`AssetServer`]
678    /// to block reads until the asset is processed).
679    ///
680    /// [`LoadContext`]: crate::loader::LoadContext
681    /// [`ProcessorGatedReader`]: crate::io::processor_gated::ProcessorGatedReader
682    async fn process_asset(&self, source: &AssetSource, path: PathBuf) {
683        let asset_path = AssetPath::from(path).with_source(source.id());
684        let result = self.process_asset_internal(source, &asset_path).await;
685        let mut infos = self.data.asset_infos.write().await;
686        infos.finish_processing(asset_path, result).await;
687    }
688
    /// Processes a single asset and writes the result to the source's processed writer.
    ///
    /// Steps, in order:
    /// 1. Read the source asset, then its meta. If no meta exists, a default meta is built (default
    ///    processor for the extension, else the loader for the path, else an `Ignore` meta) and written
    ///    back to the source location.
    /// 2. Hash the meta + asset bytes and compare against the in-memory [`ProcessedInfo`]; if neither the
    ///    asset nor any recorded process dependency changed, return [`ProcessResult::SkippedNotChanged`].
    /// 3. Take the asset's `file_transaction_lock` for writing, log the start of the transaction,
    ///    write the processed asset + meta, and log the end of the transaction.
    ///
    /// Returns [`ProcessResult::Ignored`] when an existing meta file marks the asset as `Ignore`, and
    /// [`ProcessResult::Processed`] with the new [`ProcessedInfo`] once asset and meta have been written.
    ///
    /// # Errors
    /// Returns a [`ProcessError`] if reading the source asset or meta fails, the meta cannot be
    /// deserialized, the referenced loader / processor is missing, the processor itself fails, or
    /// writing to the source / processed writers fails.
    async fn process_asset_internal(
        &self,
        source: &AssetSource,
        asset_path: &AssetPath<'static>,
    ) -> Result<ProcessResult, ProcessError> {
        // TODO: The extension check was removed now that AssetPath is the input. is that ok?
        // TODO: check if already processing to protect against duplicate hot-reload events
        debug!("Processing {:?}", asset_path);
        let server = &self.server;
        let path = asset_path.path();
        let reader = source.reader();

        // Helpers that attach the asset path to reader / writer errors.
        let reader_err = |err| ProcessError::AssetReaderError {
            path: asset_path.clone(),
            err,
        };
        let writer_err = |err| ProcessError::AssetWriterError {
            path: asset_path.clone(),
            err,
        };

        // Note: we get the asset source reader first because we don't want to create meta files for assets that don't have source files
        let mut byte_reader = reader.read(path).await.map_err(reader_err)?;

        // Resolve the meta for this asset, the raw meta bytes (used for hashing), and the processor to run (if any).
        let (mut source_meta, meta_bytes, processor) = match reader.read_meta_bytes(path).await {
            Ok(meta_bytes) => {
                // Deserialize the "minimal" form first to learn which loader / processor owns the full meta.
                let minimal: AssetMetaMinimal = ron::de::from_bytes(&meta_bytes).map_err(|e| {
                    ProcessError::DeserializeMetaError(DeserializeMetaError::DeserializeMinimal(e))
                })?;
                let (meta, processor) = match minimal.asset {
                    AssetActionMinimal::Load { loader } => {
                        let loader = server.get_asset_loader_with_type_name(&loader).await?;
                        let meta = loader.deserialize_meta(&meta_bytes)?;
                        (meta, None)
                    }
                    AssetActionMinimal::Process { processor } => {
                        let processor = self
                            .get_processor(&processor)
                            .ok_or_else(|| ProcessError::MissingProcessor(processor))?;
                        let meta = processor.deserialize_meta(&meta_bytes)?;
                        (meta, Some(processor))
                    }
                    AssetActionMinimal::Ignore => {
                        return Ok(ProcessResult::Ignored);
                    }
                };
                (meta, meta_bytes, processor)
            }
            Err(AssetReaderError::NotFound(_path)) => {
                // No meta file yet: prefer the default processor registered for the full extension,
                // then fall back to the loader for this path, and finally to an `Ignore` meta.
                let (meta, processor) = if let Some(processor) = asset_path
                    .get_full_extension()
                    .and_then(|ext| self.get_default_processor(&ext))
                {
                    let meta = processor.default_meta();
                    (meta, Some(processor))
                } else {
                    match server.get_path_asset_loader(asset_path.clone()).await {
                        Ok(loader) => (loader.default_meta(), None),
                        Err(MissingAssetLoaderForExtensionError { .. }) => {
                            let meta: Box<dyn AssetMetaDyn> =
                                Box::new(AssetMeta::<(), ()>::new(AssetAction::Ignore));
                            (meta, None)
                        }
                    }
                };
                let meta_bytes = meta.serialize();
                // write meta to source location if it doesn't already exist
                source
                    .writer()?
                    .write_meta_bytes(path, &meta_bytes)
                    .await
                    .map_err(writer_err)?;
                (meta, meta_bytes, processor)
            }
            Err(err) => {
                return Err(ProcessError::ReadAssetMetaError {
                    path: asset_path.clone(),
                    err,
                })
            }
        };

        let processed_writer = source.processed_writer()?;

        let mut asset_bytes = Vec::new();
        byte_reader
            .read_to_end(&mut asset_bytes)
            .await
            .map_err(|e| ProcessError::AssetReaderError {
                path: asset_path.clone(),
                err: AssetReaderError::Io(e.into()),
            })?;

        // PERF: in theory these hashes could be streamed if we want to avoid allocating the whole asset.
        // The downside is that reading assets would need to happen twice (once for the hash and once for the asset loader)
        // Hard to say which is worse
        let new_hash = get_asset_hash(&meta_bytes, &asset_bytes);
        // `full_hash` starts out equal to `hash`; it is recomputed below (processor branch) once the
        // process dependencies are known.
        let mut new_processed_info = ProcessedInfo {
            hash: new_hash,
            full_hash: new_hash,
            process_dependencies: Vec::new(),
        };

        // Skip check: the asset is unchanged if its own hash matches the recorded one and every recorded
        // process dependency still has the full hash that was recorded for it.
        {
            let infos = self.data.asset_infos.read().await;
            if let Some(current_processed_info) = infos
                .get(asset_path)
                .and_then(|i| i.processed_info.as_ref())
            {
                if current_processed_info.hash == new_hash {
                    let mut dependency_changed = false;
                    for current_dep_info in &current_processed_info.process_dependencies {
                        let live_hash = infos
                            .get(&current_dep_info.path)
                            .and_then(|i| i.processed_info.as_ref())
                            .map(|i| i.full_hash);
                        // A missing dependency (`None`) also counts as changed.
                        if live_hash != Some(current_dep_info.full_hash) {
                            dependency_changed = true;
                            break;
                        }
                    }
                    if !dependency_changed {
                        return Ok(ProcessResult::SkippedNotChanged);
                    }
                }
            }
        }
        // Note: this lock must remain alive until all processed asset and meta writes have finished (or failed)
        // See ProcessedAssetInfo::file_transaction_lock docs for more info
        let _transaction_lock = {
            let mut infos = self.data.asset_infos.write().await;
            let info = infos.get_or_insert(asset_path.clone());
            info.file_transaction_lock.write_arc().await
        };

        // NOTE: if processing the asset fails this will produce an "unfinished" log entry, forcing a rebuild on next run.
        // Directly writing to the asset destination in the processor necessitates this behavior
        // TODO: this class of failure can be recovered via re-processing + smarter log validation that allows for duplicate transactions in the event of failures
        self.log_begin_processing(asset_path).await;
        if let Some(processor) = processor {
            // Processor path: the processor writes the processed asset directly into the destination writer.
            let mut writer = processed_writer.write(path).await.map_err(writer_err)?;
            let mut processed_meta = {
                let mut context =
                    ProcessContext::new(self, asset_path, &asset_bytes, &mut new_processed_info);
                processor
                    .process(&mut context, source_meta, &mut *writer)
                    .await?
            };

            writer
                .flush()
                .await
                .map_err(|e| ProcessError::AssetWriterError {
                    path: asset_path.clone(),
                    err: AssetWriterError::Io(e),
                })?;

            // Combine the asset's own hash with the full hashes of its process dependencies.
            let full_hash = get_full_asset_hash(
                new_hash,
                new_processed_info
                    .process_dependencies
                    .iter()
                    .map(|i| i.full_hash),
            );
            new_processed_info.full_hash = full_hash;
            *processed_meta.processed_info_mut() = Some(new_processed_info.clone());
            let meta_bytes = processed_meta.serialize();
            processed_writer
                .write_meta_bytes(path, &meta_bytes)
                .await
                .map_err(writer_err)?;
        } else {
            // No processor: copy the source bytes as-is and store the processed info in the source meta.
            processed_writer
                .write_bytes(path, &asset_bytes)
                .await
                .map_err(writer_err)?;
            *source_meta.processed_info_mut() = Some(new_processed_info.clone());
            let meta_bytes = source_meta.serialize();
            processed_writer
                .write_meta_bytes(path, &meta_bytes)
                .await
                .map_err(writer_err)?;
        }
        self.log_end_processing(asset_path).await;

        Ok(ProcessResult::Processed(new_processed_info))
    }
876
    /// Validates the processor transaction log and repairs the processed asset state if needed,
    /// then creates a fresh log for this run.
    ///
    /// * Unfinished transactions are recovered by removing the processed asset and meta for that path.
    /// * If the log cannot be read, reports an unrecoverable error, contains duplicate / unmatched
    ///   entries, or recovery of an unfinished transaction fails, all processed assets of every
    ///   processed source are removed so they get regenerated.
    ///
    /// # Panics
    /// Panics if removing all processed assets fails, or if the new transaction log cannot be created.
    async fn validate_transaction_log_and_recover(&self) {
        if let Err(err) = ProcessorTransactionLog::validate().await {
            // `state_is_valid == false` means the processed assets cannot be trusted and must be wiped.
            let state_is_valid = match err {
                ValidateLogError::ReadLogError(err) => {
                    error!("Failed to read processor log file. Processed assets cannot be validated so they must be re-generated {err}");
                    false
                }
                ValidateLogError::UnrecoverableError => {
                    error!("Encountered an unrecoverable error in the last run. Processed assets cannot be validated so they must be re-generated");
                    false
                }
                ValidateLogError::EntryErrors(entry_errors) => {
                    let mut state_is_valid = true;
                    for entry_error in entry_errors {
                        match entry_error {
                            // These indicate a corrupt log; there is no point in inspecting further entries.
                            LogEntryError::DuplicateTransaction(_)
                            | LogEntryError::EndedMissingTransaction(_) => {
                                error!("{}", entry_error);
                                state_is_valid = false;
                                break;
                            }
                            LogEntryError::UnfinishedTransaction(path) => {
                                debug!("Asset {path:?} did not finish processing. Clearing state for that asset");
                                // Logs the failure and marks the whole processed state as invalid.
                                let mut unrecoverable_err = |message: &dyn std::fmt::Display| {
                                    error!("Failed to remove asset {path:?}: {message}");
                                    state_is_valid = false;
                                };
                                let Ok(source) = self.get_source(path.source()) else {
                                    unrecoverable_err(&"AssetSource does not exist");
                                    continue;
                                };
                                let Ok(processed_writer) = source.processed_writer() else {
                                    unrecoverable_err(&"AssetSource does not have a processed AssetWriter registered");
                                    continue;
                                };

                                // Remove the (possibly partially written) processed asset ...
                                if let Err(err) = processed_writer.remove(path.path()).await {
                                    match err {
                                        AssetWriterError::Io(err) => {
                                            // any error but NotFound means we could be in a bad state
                                            if err.kind() != ErrorKind::NotFound {
                                                unrecoverable_err(&err);
                                            }
                                        }
                                    }
                                }
                                // ... and its processed meta.
                                if let Err(err) = processed_writer.remove_meta(path.path()).await {
                                    match err {
                                        AssetWriterError::Io(err) => {
                                            // any error but NotFound means we could be in a bad state
                                            if err.kind() != ErrorKind::NotFound {
                                                unrecoverable_err(&err);
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                    state_is_valid
                }
            };

            if !state_is_valid {
                error!("Processed asset transaction log state was invalid and unrecoverable for some reason (see previous logs). Removing processed assets and starting fresh.");
                for source in self.sources().iter_processed() {
                    // Sources without a processed writer have nothing to clear.
                    let Ok(processed_writer) = source.processed_writer() else {
                        continue;
                    };
                    if let Err(err) = processed_writer
                        .remove_assets_in_directory(Path::new(""))
                        .await
                    {
                        panic!("Processed assets were in a bad state. To correct this, the asset processor attempted to remove all processed assets and start from scratch. This failed. There is no way to continue. Try restarting, or deleting imported asset folder manually. {err}");
                    }
                }
            }
        }
        // Always start this run with a newly created transaction log.
        let mut log = self.data.log.write().await;
        *log = match ProcessorTransactionLog::new().await {
            Ok(log) => Some(log),
            Err(err) => panic!("Failed to initialize asset processor log. This cannot be recovered. Try restarting. If that doesn't work, try deleting processed asset folder. {}", err),
        };
    }
961}
962
963impl AssetProcessorData {
964    pub fn new(source: AssetSources) -> Self {
965        let (mut finished_sender, finished_receiver) = async_broadcast::broadcast(1);
966        let (mut initialized_sender, initialized_receiver) = async_broadcast::broadcast(1);
967        // allow overflow on these "one slot" channels to allow receivers to retrieve the "latest" state, and to allow senders to
968        // not block if there was older state present.
969        finished_sender.set_overflow(true);
970        initialized_sender.set_overflow(true);
971
972        AssetProcessorData {
973            sources: source,
974            finished_sender,
975            finished_receiver,
976            initialized_sender,
977            initialized_receiver,
978            state: async_lock::RwLock::new(ProcessorState::Initializing),
979            log: Default::default(),
980            processors: Default::default(),
981            asset_infos: Default::default(),
982            default_processors: Default::default(),
983        }
984    }
985
986    /// Returns a future that will not finish until the path has been processed.
987    pub async fn wait_until_processed(&self, path: AssetPath<'static>) -> ProcessStatus {
988        self.wait_until_initialized().await;
989        let mut receiver = {
990            let infos = self.asset_infos.write().await;
991            let info = infos.get(&path);
992            match info {
993                Some(info) => match info.status {
994                    Some(result) => return result,
995                    // This receiver must be created prior to losing the read lock to ensure this is transactional
996                    None => info.status_receiver.clone(),
997                },
998                None => return ProcessStatus::NonExistent,
999            }
1000        };
1001        receiver.recv().await.unwrap()
1002    }
1003
1004    /// Returns a future that will not finish until the processor has been initialized.
1005    pub async fn wait_until_initialized(&self) {
1006        let receiver = {
1007            let state = self.state.read().await;
1008            match *state {
1009                ProcessorState::Initializing => {
1010                    // This receiver must be created prior to losing the read lock to ensure this is transactional
1011                    Some(self.initialized_receiver.clone())
1012                }
1013                _ => None,
1014            }
1015        };
1016
1017        if let Some(mut receiver) = receiver {
1018            receiver.recv().await.unwrap();
1019        }
1020    }
1021
1022    /// Returns a future that will not finish until processing has finished.
1023    pub async fn wait_until_finished(&self) {
1024        let receiver = {
1025            let state = self.state.read().await;
1026            match *state {
1027                ProcessorState::Initializing | ProcessorState::Processing => {
1028                    // This receiver must be created prior to losing the read lock to ensure this is transactional
1029                    Some(self.finished_receiver.clone())
1030                }
1031                ProcessorState::Finished => None,
1032            }
1033        };
1034
1035        if let Some(mut receiver) = receiver {
1036            receiver.recv().await.unwrap();
1037        }
1038    }
1039}
1040
/// Wraps a [`Process`] implementation so that every `process` call runs inside a tracing span.
#[cfg(feature = "trace")]
struct InstrumentedAssetProcessor<T>(T);

#[cfg(feature = "trace")]
impl<T: Process> Process for InstrumentedAssetProcessor<T> {
    type Settings = T::Settings;
    type OutputLoader = T::OutputLoader;

    /// Forwards to the wrapped processor, instrumenting the returned future with an
    /// "asset processing" span that records the processor type name and the asset path.
    fn process<'a>(
        &'a self,
        context: &'a mut ProcessContext,
        meta: AssetMeta<(), Self>,
        writer: &'a mut crate::io::Writer,
    ) -> impl ConditionalSendFuture<
        Output = Result<<Self::OutputLoader as crate::AssetLoader>::Settings, ProcessError>,
    > {
        // Re-tag the `AssetMeta` with the wrapped processor type by moving its fields across.
        // This works because wrapper and wrapped processor share the same `Settings` type.
        let AssetMeta {
            meta_format_version,
            processed_info,
            asset,
        } = meta;
        let inner_meta = AssetMeta {
            meta_format_version,
            processed_info,
            asset,
        };
        let span = info_span!(
            "asset processing",
            processor = std::any::type_name::<T>(),
            asset = context.path().to_string(),
        );
        self.0.process(context, inner_meta, writer).instrument(span)
    }
}
1071
/// The (successful) result of processing an asset
#[derive(Debug, Clone)]
pub enum ProcessResult {
    /// The processed asset and its meta were written; carries the resulting [`ProcessedInfo`].
    Processed(ProcessedInfo),
    /// Neither the asset's hash nor any of its recorded process dependencies changed, so nothing was written.
    SkippedNotChanged,
    /// The asset's meta file marks it as ignored.
    Ignored,
}
1079
/// The final status of processing an asset
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum ProcessStatus {
    /// Processing finished successfully, or was skipped because nothing changed.
    Processed,
    /// Processing ended with an error.
    Failed,
    /// The processor has no info for the asset (it was never seen or has been removed).
    NonExistent,
}
1087
// NOTE: if you add new fields to this struct, make sure they are propagated (when relevant) in ProcessorAssetInfos::rename
/// The processor's in-memory bookkeeping for a single asset path.
#[derive(Debug)]
pub(crate) struct ProcessorAssetInfo {
    /// The result of the most recent processing of this asset, if any is known.
    processed_info: Option<ProcessedInfo>,
    /// Paths of assets that depend on this asset when they are being processed.
    dependants: HashSet<AssetPath<'static>>,
    /// The last status that was broadcast for this asset. `None` until a status has been set.
    status: Option<ProcessStatus>,
    /// A lock that controls read/write access to processed asset files. The lock is shared for both the asset bytes and the meta bytes.
    /// _This lock must be locked whenever a read or write to processed assets occurs_
    /// There are scenarios where processed assets (and their metadata) are being read and written in multiple places at once:
    /// * when the processor is running in parallel with an app
    /// * when processing assets in parallel, the processor might read an asset's `process_dependencies` when processing new versions of those dependencies
    ///     * this second scenario almost certainly isn't possible with the current implementation, but it's worth protecting against
    ///
    /// This lock defends against those scenarios by ensuring readers don't read while processed files are being written,
    /// and that writers don't write while processed files are being read.
    /// Because this lock is shared across meta and asset bytes, readers can ensure they don't read "old" versions of metadata with "new" asset data.
    pub(crate) file_transaction_lock: Arc<async_lock::RwLock<()>>,
    /// Broadcasts status changes to everyone waiting on this asset.
    status_sender: async_broadcast::Sender<ProcessStatus>,
    /// Template receiver; it is cloned to hand out receivers to waiters.
    status_receiver: async_broadcast::Receiver<ProcessStatus>,
}
1107
1108impl Default for ProcessorAssetInfo {
1109    fn default() -> Self {
1110        let (mut status_sender, status_receiver) = async_broadcast::broadcast(1);
1111        // allow overflow on these "one slot" channels to allow receivers to retrieve the "latest" state, and to allow senders to
1112        // not block if there was older state present.
1113        status_sender.set_overflow(true);
1114        Self {
1115            processed_info: Default::default(),
1116            dependants: Default::default(),
1117            file_transaction_lock: Default::default(),
1118            status: None,
1119            status_sender,
1120            status_receiver,
1121        }
1122    }
1123}
1124
1125impl ProcessorAssetInfo {
1126    async fn update_status(&mut self, status: ProcessStatus) {
1127        if self.status != Some(status) {
1128            self.status = Some(status);
1129            self.status_sender.broadcast(status).await.unwrap();
1130        }
1131    }
1132}
1133
/// The "current" in memory view of the asset space. This is "eventually consistent". It does not directly
/// represent the state of assets in storage, but rather a valid historical view that will gradually become more
/// consistent as events are processed.
#[derive(Default, Debug)]
pub struct ProcessorAssetInfos {
    /// The "current" in memory view of the asset space. During processing, if path does not exist in this, it should
    /// be considered non-existent.
    /// NOTE: YOU MUST USE `Self::get_or_insert` or `Self::insert` TO ADD ITEMS TO THIS COLLECTION TO ENSURE
    /// `non_existent_dependants` DATA IS CONSUMED
    infos: HashMap<AssetPath<'static>, ProcessorAssetInfo>,
    /// Dependants for assets that don't exist. This exists to track "dangling" asset references due to deleted / missing files.
    /// If the dependant asset is added, it can "resolve" these dependencies and re-compute those assets.
    /// Therefore this _must_ always be consistent with the `infos` data. If a new asset is added to `infos`, it should
    /// check this map for dependencies and add them. If an asset is removed, it should update the dependants here.
    non_existent_dependants: HashMap<AssetPath<'static>, HashSet<AssetPath<'static>>>,
    /// Paths queued to be checked for re-processing; the dependants of an asset are pushed here
    /// when that asset finishes processing.
    check_reprocess_queue: VecDeque<AssetPath<'static>>,
}
1151
1152impl ProcessorAssetInfos {
1153    fn get_or_insert(&mut self, asset_path: AssetPath<'static>) -> &mut ProcessorAssetInfo {
1154        self.infos.entry(asset_path.clone()).or_insert_with(|| {
1155            let mut info = ProcessorAssetInfo::default();
1156            // track existing dependants by resolving existing "hanging" dependants.
1157            if let Some(dependants) = self.non_existent_dependants.remove(&asset_path) {
1158                info.dependants = dependants;
1159            }
1160            info
1161        })
1162    }
1163
    /// Returns the info for `asset_path`, or `None` if the processor has no entry for it.
    pub(crate) fn get(&self, asset_path: &AssetPath<'static>) -> Option<&ProcessorAssetInfo> {
        self.infos.get(asset_path)
    }
1167
    /// Returns a mutable reference to the info for `asset_path`, or `None` if there is no entry for it.
    /// Unlike `get_or_insert`, this never creates an entry.
    fn get_mut(&mut self, asset_path: &AssetPath<'static>) -> Option<&mut ProcessorAssetInfo> {
        self.infos.get_mut(asset_path)
    }
1171
1172    fn add_dependant(&mut self, asset_path: &AssetPath<'static>, dependant: AssetPath<'static>) {
1173        if let Some(info) = self.get_mut(asset_path) {
1174            info.dependants.insert(dependant);
1175        } else {
1176            let dependants = self
1177                .non_existent_dependants
1178                .entry(asset_path.clone())
1179                .or_default();
1180            dependants.insert(dependant);
1181        }
1182    }
1183
1184    /// Finalize processing the asset, which will incorporate the result of the processed asset into the in-memory view the processed assets.
1185    async fn finish_processing(
1186        &mut self,
1187        asset_path: AssetPath<'static>,
1188        result: Result<ProcessResult, ProcessError>,
1189    ) {
1190        match result {
1191            Ok(ProcessResult::Processed(processed_info)) => {
1192                debug!("Finished processing \"{:?}\"", asset_path);
1193                // clean up old dependants
1194                let old_processed_info = self
1195                    .infos
1196                    .get_mut(&asset_path)
1197                    .and_then(|i| i.processed_info.take());
1198                if let Some(old_processed_info) = old_processed_info {
1199                    self.clear_dependencies(&asset_path, old_processed_info);
1200                }
1201
1202                // populate new dependants
1203                for process_dependency_info in &processed_info.process_dependencies {
1204                    self.add_dependant(&process_dependency_info.path, asset_path.to_owned());
1205                }
1206                let info = self.get_or_insert(asset_path);
1207                info.processed_info = Some(processed_info);
1208                info.update_status(ProcessStatus::Processed).await;
1209                let dependants = info.dependants.iter().cloned().collect::<Vec<_>>();
1210                for path in dependants {
1211                    self.check_reprocess_queue.push_back(path);
1212                }
1213            }
1214            Ok(ProcessResult::SkippedNotChanged) => {
1215                debug!("Skipping processing (unchanged) \"{:?}\"", asset_path);
1216                let info = self.get_mut(&asset_path).expect("info should exist");
1217                // NOTE: skipping an asset on a given pass doesn't mean it won't change in the future as a result
1218                // of a dependency being re-processed. This means apps might receive an "old" (but valid) asset first.
1219                // This is in the interest of fast startup times that don't block for all assets being checked + reprocessed
1220                // Therefore this relies on hot-reloading in the app to pickup the "latest" version of the asset
1221                // If "block until latest state is reflected" is required, we can easily add a less granular
1222                // "block until first pass finished" mode
1223                info.update_status(ProcessStatus::Processed).await;
1224            }
1225            Ok(ProcessResult::Ignored) => {
1226                debug!("Skipping processing (ignored) \"{:?}\"", asset_path);
1227            }
1228            Err(ProcessError::ExtensionRequired) => {
1229                // Skip assets without extensions
1230            }
1231            Err(ProcessError::MissingAssetLoaderForExtension(_)) => {
1232                trace!("No loader found for {asset_path}");
1233            }
1234            Err(ProcessError::AssetReaderError {
1235                err: AssetReaderError::NotFound(_),
1236                ..
1237            }) => {
1238                // if there is no asset source, no processing can be done
1239                trace!("No need to process asset {asset_path} because it does not exist");
1240            }
1241            Err(err) => {
1242                error!("Failed to process asset {asset_path}: {err}");
1243                // if this failed because a dependency could not be loaded, make sure it is reprocessed if that dependency is reprocessed
1244                if let ProcessError::AssetLoadError(AssetLoadError::AssetLoaderError(dependency)) =
1245                    err
1246                {
1247                    let info = self.get_mut(&asset_path).expect("info should exist");
1248                    info.processed_info = Some(ProcessedInfo {
1249                        hash: AssetHash::default(),
1250                        full_hash: AssetHash::default(),
1251                        process_dependencies: vec![],
1252                    });
1253                    self.add_dependant(dependency.path(), asset_path.to_owned());
1254                }
1255
1256                let info = self.get_mut(&asset_path).expect("info should exist");
1257                info.update_status(ProcessStatus::Failed).await;
1258            }
1259        }
1260    }
1261
1262    /// Remove the info for the given path. This should only happen if an asset's source is removed / non-existent
1263    async fn remove(&mut self, asset_path: &AssetPath<'static>) {
1264        let info = self.infos.remove(asset_path);
1265        if let Some(info) = info {
1266            if let Some(processed_info) = info.processed_info {
1267                self.clear_dependencies(asset_path, processed_info);
1268            }
1269            // Tell all listeners this asset does not exist
1270            info.status_sender
1271                .broadcast(ProcessStatus::NonExistent)
1272                .await
1273                .unwrap();
1274            if !info.dependants.is_empty() {
1275                error!(
1276                    "The asset at {asset_path} was removed, but it had assets that depend on it to be processed. Consider updating the path in the following assets: {:?}",
1277                    info.dependants
1278                );
1279                self.non_existent_dependants
1280                    .insert(asset_path.clone(), info.dependants);
1281            }
1282        }
1283    }
1284
1285    /// Remove the info for the given path. This should only happen if an asset's source is removed / non-existent
1286    async fn rename(&mut self, old: &AssetPath<'static>, new: &AssetPath<'static>) {
1287        let info = self.infos.remove(old);
1288        if let Some(mut info) = info {
1289            if !info.dependants.is_empty() {
1290                // TODO: We can't currently ensure "moved" folders with relative paths aren't broken because AssetPath
1291                // doesn't distinguish between absolute and relative paths. We have "erased" relativeness. In the short term,
1292                // we could do "remove everything in a folder and re-add", but that requires full rebuilds / destroying the cache.
1293                // If processors / loaders could enumerate dependencies, we could check if the new deps line up with a rename.
1294                // If deps encoded "relativeness" as part of loading, that would also work (this seems like the right call).
1295                // TODO: it would be nice to log an error here for dependants that aren't also being moved + fixed.
1296                // (see the remove impl).
1297                error!(
1298                    "The asset at {old} was removed, but it had assets that depend on it to be processed. Consider updating the path in the following assets: {:?}",
1299                    info.dependants
1300                );
1301                self.non_existent_dependants
1302                    .insert(old.clone(), std::mem::take(&mut info.dependants));
1303            }
1304            if let Some(processed_info) = &info.processed_info {
1305                // Update "dependant" lists for this asset's "process dependencies" to use new path.
1306                for dep in &processed_info.process_dependencies {
1307                    if let Some(info) = self.infos.get_mut(&dep.path) {
1308                        info.dependants.remove(old);
1309                        info.dependants.insert(new.clone());
1310                    } else if let Some(dependants) = self.non_existent_dependants.get_mut(&dep.path)
1311                    {
1312                        dependants.remove(old);
1313                        dependants.insert(new.clone());
1314                    }
1315                }
1316            }
1317            // Tell all listeners this asset no longer exists
1318            info.status_sender
1319                .broadcast(ProcessStatus::NonExistent)
1320                .await
1321                .unwrap();
1322            let dependants: Vec<AssetPath<'static>> = {
1323                let new_info = self.get_or_insert(new.clone());
1324                new_info.processed_info = info.processed_info;
1325                new_info.status = info.status;
1326                // Ensure things waiting on the new path are informed of the status of this asset
1327                if let Some(status) = new_info.status {
1328                    new_info.status_sender.broadcast(status).await.unwrap();
1329                }
1330                new_info.dependants.iter().cloned().collect()
1331            };
1332            // Queue the asset for a reprocess check, in case it needs new meta.
1333            self.check_reprocess_queue.push_back(new.clone());
1334            for dependant in dependants {
1335                // Queue dependants for reprocessing because they might have been waiting for this asset.
1336                self.check_reprocess_queue.push_back(dependant);
1337            }
1338        }
1339    }
1340
1341    fn clear_dependencies(&mut self, asset_path: &AssetPath<'static>, removed_info: ProcessedInfo) {
1342        for old_load_dep in removed_info.process_dependencies {
1343            if let Some(info) = self.infos.get_mut(&old_load_dep.path) {
1344                info.dependants.remove(asset_path);
1345            } else if let Some(dependants) =
1346                self.non_existent_dependants.get_mut(&old_load_dep.path)
1347            {
1348                dependants.remove(asset_path);
1349            }
1350        }
1351    }
1352}
1353
/// The current state of the [`AssetProcessor`].
///
/// Implements [`Debug`](std::fmt::Debug) so the state can be logged and used in assertions.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum ProcessorState {
    /// The processor is still initializing, which involves scanning the current asset folders,
    /// constructing an in-memory view of the asset space, recovering from previous errors / crashes,
    /// and cleaning up old / unused assets.
    Initializing,
    /// The processor is currently processing assets.
    Processing,
    /// The processor has finished processing all valid assets and reporting invalid assets.
    Finished,
}
1366
1367/// An error that occurs when initializing the [`AssetProcessor`].
1368#[derive(Error, Debug)]
1369pub enum InitializeError {
1370    #[error(transparent)]
1371    FailedToReadSourcePaths(AssetReaderError),
1372    #[error(transparent)]
1373    FailedToReadDestinationPaths(AssetReaderError),
1374    #[error("Failed to validate asset log: {0}")]
1375    ValidateLogError(ValidateLogError),
1376}