1use std::backtrace::{Backtrace, BacktraceStatus};
19use std::fmt;
20use std::fmt::{Debug, Display, Formatter};
21
22use chrono::{DateTime, TimeZone as _, Utc};
23
24pub type Result<T> = std::result::Result<T, Error>;
26
/// Coarse category attached to every [`Error`].
///
/// The enum is `#[non_exhaustive]`, so matches written outside this crate need
/// a wildcard arm. Where no use inside this file is cited, the per-variant
/// descriptions below only restate what the variant name says.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum ErrorKind {
    /// A precondition of the requested operation did not hold.
    PreconditionFailed,

    /// A failure without a more specific category. In this file it wraps
    /// UTF-8, integer-parsing, HTTP, Parquet, channel, Arrow and I/O errors.
    Unexpected,

    /// The data being processed is malformed. In this file it wraps slice,
    /// integer, date/time, UUID, Avro, URL and JSON conversion errors, and
    /// reports out-of-range timestamps.
    DataInvalid,

    /// The namespace already exists.
    NamespaceAlreadyExists,

    /// The table already exists.
    TableAlreadyExists,

    /// The namespace was not found.
    NamespaceNotFound,

    /// The table was not found.
    TableNotFound,

    /// The requested feature is not supported.
    FeatureUnsupported,

    /// A catalog commit ran into conflicts.
    CatalogCommitConflicts,
}
67
68impl ErrorKind {
69 pub fn into_static(self) -> &'static str {
71 self.into()
72 }
73}
74
75impl From<ErrorKind> for &'static str {
76 fn from(v: ErrorKind) -> &'static str {
77 match v {
78 ErrorKind::Unexpected => "Unexpected",
79 ErrorKind::DataInvalid => "DataInvalid",
80 ErrorKind::FeatureUnsupported => "FeatureUnsupported",
81 ErrorKind::TableAlreadyExists => "TableAlreadyExists",
82 ErrorKind::TableNotFound => "TableNotFound",
83 ErrorKind::NamespaceAlreadyExists => "NamespaceAlreadyExists",
84 ErrorKind::NamespaceNotFound => "NamespaceNotFound",
85 ErrorKind::PreconditionFailed => "PreconditionFailed",
86 ErrorKind::CatalogCommitConflicts => "CatalogCommitConflicts",
87 }
88 }
89}
90
91impl Display for ErrorKind {
92 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
93 write!(f, "{}", self.into_static())
94 }
95}
96
97pub struct Error {
134 kind: ErrorKind,
135 message: String,
136
137 context: Vec<(&'static str, String)>,
138
139 source: Option<anyhow::Error>,
140 backtrace: Backtrace,
141
142 retryable: bool,
143}
144
145impl Display for Error {
146 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
147 write!(f, "{}", self.kind)?;
148
149 if !self.context.is_empty() {
150 write!(f, ", context: {{ ")?;
151 write!(
152 f,
153 "{}",
154 self.context
155 .iter()
156 .map(|(k, v)| format!("{k}: {v}"))
157 .collect::<Vec<_>>()
158 .join(", ")
159 )?;
160 write!(f, " }}")?;
161 }
162
163 if !self.message.is_empty() {
164 write!(f, " => {}", self.message)?;
165 }
166
167 if let Some(source) = &self.source {
168 write!(f, ", source: {source}")?;
169 }
170
171 Ok(())
172 }
173}
174
175impl Debug for Error {
176 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
177 if f.alternate() {
179 let mut de = f.debug_struct("Error");
180 de.field("kind", &self.kind);
181 de.field("message", &self.message);
182 de.field("context", &self.context);
183 de.field("source", &self.source);
184 de.field("backtrace", &self.backtrace);
185 return de.finish();
186 }
187
188 write!(f, "{}", self.kind)?;
189 if !self.message.is_empty() {
190 write!(f, " => {}", self.message)?;
191 }
192 writeln!(f)?;
193
194 if !self.context.is_empty() {
195 writeln!(f)?;
196 writeln!(f, "Context:")?;
197 for (k, v) in self.context.iter() {
198 writeln!(f, " {k}: {v}")?;
199 }
200 }
201 if let Some(source) = &self.source {
202 writeln!(f)?;
203 writeln!(f, "Source: {source:#}")?;
204 }
205
206 if self.backtrace.status() == BacktraceStatus::Captured {
207 writeln!(f)?;
208 writeln!(f, "Backtrace:")?;
209 writeln!(f, "{}", self.backtrace)?;
210 }
211
212 Ok(())
213 }
214}
215
216impl std::error::Error for Error {
217 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
218 self.source.as_ref().map(|v| v.as_ref())
219 }
220}
221
222impl Error {
223 pub fn new(kind: ErrorKind, message: impl Into<String>) -> Self {
225 Self {
226 kind,
227 message: message.into(),
228 context: Vec::default(),
229
230 source: None,
231 backtrace: Backtrace::capture(),
234
235 retryable: false,
236 }
237 }
238
239 pub fn with_retryable(mut self, retryable: bool) -> Self {
241 self.retryable = retryable;
242 self
243 }
244
245 pub fn with_context(mut self, key: &'static str, value: impl Into<String>) -> Self {
247 self.context.push((key, value.into()));
248 self
249 }
250
251 pub fn with_source(mut self, src: impl Into<anyhow::Error>) -> Self {
257 debug_assert!(self.source.is_none(), "the source error has been set");
258
259 self.source = Some(src.into());
260 self
261 }
262
263 #[cfg(test)]
268 fn with_backtrace(mut self, backtrace: Backtrace) -> Self {
269 self.backtrace = backtrace;
270 self
271 }
272
273 pub fn backtrace(&self) -> &Backtrace {
313 &self.backtrace
314 }
315
316 pub fn kind(&self) -> ErrorKind {
320 self.kind
321 }
322
323 pub fn retryable(&self) -> bool {
325 self.retryable
326 }
327
328 #[inline]
330 pub fn message(&self) -> &str {
331 self.message.as_str()
332 }
333}
334
/// Generates `impl From<$src_ty> for crate::error::Error`.
///
/// The generated conversion builds an error of kind `$kind` with message
/// `$message` and attaches the converted value as its source.
macro_rules! define_from_err {
    ($src_ty:path, $kind:path, $message:expr) => {
        impl ::std::convert::From<$src_ty> for crate::error::Error {
            fn from(cause: $src_ty) -> Self {
                let wrapped = Self::new($kind, $message);
                wrapped.with_source(cause)
            }
        }
    };
}
344
345define_from_err!(
346 std::str::Utf8Error,
347 ErrorKind::Unexpected,
348 "handling invalid utf-8 characters"
349);
350
351define_from_err!(
352 core::num::ParseIntError,
353 ErrorKind::Unexpected,
354 "parsing integer from string"
355);
356
357define_from_err!(
358 std::array::TryFromSliceError,
359 ErrorKind::DataInvalid,
360 "failed to convert byte slice to array"
361);
362
363define_from_err!(
364 std::num::TryFromIntError,
365 ErrorKind::DataInvalid,
366 "failed to convert integer"
367);
368
369define_from_err!(
370 chrono::ParseError,
371 ErrorKind::DataInvalid,
372 "Failed to parse string to date or time"
373);
374
375define_from_err!(
376 uuid::Error,
377 ErrorKind::DataInvalid,
378 "Failed to convert between uuid und iceberg value"
379);
380
381define_from_err!(
382 apache_avro::Error,
383 ErrorKind::DataInvalid,
384 "Failure in conversion with avro"
385);
386
387define_from_err!(
388 url::ParseError,
389 ErrorKind::DataInvalid,
390 "Failed to parse url"
391);
392
393define_from_err!(
394 reqwest::Error,
395 ErrorKind::Unexpected,
396 "Failed to execute http request"
397);
398
399define_from_err!(
400 serde_json::Error,
401 ErrorKind::DataInvalid,
402 "Failed to parse json string"
403);
404
405define_from_err!(
406 parquet::errors::ParquetError,
407 ErrorKind::Unexpected,
408 "Failed to read a Parquet file"
409);
410
411define_from_err!(
412 futures::channel::mpsc::SendError,
413 ErrorKind::Unexpected,
414 "Failed to send a message to a channel"
415);
416
417define_from_err!(
418 arrow_schema::ArrowError,
419 ErrorKind::Unexpected,
420 "Arrow Schema Error"
421);
422
423define_from_err!(std::io::Error, ErrorKind::Unexpected, "IO Operation failed");
424
425pub(crate) fn timestamp_ms_to_utc(timestamp_ms: i64) -> Result<DateTime<Utc>> {
436 match Utc.timestamp_millis_opt(timestamp_ms) {
437 chrono::LocalResult::Single(t) => Ok(t),
438 chrono::LocalResult::Ambiguous(_, _) => Err(Error::new(
439 ErrorKind::Unexpected,
440 "Ambiguous timestamp with two possible results",
441 )),
442 chrono::LocalResult::None => Err(Error::new(ErrorKind::DataInvalid, "Invalid timestamp")),
443 }
444 .map_err(|e| e.with_context("timestamp value", timestamp_ms.to_string()))
445}
446
/// Returns early with an `ErrorKind::DataInvalid` error when `$cond` is false.
///
/// The arguments after the condition are handed to `format!` to build the
/// error message: a format string literal, optionally followed by format
/// arguments. A bare message (`ensure_data_valid!(ok, "bad input")`) is
/// accepted, as are a trailing comma and any number of arguments.
///
/// The expansion contains a `return`, so the macro must be used inside a
/// function whose return type can hold `Err($crate::error::Error)`.
#[macro_export]
macro_rules! ensure_data_valid {
    // The argument tail (comma included) is optional so that a message
    // without placeholders does not need a dangling comma.
    ($cond: expr, $fmt: literal $(, $($arg:tt)*)?) => {
        if !$cond {
            return Err($crate::error::Error::new(
                $crate::error::ErrorKind::DataInvalid,
                format!($fmt $(, $($arg)*)?),
            ));
        }
    };
}
465
#[cfg(test)]
mod tests {
    use anyhow::anyhow;
    use pretty_assertions::assert_eq;

    use super::*;

    /// Expected `Display` output of the fixture error.
    const EXPECTED_DISPLAY: &str = "Unexpected, context: { path: /path/to/file, called: send_async } => something wrong happened, source: networking error";

    /// Expected `{:?}` output of the fixture error, excluding any backtrace
    /// section. Built line by line so the leading space of each context entry
    /// is visible.
    const EXPECTED_DEBUG: &str = concat!(
        "Unexpected => something wrong happened\n",
        "\n",
        "Context:\n",
        " path: /path/to/file\n",
        " called: send_async\n",
        "\n",
        "Source: networking error\n",
    );

    /// Builds the fixture shared by all tests, with the given backtrace swapped in.
    fn fixture_error(backtrace: Backtrace) -> Error {
        Error::new(ErrorKind::Unexpected, "something wrong happened")
            .with_context("path", "/path/to/file")
            .with_context("called", "send_async")
            .with_source(anyhow!("networking error"))
            .with_backtrace(backtrace)
    }

    fn generate_error_with_backtrace_disabled() -> Error {
        fixture_error(Backtrace::disabled())
    }

    fn generate_error_with_backtrace_enabled() -> Error {
        fixture_error(Backtrace::force_capture())
    }

    #[test]
    fn test_error_display_without_backtrace() {
        let s = format!("{}", generate_error_with_backtrace_disabled());
        assert_eq!(s, EXPECTED_DISPLAY)
    }

    #[test]
    fn test_error_display_with_backtrace() {
        // `Display` never prints the backtrace, so the output is unchanged.
        let s = format!("{}", generate_error_with_backtrace_enabled());
        assert_eq!(s, EXPECTED_DISPLAY)
    }

    #[test]
    fn test_error_debug_without_backtrace() {
        let s = format!("{:?}", generate_error_with_backtrace_disabled());
        assert_eq!(s, EXPECTED_DEBUG)
    }

    #[test]
    fn test_error_debug_with_backtrace() {
        let s = format!("{:?}", generate_error_with_backtrace_enabled());

        // Split on the heading instead of slicing by byte length: a short or
        // unexpected output then fails with a readable message, not a slice panic.
        let (report, frames) = s
            .split_once("\nBacktrace:\n")
            .expect("a captured backtrace must be printed under a `Backtrace:` heading");
        assert_eq!(report, EXPECTED_DEBUG);

        // The frame list is produced by the standard library; ignore its
        // index padding and only require that it starts at frame 0.
        assert!(
            frames.trim_start().starts_with("0:"),
            "backtrace section should start with frame 0, got: {frames}"
        );
    }
}