use std::collections::{hash_map, HashMap};
use iceberg::{Error, ErrorKind, NamespaceIdent, Result, TableIdent};
use itertools::Itertools;
/// One node in a tree of namespaces.
///
/// Each node owns its own properties, its directly nested namespaces and the
/// tables registered directly under it. The node on which the lookup methods
/// are invoked acts as the root: its `namespaces` map holds the top-level
/// namespaces.
#[derive(Debug, Clone, Default)]
pub(crate) struct NamespaceState {
// Key/value properties attached to this namespace.
properties: HashMap<String, String>,
// Directly nested namespaces, keyed by their (single-part) name.
namespaces: HashMap<String, NamespaceState>,
// Metadata location of each table in this namespace, keyed by table name.
table_metadata_locations: HashMap<String, String>,
}
fn no_such_namespace_err<T>(namespace_ident: &NamespaceIdent) -> Result<T> {
Err(Error::new(
ErrorKind::Unexpected,
format!("No such namespace: {:?}", namespace_ident),
))
}
fn no_such_table_err<T>(table_ident: &TableIdent) -> Result<T> {
Err(Error::new(
ErrorKind::Unexpected,
format!("No such table: {:?}", table_ident),
))
}
fn namespace_already_exists_err<T>(namespace_ident: &NamespaceIdent) -> Result<T> {
Err(Error::new(
ErrorKind::Unexpected,
format!(
"Cannot create namespace {:?}. Namespace already exists.",
namespace_ident
),
))
}
fn table_already_exists_err<T>(table_ident: &TableIdent) -> Result<T> {
Err(Error::new(
ErrorKind::Unexpected,
format!(
"Cannot create table {:?}. Table already exists.",
table_ident
),
))
}
impl NamespaceState {
fn get_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<&NamespaceState> {
let mut acc_name_parts = vec![];
let mut namespace_state = self;
for next_name in namespace_ident.iter() {
acc_name_parts.push(next_name);
match namespace_state.namespaces.get(next_name) {
None => {
let namespace_ident = NamespaceIdent::from_strs(acc_name_parts)?;
return no_such_namespace_err(&namespace_ident);
}
Some(intermediate_namespace) => {
namespace_state = intermediate_namespace;
}
}
}
Ok(namespace_state)
}
fn get_mut_namespace(
&mut self,
namespace_ident: &NamespaceIdent,
) -> Result<&mut NamespaceState> {
let mut acc_name_parts = vec![];
let mut namespace_state = self;
for next_name in namespace_ident.iter() {
acc_name_parts.push(next_name);
match namespace_state.namespaces.get_mut(next_name) {
None => {
let namespace_ident = NamespaceIdent::from_strs(acc_name_parts)?;
return no_such_namespace_err(&namespace_ident);
}
Some(intermediate_namespace) => {
namespace_state = intermediate_namespace;
}
}
}
Ok(namespace_state)
}
fn get_mut_parent_namespace_of(
&mut self,
namespace_ident: &NamespaceIdent,
) -> Result<(&mut NamespaceState, String)> {
match namespace_ident.split_last() {
None => Err(Error::new(
ErrorKind::DataInvalid,
"Namespace identifier can't be empty!",
)),
Some((child_namespace_name, parent_name_parts)) => {
let parent_namespace_state = if parent_name_parts.is_empty() {
Ok(self)
} else {
let parent_namespace_ident = NamespaceIdent::from_strs(parent_name_parts)?;
self.get_mut_namespace(&parent_namespace_ident)
}?;
Ok((parent_namespace_state, child_namespace_name.clone()))
}
}
}
pub(crate) fn list_top_level_namespaces(&self) -> Vec<&String> {
self.namespaces.keys().collect_vec()
}
pub(crate) fn list_namespaces_under(
&self,
namespace_ident: &NamespaceIdent,
) -> Result<Vec<&String>> {
let nested_namespace_names = self
.get_namespace(namespace_ident)?
.namespaces
.keys()
.collect_vec();
Ok(nested_namespace_names)
}
pub(crate) fn namespace_exists(&self, namespace_ident: &NamespaceIdent) -> bool {
self.get_namespace(namespace_ident).is_ok()
}
pub(crate) fn insert_new_namespace(
&mut self,
namespace_ident: &NamespaceIdent,
properties: HashMap<String, String>,
) -> Result<()> {
let (parent_namespace_state, child_namespace_name) =
self.get_mut_parent_namespace_of(namespace_ident)?;
match parent_namespace_state
.namespaces
.entry(child_namespace_name)
{
hash_map::Entry::Occupied(_) => namespace_already_exists_err(namespace_ident),
hash_map::Entry::Vacant(entry) => {
let _ = entry.insert(NamespaceState {
properties,
namespaces: HashMap::new(),
table_metadata_locations: HashMap::new(),
});
Ok(())
}
}
}
pub(crate) fn remove_existing_namespace(
&mut self,
namespace_ident: &NamespaceIdent,
) -> Result<()> {
let (parent_namespace_state, child_namespace_name) =
self.get_mut_parent_namespace_of(namespace_ident)?;
match parent_namespace_state
.namespaces
.remove(&child_namespace_name)
{
None => no_such_namespace_err(namespace_ident),
Some(_) => Ok(()),
}
}
pub(crate) fn get_properties(
&self,
namespace_ident: &NamespaceIdent,
) -> Result<&HashMap<String, String>> {
let properties = &self.get_namespace(namespace_ident)?.properties;
Ok(properties)
}
fn get_mut_properties(
&mut self,
namespace_ident: &NamespaceIdent,
) -> Result<&mut HashMap<String, String>> {
let properties = &mut self.get_mut_namespace(namespace_ident)?.properties;
Ok(properties)
}
pub(crate) fn replace_properties(
&mut self,
namespace_ident: &NamespaceIdent,
new_properties: HashMap<String, String>,
) -> Result<()> {
let properties = self.get_mut_properties(namespace_ident)?;
*properties = new_properties;
Ok(())
}
pub(crate) fn list_tables(&self, namespace_ident: &NamespaceIdent) -> Result<Vec<&String>> {
let table_names = self
.get_namespace(namespace_ident)?
.table_metadata_locations
.keys()
.collect_vec();
Ok(table_names)
}
pub(crate) fn table_exists(&self, table_ident: &TableIdent) -> Result<bool> {
let namespace_state = self.get_namespace(table_ident.namespace())?;
let table_exists = namespace_state
.table_metadata_locations
.contains_key(&table_ident.name);
Ok(table_exists)
}
pub(crate) fn get_existing_table_location(&self, table_ident: &TableIdent) -> Result<&String> {
let namespace = self.get_namespace(table_ident.namespace())?;
match namespace.table_metadata_locations.get(table_ident.name()) {
None => no_such_table_err(table_ident),
Some(table_metadadata_location) => Ok(table_metadadata_location),
}
}
pub(crate) fn insert_new_table(
&mut self,
table_ident: &TableIdent,
metadata_location: String,
) -> Result<()> {
let namespace = self.get_mut_namespace(table_ident.namespace())?;
match namespace
.table_metadata_locations
.entry(table_ident.name().to_string())
{
hash_map::Entry::Occupied(_) => table_already_exists_err(table_ident),
hash_map::Entry::Vacant(entry) => {
let _ = entry.insert(metadata_location);
Ok(())
}
}
}
pub(crate) fn remove_existing_table(&mut self, table_ident: &TableIdent) -> Result<String> {
let namespace = self.get_mut_namespace(table_ident.namespace())?;
match namespace
.table_metadata_locations
.remove(table_ident.name())
{
None => no_such_table_err(table_ident),
Some(metadata_location) => Ok(metadata_location),
}
}
}