1use std::backtrace::{Backtrace, BacktraceStatus};
19use std::fmt;
20use std::fmt::{Debug, Display, Formatter};
21
22use chrono::{DateTime, TimeZone as _, Utc};
23
24pub type Result<T> = std::result::Result<T, Error>;
26
27#[derive(Clone, Copy, Debug, PartialEq, Eq)]
29#[non_exhaustive]
30pub enum ErrorKind {
31 PreconditionFailed,
33
34 Unexpected,
38
39 DataInvalid,
46
47 NamespaceAlreadyExists,
49
50 TableAlreadyExists,
52
53 NamespaceNotFound,
55
56 TableNotFound,
58
59 FeatureUnsupported,
63
64 CatalogCommitConflicts,
66}
67
68impl ErrorKind {
69 pub fn into_static(self) -> &'static str {
71 self.into()
72 }
73}
74
75impl From<ErrorKind> for &'static str {
76 fn from(v: ErrorKind) -> &'static str {
77 match v {
78 ErrorKind::Unexpected => "Unexpected",
79 ErrorKind::DataInvalid => "DataInvalid",
80 ErrorKind::FeatureUnsupported => "FeatureUnsupported",
81 ErrorKind::TableAlreadyExists => "TableAlreadyExists",
82 ErrorKind::TableNotFound => "TableNotFound",
83 ErrorKind::NamespaceAlreadyExists => "NamespaceAlreadyExists",
84 ErrorKind::NamespaceNotFound => "NamespaceNotFound",
85 ErrorKind::PreconditionFailed => "PreconditionFailed",
86 ErrorKind::CatalogCommitConflicts => "CatalogCommitConflicts",
87 }
88 }
89}
90
91impl Display for ErrorKind {
92 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
93 write!(f, "{}", self.into_static())
94 }
95}
96
97pub struct Error {
134 kind: ErrorKind,
135 message: String,
136
137 context: Vec<(&'static str, String)>,
138
139 source: Option<anyhow::Error>,
140 backtrace: Backtrace,
141
142 retryable: bool,
143}
144
145impl Display for Error {
146 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
147 write!(f, "{}", self.kind)?;
148
149 if !self.context.is_empty() {
150 write!(f, ", context: {{ ")?;
151 write!(
152 f,
153 "{}",
154 self.context
155 .iter()
156 .map(|(k, v)| format!("{k}: {v}"))
157 .collect::<Vec<_>>()
158 .join(", ")
159 )?;
160 write!(f, " }}")?;
161 }
162
163 if !self.message.is_empty() {
164 write!(f, " => {}", self.message)?;
165 }
166
167 if let Some(source) = &self.source {
168 write!(f, ", source: {source}")?;
169 }
170
171 Ok(())
172 }
173}
174
175impl Debug for Error {
176 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
177 if f.alternate() {
179 let mut de = f.debug_struct("Error");
180 de.field("kind", &self.kind);
181 de.field("message", &self.message);
182 de.field("context", &self.context);
183 de.field("source", &self.source);
184 de.field("backtrace", &self.backtrace);
185 return de.finish();
186 }
187
188 write!(f, "{}", self.kind)?;
189 if !self.message.is_empty() {
190 write!(f, " => {}", self.message)?;
191 }
192 writeln!(f)?;
193
194 if !self.context.is_empty() {
195 writeln!(f)?;
196 writeln!(f, "Context:")?;
197 for (k, v) in self.context.iter() {
198 writeln!(f, " {k}: {v}")?;
199 }
200 }
201 if let Some(source) = &self.source {
202 writeln!(f)?;
203 writeln!(f, "Source: {source:#}")?;
204 }
205
206 if self.backtrace.status() == BacktraceStatus::Captured {
207 writeln!(f)?;
208 writeln!(f, "Backtrace:")?;
209 writeln!(f, "{}", self.backtrace)?;
210 }
211
212 Ok(())
213 }
214}
215
216impl std::error::Error for Error {
217 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
218 self.source.as_ref().map(|v| v.as_ref())
219 }
220}
221
222impl Error {
223 pub fn new(kind: ErrorKind, message: impl Into<String>) -> Self {
225 Self {
226 kind,
227 message: message.into(),
228 context: Vec::default(),
229
230 source: None,
231 backtrace: Backtrace::capture(),
234
235 retryable: false,
236 }
237 }
238
239 pub fn with_retryable(mut self, retryable: bool) -> Self {
241 self.retryable = retryable;
242 self
243 }
244
245 pub fn with_context(mut self, key: &'static str, value: impl Into<String>) -> Self {
247 self.context.push((key, value.into()));
248 self
249 }
250
251 pub fn with_source(mut self, src: impl Into<anyhow::Error>) -> Self {
257 debug_assert!(self.source.is_none(), "the source error has been set");
258
259 self.source = Some(src.into());
260 self
261 }
262
263 #[cfg(test)]
268 fn with_backtrace(mut self, backtrace: Backtrace) -> Self {
269 self.backtrace = backtrace;
270 self
271 }
272
273 pub fn backtrace(&self) -> &Backtrace {
313 &self.backtrace
314 }
315
316 pub fn kind(&self) -> ErrorKind {
320 self.kind
321 }
322
323 pub fn retryable(&self) -> bool {
325 self.retryable
326 }
327
328 #[inline]
330 pub fn message(&self) -> &str {
331 self.message.as_str()
332 }
333}
334
335macro_rules! define_from_err {
336 ($source: path, $error_kind: path, $msg: expr) => {
337 impl std::convert::From<$source> for crate::error::Error {
338 fn from(v: $source) -> Self {
339 Self::new($error_kind, $msg).with_source(v)
340 }
341 }
342 };
343}
344
345define_from_err!(
346 std::str::Utf8Error,
347 ErrorKind::Unexpected,
348 "handling invalid utf-8 characters"
349);
350
351define_from_err!(
352 core::num::ParseIntError,
353 ErrorKind::Unexpected,
354 "parsing integer from string"
355);
356
357define_from_err!(
358 std::array::TryFromSliceError,
359 ErrorKind::DataInvalid,
360 "failed to convert byte slice to array"
361);
362
363define_from_err!(
364 std::num::TryFromIntError,
365 ErrorKind::DataInvalid,
366 "failed to convert integer"
367);
368
369define_from_err!(
370 chrono::ParseError,
371 ErrorKind::DataInvalid,
372 "Failed to parse string to date or time"
373);
374
375define_from_err!(
376 uuid::Error,
377 ErrorKind::DataInvalid,
378 "Failed to convert between uuid und iceberg value"
379);
380
381define_from_err!(
382 apache_avro::Error,
383 ErrorKind::DataInvalid,
384 "Failure in conversion with avro"
385);
386
387define_from_err!(
388 opendal::Error,
389 ErrorKind::Unexpected,
390 "Failure in doing io operation"
391);
392
393define_from_err!(
394 url::ParseError,
395 ErrorKind::DataInvalid,
396 "Failed to parse url"
397);
398
399define_from_err!(
400 reqwest::Error,
401 ErrorKind::Unexpected,
402 "Failed to execute http request"
403);
404
405define_from_err!(
406 serde_json::Error,
407 ErrorKind::DataInvalid,
408 "Failed to parse json string"
409);
410
411define_from_err!(
412 parquet::errors::ParquetError,
413 ErrorKind::Unexpected,
414 "Failed to read a Parquet file"
415);
416
417define_from_err!(
418 futures::channel::mpsc::SendError,
419 ErrorKind::Unexpected,
420 "Failed to send a message to a channel"
421);
422
423define_from_err!(
424 arrow_schema::ArrowError,
425 ErrorKind::Unexpected,
426 "Arrow Schema Error"
427);
428
429define_from_err!(std::io::Error, ErrorKind::Unexpected, "IO Operation failed");
430
431pub(crate) fn timestamp_ms_to_utc(timestamp_ms: i64) -> Result<DateTime<Utc>> {
442 match Utc.timestamp_millis_opt(timestamp_ms) {
443 chrono::LocalResult::Single(t) => Ok(t),
444 chrono::LocalResult::Ambiguous(_, _) => Err(Error::new(
445 ErrorKind::Unexpected,
446 "Ambiguous timestamp with two possible results",
447 )),
448 chrono::LocalResult::None => Err(Error::new(ErrorKind::DataInvalid, "Invalid timestamp")),
449 }
450 .map_err(|e| e.with_context("timestamp value", timestamp_ms.to_string()))
451}
452
453#[macro_export]
464macro_rules! ensure_data_valid {
465 ($cond: expr, $fmt: literal, $($arg:tt)*) => {
466 if !$cond {
467 return Err($crate::error::Error::new($crate::error::ErrorKind::DataInvalid, format!($fmt, $($arg)*)))
468 }
469 };
470}
471
472#[cfg(test)]
473mod tests {
474 use anyhow::anyhow;
475 use pretty_assertions::assert_eq;
476
477 use super::*;
478
479 fn generate_error_with_backtrace_disabled() -> Error {
480 Error::new(
481 ErrorKind::Unexpected,
482 "something wrong happened".to_string(),
483 )
484 .with_context("path", "/path/to/file".to_string())
485 .with_context("called", "send_async".to_string())
486 .with_source(anyhow!("networking error"))
487 .with_backtrace(Backtrace::disabled())
488 }
489
490 fn generate_error_with_backtrace_enabled() -> Error {
491 Error::new(
492 ErrorKind::Unexpected,
493 "something wrong happened".to_string(),
494 )
495 .with_context("path", "/path/to/file".to_string())
496 .with_context("called", "send_async".to_string())
497 .with_source(anyhow!("networking error"))
498 .with_backtrace(Backtrace::force_capture())
499 }
500
501 #[test]
502 fn test_error_display_without_backtrace() {
503 let s = format!("{}", generate_error_with_backtrace_disabled());
504 assert_eq!(
505 s,
506 r#"Unexpected, context: { path: /path/to/file, called: send_async } => something wrong happened, source: networking error"#
507 )
508 }
509
510 #[test]
511 fn test_error_display_with_backtrace() {
512 let s = format!("{}", generate_error_with_backtrace_enabled());
513 assert_eq!(
514 s,
515 r#"Unexpected, context: { path: /path/to/file, called: send_async } => something wrong happened, source: networking error"#
516 )
517 }
518
519 #[test]
520 fn test_error_debug_without_backtrace() {
521 let s = format!("{:?}", generate_error_with_backtrace_disabled());
522 assert_eq!(
523 s,
524 r#"Unexpected => something wrong happened
525
526Context:
527 path: /path/to/file
528 called: send_async
529
530Source: networking error
531"#
532 )
533 }
534
535 #[test]
537 fn test_error_debug_with_backtrace() {
538 let s = format!("{:?}", generate_error_with_backtrace_enabled());
539
540 let expected = r#"Unexpected => something wrong happened
541
542Context:
543 path: /path/to/file
544 called: send_async
545
546Source: networking error
547
548Backtrace:
549 0:"#;
550 assert_eq!(&s[..expected.len()], expected,);
551 }
552}