Continuous transaction fetching during syncing#1121
Klakurka merged 1 commit into PayButton:master from
Conversation
This commit changes the logic of transaction fetching and processing during the initial sync.

Previously, the process fetched data from (up to) 16 addresses, drained the transactions if there were enough to fill a batch, then processed the next 16 addresses. Fetching addresses in batches of 16 is suboptimal because most of the time is spent waiting for the last addresses in a batch to complete.

After this patch, the process attempts to always keep 16 addresses being fetched in parallel, while still processing the transactions the same way.

Example: a batch of 16 addresses contains 15 with no transactions and one address with 10k transactions. Before the patch, the process would patiently wait for that last address to complete before fetching new addresses, so it was effectively a single chronik stream. After this patch, the first 15 addresses are replaced by new ones as they finish, and so on, so 16 chronik streams are maintained at all times.

This speeds up syncing significantly, especially on eCash.

There is one minor downside: the logs no longer show progress accurately. They print the current address index over the total address count, but since addresses don't start/finish in order, the number can jump back and forth. It still gives a good enough estimate of progress.
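The batch-vs-sliding-window difference described above can be sketched as follows. This is an illustrative pattern, not the actual PayButton code; `forEachConcurrent` and its parameters are hypothetical names. The key point is that a new fetch starts as soon as any in-flight one completes, instead of awaiting a whole `Promise.all` batch before starting the next 16.

```typescript
// Sketch: keep exactly `limit` workers busy at all times (sliding window),
// rather than processing items in fixed batches of `limit`.
async function forEachConcurrent<T> (
  items: T[],
  limit: number,
  worker: (item: T) => Promise<void>
): Promise<void> {
  let next = 0
  // Spawn `limit` runners; each pulls the next unclaimed item as soon as it
  // finishes its current one, so concurrency stays pinned at `limit` until
  // the item list is nearly drained.
  const runners = Array.from({ length: Math.min(limit, items.length) }, async () => {
    while (next < items.length) {
      const item = items[next++]
      await worker(item)
    }
  })
  await Promise.all(runners)
}
```

With this shape, 15 quick addresses in a window do not block on one slow 10k-transaction address; their runner slots are immediately reused for new addresses.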
⚠️ Outside diff range comments (2)
services/chronikService.ts (2)
Lines 359-383: ⚠️ Potential issue | 🔴 Critical — Do not convert page-fetch failures into empty-page flow.

When a fetch fails, the code sets `pageTxs = []`, then falls into the empty-address / timestamp logic. That can silently skip addresses or crash on empty indexing. Failures should be handled explicitly, and true empty pages should be handled separately.

💡 Proposed fix:
```diff
        try {
          pageTxs = await this.getPaginatedTxs(address.address, pageIndex, CHRONIK_FETCH_N_TXS_PER_PAGE)
        } catch (err: any) {
          console.warn(`${addrLogPrefix} page=${pageIndex} failed: ${err.message as string}`)
-         pageTxs = []
+         throw err
        }
-       if (pageIndex === 0 && pageTxs.length === 0) {
-         console.log(`${addrLogPrefix} EMPTY ADDRESS`)
-         break
-       }
+       if (pageTxs.length === 0) {
+         if (pageIndex === 0) {
+           console.log(`${addrLogPrefix} EMPTY ADDRESS`)
+         }
+         break
+       }
        if (pageTxs.length < CHRONIK_FETCH_N_TXS_PER_PAGE) {
          hasReachedStoppingCondition = true
        }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@services/chronikService.ts` around lines 359-383: the catch block around `getPaginatedTxs` currently replaces errors with `pageTxs = []`, which then triggers the EMPTY ADDRESS / NO NEW TXS logic and can cause out-of-bounds access when indexing `pageTxs`. Instead, in the catch for `getPaginatedTxs(address.address, pageIndex, CHRONIK_FETCH_N_TXS_PER_PAGE)`, log the error with `addrLogPrefix` and either rethrow or continue to the next address/page as an explicit failure path (do not assign `[]`). Add a guard before accessing `pageTxs[0]` and `pageTxs[pageTxs.length - 1]` that ensures `pageTxs.length > 0`, and treat an actual empty page (`length === 0`) separately from a fetch error; keep the stopping-condition logic (`hasReachedStoppingCondition`) for true empty pages only.
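The reviewer's empty-vs-error distinction can be shown in isolation. This is a minimal, self-contained sketch of the pagination pattern, not the actual service code; `collectTxs`, `fetchPage`, and `PAGE_SIZE` are hypothetical names standing in for the real pagination loop.

```typescript
// Hypothetical page size, standing in for CHRONIK_FETCH_N_TXS_PER_PAGE.
const PAGE_SIZE = 200

// Paginate until a short or empty page, propagating fetch failures instead
// of treating them as empty pages (which would end the scan as if complete).
async function collectTxs (
  fetchPage: (pageIndex: number) => Promise<string[]>
): Promise<string[]> {
  const all: string[] = []
  for (let pageIndex = 0; ; pageIndex++) {
    let page: string[]
    try {
      page = await fetchPage(pageIndex)
    } catch (err) {
      // Explicit failure path: log and rethrow rather than assigning [].
      console.warn(`page ${pageIndex} failed`)
      throw err
    }
    if (page.length === 0) break // true empty page: safe stop
    all.push(...page)
    if (page.length < PAGE_SIZE) break // short page: this was the last one
  }
  return all
}
```

A caller can then tell "this address is fully synced" apart from "chronik was unreachable", which the `pageTxs = []` fallback conflated.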
Lines 389-451: ⚠️ Potential issue | 🟠 Major — Add backpressure to prevent unbounded `chronikTxs` memory growth.

Workers keep appending while the generator can be paused at `yield` (DB commit path). Under high-throughput addresses, the in-memory buffer can grow too large before the consumer drain catches up.

💡 Proposed fix:
```diff
    let chronikTxs: ChronikTxWithAddress[] = []
    const completedAddresses: string[] = []
+   const MAX_BUFFERED_CHRONIK_TXS =
+     TX_EMIT_BATCH_SIZE * INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY * 4
@@
        const newTxsInThisPage = pageTxs.length
        if (newTxsInThisPage > 0) {
+         while (chronikTxs.length >= MAX_BUFFERED_CHRONIK_TXS) {
+           await new Promise(resolve => setTimeout(resolve, TX_BATCH_POLLING_DELAY))
+         }
          chronikTxs.push(...pageTxs.map(tx => ({ tx, address })))
          pageTxs = []
        }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@services/chronikService.ts` around lines 389-451: the generator allows unbounded growth of `chronikTxs` because the worker function `processAddress` keeps pushing into `chronikTxs` while the consumer may be paused. Add backpressure by introducing high/low watermarks (e.g., `CHRONIKTX_HIGH_WATERMARK = N * TX_EMIT_BATCH_SIZE` and `CHRONIKTX_LOW_WATERMARK`) and make `processAddress` (or the push logic) await when `chronikTxs.length >= HIGH_WATERMARK` until the buffer drains below `LOW_WATERMARK`. Implement this with a shared Promise/resolve pair or a simple async wait-notify (e.g., `await drainPromise`) that `startNextWorker` or the consumer resolves when it removes batches (inside the batch-yielding loops where `chronikTxs.splice` is called), and ensure `activeWorkers`/`startNextWorker` check/await the same backpressure before spawning more workers so memory stays bounded.