Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import re
import uuid
from abc import ABC, abstractmethod
from collections.abc import Callable
from collections.abc import Callable, Iterator
from dataclasses import dataclass
from enum import Enum
from typing import (
Expand Down Expand Up @@ -607,42 +607,42 @@ def drop_namespace(self, namespace: str | Identifier) -> None:
"""

@abstractmethod
def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
def list_tables(self, namespace: str | Identifier) -> Iterator[Identifier]:
"""List tables under the given namespace in the catalog.

Args:
namespace (str | Identifier): Namespace identifier to search.

Returns:
List[Identifier]: list of table identifiers.
Iterator[Identifier]: an iterator of table identifiers.

Raises:
NoSuchNamespaceError: If a namespace with the given name does not exist.
"""

@abstractmethod
def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]:
def list_namespaces(self, namespace: str | Identifier = ()) -> Iterator[Identifier]:
"""List namespaces from the given namespace. If not given, list top-level namespaces from the catalog.

Args:
namespace (str | Identifier): Namespace identifier to search.

Returns:
List[Identifier]: a List of namespace identifiers.
Iterator[Identifier]: an iterator of namespace identifiers.

Raises:
NoSuchNamespaceError: If a namespace with the given name does not exist.
"""

@abstractmethod
def list_views(self, namespace: str | Identifier) -> list[Identifier]:
def list_views(self, namespace: str | Identifier) -> Iterator[Identifier]:
"""List views under the given namespace in the catalog.

Args:
namespace (str | Identifier): Namespace identifier to search.

Returns:
List[Identifier]: list of table identifiers.
Iterator[Identifier]: an iterator of view identifiers.

Raises:
NoSuchNamespaceError: If a namespace with the given name does not exist.
Expand Down
11 changes: 6 additions & 5 deletions pyiceberg/catalog/bigquery_metastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from __future__ import annotations

import json
from collections.abc import Iterator
from typing import TYPE_CHECKING, Any

from google.api_core.exceptions import NotFound
Expand Down Expand Up @@ -252,7 +253,7 @@ def drop_namespace(self, namespace: str | Identifier) -> None:
raise NoSuchNamespaceError(f"Namespace {namespace} does not exist.") from e

@override
def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
def list_tables(self, namespace: str | Identifier) -> Iterator[Identifier]:
database_name = self.identifier_to_database(namespace)
iceberg_tables: list[Identifier] = []
try:
Expand All @@ -264,18 +265,18 @@ def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
iceberg_tables.append((database_name, bq_table_list_item.table_id))
except NotFound:
raise NoSuchNamespaceError(f"Namespace (dataset) '{database_name}' not found.") from None
return iceberg_tables
return iter(iceberg_tables)

@override
def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]:
def list_namespaces(self, namespace: str | Identifier = ()) -> Iterator[Identifier]:
# Since this catalog only supports one-level namespaces, a non-empty namespace cannot have children:
# raise NoSuchNamespaceError unless an empty namespace is passed, in which case all datasets in the catalog are listed.
if namespace:
raise NoSuchNamespaceError(f"Namespace (dataset) '{namespace}' not found.") from None

# List top-level datasets
datasets_iterator = self.client.list_datasets()
return [(dataset.dataset_id,) for dataset in datasets_iterator]
return iter([(dataset.dataset_id,) for dataset in datasets_iterator])

@override
def register_table(self, identifier: str | Identifier, metadata_location: str, overwrite: bool = False) -> Table:
Expand Down Expand Up @@ -314,7 +315,7 @@ def register_table(self, identifier: str | Identifier, metadata_location: str, o
return self.load_table(identifier=identifier)

@override
def list_views(self, namespace: str | Identifier) -> list[Identifier]:
def list_views(self, namespace: str | Identifier) -> Iterator[Identifier]:
raise NotImplementedError

@override
Expand Down
22 changes: 13 additions & 9 deletions pyiceberg/catalog/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
import uuid
from collections.abc import Iterator
from time import time
from typing import (
TYPE_CHECKING,
Expand Down Expand Up @@ -396,8 +397,11 @@ def drop_namespace(self, namespace: str | Identifier) -> None:
database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
table_identifiers = self.list_tables(namespace=database_name)

if len(table_identifiers) > 0:
try:
next(table_identifiers)
raise NamespaceNotEmptyError(f"Database {database_name} is not empty")
except StopIteration:
pass

try:
self._delete_dynamo_item(
Expand All @@ -409,14 +413,14 @@ def drop_namespace(self, namespace: str | Identifier) -> None:
raise NoSuchNamespaceError(f"Database does not exist: {database_name}") from e

@override
def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
def list_tables(self, namespace: str | Identifier) -> Iterator[Identifier]:
"""List Iceberg tables under the given namespace in the catalog.

Args:
namespace (str | Identifier): Namespace identifier to search.

Returns:
List[Identifier]: list of table identifiers.
Iterator[Identifier]: an iterator of table identifiers.
"""
database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)

Expand Down Expand Up @@ -451,20 +455,20 @@ def list_tables(self, namespace: str | Identifier) -> list[Identifier]:

table_identifiers.append(self.identifier_to_tuple(identifier_col))

return table_identifiers
return iter(table_identifiers)

@override
def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]:
def list_namespaces(self, namespace: str | Identifier = ()) -> Iterator[Identifier]:
"""List top-level namespaces from the catalog.

We do not support hierarchical namespace.

Returns:
List[Identifier]: a List of namespace identifiers.
Iterator[Identifier]: an iterator of namespace identifiers.
"""
# Hierarchical namespaces are not supported. Return an empty iterator.
if namespace:
return []
return iter([])

paginator = self.dynamodb.get_paginator("query")

Expand Down Expand Up @@ -494,7 +498,7 @@ def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]:
namespace_col = _dict[DYNAMODB_COL_NAMESPACE]
database_identifiers.append(self.identifier_to_tuple(namespace_col))

return database_identifiers
return iter(database_identifiers)

@override
def load_namespace_properties(self, namespace: str | Identifier) -> Properties:
Expand Down Expand Up @@ -565,7 +569,7 @@ def create_view(
raise NotImplementedError

@override
def list_views(self, namespace: str | Identifier) -> list[Identifier]:
def list_views(self, namespace: str | Identifier) -> Iterator[Identifier]:
raise NotImplementedError

@override
Expand Down
17 changes: 9 additions & 8 deletions pyiceberg/catalog/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@


import logging
from collections.abc import Iterator
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -860,14 +861,14 @@ def drop_namespace(self, namespace: str | Identifier) -> None:
self.glue.delete_database(Name=database_name)

@override
def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
def list_tables(self, namespace: str | Identifier) -> Iterator[Identifier]:
"""List Iceberg tables under the given namespace in the catalog.

Args:
namespace (str | Identifier): Namespace identifier to search.

Returns:
List[Identifier]: list of table identifiers.
Iterator[Identifier]: an iterator of table identifiers.

Raises:
NoSuchNamespaceError: If a namespace with the given name does not exist, or the identifier is invalid.
Expand All @@ -889,18 +890,18 @@ def list_tables(self, namespace: str | Identifier) -> list[Identifier]:

except self.glue.exceptions.EntityNotFoundException as e:
raise NoSuchNamespaceError(f"Database does not exist: {database_name}") from e
return [(database_name, table["Name"]) for table in table_list if self.__is_iceberg_table(table)]
return iter([(database_name, table["Name"]) for table in table_list if self.__is_iceberg_table(table)])

@override
def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]:
def list_namespaces(self, namespace: str | Identifier = ()) -> Iterator[Identifier]:
"""List namespaces from the given namespace. If not given, list top-level namespaces from the catalog.

Returns:
List[Identifier]: a List of namespace identifiers.
Iterator[Identifier]: an iterator of namespace identifiers.
"""
# Hierarchical namespaces are not supported. Return an empty iterator.
if namespace:
return []
return iter([])

database_list: list[DatabaseTypeDef] = []
next_token: str | None = None
Expand All @@ -912,7 +913,7 @@ def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]:
if not next_token:
break

return [self.identifier_to_tuple(database["Name"]) for database in database_list]
return iter([self.identifier_to_tuple(database["Name"]) for database in database_list])

@override
def load_namespace_properties(self, namespace: str | Identifier) -> Properties:
Expand Down Expand Up @@ -982,7 +983,7 @@ def create_view(
raise NotImplementedError

@override
def list_views(self, namespace: str | Identifier) -> list[Identifier]:
def list_views(self, namespace: str | Identifier) -> Iterator[Identifier]:
raise NotImplementedError

@override
Expand Down
31 changes: 17 additions & 14 deletions pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import logging
import socket
import time
from collections.abc import Iterator
from types import TracebackType
from typing import (
TYPE_CHECKING,
Expand Down Expand Up @@ -479,7 +480,7 @@ def register_table(self, identifier: str | Identifier, metadata_location: str, o
return self._convert_hive_into_iceberg(hive_table)

@override
def list_views(self, namespace: str | Identifier) -> list[Identifier]:
def list_views(self, namespace: str | Identifier) -> Iterator[Identifier]:
raise NotImplementedError

@override
Expand Down Expand Up @@ -760,7 +761,7 @@ def drop_namespace(self, namespace: str | Identifier) -> None:
raise NoSuchNamespaceError(f"Database does not exists: {database_name}") from e

@override
def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
def list_tables(self, namespace: str | Identifier) -> Iterator[Identifier]:
"""List Iceberg tables under the given namespace in the catalog.

When the database doesn't exist, it will just return an empty iterator.
Expand All @@ -769,34 +770,36 @@ def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
namespace: Database to list.

Returns:
List[Identifier]: list of table identifiers.
Iterator[Identifier]: an iterator of table identifiers.

Raises:
NoSuchNamespaceError: If a namespace with the given name does not exist, or the identifier is invalid.
"""
database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
with self._client as open_client:
return [
(database_name, table.tableName)
for table in open_client.get_table_objects_by_name(
dbname=database_name, tbl_names=open_client.get_all_tables(db_name=database_name)
)
if table.parameters.get(TABLE_TYPE, "").lower() == ICEBERG
]
return iter(
[
(database_name, table.tableName)
for table in open_client.get_table_objects_by_name(
dbname=database_name, tbl_names=open_client.get_all_tables(db_name=database_name)
)
if table.parameters.get(TABLE_TYPE, "").lower() == ICEBERG
]
)

@override
def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]:
def list_namespaces(self, namespace: str | Identifier = ()) -> Iterator[Identifier]:
"""List namespaces from the given namespace. If not given, list top-level namespaces from the catalog.

Returns:
List[Identifier]: a List of namespace identifiers.
Iterator[Identifier]: an iterator of namespace identifiers.
"""
# Hierarchical namespaces are not supported. Return an empty iterator.
if namespace:
return []
return iter([])

with self._client as open_client:
return list(map(self.identifier_to_tuple, open_client.get_all_databases()))
return iter(list(map(self.identifier_to_tuple, open_client.get_all_databases())))

@override
def load_namespace_properties(self, namespace: str | Identifier) -> Properties:
Expand Down
7 changes: 4 additions & 3 deletions pyiceberg/catalog/noop.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations

from collections.abc import Iterator
from typing import (
TYPE_CHECKING,
)
Expand Down Expand Up @@ -124,11 +125,11 @@ def drop_namespace(self, namespace: str | Identifier) -> None:
raise NotImplementedError

@override
def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
def list_tables(self, namespace: str | Identifier) -> Iterator[Identifier]:
raise NotImplementedError

@override
def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]:
def list_namespaces(self, namespace: str | Identifier = ()) -> Iterator[Identifier]:
raise NotImplementedError

@override
Expand All @@ -142,7 +143,7 @@ def update_namespace_properties(
raise NotImplementedError

@override
def list_views(self, namespace: str | Identifier) -> list[Identifier]:
def list_views(self, namespace: str | Identifier) -> Iterator[Identifier]:
raise NotImplementedError

@override
Expand Down
Loading