diff --git a/crates/bitvm2-ga/src/operator/api.rs b/crates/bitvm2-ga/src/operator/api.rs index 93cfb25c..c9c53f70 100644 --- a/crates/bitvm2-ga/src/operator/api.rs +++ b/crates/bitvm2-ga/src/operator/api.rs @@ -966,7 +966,7 @@ pub fn take2_timelocks(network: Network) -> (u32, u32) { + if network == Network::Regtest { 6 } else { 0 }, // Regtest extra delay num_blocks_per_network(network, CONNECTOR_D_TIMELOCK) + if network == Network::Testnet { 6 } else { 0 } // Testnet extra delay - + if network == Network::Testnet4 { 100 } else { 0 } // Testnet4 extra delay + + if network == Network::Testnet4 { 60 } else { 0 } // Testnet4 extra delay + if network == Network::Regtest { 6 } else { 0 }, // Regtest extra delay ) } diff --git a/node/README.md b/node/README.md index ce54f0eb..4a073417 100644 --- a/node/README.md +++ b/node/README.md @@ -847,7 +847,6 @@ bitvm2-noded \ | `GOAT_ADDRESS` | Conditional | GOAT address (required for Operator/Challenger) | - | | `ENABLE_RELAYER` | No | Enable relayer mode for Committee nodes | `false` | | `BTC_CHAIN_URL` | No | Bitcoin Esplora API endpoint | Public Esplora | -| `MARA_SLIPSTREAM_API_URL` | No | MARA slipstream API base URL (used for non-standard tx broadcast) | mainnet: `https://slipstream.mara.com/api`; testnet4: `https://teststream.mara.com/api` | | `GOAT_PROOF_BUILD_URL` | No | Proof Builder RPC endpoint | - | | `NODE_NAME` | No | Node display name | `ZKM` | | `OPERATOR_NODE_SERVICE_FEE` | No | Operator service fee rate | `0.001` | diff --git a/node/src/env.rs b/node/src/env.rs index 34d718cc..260adc1d 100644 --- a/node/src/env.rs +++ b/node/src/env.rs @@ -33,9 +33,6 @@ pub const ENV_GOAT_SEQUENCER_SET_MULTI_SIG_VERIFIER_ADDRESS: &str = pub const ENV_ENABLE_RELAYER: &str = "ENABLE_RELAYER"; pub const ENV_ENABLE_UPDATE_SPV_CONTRACT: &str = "ENABLE_UPDATE_SPV_CONTRACT"; pub const ENV_BTC_BLOCK_CONFIRMS: &str = "BTC_BLOCK_CONFIRMS"; -pub const ENV_MARA_SLIPSTREAM_API_URL: &str = "MARA_SLIPSTREAM_API_URL"; -pub const DEFAULT_MARA_SLIPSTREAM_MAINNET_API_URL: &str = "https://slipstream.mara.com/api"; -pub const DEFAULT_MARA_SLIPSTREAM_TESTNET4_API_URL: &str = "https://teststream.mara.com/api"; pub const ENV_GOAT_PRIVATE_KEY: &str = "GOAT_PRIVATE_KEY"; @@ -352,19 +349,6 @@ pub fn get_btc_url_from_env() -> Option { std::env::var(ENV_BTC_CHAIN_URL).ok() } -pub fn get_mara_slipstream_api_base_url(network: Network) -> String { - if let Ok(url) = std::env::var(ENV_MARA_SLIPSTREAM_API_URL) - && !url.trim().is_empty() - { - return url; - } - - match network { - Network::Bitcoin => DEFAULT_MARA_SLIPSTREAM_MAINNET_API_URL.to_string(), - _ => DEFAULT_MARA_SLIPSTREAM_TESTNET4_API_URL.to_string(), - } -} - pub fn get_sequencer_set_monitor_start_cosmos_block_from_env() -> Option { std::env::var(ENV_SEQUENCER_SET_MONITOR_START_COSMOS_BLOCK) .ok() diff --git a/node/src/lib.rs b/node/src/lib.rs index 3042c519..a5122d5f 100644 --- a/node/src/lib.rs +++ b/node/src/lib.rs @@ -10,6 +10,7 @@ mod scheduled_tasks; pub mod utils; pub use scheduled_tasks::{ run_maintenance_tasks, run_sequencer_set_hash_monitor_task, run_watch_event_task, + run_watchdog_task, }; mod error; diff --git a/node/src/main.rs b/node/src/main.rs index a360ba8f..daaf530d 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -18,6 +18,7 @@ use bitvm2_noded::utils::{ }; use bitvm2_noded::{ rpc_service, run_maintenance_tasks, run_sequencer_set_hash_monitor_task, run_watch_event_task, + run_watchdog_task, }; use anyhow::Result; @@ -169,6 +170,7 @@ async fn main() -> Result<(), Box> { let local_db_clone2 = local_db.clone(); let local_db_clone3 = local_db.clone(); let local_db_clone4 = local_db.clone(); + let local_db_clone5 = local_db.clone(); let opt_rpc_addr = opt.rpc_addr.clone(); let peer_id_string_clone = peer_id_string.clone(); let metric_registry_clone = Arc::new(Mutex::new(metric_registry)); @@ -275,6 +277,21 @@ async fn main() -> Result<(), Box> { } })); + if actor == Actor::Challenger || actor == Actor::Watchtower { + let cancel_token_clone = cancellation_token.clone(); + task_handles.push(tokio::spawn(async move { + let btc_client = + Arc::new(BTCClient::new(get_network(), get_btc_url_from_env().as_deref())); + match run_watchdog_task(local_db_clone5, btc_client, 60, cancel_token_clone).await { + Ok(tag) => Ok(tag), + Err(e) => { + tracing::error!("Watchdog task error: {}", e); + Err("watchdog_error".to_string()) + } + } + })); + } + let swarm_actor = actor.clone(); let cancel_token_clone = cancellation_token.clone(); task_handles.push(tokio::spawn(async move { diff --git a/node/src/scheduled_tasks/mod.rs b/node/src/scheduled_tasks/mod.rs index 217361ec..92c0bf35 100644 --- a/node/src/scheduled_tasks/mod.rs +++ b/node/src/scheduled_tasks/mod.rs @@ -4,6 +4,7 @@ pub mod instance_maintenance_tasks; mod node_maintenance_tasks; mod sequencer_set_hash_monitor_task; mod spv_maintenance_tasks; +pub mod watchdog; use crate::action::GOATMessageContent; use crate::env::{get_maintenance_run_timeout_secs, is_enable_update_spv_contract, is_relayer}; @@ -21,6 +22,7 @@ use client::btc_chain::BTCClient; use client::goat_chain::GOATClient; pub use event_watch_task::{is_processing_gateway_history_events, run_watch_event_task}; pub use sequencer_set_hash_monitor_task::run_sequencer_set_hash_monitor_task; +pub use watchdog::run_watchdog_task; use std::sync::Arc; use std::time::{Duration, Instant}; use store::localdb::{LocalDB, StorageProcessor}; diff --git a/node/src/scheduled_tasks/watchdog/alert.rs b/node/src/scheduled_tasks/watchdog/alert.rs new file mode 100644 index 00000000..f331d934 --- /dev/null +++ b/node/src/scheduled_tasks/watchdog/alert.rs @@ -0,0 +1,103 @@ +use super::monitor::{AlertType, LivenessAlert}; +use reqwest::Client; +use serde_json::json; +use tracing::warn; + +pub struct AlertConfig { + pub webhook_url: Option, + pub telegram_bot_token: Option, + pub telegram_chat_id: Option, + pub pagerduty_routing_key: Option, +} + +impl AlertConfig { + pub fn from_env() -> Self { + Self { + webhook_url: std::env::var("WATCHDOG_WEBHOOK_URL").ok(), + telegram_bot_token: std::env::var("WATCHDOG_TELEGRAM_TOKEN").ok(), + telegram_chat_id: std::env::var("WATCHDOG_TELEGRAM_CHAT_ID").ok(), + pagerduty_routing_key: std::env::var("WATCHDOG_PAGERDUTY_KEY").ok(), + } + } + +} + +pub async fn dispatch_alerts(alerts: Vec, config: &AlertConfig) { + for alert in &alerts { + let msg = format_alert_message(alert); + + if let Some(url) = &config.webhook_url { + send_webhook(url, &msg).await; + } + if let (Some(token), Some(chat_id)) = (&config.telegram_bot_token, &config.telegram_chat_id) + { + send_telegram(token, chat_id, &msg).await; + } + if let Some(key) = &config.pagerduty_routing_key { + send_pagerduty(key, alert).await; + } + } +} + +fn format_alert_message(a: &LivenessAlert) -> String { + let kind = match a.alert_type { + AlertType::AssertInitMissing => "AssertInitMissing", + AlertType::AssertCommitStall => "AssertCommitStall", + }; + format!( + "[GOAT Watchdog] Challenger Liveness Alert\n\ + Type: {kind}\n\ + Instance: {}\n\ + Graph: {}\n\ + Kickoff TX: {}\n\ + Blocks Until Timeout: {}\n\ + Action Required: Check challenger node immediately!", + a.instance_id, a.graph_id, a.kickoff_txid, a.blocks_until_timeout + ) +} + +async fn send_webhook(url: &str, message: &str) { + let client = Client::new(); + if let Err(e) = client.post(url).json(&json!({ "text": message })).send().await { + warn!("watchdog: webhook send failed: {e}"); + } +} + +async fn send_telegram(token: &str, chat_id: &str, message: &str) { + let url = format!("https://api.telegram.org/bot{token}/sendMessage"); + let client = Client::new(); + if let Err(e) = + client.post(&url).json(&json!({ "chat_id": chat_id, "text": message })).send().await + { + warn!("watchdog: telegram send failed: {e}"); + } +} + +async fn send_pagerduty(routing_key: &str, alert: &LivenessAlert) { + let kind = match alert.alert_type { + AlertType::AssertInitMissing => "AssertInitMissing", + AlertType::AssertCommitStall => "AssertCommitStall", + }; + let client = Client::new(); + if let Err(e) = client + .post("https://events.pagerduty.com/v2/enqueue") + .json(&json!({ + "routing_key": routing_key, + "event_action": "trigger", + "payload": { + "summary": format!("Challenger offline - {kind}"), + "severity": "critical", + "source": "goat-bitvm2-watchdog", + "custom_details": { + "graph_id": alert.graph_id.to_string(), + "kickoff_txid": alert.kickoff_txid, + "blocks_until_timeout": alert.blocks_until_timeout, + } + } + })) + .send() + .await + { + warn!("watchdog: pagerduty send failed: {e}"); + } +} diff --git a/node/src/scheduled_tasks/watchdog/mod.rs b/node/src/scheduled_tasks/watchdog/mod.rs new file mode 100644 index 00000000..fed2c38b --- /dev/null +++ b/node/src/scheduled_tasks/watchdog/mod.rs @@ -0,0 +1,40 @@ +pub mod alert; +pub mod monitor; + +use alert::{dispatch_alerts, AlertConfig}; +use client::btc_chain::BTCClient; +use monitor::scan_stale_challenges; +use std::sync::Arc; +use std::time::Duration; +use store::localdb::LocalDB; +use tokio_util::sync::CancellationToken; +use tracing::{info, warn}; + +pub async fn run_watchdog_task( + local_db: LocalDB, + btc_client: Arc, + interval_secs: u64, + cancellation_token: CancellationToken, +) -> anyhow::Result { + let alert_config = AlertConfig::from_env(); + info!("watchdog: starting challenger liveness monitor (interval={}s)", interval_secs); + + loop { + tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(interval_secs)) => { + match scan_stale_challenges(&local_db, &btc_client).await { + Ok(alerts) if !alerts.is_empty() => { + warn!("watchdog: {} liveness alert(s) detected", alerts.len()); + dispatch_alerts(alerts, &alert_config).await; + } + Ok(_) => info!("watchdog: all challenge graphs healthy"), + Err(e) => warn!("watchdog: scan error: {e}"), + } + } + _ = cancellation_token.cancelled() => { + info!("watchdog: received shutdown signal"); + return Ok("watchdog_shutdown".to_string()); + } + } + } +} diff --git a/node/src/scheduled_tasks/watchdog/monitor.rs b/node/src/scheduled_tasks/watchdog/monitor.rs new file mode 100644 index 00000000..ba255e79 --- /dev/null +++ b/node/src/scheduled_tasks/watchdog/monitor.rs @@ -0,0 +1,123 @@ +use crate::env::get_network; +use crate::scheduled_tasks::graph_maintenance_tasks::{AssertCommitStatus, ChallengeSubStatus}; +use client::btc_chain::BTCClient; +use store::localdb::{GraphQuery, LocalDB}; +use store::GraphStatus; +use tracing::{info, warn}; +use uuid::Uuid; + +use bitvm2_lib::challenger::assert_commit_timeout_timelock; +use bitvm2_lib::operator::watchtower_challenge_timeout_timelock; + +#[derive(Debug, Clone)] +pub struct LivenessAlert { + pub graph_id: Uuid, + pub instance_id: Uuid, + pub alert_type: AlertType, + pub blocks_until_timeout: i64, + pub kickoff_txid: String, +} + +#[derive(Debug, Clone)] +pub enum AlertType { + /// Kickoff confirmed but Assert-Init not detected, approaching WT challenge timeout + AssertInitMissing, + /// Assert-Init detected but Assert-Commit not complete, approaching assert timeout + AssertCommitStall, +} + +fn alert_margin_blocks() -> i64 { + std::env::var("WATCHDOG_ALERT_MARGIN_BLOCKS").ok().and_then(|v| v.parse().ok()).unwrap_or(6) +} + +pub async fn scan_stale_challenges( + local_db: &LocalDB, + btc_client: &BTCClient, +) -> anyhow::Result> { + let current_height = btc_client.get_height().await? as i64; + let assert_timelock = assert_commit_timeout_timelock(get_network()) as i64; + let wt_timelock = watchtower_challenge_timeout_timelock(get_network()) as i64; + let margin = alert_margin_blocks(); + + let mut storage = local_db.acquire().await?; + let (graphs, _) = storage + .find_graphs(GraphQuery::default().with_status(GraphStatus::Challenge.to_string())) + .await?; + + info!("watchdog: scanning {} challenge graph(s) at height {}", graphs.len(), current_height); + + let mut alerts = vec![]; + + for graph in graphs { + let sub_status: ChallengeSubStatus = + serde_json::from_str(&graph.sub_status).unwrap_or_default(); + + // Already resolved — skip + if sub_status.is_disproved() || sub_status.is_all_commit_success() { + continue; + } + + let kickoff_txid_str = + graph.kickoff_txid.as_ref().map(|t| t.0.to_string()).unwrap_or_default(); + + // Check: kickoff confirmed but assert-init not yet seen + if graph.assert_init_txid.is_none() { + if let Some(ref txid) = graph.kickoff_txid { + match btc_client.get_tx_status(&txid.0).await { + Ok(status) if status.confirmed => { + if let Some(height) = status.block_height { + let blocks_left = (height as i64 + wt_timelock) - current_height; + if blocks_left < margin { + alerts.push(LivenessAlert { + graph_id: graph.graph_id, + instance_id: graph.instance_id, + alert_type: AlertType::AssertInitMissing, + blocks_until_timeout: blocks_left, + kickoff_txid: kickoff_txid_str.clone(), + }); + } + } + } + Ok(_) => {} // not confirmed yet + Err(e) => { + warn!( + "watchdog: failed to get kickoff tx status for {}: {e}", + graph.graph_id + ) + } + } + } + } + + // Check: assert-init seen but assert-commit stalled + if sub_status.assert_commit_status == AssertCommitStatus::OperatorInit { + if let Some(ref txid) = graph.assert_init_txid { + match btc_client.get_tx_status(&txid.0).await { + Ok(status) if status.confirmed => { + if let Some(height) = status.block_height { + let blocks_left = (height as i64 + assert_timelock) - current_height; + if blocks_left < margin { + alerts.push(LivenessAlert { + graph_id: graph.graph_id, + instance_id: graph.instance_id, + alert_type: AlertType::AssertCommitStall, + blocks_until_timeout: blocks_left, + kickoff_txid: kickoff_txid_str, + }); + } + } + } + Ok(_) => {} + Err(e) => { + warn!( + "watchdog: failed to get assert_init tx status for {}: {e}", + graph.graph_id + ) + } + } + } + } + } + + Ok(alerts) +} diff --git a/node/src/utils.rs b/node/src/utils.rs index b5758a87..6bcb431f 100644 --- a/node/src/utils.rs +++ b/node/src/utils.rs @@ -355,6 +355,15 @@ pub mod todo_funcs { let operator_master_key = OperatorMasterKey::new(get_bitvm_key()?); Ok(operator_master_key.preimage_for_graph(graph_id, index)) } + pub async fn broadcast_nonstandard_tx(btc_client: &BTCClient, tx: &Transaction) -> Result<()> { + match broadcast_tx(btc_client, tx).await { + Ok(_) => Ok(()), + Err(e) => { + tracing::warn!("broadcast_nonstandard_tx not implemented yet: {} , Skipped", e); + Ok(()) + } + } + } } pub mod evm_swap_utils { @@ -1658,58 +1667,6 @@ pub async fn get_fee_rate(client: &BTCClient) -> Result { } } -pub async fn broadcast_nonstandard_tx(btc_client: &BTCClient, tx: &Transaction) -> Result<()> { - match broadcast_tx(btc_client, tx).await { - Ok(_) => Ok(()), - Err(e) => { - let network = btc_client.network(); - let base_url = get_mara_slipstream_api_base_url(network); - let submit_url = format!("{}/transactions", base_url.trim_end_matches('/')); - let tx_hex = hex::encode(serialize(tx)); - - warn!( - "normal broadcast failed, fallback to MARA slipstream api. network: {network:?}, url: {submit_url}, err: {e}" - ); - - let response = reqwest::Client::new() - .post(&submit_url) - .json(&serde_json::json!({ "tx_hex": tx_hex })) - .send() - .await - .map_err(|fallback_err| { - anyhow!( - "fallback broadcast request failed. normal error: {e}; fallback error: {fallback_err}" - ) - })?; - - let status = response.status(); - let body = response - .text() - .await - .unwrap_or_else(|_| "".to_string()); - - if !status.is_success() { - bail!( - "fallback broadcast failed. normal error: {e}; fallback status: {status}; fallback body: {body}" - ); - } - - let status_field = serde_json::from_str::(&body) - .ok() - .and_then(|v| v.get("status").and_then(|s| s.as_str()).map(str::to_string)); - if let Some(status_field) = status_field - && !status_field.eq_ignore_ascii_case("success") - { - bail!( - "fallback broadcast returned non-success status. normal error: {e}; fallback body: {body}" - ); - } - - Ok(()) - } - } -} - /// Broadcasts a raw transaction to the Bitcoin network using the mempool API. /// /// Requirements: @@ -2261,7 +2218,7 @@ pub async fn build_sign_and_broadcast_non_standard_tx( &node_keypair, )?; } - broadcast_nonstandard_tx(client, &tx).await?; + todo_funcs::broadcast_nonstandard_tx(client, &tx).await?; Ok(tx.compute_txid()) } None => { @@ -4680,116 +4637,6 @@ pub async fn get_largest_watchtower_challenge_block( #[cfg(test)] mod tests { use super::*; - use std::time::Duration; - - async fn build_large_nonstandard_tx_for_debug( - btc_client: &BTCClient, - node_keypair: &Keypair, - payload_size: usize, - ) -> Result { - let mut msg = "large_opreturn_msg".to_string().as_bytes().to_vec(); - msg.extend(vec![0u8; payload_size - msg.len()]); - let scr = script! { - OP_RETURN - {msg} - } - .compile(); - let mut tx = Transaction { - version: bitcoin::transaction::Version(2), - lock_time: bitcoin::absolute::LockTime::ZERO, - input: vec![], - output: vec![TxOut { value: Amount::ZERO, script_pubkey: scr }], - }; - - let node_pubkey: PublicKey = node_keypair.public_key().into(); - let node_address = node_p2wsh_address(get_network(), &node_pubkey); - - let (inputs, _, change_amount) = get_proper_utxo_set( - btc_client, - tx.weight().to_vbytes_ceil(), - node_address.clone(), - Amount::ZERO, - 1.0, - ) - .await? - .ok_or_else(|| { - anyhow!("insufficient UTXOs on {} for debug nonstandard tx test", node_address) - })?; - - for input in &inputs { - tx.input.push(TxIn { - previous_output: input.outpoint, - script_sig: ScriptBuf::new(), - sequence: Sequence::MAX, - witness: Witness::default(), - }); - } - - if change_amount > Amount::from_sat(DUST_AMOUNT) { - tx.output - .push(TxOut { script_pubkey: node_address.script_pubkey(), value: change_amount }); - } - - for (i, input) in inputs.iter().enumerate() { - node_sign(&mut tx, i, input.amount, EcdsaSighashType::All, node_keypair)?; - } - - Ok(tx) - } - - #[tokio::test] - #[ignore] - async fn debug_test_broadcast_nonstandard_tx_and_check_mempool_visibility() -> Result<()> { - if std::env::var(ENV_BITVM_SECRET).is_err() { - bail!("BITVM_SECRET is required and should correspond to a funded testnet key"); - } - - let network = get_network(); - if network != Network::Testnet4 { - bail!("refuse to run this debug test on non-testnet4 network: {network}"); - } - - let btc_url = - get_btc_url_from_env().unwrap_or_else(|| "https://mempool.space/testnet4".to_string()); - let btc_client = BTCClient::new(network, Some(&btc_url)); - let node_keypair = get_bitvm_key()?; - - // Use an oversized OP_RETURN payload to make tx non-standard for regular mempool relay. - let tx = build_large_nonstandard_tx_for_debug(&btc_client, &node_keypair, 200_000).await?; - let txid = tx.compute_txid(); - println!( - "debug nonstandard tx built: txid={}, vbytes={}", - txid, - tx.weight().to_vbytes_ceil() - ); - - let normal_err = broadcast_tx(&btc_client, &tx).await.err().ok_or_else(|| { - anyhow!("expected normal mempool broadcast to fail for non-standard tx") - })?; - println!("normal broadcast failed as expected: {normal_err:#}"); - - broadcast_nonstandard_tx(&btc_client, &tx).await?; - println!("broadcast_nonstandard_tx succeeded for non-standard tx"); - - let mut found_in_mempool = false; - for _ in 0..10 { - if btc_client.get_tx(&txid).await?.is_some() { - found_in_mempool = true; - break; - } - tokio::time::sleep(Duration::from_secs(1)).await; - } - - if found_in_mempool { - println!( - "successfully found non-standard tx in mempool after broadcast_nonstandard_tx" - ); - } else { - println!("failed to find non-standard tx in mempool after broadcast_nonstandard_tx"); - } - - Ok(()) - } #[tokio::test] #[ignore = "test on regtest"]