iceberg/spec/manifest_list/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19
20use apache_avro::Writer;
21use bytes::Bytes;
22
23use super::_const_schema::{
24    MANIFEST_LIST_AVRO_SCHEMA_V1, MANIFEST_LIST_AVRO_SCHEMA_V2, MANIFEST_LIST_AVRO_SCHEMA_V3,
25};
26use super::_serde::{ManifestFileV1, ManifestFileV2, ManifestFileV3};
27use super::{FormatVersion, ManifestContentType, ManifestFile, UNASSIGNED_SEQUENCE_NUMBER};
28use crate::error::Result;
29use crate::io::FileWrite;
30use crate::{Error, ErrorKind};
31
32/// A manifest list writer.
33pub struct ManifestListWriter {
34    format_version: FormatVersion,
35    writer: Box<dyn FileWrite>,
36    avro_writer: Writer<'static, Vec<u8>>,
37    sequence_number: i64,
38    snapshot_id: i64,
39    next_row_id: Option<u64>,
40}
41
42impl std::fmt::Debug for ManifestListWriter {
43    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44        f.debug_struct("ManifestListWriter")
45            .field("format_version", &self.format_version)
46            .field("avro_writer", &self.avro_writer.schema())
47            .finish_non_exhaustive()
48    }
49}
50
51impl ManifestListWriter {
52    /// Get the next row ID that will be assigned to the next data manifest added.
53    pub fn next_row_id(&self) -> Option<u64> {
54        self.next_row_id
55    }
56
57    /// Construct a v1 [`ManifestListWriter`] that writes to a provided [`FileWrite`].
58    pub fn v1(
59        writer: Box<dyn FileWrite>,
60        snapshot_id: i64,
61        parent_snapshot_id: Option<i64>,
62    ) -> Self {
63        let mut metadata = HashMap::from_iter([
64            ("snapshot-id".to_string(), snapshot_id.to_string()),
65            ("format-version".to_string(), "1".to_string()),
66        ]);
67        if let Some(parent_snapshot_id) = parent_snapshot_id {
68            metadata.insert(
69                "parent-snapshot-id".to_string(),
70                parent_snapshot_id.to_string(),
71            );
72        }
73        Self::new(FormatVersion::V1, writer, metadata, 0, snapshot_id, None)
74    }
75
76    /// Construct a v2 [`ManifestListWriter`] that writes to a provided [`FileWrite`].
77    pub fn v2(
78        writer: Box<dyn FileWrite>,
79        snapshot_id: i64,
80        parent_snapshot_id: Option<i64>,
81        sequence_number: i64,
82    ) -> Self {
83        let mut metadata = HashMap::from_iter([
84            ("snapshot-id".to_string(), snapshot_id.to_string()),
85            ("sequence-number".to_string(), sequence_number.to_string()),
86            ("format-version".to_string(), "2".to_string()),
87        ]);
88        metadata.insert(
89            "parent-snapshot-id".to_string(),
90            parent_snapshot_id
91                .map(|v| v.to_string())
92                .unwrap_or("null".to_string()),
93        );
94        Self::new(
95            FormatVersion::V2,
96            writer,
97            metadata,
98            sequence_number,
99            snapshot_id,
100            None,
101        )
102    }
103
104    /// Construct a v3 [`ManifestListWriter`] that writes to a provided [`FileWrite`].
105    pub fn v3(
106        writer: Box<dyn FileWrite>,
107        snapshot_id: i64,
108        parent_snapshot_id: Option<i64>,
109        sequence_number: i64,
110        first_row_id: Option<u64>, // Always None for delete manifests
111    ) -> Self {
112        let mut metadata = HashMap::from_iter([
113            ("snapshot-id".to_string(), snapshot_id.to_string()),
114            ("sequence-number".to_string(), sequence_number.to_string()),
115            ("format-version".to_string(), "3".to_string()),
116        ]);
117        metadata.insert(
118            "parent-snapshot-id".to_string(),
119            parent_snapshot_id
120                .map(|v| v.to_string())
121                .unwrap_or("null".to_string()),
122        );
123        metadata.insert(
124            "first-row-id".to_string(),
125            first_row_id
126                .map(|v| v.to_string())
127                .unwrap_or("null".to_string()),
128        );
129        Self::new(
130            FormatVersion::V3,
131            writer,
132            metadata,
133            sequence_number,
134            snapshot_id,
135            first_row_id,
136        )
137    }
138
139    fn new(
140        format_version: FormatVersion,
141        writer: Box<dyn FileWrite>,
142        metadata: HashMap<String, String>,
143        sequence_number: i64,
144        snapshot_id: i64,
145        first_row_id: Option<u64>,
146    ) -> Self {
147        let avro_schema = match format_version {
148            FormatVersion::V1 => &MANIFEST_LIST_AVRO_SCHEMA_V1,
149            FormatVersion::V2 => &MANIFEST_LIST_AVRO_SCHEMA_V2,
150            FormatVersion::V3 => &MANIFEST_LIST_AVRO_SCHEMA_V3,
151        };
152        let mut avro_writer = Writer::new(avro_schema, Vec::new());
153        for (key, value) in metadata {
154            avro_writer
155                .add_user_metadata(key, value)
156                .expect("Avro metadata should be added to the writer before the first record.");
157        }
158        Self {
159            format_version,
160            writer,
161            avro_writer,
162            sequence_number,
163            snapshot_id,
164            next_row_id: first_row_id,
165        }
166    }
167
168    /// Append manifests to be written.
169    ///
170    /// If V3 Manifests are added and the `first_row_id` of any data manifest is unassigned,
171    /// it will be assigned based on the `next_row_id` of the writer, and the `next_row_id` of the writer will be updated accordingly.
172    /// If `first_row_id` is already assigned, it will be validated against the `next_row_id` of the writer.
173    pub fn add_manifests(&mut self, manifests: impl Iterator<Item = ManifestFile>) -> Result<()> {
174        match self.format_version {
175            FormatVersion::V1 => {
176                for manifest in manifests {
177                    let manifests: ManifestFileV1 = manifest.try_into()?;
178                    self.avro_writer.append_ser(manifests)?;
179                }
180            }
181            FormatVersion::V2 | FormatVersion::V3 => {
182                for mut manifest in manifests {
183                    self.assign_sequence_numbers(&mut manifest)?;
184
185                    if self.format_version == FormatVersion::V2 {
186                        let manifest_entry: ManifestFileV2 = manifest.try_into()?;
187                        self.avro_writer.append_ser(manifest_entry)?;
188                    } else if self.format_version == FormatVersion::V3 {
189                        self.assign_first_row_id(&mut manifest)?;
190                        let manifest_entry: ManifestFileV3 = manifest.try_into()?;
191                        self.avro_writer.append_ser(manifest_entry)?;
192                    }
193                }
194            }
195        }
196        Ok(())
197    }
198
199    /// Write the manifest list to the output file.
200    pub async fn close(mut self) -> Result<()> {
201        let data = self.avro_writer.into_inner()?;
202        self.writer.write(Bytes::from(data)).await?;
203        self.writer.close().await?;
204        Ok(())
205    }
206
207    /// Assign sequence numbers to manifest if they are unassigned
208    fn assign_sequence_numbers(&self, manifest: &mut ManifestFile) -> Result<()> {
209        if manifest.sequence_number == UNASSIGNED_SEQUENCE_NUMBER {
210            if manifest.added_snapshot_id != self.snapshot_id {
211                return Err(Error::new(
212                    ErrorKind::DataInvalid,
213                    format!(
214                        "Found unassigned sequence number for a manifest from snapshot {}.",
215                        manifest.added_snapshot_id
216                    ),
217                ));
218            }
219            manifest.sequence_number = self.sequence_number;
220        }
221
222        if manifest.min_sequence_number == UNASSIGNED_SEQUENCE_NUMBER {
223            if manifest.added_snapshot_id != self.snapshot_id {
224                return Err(Error::new(
225                    ErrorKind::DataInvalid,
226                    format!(
227                        "Found unassigned sequence number for a manifest from snapshot {}.",
228                        manifest.added_snapshot_id
229                    ),
230                ));
231            }
232            manifest.min_sequence_number = self.sequence_number;
233        }
234
235        Ok(())
236    }
237
238    /// Returns number of newly assigned first-row-ids, if any.
239    fn assign_first_row_id(&mut self, manifest: &mut ManifestFile) -> Result<()> {
240        match manifest.content {
241            ManifestContentType::Data => {
242                match (self.next_row_id, manifest.first_row_id) {
243                    (Some(_), Some(_)) => {
244                        // Case: Manifest with already assigned first row ID.
245                        // No need to increase next_row_id, as this manifest is already assigned.
246                    }
247                    (None, Some(manifest_first_row_id)) => {
248                        // Case: Assigned first row ID for data manifest, but the writer does not have a next-row-id assigned.
249                        return Err(Error::new(
250                            ErrorKind::Unexpected,
251                            format!(
252                                "Found invalid first-row-id assignment for Manifest {}. Writer does not have a next-row-id assigned, but the manifest has first-row-id assigned to {}.",
253                                manifest.manifest_path, manifest_first_row_id,
254                            ),
255                        ));
256                    }
257                    (Some(writer_next_row_id), None) => {
258                        // Case: Unassigned first row ID for data manifest. This is either a new
259                        // manifest, or a manifest from a pre-v3 snapshot. We need to assign one.
260                        let (existing_rows_count, added_rows_count) =
261                            require_row_counts_in_manifest(manifest)?;
262                        manifest.first_row_id = Some(writer_next_row_id);
263
264                        self.next_row_id = writer_next_row_id
265                        .checked_add(existing_rows_count)
266                        .and_then(|sum| sum.checked_add(added_rows_count))
267                        .ok_or_else(|| {
268                            Error::new(
269                                ErrorKind::DataInvalid,
270                                format!(
271                                    "Row ID overflow when computing next row ID for Manifest {}. Next Row ID: {writer_next_row_id}, Existing Rows Count: {existing_rows_count}, Added Rows Count: {added_rows_count}",
272                                    manifest.manifest_path
273                                ),
274                            )
275                        }).map(Some)?;
276                    }
277                    (None, None) => {
278                        // Case: Table without row lineage. No action needed.
279                    }
280                }
281            }
282            ManifestContentType::Deletes => {
283                // Deletes never have a first-row-id assigned.
284                manifest.first_row_id = None;
285            }
286        };
287
288        Ok(())
289    }
290}
291
292fn require_row_counts_in_manifest(manifest: &ManifestFile) -> Result<(u64, u64)> {
293    let existing_rows_count = manifest.existing_rows_count.ok_or_else(|| {
294        Error::new(
295            ErrorKind::DataInvalid,
296            format!(
297                "Cannot include a Manifest without existing-rows-count to a table with row lineage enabled. Manifest path: {}",
298                manifest.manifest_path,
299            ),
300        )
301    })?;
302    let added_rows_count = manifest.added_rows_count.ok_or_else(|| {
303        Error::new(
304            ErrorKind::DataInvalid,
305            format!(
306                "Cannot include a Manifest without added-rows-count to a table with row lineage enabled. Manifest path: {}",
307                manifest.manifest_path,
308            ),
309        )
310    })?;
311    Ok((existing_rows_count, added_rows_count))
312}
313
314#[cfg(test)]
315mod test {
316    use std::fs;
317    use std::path::Path;
318
319    use tempfile::TempDir;
320
321    use super::ManifestListWriter;
322    use crate::io::{FileIO, FileWrite};
323    use crate::spec::{
324        Datum, FieldSummary, ManifestContentType, ManifestFile, ManifestList,
325        UNASSIGNED_SEQUENCE_NUMBER,
326    };
327
328    #[tokio::test]
329    async fn test_manifest_list_writer_v1() {
330        let expected_manifest_list = ManifestList {
331            entries: vec![ManifestFile {
332                manifest_path: "/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro".to_string(),
333                manifest_length: 5806,
334                partition_spec_id: 1,
335                content: ManifestContentType::Data,
336                sequence_number: 0,
337                min_sequence_number: 0,
338                added_snapshot_id: 1646658105718557341,
339                added_files_count: Some(3),
340                existing_files_count: Some(0),
341                deleted_files_count: Some(0),
342                added_rows_count: Some(3),
343                existing_rows_count: Some(0),
344                deleted_rows_count: Some(0),
345                partitions: Some(
346                    vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}],
347                ),
348                key_metadata: None,
349                first_row_id: None,
350            }]
351        };
352
353        let temp_dir = TempDir::new().unwrap();
354        let path = temp_dir.path().join("manifest_list_v1.avro");
355        let io = FileIO::new_with_fs();
356        let file_writer = file_writer(&path, io).await;
357
358        let mut writer = ManifestListWriter::v1(file_writer, 1646658105718557341, Some(0));
359        writer
360            .add_manifests(expected_manifest_list.entries.clone().into_iter())
361            .unwrap();
362        writer.close().await.unwrap();
363
364        let bs = fs::read(path).unwrap();
365
366        let manifest_list =
367            ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V1).unwrap();
368        assert_eq!(manifest_list, expected_manifest_list);
369
370        temp_dir.close().unwrap();
371    }
372
373    #[tokio::test]
374    async fn test_manifest_list_writer_v2() {
375        let snapshot_id = 377075049360453639;
376        let seq_num = 1;
377        let mut expected_manifest_list = ManifestList {
378            entries: vec![ManifestFile {
379                manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(),
380                manifest_length: 6926,
381                partition_spec_id: 1,
382                content: ManifestContentType::Data,
383                sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
384                min_sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
385                added_snapshot_id: snapshot_id,
386                added_files_count: Some(1),
387                existing_files_count: Some(0),
388                deleted_files_count: Some(0),
389                added_rows_count: Some(3),
390                existing_rows_count: Some(0),
391                deleted_rows_count: Some(0),
392                partitions: Some(
393                    vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
394                ),
395                key_metadata: None,
396                first_row_id: None,
397            }]
398        };
399
400        let temp_dir = TempDir::new().unwrap();
401        let path = temp_dir.path().join("manifest_list_v2.avro");
402        let io = FileIO::new_with_fs();
403        let file_writer = file_writer(&path, io).await;
404
405        let mut writer = ManifestListWriter::v2(file_writer, snapshot_id, Some(0), seq_num);
406        writer
407            .add_manifests(expected_manifest_list.entries.clone().into_iter())
408            .unwrap();
409        writer.close().await.unwrap();
410
411        let bs = fs::read(path).unwrap();
412        let manifest_list =
413            ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V2).unwrap();
414        expected_manifest_list.entries[0].sequence_number = seq_num;
415        expected_manifest_list.entries[0].min_sequence_number = seq_num;
416        assert_eq!(manifest_list, expected_manifest_list);
417
418        temp_dir.close().unwrap();
419    }
420
421    #[tokio::test]
422    async fn test_manifest_list_writer_v3() {
423        let snapshot_id = 377075049360453639;
424        let seq_num = 1;
425        let mut expected_manifest_list = ManifestList {
426            entries: vec![ManifestFile {
427                manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(),
428                manifest_length: 6926,
429                partition_spec_id: 1,
430                content: ManifestContentType::Data,
431                sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
432                min_sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
433                added_snapshot_id: snapshot_id,
434                added_files_count: Some(1),
435                existing_files_count: Some(0),
436                deleted_files_count: Some(0),
437                added_rows_count: Some(3),
438                existing_rows_count: Some(0),
439                deleted_rows_count: Some(0),
440                partitions: Some(
441                    vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
442                ),
443                key_metadata: None,
444                first_row_id: Some(10),
445            }]
446        };
447
448        let temp_dir = TempDir::new().unwrap();
449        let path = temp_dir.path().join("manifest_list_v2.avro");
450        let io = FileIO::new_with_fs();
451        let file_writer = file_writer(&path, io).await;
452
453        let mut writer =
454            ManifestListWriter::v3(file_writer, snapshot_id, Some(0), seq_num, Some(10));
455        writer
456            .add_manifests(expected_manifest_list.entries.clone().into_iter())
457            .unwrap();
458        writer.close().await.unwrap();
459
460        let bs = fs::read(path).unwrap();
461        let manifest_list =
462            ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V3).unwrap();
463        expected_manifest_list.entries[0].sequence_number = seq_num;
464        expected_manifest_list.entries[0].min_sequence_number = seq_num;
465        expected_manifest_list.entries[0].first_row_id = Some(10);
466        assert_eq!(manifest_list, expected_manifest_list);
467
468        temp_dir.close().unwrap();
469    }
470
471    #[tokio::test]
472    async fn test_manifest_list_writer_v1_as_v2() {
473        let expected_manifest_list = ManifestList {
474            entries: vec![ManifestFile {
475                manifest_path: "/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro".to_string(),
476                manifest_length: 5806,
477                partition_spec_id: 1,
478                content: ManifestContentType::Data,
479                sequence_number: 0,
480                min_sequence_number: 0,
481                added_snapshot_id: 1646658105718557341,
482                added_files_count: Some(3),
483                existing_files_count: Some(0),
484                deleted_files_count: Some(0),
485                added_rows_count: Some(3),
486                existing_rows_count: Some(0),
487                deleted_rows_count: Some(0),
488                partitions: Some(
489                    vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
490                ),
491                key_metadata: None,
492                first_row_id: None,
493            }]
494        };
495
496        let temp_dir = TempDir::new().unwrap();
497        let path = temp_dir.path().join("manifest_list_v1.avro");
498        let io = FileIO::new_with_fs();
499        let file_writer = file_writer(&path, io).await;
500
501        let mut writer = ManifestListWriter::v1(file_writer, 1646658105718557341, Some(0));
502        writer
503            .add_manifests(expected_manifest_list.entries.clone().into_iter())
504            .unwrap();
505        writer.close().await.unwrap();
506
507        let bs = fs::read(path).unwrap();
508
509        let manifest_list =
510            ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V2).unwrap();
511        assert_eq!(manifest_list, expected_manifest_list);
512
513        temp_dir.close().unwrap();
514    }
515
516    #[tokio::test]
517    async fn test_manifest_list_writer_v1_as_v3() {
518        let expected_manifest_list = ManifestList {
519            entries: vec![ManifestFile {
520                manifest_path: "/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro".to_string(),
521                manifest_length: 5806,
522                partition_spec_id: 1,
523                content: ManifestContentType::Data,
524                sequence_number: 0,
525                min_sequence_number: 0,
526                added_snapshot_id: 1646658105718557341,
527                added_files_count: Some(3),
528                existing_files_count: Some(0),
529                deleted_files_count: Some(0),
530                added_rows_count: Some(3),
531                existing_rows_count: Some(0),
532                deleted_rows_count: Some(0),
533                partitions: Some(
534                    vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
535                ),
536                key_metadata: None,
537                first_row_id: None,
538            }]
539        };
540
541        let temp_dir = TempDir::new().unwrap();
542        let path = temp_dir.path().join("manifest_list_v1.avro");
543        let io = FileIO::new_with_fs();
544        let file_writer = file_writer(&path, io).await;
545
546        let mut writer = ManifestListWriter::v1(file_writer, 1646658105718557341, Some(0));
547        writer
548            .add_manifests(expected_manifest_list.entries.clone().into_iter())
549            .unwrap();
550        writer.close().await.unwrap();
551
552        let bs = fs::read(path).unwrap();
553
554        let manifest_list =
555            ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V3).unwrap();
556        assert_eq!(manifest_list, expected_manifest_list);
557
558        temp_dir.close().unwrap();
559    }
560
561    #[tokio::test]
562    async fn test_manifest_list_writer_v2_as_v3() {
563        let snapshot_id = 377075049360453639;
564        let seq_num = 1;
565        let mut expected_manifest_list = ManifestList {
566            entries: vec![ManifestFile {
567                manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(),
568                manifest_length: 6926,
569                partition_spec_id: 1,
570                content: ManifestContentType::Data,
571                sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
572                min_sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
573                added_snapshot_id: snapshot_id,
574                added_files_count: Some(1),
575                existing_files_count: Some(0),
576                deleted_files_count: Some(0),
577                added_rows_count: Some(3),
578                existing_rows_count: Some(0),
579                deleted_rows_count: Some(0),
580                partitions: Some(
581                    vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
582                ),
583                key_metadata: None,
584                first_row_id: None,
585            }]
586        };
587
588        let temp_dir = TempDir::new().unwrap();
589        let path = temp_dir.path().join("manifest_list_v2.avro");
590        let io = FileIO::new_with_fs();
591        let file_writer = file_writer(&path, io).await;
592
593        let mut writer = ManifestListWriter::v2(file_writer, snapshot_id, Some(0), seq_num);
594        writer
595            .add_manifests(expected_manifest_list.entries.clone().into_iter())
596            .unwrap();
597        writer.close().await.unwrap();
598
599        let bs = fs::read(path).unwrap();
600
601        let manifest_list =
602            ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V3).unwrap();
603        expected_manifest_list.entries[0].sequence_number = seq_num;
604        expected_manifest_list.entries[0].min_sequence_number = seq_num;
605        assert_eq!(manifest_list, expected_manifest_list);
606
607        temp_dir.close().unwrap();
608    }
609
610    async fn file_writer(path: &Path, io: FileIO) -> Box<dyn FileWrite> {
611        io.new_output(path.to_str().unwrap())
612            .unwrap()
613            .writer()
614            .await
615            .unwrap()
616    }
617}