Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change load and save methods on MongoDBPersister to show partition_key argument as optional #511

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 28 additions & 4 deletions burr/integrations/persisters/b_pymongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,21 @@ def list_app_ids(self, partition_key: str, **kwargs) -> list[str]:
return app_ids

def load(
self, partition_key: str, app_id: str, sequence_id: int = None, **kwargs
self, partition_key: Optional[str], app_id: str, sequence_id: int = None, **kwargs
) -> Optional[persistence.PersistedStateData]:
"""Load the state data for a given partition key, app id, and sequence id."""
"""Loads the state data for a given partition key, app_id, and sequence_id.

This method retrieves the most recent state data for the specified (partition_key, app_id) combination.
If a sequence ID is provided, it will attempt to fetch the specific state at that sequence.

:param partition_key: The partition key. Defaults to `None`. **Note:** The partition key defaults to `None`. If a partition key was used during saving, it must be provided
consistently during retrieval, or no results will be returned.
:param app_id: Application UID to read from.
:param sequence_id: (Optional) The sequence ID to retrieve a specific state. If not provided,
the latest state is returned.

:returns: The state data if found, otherwise None.
"""
query = {"partition_key": partition_key, "app_id": app_id}
if sequence_id is not None:
query["sequence_id"] = sequence_id
Expand All @@ -118,15 +130,27 @@ def load(

def save(
self,
partition_key: str,
partition_key: Optional[str],
app_id: str,
sequence_id: int,
position: str,
state: state.State,
status: Literal["completed", "failed"],
**kwargs,
):
"""Save the state data to the MongoDB database."""
"""Save the state data to the MongoDB database.

:param partition_key: the partition key. Note this could be None, but it's up to the persistor to whether
that is a valid value it can handle. If a partition key was used during saving, it must be provided
consistently during retrieval, or no results will be returned.
:param app_id: Application UID to write with.
:param sequence_id: Sequence ID of the last executed step.
:param position: The action name that was implemented.
:param state: The current state of the application.
:param status: The status of this state, either "completed" or "failed". If "failed", the state is what it was
before the action was applied.
:return:
"""
key = {"partition_key": partition_key, "app_id": app_id, "sequence_id": sequence_id}
if self.collection.find_one(key):
raise ValueError(f"partition_key:app_id:sequence_id[{key}] already exists.")
Expand Down
18 changes: 18 additions & 0 deletions tests/integrations/persisters/test_b_mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,21 @@ def test_serialization_with_pickle(mongodb_persister):
data = deserialized_persister.load("pk", "app_id_serde", 1)

assert data["state"].get_all() == {"a": 1, "b": 2}


def test_partition_key_is_optional(mongodb_persister):
# 1. Save and load with partition key = None
mongodb_persister.save(
None, "app_id_none", 1, "pos1", state.State({"foo": "bar"}), "in_progress"
)
loaded_data = mongodb_persister.load(None, "app_id_none", 1)
assert loaded_data is not None
assert loaded_data["state"].get_all() == {"foo": "bar"}

# 2. Save and load again (different key/index) with partition key = None
mongodb_persister.save(
None, "app_id_none2", 2, "pos2", state.State({"hello": "world"}), "completed"
)
loaded_data2 = mongodb_persister.load(None, "app_id_none2", 2)
assert loaded_data2 is not None
assert loaded_data2["state"].get_all() == {"hello": "world"}
Loading