bevy_asset/processor/
log.rs

1use crate::AssetPath;
2use async_fs::File;
3use bevy_utils::tracing::error;
4use bevy_utils::HashSet;
5use futures_lite::{AsyncReadExt, AsyncWriteExt};
6use std::path::PathBuf;
7use thiserror::Error;
8
9/// An in-memory representation of a single [`ProcessorTransactionLog`] entry.
10#[derive(Debug)]
11pub(crate) enum LogEntry {
12    BeginProcessing(AssetPath<'static>),
13    EndProcessing(AssetPath<'static>),
14    UnrecoverableError,
15}
16
17/// A "write ahead" logger that helps ensure asset importing is transactional.
18/// Prior to processing an asset, we write to the log to indicate it has started
19/// After processing an asset, we write to the log to indicate it has finished.
20/// On startup, the log can be read to determine if any transactions were incomplete.
21// TODO: this should be a trait
22pub struct ProcessorTransactionLog {
23    log_file: File,
24}
25
26/// An error that occurs when reading from the [`ProcessorTransactionLog`] fails.
27#[derive(Error, Debug)]
28pub enum ReadLogError {
29    #[error("Encountered an invalid log line: '{0}'")]
30    InvalidLine(String),
31    #[error("Failed to read log file: {0}")]
32    Io(#[from] futures_io::Error),
33}
34
35/// An error that occurs when writing to the [`ProcessorTransactionLog`] fails.
36#[derive(Error, Debug)]
37#[error(
38    "Failed to write {log_entry:?} to the asset processor log. This is not recoverable. {error}"
39)]
40pub struct WriteLogError {
41    log_entry: LogEntry,
42    error: futures_io::Error,
43}
44
45/// An error that occurs when validating the [`ProcessorTransactionLog`] fails.
46#[derive(Error, Debug)]
47pub enum ValidateLogError {
48    #[error("Encountered an unrecoverable error. All assets will be reprocessed.")]
49    UnrecoverableError,
50    #[error(transparent)]
51    ReadLogError(#[from] ReadLogError),
52    #[error("Encountered a duplicate process asset transaction: {0:?}")]
53    EntryErrors(Vec<LogEntryError>),
54}
55
56/// An error that occurs when validating individual [`ProcessorTransactionLog`] entries.
57#[derive(Error, Debug)]
58pub enum LogEntryError {
59    #[error("Encountered a duplicate process asset transaction: {0}")]
60    DuplicateTransaction(AssetPath<'static>),
61    #[error("A transaction was ended that never started {0}")]
62    EndedMissingTransaction(AssetPath<'static>),
63    #[error("An asset started processing but never finished: {0}")]
64    UnfinishedTransaction(AssetPath<'static>),
65}
66
/// Location of the log file, joined onto the base path to form the full log path.
const LOG_PATH: &str = "imported_assets/log";
/// Line prefix of an entry marking the start of processing; the asset path follows it.
const ENTRY_BEGIN: &str = "Begin ";
/// Line prefix of an entry marking the end of processing; the asset path follows it.
const ENTRY_END: &str = "End ";
/// Marker text written to the log when an unrecoverable error is recorded.
const UNRECOVERABLE_ERROR: &str = "UnrecoverableError";
71
72impl ProcessorTransactionLog {
73    fn full_log_path() -> PathBuf {
74        #[cfg(not(target_arch = "wasm32"))]
75        let base_path = crate::io::file::get_base_path();
76        #[cfg(target_arch = "wasm32")]
77        let base_path = PathBuf::new();
78        base_path.join(LOG_PATH)
79    }
80    /// Create a new, fresh log file. This will delete the previous log file if it exists.
81    pub(crate) async fn new() -> Result<Self, futures_io::Error> {
82        let path = Self::full_log_path();
83        match async_fs::remove_file(&path).await {
84            Ok(_) => { /* successfully removed file */ }
85            Err(err) => {
86                // if the log file is not found, we assume we are starting in a fresh (or good) state
87                if err.kind() != futures_io::ErrorKind::NotFound {
88                    error!("Failed to remove previous log file {}", err);
89                }
90            }
91        }
92
93        if let Some(parent_folder) = path.parent() {
94            async_fs::create_dir_all(parent_folder).await?;
95        }
96
97        Ok(Self {
98            log_file: File::create(path).await?,
99        })
100    }
101
102    pub(crate) async fn read() -> Result<Vec<LogEntry>, ReadLogError> {
103        let mut log_lines = Vec::new();
104        let mut file = match File::open(Self::full_log_path()).await {
105            Ok(file) => file,
106            Err(err) => {
107                if err.kind() == futures_io::ErrorKind::NotFound {
108                    // if the log file doesn't exist, this is equivalent to an empty file
109                    return Ok(log_lines);
110                }
111                return Err(err.into());
112            }
113        };
114        let mut string = String::new();
115        file.read_to_string(&mut string).await?;
116        for line in string.lines() {
117            if let Some(path_str) = line.strip_prefix(ENTRY_BEGIN) {
118                log_lines.push(LogEntry::BeginProcessing(
119                    AssetPath::parse(path_str).into_owned(),
120                ));
121            } else if let Some(path_str) = line.strip_prefix(ENTRY_END) {
122                log_lines.push(LogEntry::EndProcessing(
123                    AssetPath::parse(path_str).into_owned(),
124                ));
125            } else if line.is_empty() {
126                continue;
127            } else {
128                return Err(ReadLogError::InvalidLine(line.to_string()));
129            }
130        }
131        Ok(log_lines)
132    }
133
134    pub(crate) async fn validate() -> Result<(), ValidateLogError> {
135        let mut transactions: HashSet<AssetPath<'static>> = Default::default();
136        let mut errors: Vec<LogEntryError> = Vec::new();
137        let entries = Self::read().await?;
138        for entry in entries {
139            match entry {
140                LogEntry::BeginProcessing(path) => {
141                    // There should never be duplicate "start transactions" in a log
142                    // Every start should be followed by:
143                    //    * nothing (if there was an abrupt stop)
144                    //    * an End (if the transaction was completed)
145                    if !transactions.insert(path.clone()) {
146                        errors.push(LogEntryError::DuplicateTransaction(path));
147                    }
148                }
149                LogEntry::EndProcessing(path) => {
150                    if !transactions.remove(&path) {
151                        errors.push(LogEntryError::EndedMissingTransaction(path));
152                    }
153                }
154                LogEntry::UnrecoverableError => return Err(ValidateLogError::UnrecoverableError),
155            }
156        }
157        for transaction in transactions {
158            errors.push(LogEntryError::UnfinishedTransaction(transaction));
159        }
160        if !errors.is_empty() {
161            return Err(ValidateLogError::EntryErrors(errors));
162        }
163        Ok(())
164    }
165
166    /// Logs the start of an asset being processed. If this is not followed at some point in the log by a closing [`ProcessorTransactionLog::end_processing`],
167    /// in the next run of the processor the asset processing will be considered "incomplete" and it will be reprocessed.
168    pub(crate) async fn begin_processing(
169        &mut self,
170        path: &AssetPath<'_>,
171    ) -> Result<(), WriteLogError> {
172        self.write(&format!("{ENTRY_BEGIN}{path}\n"))
173            .await
174            .map_err(|e| WriteLogError {
175                log_entry: LogEntry::BeginProcessing(path.clone_owned()),
176                error: e,
177            })
178    }
179
180    /// Logs the end of an asset being successfully processed. See [`ProcessorTransactionLog::begin_processing`].
181    pub(crate) async fn end_processing(
182        &mut self,
183        path: &AssetPath<'_>,
184    ) -> Result<(), WriteLogError> {
185        self.write(&format!("{ENTRY_END}{path}\n"))
186            .await
187            .map_err(|e| WriteLogError {
188                log_entry: LogEntry::EndProcessing(path.clone_owned()),
189                error: e,
190            })
191    }
192
193    /// Logs an unrecoverable error. On the next run of the processor, all assets will be regenerated. This should only be used as a last resort.
194    /// Every call to this should be considered with scrutiny and ideally replaced with something more granular.
195    pub(crate) async fn unrecoverable(&mut self) -> Result<(), WriteLogError> {
196        self.write(UNRECOVERABLE_ERROR)
197            .await
198            .map_err(|e| WriteLogError {
199                log_entry: LogEntry::UnrecoverableError,
200                error: e,
201            })
202    }
203
204    async fn write(&mut self, line: &str) -> Result<(), futures_io::Error> {
205        self.log_file.write_all(line.as_bytes()).await?;
206        self.log_file.flush().await?;
207        Ok(())
208    }
209}