iceberg_datafusion/table/
table_provider_factory.rs1use std::borrow::Cow;
19use std::collections::HashMap;
20use std::sync::Arc;
21
22use async_trait::async_trait;
23use datafusion::catalog::{Session, TableProvider, TableProviderFactory};
24use datafusion::error::Result as DFResult;
25use datafusion::logical_expr::CreateExternalTable;
26use datafusion::sql::TableReference;
27use iceberg::io::{FileIOBuilder, LocalFsStorageFactory, StorageFactory};
28use iceberg::table::StaticTable;
29use iceberg::{Error, ErrorKind, Result, TableIdent};
30
31use super::IcebergStaticTableProvider;
32use crate::to_datafusion_error;
33
34#[derive(Debug, Default)]
100pub struct IcebergTableProviderFactory {
101 storage_factory: Option<Arc<dyn StorageFactory>>,
102}
103
104impl IcebergTableProviderFactory {
105 pub fn new() -> Self {
106 Self {
107 storage_factory: None,
108 }
109 }
110
111 pub fn new_with_storage_factory(storage_factory: Arc<dyn StorageFactory>) -> Self {
113 Self {
114 storage_factory: Some(storage_factory),
115 }
116 }
117}
118
119#[async_trait]
120impl TableProviderFactory for IcebergTableProviderFactory {
121 async fn create(
122 &self,
123 _state: &dyn Session,
124 cmd: &CreateExternalTable,
125 ) -> DFResult<Arc<dyn TableProvider>> {
126 check_cmd(cmd).map_err(to_datafusion_error)?;
127
128 let table_name = &cmd.name;
129 let metadata_file_path = &cmd.location;
130 let options = &cmd.options;
131
132 let table_name_with_ns = complement_namespace_if_necessary(table_name);
133
134 let storage_factory = self
135 .storage_factory
136 .clone()
137 .unwrap_or_else(|| Arc::new(LocalFsStorageFactory));
138
139 let table = create_static_table(
140 table_name_with_ns,
141 metadata_file_path,
142 options,
143 storage_factory,
144 )
145 .await
146 .map_err(to_datafusion_error)?
147 .into_table();
148
149 let provider = IcebergStaticTableProvider::try_new_from_table(table)
150 .await
151 .map_err(to_datafusion_error)?;
152
153 Ok(Arc::new(provider))
154 }
155}
156
157fn check_cmd(cmd: &CreateExternalTable) -> Result<()> {
158 let CreateExternalTable {
159 schema,
160 table_partition_cols,
161 order_exprs,
162 constraints,
163 column_defaults,
164 ..
165 } = cmd;
166
167 let is_invalid = !schema.fields().is_empty()
169 || !table_partition_cols.is_empty()
170 || !order_exprs.is_empty()
171 || !constraints.is_empty()
172 || !column_defaults.is_empty();
173
174 if is_invalid {
175 return Err(Error::new(
176 ErrorKind::FeatureUnsupported,
177 "Currently we only support reading existing icebergs tables in external table command. To create new table, please use catalog provider.",
178 ));
179 }
180
181 Ok(())
182}
183
184fn complement_namespace_if_necessary(table_name: &TableReference) -> Cow<'_, TableReference> {
195 match table_name {
196 TableReference::Bare { table } => {
197 Cow::Owned(TableReference::partial("default", table.as_ref()))
198 }
199 other => Cow::Borrowed(other),
200 }
201}
202
203async fn create_static_table(
204 table_name: Cow<'_, TableReference>,
205 metadata_file_path: &str,
206 props: &HashMap<String, String>,
207 storage_factory: Arc<dyn StorageFactory>,
208) -> Result<StaticTable> {
209 let table_ident = TableIdent::from_strs(table_name.to_vec())?;
210 let file_io = FileIOBuilder::new(storage_factory)
211 .with_props(props)
212 .build();
213 StaticTable::from_metadata_file(metadata_file_path, table_ident, file_io).await
214}
215
#[cfg(test)]
mod tests {

    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::catalog::TableProviderFactory;
    use datafusion::common::{Constraints, DFSchema};
    use datafusion::execution::session_state::SessionStateBuilder;
    use datafusion::logical_expr::CreateExternalTable;
    use datafusion::parquet::arrow::PARQUET_FIELD_ID_META_KEY;
    use datafusion::prelude::SessionContext;
    use datafusion::sql::TableReference;

    use super::*;

    /// Builds a non-nullable `Int64` field whose metadata holds the given
    /// Parquet field id.
    fn int64_field_with_id(name: &str, field_id: &str) -> Field {
        let metadata = HashMap::from([(
            PARQUET_FIELD_ID_META_KEY.to_string(),
            field_id.to_string(),
        )]);
        Field::new(name, DataType::Int64, false).with_metadata(metadata)
    }

    /// Arrow schema the tests expect for the `TableMetadataV2.json` fixture:
    /// three non-nullable `Int64` columns `x`, `y`, `z` with field ids 1..=3.
    fn table_metadata_v2_schema() -> Schema {
        Schema::new(vec![
            int64_field_with_id("x", "1"),
            int64_field_with_id("y", "2"),
            int64_field_with_id("z", "3"),
        ])
    }

    /// Path of the metadata fixture, rooted at the crate's manifest directory.
    fn table_metadata_location() -> String {
        let manifest_dir = env!("CARGO_MANIFEST_DIR");
        format!(
            "{}/testdata/table_metadata/{}",
            manifest_dir, "TableMetadataV2.json"
        )
    }

    /// A `CREATE EXTERNAL TABLE` command pointing at the metadata fixture,
    /// with an empty schema and no partitioning, ordering, constraints, or
    /// column defaults.
    fn create_external_table_cmd() -> CreateExternalTable {
        CreateExternalTable {
            name: TableReference::partial("static_ns", "static_table"),
            location: table_metadata_location(),
            file_type: "iceberg".to_string(),
            schema: Arc::new(DFSchema::empty()),
            constraints: Constraints::default(),
            options: Default::default(),
            table_partition_cols: Default::default(),
            order_exprs: Default::default(),
            column_defaults: Default::default(),
            definition: None,
            if_not_exists: false,
            or_replace: false,
            temporary: false,
            unbounded: false,
        }
    }

    /// Calling the factory directly yields a provider with the fixture schema.
    #[tokio::test]
    async fn test_schema_of_created_table() {
        let session_state = SessionStateBuilder::new().build();
        let command = create_external_table_cmd();
        let factory = IcebergTableProviderFactory::new();

        let provider = factory
            .create(&session_state, &command)
            .await
            .expect("create table failed");

        assert_eq!(provider.schema().as_ref(), &table_metadata_v2_schema());
    }

    /// Registering the factory under `ICEBERG` and running the SQL statement
    /// yields a table with the fixture schema.
    #[tokio::test]
    async fn test_schema_of_created_external_table_sql() {
        let iceberg_factory: Arc<dyn TableProviderFactory> =
            Arc::new(IcebergTableProviderFactory::new());

        let mut session_state = SessionStateBuilder::new().with_default_features().build();
        session_state
            .table_factories_mut()
            .insert(String::from("ICEBERG"), iceberg_factory);
        let ctx = SessionContext::new_with_state(session_state);

        let table_ref = TableReference::bare("static_table");
        let location = table_metadata_location();
        let sql = format!(
            "CREATE EXTERNAL TABLE {} STORED AS ICEBERG LOCATION '{}'",
            table_ref, location
        );
        let _df = ctx.sql(&sql).await.expect("create table failed");

        let provider = ctx
            .table_provider(table_ref)
            .await
            .expect("table not found");

        assert_eq!(provider.schema().as_ref(), &table_metadata_v2_schema());
    }
}