1mod log;
2mod process;
3
4pub use log::*;
5pub use process::*;
6
7use crate::{
8 io::{
9 AssetReaderError, AssetSource, AssetSourceBuilders, AssetSourceEvent, AssetSourceId,
10 AssetSources, AssetWriterError, ErasedAssetReader, ErasedAssetWriter,
11 MissingAssetSourceError,
12 },
13 meta::{
14 get_asset_hash, get_full_asset_hash, AssetAction, AssetActionMinimal, AssetHash, AssetMeta,
15 AssetMetaDyn, AssetMetaMinimal, ProcessedInfo, ProcessedInfoMinimal,
16 },
17 AssetLoadError, AssetMetaCheck, AssetPath, AssetServer, AssetServerMode, DeserializeMetaError,
18 MissingAssetLoaderForExtensionError,
19};
20use bevy_ecs::prelude::*;
21use bevy_tasks::IoTaskPool;
22use bevy_utils::tracing::{debug, error, trace, warn};
23#[cfg(feature = "trace")]
24use bevy_utils::{
25 tracing::{info_span, instrument::Instrument},
26 ConditionalSendFuture,
27};
28use bevy_utils::{HashMap, HashSet};
29use futures_io::ErrorKind;
30use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt};
31use parking_lot::RwLock;
32use std::{
33 collections::VecDeque,
34 path::{Path, PathBuf},
35 sync::Arc,
36};
37use thiserror::Error;
38
39#[allow(unused_imports)]
41use crate::io::{AssetReader, AssetWriter};
42
43#[derive(Resource, Clone)]
59pub struct AssetProcessor {
60 server: AssetServer,
61 pub(crate) data: Arc<AssetProcessorData>,
62}
63
64pub struct AssetProcessorData {
65 pub(crate) asset_infos: async_lock::RwLock<ProcessorAssetInfos>,
66 log: async_lock::RwLock<Option<ProcessorTransactionLog>>,
67 processors: RwLock<HashMap<&'static str, Arc<dyn ErasedProcessor>>>,
68 default_processors: RwLock<HashMap<Box<str>, &'static str>>,
70 state: async_lock::RwLock<ProcessorState>,
71 sources: AssetSources,
72 initialized_sender: async_broadcast::Sender<()>,
73 initialized_receiver: async_broadcast::Receiver<()>,
74 finished_sender: async_broadcast::Sender<()>,
75 finished_receiver: async_broadcast::Receiver<()>,
76}
77
78impl AssetProcessor {
79 pub fn new(source: &mut AssetSourceBuilders) -> Self {
81 let data = Arc::new(AssetProcessorData::new(source.build_sources(true, false)));
82 let mut sources = source.build_sources(false, false);
84 sources.gate_on_processor(data.clone());
85 let server = AssetServer::new_with_meta_check(
86 sources,
87 AssetServerMode::Processed,
88 AssetMetaCheck::Always,
89 false,
90 );
91 Self { server, data }
92 }
93
94 pub fn server(&self) -> &AssetServer {
97 &self.server
98 }
99
100 async fn set_state(&self, state: ProcessorState) {
101 let mut state_guard = self.data.state.write().await;
102 let last_state = *state_guard;
103 *state_guard = state;
104 if last_state != ProcessorState::Finished && state == ProcessorState::Finished {
105 self.data.finished_sender.broadcast(()).await.unwrap();
106 } else if last_state != ProcessorState::Processing && state == ProcessorState::Processing {
107 self.data.initialized_sender.broadcast(()).await.unwrap();
108 }
109 }
110
111 pub async fn get_state(&self) -> ProcessorState {
113 *self.data.state.read().await
114 }
115
116 #[inline]
118 pub fn get_source<'a, 'b>(
119 &'a self,
120 id: impl Into<AssetSourceId<'b>>,
121 ) -> Result<&'a AssetSource, MissingAssetSourceError> {
122 self.data.sources.get(id.into())
123 }
124
125 #[inline]
126 pub fn sources(&self) -> &AssetSources {
127 &self.data.sources
128 }
129
130 async fn log_unrecoverable(&self) {
133 let mut log = self.data.log.write().await;
134 let log = log.as_mut().unwrap();
135 log.unrecoverable().await.unwrap();
136 }
137
138 async fn log_begin_processing(&self, path: &AssetPath<'_>) {
141 let mut log = self.data.log.write().await;
142 let log = log.as_mut().unwrap();
143 log.begin_processing(path).await.unwrap();
144 }
145
146 async fn log_end_processing(&self, path: &AssetPath<'_>) {
148 let mut log = self.data.log.write().await;
149 let log = log.as_mut().unwrap();
150 log.end_processing(path).await.unwrap();
151 }
152
153 pub fn start(_processor: Res<Self>) {
155 #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
156 error!("Cannot run AssetProcessor in single threaded mode (or WASM) yet.");
157 #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
158 {
159 let processor = _processor.clone();
160 std::thread::spawn(move || {
161 processor.process_assets();
162 bevy_tasks::block_on(processor.listen_for_source_change_events());
163 });
164 }
165 }
166
167 #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
175 pub fn process_assets(&self) {
176 let start_time = std::time::Instant::now();
177 debug!("Processing Assets");
178 IoTaskPool::get().scope(|scope| {
179 scope.spawn(async move {
180 self.initialize().await.unwrap();
181 for source in self.sources().iter_processed() {
182 self.process_assets_internal(scope, source, PathBuf::from(""))
183 .await
184 .unwrap();
185 }
186 });
187 });
188 bevy_tasks::block_on(self.finish_processing_assets());
191 let end_time = std::time::Instant::now();
192 debug!("Processing finished in {:?}", end_time - start_time);
193 }
194
195 pub async fn listen_for_source_change_events(&self) {
198 debug!("Listening for changes to source assets");
199 loop {
200 let mut started_processing = false;
201
202 for source in self.data.sources.iter_processed() {
203 if let Some(receiver) = source.event_receiver() {
204 for event in receiver.try_iter() {
205 if !started_processing {
206 self.set_state(ProcessorState::Processing).await;
207 started_processing = true;
208 }
209
210 self.handle_asset_source_event(source, event).await;
211 }
212 }
213 }
214
215 if started_processing {
216 self.finish_processing_assets().await;
217 }
218 }
219 }
220
221 async fn handle_asset_source_event(&self, source: &AssetSource, event: AssetSourceEvent) {
222 trace!("{event:?}");
223 match event {
224 AssetSourceEvent::AddedAsset(path)
225 | AssetSourceEvent::AddedMeta(path)
226 | AssetSourceEvent::ModifiedAsset(path)
227 | AssetSourceEvent::ModifiedMeta(path) => {
228 self.process_asset(source, path).await;
229 }
230 AssetSourceEvent::RemovedAsset(path) => {
231 self.handle_removed_asset(source, path).await;
232 }
233 AssetSourceEvent::RemovedMeta(path) => {
234 self.handle_removed_meta(source, path).await;
235 }
236 AssetSourceEvent::AddedFolder(path) => {
237 self.handle_added_folder(source, path).await;
238 }
239 AssetSourceEvent::RemovedFolder(path) => {
243 self.handle_removed_folder(source, &path).await;
244 }
245 AssetSourceEvent::RenamedAsset { old, new } => {
246 if old == new {
249 self.process_asset(source, new).await;
250 } else {
251 self.handle_renamed_asset(source, old, new).await;
252 }
253 }
254 AssetSourceEvent::RenamedMeta { old, new } => {
255 if old == new {
258 self.process_asset(source, new).await;
259 } else {
260 debug!("Meta renamed from {old:?} to {new:?}");
261 let mut infos = self.data.asset_infos.write().await;
262 let new_asset_path = AssetPath::from(new).with_source(source.id());
265 let old_asset_path = AssetPath::from(old).with_source(source.id());
266 infos.check_reprocess_queue.push_back(old_asset_path);
267 infos.check_reprocess_queue.push_back(new_asset_path);
268 }
269 }
270 AssetSourceEvent::RenamedFolder { old, new } => {
271 if old == new {
274 self.handle_added_folder(source, new).await;
275 } else {
276 self.handle_removed_folder(source, &old).await;
279 self.handle_added_folder(source, new).await;
280 }
281 }
282 AssetSourceEvent::RemovedUnknown { path, is_meta } => {
283 let processed_reader = source.processed_reader().unwrap();
284 match processed_reader.is_directory(&path).await {
285 Ok(is_directory) => {
286 if is_directory {
287 self.handle_removed_folder(source, &path).await;
288 } else if is_meta {
289 self.handle_removed_meta(source, path).await;
290 } else {
291 self.handle_removed_asset(source, path).await;
292 }
293 }
294 Err(err) => {
295 match err {
296 AssetReaderError::NotFound(_) => {
297 }
299 AssetReaderError::Io(err) => {
300 error!(
301 "Path '{}' was removed, but the destination reader could not determine if it \
302 was a folder or a file due to the following error: {err}",
303 AssetPath::from_path(&path).with_source(source.id())
304 );
305 }
306 AssetReaderError::HttpError(status) => {
307 error!(
308 "Path '{}' was removed, but the destination reader could not determine if it \
309 was a folder or a file due to receiving an unexpected HTTP Status {status}",
310 AssetPath::from_path(&path).with_source(source.id())
311 );
312 }
313 }
314 }
315 }
316 }
317 }
318 }
319
320 async fn handle_added_folder(&self, source: &AssetSource, path: PathBuf) {
321 debug!(
322 "Folder {} was added. Attempting to re-process",
323 AssetPath::from_path(&path).with_source(source.id())
324 );
325 #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
326 error!("AddFolder event cannot be handled in single threaded mode (or WASM) yet.");
327 #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
328 IoTaskPool::get().scope(|scope| {
329 scope.spawn(async move {
330 self.process_assets_internal(scope, source, path)
331 .await
332 .unwrap();
333 });
334 });
335 }
336
337 async fn handle_removed_meta(&self, source: &AssetSource, path: PathBuf) {
339 debug!(
344 "Meta for asset {:?} was removed. Attempting to re-process",
345 AssetPath::from_path(&path).with_source(source.id())
346 );
347 self.process_asset(source, path).await;
348 }
349
350 async fn handle_removed_folder(&self, source: &AssetSource, path: &Path) {
352 debug!("Removing folder {:?} because source was removed", path);
353 let processed_reader = source.processed_reader().unwrap();
354 match processed_reader.read_directory(path).await {
355 Ok(mut path_stream) => {
356 while let Some(child_path) = path_stream.next().await {
357 self.handle_removed_asset(source, child_path).await;
358 }
359 }
360 Err(err) => match err {
361 AssetReaderError::NotFound(_err) => {
362 }
364 AssetReaderError::HttpError(status) => {
365 self.log_unrecoverable().await;
366 error!(
367 "Unrecoverable Error: Failed to read the processed assets at {path:?} in order to remove assets that no longer exist \
368 in the source directory. Restart the asset processor to fully reprocess assets. HTTP Status Code {status}"
369 );
370 }
371 AssetReaderError::Io(err) => {
372 self.log_unrecoverable().await;
373 error!(
374 "Unrecoverable Error: Failed to read the processed assets at {path:?} in order to remove assets that no longer exist \
375 in the source directory. Restart the asset processor to fully reprocess assets. Error: {err}"
376 );
377 }
378 },
379 }
380 let processed_writer = source.processed_writer().unwrap();
381 if let Err(err) = processed_writer.remove_directory(path).await {
382 match err {
383 AssetWriterError::Io(err) => {
384 if err.kind() != ErrorKind::NotFound {
387 let asset_path = AssetPath::from_path(path).with_source(source.id());
388 error!("Failed to remove destination folder that no longer exists in {asset_path}: {err}");
389 }
390 }
391 }
392 }
393 }
394
395 async fn handle_removed_asset(&self, source: &AssetSource, path: PathBuf) {
398 let asset_path = AssetPath::from(path).with_source(source.id());
399 debug!("Removing processed {asset_path} because source was removed");
400 let mut infos = self.data.asset_infos.write().await;
401 if let Some(info) = infos.get(&asset_path) {
402 let _write_lock = info.file_transaction_lock.write();
405 self.remove_processed_asset_and_meta(source, asset_path.path())
406 .await;
407 }
408 infos.remove(&asset_path).await;
409 }
410
411 async fn handle_renamed_asset(&self, source: &AssetSource, old: PathBuf, new: PathBuf) {
414 let mut infos = self.data.asset_infos.write().await;
415 let old = AssetPath::from(old).with_source(source.id());
416 let new = AssetPath::from(new).with_source(source.id());
417 let processed_writer = source.processed_writer().unwrap();
418 if let Some(info) = infos.get(&old) {
419 let _write_lock = info.file_transaction_lock.write();
422 processed_writer
423 .rename(old.path(), new.path())
424 .await
425 .unwrap();
426 processed_writer
427 .rename_meta(old.path(), new.path())
428 .await
429 .unwrap();
430 }
431 infos.rename(&old, &new).await;
432 }
433
434 async fn finish_processing_assets(&self) {
435 self.try_reprocessing_queued().await;
436 self.server.data.infos.write().consume_handle_drop_events();
438 self.set_state(ProcessorState::Finished).await;
439 }
440
441 #[allow(unused)]
442 #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
443 async fn process_assets_internal<'scope>(
444 &'scope self,
445 scope: &'scope bevy_tasks::Scope<'scope, '_, ()>,
446 source: &'scope AssetSource,
447 path: PathBuf,
448 ) -> Result<(), AssetReaderError> {
449 if source.reader().is_directory(&path).await? {
450 let mut path_stream = source.reader().read_directory(&path).await?;
451 while let Some(path) = path_stream.next().await {
452 Box::pin(self.process_assets_internal(scope, source, path)).await?;
453 }
454 } else {
455 let processor = self.clone();
457 scope.spawn(async move {
458 processor.process_asset(source, path).await;
459 });
460 }
461 Ok(())
462 }
463
464 async fn try_reprocessing_queued(&self) {
465 loop {
466 let mut check_reprocess_queue =
467 std::mem::take(&mut self.data.asset_infos.write().await.check_reprocess_queue);
468 IoTaskPool::get().scope(|scope| {
469 for path in check_reprocess_queue.drain(..) {
470 let processor = self.clone();
471 let source = self.get_source(path.source()).unwrap();
472 scope.spawn(async move {
473 processor.process_asset(source, path.into()).await;
474 });
475 }
476 });
477 let infos = self.data.asset_infos.read().await;
478 if infos.check_reprocess_queue.is_empty() {
479 break;
480 }
481 }
482 }
483
484 pub fn register_processor<P: Process>(&self, processor: P) {
486 let mut process_plans = self.data.processors.write();
487 #[cfg(feature = "trace")]
488 let processor = InstrumentedAssetProcessor(processor);
489 process_plans.insert(std::any::type_name::<P>(), Arc::new(processor));
490 }
491
492 pub fn set_default_processor<P: Process>(&self, extension: &str) {
494 let mut default_processors = self.data.default_processors.write();
495 default_processors.insert(extension.into(), std::any::type_name::<P>());
496 }
497
498 pub fn get_default_processor(&self, extension: &str) -> Option<Arc<dyn ErasedProcessor>> {
500 let default_processors = self.data.default_processors.read();
501 let key = default_processors.get(extension)?;
502 self.data.processors.read().get(key).cloned()
503 }
504
505 pub fn get_processor(&self, processor_type_name: &str) -> Option<Arc<dyn ErasedProcessor>> {
507 let processors = self.data.processors.read();
508 processors.get(processor_type_name).cloned()
509 }
510
511 #[allow(unused)]
516 async fn initialize(&self) -> Result<(), InitializeError> {
517 self.validate_transaction_log_and_recover().await;
518 let mut asset_infos = self.data.asset_infos.write().await;
519
520 async fn get_asset_paths<'a>(
523 reader: &'a dyn ErasedAssetReader,
524 clean_empty_folders_writer: Option<&'a dyn ErasedAssetWriter>,
525 path: PathBuf,
526 paths: &'a mut Vec<PathBuf>,
527 ) -> Result<bool, AssetReaderError> {
528 if reader.is_directory(&path).await? {
529 let mut path_stream = reader.read_directory(&path).await?;
530 let mut contains_files = false;
531
532 while let Some(child_path) = path_stream.next().await {
533 contains_files |= Box::pin(get_asset_paths(
534 reader,
535 clean_empty_folders_writer,
536 child_path,
537 paths,
538 ))
539 .await?;
540 }
541 if !contains_files && path.parent().is_some() {
542 if let Some(writer) = clean_empty_folders_writer {
543 let _ = writer.remove_empty_directory(&path).await;
545 }
546 }
547 Ok(contains_files)
548 } else {
549 paths.push(path);
550 Ok(true)
551 }
552 }
553
554 for source in self.sources().iter_processed() {
555 let Ok(processed_reader) = source.processed_reader() else {
556 continue;
557 };
558 let Ok(processed_writer) = source.processed_writer() else {
559 continue;
560 };
561 let mut unprocessed_paths = Vec::new();
562 get_asset_paths(
563 source.reader(),
564 None,
565 PathBuf::from(""),
566 &mut unprocessed_paths,
567 )
568 .await
569 .map_err(InitializeError::FailedToReadSourcePaths)?;
570
571 let mut processed_paths = Vec::new();
572 get_asset_paths(
573 processed_reader,
574 Some(processed_writer),
575 PathBuf::from(""),
576 &mut processed_paths,
577 )
578 .await
579 .map_err(InitializeError::FailedToReadDestinationPaths)?;
580
581 for path in unprocessed_paths {
582 asset_infos.get_or_insert(AssetPath::from(path).with_source(source.id()));
583 }
584
585 for path in processed_paths {
586 let mut dependencies = Vec::new();
587 let asset_path = AssetPath::from(path).with_source(source.id());
588 if let Some(info) = asset_infos.get_mut(&asset_path) {
589 match processed_reader.read_meta_bytes(asset_path.path()).await {
590 Ok(meta_bytes) => {
591 match ron::de::from_bytes::<ProcessedInfoMinimal>(&meta_bytes) {
592 Ok(minimal) => {
593 trace!(
594 "Populated processed info for asset {asset_path} {:?}",
595 minimal.processed_info
596 );
597
598 if let Some(processed_info) = &minimal.processed_info {
599 for process_dependency_info in
600 &processed_info.process_dependencies
601 {
602 dependencies.push(process_dependency_info.path.clone());
603 }
604 }
605 info.processed_info = minimal.processed_info;
606 }
607 Err(err) => {
608 trace!("Removing processed data for {asset_path} because meta could not be parsed: {err}");
609 self.remove_processed_asset_and_meta(source, asset_path.path())
610 .await;
611 }
612 }
613 }
614 Err(err) => {
615 trace!("Removing processed data for {asset_path} because meta failed to load: {err}");
616 self.remove_processed_asset_and_meta(source, asset_path.path())
617 .await;
618 }
619 }
620 } else {
621 trace!("Removing processed data for non-existent asset {asset_path}");
622 self.remove_processed_asset_and_meta(source, asset_path.path())
623 .await;
624 }
625
626 for dependency in dependencies {
627 asset_infos.add_dependant(&dependency, asset_path.clone());
628 }
629 }
630 }
631
632 self.set_state(ProcessorState::Processing).await;
633
634 Ok(())
635 }
636
637 async fn remove_processed_asset_and_meta(&self, source: &AssetSource, path: &Path) {
640 if let Err(err) = source.processed_writer().unwrap().remove(path).await {
641 warn!("Failed to remove non-existent asset {path:?}: {err}");
642 }
643
644 if let Err(err) = source.processed_writer().unwrap().remove_meta(path).await {
645 warn!("Failed to remove non-existent meta {path:?}: {err}");
646 }
647
648 self.clean_empty_processed_ancestor_folders(source, path)
649 .await;
650 }
651
652 async fn clean_empty_processed_ancestor_folders(&self, source: &AssetSource, path: &Path) {
653 if path.is_absolute() {
655 error!("Attempted to clean up ancestor folders of an absolute path. This is unsafe so the operation was skipped.");
656 return;
657 }
658 while let Some(parent) = path.parent() {
659 if parent == Path::new("") {
660 break;
661 }
662 if source
663 .processed_writer()
664 .unwrap()
665 .remove_empty_directory(parent)
666 .await
667 .is_err()
668 {
669 break;
671 }
672 }
673 }
674
675 async fn process_asset(&self, source: &AssetSource, path: PathBuf) {
683 let asset_path = AssetPath::from(path).with_source(source.id());
684 let result = self.process_asset_internal(source, &asset_path).await;
685 let mut infos = self.data.asset_infos.write().await;
686 infos.finish_processing(asset_path, result).await;
687 }
688
689 async fn process_asset_internal(
690 &self,
691 source: &AssetSource,
692 asset_path: &AssetPath<'static>,
693 ) -> Result<ProcessResult, ProcessError> {
694 debug!("Processing {:?}", asset_path);
697 let server = &self.server;
698 let path = asset_path.path();
699 let reader = source.reader();
700
701 let reader_err = |err| ProcessError::AssetReaderError {
702 path: asset_path.clone(),
703 err,
704 };
705 let writer_err = |err| ProcessError::AssetWriterError {
706 path: asset_path.clone(),
707 err,
708 };
709
710 let mut byte_reader = reader.read(path).await.map_err(reader_err)?;
712
713 let (mut source_meta, meta_bytes, processor) = match reader.read_meta_bytes(path).await {
714 Ok(meta_bytes) => {
715 let minimal: AssetMetaMinimal = ron::de::from_bytes(&meta_bytes).map_err(|e| {
716 ProcessError::DeserializeMetaError(DeserializeMetaError::DeserializeMinimal(e))
717 })?;
718 let (meta, processor) = match minimal.asset {
719 AssetActionMinimal::Load { loader } => {
720 let loader = server.get_asset_loader_with_type_name(&loader).await?;
721 let meta = loader.deserialize_meta(&meta_bytes)?;
722 (meta, None)
723 }
724 AssetActionMinimal::Process { processor } => {
725 let processor = self
726 .get_processor(&processor)
727 .ok_or_else(|| ProcessError::MissingProcessor(processor))?;
728 let meta = processor.deserialize_meta(&meta_bytes)?;
729 (meta, Some(processor))
730 }
731 AssetActionMinimal::Ignore => {
732 return Ok(ProcessResult::Ignored);
733 }
734 };
735 (meta, meta_bytes, processor)
736 }
737 Err(AssetReaderError::NotFound(_path)) => {
738 let (meta, processor) = if let Some(processor) = asset_path
739 .get_full_extension()
740 .and_then(|ext| self.get_default_processor(&ext))
741 {
742 let meta = processor.default_meta();
743 (meta, Some(processor))
744 } else {
745 match server.get_path_asset_loader(asset_path.clone()).await {
746 Ok(loader) => (loader.default_meta(), None),
747 Err(MissingAssetLoaderForExtensionError { .. }) => {
748 let meta: Box<dyn AssetMetaDyn> =
749 Box::new(AssetMeta::<(), ()>::new(AssetAction::Ignore));
750 (meta, None)
751 }
752 }
753 };
754 let meta_bytes = meta.serialize();
755 source
757 .writer()?
758 .write_meta_bytes(path, &meta_bytes)
759 .await
760 .map_err(writer_err)?;
761 (meta, meta_bytes, processor)
762 }
763 Err(err) => {
764 return Err(ProcessError::ReadAssetMetaError {
765 path: asset_path.clone(),
766 err,
767 })
768 }
769 };
770
771 let processed_writer = source.processed_writer()?;
772
773 let mut asset_bytes = Vec::new();
774 byte_reader
775 .read_to_end(&mut asset_bytes)
776 .await
777 .map_err(|e| ProcessError::AssetReaderError {
778 path: asset_path.clone(),
779 err: AssetReaderError::Io(e.into()),
780 })?;
781
782 let new_hash = get_asset_hash(&meta_bytes, &asset_bytes);
786 let mut new_processed_info = ProcessedInfo {
787 hash: new_hash,
788 full_hash: new_hash,
789 process_dependencies: Vec::new(),
790 };
791
792 {
793 let infos = self.data.asset_infos.read().await;
794 if let Some(current_processed_info) = infos
795 .get(asset_path)
796 .and_then(|i| i.processed_info.as_ref())
797 {
798 if current_processed_info.hash == new_hash {
799 let mut dependency_changed = false;
800 for current_dep_info in ¤t_processed_info.process_dependencies {
801 let live_hash = infos
802 .get(¤t_dep_info.path)
803 .and_then(|i| i.processed_info.as_ref())
804 .map(|i| i.full_hash);
805 if live_hash != Some(current_dep_info.full_hash) {
806 dependency_changed = true;
807 break;
808 }
809 }
810 if !dependency_changed {
811 return Ok(ProcessResult::SkippedNotChanged);
812 }
813 }
814 }
815 }
816 let _transaction_lock = {
819 let mut infos = self.data.asset_infos.write().await;
820 let info = infos.get_or_insert(asset_path.clone());
821 info.file_transaction_lock.write_arc().await
822 };
823
824 self.log_begin_processing(asset_path).await;
828 if let Some(processor) = processor {
829 let mut writer = processed_writer.write(path).await.map_err(writer_err)?;
830 let mut processed_meta = {
831 let mut context =
832 ProcessContext::new(self, asset_path, &asset_bytes, &mut new_processed_info);
833 processor
834 .process(&mut context, source_meta, &mut *writer)
835 .await?
836 };
837
838 writer
839 .flush()
840 .await
841 .map_err(|e| ProcessError::AssetWriterError {
842 path: asset_path.clone(),
843 err: AssetWriterError::Io(e),
844 })?;
845
846 let full_hash = get_full_asset_hash(
847 new_hash,
848 new_processed_info
849 .process_dependencies
850 .iter()
851 .map(|i| i.full_hash),
852 );
853 new_processed_info.full_hash = full_hash;
854 *processed_meta.processed_info_mut() = Some(new_processed_info.clone());
855 let meta_bytes = processed_meta.serialize();
856 processed_writer
857 .write_meta_bytes(path, &meta_bytes)
858 .await
859 .map_err(writer_err)?;
860 } else {
861 processed_writer
862 .write_bytes(path, &asset_bytes)
863 .await
864 .map_err(writer_err)?;
865 *source_meta.processed_info_mut() = Some(new_processed_info.clone());
866 let meta_bytes = source_meta.serialize();
867 processed_writer
868 .write_meta_bytes(path, &meta_bytes)
869 .await
870 .map_err(writer_err)?;
871 }
872 self.log_end_processing(asset_path).await;
873
874 Ok(ProcessResult::Processed(new_processed_info))
875 }
876
877 async fn validate_transaction_log_and_recover(&self) {
878 if let Err(err) = ProcessorTransactionLog::validate().await {
879 let state_is_valid = match err {
880 ValidateLogError::ReadLogError(err) => {
881 error!("Failed to read processor log file. Processed assets cannot be validated so they must be re-generated {err}");
882 false
883 }
884 ValidateLogError::UnrecoverableError => {
885 error!("Encountered an unrecoverable error in the last run. Processed assets cannot be validated so they must be re-generated");
886 false
887 }
888 ValidateLogError::EntryErrors(entry_errors) => {
889 let mut state_is_valid = true;
890 for entry_error in entry_errors {
891 match entry_error {
892 LogEntryError::DuplicateTransaction(_)
893 | LogEntryError::EndedMissingTransaction(_) => {
894 error!("{}", entry_error);
895 state_is_valid = false;
896 break;
897 }
898 LogEntryError::UnfinishedTransaction(path) => {
899 debug!("Asset {path:?} did not finish processing. Clearing state for that asset");
900 let mut unrecoverable_err = |message: &dyn std::fmt::Display| {
901 error!("Failed to remove asset {path:?}: {message}");
902 state_is_valid = false;
903 };
904 let Ok(source) = self.get_source(path.source()) else {
905 unrecoverable_err(&"AssetSource does not exist");
906 continue;
907 };
908 let Ok(processed_writer) = source.processed_writer() else {
909 unrecoverable_err(&"AssetSource does not have a processed AssetWriter registered");
910 continue;
911 };
912
913 if let Err(err) = processed_writer.remove(path.path()).await {
914 match err {
915 AssetWriterError::Io(err) => {
916 if err.kind() != ErrorKind::NotFound {
918 unrecoverable_err(&err);
919 }
920 }
921 }
922 }
923 if let Err(err) = processed_writer.remove_meta(path.path()).await {
924 match err {
925 AssetWriterError::Io(err) => {
926 if err.kind() != ErrorKind::NotFound {
928 unrecoverable_err(&err);
929 }
930 }
931 }
932 }
933 }
934 }
935 }
936 state_is_valid
937 }
938 };
939
940 if !state_is_valid {
941 error!("Processed asset transaction log state was invalid and unrecoverable for some reason (see previous logs). Removing processed assets and starting fresh.");
942 for source in self.sources().iter_processed() {
943 let Ok(processed_writer) = source.processed_writer() else {
944 continue;
945 };
946 if let Err(err) = processed_writer
947 .remove_assets_in_directory(Path::new(""))
948 .await
949 {
950 panic!("Processed assets were in a bad state. To correct this, the asset processor attempted to remove all processed assets and start from scratch. This failed. There is no way to continue. Try restarting, or deleting imported asset folder manually. {err}");
951 }
952 }
953 }
954 }
955 let mut log = self.data.log.write().await;
956 *log = match ProcessorTransactionLog::new().await {
957 Ok(log) => Some(log),
958 Err(err) => panic!("Failed to initialize asset processor log. This cannot be recovered. Try restarting. If that doesn't work, try deleting processed asset folder. {}", err),
959 };
960 }
961}
962
963impl AssetProcessorData {
964 pub fn new(source: AssetSources) -> Self {
965 let (mut finished_sender, finished_receiver) = async_broadcast::broadcast(1);
966 let (mut initialized_sender, initialized_receiver) = async_broadcast::broadcast(1);
967 finished_sender.set_overflow(true);
970 initialized_sender.set_overflow(true);
971
972 AssetProcessorData {
973 sources: source,
974 finished_sender,
975 finished_receiver,
976 initialized_sender,
977 initialized_receiver,
978 state: async_lock::RwLock::new(ProcessorState::Initializing),
979 log: Default::default(),
980 processors: Default::default(),
981 asset_infos: Default::default(),
982 default_processors: Default::default(),
983 }
984 }
985
986 pub async fn wait_until_processed(&self, path: AssetPath<'static>) -> ProcessStatus {
988 self.wait_until_initialized().await;
989 let mut receiver = {
990 let infos = self.asset_infos.write().await;
991 let info = infos.get(&path);
992 match info {
993 Some(info) => match info.status {
994 Some(result) => return result,
995 None => info.status_receiver.clone(),
997 },
998 None => return ProcessStatus::NonExistent,
999 }
1000 };
1001 receiver.recv().await.unwrap()
1002 }
1003
1004 pub async fn wait_until_initialized(&self) {
1006 let receiver = {
1007 let state = self.state.read().await;
1008 match *state {
1009 ProcessorState::Initializing => {
1010 Some(self.initialized_receiver.clone())
1012 }
1013 _ => None,
1014 }
1015 };
1016
1017 if let Some(mut receiver) = receiver {
1018 receiver.recv().await.unwrap();
1019 }
1020 }
1021
1022 pub async fn wait_until_finished(&self) {
1024 let receiver = {
1025 let state = self.state.read().await;
1026 match *state {
1027 ProcessorState::Initializing | ProcessorState::Processing => {
1028 Some(self.finished_receiver.clone())
1030 }
1031 ProcessorState::Finished => None,
1032 }
1033 };
1034
1035 if let Some(mut receiver) = receiver {
1036 receiver.recv().await.unwrap();
1037 }
1038 }
1039}
1040
1041#[cfg(feature = "trace")]
1042struct InstrumentedAssetProcessor<T>(T);
1043
1044#[cfg(feature = "trace")]
1045impl<T: Process> Process for InstrumentedAssetProcessor<T> {
1046 type Settings = T::Settings;
1047 type OutputLoader = T::OutputLoader;
1048
1049 fn process<'a>(
1050 &'a self,
1051 context: &'a mut ProcessContext,
1052 meta: AssetMeta<(), Self>,
1053 writer: &'a mut crate::io::Writer,
1054 ) -> impl ConditionalSendFuture<
1055 Output = Result<<Self::OutputLoader as crate::AssetLoader>::Settings, ProcessError>,
1056 > {
1057 let meta = AssetMeta {
1059 meta_format_version: meta.meta_format_version,
1060 processed_info: meta.processed_info,
1061 asset: meta.asset,
1062 };
1063 let span = info_span!(
1064 "asset processing",
1065 processor = std::any::type_name::<T>(),
1066 asset = context.path().to_string(),
1067 );
1068 self.0.process(context, meta, writer).instrument(span)
1069 }
1070}
1071
1072#[derive(Debug, Clone)]
1074pub enum ProcessResult {
1075 Processed(ProcessedInfo),
1076 SkippedNotChanged,
1077 Ignored,
1078}
1079
1080#[derive(Debug, PartialEq, Eq, Copy, Clone)]
1082pub enum ProcessStatus {
1083 Processed,
1084 Failed,
1085 NonExistent,
1086}
1087
1088#[derive(Debug)]
1090pub(crate) struct ProcessorAssetInfo {
1091 processed_info: Option<ProcessedInfo>,
1092 dependants: HashSet<AssetPath<'static>>,
1094 status: Option<ProcessStatus>,
1095 pub(crate) file_transaction_lock: Arc<async_lock::RwLock<()>>,
1104 status_sender: async_broadcast::Sender<ProcessStatus>,
1105 status_receiver: async_broadcast::Receiver<ProcessStatus>,
1106}
1107
1108impl Default for ProcessorAssetInfo {
1109 fn default() -> Self {
1110 let (mut status_sender, status_receiver) = async_broadcast::broadcast(1);
1111 status_sender.set_overflow(true);
1114 Self {
1115 processed_info: Default::default(),
1116 dependants: Default::default(),
1117 file_transaction_lock: Default::default(),
1118 status: None,
1119 status_sender,
1120 status_receiver,
1121 }
1122 }
1123}
1124
1125impl ProcessorAssetInfo {
1126 async fn update_status(&mut self, status: ProcessStatus) {
1127 if self.status != Some(status) {
1128 self.status = Some(status);
1129 self.status_sender.broadcast(status).await.unwrap();
1130 }
1131 }
1132}
1133
1134#[derive(Default, Debug)]
1138pub struct ProcessorAssetInfos {
1139 infos: HashMap<AssetPath<'static>, ProcessorAssetInfo>,
1144 non_existent_dependants: HashMap<AssetPath<'static>, HashSet<AssetPath<'static>>>,
1149 check_reprocess_queue: VecDeque<AssetPath<'static>>,
1150}
1151
1152impl ProcessorAssetInfos {
1153 fn get_or_insert(&mut self, asset_path: AssetPath<'static>) -> &mut ProcessorAssetInfo {
1154 self.infos.entry(asset_path.clone()).or_insert_with(|| {
1155 let mut info = ProcessorAssetInfo::default();
1156 if let Some(dependants) = self.non_existent_dependants.remove(&asset_path) {
1158 info.dependants = dependants;
1159 }
1160 info
1161 })
1162 }
1163
1164 pub(crate) fn get(&self, asset_path: &AssetPath<'static>) -> Option<&ProcessorAssetInfo> {
1165 self.infos.get(asset_path)
1166 }
1167
1168 fn get_mut(&mut self, asset_path: &AssetPath<'static>) -> Option<&mut ProcessorAssetInfo> {
1169 self.infos.get_mut(asset_path)
1170 }
1171
1172 fn add_dependant(&mut self, asset_path: &AssetPath<'static>, dependant: AssetPath<'static>) {
1173 if let Some(info) = self.get_mut(asset_path) {
1174 info.dependants.insert(dependant);
1175 } else {
1176 let dependants = self
1177 .non_existent_dependants
1178 .entry(asset_path.clone())
1179 .or_default();
1180 dependants.insert(dependant);
1181 }
1182 }
1183
1184 async fn finish_processing(
1186 &mut self,
1187 asset_path: AssetPath<'static>,
1188 result: Result<ProcessResult, ProcessError>,
1189 ) {
1190 match result {
1191 Ok(ProcessResult::Processed(processed_info)) => {
1192 debug!("Finished processing \"{:?}\"", asset_path);
1193 let old_processed_info = self
1195 .infos
1196 .get_mut(&asset_path)
1197 .and_then(|i| i.processed_info.take());
1198 if let Some(old_processed_info) = old_processed_info {
1199 self.clear_dependencies(&asset_path, old_processed_info);
1200 }
1201
1202 for process_dependency_info in &processed_info.process_dependencies {
1204 self.add_dependant(&process_dependency_info.path, asset_path.to_owned());
1205 }
1206 let info = self.get_or_insert(asset_path);
1207 info.processed_info = Some(processed_info);
1208 info.update_status(ProcessStatus::Processed).await;
1209 let dependants = info.dependants.iter().cloned().collect::<Vec<_>>();
1210 for path in dependants {
1211 self.check_reprocess_queue.push_back(path);
1212 }
1213 }
1214 Ok(ProcessResult::SkippedNotChanged) => {
1215 debug!("Skipping processing (unchanged) \"{:?}\"", asset_path);
1216 let info = self.get_mut(&asset_path).expect("info should exist");
1217 info.update_status(ProcessStatus::Processed).await;
1224 }
1225 Ok(ProcessResult::Ignored) => {
1226 debug!("Skipping processing (ignored) \"{:?}\"", asset_path);
1227 }
1228 Err(ProcessError::ExtensionRequired) => {
1229 }
1231 Err(ProcessError::MissingAssetLoaderForExtension(_)) => {
1232 trace!("No loader found for {asset_path}");
1233 }
1234 Err(ProcessError::AssetReaderError {
1235 err: AssetReaderError::NotFound(_),
1236 ..
1237 }) => {
1238 trace!("No need to process asset {asset_path} because it does not exist");
1240 }
1241 Err(err) => {
1242 error!("Failed to process asset {asset_path}: {err}");
1243 if let ProcessError::AssetLoadError(AssetLoadError::AssetLoaderError(dependency)) =
1245 err
1246 {
1247 let info = self.get_mut(&asset_path).expect("info should exist");
1248 info.processed_info = Some(ProcessedInfo {
1249 hash: AssetHash::default(),
1250 full_hash: AssetHash::default(),
1251 process_dependencies: vec![],
1252 });
1253 self.add_dependant(dependency.path(), asset_path.to_owned());
1254 }
1255
1256 let info = self.get_mut(&asset_path).expect("info should exist");
1257 info.update_status(ProcessStatus::Failed).await;
1258 }
1259 }
1260 }
1261
1262 async fn remove(&mut self, asset_path: &AssetPath<'static>) {
1264 let info = self.infos.remove(asset_path);
1265 if let Some(info) = info {
1266 if let Some(processed_info) = info.processed_info {
1267 self.clear_dependencies(asset_path, processed_info);
1268 }
1269 info.status_sender
1271 .broadcast(ProcessStatus::NonExistent)
1272 .await
1273 .unwrap();
1274 if !info.dependants.is_empty() {
1275 error!(
1276 "The asset at {asset_path} was removed, but it had assets that depend on it to be processed. Consider updating the path in the following assets: {:?}",
1277 info.dependants
1278 );
1279 self.non_existent_dependants
1280 .insert(asset_path.clone(), info.dependants);
1281 }
1282 }
1283 }
1284
1285 async fn rename(&mut self, old: &AssetPath<'static>, new: &AssetPath<'static>) {
1287 let info = self.infos.remove(old);
1288 if let Some(mut info) = info {
1289 if !info.dependants.is_empty() {
1290 error!(
1298 "The asset at {old} was removed, but it had assets that depend on it to be processed. Consider updating the path in the following assets: {:?}",
1299 info.dependants
1300 );
1301 self.non_existent_dependants
1302 .insert(old.clone(), std::mem::take(&mut info.dependants));
1303 }
1304 if let Some(processed_info) = &info.processed_info {
1305 for dep in &processed_info.process_dependencies {
1307 if let Some(info) = self.infos.get_mut(&dep.path) {
1308 info.dependants.remove(old);
1309 info.dependants.insert(new.clone());
1310 } else if let Some(dependants) = self.non_existent_dependants.get_mut(&dep.path)
1311 {
1312 dependants.remove(old);
1313 dependants.insert(new.clone());
1314 }
1315 }
1316 }
1317 info.status_sender
1319 .broadcast(ProcessStatus::NonExistent)
1320 .await
1321 .unwrap();
1322 let dependants: Vec<AssetPath<'static>> = {
1323 let new_info = self.get_or_insert(new.clone());
1324 new_info.processed_info = info.processed_info;
1325 new_info.status = info.status;
1326 if let Some(status) = new_info.status {
1328 new_info.status_sender.broadcast(status).await.unwrap();
1329 }
1330 new_info.dependants.iter().cloned().collect()
1331 };
1332 self.check_reprocess_queue.push_back(new.clone());
1334 for dependant in dependants {
1335 self.check_reprocess_queue.push_back(dependant);
1337 }
1338 }
1339 }
1340
1341 fn clear_dependencies(&mut self, asset_path: &AssetPath<'static>, removed_info: ProcessedInfo) {
1342 for old_load_dep in removed_info.process_dependencies {
1343 if let Some(info) = self.infos.get_mut(&old_load_dep.path) {
1344 info.dependants.remove(asset_path);
1345 } else if let Some(dependants) =
1346 self.non_existent_dependants.get_mut(&old_load_dep.path)
1347 {
1348 dependants.remove(asset_path);
1349 }
1350 }
1351 }
1352}
1353
1354#[derive(Copy, Clone, PartialEq, Eq)]
1356pub enum ProcessorState {
1357 Initializing,
1361 Processing,
1363 Finished,
1365}
1366
1367#[derive(Error, Debug)]
1369pub enum InitializeError {
1370 #[error(transparent)]
1371 FailedToReadSourcePaths(AssetReaderError),
1372 #[error(transparent)]
1373 FailedToReadDestinationPaths(AssetReaderError),
1374 #[error("Failed to validate asset log: {0}")]
1375 ValidateLogError(ValidateLogError),
1376}