Skip to content

Commit

Permalink
all: GraphQL API Versioning
Browse files Browse the repository at this point in the history
  • Loading branch information
kamilkisiela committed Aug 23, 2022
1 parent 784fbfa commit 1b0d091
Show file tree
Hide file tree
Showing 27 changed files with 293 additions and 75 deletions.
2 changes: 1 addition & 1 deletion core/tests/interfaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async fn insert_and_query(
insert_entities(&deployment, entities).await?;

let document = graphql_parser::parse_query(query).unwrap().into_static();
let target = QueryTarget::Deployment(subgraph_id);
let target = QueryTarget::Deployment(subgraph_id, Default::default());
let query = Query::new(document, None);
Ok(execute_subgraph_query(query, target)
.await
Expand Down
3 changes: 3 additions & 0 deletions graph/src/components/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ pub mod trigger_processor;
/// Components dealing with collecting metrics
pub mod metrics;

/// Components dealing with versioning
pub mod versions;

/// A component that receives events of type `T`.
pub trait EventConsumer<E> {
/// Get the event sink.
Expand Down
7 changes: 6 additions & 1 deletion graph/src/components/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use super::*;
use crate::blockchain::block_stream::FirehoseCursor;
use crate::components::server::index_node::VersionInfo;
use crate::components::transaction_receipt;
use crate::components::versions::ApiVersion;
use crate::data::query::Trace;
use crate::data::subgraph::status;
use crate::data::value::Word;
Expand Down Expand Up @@ -112,7 +113,11 @@ pub trait SubgraphStore: Send + Sync + 'static {

/// Return the GraphQL schema that was derived from the user's schema by
/// adding a root query type etc. to it
fn api_schema(&self, subgraph_id: &DeploymentHash) -> Result<Arc<ApiSchema>, StoreError>;
fn api_schema(
&self,
subgraph_id: &DeploymentHash,
api_version: &ApiVersion,
) -> Result<Arc<ApiSchema>, StoreError>;

/// Return a `SubgraphFork`, derived from the user's `debug-fork` deployment argument,
/// that is used for debugging purposes only.
Expand Down
2 changes: 2 additions & 0 deletions graph/src/components/versions/features.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#[derive(Clone, PartialEq, Eq, Debug, Ord, PartialOrd, Hash)]
pub enum FeatureFlag {}
5 changes: 5 additions & 0 deletions graph/src/components/versions/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
mod features;
mod registry;

pub use features::FeatureFlag;
pub use registry::{ApiVersion, VERSIONS};
71 changes: 71 additions & 0 deletions graph/src/components/versions/registry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use crate::prelude::FeatureFlag;
use itertools::Itertools;
use lazy_static::lazy_static;
use semver::{Version, VersionReq};
use std::collections::HashMap;

lazy_static! {
static ref VERSION_COLLECTION: HashMap<Version, Vec<FeatureFlag>> = {
vec![
// baseline version
(Version::new(1, 0, 0), vec![]),
].into_iter().collect()
};

// Sorted vector of versions. From higher to lower.
pub static ref VERSIONS: Vec<&'static Version> = {
let mut versions = VERSION_COLLECTION.keys().collect_vec().clone();
versions.sort_by(|a, b| b.partial_cmp(a).unwrap());
versions
};
}

#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ApiVersion {
pub version: Version,
features: Vec<FeatureFlag>,
}

impl ApiVersion {
pub fn new(version_requirement: &VersionReq) -> Result<Self, String> {
let version = Self::resolve(&version_requirement)?;

Ok(Self {
version: version.clone(),
features: VERSION_COLLECTION
.get(&version)
.expect(format!("Version {:?} is not supported", version).as_str())
.to_vec(),
})
}

pub fn from_version(version: &Version) -> Result<ApiVersion, String> {
ApiVersion::new(
&VersionReq::parse(version.to_string().as_str())
.map_err(|error| format!("Invalid version requirement: {}", error))?,
)
}

pub fn supports(&self, feature: FeatureFlag) -> bool {
self.features.contains(&feature)
}

fn resolve(version_requirement: &VersionReq) -> Result<&Version, String> {
for version in VERSIONS.iter() {
if version_requirement.matches(version) {
return Ok(version.clone());
}
}

Err("Could not resolve the version".to_string())
}
}

impl Default for ApiVersion {
fn default() -> Self {
// Default to the latest version.
// The `VersionReq::default()` returns `*` which means "any version".
// The first matching version is the latest version.
ApiVersion::new(&VersionReq::default()).unwrap()
}
}
20 changes: 8 additions & 12 deletions graph/src/data/query/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::sync::Arc;

use crate::{
data::graphql::shape_hash::shape_hash,
prelude::{q, r, DeploymentHash, SubgraphName, ENV_VARS},
prelude::{q, r, ApiVersion, DeploymentHash, SubgraphName, ENV_VARS},
};

fn deserialize_number<'de, D>(deserializer: D) -> Result<q::Number, D::Error>
Expand Down Expand Up @@ -112,19 +112,15 @@ impl serde::ser::Serialize for QueryVariables {

#[derive(Clone, Debug)]
pub enum QueryTarget {
Name(SubgraphName),
Deployment(DeploymentHash),
Name(SubgraphName, ApiVersion),
Deployment(DeploymentHash, ApiVersion),
}

impl From<DeploymentHash> for QueryTarget {
fn from(id: DeploymentHash) -> Self {
Self::Deployment(id)
}
}

impl From<SubgraphName> for QueryTarget {
fn from(name: SubgraphName) -> Self {
QueryTarget::Name(name)
impl QueryTarget {
pub fn get_version(&self) -> &ApiVersion {
match self {
Self::Deployment(_, version) | Self::Name(_, version) => version,
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion graph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ pub mod prelude {
SubgraphVersionSwitchingMode,
};
pub use crate::components::trigger_processor::TriggerProcessor;
pub use crate::components::versions::{ApiVersion, FeatureFlag};
pub use crate::components::{transaction_receipt, EventConsumer, EventProducer};
pub use crate::env::ENV_VARS;

Expand All @@ -141,7 +142,7 @@ pub mod prelude {
shape_hash::shape_hash, SerializableValue, TryFromValue, ValueMap,
};
pub use crate::data::query::{
Query, QueryError, QueryExecutionError, QueryResult, QueryVariables,
Query, QueryError, QueryExecutionError, QueryResult, QueryTarget, QueryVariables,
};
pub use crate::data::schema::{ApiSchema, Schema};
pub use crate::data::store::ethereum::*;
Expand Down
5 changes: 3 additions & 2 deletions graphql/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ where
// while the query is running. `self.store` can not be used after this
// point, and everything needs to go through the `store` we are
// setting up here
let store = self.store.query_store(target, false).await?;

let store = self.store.query_store(target.clone(), false).await?;
let state = store.deployment_state().await?;
let network = Some(store.network_name().to_string());
let schema = store.api_schema()?;
Expand Down Expand Up @@ -227,7 +228,7 @@ where
subscription: Subscription,
target: QueryTarget,
) -> Result<SubscriptionResult, SubscriptionError> {
let store = self.store.query_store(target, true).await?;
let store = self.store.query_store(target.clone(), true).await?;
let schema = store.api_schema()?;
let network = store.network_name().to_string();

Expand Down
16 changes: 11 additions & 5 deletions graphql/tests/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ async fn execute_query_document_with_variables(
LOAD_MANAGER.clone(),
METRICS_REGISTRY.clone(),
));
let target = QueryTarget::Deployment(id.clone());
let target = QueryTarget::Deployment(id.clone(), Default::default());
let query = Query::new(query, variables);

runner
Expand Down Expand Up @@ -364,7 +364,7 @@ where
LOAD_MANAGER.clone(),
METRICS_REGISTRY.clone(),
));
let target = QueryTarget::Deployment(id.clone());
let target = QueryTarget::Deployment(id.clone(), Default::default());
let query = Query::new(query, variables);

runner
Expand All @@ -388,7 +388,10 @@ async fn run_subscription(
let deployment = setup_readonly(store.as_ref()).await;
let logger = Logger::root(slog::Discard, o!());
let query_store = store
.query_store(deployment.hash.clone().into(), true)
.query_store(
QueryTarget::Deployment(deployment.hash.clone(), Default::default()),
true,
)
.await
.unwrap();

Expand All @@ -407,7 +410,10 @@ async fn run_subscription(
max_skip: std::u32::MAX,
graphql_metrics: graphql_metrics(),
};
let schema = STORE.subgraph_store().api_schema(&deployment.hash).unwrap();
let schema = STORE
.subgraph_store()
.api_schema(&deployment.hash, &Default::default())
.unwrap();

execute_subscription(Subscription { query }, schema.clone(), options)
}
Expand Down Expand Up @@ -931,7 +937,7 @@ fn instant_timeout() {
match first_result(
execute_subgraph_query_with_deadline(
query,
deployment.hash.into(),
QueryTarget::Deployment(deployment.hash.into(), Default::default()),
Some(Instant::now()),
)
.await,
Expand Down
2 changes: 1 addition & 1 deletion node/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ impl Config {
})
}

/// Genrate a JSON representation of the config.
/// Generate a JSON representation of the config.
pub fn to_json(&self) -> Result<String> {
// It would be nice to produce a TOML representation, but that runs
// into this error: https://github.com/alexcrichton/toml-rs/issues/142
Expand Down
8 changes: 7 additions & 1 deletion node/src/manager/commands/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{collections::HashMap, sync::Arc, time::SystemTime};

use graph::{
components::store::BlockStore as _,
data::query::QueryTarget,
prelude::{
anyhow::{anyhow, bail, Error},
chrono::{DateTime, Duration, SecondsFormat, Utc},
Expand Down Expand Up @@ -90,7 +91,12 @@ pub async fn create(
let block_offset = block_offset as i32;
let subgraph_store = store.subgraph_store();
let src = src.locate_unique(&primary)?;
let query_store = store.query_store(src.hash.clone().into(), true).await?;
let query_store = store
.query_store(
QueryTarget::Deployment(src.hash.clone(), Default::default()),
true,
)
.await?;
let network = query_store.network_name();

let src_ptr = query_store.block_ptr().await?.ok_or_else(|| anyhow!("subgraph {} has not indexed any blocks yet and can not be used as the source of a copy", src))?;
Expand Down
4 changes: 2 additions & 2 deletions node/src/manager/commands/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ pub async fn run(
let target = if target.starts_with("Qm") {
let id =
DeploymentHash::new(target).map_err(|id| anyhow!("illegal deployment id `{}`", id))?;
QueryTarget::Deployment(id)
QueryTarget::Deployment(id, Default::default())
} else {
let name = SubgraphName::new(target.clone())
.map_err(|()| anyhow!("illegal subgraph name `{}`", target))?;
QueryTarget::Name(name)
QueryTarget::Name(name, Default::default())
};

let document = graphql_parser::parse_query(&query)?.into_static();
Expand Down
6 changes: 4 additions & 2 deletions server/http/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,10 @@ mod tests {
use super::parse_graphql_request;

lazy_static! {
static ref TARGET: QueryTarget =
QueryTarget::Name(SubgraphName::new("test/request").unwrap());
static ref TARGET: QueryTarget = QueryTarget::Name(
SubgraphName::new("test/request").unwrap(),
Default::default()
);
}

#[test]
Expand Down
48 changes: 43 additions & 5 deletions server/http/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::task::Poll;
use std::time::Instant;

use graph::prelude::*;
use graph::semver::VersionReq;
use graph::{components::server::query::GraphQLServerError, data::query::QueryTarget};
use http::header;
use http::header::{
Expand Down Expand Up @@ -89,17 +90,49 @@ where
self.serve_dynamic_file(self.graphiql_html())
}

fn resolve_api_version(
&self,
request: &Request<Body>,
) -> Result<ApiVersion, GraphQLServerError> {
let mut version = ApiVersion::default();

if let Some(query) = request.uri().query() {
let potential_version_requirement = query.split("&").find_map(|pair| {
if pair.starts_with("api-version=") {
if let Some(version_requirement) = pair.split("=").nth(1) {
return Some(version_requirement);
}
}
return None;
});

if let Some(version_requirement) = potential_version_requirement {
version = ApiVersion::new(
&VersionReq::parse(version_requirement)
.map_err(|error| GraphQLServerError::ClientError(error.to_string()))?,
)
.map_err(|error| GraphQLServerError::ClientError(error))?;
}
}

Ok(version)
}

async fn handle_graphql_query_by_name(
self,
subgraph_name: String,
request: Request<Body>,
) -> GraphQLServiceResult {
let version = self.resolve_api_version(&request)?;
let subgraph_name = SubgraphName::new(subgraph_name.as_str()).map_err(|()| {
GraphQLServerError::ClientError(format!("Invalid subgraph name {:?}", subgraph_name))
})?;

self.handle_graphql_query(subgraph_name.into(), request.into_body())
.await
self.handle_graphql_query(
QueryTarget::Name(subgraph_name, version),
request.into_body(),
)
.await
}

fn handle_graphql_query_by_id(
Expand All @@ -108,11 +141,16 @@ where
request: Request<Body>,
) -> GraphQLServiceResponse {
let res = DeploymentHash::new(id)
.map_err(|id| GraphQLServerError::ClientError(format!("Invalid subgraph id `{}`", id)));
.map_err(|id| GraphQLServerError::ClientError(format!("Invalid subgraph id `{}`", id)))
.and_then(|id| match self.resolve_api_version(&request) {
Ok(version) => Ok((id, version)),
Err(error) => Err(error),
});

match res {
Err(_) => self.handle_not_found(),
Ok(id) => self
.handle_graphql_query(id.into(), request.into_body())
Ok((id, version)) => self
.handle_graphql_query(QueryTarget::Deployment(id, version), request.into_body())
.boxed(),
}
}
Expand Down
Loading

0 comments on commit 1b0d091

Please sign in to comment.