Skip to content

Commit

Permalink
Use HashMap in SubgraphInfo.api
Browse files Browse the repository at this point in the history
  • Loading branch information
kamilkisiela committed Jun 20, 2022
1 parent 763228f commit eaa5c19
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 44 deletions.
2 changes: 1 addition & 1 deletion graph/src/components/versions/features.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#[derive(Clone, PartialEq, Eq, Debug, Ord, PartialOrd)]
#[derive(Clone, PartialEq, Eq, Debug, Ord, PartialOrd, Hash)]
pub enum FeatureFlag {
// A description of the feature. Give it a little context.
BasicOrdering,
Expand Down
2 changes: 1 addition & 1 deletion graph/src/components/versions/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ lazy_static! {
};
}

#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ApiVersion {
pub version: Version,
features: Vec<FeatureFlag>,
Expand Down
64 changes: 30 additions & 34 deletions store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ use diesel::pg::PgConnection;
use diesel::prelude::*;
use diesel::r2d2::{ConnectionManager, PooledConnection};
use graph::components::store::{EntityType, StoredDynamicDataSource};
use graph::components::versions::VERSIONS;
use graph::data::subgraph::status;
use graph::prelude::{
tokio, ApiVersion, CancelHandle, CancelToken, CancelableError, EntityOperation, PoolWaitStats,
SubgraphDeploymentEntity,
};
use graph::semver::VersionReq;
use lru_time_cache::LruCache;
use rand::{seq::SliceRandom, thread_rng};
use std::borrow::Cow;
Expand All @@ -18,6 +20,7 @@ use std::convert::Into;
use std::iter::FromIterator;
use std::ops::Bound;
use std::ops::Deref;
use std::str::FromStr;
use std::sync::{atomic::AtomicUsize, Arc, Mutex};
use std::time::Instant;

Expand Down Expand Up @@ -62,7 +65,7 @@ pub(crate) struct SubgraphInfo {
/// The schema as supplied by the user
pub(crate) input: Arc<Schema>,
/// The schema we derive from `input` with `graphql::schema::api::api_schema`
pub(crate) api: Arc<ApiSchema>,
pub(crate) api: HashMap<ApiVersion, Arc<ApiSchema>>,
/// The block number at which this subgraph was grafted onto
/// another one. We do not allow reverting past this block
pub(crate) graft_block: Option<BlockNumber>,
Expand All @@ -88,7 +91,7 @@ pub struct StoreInner {
conn_round_robin_counter: AtomicUsize,

/// A cache of commonly needed data about a subgraph.
subgraph_cache: Mutex<LruCache<(DeploymentHash, ApiVersion), SubgraphInfo>>,
subgraph_cache: Mutex<LruCache<DeploymentHash, SubgraphInfo>>,

/// A cache for the layout metadata for subgraphs. The Store just
/// hosts this because it lives long enough, but it is managed from
Expand Down Expand Up @@ -262,8 +265,11 @@ impl DeploymentStore {
//
// This assumes that there are no concurrent writes to a subgraph.
let schema = self
.subgraph_info_with_conn(conn, &layout.site, &Default::default())?
.api;
.subgraph_info_with_conn(conn, &layout.site)?
.api
.get(&Default::default())
.expect("API schema should be present")
.clone();
let types_for_interface = schema.types_for_interface();
let entity_type = key.entity_type.to_string();
let types_with_shared_interface = Vec::from_iter(
Expand Down Expand Up @@ -536,14 +542,8 @@ impl DeploymentStore {
&self,
conn: &PgConnection,
site: &Site,
api_version: &ApiVersion,
) -> Result<SubgraphInfo, StoreError> {
if let Some(info) = self
.subgraph_cache
.lock()
.unwrap()
.get(&(site.deployment.clone(), api_version.clone()))
{
if let Some(info) = self.subgraph_cache.lock().unwrap().get(&site.deployment) {
return Ok(info.clone());
}

Expand All @@ -556,14 +556,22 @@ impl DeploymentStore {

// Generate an API schema for the subgraph and make sure all types in the
// API schema have a @subgraphId directive as well
let mut schema = input_schema.clone();
schema.document =
api_schema(&schema.document, api_version).map_err(|e| StoreError::Unknown(e.into()))?;
schema.add_subgraph_id_directives(site.deployment.clone());
let mut api: HashMap<ApiVersion, Arc<ApiSchema>> = HashMap::new();

for (version, _) in VERSIONS.iter() {
let api_version =
ApiVersion::new(&VersionReq::from_str(version.to_string().as_str()).unwrap())
.unwrap();
let mut schema = input_schema.clone();
schema.document = api_schema(&schema.document, &api_version)
.map_err(|e| StoreError::Unknown(e.into()))?;
schema.add_subgraph_id_directives(site.deployment.clone());
api.insert(api_version, Arc::new(ApiSchema::from_api_schema(schema)?));
}

let info = SubgraphInfo {
input: Arc::new(input_schema),
api: Arc::new(ApiSchema::from_api_schema(schema)?),
api,
graft_block,
debug_fork,
description,
Expand All @@ -572,30 +580,18 @@ impl DeploymentStore {

// Insert the schema into the cache.
let mut cache = self.subgraph_cache.lock().unwrap();
cache.insert((site.deployment.clone(), api_version.clone()), info);
cache.insert(site.deployment.clone(), info);

Ok(cache
.get(&(site.deployment.clone(), api_version.clone()))
.unwrap()
.clone())
Ok(cache.get(&site.deployment).unwrap().clone())
}

pub(crate) fn subgraph_info(
&self,
site: &Site,
api_version: &ApiVersion,
) -> Result<SubgraphInfo, StoreError> {
if let Some(info) = self
.subgraph_cache
.lock()
.unwrap()
.get(&(site.deployment.clone(), api_version.clone()))
{
pub(crate) fn subgraph_info(&self, site: &Site) -> Result<SubgraphInfo, StoreError> {
if let Some(info) = self.subgraph_cache.lock().unwrap().get(&site.deployment) {
return Ok(info.clone());
}

let conn = self.get_conn()?;
self.subgraph_info_with_conn(&conn, site, api_version)
self.subgraph_info_with_conn(&conn, site)
}

fn block_ptr_with_conn(
Expand Down Expand Up @@ -1054,7 +1050,7 @@ impl DeploymentStore {
) -> Result<StoreEvent, StoreError> {
let event = conn.transaction(|| -> Result<_, StoreError> {
// Don't revert past a graft point
let info = self.subgraph_info_with_conn(conn, site.as_ref(), &Default::default())?;
let info = self.subgraph_info_with_conn(conn, site.as_ref())?;
if let Some(graft_block) = info.graft_block {
if graft_block > block_ptr_to.number {
return Err(anyhow!(
Expand Down
4 changes: 2 additions & 2 deletions store/postgres/src/query_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ impl QueryStoreTrait for QueryStore {
}

fn api_schema(&self) -> Result<Arc<ApiSchema>, QueryExecutionError> {
let info = self.store.subgraph_info(&self.site, &self.api_version)?;
Ok(info.api)
let info = self.store.subgraph_info(&self.site)?;
Ok(info.api.get(&self.api_version).unwrap().clone())
}

fn network_name(&self) -> &str {
Expand Down
12 changes: 6 additions & 6 deletions store/postgres/src/subgraph_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ impl SubgraphStoreInner {
) -> Result<DeploymentLocator, StoreError> {
let src = self.find_site(src.id.into())?;
let src_store = self.for_site(src.as_ref())?;
let src_info = src_store.subgraph_info(src.as_ref(), &Default::default())?;
let src_info = src_store.subgraph_info(src.as_ref())?;
let src_loc = DeploymentLocator::from(src.as_ref());

let dst = Arc::new(self.primary_conn()?.copy_site(&src, shard.clone())?);
Expand Down Expand Up @@ -864,7 +864,7 @@ impl SubgraphStoreInner {
.ok_or_else(|| constraint_violation!("no chain info for {}", deployment_id))?;
let latest_ethereum_block_number =
chain.latest_block.as_ref().map(|block| block.number());
let subgraph_info = store.subgraph_info(site.as_ref(), &Default::default())?;
let subgraph_info = store.subgraph_info(site.as_ref())?;
let network = site.network.clone();

let info = VersionInfo {
Expand Down Expand Up @@ -1132,7 +1132,7 @@ impl SubgraphStoreTrait for SubgraphStore {

fn input_schema(&self, id: &DeploymentHash) -> Result<Arc<Schema>, StoreError> {
let (store, site) = self.store(id)?;
let info = store.subgraph_info(&site, &Default::default())?;
let info = store.subgraph_info(&site)?;
Ok(info.input)
}

Expand All @@ -1142,8 +1142,8 @@ impl SubgraphStoreTrait for SubgraphStore {
version: &ApiVersion,
) -> Result<Arc<ApiSchema>, StoreError> {
let (store, site) = self.store(id)?;
let info = store.subgraph_info(&site, version)?;
Ok(info.api)
let info = store.subgraph_info(&site)?;
Ok(info.api.get(version).unwrap().clone())
}

fn debug_fork(
Expand All @@ -1152,7 +1152,7 @@ impl SubgraphStoreTrait for SubgraphStore {
logger: Logger,
) -> Result<Option<Arc<dyn SubgraphFork>>, StoreError> {
let (store, site) = self.store(id)?;
let info = store.subgraph_info(&site, &Default::default())?;
let info = store.subgraph_info(&site)?;
let fork_id = info.debug_fork;
let schema = info.input;

Expand Down

0 comments on commit eaa5c19

Please sign in to comment.