Skip to content

Commit

Permalink
integrated implicit parents with syntax resources.parent_resource.par…
Browse files Browse the repository at this point in the history
…ent_field for path and json
  • Loading branch information
root committed Jan 17, 2025
1 parent fae5ae3 commit ecd0f62
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 24 deletions.
113 changes: 97 additions & 16 deletions dlt/sources/rest_api/config_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,20 @@ def build_resource_dependency_graph(
# find resolved parameters to connect dependent resources
resolved_params = _find_resolved_params(endpoint_resource["endpoint"])

# extract more resolved params from path expressions
# path_expressions = _extract_expressions(endpoint_resource["endpoint"]["path"], "resources.")
# resolved_params += _expressions_to_resolved_params(path_expressions)

# extract expressions from parameters that are strings
params_expressions = []
for param_value in endpoint_resource["endpoint"].get("params", {}).values():
# If param_value is a plain string (e.g. "{resources.berry.a_property}")
if isinstance(param_value, str):
extracted = _extract_expressions(param_value, "resources.")
params_expressions.extend(extracted)

resolved_params += _expressions_to_resolved_params(params_expressions)

# set of resources in resolved params
named_resources = {rp.resolve_config["resource"] for rp in resolved_params}

Expand Down Expand Up @@ -390,6 +404,15 @@ def _make_endpoint_resource(
return _merge_resource_endpoints(default_config, resource)


def _replace_expression(template: str, params: Dict[str, Any]):
"""This method is used to replace the expression in the templates
because the the str.format() doesn't like placeholders with dots.
"""
for p in params:
template = template.replace(f"{{{p}}}", str(params[p]))
return template


def _bind_path_params(resource: EndpointResource) -> None:
"""Binds params declared in path to params available in `params`. Pops the
bound params but. Params of type `resolve` and `incremental` are skipped
Expand All @@ -398,10 +421,16 @@ def _bind_path_params(resource: EndpointResource) -> None:
path_params: Dict[str, Any] = {}
assert isinstance(resource["endpoint"], dict) # type guard
resolve_params = [r.param_name for r in _find_resolved_params(resource["endpoint"])]

params = resource["endpoint"].get("params", {})
path = resource["endpoint"]["path"]
for name in _get_placeholders(path):
params = resource["endpoint"].get("params", {})
if name not in params and name not in path_params:

for name in _extract_expressions(path):
if (
name not in params
and name not in path_params
and name not in resolve_params
):
raise ValueError(
f"The path {path} defined in resource {resource['name']} requires param with"
f" name {name} but it is not found in {params}"
Expand All @@ -423,7 +452,8 @@ def _bind_path_params(resource: EndpointResource) -> None:
# resolved params are bound later
path_params[name] = "{" + name + "}"

resource["endpoint"]["path"] = path.format(**path_params)
# resource["endpoint"]["path"] = path.format(**path_params)
resource["endpoint"]["path"] = _replace_expression(path, path_params)


def _setup_single_entity_endpoint(endpoint: Endpoint) -> Endpoint:
Expand All @@ -449,12 +479,27 @@ def _find_resolved_params(endpoint_config: Endpoint) -> List[ResolvedParam]:
Resolved params are of type ResolveParamConfig (bound param with a key "type" set to "resolve".)
"""
return [

resolved_params = [
ResolvedParam(key, value) # type: ignore[arg-type]
for key, value in endpoint_config.get("params", {}).items()
if (isinstance(value, dict) and value.get("type") == "resolve")
]

path_expressions = _extract_expressions(endpoint_config["path"], "resources.")

json_expressions = (
_extract_expressions(endpoint_config["json"], "resources.")
if endpoint_config.get("json")
else []
)

resolved_params += _expressions_to_resolved_params(
path_expressions
) + _expressions_to_resolved_params(json_expressions)

return resolved_params


def _action_type_unless_custom_hook(
action_type: Optional[str], custom_hook: Optional[List[Callable[..., Any]]]
Expand Down Expand Up @@ -579,36 +624,72 @@ def remove_field(response: Response, *args, **kwargs) -> Response:
return None


def _get_placeholders(template: str) -> List[str]:
def _extract_expressions(
template_string: Union[str, Dict], prefix: str = ""
) -> List[str]:
"""Takes a template string and extracts expressions that start with a prefix.
Args:
template_string (str): A string with expressions to extract
prefix (str): A string that marks the beginning of an expression
Example:
>>> _extract_expressions("blog/{resources.blog.id}/comments", "resources.")
["resources.blog.id"]
"""

# to use a dict with Formatter.parse we need to add curly brackets
if isinstance(template_string, dict):
template_string = "{" + json.dumps(template_string) + "}"

return [
field_name
for _, field_name, _, _ in string.Formatter().parse(template)
if field_name
for _, field_name, _, _ in string.Formatter().parse(template_string)
if field_name and field_name.startswith(prefix)
]


def _expressions_to_resolved_params(expressions: List[str]) -> List[ResolvedParam]:
    """Turn `resources.<resource>.<field>` expressions into resolved params.

    Each expression becomes a `ResolvedParam` named after the expression itself,
    with a resolve config pointing at the referenced resource and field.

    Raises:
        ValueError: If an expression does not consist of exactly three
            dot-separated parts.
    """
    converted = []
    for expr in expressions:
        # Only the simple three-part form is supported, not nested field paths.
        segments = expr.strip().split(".")
        if len(segments) == 3:
            _, resource_name, field_name = segments
            resolve_config = {
                "type": "resolve",
                "resource": resource_name,
                "field": field_name,
            }
            converted.append(ResolvedParam(expr, resolve_config))
            continue
        raise ValueError(
            f"Invalid definition of {expr}. Expected format:"
            " 'resources.<resource>.<field>'"
        )
    return converted


def _bound_path_parameters(
    path: str,
    param_values: Dict[str, Any],
):
    """Bind `param_values` into the placeholders of `path`.

    Args:
        path: Endpoint path containing `{name}` placeholders.
        param_values: Values keyed by placeholder name. Mutated in place:
            every name used in the path is removed.

    Returns:
        A `(bound_path, param_values)` tuple, where `param_values` is the
        same dict that was passed in, minus the names consumed by the path.

    Raises:
        KeyError: If the path uses a placeholder that is not in `param_values`.
    """
    path_params = _extract_expressions(path)
    bound_path = _replace_expression(path, param_values)

    # The same placeholder may occur several times in a path; remove each name
    # only once so the second occurrence does not raise KeyError.
    for param in dict.fromkeys(path_params):
        param_values.pop(param)

    return bound_path, param_values


def _bound_json_parameters(
request_json: Optional[Dict[str, Any]],
request_json: Dict[str, Any],
param_values: Dict[str, Any],
):
json_as_template = "{" + json.dumps(request_json) + "}"
json_params = _get_placeholders(json_as_template)

bound_json = json_as_template.format(**param_values)
json_params = _extract_expressions(request_json)

bound_json = _replace_expression(json.dumps(request_json), param_values)
for param in json_params:
param_values.pop(param)

Expand Down
16 changes: 8 additions & 8 deletions tests/sources/rest_api/integration/test_offline.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,15 @@ def test_load_mock_api_with_json_resolved(mock_api_server):
"path": "posts/search_by_id",
"method": "POST",
"json": {
"post_id": "{post_id}",
},
"params": {
"post_id": {
"type": "resolve",
"resource": "posts",
"field": "id",
}
"post_id": "{resources.posts.id}",
},
# "params": {
# "posts__id": {
# "type": "resolve",
# "resource": "posts",
# "field": "id",
# }
# },
},
},
],
Expand Down
68 changes: 68 additions & 0 deletions tests/sources/rest_api/integration/test_processing_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,36 @@ def test_rest_api_source_filtered_child(mock_api_server) -> None:
assert len(data) == 2


def test_rest_api_source_filtered_child_with_implicit_param(mock_api_server) -> None:
    """Filters apply when the child references its parent via `{resources.posts.id}` in the path."""

    def is_selected_post(item):
        # Parent filter: keep posts with id 1 or 2.
        return item["id"] in (1, 2)

    def is_first_comment(item):
        # Child filter: keep comments with id 1.
        return item["id"] == 1

    config: RESTAPIConfig = {
        "client": {
            "base_url": "https://api.example.com",
        },
        "resources": [
            {
                "name": "posts",
                "endpoint": "posts",
                "processing_steps": [
                    {"filter": is_selected_post},  # type: ignore[typeddict-item]
                ],
            },
            {
                "name": "comments",
                # No explicit "params" entry: the parent is declared in the path itself.
                "endpoint": {
                    "path": "/posts/{resources.posts.id}/comments",
                },
                "processing_steps": [
                    {"filter": is_first_comment},  # type: ignore[typeddict-item]
                ],
            },
        ],
    }
    source = rest_api_source(config)

    comments = list(source.with_resources("comments"))
    assert len(comments) == 2


def test_rest_api_source_filtered_and_map_child(mock_api_server) -> None:
def extend_body(row):
row["body"] = f"{row['_posts_title']} - {row['body']}"
Expand Down Expand Up @@ -243,3 +273,41 @@ def extend_body(row):

data = list(mock_source.with_resources("comments"))
assert data[0]["body"] == "Post 2 - Comment 0 for post 2"


def test_rest_api_source_filtered_and_map_child_with_implicit_param(
    mock_api_server,
) -> None:
    """Map and filter steps run on a child bound via `{resources.posts.id}` with `include_from_parent`."""

    def extend_body(row):
        # Prefix the comment body with the `_posts_title` value present on the row.
        parent_title = row["_posts_title"]
        original_body = row["body"]
        row["body"] = f"{parent_title} - {original_body}"
        return row

    def is_selected_post(item):
        return item["id"] in (1, 2)

    def belongs_to_post_2(item):
        return item["body"].startswith("Post 2")

    config: RESTAPIConfig = {
        "client": {
            "base_url": "https://api.example.com",
        },
        "resources": [
            {
                "name": "posts",
                "endpoint": "posts",
                "processing_steps": [
                    {"filter": is_selected_post},  # type: ignore[typeddict-item]
                ],
            },
            {
                "name": "comments",
                "endpoint": {
                    "path": "/posts/{resources.posts.id}/comments",
                },
                "include_from_parent": ["title"],
                # The map runs before the filter, so the filter sees the rewritten body.
                "processing_steps": [
                    {"map": extend_body},  # type: ignore[typeddict-item]
                    {"filter": belongs_to_post_2},  # type: ignore[typeddict-item]
                ],
            },
        ],
    }
    source = rest_api_source(config)

    comments = list(source.with_resources("comments"))
    assert comments[0]["body"] == "Post 2 - Comment 0 for post 2"

0 comments on commit ecd0f62

Please sign in to comment.