Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ secp256k1 = { version = "0.29.1", features = ["global-context"] }
anyhow = "1.0.97"
strum = { version = "0.26", features = ["derive"] }
tempfile = "3.19.1"
reqwest = { version = "0.12.28", default-features = false, features = ["json"] }
reqwest = { version = "0.12.28", default-features = false, features = ["json", "rustls-tls"] }
http-body-util = "0.1"

axum = "0.8.1"
Expand Down
81 changes: 68 additions & 13 deletions crates/client/src/graphs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
use crate::graphs::graph_query::get_meta_data_query;
use std::time::Duration;
use tokio::time::sleep;

const GRAPH_QUERY_RETRY_MAX_ATTEMPTS: usize = 3;
const GRAPH_QUERY_RETRY_BASE_DELAY_MS: u64 = 250;

pub mod graph_query;
#[derive(Clone)]
Expand All @@ -8,7 +13,13 @@ pub struct GraphQueryClient {

impl Default for GraphQueryClient {
fn default() -> Self {
Self { client: reqwest::Client::new() }
let client = reqwest::Client::builder()
.connect_timeout(Duration::from_secs(5))
.timeout(Duration::from_secs(10))
.pool_max_idle_per_host(0)
.build()
.expect("failed to create graph query client");
Self { client }
}
}

Expand All @@ -22,18 +33,62 @@ impl GraphQueryClient {
subgraph_url: &str,
query: &str,
) -> anyhow::Result<serde_json::Value> {
let response = self
.client
.post(subgraph_url)
.json(&serde_json::json!({
"query": query
}))
.send()
.await?
.json::<serde_json::Value>()
.await
.map_err(|err| anyhow::format_err!("{err}"))?;
Ok(response["data"].clone())
let payload = serde_json::json!({
"query": query
});

let mut last_err: Option<anyhow::Error> = None;
for attempt in 1..=GRAPH_QUERY_RETRY_MAX_ATTEMPTS {
let result = self
.client
.post(subgraph_url)
.header(reqwest::header::CONNECTION, "close")
.json(&payload)
.send()
.await;

match result {
Ok(resp) => {
let status = resp.status();
let body = resp.text().await.unwrap_or_else(|_| String::new());
if !status.is_success() {
let err = anyhow::format_err!(
"graph query failed, status: {}, body: {}",
status,
body
);
if attempt < GRAPH_QUERY_RETRY_MAX_ATTEMPTS {
let delay = GRAPH_QUERY_RETRY_BASE_DELAY_MS * attempt as u64;
sleep(Duration::from_millis(delay)).await;
last_err = Some(err);
continue;
}
return Err(err);
}

let response = serde_json::from_str::<serde_json::Value>(&body)
.map_err(|err| anyhow::format_err!("invalid graph json response: {err}"))?;

if let Some(errors) = response.get("errors") {
return Err(anyhow::format_err!("graph query returned errors: {errors}"));
}

return Ok(response["data"].clone());
}
Err(err) => {
let reqwest_err = anyhow::format_err!("graph request failed: {err:#}");
if attempt < GRAPH_QUERY_RETRY_MAX_ATTEMPTS {
let delay = GRAPH_QUERY_RETRY_BASE_DELAY_MS * attempt as u64;
sleep(Duration::from_millis(delay)).await;
last_err = Some(reqwest_err);
continue;
}
return Err(reqwest_err);
}
}
}

Err(last_err.unwrap_or_else(|| anyhow::format_err!("unknown graph query error")))
}

pub async fn get_sync_block_height(&self, subgraph_url: &str) -> anyhow::Result<Option<i64>> {
Expand Down
37 changes: 27 additions & 10 deletions node/src/scheduled_tasks/event_watch_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -977,9 +977,16 @@ pub async fn fetch_history_events(
.await
{
Ok(Some(v)) => v,
Ok(None) | Err(_) => {
Ok(None) => {
warn!(
"fetch_history_events:fail to get graph sync block height, will try later"
"fetch_history_events:fail to get graph sync block height, will try later, empty value returned"
);
sleep(Duration::from_millis(500)).await;
continue;
}
Err(e) => {
warn!(
"fetch_history_events:fail to get graph sync block height, will try later, err:{e:#}"
);
sleep(Duration::from_millis(500)).await;
continue;
Expand Down Expand Up @@ -1089,14 +1096,24 @@ pub async fn monitor_events_item(
.await?;
let query_client = GraphQueryClient::new();
// let current_finalized = goat_client.get_finalized_block_number().await?;
let current_finalized =
match query_client.get_sync_block_height(&watch_contract.the_graph_url).await {
Ok(Some(v)) => v,
Ok(None) | Err(_) => {
warn!("monitor_events_item:fail to get graph sync block height, will try later");
return Ok(());
}
};
let current_finalized = match query_client
.get_sync_block_height(&watch_contract.the_graph_url)
.await
{
Ok(Some(v)) => v,
Ok(None) => {
warn!(
"monitor_events_item:fail to get graph sync block height, will try later, empty value returned"
);
return Ok(());
}
Err(e) => {
warn!(
"monitor_events_item:fail to get graph sync block height, will try later, err:{e:#}"
);
return Ok(());
}
};

if watch_contract.from_height == 0 || watch_contract.from_height >= current_finalized {
warn!(
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[toolchain]
channel = "nightly-2025-06-30"
channel = "nightly-2025-09-15"
components = ["rustfmt", "clippy"]
Loading