diff --git a/Cargo.toml b/Cargo.toml index fc503f7e..88256e8c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -73,7 +73,7 @@ secp256k1 = { version = "0.29.1", features = ["global-context"] } anyhow = "1.0.97" strum = { version = "0.26", features = ["derive"] } tempfile = "3.19.1" -reqwest = { version = "0.12.28", default-features = false, features = ["json"] } +reqwest = { version = "0.12.28", default-features = false, features = ["json", "rustls-tls"] } http-body-util = "0.1" axum = "0.8.1" diff --git a/crates/client/src/graphs/mod.rs b/crates/client/src/graphs/mod.rs index 27a36ea6..6e48b973 100644 --- a/crates/client/src/graphs/mod.rs +++ b/crates/client/src/graphs/mod.rs @@ -1,4 +1,9 @@ use crate::graphs::graph_query::get_meta_data_query; +use std::time::Duration; +use tokio::time::sleep; + +const GRAPH_QUERY_RETRY_MAX_ATTEMPTS: usize = 3; +const GRAPH_QUERY_RETRY_BASE_DELAY_MS: u64 = 250; pub mod graph_query; #[derive(Clone)] @@ -8,7 +13,13 @@ pub struct GraphQueryClient { impl Default for GraphQueryClient { fn default() -> Self { - Self { client: reqwest::Client::new() } + let client = reqwest::Client::builder() + .connect_timeout(Duration::from_secs(5)) + .timeout(Duration::from_secs(10)) + .pool_max_idle_per_host(0) + .build() + .expect("failed to create graph query client"); + Self { client } } } @@ -22,18 +33,62 @@ impl GraphQueryClient { subgraph_url: &str, query: &str, ) -> anyhow::Result { - let response = self - .client - .post(subgraph_url) - .json(&serde_json::json!({ - "query": query - })) - .send() - .await? - .json::() - .await - .map_err(|err| anyhow::format_err!("{err}"))?; - Ok(response["data"].clone()) + let payload = serde_json::json!({ + "query": query + }); + + let mut last_err: Option = None; + for attempt in 1..=GRAPH_QUERY_RETRY_MAX_ATTEMPTS { + let result = self + .client + .post(subgraph_url) + .header(reqwest::header::CONNECTION, "close") + .json(&payload) + .send() + .await; + + match result { + Ok(resp) => { + let status = resp.status(); + let body = resp.text().await.unwrap_or_else(|_| String::new()); + if !status.is_success() { + let err = anyhow::format_err!( + "graph query failed, status: {}, body: {}", + status, + body + ); + if attempt < GRAPH_QUERY_RETRY_MAX_ATTEMPTS { + let delay = GRAPH_QUERY_RETRY_BASE_DELAY_MS * attempt as u64; + sleep(Duration::from_millis(delay)).await; + last_err = Some(err); + continue; + } + return Err(err); + } + + let response = serde_json::from_str::(&body) + .map_err(|err| anyhow::format_err!("invalid graph json response: {err}"))?; + + if let Some(errors) = response.get("errors") { + return Err(anyhow::format_err!("graph query returned errors: {errors}")); + } + + return Ok(response["data"].clone()); + } + Err(err) => { + let reqwest_err = anyhow::format_err!("graph request failed: {err:#}"); + if attempt < GRAPH_QUERY_RETRY_MAX_ATTEMPTS { + let delay = GRAPH_QUERY_RETRY_BASE_DELAY_MS * attempt as u64; + sleep(Duration::from_millis(delay)).await; + last_err = Some(reqwest_err); + continue; + } + return Err(reqwest_err); + } + } + } + + Err(last_err.unwrap_or_else(|| anyhow::format_err!("unknown graph query error"))) } pub async fn get_sync_block_height(&self, subgraph_url: &str) -> anyhow::Result> { diff --git a/node/src/scheduled_tasks/event_watch_task.rs b/node/src/scheduled_tasks/event_watch_task.rs index 800c7f0c..484106c4 100644 --- a/node/src/scheduled_tasks/event_watch_task.rs +++ b/node/src/scheduled_tasks/event_watch_task.rs @@ -977,9 +977,16 @@ pub async fn fetch_history_events( .await { Ok(Some(v)) => v, - Ok(None) | Err(_) => { + Ok(None) => { warn!( - "fetch_history_events:fail to get graph sync block height, will try later" + "fetch_history_events:fail to get graph sync block height, will try later, empty value returned" + ); + sleep(Duration::from_millis(500)).await; + continue; + } + Err(e) => { + warn!( + "fetch_history_events:fail to get graph sync block height, will try later, err:{e:#}" ); sleep(Duration::from_millis(500)).await; continue; @@ -1089,14 +1096,24 @@ pub async fn monitor_events_item( .await?; let query_client = GraphQueryClient::new(); // let current_finalized = goat_client.get_finalized_block_number().await?; - let current_finalized = - match query_client.get_sync_block_height(&watch_contract.the_graph_url).await { - Ok(Some(v)) => v, - Ok(None) | Err(_) => { - warn!("monitor_events_item:fail to get graph sync block height, will try later"); - return Ok(()); - } - }; + let current_finalized = match query_client + .get_sync_block_height(&watch_contract.the_graph_url) + .await + { + Ok(Some(v)) => v, + Ok(None) => { + warn!( + "monitor_events_item:fail to get graph sync block height, will try later, empty value returned" + ); + return Ok(()); + } + Err(e) => { + warn!( + "monitor_events_item:fail to get graph sync block height, will try later, err:{e:#}" + ); + return Ok(()); + } + }; if watch_contract.from_height == 0 || watch_contract.from_height >= current_finalized { warn!( diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 62c3c938..57d75fdb 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,3 +1,3 @@ [toolchain] -channel = "nightly-2025-06-30" +channel = "nightly-2025-09-15" components = ["rustfmt", "clippy"]