1use std::backtrace::{Backtrace, BacktraceStatus};
19use std::fmt;
20use std::fmt::{Debug, Display, Formatter};
21use std::sync::PoisonError;
22
23use chrono::{DateTime, TimeZone as _, Utc};
24
25pub type Result<T> = std::result::Result<T, Error>;
27
/// Category attached to every [`Error`].
///
/// The enum is `#[non_exhaustive]`, so code outside this crate has to keep a
/// wildcard arm when matching on it.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum ErrorKind {
    /// A precondition required by the operation was not met.
    PreconditionFailed,

    /// An unexpected failure.
    ///
    /// In this file it is the kind chosen for converted I/O, HTTP, Parquet,
    /// Arrow, channel-send, UTF-8 and integer-parsing errors, and for
    /// poisoned locks.
    Unexpected,

    /// The data being handled is invalid.
    ///
    /// In this file it is the kind chosen for failed slice/integer
    /// conversions, date/time, UUID, Avro, URL and JSON errors, invalid
    /// timestamps, and the `ensure_data_valid!` macro.
    DataInvalid,

    /// The namespace already exists.
    NamespaceAlreadyExists,

    /// The table already exists.
    TableAlreadyExists,

    /// The namespace was not found.
    NamespaceNotFound,

    /// The table was not found.
    TableNotFound,

    /// The requested feature is not supported.
    FeatureUnsupported,

    /// A catalog commit ran into conflicts.
    CatalogCommitConflicts,
}
68
69impl ErrorKind {
70 pub fn into_static(self) -> &'static str {
72 self.into()
73 }
74}
75
76impl From<ErrorKind> for &'static str {
77 fn from(v: ErrorKind) -> &'static str {
78 match v {
79 ErrorKind::Unexpected => "Unexpected",
80 ErrorKind::DataInvalid => "DataInvalid",
81 ErrorKind::FeatureUnsupported => "FeatureUnsupported",
82 ErrorKind::TableAlreadyExists => "TableAlreadyExists",
83 ErrorKind::TableNotFound => "TableNotFound",
84 ErrorKind::NamespaceAlreadyExists => "NamespaceAlreadyExists",
85 ErrorKind::NamespaceNotFound => "NamespaceNotFound",
86 ErrorKind::PreconditionFailed => "PreconditionFailed",
87 ErrorKind::CatalogCommitConflicts => "CatalogCommitConflicts",
88 }
89 }
90}
91
92impl Display for ErrorKind {
93 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
94 write!(f, "{}", self.into_static())
95 }
96}
97
98pub struct Error {
135 kind: ErrorKind,
136 message: String,
137
138 context: Vec<(&'static str, String)>,
139
140 source: Option<anyhow::Error>,
141 backtrace: Backtrace,
142
143 retryable: bool,
144}
145
146impl Display for Error {
147 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
148 write!(f, "{}", self.kind)?;
149
150 if !self.context.is_empty() {
151 write!(f, ", context: {{ ")?;
152 write!(
153 f,
154 "{}",
155 self.context
156 .iter()
157 .map(|(k, v)| format!("{k}: {v}"))
158 .collect::<Vec<_>>()
159 .join(", ")
160 )?;
161 write!(f, " }}")?;
162 }
163
164 if !self.message.is_empty() {
165 write!(f, " => {}", self.message)?;
166 }
167
168 if let Some(source) = &self.source {
169 write!(f, ", source: {source}")?;
170 }
171
172 Ok(())
173 }
174}
175
176impl Debug for Error {
177 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
178 if f.alternate() {
180 let mut de = f.debug_struct("Error");
181 de.field("kind", &self.kind);
182 de.field("message", &self.message);
183 de.field("context", &self.context);
184 de.field("source", &self.source);
185 de.field("backtrace", &self.backtrace);
186 return de.finish();
187 }
188
189 write!(f, "{}", self.kind)?;
190 if !self.message.is_empty() {
191 write!(f, " => {}", self.message)?;
192 }
193 writeln!(f)?;
194
195 if !self.context.is_empty() {
196 writeln!(f)?;
197 writeln!(f, "Context:")?;
198 for (k, v) in self.context.iter() {
199 writeln!(f, " {k}: {v}")?;
200 }
201 }
202 if let Some(source) = &self.source {
203 writeln!(f)?;
204 writeln!(f, "Source: {source:#}")?;
205 }
206
207 if self.backtrace.status() == BacktraceStatus::Captured {
208 writeln!(f)?;
209 writeln!(f, "Backtrace:")?;
210 writeln!(f, "{}", self.backtrace)?;
211 }
212
213 Ok(())
214 }
215}
216
217impl std::error::Error for Error {
218 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
219 self.source.as_ref().map(|v| v.as_ref())
220 }
221}
222
223impl Error {
224 pub fn new(kind: ErrorKind, message: impl Into<String>) -> Self {
226 Self {
227 kind,
228 message: message.into(),
229 context: Vec::default(),
230
231 source: None,
232 backtrace: Backtrace::capture(),
235
236 retryable: false,
237 }
238 }
239
240 pub fn with_retryable(mut self, retryable: bool) -> Self {
242 self.retryable = retryable;
243 self
244 }
245
246 pub fn with_context(mut self, key: &'static str, value: impl Into<String>) -> Self {
248 self.context.push((key, value.into()));
249 self
250 }
251
252 pub fn with_source(mut self, src: impl Into<anyhow::Error>) -> Self {
258 debug_assert!(self.source.is_none(), "the source error has been set");
259
260 self.source = Some(src.into());
261 self
262 }
263
264 #[cfg(test)]
269 fn with_backtrace(mut self, backtrace: Backtrace) -> Self {
270 self.backtrace = backtrace;
271 self
272 }
273
274 pub fn backtrace(&self) -> &Backtrace {
314 &self.backtrace
315 }
316
317 pub fn kind(&self) -> ErrorKind {
321 self.kind
322 }
323
324 pub fn retryable(&self) -> bool {
326 self.retryable
327 }
328
329 #[inline]
331 pub fn message(&self) -> &str {
332 self.message.as_str()
333 }
334}
335
/// Generates `impl From<$src> for crate::error::Error`.
///
/// The generated conversion builds an error of kind `$kind` with message
/// `$message` and attaches the converted value as its source.
macro_rules! define_from_err {
    ($src:path, $kind:path, $message:expr) => {
        impl std::convert::From<$src> for crate::error::Error {
            fn from(cause: $src) -> Self {
                crate::error::Error::new($kind, $message).with_source(cause)
            }
        }
    };
}
345
346define_from_err!(
347 std::str::Utf8Error,
348 ErrorKind::Unexpected,
349 "handling invalid utf-8 characters"
350);
351
352define_from_err!(
353 core::num::ParseIntError,
354 ErrorKind::Unexpected,
355 "parsing integer from string"
356);
357
358define_from_err!(
359 std::array::TryFromSliceError,
360 ErrorKind::DataInvalid,
361 "failed to convert byte slice to array"
362);
363
364define_from_err!(
365 std::num::TryFromIntError,
366 ErrorKind::DataInvalid,
367 "failed to convert integer"
368);
369
370define_from_err!(
371 chrono::ParseError,
372 ErrorKind::DataInvalid,
373 "Failed to parse string to date or time"
374);
375
376define_from_err!(
377 uuid::Error,
378 ErrorKind::DataInvalid,
379 "Failed to convert between uuid und iceberg value"
380);
381
382define_from_err!(
383 apache_avro::Error,
384 ErrorKind::DataInvalid,
385 "Failure in conversion with avro"
386);
387
388define_from_err!(
389 url::ParseError,
390 ErrorKind::DataInvalid,
391 "Failed to parse url"
392);
393
394define_from_err!(
395 reqwest::Error,
396 ErrorKind::Unexpected,
397 "Failed to execute http request"
398);
399
400define_from_err!(
401 serde_json::Error,
402 ErrorKind::DataInvalid,
403 "Failed to parse json string"
404);
405
406define_from_err!(
407 parquet::errors::ParquetError,
408 ErrorKind::Unexpected,
409 "Failed to read a Parquet file"
410);
411
412define_from_err!(
413 futures::channel::mpsc::SendError,
414 ErrorKind::Unexpected,
415 "Failed to send a message to a channel"
416);
417
418define_from_err!(
419 arrow_schema::ArrowError,
420 ErrorKind::Unexpected,
421 "Arrow Schema Error"
422);
423
424define_from_err!(std::io::Error, ErrorKind::Unexpected, "IO Operation failed");
425
426pub(crate) fn lock_error<T>(e: PoisonError<T>) -> Error {
428 Error::new(ErrorKind::Unexpected, format!("Lock poisoned: {e}"))
429}
430
431pub(crate) fn timestamp_ms_to_utc(timestamp_ms: i64) -> Result<DateTime<Utc>> {
442 match Utc.timestamp_millis_opt(timestamp_ms) {
443 chrono::LocalResult::Single(t) => Ok(t),
444 chrono::LocalResult::Ambiguous(_, _) => Err(Error::new(
445 ErrorKind::Unexpected,
446 "Ambiguous timestamp with two possible results",
447 )),
448 chrono::LocalResult::None => Err(Error::new(ErrorKind::DataInvalid, "Invalid timestamp")),
449 }
450 .map_err(|e| e.with_context("timestamp value", timestamp_ms.to_string()))
451}
452
/// Returns early from the enclosing function with an
/// `ErrorKind::DataInvalid` error when `$cond` evaluates to `false`.
///
/// The message is built with `format!` from the format literal and the
/// optional arguments that follow it. Both forms are accepted:
///
/// ```ignore
/// ensure_data_valid!(len > 0, "input must not be empty");
/// ensure_data_valid!(len <= max, "length {} exceeds {}", len, max);
/// ```
///
/// The enclosing function must return a `Result` whose error type is
/// `$crate::error::Error`, because the macro expands to a plain `return Err(..)`.
#[macro_export]
macro_rules! ensure_data_valid {
    // The format arguments, including their leading comma, are optional so a
    // plain message works without a trailing comma.
    ($cond:expr, $fmt:literal $(, $($arg:tt)*)?) => {
        if !$cond {
            return Err($crate::error::Error::new(
                $crate::error::ErrorKind::DataInvalid,
                format!($fmt $(, $($arg)*)?),
            ));
        }
    };
}
471
#[cfg(test)]
mod tests {
    use anyhow::anyhow;
    use pretty_assertions::assert_eq;

    use super::*;

    /// Builds the fixture error used by every test, carrying `backtrace`.
    fn sample_error(backtrace: Backtrace) -> Error {
        Error::new(ErrorKind::Unexpected, "something wrong happened")
            .with_context("path", "/path/to/file")
            .with_context("called", "send_async")
            .with_source(anyhow!("networking error"))
            .with_backtrace(backtrace)
    }

    /// Fixture whose backtrace reports `BacktraceStatus::Disabled`.
    fn generate_error_with_backtrace_disabled() -> Error {
        sample_error(Backtrace::disabled())
    }

    /// Fixture whose backtrace is captured regardless of the environment.
    fn generate_error_with_backtrace_enabled() -> Error {
        sample_error(Backtrace::force_capture())
    }

    #[test]
    fn test_error_display_without_backtrace() {
        let s = format!("{}", generate_error_with_backtrace_disabled());
        assert_eq!(
            s,
            r#"Unexpected, context: { path: /path/to/file, called: send_async } => something wrong happened, source: networking error"#
        )
    }

    #[test]
    fn test_error_display_with_backtrace() {
        // `Display` never prints the backtrace, so the output matches the
        // backtrace-disabled case.
        let s = format!("{}", generate_error_with_backtrace_enabled());
        assert_eq!(
            s,
            r#"Unexpected, context: { path: /path/to/file, called: send_async } => something wrong happened, source: networking error"#
        )
    }

    #[test]
    fn test_error_debug_without_backtrace() {
        let s = format!("{:?}", generate_error_with_backtrace_disabled());
        assert_eq!(
            s,
            r#"Unexpected => something wrong happened

Context:
 path: /path/to/file
 called: send_async

Source: networking error
"#
        )
    }

    #[test]
    fn test_error_debug_with_backtrace() {
        let s = format!("{:?}", generate_error_with_backtrace_enabled());

        let expected_prefix = r#"Unexpected => something wrong happened

Context:
 path: /path/to/file
 called: send_async

Source: networking error

Backtrace:
"#;
        // `get` returns `None` when the output is too short, so a mismatch
        // shows up as an assertion diff instead of a slicing panic.
        assert_eq!(s.get(..expected_prefix.len()), Some(expected_prefix));

        // The exact backtrace rendering belongs to the standard library, so
        // only check that the first frame follows, whatever its padding.
        let frames = &s[expected_prefix.len()..];
        assert!(
            frames.trim_start().starts_with("0:"),
            "expected the first backtrace frame, got: {frames:?}"
        );
    }
}