bevy_asset/processor/
log.rs1use crate::AssetPath;
2use async_fs::File;
3use bevy_utils::tracing::error;
4use bevy_utils::HashSet;
5use futures_lite::{AsyncReadExt, AsyncWriteExt};
6use std::path::PathBuf;
7use thiserror::Error;
8
9#[derive(Debug)]
11pub(crate) enum LogEntry {
12 BeginProcessing(AssetPath<'static>),
13 EndProcessing(AssetPath<'static>),
14 UnrecoverableError,
15}
16
17pub struct ProcessorTransactionLog {
23 log_file: File,
24}
25
26#[derive(Error, Debug)]
28pub enum ReadLogError {
29 #[error("Encountered an invalid log line: '{0}'")]
30 InvalidLine(String),
31 #[error("Failed to read log file: {0}")]
32 Io(#[from] futures_io::Error),
33}
34
35#[derive(Error, Debug)]
37#[error(
38 "Failed to write {log_entry:?} to the asset processor log. This is not recoverable. {error}"
39)]
40pub struct WriteLogError {
41 log_entry: LogEntry,
42 error: futures_io::Error,
43}
44
45#[derive(Error, Debug)]
47pub enum ValidateLogError {
48 #[error("Encountered an unrecoverable error. All assets will be reprocessed.")]
49 UnrecoverableError,
50 #[error(transparent)]
51 ReadLogError(#[from] ReadLogError),
52 #[error("Encountered a duplicate process asset transaction: {0:?}")]
53 EntryErrors(Vec<LogEntryError>),
54}
55
56#[derive(Error, Debug)]
58pub enum LogEntryError {
59 #[error("Encountered a duplicate process asset transaction: {0}")]
60 DuplicateTransaction(AssetPath<'static>),
61 #[error("A transaction was ended that never started {0}")]
62 EndedMissingTransaction(AssetPath<'static>),
63 #[error("An asset started processing but never finished: {0}")]
64 UnfinishedTransaction(AssetPath<'static>),
65}
66
67const LOG_PATH: &str = "imported_assets/log";
68const ENTRY_BEGIN: &str = "Begin ";
69const ENTRY_END: &str = "End ";
70const UNRECOVERABLE_ERROR: &str = "UnrecoverableError";
71
72impl ProcessorTransactionLog {
73 fn full_log_path() -> PathBuf {
74 #[cfg(not(target_arch = "wasm32"))]
75 let base_path = crate::io::file::get_base_path();
76 #[cfg(target_arch = "wasm32")]
77 let base_path = PathBuf::new();
78 base_path.join(LOG_PATH)
79 }
80 pub(crate) async fn new() -> Result<Self, futures_io::Error> {
82 let path = Self::full_log_path();
83 match async_fs::remove_file(&path).await {
84 Ok(_) => { }
85 Err(err) => {
86 if err.kind() != futures_io::ErrorKind::NotFound {
88 error!("Failed to remove previous log file {}", err);
89 }
90 }
91 }
92
93 if let Some(parent_folder) = path.parent() {
94 async_fs::create_dir_all(parent_folder).await?;
95 }
96
97 Ok(Self {
98 log_file: File::create(path).await?,
99 })
100 }
101
102 pub(crate) async fn read() -> Result<Vec<LogEntry>, ReadLogError> {
103 let mut log_lines = Vec::new();
104 let mut file = match File::open(Self::full_log_path()).await {
105 Ok(file) => file,
106 Err(err) => {
107 if err.kind() == futures_io::ErrorKind::NotFound {
108 return Ok(log_lines);
110 }
111 return Err(err.into());
112 }
113 };
114 let mut string = String::new();
115 file.read_to_string(&mut string).await?;
116 for line in string.lines() {
117 if let Some(path_str) = line.strip_prefix(ENTRY_BEGIN) {
118 log_lines.push(LogEntry::BeginProcessing(
119 AssetPath::parse(path_str).into_owned(),
120 ));
121 } else if let Some(path_str) = line.strip_prefix(ENTRY_END) {
122 log_lines.push(LogEntry::EndProcessing(
123 AssetPath::parse(path_str).into_owned(),
124 ));
125 } else if line.is_empty() {
126 continue;
127 } else {
128 return Err(ReadLogError::InvalidLine(line.to_string()));
129 }
130 }
131 Ok(log_lines)
132 }
133
134 pub(crate) async fn validate() -> Result<(), ValidateLogError> {
135 let mut transactions: HashSet<AssetPath<'static>> = Default::default();
136 let mut errors: Vec<LogEntryError> = Vec::new();
137 let entries = Self::read().await?;
138 for entry in entries {
139 match entry {
140 LogEntry::BeginProcessing(path) => {
141 if !transactions.insert(path.clone()) {
146 errors.push(LogEntryError::DuplicateTransaction(path));
147 }
148 }
149 LogEntry::EndProcessing(path) => {
150 if !transactions.remove(&path) {
151 errors.push(LogEntryError::EndedMissingTransaction(path));
152 }
153 }
154 LogEntry::UnrecoverableError => return Err(ValidateLogError::UnrecoverableError),
155 }
156 }
157 for transaction in transactions {
158 errors.push(LogEntryError::UnfinishedTransaction(transaction));
159 }
160 if !errors.is_empty() {
161 return Err(ValidateLogError::EntryErrors(errors));
162 }
163 Ok(())
164 }
165
166 pub(crate) async fn begin_processing(
169 &mut self,
170 path: &AssetPath<'_>,
171 ) -> Result<(), WriteLogError> {
172 self.write(&format!("{ENTRY_BEGIN}{path}\n"))
173 .await
174 .map_err(|e| WriteLogError {
175 log_entry: LogEntry::BeginProcessing(path.clone_owned()),
176 error: e,
177 })
178 }
179
180 pub(crate) async fn end_processing(
182 &mut self,
183 path: &AssetPath<'_>,
184 ) -> Result<(), WriteLogError> {
185 self.write(&format!("{ENTRY_END}{path}\n"))
186 .await
187 .map_err(|e| WriteLogError {
188 log_entry: LogEntry::EndProcessing(path.clone_owned()),
189 error: e,
190 })
191 }
192
193 pub(crate) async fn unrecoverable(&mut self) -> Result<(), WriteLogError> {
196 self.write(UNRECOVERABLE_ERROR)
197 .await
198 .map_err(|e| WriteLogError {
199 log_entry: LogEntry::UnrecoverableError,
200 error: e,
201 })
202 }
203
204 async fn write(&mut self, line: &str) -> Result<(), futures_io::Error> {
205 self.log_file.write_all(line.as_bytes()).await?;
206 self.log_file.flush().await?;
207 Ok(())
208 }
209}