iceberg/runtime/
mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! This module contains the async runtime abstraction for iceberg: handles for
//! spawning IO-bound and CPU-bound work onto caller-owned tokio runtimes.

20use std::fmt;
21use std::future::Future;
22use std::pin::Pin;
23use std::task::{Context, Poll};
24
25use tokio::task;
26
27use crate::{Error, ErrorKind, Result};
28
29/// Wrapper around tokio's `JoinHandle` that converts task failures into
30/// [`iceberg::Error`].
31///
32/// Tokio's `JoinHandle<T>` resolves to `Result<T, JoinError>`, where a
33/// `JoinError` means the task either panicked or was cancelled (typically from
34/// runtime shutdown or `abort`). Both are surfaced here as
35/// `ErrorKind::Unexpected` with the original `JoinError` preserved as the
36/// source.
37pub struct JoinHandle<T>(task::JoinHandle<T>);
38
39impl<T> Unpin for JoinHandle<T> {}
40
41impl<T: Send + 'static> Future for JoinHandle<T> {
42    type Output = crate::Result<T>;
43
44    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
45        Pin::new(&mut self.get_mut().0).poll(cx).map(|r| {
46            r.map_err(|e| Error::new(ErrorKind::Unexpected, "spawned task failed").with_source(e))
47        })
48    }
49}
50
51/// Handle to a single tokio runtime.
52///
53/// Wraps a [`tokio::runtime::Handle`], which is cheap to clone. The caller is
54/// responsible for keeping the underlying runtime alive while this handle is
55/// in use; spawning on a shut-down runtime will surface as a `JoinError` via
56/// [`JoinHandle`].
57#[derive(Clone)]
58pub struct RuntimeHandle {
59    handle: tokio::runtime::Handle,
60}
61
62impl fmt::Debug for RuntimeHandle {
63    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64        f.debug_struct("RuntimeHandle").finish()
65    }
66}
67
68impl RuntimeHandle {
69    fn from_tokio_handle(handle: tokio::runtime::Handle) -> Self {
70        Self { handle }
71    }
72
73    /// Spawn an async task.
74    pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
75    where
76        F: Future + Send + 'static,
77        F::Output: Send + 'static,
78    {
79        JoinHandle(self.handle.spawn(future))
80    }
81
82    /// Spawn a blocking task.
83    pub fn spawn_blocking<F, T>(&self, f: F) -> JoinHandle<T>
84    where
85        F: FnOnce() -> T + Send + 'static,
86        T: Send + 'static,
87    {
88        JoinHandle(self.handle.spawn_blocking(f))
89    }
90}
91
92/// Iceberg's runtime abstraction.
93///
94/// Contains separate handles for IO-bound and CPU-bound work. When constructed
95/// with a single tokio runtime, both `io()` and `cpu()` route to the same one.
96/// Use [`Runtime::new_with_split`] to provide dedicated runtimes for each
97/// category.
98///
99/// # Lifetime
100///
101/// A `Runtime` stores only `tokio::runtime::Handle`s (weak references). The
102/// caller owns the tokio runtime's lifetime. If the underlying runtime is
103/// dropped while iceberg is still using it, subsequent spawns will surface as
104/// task cancellation errors via [`JoinHandle`].
105///
106/// Cloning is cheap.
107#[derive(Clone)]
108pub struct Runtime {
109    io: RuntimeHandle,
110    cpu: RuntimeHandle,
111}
112
113impl fmt::Debug for Runtime {
114    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
115        f.debug_struct("Runtime").finish()
116    }
117}
118
119impl Runtime {
120    /// Create a Runtime backed by a single tokio runtime for all work.
121    pub fn new(runtime: &tokio::runtime::Runtime) -> Self {
122        let handle = RuntimeHandle::from_tokio_handle(runtime.handle().clone());
123        Self {
124            io: handle.clone(),
125            cpu: handle,
126        }
127    }
128
129    /// Create a Runtime with separate tokio runtimes for IO and CPU work.
130    pub fn new_with_split(
131        io_runtime: &tokio::runtime::Runtime,
132        cpu_runtime: &tokio::runtime::Runtime,
133    ) -> Self {
134        Self {
135            io: RuntimeHandle::from_tokio_handle(io_runtime.handle().clone()),
136            cpu: RuntimeHandle::from_tokio_handle(cpu_runtime.handle().clone()),
137        }
138    }
139
140    /// Borrows the tokio runtime the caller is currently running in.
141    ///
142    /// Panics if called outside a tokio runtime context. Use
143    /// [`Runtime::try_current`] for a fallible version.
144    ///
145    /// Iceberg never implicitly spawns its own runtime; callers outside a
146    /// tokio context must construct one explicitly via [`Runtime::new`] or
147    /// [`Runtime::new_with_split`].
148    pub fn current() -> Self {
149        Self::try_current().expect(
150            "Runtime::current() called outside a tokio runtime context. \
151             Call it from within #[tokio::main] / #[tokio::test], or construct \
152             a Runtime explicitly via Runtime::new / Runtime::new_with_split.",
153        )
154    }
155
156    /// Fallible variant of [`Runtime::current`]. Returns an error if no tokio
157    /// runtime is available in the current context.
158    pub fn try_current() -> Result<Self> {
159        let handle = tokio::runtime::Handle::try_current().map_err(|e| {
160            Error::new(
161                ErrorKind::Unexpected,
162                "no tokio runtime in context; call Runtime::try_current() \
163                 from within a tokio runtime, or construct a Runtime explicitly \
164                 via Runtime::new / Runtime::new_with_split",
165            )
166            .with_source(e)
167        })?;
168        let rh = RuntimeHandle::from_tokio_handle(handle);
169        Ok(Self {
170            io: rh.clone(),
171            cpu: rh,
172        })
173    }
174
175    /// Handle for IO-bound work (network fetches, file reads).
176    pub fn io(&self) -> &RuntimeHandle {
177        &self.io
178    }
179
180    /// Handle for CPU-bound work (decoding, predicate eval, projection).
181    pub fn cpu(&self) -> &RuntimeHandle {
182        &self.cpu
183    }
184}
185
#[cfg(test)]
mod tests {
    use super::*;

    /// A test harness that owns a tokio runtime and exposes a `Runtime` handle
    /// plus a `block_on` helper for sync test bodies.
    struct TestRuntime {
        tokio: tokio::runtime::Runtime,
        rt: Runtime,
    }

    impl TestRuntime {
        fn new() -> Self {
            let tokio = tokio::runtime::Builder::new_multi_thread()
                .enable_all()
                .build()
                .expect("Failed to build tokio runtime");
            let rt = Runtime::new(&tokio);
            Self { tokio, rt }
        }

        fn block_on<F: Future>(&self, f: F) -> F::Output {
            self.tokio.block_on(f)
        }
    }

    /// Name of the OS thread the caller is running on. Used to tell which
    /// runtime executed a task.
    fn current_thread_name() -> Option<String> {
        std::thread::current().name().map(str::to_owned)
    }

    #[test]
    fn test_runtime_spawn_io() {
        let h = TestRuntime::new();
        let handle = h.rt.io().spawn(async { 1 + 1 });
        assert_eq!(h.block_on(handle).unwrap(), 2);
    }

    #[test]
    fn test_runtime_spawn_cpu() {
        let h = TestRuntime::new();
        let handle = h.rt.cpu().spawn(async { 3 + 4 });
        assert_eq!(h.block_on(handle).unwrap(), 7);
    }

    #[test]
    fn test_runtime_spawn_blocking() {
        let h = TestRuntime::new();
        let handle = h.rt.cpu().spawn_blocking(|| 1 + 1);
        assert_eq!(h.block_on(handle).unwrap(), 2);
    }

    /// `Runtime::new` must also work with a caller-built current-thread
    /// runtime, not only the multi-thread runtime that `TestRuntime` uses.
    #[test]
    fn test_runtime_new_with_custom_runtime() {
        let tokio_rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("Failed to build tokio runtime");
        let rt = Runtime::new(&tokio_rt);
        let handle = rt.io().spawn(async { 42 });
        assert_eq!(tokio_rt.block_on(handle).unwrap(), 42);
    }

    /// Tasks spawned through `io()` / `cpu()` must run on the runtime supplied
    /// for that category. Each runtime gets a distinct thread name, so the
    /// executing thread identifies the runtime.
    #[test]
    fn test_runtime_split_uses_separate_handles() {
        let io_rt = tokio::runtime::Builder::new_multi_thread()
            .thread_name("iceberg-test-io")
            .enable_all()
            .build()
            .unwrap();
        let cpu_rt = tokio::runtime::Builder::new_multi_thread()
            .thread_name("iceberg-test-cpu")
            .enable_all()
            .build()
            .unwrap();
        let rt = Runtime::new_with_split(&io_rt, &cpu_rt);

        let io_thread = io_rt.block_on(rt.io().spawn(async { current_thread_name() })).unwrap();
        let cpu_thread = cpu_rt.block_on(rt.cpu().spawn(async { current_thread_name() })).unwrap();
        let blocking_thread = cpu_rt.block_on(rt.cpu().spawn_blocking(current_thread_name)).unwrap();

        assert_eq!(io_thread.as_deref(), Some("iceberg-test-io"));
        assert_eq!(cpu_thread.as_deref(), Some("iceberg-test-cpu"));
        assert_eq!(blocking_thread.as_deref(), Some("iceberg-test-cpu"));
    }

    #[test]
    fn test_runtime_clone() {
        let h = TestRuntime::new();
        let rt2 = h.rt.clone();
        let handle = rt2.io().spawn(async { 5 });
        assert_eq!(h.block_on(handle).unwrap(), 5);
    }

    #[test]
    fn test_runtime_debug() {
        let h = TestRuntime::new();
        let debug_str = format!("{:?}", h.rt);
        assert!(debug_str.contains("Runtime"));
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_try_current_in_runtime() {
        let rt = Runtime::try_current().expect("should find current runtime");
        let result = rt.io().spawn(async { 7 }).await.unwrap();
        assert_eq!(result, 7);
    }

    #[test]
    fn test_try_current_outside_runtime() {
        let err = Runtime::try_current().expect_err("must fail outside runtime");
        assert_eq!(err.kind(), ErrorKind::Unexpected);
    }

    /// A panicking task resolves to an `Unexpected` iceberg error instead of
    /// propagating the panic to whoever awaits it.
    #[test]
    fn test_spawn_panic_surfaces_as_error() {
        let h = TestRuntime::new();
        let handle: JoinHandle<()> =
            h.rt.io().spawn(async { panic!("intentional panic in test task") });
        let err = h
            .block_on(handle)
            .expect_err("a panicking task must resolve to an error");
        assert_eq!(err.kind(), ErrorKind::Unexpected);
    }

    /// Verifies that when the caller drops the underlying tokio runtime, a
    /// subsequent spawn surfaces as an error via our `JoinHandle` rather
    /// than hanging or misbehaving.
    #[test]
    fn test_spawn_after_runtime_drop_errors() {
        let driver = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        let owned = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .unwrap();
        let rt = Runtime::new(&owned);
        // Drop the caller's owned runtime. Our `Runtime` only holds a `Handle`,
        // so this shuts down the underlying tokio runtime.
        drop(owned);

        // Spawning after shutdown returns a handle that resolves to a cancelled
        // JoinError, which our wrapper maps to iceberg::Error.
        let handle = rt.io().spawn(async { 1 });
        let result = driver.block_on(handle);
        let err = result.expect_err("expected error after runtime shutdown");
        assert_eq!(err.kind(), ErrorKind::Unexpected);
    }
}