Skip to content

Commit f04a25b

Browse files
graph, store: Add experimental job to automatically set account-like flag on eligible tables
Signed-off-by: Maksim Dimitrov <dimitrov.maksim@gmail.com>
1 parent ae84888 commit f04a25b

File tree

4 files changed

+155
-1
lines changed

4 files changed

+155
-1
lines changed

graph/src/env/store.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,18 @@ pub struct EnvVarsStore {
149149
/// The number of rows to fetch from the foreign data wrapper in one go,
150150
/// this will be set as the option 'fetch_size' on all foreign servers
151151
pub fdw_fetch_size: usize,
152+
153+
/// Experimental feature to automatically set the account-like flag on eligible tables.
154+
/// Set by the environment variable `GRAPH_STORE_ACCOUNT_LIKE_SCAN_INTERVAL_HOURS`.
155+
/// If not set, the job is disabled.
156+
/// Discovers heavy-write tables from the `info.table_stats` materialized view, which is refreshed every 6 hours.
157+
pub account_like_scan_interval_hours: Option<u32>,
158+
/// Set by the environment variable `GRAPH_STORE_ACCOUNT_LIKE_MIN_VERSIONS_COUNT`.
159+
/// Tables must have at least this many total versions to be considered.
160+
pub account_like_min_versions_count: Option<u64>,
161+
/// Set by the environment variable `GRAPH_STORE_ACCOUNT_LIKE_MAX_UNIQUE_RATIO`.
162+
/// Defines the maximum share of unique entities (e.g. 0.01 for a 1:100 entity-to-version ratio).
163+
pub account_like_max_unique_ratio: Option<f64>,
152164
}
153165

154166
// This does not print any values to avoid accidentally leaking any sensitive env vars
@@ -206,6 +218,9 @@ impl TryFrom<InnerStore> for EnvVarsStore {
206218
disable_block_cache_for_lookup: x.disable_block_cache_for_lookup,
207219
insert_extra_cols: x.insert_extra_cols,
208220
fdw_fetch_size: x.fdw_fetch_size,
221+
account_like_scan_interval_hours: x.account_like_scan_interval_hours,
222+
account_like_min_versions_count: x.account_like_min_versions_count,
223+
account_like_max_unique_ratio: x.account_like_max_unique_ratio.map(|r| r.0),
209224
};
210225
if let Some(timeout) = vars.batch_timeout {
211226
if timeout < 2 * vars.batch_target_duration {
@@ -217,6 +232,16 @@ impl TryFrom<InnerStore> for EnvVarsStore {
217232
if vars.batch_workers < 1 {
218233
bail!("GRAPH_STORE_BATCH_WORKERS must be at least 1");
219234
}
235+
if vars.account_like_scan_interval_hours.is_some()
236+
&& (vars.account_like_min_versions_count.is_none()
237+
|| vars.account_like_max_unique_ratio.is_none())
238+
{
239+
bail!(
240+
"Both GRAPH_STORE_ACCOUNT_LIKE_MIN_VERSIONS_COUNT and \
241+
GRAPH_STORE_ACCOUNT_LIKE_MAX_UNIQUE_RATIO must be set when \
242+
GRAPH_STORE_ACCOUNT_LIKE_SCAN_INTERVAL_HOURS is set"
243+
);
244+
}
220245
Ok(vars)
221246
}
222247
}
@@ -295,6 +320,12 @@ pub struct InnerStore {
295320
insert_extra_cols: usize,
296321
#[envconfig(from = "GRAPH_STORE_FDW_FETCH_SIZE", default = "1000")]
297322
fdw_fetch_size: usize,
323+
#[envconfig(from = "GRAPH_STORE_ACCOUNT_LIKE_SCAN_INTERVAL_HOURS")]
324+
account_like_scan_interval_hours: Option<u32>,
325+
#[envconfig(from = "GRAPH_STORE_ACCOUNT_LIKE_MIN_VERSIONS_COUNT")]
326+
account_like_min_versions_count: Option<u64>,
327+
#[envconfig(from = "GRAPH_STORE_ACCOUNT_LIKE_MAX_UNIQUE_RATIO")]
328+
account_like_max_unique_ratio: Option<ZeroToOneF64>,
298329
}
299330

300331
#[derive(Clone, Copy, Debug)]

store/postgres/src/deployment_store.rs

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -765,6 +765,50 @@ impl DeploymentStore {
765765
catalog::drop_index(&mut conn, schema_name.as_str(), &index_name).await
766766
}
767767

768+
pub(crate) async fn identify_account_like_candidates(
769+
&self,
770+
min_versions: u64,
771+
ratio: f64,
772+
) -> Result<Vec<(String, String)>, StoreError> {
773+
#[derive(QueryableByName)]
774+
struct TableStat {
775+
#[diesel(sql_type = diesel::sql_types::Text)]
776+
subgraph: String,
777+
#[diesel(sql_type = diesel::sql_types::Text)]
778+
table_name: String,
779+
}
780+
let result = self
781+
.with_conn(move |conn, _| {
782+
let query = r#"
783+
SELECT
784+
stats.subgraph,
785+
stats.table_name
786+
FROM info.table_stats AS stats
787+
LEFT JOIN subgraphs.table_stats ts
788+
ON ts.deployment = stats.deployment
789+
AND ts.table_name = stats.table_name
790+
WHERE
791+
stats.versions > $1
792+
AND stats.ratio < $2
793+
AND ts.is_account_like IS NOT TRUE
794+
"#;
795+
796+
diesel::sql_query(query)
797+
.bind::<diesel::sql_types::BigInt, _>(min_versions as i64)
798+
.bind::<diesel::sql_types::Double, _>(ratio)
799+
.load::<TableStat>(conn)
800+
.map_err(Into::into)
801+
})
802+
.await;
803+
804+
result.map(|tables| {
805+
tables
806+
.into_iter()
807+
.map(|table_stat| (table_stat.subgraph, table_stat.table_name))
808+
.collect()
809+
})
810+
}
811+
768812
pub(crate) async fn set_account_like(
769813
&self,
770814
site: Arc<Site>,
@@ -1815,10 +1859,11 @@ impl DeploymentStore {
18151859
// We hardcode our materialized views, but could also use
18161860
// pg_matviews to list all of them, though that might inadvertently
18171861
// refresh materialized views that operators created themselves
1818-
const VIEWS: [&str; 3] = [
1862+
const VIEWS: [&str; 4] = [
18191863
"info.table_sizes",
18201864
"info.subgraph_sizes",
18211865
"info.chain_sizes",
1866+
"info.table_stats",
18221867
];
18231868
let mut conn = store.pool.get().await?;
18241869
for view in VIEWS {

store/postgres/src/jobs.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,13 @@ pub fn register(
4848
Arc::new(RefreshMaterializedView::new(store.subgraph_store())),
4949
6 * ONE_HOUR,
5050
);
51+
52+
if let Some(interval) = ENV_VARS.store.account_like_scan_interval_hours {
53+
runner.register(
54+
Arc::new(AccountLikeJob::new(store.subgraph_store())),
55+
interval * ONE_HOUR,
56+
);
57+
}
5158
}
5259

5360
/// A job that vacuums `subgraphs.deployment` and `subgraphs.head`. With a
@@ -235,3 +242,37 @@ impl Job for UnusedJob {
235242
}
236243
}
237244
}
245+
246+
struct AccountLikeJob {
247+
store: Arc<SubgraphStore>,
248+
}
249+
250+
impl AccountLikeJob {
251+
fn new(store: Arc<SubgraphStore>) -> AccountLikeJob {
252+
AccountLikeJob { store }
253+
}
254+
}
255+
256+
#[async_trait]
257+
impl Job for AccountLikeJob {
258+
fn name(&self) -> &str {
259+
"Set account-like flag on eligible tables"
260+
}
261+
262+
async fn run(&self, logger: &Logger) {
263+
// Safe to unwrap because of the startup validation,
264+
// which ensures these values are present when account_like_scan_interval_hours is set.
265+
let min_versions_count = ENV_VARS.store.account_like_min_versions_count.unwrap();
266+
let ratio = ENV_VARS.store.account_like_max_unique_ratio.unwrap();
267+
268+
self.store
269+
.identify_and_set_account_like(logger, min_versions_count, ratio)
270+
.await
271+
.unwrap_or_else(|e| {
272+
error!(
273+
logger,
274+
"Failed to set account-like flag on eligible tables: {}", e
275+
)
276+
});
277+
}
278+
}

store/postgres/src/subgraph_store.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1277,6 +1277,43 @@ impl Inner {
12771277
store.drop_index(site, index_name).await
12781278
}
12791279

1280+
pub(crate) async fn identify_and_set_account_like(
1281+
&self,
1282+
logger: &Logger,
1283+
min_records: u64,
1284+
ratio: f64,
1285+
) -> Result<(), StoreError> {
1286+
for (_shard, store) in &self.stores {
1287+
let candidates = store
1288+
.identify_account_like_candidates(min_records, ratio)
1289+
.await?;
1290+
1291+
graph::slog::debug!(
1292+
logger,
1293+
"Found {} account-like candidates in shard {}",
1294+
candidates.len(),
1295+
_shard
1296+
);
1297+
1298+
for (subgraph, table_name) in candidates {
1299+
graph::slog::debug!(
1300+
logger,
1301+
"Setting table {} as account-like for deployment {}",
1302+
table_name,
1303+
subgraph
1304+
);
1305+
1306+
let hash = DeploymentHash::new(subgraph.clone()).map_err(|_| {
1307+
anyhow!("Failed to create deployment hash for subgraph: {subgraph}")
1308+
})?;
1309+
let (store, site) = self.store(&hash)?;
1310+
store.set_account_like(site, &table_name, true).await?;
1311+
}
1312+
}
1313+
1314+
Ok(())
1315+
}
1316+
12801317
pub async fn set_account_like(
12811318
&self,
12821319
deployment: &DeploymentLocator,

0 commit comments

Comments
 (0)