It's O(n) instead of O(2n)
This changes the logic in fetchLatestTxsForAddresses to yield the chronik transactions periodically instead of waiting for the full address history to be fetched. With this implementation there is a race between the download and the processing, which drains the transactions every 500ms until there are not enough remaining txs to fill a batch. This means the tx buffer can grow up to the batch size (20) plus however many txs are downloaded during the polling delay. This is not a hard limit, but it is a clear win when processing addresses with thousands of transactions.
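The producer/consumer shape described above can be sketched as follows. This is a hedged, standalone illustration, not the actual fetchLatestTxsForAddresses code: the names (`drainInBatches`, `POLL_DELAY_MS`) and constants are made up for the example, and the "download" is simulated.

```typescript
// Illustrative constants — the real code uses its own batch size and polling delay.
const TX_EMIT_BATCH_SIZE = 20
const POLL_DELAY_MS = 5

const sleep = (ms: number): Promise<void> => new Promise<void>(resolve => setTimeout(resolve, ms))

// A producer fills `buffer` in the background while the generator drains it in
// batches, so peak memory stays near one batch plus whatever arrives during a poll.
async function * drainInBatches (totalTxs: number): AsyncGenerator<number[]> {
  const buffer: number[] = []
  let done = false

  // Simulated downloader: pushes txs in bursts, yielding to the event loop periodically.
  void (async () => {
    for (let i = 0; i < totalTxs; i++) {
      buffer.push(i)
      if (i % 7 === 0) await sleep(1)
    }
    done = true
  })()

  while (!done || buffer.length > 0) {
    // Emit full batches as soon as the buffer is large enough
    while (buffer.length >= TX_EMIT_BATCH_SIZE) {
      yield buffer.splice(0, TX_EMIT_BATCH_SIZE)
    }
    // Once the producer finishes, flush the remainder even if it is a partial batch
    if (done && buffer.length > 0) {
      yield buffer.splice(0)
    }
    if (done && buffer.length === 0) break
    await sleep(POLL_DELAY_MS)
  }
}
```

The key property is that `buffer.splice(...)` both returns and removes the drained items, so memory is released as batches are emitted instead of accumulating the full history.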
📝 Walkthrough

Added a new polling delay constant and refactored chronikService batching logic to emit transactions incrementally with per-address progress tracking. Implements race-based worker completion detection and completion-driven flush for improved batch emission timing, alongside memory management optimizations.
Actionable comments posted: 1
🧹 Nitpick comments (1)
services/chronikService.ts (1)
883-887: Use indexed tuple lookup to avoid repeated linear scans.
`commitTuples.find(...)` inside the `createdTxs` loop is O(n²) for the batch. Build a lookup once and do O(1) access.

♻️ Proposed refactor
```diff
 if (createdTxs.length > 0) {
+  const tupleByHash = new Map<string, { row: Prisma.TransactionUncheckedCreateInput, raw: Tx, addressString: string }>()
+  for (const t of commitTuples) {
+    if (!tupleByHash.has(t.row.hash)) tupleByHash.set(t.row.hash, t)
+  }
+
   const triggerBatch: BroadcastTxData[] = []
   for (const createdTx of createdTxs) {
-    const tuple = commitTuples.find(t => t.row.hash === createdTx.hash)
+    const tuple = tupleByHash.get(createdTx.hash)
     if (tuple == null) {
       continue
     }
     const bd = this.broadcastIncomingTx(createdTx.address.address, tuple.raw, createdTx)
     triggerBatch.push(bd)
   }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@services/chronikService.ts` around lines 883 - 887, The loop currently calls commitTuples.find(...) for every createdTx causing O(n²) behavior; instead, before iterating createdTxs build a lookup map keyed by row.hash (e.g., const commitTupleByHash = new Map(commitTuples.map(t => [t.row.hash, t]))) and then inside the createdTxs loop replace commitTuples.find(...) with a constant-time lookup (commitTupleByHash.get(createdTx.hash)); keep using the same tuple variable and pass tuple.raw into broadcastIncomingTx(createdTx.address.address, tuple.raw, createdTx) and handle the null/undefined case as before.
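The complexity difference the reviewer points out can be shown in isolation. The sketch below uses a deliberately simplified tuple shape (the real code carries Prisma rows); both functions are hypothetical helpers written for this illustration, and both keep the first-occurrence-wins behavior of the proposed diff.

```typescript
// Simplified stand-in for the commit tuple — the real shape includes Prisma types.
interface Tuple { hash: string, raw: string }

// O(n²): one full .find() scan of `tuples` per created hash.
function matchByHashLinear (createdHashes: string[], tuples: Tuple[]): string[] {
  return createdHashes
    .map(h => tuples.find(t => t.hash === h))
    .filter((t): t is Tuple => t != null)
    .map(t => t.raw)
}

// O(n): build the index once (first occurrence wins), then every lookup is O(1).
function matchByHashIndexed (createdHashes: string[], tuples: Tuple[]): string[] {
  const byHash = new Map<string, Tuple>()
  for (const t of tuples) {
    if (!byHash.has(t.hash)) byHash.set(t.hash, t)
  }
  return createdHashes
    .map(h => byHash.get(h))
    .filter((t): t is Tuple => t != null)
    .map(t => t.raw)
}
```

Since `Array.prototype.find` returns the first match and the Map guard only stores the first tuple per hash, the two versions produce identical results; only the lookup cost changes.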
```ts
while (!allWorkersDone || chronikTxs.length > 0) {
  // Yield batches if buffer is large enough
  while (chronikTxs.length >= TX_EMIT_BATCH_SIZE) {
    const chronikTxsSlice = chronikTxs.splice(0, TX_EMIT_BATCH_SIZE)
    yield { chronikTxs: chronikTxsSlice, addressesSynced: [] }
  }

  // If workers are done, yield any remaining transactions (even if < batch size)
  if (allWorkersDone && chronikTxs.length > 0) {
    // This clears chronikTxs so the below check for length === 0 is true
    const remaining = chronikTxs.splice(0)
    yield { chronikTxs: remaining, addressesSynced: [] }
  }

  // Yield completed addresses if any
  if (completedAddresses.length > 0) {
    const completed = completedAddresses.splice(0)
    yield { chronikTxs: [], addressesSynced: completed }
  }

  // If workers are done and no more transactions, break
  if (allWorkersDone && chronikTxs.length === 0) {
    break
  }
```
Per-address completion events can be skipped by the loop exit condition.
The loop guard/break only checks allWorkersDone and chronikTxs.length. If workers finish after the completedAddresses check in an iteration, pending completed addresses may not be yielded incrementally before exit.
🔧 Proposed fix

```diff
-      while (!allWorkersDone || chronikTxs.length > 0) {
+      while (!allWorkersDone || chronikTxs.length > 0 || completedAddresses.length > 0) {
         // Yield batches if buffer is large enough
         while (chronikTxs.length >= TX_EMIT_BATCH_SIZE) {
           const chronikTxsSlice = chronikTxs.splice(0, TX_EMIT_BATCH_SIZE)
           yield { chronikTxs: chronikTxsSlice, addressesSynced: [] }
         }
@@
-        if (allWorkersDone && chronikTxs.length === 0) {
+        if (allWorkersDone && chronikTxs.length === 0 && completedAddresses.length === 0) {
           break
         }
```

Also applies to: 451-459
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@services/chronikService.ts` around lines 425 - 448, The loop can exit before
emitting pending per-address completions because the break only checks
allWorkersDone and chronikTxs.length; update the loop logic so the break also
requires completedAddresses to be empty (or check completedAddresses.length in
the break guard), and/or move the "Yield completed addresses" block to run
immediately before the final break; ensure both remaining chronikTxs
(chronikTxs.splice(0)) and completedAddresses.splice(0) are emitted when
allWorkersDone is true so no completedAddresses are dropped—adjust while
condition and final break to depend on allWorkersDone && chronikTxs.length === 0
&& completedAddresses.length === 0 and keep the existing batching logic using
TX_EMIT_BATCH_SIZE.
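The corrected guard can be exercised in a minimal form. This is a hypothetical reduction of the loop (no polling, no batching, `allWorkersDone` fixed to true) that only demonstrates the exit condition: with the original guard, completions pending when workers finish would never be reached; with both queues in the guard, they are flushed before the break.

```typescript
interface Emission { txs: number[], synced: string[] }

// Reduced model of the loop: workers are already done, so the guard alone
// decides whether pending items still get yielded before exit.
function * flushAll (txs: number[], completedAddresses: string[]): Generator<Emission> {
  const allWorkersDone = true
  // Fixed guard: keep looping while EITHER queue still holds items.
  while (!allWorkersDone || txs.length > 0 || completedAddresses.length > 0) {
    if (txs.length > 0) {
      yield { txs: txs.splice(0), synced: [] }
    }
    if (completedAddresses.length > 0) {
      yield { txs: [], synced: completedAddresses.splice(0) }
    }
    // Fixed break: exit only once both queues are drained.
    if (allWorkersDone && txs.length === 0 && completedAddresses.length === 0) {
      break
    }
  }
}
```

With the buggy guard (`!allWorkersDone || txs.length > 0`), calling this with an empty `txs` and a pending completed address would exit immediately and drop the completion event.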
Instead of downloading the whole history of an address and then committing the transactions to the database, store them periodically whenever there are enough to fill a batch. This frees the memory and avoids memory peaks when syncing a big address.
It also includes some trivial memory optimizations that were low-hanging fruit.