Revert "Do not repeat a rollup after restart in some corner cases (#5675
Browse files Browse the repository at this point in the history
)"

This reverts commits:
- 22bca4e
- 22f805d
- 207e31f
encalypto committed Jan 9, 2025
1 parent b7851fc commit 1f2732c
Showing 7 changed files with 7 additions and 102 deletions.
graph/src/blockchain/types.rs (0 additions, 6 deletions)
@@ -435,9 +435,3 @@ impl ToSql<Timestamptz, Pg> for BlockTime {
        <Timestamp as ToSql<Timestamptz, Pg>>::to_sql(&self.0, out)
    }
}
-
-impl FromSql<Timestamptz, Pg> for BlockTime {
-    fn from_sql(bytes: diesel::pg::PgValue) -> diesel::deserialize::Result<Self> {
-        <Timestamp as FromSql<Timestamptz, Pg>>::from_sql(bytes).map(|ts| Self(ts))
-    }
-}
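For context, the apparent consumer of this FromSql impl was the last-rollup probe removed from store/postgres/src/relational/rollup.rs further down, which reads a timestamptz aggregate back as a BlockTime. A minimal sketch of the kind of query the impl enabled, assuming a hypothetical aggregation table sgd1.stats_hour:

use diesel::prelude::*;
use diesel::sql_types::{Nullable, Timestamptz};
use graph::blockchain::BlockTime;

#[derive(QueryableByName)]
#[diesel(check_for_backend(diesel::pg::Pg))]
struct MaxTs {
    // Deserializing this column is what requires FromSql<Timestamptz, Pg> for BlockTime
    #[diesel(sql_type = Nullable<Timestamptz>)]
    last_rollup: Option<BlockTime>,
}

// max() over an empty table yields NULL, hence the Option
fn max_timestamp(conn: &mut PgConnection) -> QueryResult<Option<BlockTime>> {
    diesel::sql_query("select max(timestamp) as last_rollup from sgd1.stats_hour")
        .get_result::<MaxTs>(conn)
        .map(|row| row.last_rollup)
}

With that probe reverted, nothing in the store reads BlockTime out of Postgres anymore, so the impl would be dead code.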
graph/src/data/store/scalar/timestamp.rs (2 additions, 3 deletions)
@@ -1,6 +1,5 @@
use chrono::{DateTime, Utc};
use diesel::serialize::ToSql;
-use diesel::sql_types::Timestamptz;
use serde::{self, Deserialize, Serialize};
use stable_hash::StableHash;

@@ -94,12 +93,12 @@ impl Display for Timestamp {
    }
}

-impl ToSql<Timestamptz, diesel::pg::Pg> for Timestamp {
+impl ToSql<diesel::sql_types::Timestamptz, diesel::pg::Pg> for Timestamp {
    fn to_sql<'b>(
        &'b self,
        out: &mut diesel::serialize::Output<'b, '_, diesel::pg::Pg>,
    ) -> diesel::serialize::Result {
-        <_ as ToSql<Timestamptz, _>>::to_sql(&self.0, &mut out.reborrow())
+        <_ as ToSql<diesel::sql_types::Timestamptz, _>>::to_sql(&self.0, &mut out.reborrow())
    }
}

graph/src/env/store.rs (0 additions, 11 deletions)
@@ -120,14 +120,6 @@ pub struct EnvVarsStore {
    pub use_brin_for_all_query_types: bool,
    /// Temporary env var to disable certain lookups in the chain store
    pub disable_block_cache_for_lookup: bool,
-    /// Temporary env var to fall back to the old broken way of determining
-    /// the time of the last rollup from the POI table instead of the new
-    /// way that fixes
-    /// https://github.com/graphprotocol/graph-node/issues/5530. Remove this
-    /// and all code that is dead as a consequence once this has been vetted
-    /// sufficiently, probably after 2024-12-01.
-    /// Defaults to `false`, i.e. using the new fixed behavior
-    pub last_rollup_from_poi: bool,
}

// This does not print any values to avoid accidentally leaking any sensitive env vars
@@ -176,7 +168,6 @@ impl From<InnerStore> for EnvVarsStore {
            create_gin_indexes: x.create_gin_indexes,
            use_brin_for_all_query_types: x.use_brin_for_all_query_types,
            disable_block_cache_for_lookup: x.disable_block_cache_for_lookup,
-            last_rollup_from_poi: x.last_rollup_from_poi,
        }
    }
}
@@ -238,8 +229,6 @@ pub struct InnerStore {
    use_brin_for_all_query_types: bool,
    #[envconfig(from = "GRAPH_STORE_DISABLE_BLOCK_CACHE_FOR_LOOKUP", default = "false")]
    disable_block_cache_for_lookup: bool,
-    #[envconfig(from = "GRAPH_STORE_LAST_ROLLUP_FROM_POI", default = "false")]
-    last_rollup_from_poi: bool,
}

#[derive(Clone, Copy, Debug)]
server/graphman/tests/deployment_query.rs (3 additions, 16 deletions)
@@ -1,14 +1,10 @@
pub mod util;

-use graph::components::store::{QueryStoreManager, SubgraphStore};
use graph::data::subgraph::DeploymentHash;
-use graph::prelude::QueryTarget;
-
use serde_json::json;
use test_store::store::create_test_subgraph;
use test_store::store::NETWORK_NAME;
-use test_store::STORE;
-use test_store::SUBGRAPH_STORE;
+use test_store::store::NODE_ID;

use self::util::client::send_graphql_request;
use self::util::run_test;
@@ -58,15 +54,6 @@ fn graphql_returns_deployment_info() {
            .await;

        let namespace = format!("sgd{}", locator.id);
-        let node = SUBGRAPH_STORE.assigned_node(&locator).unwrap().unwrap();
-        let qs = STORE
-            .query_store(
-                QueryTarget::Deployment(locator.hash.clone(), Default::default()),
-                false,
-            )
-            .await
-            .expect("could get a query store");
-        let shard = qs.shard();

        let expected_resp = json!({
            "data": {
@@ -76,8 +63,8 @@
                    "hash": "subgraph_1",
                    "namespace": namespace,
                    "name": "subgraph_1",
-                    "nodeId": node.to_string(),
-                    "shard": shard,
+                    "nodeId": NODE_ID.to_string(),
+                    "shard": "primary",
                    "chain": NETWORK_NAME,
                    "versionStatus": "current",
                    "isActive": true,
store/postgres/src/deployment_store.rs (1 addition, 5 deletions)
@@ -910,11 +910,7 @@ impl DeploymentStore {

        let mut conn = self.get_conn()?;
        let layout = store.layout(&mut conn, site.cheap_clone())?;
-        if ENV_VARS.store.last_rollup_from_poi {
-            layout.block_time(&mut conn, block)
-        } else {
-            layout.last_rollup(&mut conn)
-        }
+        layout.block_time(&mut conn, block)
    }

    pub(crate) async fn supports_proof_of_indexing<'a>(
store/postgres/src/relational.rs (0 additions, 14 deletions)
@@ -1012,20 +1012,6 @@ impl Layout {
        Ok(block_time)
    }

-    /// Find the time of the last rollup for the subgraph. We do this by
-    /// looking for the maximum timestamp in any aggregation table and
-    /// adding a little bit more than the corresponding interval to it. This
-    /// method crucially depends on the fact that we always write the rollup
-    /// for all aggregations, meaning that if some aggregations do not have
-    /// an entry with the maximum timestamp, there was just no data for
-    /// that interval, but we did try to aggregate at that time.
-    pub(crate) fn last_rollup(
-        &self,
-        conn: &mut PgConnection,
-    ) -> Result<Option<BlockTime>, StoreError> {
-        Rollup::last_rollup(&self.rollups, conn)
-    }
-
    /// Construct `Rollup` for each of the aggregation mappings
    /// `schema.agg_mappings()` and return them in the same order as the
    /// aggregation mappings
store/postgres/src/relational/rollup.rs (1 addition, 47 deletions)
@@ -60,7 +60,7 @@ use std::sync::Arc;

use diesel::{sql_query, PgConnection, RunQueryDsl as _};

-use diesel::sql_types::{Integer, Nullable, Timestamptz};
+use diesel::sql_types::{Integer, Timestamptz};
use graph::blockchain::BlockTime;
use graph::components::store::{BlockNumber, StoreError};
use graph::constraint_violation;
@@ -70,7 +70,6 @@ use graph::schema::{
};
use graph::sqlparser::ast as p;
use graph::sqlparser::parser::ParserError;
-use itertools::Itertools;

use crate::relational::Table;

@@ -230,10 +229,6 @@ pub(crate) struct Rollup {
    #[allow(dead_code)]
    agg_table: Arc<Table>,
    insert_sql: String,
-    /// A query that determines the last time a rollup was done. The query
-    /// finds the latest timestamp in the aggregation table and adds the
-    /// length of the aggregation interval to deduce the last rollup time
-    last_rollup_sql: String,
}

impl Rollup {
@@ -261,12 +256,10 @@ impl Rollup {
        );
        let mut insert_sql = String::new();
        sql.insert(&mut insert_sql)?;
-        let last_rollup_sql = sql.last_rollup();
        Ok(Self {
            interval,
            agg_table,
            insert_sql,
-            last_rollup_sql,
        })
    }

@@ -282,32 +275,6 @@
            .bind::<Integer, _>(block);
        query.execute(conn)
    }
-
-    pub(crate) fn last_rollup(
-        rollups: &[Rollup],
-        conn: &mut PgConnection,
-    ) -> Result<Option<BlockTime>, StoreError> {
-        #[derive(QueryableByName)]
-        #[diesel(check_for_backend(diesel::pg::Pg))]
-        struct BlockTimeRes {
-            #[diesel(sql_type = Nullable<Timestamptz>)]
-            last_rollup: Option<BlockTime>,
-        }
-
-        if rollups.is_empty() {
-            return Ok(None);
-        }
-
-        let union_all = rollups
-            .iter()
-            .map(|rollup| &rollup.last_rollup_sql)
-            .join(" union all ");
-        let query = format!("select max(last_rollup) as last_rollup from ({union_all}) as a");
-        let last_rollup = sql_query(&query)
-            .get_result::<BlockTimeRes>(conn)
-            .map(|res| res.last_rollup)?;
-        Ok(last_rollup)
-    }
}

struct RollupSql<'a> {
@@ -512,19 +479,6 @@ impl<'a> RollupSql<'a> {
            self.insert_bucket(w)
        }
    }
-
-    /// Generate a query that selects the timestamp of the last rollup
-    fn last_rollup(&self) -> String {
-        // The timestamp column contains the timestamp of the start of the
-        // last bucket. The last rollup was therefore at least
-        // `self.interval` after that. We add 1 second to make sure we are
-        // well within the next bucket
-        let secs = self.interval.as_duration().as_secs() + 1;
-        format!(
-            "select max(timestamp) + '{} s'::interval as last_rollup from {}",
-            secs, self.agg_table.qualified_name
-        )
-    }
}

/// Write the elements in `list` separated by commas into `w`. The list
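Taken together, the pieces removed above assembled one probe query per subgraph: each aggregation table contributed a max(timestamp) subquery padded by its interval plus one second (RollupSql::last_rollup), and the subqueries were combined with union all under an outer max (Rollup::last_rollup). For a hypothetical subgraph with hourly and daily aggregation tables (table names invented for illustration), the generated SQL would have looked roughly like:

select max(last_rollup) as last_rollup
from (select max(timestamp) + '3601 s'::interval as last_rollup from sgd1.stats_hour
      union all
      select max(timestamp) + '86401 s'::interval as last_rollup from sgd1.stats_day) as a

If the newest hourly bucket starts at 13:00:00, the probe reports 14:00:01, just inside the next bucket. After this revert, graph-node goes back to reading the time of the last rollup via layout.block_time, the behavior the reverted PR had replaced.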
