Skip to content

Commit

Permalink
Source Github: Refactor incremental state handling (#39513)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristoGrab authored Jun 18, 2024
1 parent 262f13d commit 074bc06
Show file tree
Hide file tree
Showing 7 changed files with 466 additions and 153 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: ef69ef6e-aa7f-4af1-a01d-ef775033524e
dockerImageTag: 1.7.6
dockerImageTag: 1.7.7
dockerRepository: airbyte/source-github
documentationUrl: https://docs.airbyte.com/integrations/sources/github
githubIssueLabel: source-github
Expand Down
484 changes: 388 additions & 96 deletions airbyte-integrations/connectors/source-github/poetry.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-github/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "1.7.6"
version = "1.7.7"
name = "source-github"
description = "Source implementation for GitHub."
authors = [ "Airbyte <contact@airbyte.io>",]
Expand All @@ -17,7 +17,7 @@ include = "source_github"

[tool.poetry.dependencies]
python = "^3.9,<3.12"
airbyte-cdk = "0.80.0"
airbyte-cdk = "0.90.0"
sgqlc = "==16.3"

[tool.poetry.scripts]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level, SyncMode
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
from airbyte_cdk.sources.streams.core import CheckpointMixin
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException
from airbyte_cdk.utils import AirbyteTracedException
Expand Down Expand Up @@ -47,6 +48,7 @@ def __init__(self, api_url: str = "https://api.github.com", access_token_type: s

self.access_token_type = access_token_type
self.api_url = api_url
self.state = {}

@property
def url_base(self) -> str:
Expand Down Expand Up @@ -255,7 +257,7 @@ def transform(self, record: MutableMapping[str, Any], stream_slice: Mapping[str,
return record


class SemiIncrementalMixin:
class SemiIncrementalMixin(CheckpointMixin):
"""
Semi incremental streams are also incremental but with one difference, they:
- read all records;
Expand All @@ -279,6 +281,14 @@ def __init__(self, start_date: str = "", **kwargs):
self._start_date = start_date
self._starting_point_cache = {}

# Read accessor for the stream's state: returns whatever mapping is currently
# stored in ``self._state``.
# NOTE(review): presumably this property pair is what CheckpointMixin expects
# subclasses to provide — confirm against the CDK.
@property
def state(self) -> MutableMapping[str, Any]:
return self._state

# Write accessor for the stream's state: stores the given mapping in
# ``self._state``, replacing the previous value.
@state.setter
def state(self, value: MutableMapping[str, Any]):
self._state = value

@property
def slice_keys(self):
if hasattr(self, "repositories"):
Expand All @@ -295,7 +305,7 @@ def state_checkpoint_interval(self) -> Optional[int]:
if self.is_sorted == "asc":
return self.page_size

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
def _get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
"""
Return the latest state by comparing the cursor value in the latest record with the stream's most recent state
object and returning an updated state object.
Expand Down Expand Up @@ -338,6 +348,7 @@ def read_records(
cursor_value = self.convert_cursor_value(record[self.cursor_field])
if not start_point or cursor_value > start_point:
yield record
self.state = self._get_updated_state(self.state, record)
elif self.is_sorted == "desc" and cursor_value < start_point:
break

Expand Down Expand Up @@ -719,7 +730,7 @@ def transform(self, record: MutableMapping[str, Any], stream_slice: Mapping[str,

return record

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
def _get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
repository = latest_record["repository"]
branch = latest_record["branch"]
updated_state = latest_record[self.cursor_field]
Expand Down Expand Up @@ -1004,7 +1015,7 @@ def request_body_json(
# Reactions streams


class ReactionStream(GithubStream, ABC):
class ReactionStream(GithubStream, CheckpointMixin, ABC):

parent_key = "id"
copy_parent_key = "comment_id"
Expand All @@ -1023,6 +1034,14 @@ def parent_entity(self):
Specify the class of the parent stream for which to receive reactions
"""

# Read accessor for the reaction stream's state: returns whatever mapping is
# currently stored in ``self._state``.
# NOTE(review): presumably this property pair is what CheckpointMixin expects
# subclasses to provide — confirm against the CDK.
@property
def state(self) -> MutableMapping[str, Any]:
return self._state

# Write accessor for the reaction stream's state: stores the given mapping in
# ``self._state``, replacing the previous value.
@state.setter
def state(self, value: MutableMapping[str, Any]):
self._state = value

def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
parent_path = self._parent_stream.path(stream_slice=stream_slice, **kwargs)
return f"{parent_path}/{stream_slice[self.copy_parent_key]}/reactions"
Expand All @@ -1032,7 +1051,7 @@ def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
for parent_record in self._parent_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice):
yield {self.copy_parent_key: parent_record[self.parent_key], "repository": stream_slice["repository"]}

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
def _get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
repository = latest_record["repository"]
parent_id = str(latest_record[self.copy_parent_key])
updated_state = latest_record[self.cursor_field]
Expand Down Expand Up @@ -1066,6 +1085,7 @@ def read_records(
):
if not starting_point or record[self.cursor_field] > starting_point:
yield record
self.state = self._get_updated_state(self.state, record)

def transform(self, record: MutableMapping[str, Any], stream_slice: Mapping[str, Any]) -> MutableMapping[str, Any]:
record = super().transform(record, stream_slice)
Expand Down Expand Up @@ -1321,6 +1341,7 @@ def read_records(
):
if not starting_point or record[self.cursor_field] > starting_point:
yield record
self.state = self._get_updated_state(self.state, record)

def get_starting_point(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any]) -> str:
if stream_state:
Expand All @@ -1333,7 +1354,7 @@ def get_starting_point(self, stream_state: Mapping[str, Any], stream_slice: Mapp
return stream_state_value
return self._start_date

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
def _get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
repository = latest_record["repository"]
project_id = str(latest_record["project_id"])
updated_state = latest_record[self.cursor_field]
Expand Down Expand Up @@ -1391,6 +1412,7 @@ def read_records(
):
if not starting_point or record[self.cursor_field] > starting_point:
yield record
self.state = self._get_updated_state(self.state, record)

def get_starting_point(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any]) -> str:
if stream_state:
Expand All @@ -1404,7 +1426,7 @@ def get_starting_point(self, stream_state: Mapping[str, Any], stream_slice: Mapp
return stream_state_value
return self._start_date

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
def _get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
repository = latest_record["repository"]
project_id = str(latest_record["project_id"])
column_id = str(latest_record["column_id"])
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
{
"credentials": {
"personal_access_token": "personal_access_token"
},
"credentials": { "personal_access_token": "personal_access_token" },
"repository": "airbytehq/airbyte airbytehq/airbyte-platform",
"start_date": "2000-01-01T00:00:00Z",
"branch": "airbytehq/airbyte/master airbytehq/airbyte-platform/main"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def read_incremental(stream_instance: Stream, stream_state: MutableMapping[str,
for slice in slices:
records = stream_instance.read_records(sync_mode=SyncMode.incremental, stream_slice=slice, stream_state=stream_state)
for record in records:
stream_state = stream_instance.get_updated_state(stream_state, record)
stream_state = stream_instance._get_updated_state(stream_state, record)
res.append(record)
return res

Expand Down
Loading

0 comments on commit 074bc06

Please sign in to comment.