Skip to content

Commit

Permalink
feat(file-based): sync file acl permissions and identities (#260)
Browse files Browse the repository at this point in the history
We will leverage the DefaultFileBased stream and the stream reader to reuse most of the logic for scraping files, and let connectors implement the logic specific to the domain they handle.

In the UI, we will add a new transfer mode to replicate permissions (ACLs).

- Enhanced file transfer options now support permissions replication, enabling delivery of access permissions along with identity data.
- Introduced an additional delivery method option to mirror source file permission restrictions and identity stream inclusion.
  • Loading branch information
aldogonzalez8 authored Feb 14, 2025
1 parent d44aea8 commit 10c1085
Show file tree
Hide file tree
Showing 14 changed files with 775 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from airbyte_cdk import OneOfOptionConfig
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.specs.transfer_modes import DeliverPermissions
from airbyte_cdk.sources.utils import schema_helpers


Expand Down Expand Up @@ -65,7 +66,7 @@ class AbstractFileBasedSpec(BaseModel):
order=10,
)

delivery_method: Union[DeliverRecords, DeliverRawFiles] = Field(
delivery_method: Union[DeliverRecords, DeliverRawFiles, DeliverPermissions] = Field(
title="Delivery Method",
discriminator="delivery_type",
type="object",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import (
AbstractFileBasedSpec,
DeliverRawFiles,
)
from airbyte_cdk.sources.specs.transfer_modes import DeliverPermissions

# Name of the attribute on the delivery method that carries the selected delivery type.
DELIVERY_TYPE_KEY = "delivery_type"
# `delivery_type` value that selects the permissions (ACL) transfer mode.
DELIVERY_TYPE_PERMISSION_TRANSFER_MODE_VALUE = "use_permissions_transfer"
# `delivery_type` value that selects the raw file transfer mode.
DELIVERY_TYPE_FILES_TRANSFER_MODE_VALUE = "use_file_transfer"
# Name of the delivery-method attribute controlling whether subdirectory paths are kept.
PRESERVE_DIRECTORY_STRUCTURE_KEY = "preserve_directory_structure"
# Name of the delivery-method attribute controlling whether the identities stream is included.
INCLUDE_IDENTITIES_STREAM_KEY = "include_identities_stream"


def use_file_transfer(parsed_config: AbstractFileBasedSpec) -> bool:
    """
    Tell whether the configured delivery method is the raw file transfer mode.

    Args:
        parsed_config: The parsed configuration containing delivery method settings
    Returns:
        True if the delivery method exposes a delivery type equal to the
        file-transfer value, False otherwise
    """
    delivery_method = parsed_config.delivery_method
    # Delivery methods without a delivery type can never be file transfer.
    if not hasattr(delivery_method, DELIVERY_TYPE_KEY):
        return False
    return delivery_method.delivery_type == DELIVERY_TYPE_FILES_TRANSFER_MODE_VALUE


def preserve_directory_structure(parsed_config: AbstractFileBasedSpec) -> bool:
    """
    Decide whether transferred files keep their subdirectory paths.

    When enabled, files maintain their subdirectory paths in the destination.
    When disabled, files are flattened to the root of the destination.
    The configured value is only honoured when file transfer mode is active and
    the delivery method is a `DeliverRawFiles` instance exposing the option.

    Args:
        parsed_config: The parsed configuration containing delivery method settings
    Returns:
        The configured value when it applies; True (the default) in every other case
    """
    option_applies = (
        use_file_transfer(parsed_config)
        and hasattr(parsed_config.delivery_method, PRESERVE_DIRECTORY_STRUCTURE_KEY)
        and isinstance(parsed_config.delivery_method, DeliverRawFiles)
    )
    # Fall back to preserving the structure whenever the option is not applicable.
    return parsed_config.delivery_method.preserve_directory_structure if option_applies else True


def use_permissions_transfer(parsed_config: AbstractFileBasedSpec) -> bool:
    """
    Tell whether permissions transfer is selected, i.e. ACLs and identities are synced.

    Args:
        parsed_config: The parsed configuration containing delivery method settings
    Returns:
        True if the delivery method exposes a delivery type equal to the
        permissions-transfer value, False otherwise
    """
    delivery_method = parsed_config.delivery_method
    # Delivery methods without a delivery type can never be permissions transfer.
    if not hasattr(delivery_method, DELIVERY_TYPE_KEY):
        return False
    return delivery_method.delivery_type == DELIVERY_TYPE_PERMISSION_TRANSFER_MODE_VALUE


def include_identities_stream(parsed_config: AbstractFileBasedSpec) -> bool:
    """
    Decide whether the identities stream should be included.

    A user may not have access to identities, yet syncing ACLs alone is still
    valuable, so the identities stream is opt-in. The configured value is only
    honoured when permissions transfer mode is active and the delivery method
    is a `DeliverPermissions` instance exposing the option.

    Args:
        parsed_config: The parsed configuration containing delivery method settings
    Returns:
        The configured value when it applies; False in every other case
    """
    option_applies = (
        use_permissions_transfer(parsed_config)
        and hasattr(parsed_config.delivery_method, INCLUDE_IDENTITIES_STREAM_KEY)
        and isinstance(parsed_config.delivery_method, DeliverPermissions)
    )
    # Without an applicable option the identities stream stays excluded.
    return parsed_config.delivery_method.include_identities_stream if option_applies else False
107 changes: 70 additions & 37 deletions airbyte_cdk/sources/file_based/file_based_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@
FileBasedStreamConfig,
ValidationPolicy,
)
from airbyte_cdk.sources.file_based.config.validate_config_transfer_modes import (
include_identities_stream,
preserve_directory_structure,
use_file_transfer,
use_permissions_transfer,
)
from airbyte_cdk.sources.file_based.discovery_policy import (
AbstractDiscoveryPolicy,
DefaultDiscoveryPolicy,
Expand All @@ -49,7 +55,12 @@
DEFAULT_SCHEMA_VALIDATION_POLICIES,
AbstractSchemaValidationPolicy,
)
from airbyte_cdk.sources.file_based.stream import AbstractFileBasedStream, DefaultFileBasedStream
from airbyte_cdk.sources.file_based.stream import (
AbstractFileBasedStream,
DefaultFileBasedStream,
FileIdentitiesStream,
PermissionsFileBasedStream,
)
from airbyte_cdk.sources.file_based.stream.concurrent.adapters import FileBasedStreamFacade
from airbyte_cdk.sources.file_based.stream.concurrent.cursor import (
AbstractConcurrentFileBasedCursor,
Expand All @@ -66,6 +77,7 @@
DEFAULT_CONCURRENCY = 100
MAX_CONCURRENCY = 100
INITIAL_N_PARTITIONS = MAX_CONCURRENCY // 2
IDENTITIES_STREAM = "identities"


class FileBasedSource(ConcurrentSourceAdapter, ABC):
Expand Down Expand Up @@ -157,13 +169,20 @@ def check_connection(
errors = []
tracebacks = []
for stream in streams:
if isinstance(stream, FileIdentitiesStream):
identity = next(iter(stream.load_identity_groups()))
if not identity:
errors.append(
"Unable to get identities for current configuration, please check your credentials"
)
continue
if not isinstance(stream, AbstractFileBasedStream):
raise ValueError(f"Stream {stream} is not a file-based stream.")
try:
parsed_config = self._get_parsed_config(config)
availability_method = (
stream.availability_strategy.check_availability
if self._use_file_transfer(parsed_config)
if use_file_transfer(parsed_config) or use_permissions_transfer(parsed_config)
else stream.availability_strategy.check_availability_and_parsability
)
(
Expand Down Expand Up @@ -239,7 +258,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
message_repository=self.message_repository,
)
stream = FileBasedStreamFacade.create_from_stream(
stream=self._make_default_stream(
stream=self._make_file_based_stream(
stream_config=stream_config,
cursor=cursor,
parsed_config=parsed_config,
Expand Down Expand Up @@ -270,7 +289,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
CursorField(DefaultFileBasedStream.ab_last_mod_col),
)
stream = FileBasedStreamFacade.create_from_stream(
stream=self._make_default_stream(
stream=self._make_file_based_stream(
stream_config=stream_config,
cursor=cursor,
parsed_config=parsed_config,
Expand All @@ -282,13 +301,17 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
)
else:
cursor = self.cursor_cls(stream_config)
stream = self._make_default_stream(
stream = self._make_file_based_stream(
stream_config=stream_config,
cursor=cursor,
parsed_config=parsed_config,
)

streams.append(stream)

if include_identities_stream(parsed_config):
identities_stream = self._make_identities_stream()
streams.append(identities_stream)
return streams

except ValidationError as exc:
Expand All @@ -310,8 +333,48 @@ def _make_default_stream(
validation_policy=self._validate_and_get_validation_policy(stream_config),
errors_collector=self.errors_collector,
cursor=cursor,
use_file_transfer=self._use_file_transfer(parsed_config),
preserve_directory_structure=self._preserve_directory_structure(parsed_config),
use_file_transfer=use_file_transfer(parsed_config),
preserve_directory_structure=preserve_directory_structure(parsed_config),
)

def _make_permissions_stream(
    self, stream_config: FileBasedStreamConfig, cursor: Optional[AbstractFileBasedCursor]
) -> AbstractFileBasedStream:
    """
    Build a `PermissionsFileBasedStream` for a single configured stream.

    Args:
        stream_config: Configuration of the stream to build
        cursor: Cursor handed to the stream; may be None
    Returns:
        The permissions stream wired with this source's reader, policies,
        parsers and error collector
    """
    # Resolve the per-stream pieces first; everything else is shared source state.
    catalog_schema = self.stream_schemas.get(stream_config.name)
    validation_policy = self._validate_and_get_validation_policy(stream_config)
    return PermissionsFileBasedStream(
        config=stream_config,
        catalog_schema=catalog_schema,
        stream_reader=self.stream_reader,
        availability_strategy=self.availability_strategy,
        discovery_policy=self.discovery_policy,
        parsers=self.parsers,
        validation_policy=validation_policy,
        errors_collector=self.errors_collector,
        cursor=cursor,
    )

def _make_file_based_stream(
    self,
    stream_config: FileBasedStreamConfig,
    cursor: Optional[AbstractFileBasedCursor],
    parsed_config: AbstractFileBasedSpec,
) -> AbstractFileBasedStream:
    """
    Create the stream implementation matching the selected transfer mode.

    Args:
        stream_config: Configuration of the stream to build
        cursor: Cursor handed to the stream; may be None
        parsed_config: The parsed configuration containing delivery method settings
    Returns:
        A permissions stream when permissions transfer is selected, otherwise
        the default file-based stream
    """
    if not use_permissions_transfer(parsed_config):
        # TODO: file transfer mode should get its own stream class to decouple
        # it from DefaultFileBasedStream.
        return self._make_default_stream(stream_config, cursor, parsed_config)
    return self._make_permissions_stream(stream_config, cursor)

def _make_identities_stream(
    self,
) -> Stream:
    """
    Build the `FileIdentitiesStream` for this source.

    Returns:
        The identities stream wired with this source's reader, discovery
        policy and error collector
    """
    # The catalog schema is looked up under the identities stream's own name.
    identities_schema = self.stream_schemas.get(FileIdentitiesStream.IDENTITIES_STREAM_NAME)
    return FileIdentitiesStream(
        catalog_schema=identities_schema,
        stream_reader=self.stream_reader,
        discovery_policy=self.discovery_policy,
        errors_collector=self.errors_collector,
    )

def _get_stream_from_catalog(
Expand Down Expand Up @@ -378,33 +441,3 @@ def _validate_input_schema(self, stream_config: FileBasedStreamConfig) -> None:
"`input_schema` and `schemaless` options cannot both be set",
model=FileBasedStreamConfig,
)

@staticmethod
def _use_file_transfer(parsed_config: AbstractFileBasedSpec) -> bool:
    """
    Tell whether the configured delivery method is the raw file transfer mode.

    Args:
        parsed_config: The parsed configuration containing delivery method settings
    Returns:
        True if the delivery method has a `delivery_type` equal to
        "use_file_transfer", False otherwise
    """
    delivery_method = parsed_config.delivery_method
    # Delivery methods without a delivery type can never be file transfer.
    if not hasattr(delivery_method, "delivery_type"):
        return False
    return delivery_method.delivery_type == "use_file_transfer"

@staticmethod
def _preserve_directory_structure(parsed_config: AbstractFileBasedSpec) -> bool:
    """
    Decide whether transferred files keep their subdirectory paths.

    When enabled, files maintain their subdirectory paths in the destination.
    When disabled, files are flattened to the root of the destination.

    Args:
        parsed_config: The parsed configuration containing delivery method settings
    Returns:
        The configured value when file transfer is active and the option is
        set; True (the default) otherwise
    """
    if not FileBasedSource._use_file_transfer(parsed_config):
        return True
    # A missing attribute and an explicit None both mean "not configured".
    configured = getattr(parsed_config.delivery_method, "preserve_directory_structure", None)
    return True if configured is None else configured
Loading

0 comments on commit 10c1085

Please sign in to comment.