diff --git a/constants/index.ts b/constants/index.ts
index e18cb3ec..d835c275 100644
--- a/constants/index.ts
+++ b/constants/index.ts
@@ -280,6 +280,7 @@ export const CHRONIK_FETCH_N_TXS_PER_PAGE = 200
 export const INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY = 16
 export const TX_EMIT_BATCH_SIZE = 200 // for our generator, not chronik
 export const DB_COMMIT_BATCH_SIZE = 200 // tamanho dos lotes para commit no DB
+export const TX_BATCH_POLLING_DELAY = 500 // delay (ms) between polling for new batches of txs to commit to the DB
 export const TRIGGER_POST_CONCURRENCY = 100
 export const TRIGGER_EMAIL_CONCURRENCY = 100
 
diff --git a/services/chronikService.ts b/services/chronikService.ts
index 3a026c55..2e009325 100644
--- a/services/chronikService.ts
+++ b/services/chronikService.ts
@@ -1,7 +1,7 @@
 import { BlockInfo, ChronikClient, ConnectionStrategy, ScriptUtxo, Tx, WsConfig, WsEndpoint, WsMsgClient, WsSubScriptClient } from 'chronik-client'
 import { encodeCashAddress, decodeCashAddress } from 'ecashaddrjs'
 import { AddressWithTransaction, BlockchainInfo, TransactionDetails, ProcessedMessages, SubbedAddressesLog, SyncAndSubscriptionReturn, SubscriptionReturn, SimpleBlockInfo } from 'types/chronikTypes'
-import { CHRONIK_MESSAGE_CACHE_DELAY, RESPONSE_MESSAGES, XEC_TIMESTAMP_THRESHOLD, XEC_NETWORK_ID, BCH_NETWORK_ID, BCH_TIMESTAMP_THRESHOLD, CHRONIK_FETCH_N_TXS_PER_PAGE, KeyValueT, NETWORK_IDS_FROM_SLUGS, SOCKET_MESSAGES, NETWORK_IDS, NETWORK_TICKERS, MainNetworkSlugsType, MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME, MEMPOOL_PROCESS_DELAY, CHRONIK_INITIALIZATION_DELAY, LATENCY_TEST_CHECK_DELAY, INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY, TX_EMIT_BATCH_SIZE, DB_COMMIT_BATCH_SIZE, MAX_TXS_PER_ADDRESS } from 'constants/index'
+import { CHRONIK_MESSAGE_CACHE_DELAY, RESPONSE_MESSAGES, XEC_TIMESTAMP_THRESHOLD, XEC_NETWORK_ID, BCH_NETWORK_ID, BCH_TIMESTAMP_THRESHOLD, CHRONIK_FETCH_N_TXS_PER_PAGE, KeyValueT, NETWORK_IDS_FROM_SLUGS, SOCKET_MESSAGES, NETWORK_IDS, NETWORK_TICKERS, MainNetworkSlugsType, MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME, MEMPOOL_PROCESS_DELAY, CHRONIK_INITIALIZATION_DELAY, LATENCY_TEST_CHECK_DELAY, INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY, TX_EMIT_BATCH_SIZE, DB_COMMIT_BATCH_SIZE, MAX_TXS_PER_ADDRESS, TX_BATCH_POLLING_DELAY } from 'constants/index'
 import { productionAddresses } from 'prisma-local/seeds/addresses'
 import prisma from 'prisma-local/clientInstance'
 import {
@@ -344,6 +344,9 @@
 
     console.log(`${logPrefix} >>> starting chronik fetching for ${addressBatchSlice.length} addresses... (${syncedAlready}/${totalCount} synced)`)
 
+    // Track completed addresses
+    const completedAddresses: string[] = []
+
     const perAddressWorkers = addressBatchSlice.map(async (address) => {
       const addrLogPrefix = `${logPrefix} > ${address.address}:`
       const lastSyncedTimestampSeconds = this.getLastSyncTs(address)
@@ -389,6 +392,7 @@
         const newTxsInThisPage = pageTxs.length
         if (newTxsInThisPage > 0) {
           chronikTxs.push(...pageTxs.map(tx => ({ tx, address })))
+          pageTxs = []
         }
 
         if (oldestTs < lastSyncedTimestampSeconds) {
@@ -404,22 +408,60 @@
       if (newTxs > 0) {
         console.log(`${addrLogPrefix} ${newTxs} new txs.`)
       }
+      completedAddresses.push(address.address)
     })
 
     syncedAlready += addressBatchSlice.length
 
-    await Promise.all(
+    // Start workers but don't wait - yield batches while they're running
+    const workersPromise = Promise.all(
       perAddressWorkers.map(async worker =>
        await worker.catch(err => console.error(`${logPrefix}: address job failed: ${err.message as string}`))
      )
    )
 
-    // Yield full TX batches when buffer reaches TX_EMIT_BATCH_SIZE
-    while (chronikTxs.length >= TX_EMIT_BATCH_SIZE) {
-      const chronikTxsSlice = chronikTxs.slice(0, TX_EMIT_BATCH_SIZE)
-      chronikTxs = chronikTxs.slice(TX_EMIT_BATCH_SIZE)
-      yield { chronikTxs: chronikTxsSlice, addressesSynced: [] }
+    // Race between worker completion and periodic checks to yield batches incrementally
+    let allWorkersDone = false
+
+    while (!allWorkersDone || chronikTxs.length > 0) {
+      // Yield batches if buffer is large enough
+      while (chronikTxs.length >= TX_EMIT_BATCH_SIZE) {
+        const chronikTxsSlice = chronikTxs.splice(0, TX_EMIT_BATCH_SIZE)
+        yield { chronikTxs: chronikTxsSlice, addressesSynced: [] }
+      }
+
+      // If workers are done, yield any remaining transactions (even if < batch size)
+      if (allWorkersDone && chronikTxs.length > 0) {
+        // This clears chronikTxs so the below check for length === 0 is true
+        const remaining = chronikTxs.splice(0)
+        yield { chronikTxs: remaining, addressesSynced: [] }
+      }
+
+      // Yield completed addresses if any
+      if (completedAddresses.length > 0) {
+        const completed = completedAddresses.splice(0)
+        yield { chronikTxs: [], addressesSynced: completed }
+      }
+
+      // If workers are done and no more transactions, break
+      if (allWorkersDone && chronikTxs.length === 0) {
+        break
+      }
+
+      // Wait a bit or until workers complete
+      const raceResult = await Promise.race([
+        workersPromise.then(() => true),
+        new Promise<boolean>(resolve => setTimeout(() => resolve(false), TX_BATCH_POLLING_DELAY))
+      ])
+
+      // Update flag if workers completed
+      if (raceResult) {
+        allWorkersDone = true
+      }
     }
+
+    // Ensure all workers are finished
+    await workersPromise
+
     // Yield batch marker for completed address group
     yield { chronikTxs: [], addressesSynced: lastBatchAddresses }
   }
@@ -836,19 +878,25 @@
       }
       if (createdTxs.length > 0) {
-        const rawByHash = new Map(commitTuples.map(p => [p.raw.txid, p.raw]))
+        // Index tuples by row hash once: O(1) lookups instead of per-tx linear scans
+        const tupleByHash = new Map(commitTuples.map(t => [t.row.hash, t]))
         const triggerBatch: BroadcastTxData[] = []
         for (const createdTx of createdTxs) {
-          const raw = rawByHash.get(createdTx.hash)
-          if (raw == null) {
+          const tuple = tupleByHash.get(createdTx.hash)
+          if (tuple == null) {
             continue
           }
-          const bd = this.broadcastIncomingTx(createdTx.address.address, raw, createdTx)
+          const bd = this.broadcastIncomingTx(createdTx.address.address, tuple.raw, createdTx)
           triggerBatch.push(bd)
         }
 
         if (runTriggers && triggerBatch.length > 0) {
           await executeTriggersBatch(triggerBatch, this.networkId)
         }
+
+        // Release memory
+        // NOTE(review): confirm nothing below still reads createdTxs before clearing it
+        createdTxs.length = 0
+        triggerBatch.length = 0
       }
 
       // Get the latest timestamp of all committed transactions (including pre-existent) for each address.
@@ -933,11 +981,14 @@
       }
 
       toCommit.push(...tupleFromBatch)
+      // Release memory
+      tupleFromBatch.length = 0
 
       if (toCommit.length >= DB_COMMIT_BATCH_SIZE) {
-        const commitPairs = toCommit.slice(0, DB_COMMIT_BATCH_SIZE)
-        toCommit = toCommit.slice(DB_COMMIT_BATCH_SIZE)
+        const commitPairs = toCommit.splice(0, DB_COMMIT_BATCH_SIZE)
         await this.commitTransactionsBatch(commitPairs, productionAddressesIds, runTriggers)
+        // Clear commitPairs
+        commitPairs.length = 0
       }
     } catch (err: any) {
       console.error(`${this.CHRONIK_MSG_PREFIX}: ERROR in batch (scoped): ${err.message as string}`)