Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions constants/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ export const CHRONIK_FETCH_N_TXS_PER_PAGE = 200
export const INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY = 16
export const TX_EMIT_BATCH_SIZE = 200 // for our generator, not chronik
export const DB_COMMIT_BATCH_SIZE = 200 // tamanho dos lotes para commit no DB
export const TX_BATCH_POLLING_DELAY = 500 // delay (ms) between polling for new batches of txs to commit to the DB

export const TRIGGER_POST_CONCURRENCY = 100
export const TRIGGER_EMAIL_CONCURRENCY = 100
Expand Down
74 changes: 61 additions & 13 deletions services/chronikService.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { BlockInfo, ChronikClient, ConnectionStrategy, ScriptUtxo, Tx, WsConfig, WsEndpoint, WsMsgClient, WsSubScriptClient } from 'chronik-client'
import { encodeCashAddress, decodeCashAddress } from 'ecashaddrjs'
import { AddressWithTransaction, BlockchainInfo, TransactionDetails, ProcessedMessages, SubbedAddressesLog, SyncAndSubscriptionReturn, SubscriptionReturn, SimpleBlockInfo } from 'types/chronikTypes'
import { CHRONIK_MESSAGE_CACHE_DELAY, RESPONSE_MESSAGES, XEC_TIMESTAMP_THRESHOLD, XEC_NETWORK_ID, BCH_NETWORK_ID, BCH_TIMESTAMP_THRESHOLD, CHRONIK_FETCH_N_TXS_PER_PAGE, KeyValueT, NETWORK_IDS_FROM_SLUGS, SOCKET_MESSAGES, NETWORK_IDS, NETWORK_TICKERS, MainNetworkSlugsType, MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME, MEMPOOL_PROCESS_DELAY, CHRONIK_INITIALIZATION_DELAY, LATENCY_TEST_CHECK_DELAY, INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY, TX_EMIT_BATCH_SIZE, DB_COMMIT_BATCH_SIZE, MAX_TXS_PER_ADDRESS } from 'constants/index'
import { CHRONIK_MESSAGE_CACHE_DELAY, RESPONSE_MESSAGES, XEC_TIMESTAMP_THRESHOLD, XEC_NETWORK_ID, BCH_NETWORK_ID, BCH_TIMESTAMP_THRESHOLD, CHRONIK_FETCH_N_TXS_PER_PAGE, KeyValueT, NETWORK_IDS_FROM_SLUGS, SOCKET_MESSAGES, NETWORK_IDS, NETWORK_TICKERS, MainNetworkSlugsType, MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME, MEMPOOL_PROCESS_DELAY, CHRONIK_INITIALIZATION_DELAY, LATENCY_TEST_CHECK_DELAY, INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY, TX_EMIT_BATCH_SIZE, DB_COMMIT_BATCH_SIZE, MAX_TXS_PER_ADDRESS, TX_BATCH_POLLING_DELAY } from 'constants/index'
import { productionAddresses } from 'prisma-local/seeds/addresses'
import prisma from 'prisma-local/clientInstance'
import {
Expand Down Expand Up @@ -344,6 +344,9 @@ export class ChronikBlockchainClient {

console.log(`${logPrefix} >>> starting chronik fetching for ${addressBatchSlice.length} addresses... (${syncedAlready}/${totalCount} synced)`)

// Track completed addresses
const completedAddresses: string[] = []

const perAddressWorkers = addressBatchSlice.map(async (address) => {
const addrLogPrefix = `${logPrefix} > ${address.address}:`
const lastSyncedTimestampSeconds = this.getLastSyncTs(address)
Expand Down Expand Up @@ -389,6 +392,7 @@ export class ChronikBlockchainClient {
const newTxsInThisPage = pageTxs.length
if (newTxsInThisPage > 0) {
chronikTxs.push(...pageTxs.map(tx => ({ tx, address })))
pageTxs = []
}

if (oldestTs < lastSyncedTimestampSeconds) {
Expand All @@ -404,22 +408,60 @@ export class ChronikBlockchainClient {
if (newTxs > 0) {
console.log(`${addrLogPrefix} ${newTxs} new txs.`)
}
completedAddresses.push(address.address)
})
syncedAlready += addressBatchSlice.length

await Promise.all(
// Start workers but don't wait - yield batches while they're running
const workersPromise = Promise.all(
perAddressWorkers.map(async worker =>
await worker.catch(err => console.error(`${logPrefix}: address job failed: ${err.message as string}`))
)
)

// Yield full TX batches when buffer reaches TX_EMIT_BATCH_SIZE
while (chronikTxs.length >= TX_EMIT_BATCH_SIZE) {
const chronikTxsSlice = chronikTxs.slice(0, TX_EMIT_BATCH_SIZE)
chronikTxs = chronikTxs.slice(TX_EMIT_BATCH_SIZE)
yield { chronikTxs: chronikTxsSlice, addressesSynced: [] }
// Race between worker completion and periodic checks to yield batches incrementally
let allWorkersDone = false

while (!allWorkersDone || chronikTxs.length > 0) {
// Yield batches if buffer is large enough
while (chronikTxs.length >= TX_EMIT_BATCH_SIZE) {
const chronikTxsSlice = chronikTxs.splice(0, TX_EMIT_BATCH_SIZE)
yield { chronikTxs: chronikTxsSlice, addressesSynced: [] }
}

// If workers are done, yield any remaining transactions (even if < batch size)
if (allWorkersDone && chronikTxs.length > 0) {
// This clears chronikTxs so the below check for length === 0 is true
const remaining = chronikTxs.splice(0)
yield { chronikTxs: remaining, addressesSynced: [] }
}

// Yield completed addresses if any
if (completedAddresses.length > 0) {
const completed = completedAddresses.splice(0)
yield { chronikTxs: [], addressesSynced: completed }
}

// If workers are done and no more transactions, break
if (allWorkersDone && chronikTxs.length === 0) {
break
}
Comment on lines +425 to +448
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Per-address completion events can be skipped by the loop exit condition.

The loop guard and the break condition check only `allWorkersDone` and `chronikTxs.length`. If the workers finish after the `completedAddresses` check within a given iteration, any addresses still pending in `completedAddresses` are not yielded incrementally before the loop exits — they only surface later, via the final batch-marker yield after the loop.

🔧 Proposed fix
-      while (!allWorkersDone || chronikTxs.length > 0) {
+      while (!allWorkersDone || chronikTxs.length > 0 || completedAddresses.length > 0) {
         // Yield batches if buffer is large enough
         while (chronikTxs.length >= TX_EMIT_BATCH_SIZE) {
           const chronikTxsSlice = chronikTxs.splice(0, TX_EMIT_BATCH_SIZE)
           yield { chronikTxs: chronikTxsSlice, addressesSynced: [] }
         }
@@
-        if (allWorkersDone && chronikTxs.length === 0) {
+        if (allWorkersDone && chronikTxs.length === 0 && completedAddresses.length === 0) {
           break
         }

Also applies to: 451-459

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@services/chronikService.ts` around lines 425 - 448, The loop can exit before
emitting pending per-address completions because the break only checks
allWorkersDone and chronikTxs.length; update the loop logic so the break also
requires completedAddresses to be empty (or check completedAddresses.length in
the break guard), and/or move the "Yield completed addresses" block to run
immediately before the final break; ensure both remaining chronikTxs
(chronikTxs.splice(0)) and completedAddresses.splice(0) are emitted when
allWorkersDone is true so no completedAddresses are dropped—adjust while
condition and final break to depend on allWorkersDone && chronikTxs.length === 0
&& completedAddresses.length === 0 and keep the existing batching logic using
TX_EMIT_BATCH_SIZE.


// Wait a bit or until workers complete
const raceResult = await Promise.race([
workersPromise.then(() => true),
new Promise<boolean>(resolve => setTimeout(() => resolve(false), TX_BATCH_POLLING_DELAY))
])

// Update flag if workers completed
if (raceResult) {
allWorkersDone = true
}
}

// Ensure all workers are finished
await workersPromise

// Yield batch marker for completed address group
yield { chronikTxs: [], addressesSynced: lastBatchAddresses }
}
Expand Down Expand Up @@ -836,19 +878,22 @@ export class ChronikBlockchainClient {
}

if (createdTxs.length > 0) {
const rawByHash = new Map(commitTuples.map(p => [p.raw.txid, p.raw]))
const triggerBatch: BroadcastTxData[] = []
for (const createdTx of createdTxs) {
const raw = rawByHash.get(createdTx.hash)
if (raw == null) {
const tuple = commitTuples.find(t => t.row.hash === createdTx.hash)
if (tuple == null) {
continue
}
const bd = this.broadcastIncomingTx(createdTx.address.address, raw, createdTx)
const bd = this.broadcastIncomingTx(createdTx.address.address, tuple.raw, createdTx)
triggerBatch.push(bd)
}
if (runTriggers && triggerBatch.length > 0) {
await executeTriggersBatch(triggerBatch, this.networkId)
}

// Release memory
createdTxs.length = 0
triggerBatch.length = 0
}

// Get the latest timestamp of all committed transactions (including pre-existent) for each address.
Expand Down Expand Up @@ -933,11 +978,14 @@ export class ChronikBlockchainClient {
}

toCommit.push(...tupleFromBatch)
// Release memory
tupleFromBatch.length = 0

if (toCommit.length >= DB_COMMIT_BATCH_SIZE) {
const commitPairs = toCommit.slice(0, DB_COMMIT_BATCH_SIZE)
toCommit = toCommit.slice(DB_COMMIT_BATCH_SIZE)
const commitPairs = toCommit.splice(0, DB_COMMIT_BATCH_SIZE)
await this.commitTransactionsBatch(commitPairs, productionAddressesIds, runTriggers)
// Clear commitPairs
commitPairs.length = 0
}
} catch (err: any) {
console.error(`${this.CHRONIK_MSG_PREFIX}: ERROR in batch (scoped): ${err.message as string}`)
Expand Down