1use std::backtrace::{Backtrace, BacktraceStatus};
19use std::fmt;
20use std::fmt::{Debug, Display, Formatter};
21
22use chrono::{DateTime, TimeZone as _, Utc};
23
/// Convenience alias for `std::result::Result` whose error type is fixed to
/// this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
26
/// Category of an [`Error`]; read it back with [`Error::kind`].
///
/// The enum is `#[non_exhaustive]`, so code in other crates has to keep a
/// wildcard arm when matching on it.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum ErrorKind {
    /// A precondition was not met.
    ///
    /// NOTE(review): never constructed in this file — confirm the exact
    /// semantics against the call sites.
    PreconditionFailed,

    /// A failure that was not anticipated.
    ///
    /// In this file it is the kind used when wrapping UTF-8, integer-parsing,
    /// I/O, HTTP, Parquet, Arrow and channel-send errors, and for ambiguous
    /// timestamps.
    Unexpected,

    /// The data being handled is invalid.
    ///
    /// In this file it is the kind used when wrapping slice/integer
    /// conversion, date-time, UUID, Avro, URL, JSON and decimal errors, for
    /// invalid timestamps, and by the `ensure_data_valid!` macro.
    DataInvalid,

    /// Presumably: a namespace that was to be created already exists
    /// (not constructed in this file — verify against callers).
    NamespaceAlreadyExists,

    /// Presumably: a table that was to be created already exists
    /// (not constructed in this file — verify against callers).
    TableAlreadyExists,

    /// Presumably: the requested namespace does not exist
    /// (not constructed in this file — verify against callers).
    NamespaceNotFound,

    /// Presumably: the requested table does not exist
    /// (not constructed in this file — verify against callers).
    TableNotFound,

    /// Presumably: the requested feature is not supported
    /// (not constructed in this file — verify against callers).
    FeatureUnsupported,

    /// Presumably: a catalog commit conflicted with another change
    /// (not constructed in this file — verify against callers).
    CatalogCommitConflicts,
}
67
68impl ErrorKind {
69 pub fn into_static(self) -> &'static str {
71 self.into()
72 }
73}
74
75impl From<ErrorKind> for &'static str {
76 fn from(v: ErrorKind) -> &'static str {
77 match v {
78 ErrorKind::Unexpected => "Unexpected",
79 ErrorKind::DataInvalid => "DataInvalid",
80 ErrorKind::FeatureUnsupported => "FeatureUnsupported",
81 ErrorKind::TableAlreadyExists => "TableAlreadyExists",
82 ErrorKind::TableNotFound => "TableNotFound",
83 ErrorKind::NamespaceAlreadyExists => "NamespaceAlreadyExists",
84 ErrorKind::NamespaceNotFound => "NamespaceNotFound",
85 ErrorKind::PreconditionFailed => "PreconditionFailed",
86 ErrorKind::CatalogCommitConflicts => "CatalogCommitConflicts",
87 }
88 }
89}
90
91impl Display for ErrorKind {
92 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
93 write!(f, "{}", self.into_static())
94 }
95}
96
/// The error type of this module.
///
/// Built with [`Error::new`] and enriched through the `with_*` builder
/// methods. `Display` renders it on a single line; `Debug` renders a
/// multi-line report (or a struct-style dump for `{:#?}`).
pub struct Error {
    /// Category of the failure, returned by `kind()`.
    kind: ErrorKind,
    /// Human-readable description, returned by `message()`; may be empty.
    message: String,

    /// Key/value pairs appended by `with_context`, kept in insertion order.
    context: Vec<(&'static str, String)>,

    /// Underlying cause, set by `with_source`; `None` until then.
    source: Option<anyhow::Error>,
    /// Backtrace obtained via `Backtrace::capture()` when the error is created.
    backtrace: Backtrace,

    /// Retry flag; `false` by default and changed through `with_retryable`.
    retryable: bool,
}
144
145impl Display for Error {
146 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
147 write!(f, "{}", self.kind)?;
148
149 if !self.context.is_empty() {
150 write!(f, ", context: {{ ")?;
151 write!(
152 f,
153 "{}",
154 self.context
155 .iter()
156 .map(|(k, v)| format!("{k}: {v}"))
157 .collect::<Vec<_>>()
158 .join(", ")
159 )?;
160 write!(f, " }}")?;
161 }
162
163 if !self.message.is_empty() {
164 write!(f, " => {}", self.message)?;
165 }
166
167 if let Some(source) = &self.source {
168 write!(f, ", source: {source}")?;
169 }
170
171 Ok(())
172 }
173}
174
175impl Debug for Error {
176 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
177 if f.alternate() {
179 let mut de = f.debug_struct("Error");
180 de.field("kind", &self.kind);
181 de.field("message", &self.message);
182 de.field("context", &self.context);
183 de.field("source", &self.source);
184 de.field("backtrace", &self.backtrace);
185 return de.finish();
186 }
187
188 write!(f, "{}", self.kind)?;
189 if !self.message.is_empty() {
190 write!(f, " => {}", self.message)?;
191 }
192 writeln!(f)?;
193
194 if !self.context.is_empty() {
195 writeln!(f)?;
196 writeln!(f, "Context:")?;
197 for (k, v) in self.context.iter() {
198 writeln!(f, " {k}: {v}")?;
199 }
200 }
201 if let Some(source) = &self.source {
202 writeln!(f)?;
203 writeln!(f, "Source: {source:#}")?;
204 }
205
206 if self.backtrace.status() == BacktraceStatus::Captured {
207 writeln!(f)?;
208 writeln!(f, "Backtrace:")?;
209 writeln!(f, "{}", self.backtrace)?;
210 }
211
212 Ok(())
213 }
214}
215
/// Lets [`Error`] participate in standard error chaining.
impl std::error::Error for Error {
    /// Returns the error attached via `with_source`, or `None` when no
    /// source has been set.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        // Borrow the stored `anyhow::Error` and expose it as a plain
        // `dyn Error` trait object.
        self.source.as_ref().map(|v| v.as_ref())
    }
}
221
222impl Error {
223 pub fn new(kind: ErrorKind, message: impl Into<String>) -> Self {
225 Self {
226 kind,
227 message: message.into(),
228 context: Vec::default(),
229
230 source: None,
231 backtrace: Backtrace::capture(),
234
235 retryable: false,
236 }
237 }
238
239 pub fn with_retryable(mut self, retryable: bool) -> Self {
241 self.retryable = retryable;
242 self
243 }
244
245 pub fn with_context(mut self, key: &'static str, value: impl Into<String>) -> Self {
247 self.context.push((key, value.into()));
248 self
249 }
250
251 pub fn with_source(mut self, src: impl Into<anyhow::Error>) -> Self {
257 debug_assert!(self.source.is_none(), "the source error has been set");
258
259 self.source = Some(src.into());
260 self
261 }
262
263 #[cfg(test)]
268 fn with_backtrace(mut self, backtrace: Backtrace) -> Self {
269 self.backtrace = backtrace;
270 self
271 }
272
273 pub fn backtrace(&self) -> &Backtrace {
313 &self.backtrace
314 }
315
316 pub fn kind(&self) -> ErrorKind {
320 self.kind
321 }
322
323 pub fn retryable(&self) -> bool {
325 self.retryable
326 }
327
328 #[inline]
330 pub fn message(&self) -> &str {
331 self.message.as_str()
332 }
333}
334
/// Generates `impl From<$source> for crate::error::Error`.
///
/// The generated conversion creates an error of kind `$error_kind` with the
/// message `$msg` and attaches the original value as its source.
macro_rules! define_from_err {
    ($source: path, $error_kind: path, $msg: expr) => {
        impl std::convert::From<$source> for crate::error::Error {
            fn from(cause: $source) -> Self {
                let wrapper = Self::new($error_kind, $msg);
                wrapper.with_source(cause)
            }
        }
    };
}
344
345define_from_err!(
346 std::str::Utf8Error,
347 ErrorKind::Unexpected,
348 "handling invalid utf-8 characters"
349);
350
351define_from_err!(
352 core::num::ParseIntError,
353 ErrorKind::Unexpected,
354 "parsing integer from string"
355);
356
357define_from_err!(
358 std::array::TryFromSliceError,
359 ErrorKind::DataInvalid,
360 "failed to convert byte slice to array"
361);
362
363define_from_err!(
364 std::num::TryFromIntError,
365 ErrorKind::DataInvalid,
366 "failed to convert integer"
367);
368
369define_from_err!(
370 chrono::ParseError,
371 ErrorKind::DataInvalid,
372 "Failed to parse string to date or time"
373);
374
375define_from_err!(
376 uuid::Error,
377 ErrorKind::DataInvalid,
378 "Failed to convert between uuid und iceberg value"
379);
380
381define_from_err!(
382 apache_avro::Error,
383 ErrorKind::DataInvalid,
384 "Failure in conversion with avro"
385);
386
387define_from_err!(
388 opendal::Error,
389 ErrorKind::Unexpected,
390 "Failure in doing io operation"
391);
392
393define_from_err!(
394 url::ParseError,
395 ErrorKind::DataInvalid,
396 "Failed to parse url"
397);
398
399define_from_err!(
400 reqwest::Error,
401 ErrorKind::Unexpected,
402 "Failed to execute http request"
403);
404
405define_from_err!(
406 serde_json::Error,
407 ErrorKind::DataInvalid,
408 "Failed to parse json string"
409);
410
411define_from_err!(
412 rust_decimal::Error,
413 ErrorKind::DataInvalid,
414 "Failed to convert decimal literal to rust decimal"
415);
416
417define_from_err!(
418 parquet::errors::ParquetError,
419 ErrorKind::Unexpected,
420 "Failed to read a Parquet file"
421);
422
423define_from_err!(
424 futures::channel::mpsc::SendError,
425 ErrorKind::Unexpected,
426 "Failed to send a message to a channel"
427);
428
429define_from_err!(
430 arrow_schema::ArrowError,
431 ErrorKind::Unexpected,
432 "Arrow Schema Error"
433);
434
435define_from_err!(std::io::Error, ErrorKind::Unexpected, "IO Operation failed");
436
437pub(crate) fn timestamp_ms_to_utc(timestamp_ms: i64) -> Result<DateTime<Utc>> {
448 match Utc.timestamp_millis_opt(timestamp_ms) {
449 chrono::LocalResult::Single(t) => Ok(t),
450 chrono::LocalResult::Ambiguous(_, _) => Err(Error::new(
451 ErrorKind::Unexpected,
452 "Ambiguous timestamp with two possible results",
453 )),
454 chrono::LocalResult::None => Err(Error::new(ErrorKind::DataInvalid, "Invalid timestamp")),
455 }
456 .map_err(|e| e.with_context("timestamp value", timestamp_ms.to_string()))
457}
458
/// Returns early with an `ErrorKind::DataInvalid` error when `$cond` is false.
///
/// The message is built with `format!` from `$fmt` and any further
/// arguments. The format arguments are optional, so both forms are accepted:
///
/// ```ignore
/// ensure_data_valid!(len > 0, "input must not be empty");
/// ensure_data_valid!(len <= max, "length {} exceeds maximum {}", len, max);
/// ```
///
/// The macro expands to a `return Err(...)`, so it can only be used inside a
/// function whose error type is (or is convertible from the `Err` of)
/// `$crate::error::Error`.
#[macro_export]
macro_rules! ensure_data_valid {
    // The `, args...` tail is optional so a plain message needs no trailing comma.
    ($cond: expr, $fmt: literal $(, $($arg:tt)*)?) => {
        if !$cond {
            return Err($crate::error::Error::new(
                $crate::error::ErrorKind::DataInvalid,
                format!($fmt $(, $($arg)*)?),
            ))
        }
    };
}
477
// Unit tests for the `Display` and `Debug` renderings of `Error`.
//
// The expected values are whitespace-sensitive raw strings: they must match
// the literals written by the `Display`/`Debug` impls character for character.
#[cfg(test)]
mod tests {
    use anyhow::anyhow;
    use pretty_assertions::assert_eq;

    use super::*;

    /// Builds a sample error (two context entries plus a source) whose
    /// backtrace is replaced with `Backtrace::disabled()`.
    fn generate_error_with_backtrace_disabled() -> Error {
        Error::new(
            ErrorKind::Unexpected,
            "something wrong happened".to_string(),
        )
        .with_context("path", "/path/to/file".to_string())
        .with_context("called", "send_async".to_string())
        .with_source(anyhow!("networking error"))
        .with_backtrace(Backtrace::disabled())
    }

    /// Builds the same sample error, but with a backtrace obtained from
    /// `Backtrace::force_capture()`.
    fn generate_error_with_backtrace_enabled() -> Error {
        Error::new(
            ErrorKind::Unexpected,
            "something wrong happened".to_string(),
        )
        .with_context("path", "/path/to/file".to_string())
        .with_context("called", "send_async".to_string())
        .with_source(anyhow!("networking error"))
        .with_backtrace(Backtrace::force_capture())
    }

    /// `Display` output for an error without a captured backtrace.
    #[test]
    fn test_error_display_without_backtrace() {
        let s = format!("{}", generate_error_with_backtrace_disabled());
        assert_eq!(
            s,
            r#"Unexpected, context: { path: /path/to/file, called: send_async } => something wrong happened, source: networking error"#
        )
    }

    /// `Display` output is the same single line even when a backtrace exists.
    #[test]
    fn test_error_display_with_backtrace() {
        let s = format!("{}", generate_error_with_backtrace_enabled());
        assert_eq!(
            s,
            r#"Unexpected, context: { path: /path/to/file, called: send_async } => something wrong happened, source: networking error"#
        )
    }

    /// `{:?}` output without a backtrace section.
    #[test]
    fn test_error_debug_without_backtrace() {
        let s = format!("{:?}", generate_error_with_backtrace_disabled());
        assert_eq!(
            s,
            r#"Unexpected => something wrong happened

Context:
 path: /path/to/file
 called: send_async

Source: networking error
"#
        )
    }

    /// `{:?}` output with a backtrace: only the prefix up to the first frame
    /// marker is compared, because the frames themselves vary between runs.
    #[test]
    fn test_error_debug_with_backtrace() {
        let s = format!("{:?}", generate_error_with_backtrace_enabled());

        let expected = r#"Unexpected => something wrong happened

Context:
 path: /path/to/file
 called: send_async

Source: networking error

Backtrace:
 0:"#;
        // Slicing panics if `s` is shorter than `expected`, which also fails the test.
        assert_eq!(&s[..expected.len()], expected,);
    }
}