Skip to content

Commit

Permalink
Supports federation 2.3 in the router
Browse files Browse the repository at this point in the history
The query planner in Federation 2.3 adds a new concept of "data
rewrites" for fetches in order to support both `@interfaceObject` and to
support the fix for apollographql/federation#1257.

Those "rewrites" describe simple updates that need to be performed
either on the inputs (the "representations" passed to `_entities`; need
to rewrite the `__typename` when sending queries to an `@interfaceObject`)
or the output of a fetch (needed when a field has been aliased to permit
the subgraph query to be valid, but that field needs to be "un-aliased"
to its original name after the fetch).

This commit implements those rewrites.
  • Loading branch information
pcmanus committed Jan 27, 2023
1 parent 7e7cf4f commit 36651cc
Show file tree
Hide file tree
Showing 9 changed files with 750 additions and 10 deletions.
75 changes: 75 additions & 0 deletions apollo-router/src/json_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,21 @@ pub(crate) trait ValueExt {
where
F: FnMut(&Path, &'a Value);

/// Select all values matching a `Path`, and allows to mutate those values.
///
/// The behavior of the method is otherwise the same as it's non-mutable counterpart
#[track_caller]
fn select_values_and_paths_mut<'a, F>(&'a mut self, schema: &Schema, path: &'a Path, f: F)
where
F: FnMut(&Path, &'a mut Value);

#[track_caller]
fn is_valid_float_input(&self) -> bool;

#[track_caller]
fn is_valid_int_input(&self) -> bool;


/// Returns whether this value is an object that matches the provided type.
///
/// More precisely, this checks that this value is an object, looks at
Expand Down Expand Up @@ -376,6 +385,14 @@ impl ValueExt for Value {
iterate_path(schema, &mut Path::default(), &path.0, self, &mut f)
}

#[track_caller]
fn select_values_and_paths_mut<'a, F>(&'a mut self, schema: &Schema, path: &'a Path, mut f: F)
where
F: FnMut(&Path, &'a mut Value),
{
iterate_path_mut(schema, &mut Path::default(), &path.0, self, &mut f)
}

#[track_caller]
fn is_valid_float_input(&self) -> bool {
// https://spec.graphql.org/draft/#sec-Float.Input-Coercion
Expand Down Expand Up @@ -475,6 +492,64 @@ fn iterate_path<'a, F>(
}
}

fn iterate_path_mut<'a, F>(
schema: &Schema,
parent: &mut Path,
path: &'a [PathElement],
data: &'a mut Value,
f: &mut F,
) where
F: FnMut(&Path, &'a mut Value),
{
match path.get(0) {
None => f(parent, data),
Some(PathElement::Flatten) => {
if let Some(array) = data.as_array_mut() {
for (i, value) in array.iter_mut().enumerate() {
parent.push(PathElement::Index(i));
iterate_path_mut(schema, parent, &path[1..], value, f);
parent.pop();
}
}
}
Some(PathElement::Index(i)) => {
if let Value::Array(a) = data {
if let Some(value) = a.get_mut(*i) {
parent.push(PathElement::Index(*i));
iterate_path_mut(schema, parent, &path[1..], value, f);
parent.pop();
}
}
}
Some(PathElement::Key(k)) => {
if let Value::Object(o) = data {
if let Some(value) = o.get_mut(k.as_str()) {
parent.push(PathElement::Key(k.to_string()));
iterate_path_mut(schema, parent, &path[1..], value, f);
parent.pop();
}
} else if let Value::Array(array) = data {
for (i, value) in array.iter_mut().enumerate() {
parent.push(PathElement::Index(i));
iterate_path_mut(schema, parent, path, value, f);
parent.pop();
}
}
}
Some(PathElement::Fragment(name)) => {
if data.is_object_of_type(schema, name) {
iterate_path_mut(schema, parent, &path[1..], data, f);
} else if let Value::Array(array) = data {
for (i, value) in array.iter_mut().enumerate() {
parent.push(PathElement::Index(i));
iterate_path_mut(schema, parent, path, value, f);
parent.pop();
}
}
}
}
}

/// A GraphQL path element that is composes of strings or numbers.
/// e.g `/book/3/name`
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
Expand Down
31 changes: 23 additions & 8 deletions apollo-router/src/query_planner/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use tracing::instrument;
use tracing::Instrument;

use super::execution::ExecutionParameters;
use super::rewrites;
use super::selection::select_object;
use super::selection::Selection;
use crate::error::Error;
Expand Down Expand Up @@ -83,6 +84,12 @@ pub(crate) struct FetchNode {

/// Optional id used by Deferred nodes
pub(crate) id: Option<String>,

// Optionally describes a number of "rewrites" that query plan executors should apply to the data that is sent as input of this fetch.
pub(crate) input_rewrites: Option<Vec<rewrites::DataRewrite>>,

// Optionally describes a number of "rewrites" to apply to the data that received from a fetch (and before it is applied to the current in-memory results).
pub(crate) output_rewrites: Option<Vec<rewrites::DataRewrite>>,
}

struct Variables {
Expand All @@ -92,6 +99,7 @@ struct Variables {

impl Variables {
#[instrument(skip_all, level = "debug", name = "make_variables")]
#[allow(clippy::too_many_arguments)]
async fn new(
requires: &[Selection],
variable_usages: &[String],
Expand All @@ -100,6 +108,7 @@ impl Variables {
request: &Arc<http::Request<Request>>,
schema: &Schema,
enable_deduplicate_variables: bool,
input_rewrites: &Option<Vec<rewrites::DataRewrite>>,
) -> Option<Variables> {
let body = request.body();
if !requires.is_empty() {
Expand All @@ -116,7 +125,8 @@ impl Variables {
let mut values: IndexSet<Value> = IndexSet::new();
data.select_values_and_paths(schema, current_dir, |path, value| {
if let Value::Object(content) = value {
if let Ok(Some(value)) = select_object(content, requires, schema) {
if let Ok(Some(mut value)) = select_object(content, requires, schema) {
rewrites::apply_rewrites(schema, &mut value, input_rewrites);
match values.get_index_of(&value) {
Some(index) => {
paths.insert(path.clone(), index);
Expand All @@ -139,7 +149,8 @@ impl Variables {
let mut values: Vec<Value> = Vec::new();
data.select_values_and_paths(schema, current_dir, |path, value| {
if let Value::Object(content) = value {
if let Ok(Some(value)) = select_object(content, requires, schema) {
if let Ok(Some(mut value)) = select_object(content, requires, schema) {
rewrites::apply_rewrites(schema, &mut value, input_rewrites);
paths.insert(path.clone(), values.len());
values.push(value);
}
Expand Down Expand Up @@ -210,6 +221,7 @@ impl FetchNode {
parameters.supergraph_request,
parameters.schema,
parameters.options.enable_deduplicate_variables,
&self.input_rewrites,
)
.await
{
Expand Down Expand Up @@ -280,7 +292,8 @@ impl FetchNode {
});
}

let (value, errors) = self.response_at_path(current_dir, paths, response);
let (value, errors) =
self.response_at_path(parameters.schema, current_dir, paths, response);
if let Some(id) = &self.id {
if let Some(sender) = parameters.deferred_fetches.get(id.as_str()) {
if let Err(e) = sender.clone().send((value.clone(), errors.clone())) {
Expand All @@ -294,6 +307,7 @@ impl FetchNode {
#[instrument(skip_all, level = "debug", name = "response_insert")]
fn response_at_path<'a>(
&'a self,
schema: &Schema,
current_dir: &'a Path,
paths: HashMap<Path, usize>,
response: graphql::Response,
Expand Down Expand Up @@ -356,7 +370,9 @@ impl FetchNode {

for (path, entity_idx) in paths {
if let Some(entity) = array.get(entity_idx) {
let _ = value.insert(&path, entity.clone());
let mut data = entity.clone();
rewrites::apply_rewrites(schema, &mut data, &self.output_rewrites);
let _ = value.insert(&path, data);
}
}
return (value, errors);
Expand Down Expand Up @@ -399,10 +415,9 @@ impl FetchNode {
}
})
.collect();
(
Value::from_path(current_dir, response.data.unwrap_or_default()),
errors,
)
let mut data = response.data.unwrap_or_default();
rewrites::apply_rewrites(schema, &mut data, &self.output_rewrites);
(Value::from_path(current_dir, data), errors)
}
}

Expand Down
1 change: 1 addition & 0 deletions apollo-router/src/query_planner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ mod caching_query_planner;
mod execution;
pub(crate) mod fetch;
mod plan;
pub(crate) mod rewrites;
mod selection;
pub use plan::*;

Expand Down
94 changes: 94 additions & 0 deletions apollo-router/src/query_planner/rewrites.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
//! Declares data structure for the "data rewrites" that the query planner can include in some `FetchNode`,
//! and implements those rewrites.
//!
//! Note that on the typescript side, the query planner currently declare the rewrites that applies
//! to "inputs" and those applying to "outputs" separatly. This is due to simplify the current
//! implementation on the typescript side (for ... reasons), but it does not simplify anything to make that
//! distinction here. All this means is that, as of this writing, some kind of rewrites will only
//! every appear on the input side, while other will only appear on outputs, but it does not hurt
//! to be future-proof by supporting all types of rewrites on both "sides".
use serde::Deserialize;
use serde::Serialize;

use crate::json_ext::Path;
use crate::json_ext::PathElement;
use crate::json_ext::Value;
use crate::json_ext::ValueExt;
use crate::spec::Schema;

/// Given a path, separates the last element of path and the rest of it and return them as a pair.
/// This will return `None` if the path is empty.
fn split_path_last_element(path: &Path) -> Option<(Path, &PathElement)> {
// If we have a `last()`, then we have a `parent()` too, so unwrapping shoud be safe.
path.last().map(|last| (path.parent().unwrap(), last))
}

#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "PascalCase", tag = "kind")]
pub(crate) enum DataRewrite {
ValueSetter(DataValueSetter),
KeyRenamer(DataKeyRenamer),
}

#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct DataValueSetter {
pub(crate) path: Path,
pub(crate) set_value_to: Value,
}

#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct DataKeyRenamer {
pub(crate) path: Path,
pub(crate) rename_key_to: String,
}

impl DataRewrite {
fn maybe_apply(&self, schema: &Schema, data: &mut Value) {
match self {
DataRewrite::ValueSetter(setter) => {
// The `path` of rewrites can only be either `Key` or `Fragment`, and so far
// we only ever rewrite the value of fields, so the last element will be a
// `Key` and we ignore other cases (in theory, it could be `Fragment` needs
// to be supported someday if we ever need to rewrite full object values,
// but that can be added then).
if let Some((parent, PathElement::Key(k))) = split_path_last_element(&setter.path) {
data.select_values_and_paths_mut(schema, &parent, |_path, obj| {
if let Some(value) = obj.get_mut(k) {
*value = setter.set_value_to.clone()
}
});
}
}
DataRewrite::KeyRenamer(renamer) => {
// As the name implies, this only applies to renaming "keys", so we're
// guaranteed the last element is one and can ignore other cases.
if let Some((parent, PathElement::Key(k))) = split_path_last_element(&renamer.path)
{
data.select_values_and_paths_mut(schema, &parent, |_path, selected| {
if let Some(obj) = selected.as_object_mut() {
if let Some(value) = obj.remove(k.as_str()) {
obj.insert(renamer.rename_key_to.clone(), value);
}
}
});
}
}
}
}
}

/// Modifies he provided `value` by applying any of the rewrites provided that match.
pub(crate) fn apply_rewrites(
schema: &Schema,
value: &mut Value,
maybe_rewrites: &Option<Vec<DataRewrite>>,
) {
if let Some(rewrites) = maybe_rewrites {
for rewrite in rewrites {
rewrite.maybe_apply(schema, value);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,7 @@ Fetch(
operation_name: None,
operation_kind: Query,
id: None,
input_rewrites: None,
output_rewrites: None,
},
)
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
---
source: apollo-router/src/query_planner/mod.rs
source: apollo-router/src/query_planner/tests.rs
expression: query_plan
---
Sequence {
Expand All @@ -15,6 +15,8 @@ Sequence {
),
operation_kind: Query,
id: None,
input_rewrites: None,
output_rewrites: None,
},
),
Parallel {
Expand Down Expand Up @@ -66,6 +68,8 @@ Sequence {
operation_name: None,
operation_kind: Query,
id: None,
input_rewrites: None,
output_rewrites: None,
},
),
},
Expand Down Expand Up @@ -127,6 +131,8 @@ Sequence {
operation_name: None,
operation_kind: Query,
id: None,
input_rewrites: None,
output_rewrites: None,
},
),
},
Expand Down Expand Up @@ -177,6 +183,8 @@ Sequence {
operation_name: None,
operation_kind: Query,
id: None,
input_rewrites: None,
output_rewrites: None,
},
),
},
Expand Down Expand Up @@ -237,6 +245,8 @@ Sequence {
operation_name: None,
operation_kind: Query,
id: None,
input_rewrites: None,
output_rewrites: None,
},
),
},
Expand Down
4 changes: 4 additions & 0 deletions apollo-router/src/query_planner/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@ async fn defer() {
operation_name: Some("t".to_string()),
operation_kind: OperationKind::Query,
id: Some("fetch1".to_string()),
input_rewrites: None,
output_rewrites: None,
}))),
},
deferred: vec![DeferredNode {
Expand Down Expand Up @@ -289,6 +291,8 @@ async fn defer() {
operation_name: None,
operation_kind: OperationKind::Query,
id: Some("fetch2".to_string()),
input_rewrites: None,
output_rewrites: None,
})),
}))),
}],
Expand Down
Loading

0 comments on commit 36651cc

Please sign in to comment.