Continuous transaction fetching during syncing #1121

Merged
Klakurka merged 1 commit into PayButton:master from Fabcien:busy_parallel_fetching
Mar 3, 2026

Conversation


Fabcien (Collaborator) commented Mar 3, 2026

This commit changes the logic of transaction fetching and processing during the initial sync.

Previously the process fetched the data from (up to) 16 addresses; if there were enough txs to fill a batch it would drain the txs, then process the next 16 addresses.

This means it fetched the addresses in batches of 16, which is suboptimal since most of the time is spent waiting for the last ones to complete.

After this patch, the process attempts to always keep 16 addresses being fetched in parallel, while still processing the transactions the same way.

Example: A batch of 16 addresses contains 15 addresses with no transactions and one address with 10k transactions. Before the patch, the process patiently waits for the last address to complete before fetching new addresses, so there is a single chronik stream. After this patch, the first 15 addresses are replaced by new ones as they finish, and so on, so we maintain 16 chronik streams at all times.

This speeds up syncing significantly, especially on eCash.

There is one minor downside: the logs no longer show the progress accurately. They print the current address index over the total addresses, but since addresses don't start/finish in order the number can jump back and forth. It still gives a good enough estimate of the progress.
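The "keep 16 addresses in flight" pattern described above can be sketched with a few worker lanes sharing one cursor. `runWithConcurrency` and its arguments are illustrative stand-ins, not the actual PayButton code:

```typescript
// Illustrative sketch of keeping up to `limit` items in flight at once:
// each lane pulls the next item as soon as its previous one finishes,
// so a single slow item never stalls the other lanes.
// (runWithConcurrency is a hypothetical helper, not PayButton code.)
async function runWithConcurrency<T> (
  items: T[],
  limit: number,
  worker: (item: T) => Promise<void>
): Promise<void> {
  let nextIndex = 0
  const lanes = Array.from(
    { length: Math.min(limit, items.length) },
    async () => {
      // Lanes share a single cursor; JS's single-threaded event loop
      // makes the read-and-increment safe between awaits.
      while (nextIndex < items.length) {
        const item = items[nextIndex++]
        await worker(item)
      }
    }
  )
  await Promise.all(lanes)
}
```

Under this sketch, something like `await runWithConcurrency(addresses, 16, fetchAddressTxs)` would keep 16 address fetches active until the list is exhausted, instead of waiting for each batch of 16 to fully complete.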

Summary by CodeRabbit

  • Refactor
    • Optimized transaction data synchronization with enhanced concurrent processing to improve efficiency and reliability.

Klakurka self-requested a review March 3, 2026 17:23

coderabbitai bot (Contributor) commented Mar 3, 2026

📝 Walkthrough


Refactors fetchLatestTxsForAddresses to execute per-address processing through a controlled worker pool rather than sequential batching. Introduces processAddress function for encapsulated paging and error handling, replaces flat loops with explicit worker coordination, and adjusts batch emission to accumulate transactions and yield in-flight progress.

Changes

Cohort / File(s) Summary
Worker Pool Refactoring
services/chronikService.ts
Replaces sequential batch processing with controlled worker pool architecture. Introduces processAddress function for per-address logic, adds activeWorkers set and startNextWorker mechanism to maintain fixed concurrency, refactors polling and yield logic for dynamic batch emission, integrates error handling with try/catch/finally, and adds initial kick-off and completion-wait loop.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

  • feat: separate process to do initial syncing #1065: Modifies the same services/chronikService.ts file to refactor missed-transaction syncing into worker/worker-pool flows with separate API handling.
  • Memory optimization #1120: Directly overlaps with the same fetchLatestTxsForAddresses function refactoring, including TX_BATCH_POLLING_DELAY constant, polling-driven batch/drain logic, and memory optimizations.

Suggested labels

enhancement (behind the scenes)

Suggested reviewers

  • Klakurka
  • lissavxo

Poem

🐰 A worker pool hops through the patch,
Coordinating tasks in a synchronized batch,
No more flat loops, just graceful concurrency,
Transactions now flow with deliberate urgency! 🌾

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately summarizes the main change: refactoring the transaction fetching process during syncing to maintain continuous parallel processing instead of fixed-batch processing.
Description check ✅ Passed The description is detailed and covers the problem, solution, benefits, and known limitations. It follows the template structure with a clear description section explaining the architectural change and its trade-offs.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.


Klakurka added the enhancement (behind the scenes) label Mar 3, 2026
Klakurka added this to the Phase 3 milestone Mar 3, 2026

coderabbitai bot left a comment


Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
services/chronikService.ts (2)

359-383: ⚠️ Potential issue | 🔴 Critical

Do not convert page-fetch failures into empty-page flow.

When a fetch fails, the code sets pageTxs = [], then falls into empty-address / timestamp logic. That can silently skip addresses or crash on empty indexing. Failures should be handled explicitly, and true empty pages should be handled separately.

💡 Proposed fix
           try {
             pageTxs = await this.getPaginatedTxs(address.address, pageIndex, CHRONIK_FETCH_N_TXS_PER_PAGE)
           } catch (err: any) {
             console.warn(`${addrLogPrefix} page=${pageIndex} failed: ${err.message as string}`)
-            pageTxs = []
+            throw err
           }

-          if (pageIndex === 0 && pageTxs.length === 0) {
-            console.log(`${addrLogPrefix} EMPTY ADDRESS`)
-            break
-          }
+          if (pageTxs.length === 0) {
+            if (pageIndex === 0) {
+              console.log(`${addrLogPrefix} EMPTY ADDRESS`)
+            }
+            break
+          }

           if (pageTxs.length < CHRONIK_FETCH_N_TXS_PER_PAGE) {
             hasReachedStoppingCondition = true
           }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@services/chronikService.ts` around lines 359 - 383, The catch block around
getPaginatedTxs currently replaces errors with pageTxs = [], which then triggers
EMPTY ADDRESS / NO NEW TXS logic and can cause out-of-bounds access when
indexing pageTxs; instead, in the catch for getPaginatedTxs(address.address,
pageIndex, CHRONIK_FETCH_N_TXS_PER_PAGE) log the error with addrLogPrefix and
either rethrow or continue to the next address/page as an explicit failure path
(do not assign []). Add a guard before accessing pageTxs[0] and
pageTxs[pageTxs.length - 1] that ensures pageTxs.length > 0 and treat an actual
empty page (length === 0) separately from a fetch error; keep the
stopping-condition logic (hasReachedStoppingCondition) for true empty pages
only.

389-451: ⚠️ Potential issue | 🟠 Major

Add backpressure to prevent unbounded chronikTxs memory growth.

Workers keep appending while the generator can be paused at yield (DB commit path). Under high-throughput addresses, the in-memory buffer can grow too large before consumer drain catches up.

💡 Proposed fix
     let chronikTxs: ChronikTxWithAddress[] = []
     const completedAddresses: string[] = []
+    const MAX_BUFFERED_CHRONIK_TXS =
+      TX_EMIT_BATCH_SIZE * INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY * 4

@@
           const newTxsInThisPage = pageTxs.length
           if (newTxsInThisPage > 0) {
+            while (chronikTxs.length >= MAX_BUFFERED_CHRONIK_TXS) {
+              await new Promise(resolve => setTimeout(resolve, TX_BATCH_POLLING_DELAY))
+            }
             chronikTxs.push(...pageTxs.map(tx => ({ tx, address })))
             pageTxs = []
           }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@services/chronikService.ts` around lines 389 - 451, The generator allows
unbounded growth of chronikTxs because worker function processAddress keeps
pushing into chronikTxs while the consumer may be paused; add backpressure by
introducing high/low watermarks (e.g., CHRONIKTX_HIGH_WATERMARK = N *
TX_EMIT_BATCH_SIZE and CHRONIKTX_LOW_WATERMARK) and make processAddress (or the
push logic) await when chronikTxs.length >= HIGH_WATERMARK until the buffer
drains below LOW_WATERMARK; implement this with a shared Promise/resolve pair or
a simple async wait-notify (e.g., await drainPromise) that startNextWorker or
the consumer resolves when it removes batches (inside the batch-yielding loops
where chronikTxs.splice is called), and ensure activeWorkers/startNextWorker
check/await the same backpressure before spawning more workers so memory stays
bounded.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 06d9c3e and bc42eaf.

📒 Files selected for processing (1)
  • services/chronikService.ts

Klakurka merged commit f44fa99 into PayButton:master Mar 3, 2026
3 checks passed
Fabcien deleted the busy_parallel_fetching branch March 3, 2026 20:54

Labels

enhancement (behind the scenes) Stuff that users won't see
