Skip to main content

iceberg/transaction/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module contains transaction api.
19//!
20//! The transaction API enables changes to be made to an existing table.
21//!
22//! Note that this may also have side effects, such as producing new manifest
23//! files.
24//!
25//! Below is a basic example using the "fast-append" action:
26//!
27//! ```ignore
28//! use iceberg::transaction::{ApplyTransactionAction, Transaction};
29//! use iceberg::Catalog;
30//!
31//! // Create a transaction.
//! let tx = Transaction::new(&my_table);
33//!
34//! // Create a `FastAppendAction` which will not rewrite or append
35//! // to existing metadata. This will create a new manifest.
36//! let action = tx.fast_append().add_data_files(my_data_files);
37//!
38//! // Apply the fast-append action to the given transaction, returning
39//! // the newly updated `Transaction`.
40//! let tx = action.apply(tx).unwrap();
41//!
42//!
43//! // End the transaction by committing to an `iceberg::Catalog`
44//! // implementation. This will cause a table update to occur.
45//! let table = tx
46//!     .commit(&some_catalog_impl)
47//!     .await
48//!     .unwrap();
49//! ```
50
/// Transaction action abstractions, including the `ApplyTransactionAction` trait,
/// whose `apply` method lets users apply a transaction action to a `Transaction`.
53mod action;
54
55pub use action::*;
56mod append;
57mod expire_snapshots;
58mod snapshot;
59mod sort_order;
60mod update_location;
61mod update_properties;
62mod update_schema;
63mod update_statistics;
64mod upgrade_format_version;
65
66use std::sync::Arc;
67use std::time::Duration;
68
69use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder, RetryableWithContext};
70pub use update_schema::AddColumn;
71
72use crate::error::Result;
73use crate::spec::TableProperties;
74use crate::table::Table;
75use crate::transaction::action::BoxedTransactionAction;
76use crate::transaction::append::FastAppendAction;
77use crate::transaction::expire_snapshots::ExpireSnapshotsAction;
78use crate::transaction::sort_order::ReplaceSortOrderAction;
79use crate::transaction::update_location::UpdateLocationAction;
80use crate::transaction::update_properties::UpdatePropertiesAction;
81use crate::transaction::update_schema::UpdateSchemaAction;
82use crate::transaction::update_statistics::UpdateStatisticsAction;
83use crate::transaction::upgrade_format_version::UpgradeFormatVersionAction;
84use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate};
85
/// Table transaction.
///
/// Holds the table the transaction is based on together with the actions that
/// are replayed, in order, against that table when the transaction is committed.
#[derive(Clone)]
pub struct Transaction {
    /// Base table of the transaction; replaced by the freshly loaded table during
    /// commit when the catalog's copy differs from it.
    table: Table,
    /// Pending actions, committed one after another when the transaction is committed.
    actions: Vec<BoxedTransactionAction>,
}
92
93impl Transaction {
94    /// Creates a new transaction.
95    pub fn new(table: &Table) -> Self {
96        Self {
97            table: table.clone(),
98            actions: vec![],
99        }
100    }
101
102    fn update_table_metadata(table: Table, updates: &[TableUpdate]) -> Result<Table> {
103        let mut metadata_builder = table.metadata().clone().into_builder(None);
104        for update in updates {
105            metadata_builder = update.clone().apply(metadata_builder)?;
106        }
107
108        Ok(table.with_metadata(Arc::new(metadata_builder.build()?.metadata)))
109    }
110
111    /// Applies an [`ActionCommit`] to the given [`Table`], returning a new [`Table`] with updated metadata.
112    /// Also appends any derived [`TableUpdate`]s and [`TableRequirement`]s to the provided vectors.
113    fn apply(
114        table: Table,
115        mut action_commit: ActionCommit,
116        existing_updates: &mut Vec<TableUpdate>,
117        existing_requirements: &mut Vec<TableRequirement>,
118    ) -> Result<Table> {
119        let updates = action_commit.take_updates();
120        let requirements = action_commit.take_requirements();
121
122        for requirement in &requirements {
123            requirement.check(Some(table.metadata()))?;
124        }
125
126        let updated_table = Self::update_table_metadata(table, &updates)?;
127
128        existing_updates.extend(updates);
129        existing_requirements.extend(requirements);
130
131        Ok(updated_table)
132    }
133
134    /// Sets table to a new version.
135    pub fn upgrade_table_version(&self) -> UpgradeFormatVersionAction {
136        UpgradeFormatVersionAction::new()
137    }
138
139    /// Update table's property.
140    pub fn update_table_properties(&self) -> UpdatePropertiesAction {
141        UpdatePropertiesAction::new()
142    }
143
144    /// Creates an update schema action.
145    pub fn update_schema(&self) -> UpdateSchemaAction {
146        UpdateSchemaAction::new()
147    }
148
149    /// Creates a fast append action.
150    pub fn fast_append(&self) -> FastAppendAction {
151        FastAppendAction::new()
152    }
153
154    /// Creates replace sort order action.
155    pub fn replace_sort_order(&self) -> ReplaceSortOrderAction {
156        ReplaceSortOrderAction::new()
157    }
158
159    /// Set the location of table
160    pub fn update_location(&self) -> UpdateLocationAction {
161        UpdateLocationAction::new()
162    }
163
164    /// Update the statistics of table
165    pub fn update_statistics(&self) -> UpdateStatisticsAction {
166        UpdateStatisticsAction::new()
167    }
168
169    /// Expire snapshots from the table metadata.
170    pub fn expire_snapshots(&self) -> ExpireSnapshotsAction {
171        ExpireSnapshotsAction::new()
172    }
173
174    /// Commit transaction.
175    pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
176        if self.actions.is_empty() {
177            // nothing to commit
178            return Ok(self.table);
179        }
180
181        let table_props = self.table.metadata().table_properties()?;
182
183        // TODO(https://github.com/apache/iceberg-rust/issues/2034): remove once encrypted writes are supported
184        if table_props.encryption_key_id.is_some() {
185            return Err(Error::new(
186                ErrorKind::FeatureUnsupported,
187                "Cannot commit to an encrypted table: encrypted writes are not yet supported",
188            ));
189        }
190
191        let backoff = Self::build_backoff(table_props)?;
192        let tx = self;
193
194        (|mut tx: Transaction| async {
195            let result = tx.do_commit(catalog).await;
196            (tx, result)
197        })
198        .retry(backoff)
199        .sleep(tokio::time::sleep)
200        .context(tx)
201        .when(|e| e.retryable())
202        .await
203        .1
204    }
205
206    fn build_backoff(props: TableProperties) -> Result<ExponentialBackoff> {
207        Ok(ExponentialBuilder::new()
208            .with_min_delay(Duration::from_millis(props.commit_min_retry_wait_ms))
209            .with_max_delay(Duration::from_millis(props.commit_max_retry_wait_ms))
210            .with_total_delay(Some(Duration::from_millis(
211                props.commit_total_retry_timeout_ms,
212            )))
213            .with_max_times(props.commit_num_retries)
214            .with_factor(2.0)
215            .build())
216    }
217
218    async fn do_commit(&mut self, catalog: &dyn Catalog) -> Result<Table> {
219        let refreshed = catalog.load_table(self.table.identifier()).await?;
220
221        if self.table.metadata() != refreshed.metadata()
222            || self.table.metadata_location() != refreshed.metadata_location()
223        {
224            // current base is stale, use refreshed as base and re-apply transaction actions
225            self.table = refreshed.clone();
226        }
227
228        let mut current_table = self.table.clone();
229        let mut existing_updates: Vec<TableUpdate> = vec![];
230        let mut existing_requirements: Vec<TableRequirement> = vec![];
231
232        for action in &self.actions {
233            let action_commit = Arc::clone(action).commit(&current_table).await?;
234            // apply action commit to current_table
235            current_table = Self::apply(
236                current_table,
237                action_commit,
238                &mut existing_updates,
239                &mut existing_requirements,
240            )?;
241        }
242
243        let table_commit = TableCommit::builder()
244            .ident(self.table.identifier().to_owned())
245            .updates(existing_updates)
246            .requirements(existing_requirements)
247            .build();
248
249        catalog.update_table(table_commit).await
250    }
251}
252
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::fs::File;
    use std::io::BufReader;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU32, Ordering};

    use crate::catalog::MockCatalog;
    use crate::encryption::SensitiveBytes;
    use crate::encryption::kms::{KeyManagementClient, MemoryKeyManagementClient};
    use crate::io::FileIO;
    use crate::memory::tests::new_memory_catalog;
    use crate::spec::{
        DataContentType, DataFileBuilder, DataFileFormat, Literal, Struct, TableMetadata,
    };
    use crate::table::Table;
    use crate::test_utils::test_runtime;
    use crate::transaction::{ApplyTransactionAction, Transaction};
    use crate::{Catalog, Error, ErrorKind, TableCreation, TableIdent};

    /// Metadata location assigned to every table built directly from a fixture file.
    const FIXTURE_METADATA_LOCATION: &str = "s3://bucket/test/location/metadata/v1.json";

    /// Reads and deserializes `testdata/table_metadata/<file_name>` from the crate directory.
    ///
    /// Panics if the file cannot be opened or is not valid table metadata.
    fn load_metadata_fixture(file_name: &str) -> TableMetadata {
        let file = File::open(format!(
            "{}/testdata/table_metadata/{}",
            env!("CARGO_MANIFEST_DIR"),
            file_name
        ))
        .unwrap();
        let reader = BufReader::new(file);
        serde_json::from_reader::<_, TableMetadata>(reader).unwrap()
    }

    /// Builds the `ns1.test1` table, with in-memory file IO and the test runtime,
    /// from the given metadata fixture.
    fn table_from_fixture(file_name: &str) -> Table {
        Table::builder()
            .metadata(load_metadata_fixture(file_name))
            .metadata_location(FIXTURE_METADATA_LOCATION)
            .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
            .file_io(FileIO::new_with_memory())
            .runtime(test_runtime())
            .build()
            .unwrap()
    }

    /// Creates a table from the `TableMetadataV1Valid.json` fixture.
    pub fn make_v1_table() -> Table {
        table_from_fixture("TableMetadataV1Valid.json")
    }

    /// Creates a table from the `TableMetadataV2Valid.json` fixture.
    pub fn make_v2_table() -> Table {
        table_from_fixture("TableMetadataV2Valid.json")
    }

    /// Creates a table from the `TableMetadataV2ValidMinimal.json` fixture.
    pub fn make_v2_minimal_table() -> Table {
        table_from_fixture("TableMetadataV2ValidMinimal.json")
    }

    /// Creates a V3 table in `catalog`, inside a freshly created, uniquely named namespace.
    ///
    /// Schema, partition spec and sort order are taken from the
    /// `TableMetadataV3ValidMinimal.json` fixture.
    pub(crate) async fn make_v3_minimal_table_in_catalog(catalog: &impl Catalog) -> Table {
        // A random namespace keeps tables created by different tests apart.
        let table_ident =
            TableIdent::from_strs([format!("ns1-{}", uuid::Uuid::new_v4()), "test1".to_string()])
                .unwrap();

        catalog
            .create_namespace(table_ident.namespace(), HashMap::new())
            .await
            .unwrap();

        let base_metadata = load_metadata_fixture("TableMetadataV3ValidMinimal.json");

        let table_creation = TableCreation::builder()
            .schema((**base_metadata.current_schema()).clone())
            .partition_spec((**base_metadata.default_partition_spec()).clone())
            .sort_order((**base_metadata.default_sort_order()).clone())
            .name(table_ident.name().to_string())
            .format_version(crate::spec::FormatVersion::V3)
            .build();

        catalog
            .create_table(table_ident.namespace(), table_creation)
            .await
            .unwrap()
    }

    /// Helper function to create a test table with retry properties
    ///
    /// `num_retries` is stored as the `commit.retry.num-retries` property; the wait
    /// and timeout properties are fixed to small values to keep the tests fast.
    pub(super) fn setup_test_table(num_retries: &str) -> Table {
        let table = make_v2_table();

        // Set retry properties
        let props = HashMap::from([
            ("commit.retry.min-wait-ms".to_string(), "10".to_string()),
            ("commit.retry.max-wait-ms".to_string(), "100".to_string()),
            (
                "commit.retry.total-timeout-ms".to_string(),
                "1000".to_string(),
            ),
            (
                "commit.retry.num-retries".to_string(),
                num_retries.to_string(),
            ),
        ]);

        // Update table properties
        let metadata = table
            .metadata()
            .clone()
            .into_builder(None)
            .set_properties(props)
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        table.with_metadata(Arc::new(metadata))
    }

    /// Helper function to create a transaction with a simple update action
    fn create_test_transaction(table: &Table) -> Transaction {
        let tx = Transaction::new(table);
        tx.update_table_properties()
            .set("test.key".to_string(), "test.value".to_string())
            .apply(tx)
            .unwrap()
    }

    /// Builds the retryable commit-conflict error returned by the mock catalogs.
    fn retryable_commit_conflict() -> Error {
        Error::new(ErrorKind::CatalogCommitConflicts, "Commit conflict").with_retryable(true)
    }

    /// Helper function to set up a mock catalog with retryable errors
    ///
    /// With `Some(n)`, the first `n` calls to `update_table` fail with a retryable
    /// commit conflict and later calls succeed; with `None`, every call fails.
    /// The mock expects `update_table` to be called exactly `expected_calls` times.
    fn setup_mock_catalog_with_retryable_errors(
        success_after_attempts: Option<u32>,
        expected_calls: usize,
    ) -> MockCatalog {
        let mut mock_catalog = MockCatalog::new();

        mock_catalog
            .expect_load_table()
            .returning_st(|_| Box::pin(async move { Ok(make_v2_table()) }));

        let attempts = AtomicU32::new(0);
        mock_catalog
            .expect_update_table()
            .times(expected_calls)
            .returning_st(move |_| {
                let should_fail = match success_after_attempts {
                    // `fetch_add` returns the number of attempts made *before* this one.
                    Some(failures) => attempts.fetch_add(1, Ordering::SeqCst) < failures,
                    // Always fail with retryable error
                    None => true,
                };

                Box::pin(async move {
                    if should_fail {
                        Err(retryable_commit_conflict())
                    } else {
                        Ok(make_v2_table())
                    }
                })
            });

        mock_catalog
    }

    /// Helper function to set up a mock catalog with non-retryable error
    fn setup_mock_catalog_with_non_retryable_error() -> MockCatalog {
        let mut mock_catalog = MockCatalog::new();

        mock_catalog
            .expect_load_table()
            .returning_st(|_| Box::pin(async move { Ok(make_v2_table()) }));

        mock_catalog
            .expect_update_table()
            .times(1) // Should only be called once since error is not retryable
            .returning_st(move |_| {
                Box::pin(async move {
                    Err(Error::new(ErrorKind::Unexpected, "Non-retryable error")
                        .with_retryable(false))
                })
            });

        mock_catalog
    }

    #[tokio::test]
    async fn test_commit_retryable_error() {
        // Create a test table with retry properties
        let table = setup_test_table("3");

        // Create a transaction with a simple update action
        let tx = create_test_transaction(&table);

        // Create a mock catalog that fails twice then succeeds
        let mock_catalog = setup_mock_catalog_with_retryable_errors(Some(2), 3);

        // Commit the transaction
        let result = tx.commit(&mock_catalog).await;

        // Verify the result
        assert!(result.is_ok(), "Transaction should eventually succeed");
    }

    #[tokio::test]
    async fn test_commit_non_retryable_error() {
        // Create a test table with retry properties
        let table = setup_test_table("3");

        // Create a transaction with a simple update action
        let tx = create_test_transaction(&table);

        // Create a mock catalog that fails with non-retryable error
        let mock_catalog = setup_mock_catalog_with_non_retryable_error();

        // Commit the transaction
        let result = tx.commit(&mock_catalog).await;

        // Verify the result
        let err = result.expect_err("Transaction should fail immediately");
        assert_eq!(err.kind(), ErrorKind::Unexpected);
        assert_eq!(err.message(), "Non-retryable error");
        assert!(!err.retryable(), "Error should not be retryable");
    }

    #[tokio::test]
    async fn test_commit_max_retries_exceeded() {
        // Create a test table with retry properties (only allow 2 retries)
        let table = setup_test_table("2");

        // Create a transaction with a simple update action
        let tx = create_test_transaction(&table);

        // Create a mock catalog that always fails with retryable error
        let mock_catalog = setup_mock_catalog_with_retryable_errors(None, 3); // Initial attempt + 2 retries = 3 total attempts

        // Commit the transaction
        let result = tx.commit(&mock_catalog).await;

        // Verify the result
        let err = result.expect_err("Transaction should fail after max retries");
        assert_eq!(err.kind(), ErrorKind::CatalogCommitConflicts);
        assert_eq!(err.message(), "Commit conflict");
        assert!(err.retryable(), "Error should be retryable");
    }

    #[tokio::test]
    async fn test_transaction_snapshot_summary() {
        let catalog = new_memory_catalog().await;
        let table = make_v3_minimal_table_in_catalog(&catalog).await;

        // Each call builds a transaction that fast-appends one uniquely named data file.
        let mut file_seq = 0u32;
        let mut append_file = |table: &Table, record_count: u64, file_size: u64| {
            file_seq += 1;
            let file = DataFileBuilder::default()
                .content(DataContentType::Data)
                .file_path(format!("test/{file_seq}.parquet"))
                .file_format(DataFileFormat::Parquet)
                .file_size_in_bytes(file_size)
                .record_count(record_count)
                .partition(Struct::from_iter([Some(Literal::long(1))]))
                .partition_spec_id(0)
                .build()
                .unwrap();
            let tx = Transaction::new(table);
            tx.fast_append()
                .add_data_files(vec![file])
                .apply(tx)
                .unwrap()
        };

        let table = append_file(&table, /*record_count=*/ 10, /*file_size=*/ 100)
            .commit(&catalog)
            .await
            .unwrap();
        let table = append_file(&table, /*record_count=*/ 20, /*file_size=*/ 200)
            .commit(&catalog)
            .await
            .unwrap();

        let summary = &table
            .metadata()
            .current_snapshot()
            .unwrap()
            .summary()
            .additional_properties;

        // Totals must accumulate across both commits.
        assert_eq!(summary.get("total-records").unwrap(), "30");
        assert_eq!(summary.get("total-data-files").unwrap(), "2");
        assert_eq!(summary.get("total-files-size").unwrap(), "300");
    }

    #[tokio::test]
    async fn test_commit_rejects_encrypted_table() {
        let metadata = load_metadata_fixture("TableMetadataV3ValidEncryption.json");

        let kms: Arc<dyn KeyManagementClient> = {
            let k = MemoryKeyManagementClient::new();
            k.add_master_key_bytes(
                "master-1",
                SensitiveBytes::new([
                    0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c,
                    0x0d, 0x0e, 0x0f,
                ]),
            )
            .unwrap();
            Arc::new(k)
        };

        let table = Table::builder()
            .metadata(metadata)
            .metadata_location(FIXTURE_METADATA_LOCATION)
            .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
            .file_io(FileIO::new_with_memory())
            .kms_client(kms)
            .runtime(test_runtime())
            .build()
            .unwrap();

        let tx = create_test_transaction(&table);

        // No expectations are registered: the commit must be rejected before the
        // catalog is ever called.
        let mock_catalog = MockCatalog::new();
        let result = tx.commit(&mock_catalog).await;

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert_eq!(err.kind(), ErrorKind::FeatureUnsupported);
        assert!(
            err.message()
                .contains("encrypted writes are not yet supported"),
            "unexpected error message: {}",
            err.message()
        );
    }
}
635
#[cfg(test)]
mod test_row_lineage {
    use crate::memory::tests::new_memory_catalog;
    use crate::spec::{
        DataContentType, DataFile, DataFileBuilder, DataFileFormat, Literal, Struct,
    };
    use crate::transaction::tests::make_v3_minimal_table_in_catalog;
    use crate::transaction::{ApplyTransactionAction, Transaction};

    /// Builds a 100-byte Parquet data file entry with `record_count` rows; the row
    /// count is also used in the file path.
    fn file_with_rows(record_count: u64) -> DataFile {
        let path = format!("test/{record_count}.parquet");
        let partition = Struct::from_iter([Some(Literal::long(0))]);

        DataFileBuilder::default()
            .content(DataContentType::Data)
            .file_path(path)
            .file_format(DataFileFormat::Parquet)
            .file_size_in_bytes(100)
            .record_count(record_count)
            .partition(partition)
            .partition_spec_id(0)
            .build()
            .unwrap()
    }

    /// Two consecutive fast appends must advance `next_row_id` by the number of
    /// appended rows and stamp each snapshot and manifest with its first row id.
    #[tokio::test]
    async fn test_fast_append_with_row_lineage() {
        let catalog = new_memory_catalog().await;
        let initial_table = make_v3_minimal_table_in_catalog(&catalog).await;

        // A freshly created table has not assigned any row ids yet.
        assert_eq!(initial_table.metadata().next_row_id(), 0);

        // --- First append: a single file with 30 rows ---
        let first_tx = Transaction::new(&initial_table);
        let first_tx = first_tx
            .fast_append()
            .add_data_files(vec![file_with_rows(30)])
            .apply(first_tx)
            .unwrap();
        let table_after_first = first_tx.commit(&catalog).await.unwrap();

        let first_snapshot = table_after_first.metadata().current_snapshot().unwrap();
        assert_eq!(first_snapshot.first_row_id(), Some(0));
        assert_eq!(table_after_first.metadata().next_row_id(), 30);

        // The only manifest written so far starts at row id 0.
        let first_manifest_list = table_after_first
            .manifest_list_reader(first_snapshot)
            .load()
            .await
            .unwrap();
        assert_eq!(first_manifest_list.entries().len(), 1);
        assert_eq!(first_manifest_list.entries()[0].first_row_id, Some(0));

        // --- Second append: two files with 17 and 11 rows ---
        let second_tx = Transaction::new(&table_after_first);
        let second_tx = second_tx
            .fast_append()
            .add_data_files(vec![file_with_rows(17), file_with_rows(11)])
            .apply(second_tx)
            .unwrap();
        let table_after_second = second_tx.commit(&catalog).await.unwrap();

        let second_snapshot = table_after_second.metadata().current_snapshot().unwrap();
        assert_eq!(second_snapshot.first_row_id(), Some(30));
        assert_eq!(table_after_second.metadata().next_row_id(), 30 + 17 + 11);

        // The manifest list now has two entries; the one at index 1 starts at row id 30.
        let second_manifest_list = table_after_second
            .manifest_list_reader(second_snapshot)
            .load()
            .await
            .unwrap();
        assert_eq!(second_manifest_list.entries().len(), 2);
        assert_eq!(second_manifest_list.entries()[1].first_row_id, Some(30));
    }
}