
#181 Add support for DynamoDB as the storage of bookkeeping, schemas, locks, journal and offset information (#730)

Open
yruslan wants to merge 10 commits into main from feature/181-synamodb-as-bookkeeping

Conversation

@yruslan (Collaborator) commented Apr 2, 2026

Closes #181

Summary by CodeRabbit

  • New Features

    • DynamoDB-backed bookkeeping: offsets, journal, metadata, locks, automatic table creation, configurable region/table prefix/ARN, and builder APIs.
  • Documentation

    • Added comprehensive DynamoDB bookkeeping guide and example configs with setup, IAM, troubleshooting, comparisons, and usage notes.
  • Chores

    • Added AWS SDK DynamoDB dependency and version; standardized resource lifecycle by making key components closeable.
  • Tests

    • Added builder and behavior tests for DynamoDB-related components.

@coderabbitai (bot) commented Apr 2, 2026

Walkthrough

Adds optional DynamoDB-backed bookkeeping: new DynamoDB implementations for Bookkeeper components (OffsetManager, Journal, MetadataManager, TokenLock/Factory), traits extended to AutoCloseable, builders and tests, example configs/docs, and AWS SDK DynamoDB dependency updates.

Changes

| Cohort | File(s) | Summary |
| --- | --- | --- |
| API & core lifecycle | pramen/api/src/main/scala/za/co/absa/pramen/api/MetadataManager.scala, pramen/api/src/main/scala/za/co/absa/pramen/api/lock/TokenLockFactory.scala, pramen/core/src/main/scala/.../AppContext.scala, pramen/core/src/main/scala/.../journal/Journal.scala, pramen/core/src/main/scala/.../bookkeeper/OffsetManager.scala, pramen/extras/src/main/scala/.../SingleMessageProducer.scala | Traits updated to extend AutoCloseable; close() is now part of the lifecycle, and implementations add no-op or delegating overrides. |
| Bookkeeper wiring | pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala, pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala | Bookkeeper selects DynamoDB when configured; introduces BookkeeperDynamoDb with builders, table-prefix/ARN handling, table creation, and explicit closing of components. |
| Offset managers | pramen/core/src/main/scala/.../OffsetManagerDynamoDb.scala, pramen/core/src/main/scala/.../OffsetManagerCached.scala, pramen/core/src/main/scala/.../OffsetManagerJdbc.scala | New OffsetManagerDynamoDb with CRUD, pagination, table management, and a builder; the cached wrapper delegates close(); the JDBC variant adds a no-op close(). |
| Journal (DynamoDB) | pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala | New JournalDynamoDB with table auto-creation, scan/put implementations, failure handling, a builder, and client close. |
| Metadata manager (DynamoDB) | pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerDynamoDb.scala | New MetadataManagerDynamoDb implementing get/set/delete/query, table creation, a builder, and client lifecycle. |
| Token lock & factory (DynamoDB) | pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockDynamoDb.scala, pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala | DynamoDB-backed locking and factory with conditional writes, expiry/retry logic, table auto-creation, builders, and client close. |
| Builders & tests | pramen/core/src/test/scala/.../BookkeeperDynamoDbBuilderSuite.scala, .../OffsetManagerDynamoDbBuilderSuite.scala, .../JournalDynamoDBBuilderSuite.scala, .../TokenLockFactoryDynamoDbBuilderSuite.scala, .../MetadataManagerDynamoDbBuilderSuite.scala | New unit tests validating builder APIs, required-field validation, ARN/table-name normalization, and fluent chaining. |
| Config, examples & docs | README.md, pramen/examples/dynamodb_bookkeeping/README.md, pramen/examples/dynamodb_bookkeeping/dynamodb_bookkeeping.conf, pramen/examples/dynamodb_bookkeeping/dynamodb_with_locks.conf, pramen/core/src/main/scala/.../app/config/BookkeeperConfig.scala, pramen/core/src/test/.../BookkeepingConfigFactory.scala | Added example configs and an extensive DynamoDB README; BookkeeperConfig extended with DynamoDB fields; test factory updated. |
| Build & dependencies | pramen/pom.xml, pramen/core/pom.xml, pramen/project/Dependencies.scala, pramen/project/Versions.scala | Added Maven property aws.sdk.version, a managed dependency for software.amazon.awssdk:dynamodb, and a core dependency entry (provided scope). |
| Minor tests & adapters | pramen/core/src/test/scala/.../model/QueryBuilderSuite.scala, pramen/core/src/main/scala/.../bookkeeper/OffsetManagerCached.scala, test mocks | Small test variable rename, added override modifiers, and added delegating/no-op close() overrides in several implementations and mocks. |
| Top-level README | README.md | Added an experimental DynamoDB bookkeeping section and a Maven dependency snippet for software.amazon.awssdk:dynamodb. |

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant App as Application
    participant Config as BookkeeperConfig
    participant Factory as Bookkeeper.fromConfig
    participant Offset as OffsetManagerDynamoDb
    participant Journal as JournalDynamoDB
    participant LockFactory as TokenLockFactoryDynamoDb
    participant Dynamo as AWS DynamoDB

    App->>Config: load config (dynamoDbRegion/table.prefix/arn)
    App->>Factory: Bookkeeper.fromConfig(config)
    Factory->>Offset: build OffsetManagerDynamoDb (builder)
    Offset->>Dynamo: DescribeTable / CreateTable / Poll until ACTIVE
    Factory->>Journal: build JournalDynamoDB (builder)
    Journal->>Dynamo: DescribeTable / CreateTable / Poll until ACTIVE
    Factory->>LockFactory: build TokenLockFactoryDynamoDb (builder)
    LockFactory->>Dynamo: DescribeTable / CreateTable / Poll until ACTIVE
    Factory-->>App: Bookkeeper (components ready)
    App->>Factory: close()
    Factory->>Offset: close()
    Factory->>Journal: close()
    Factory->>LockFactory: close()
    Offset->>Dynamo: client.close()
    Journal->>Dynamo: client.close()
    LockFactory->>Dynamo: client.close()
```

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Possibly related PRs

Poem

🐇 I dug a tunnel into tables bright,
DynamoDB hummed through day and night,
Locks and journals tucked in rows,
Builders bloom where the prefix grows,
AutoCloseable — tidy rabbit delight!

🚥 Pre-merge checks | ✅ 5 passed

| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit's high-level summary is enabled. |
| Title check | ✅ Passed | The title accurately summarizes the main change: adding DynamoDB support for bookkeeping storage including schemas, locks, journal, and offset information. |
| Linked Issues check | ✅ Passed | The PR fully implements the feature objective from #181: DynamoDB is now available as a storage option for bookkeeping, schemas, locks, journal, and offset information, with the necessary configuration, implementations, and documentation. |
| Out of Scope Changes check | ✅ Passed | All changes are within scope. Updates to traits (AutoCloseable) are necessary to support resource management for the new DynamoDB implementations. No unrelated modifications were introduced. |
| Docstring Coverage | ✅ Passed | No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check. |



@github-actions (bot) commented Apr 2, 2026

Unit Test Coverage

| Scope | Coverage | Δ |
| --- | --- | --- |
| Overall Project | 77.91% | -7.97% |
| Files changed | 10.12% | |

Module coverage (pramen:core, Jacoco Report): 79.05% (-8.74%)

| File | Coverage | Δ |
| --- | --- | --- |
| Journal.scala | 100% 🍏 | |
| MetadataManagerNull.scala | 100% 🍏 | |
| MetadataManagerJdbc.scala | 95.38% 🍏 | |
| OffsetManagerCached.scala | 95.02% | -1.25% |
| OffsetManagerJdbc.scala | 88.9% | -0.13% |
| BookkeeperConfig.scala | 87.77% | -10.31% |
| Bookkeeper.scala | 73.28% | -20.61% |
| TokenLockFactoryDynamoDb.scala | 18.87% | -82.11% |
| JournalDynamoDB.scala | 16.3% | -84.23% |
| MetadataManagerDynamoDb.scala | 10.12% | -90.37% |
| BookkeeperDynamoDb.scala | 6.84% | -93.28% |
| OffsetManagerDynamoDb.scala | 5.38% | -94.93% |
| TokenLockDynamoDb.scala | 0% | |

@coderabbitai (bot) left a comment


Actionable comments posted: 15

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManager.scala (1)

36-92: 🛠️ Refactor suggestion | 🟠 Major

Don’t hide offset-manager cleanup behind a trait-level no-op.

OffsetManagerDynamoDb now has real shutdown logic, while OffsetManagerJdbc will silently inherit this default. Keeping close() abstract here—or forcing explicit no-op overrides where ownership is delegated—makes resource semantics intentional instead of accidental.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManager.scala`
around lines 36 - 92, The trait OffsetManager currently provides a no-op default
close() implementation which hides required cleanup; remove the default override
def close(): Unit = {} from the OffsetManager trait so close() is abstract, and
then implement explicit close() methods in concrete classes (e.g.,
OffsetManagerDynamoDb and OffsetManagerJdbc) to ensure resources are cleaned up
or intentionally no-oped; update the implementations to compile by adding
close() where missing.
pramen/api/src/main/scala/za/co/absa/pramen/api/MetadataManager.scala (1)

21-74: 🛠️ Refactor suggestion | 🟠 Major

Keep close() explicit on resource-owning implementations.

Making close() a concrete no-op here removes the compiler check that concrete managers must opt into their lifecycle semantics. With pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerBase.scala:1-50 sitting under both JDBC and DynamoDB managers, a missing override now silently degrades into “do nothing” instead of failing fast at compile time.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pramen/api/src/main/scala/za/co/absa/pramen/api/MetadataManager.scala` around
lines 21 - 74, The trait MetadataManager currently provides a concrete no-op
close() which prevents the compiler from forcing resource-owning implementations
to declare lifecycle behavior; change the signature in MetadataManager from
"override def close(): Unit = {}" to an abstract "def close(): Unit" (i.e.
remove the default body/override) so concrete classes must explicitly
implement/override close; also update any implementations (e.g. classes
extending MetadataManagerBase under JDBC/DynamoDB) that rely on the implicit
no-op to add an explicit close implementation (even if it's a no-op) to make
lifecycle intent explicit.
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala (1)

236-243: ⚠️ Potential issue | 🔴 Critical

Add bookkeeper.close() to the aggregate closer.

The closable block at lines 236-243 should also call bookkeeper.close(). Concrete implementations like BookkeeperDynamoDb and BookkeeperJdbc have meaningful close() logic that manages critical resources (DynamoDB client, database connections). Without this call, these resources will leak when closable.close() is invoked.

Relevant code
val closable = new AutoCloseable {
  override def close(): Unit = {
    mongoDbConnection.foreach(_.close())
    dbOpt.foreach(_.close())
    tokenFactory.close()
    journal.close()
    metadataManager.close()
    bookkeeper.close()  // Add this line
  }
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala`
around lines 236 - 243, The closable AutoCloseable currently omits calling
bookkeeper.close(), which leaks resources for implementations like
BookkeeperDynamoDb and BookkeeperJdbc; update the anonymous AutoCloseable
implementation (closable) to invoke bookkeeper.close() inside its close() method
after closing mongoDbConnection and dbOpt and before/after tokenFactory.close(),
journal.close(), and metadataManager.close(), ensuring bookkeeper's concrete
resources are released.
🧹 Nitpick comments (5)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/metadata/MetadataManagerDynamoDbBuilderSuite.scala (1)

25-97: These tests only prove the fluent API compiles.

Each positive case asserts isInstanceOf[MetadataManagerDynamoDbBuilder], which the method signatures already guarantee. Because none of them call build(), malformed values like the ARN on Line 43 still pass and the default-prefix / endpoint / credentials wiring stays untested.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/metadata/MetadataManagerDynamoDbBuilderSuite.scala`
around lines 25 - 97, The tests only check that the fluent builder methods
compile; fix them to exercise build() and assert resulting instance
configuration: call MetadataManagerDynamoDb.builder, chain the relevant setters
(withRegion, withTableArn, withTablePrefix, withCredentialsProvider,
withEndpoint) then call build() and assert the built MetadataManagerDynamoDb (or
its accessible config) contains the expected region, tableArn/tablePrefix,
endpoint and credentials provider values; keep the existing negative test that
intercepts build() without region but change the positive cases to call build()
and verify actual wiring rather than isInstanceOf checks.
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala (1)

105-112: Use the lock factory’s own default prefix constant here.

Line 106 is building TokenLockFactoryDynamoDb but falls back to BookkeeperDynamoDb.DEFAULT_TABLE_PREFIX. If those defaults ever diverge, the lock backend will quietly start using a different default naming convention from the one defined by the lock implementation itself.

Suggested fix
-        val tablePrefix = bookkeepingConfig.dynamoDbTablePrefix.getOrElse(BookkeeperDynamoDb.DEFAULT_TABLE_PREFIX)
+        val tablePrefix = bookkeepingConfig.dynamoDbTablePrefix.getOrElse(TokenLockFactoryDynamoDb.DEFAULT_TABLE_PREFIX)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala`
around lines 105 - 112, The code currently falls back to
BookkeeperDynamoDb.DEFAULT_TABLE_PREFIX when building TokenLockFactoryDynamoDb,
which can diverge from the lock factory's own default; change the fallback to
use TokenLockFactoryDynamoDb.DEFAULT_TABLE_PREFIX instead (i.e. replace the
getOrElse(BookkeeperDynamoDb.DEFAULT_TABLE_PREFIX) usage on
bookkeepingConfig.dynamoDbTablePrefix with
getOrElse(TokenLockFactoryDynamoDb.DEFAULT_TABLE_PREFIX)) so the
TokenLockFactoryDynamoDb builder uses its native default prefix.
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalDynamoDBBuilderSuite.scala (1)

25-97: These assertions don’t exercise the journal builder.

The positive cases only check the static return type, so they won’t catch broken default-prefix wiring or malformed inputs such as the ARN on Line 43. Prefer one or two end-to-end build() smoke tests and keep the negative validation case.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalDynamoDBBuilderSuite.scala`
around lines 25 - 97, Replace the superficial type-only assertions with one or
two end-to-end smoke tests that call JournalDynamoDB.builder...build() to verify
real behavior: e.g. construct a builder via
JournalDynamoDB.builder.withRegion(...) (and optionally
withTableArn/withTablePrefix/withEndpoint/withCredentialsProvider), call
build(), then assert the resulting JournalDynamoDB instance has the expected
tablePrefix (default when not provided), correctly parsed tableArn handling, and
any endpoint/credentials set; keep the existing negative test that intercepts
IllegalArgumentException from builder.build() when region is missing. Use the
builder methods (withRegion, withTableArn, withTablePrefix,
withCredentialsProvider, withEndpoint) and the build() method to locate where to
change tests.
pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockDynamoDb.scala (2)

142-164: Consider adding owner verification in updateTicket.

The updateTicket method updates the expiration without verifying ownership. If another process has taken over the lock (e.g., after expiration), this could inadvertently extend the other process's lock. Adding a condition expression would make this safer:

🛡️ Suggested improvement
       val updateRequest = UpdateItemRequest.builder()
         .tableName(tableName)
         .key(Map(
           ATTR_TOKEN -> AttributeValue.builder().s(escapedToken).build()
         ).asJava)
         .updateExpression(s"SET $ATTR_EXPIRES = :expires")
+        .conditionExpression(s"$ATTR_OWNER = :owner")
         .expressionAttributeValues(Map(
-          ":expires" -> AttributeValue.builder().n(newTicket.toString).build()
+          ":expires" -> AttributeValue.builder().n(newTicket.toString).build(),
+          ":owner" -> AttributeValue.builder().s(owner).build()
         ).asJava)
         .build()

This prevents a rare race condition where a lock expires, is taken by another process, and the original holder's watcher thread updates the expiration.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockDynamoDb.scala`
around lines 142 - 164, The updateTicket method currently unconditionally sets
ATTR_EXPIRES which can extend another process's lock; modify updateTicket (and
the UpdateItemRequest built there) to include a condition expression that
verifies ownership (e.g., compare ATTR_OWNER or the token holder attribute to
the current owner/id) so the update only succeeds if the lock is still held by
this process, and add handling for the conditional-failure case (do not treat it
as success; log or ignore the ConditionalCheckFailedException) so the watcher
doesn't silently extend someone else's lock; refer to updateTicket,
getNewTicket, escapedToken, ATTR_TOKEN and ATTR_EXPIRES to locate the change.
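The ownership check proposed above can be illustrated with a small in-memory simulation (plain Python rather than the project's Scala; `FakeLockTable`, `ConditionalCheckFailed`, and the token/owner values are hypothetical stand-ins for the DynamoDB table and `ConditionalCheckFailedException`): a renewal succeeds only while the caller still owns the lock, so a stale watcher thread cannot extend a lock that has since been taken over.

```python
import time

class ConditionalCheckFailed(Exception):
    """Stands in for DynamoDB's ConditionalCheckFailedException."""

class FakeLockTable:
    """Minimal in-memory stand-in for a DynamoDB lock table."""

    def __init__(self):
        self.items = {}  # token -> {"owner": str, "expires": float}

    def acquire(self, token, owner, ttl):
        """Conditional put: succeeds only if the lock is absent or expired."""
        item = self.items.get(token)
        if item is not None and item["expires"] >= time.time():
            raise ConditionalCheckFailed(token)
        self.items[token] = {"owner": owner, "expires": time.time() + ttl}

    def renew(self, token, owner, ttl):
        """Conditional update: sets expires only if the owner still matches,
        mirroring the conditionExpression suggested in the review."""
        item = self.items.get(token)
        if item is None or item["owner"] != owner:
            raise ConditionalCheckFailed(token)
        item["expires"] = time.time() + ttl

table = FakeLockTable()
table.acquire("pipeline.table1", owner="run-A", ttl=-1.0)  # already expired
table.acquire("pipeline.table1", owner="run-B", ttl=60.0)  # run-B takes over
try:
    table.renew("pipeline.table1", owner="run-A", ttl=60.0)  # stale watcher
    stale_renewal_succeeded = True
except ConditionalCheckFailed:
    stale_renewal_succeeded = False
assert not stale_renewal_succeeded  # run-A can no longer extend run-B's lock
```

Without the owner condition, the final `renew` would silently push out run-B's expiry, which is exactly the race the review describes.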

178-185: Add null-safety check for required DynamoDB item attributes.

The code assumes ATTR_EXPIRES and ATTR_OWNER will always be present in the item. If the DynamoDB item is malformed (e.g., created by a different version or manually edited), accessing these attributes could throw NullPointerException.

Consider adding defensive null checks:

🛡️ Proposed defensive fix
       if (response.hasItem && !response.item().isEmpty) {
         val item = response.item()
+        val tokenAttr = item.get(ATTR_TOKEN)
+        val ownerAttr = item.get(ATTR_OWNER)
+        val expiresAttr = item.get(ATTR_EXPIRES)
+        
+        if (tokenAttr == null || ownerAttr == null || expiresAttr == null) {
+          log.warn(s"Malformed lock item for $escapedToken - missing required attributes")
+          return None
+        }
+        
         Some(LockTicket(
-          token = item.get(ATTR_TOKEN).s(),
-          owner = item.get(ATTR_OWNER).s(),
-          expires = item.get(ATTR_EXPIRES).n().toLong,
+          token = tokenAttr.s(),
+          owner = ownerAttr.s(),
+          expires = expiresAttr.n().toLong,
           createdAt = Option(item.get(ATTR_CREATED_AT)).map(_.n().toLong)
         ))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockDynamoDb.scala`
around lines 178 - 185, The DynamoDB item handling in TokenLockDynamoDb assumes
ATTR_EXPIRES, ATTR_OWNER and ATTR_TOKEN are always present; add defensive
null-safety checks before constructing the LockTicket: verify item.containsKey
or wrap item.get(ATTR_...) in Option/nullable checks for ATTR_TOKEN, ATTR_OWNER
and ATTR_EXPIRES and only construct Some(LockTicket(...)) if those required
attributes are present and non-empty; treat ATTR_CREATED_AT as optional (keep
current Option mapping), and if required attrs are missing log a warning and
return None (or otherwise fail gracefully) instead of calling .s()/.n() on null.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@pramen/core/pom.xml`:
- Around line 147-151: The Maven dependency for the AWS SDK DynamoDB
(groupId=software.amazon.awssdk, artifactId=dynamodb) is missing a scope and
defaults to compile, causing Maven builds to bundle the SDK; update that
dependency entry to include <scope>provided</scope> so it matches the SBT
Dependencies.scala Provided scope and prevents packaging the AWS SDK into the
artifact.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala`:
- Around line 495-499: The BookkeeperDynamoDb.deleteTable currently throws
UnsupportedOperationException which violates the Bookkeeper contract; implement
wildcard deletion inside BookkeeperDynamoDb.deleteTable by scanning the DynamoDB
bookkeeper table for items whose table/name attribute matches the provided
wildcard (convert wildcard to a prefix/regex or Dynamo-compatible
FilterExpression), paginate through Scan results, delete matching items using
DeleteItem or BatchWriteItem for efficiency, collect and return the deleted item
names as Seq[String], and ensure proper error handling and logging;
alternatively, if you prefer to disallow wildcards for DynamoDB, move this check
to the BookkeeperDynamoDb constructor/factory and throw a configuration-time
IllegalArgumentException with a clear message so callers fail fast.
- Around line 66-68: The init() method currently creates/checks tables using
bookkeepingTableBaseName and schemaTableBaseName which mismatches the resolved
names used by all CRUDs; update init() to use the resolved private vals
bookkeepingTableName and schemaTableName instead (replace usages of
bookkeepingTableBaseName and schemaTableBaseName in init()), ensuring any calls
that pass names into ensureTable/describe/createTable use bookkeepingTableName
and schemaTableName so the created tables match the identifiers used by the CRUD
methods.
- Around line 141-165: The createBookkeepingTable function currently builds a
DynamoDB key schema with ATTR_TABLE_NAME (HASH) and ATTR_INFO_DATE (RANGE),
which enforces one item per table/date and causes overwrites; modify
createBookkeepingTable to include ATTR_BATCH_ID as an additional sort key
component (or change the sort key to a composite that includes ATTR_INFO_DATE
and ATTR_BATCH_ID) so the primary key uniquely identifies (tableName, infoDate,
batchId). Update attributeDefinitions to include ATTR_BATCH_ID with
ScalarAttributeType.S and adjust any code that assumes the old two-part key
accordingly (references: createBookkeepingTable, ATTR_TABLE_NAME,
ATTR_INFO_DATE, ATTR_BATCH_ID).
- Around line 838-848: The getFullTableName method builds incorrect ARNs by
checking "/table/" and appending "table/" when the ARN already ends with a
slash; update the logic in getFullTableName to use the DynamoDB standard token
":table/" (not "/table/"), and handle three cases: if arn contains ":table/" and
endsWith("/") then return arn + tableName (append only the name), if arn
contains ":table/" and does not endWith("/") then assume it already includes the
table name and return arn as-is, otherwise append ":table/" + tableName to the
provided arn; update the matching branches in getFullTableName accordingly to
avoid producing "table/table" sequences.
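The three cases above reduce to a small pure function. A sketch in Python for illustration (the real method is Scala's `BookkeeperDynamoDb.getFullTableName`; this encodes the reviewer's proposed behaviour, not necessarily the merged code):

```python
def full_table_name(arn_or_prefix: str, table_name: str) -> str:
    """Resolve a table identifier from a configured ARN, per the review.

    DynamoDB table ARNs have the shape:
      arn:aws:dynamodb:<region>:<account>:table/<name>
    """
    if ":table/" in arn_or_prefix:
        if arn_or_prefix.endswith("/"):
            # ARN ends at "...:table/" -- append just the table name.
            return arn_or_prefix + table_name
        # ARN already names a concrete table -- use it as-is.
        return arn_or_prefix
    # Bare ARN without the table token -- append ":table/<name>".
    return arn_or_prefix + ":table/" + table_name

# Appends only the name, never a second "table/" segment:
full_table_name("arn:aws:dynamodb:eu-west-1:123456789012:table/", "pramen_offsets")
```

Keying on `":table/"` rather than `"/table/"` is what avoids the `table/table` sequences mentioned in the comment.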

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerDynamoDb.scala`:
- Around line 123-126: The composite primary key built in startWriteOffsets uses
createdAt.toEpochMilli (createdAtMilli) which can collide for concurrent writes;
change the key generation so it cannot repeat within the same millisecond—for
example, stop reducing Instant to millis and instead incorporate the full
Instant string (createdAt.toString which includes nanoseconds) or append a short
random/UUID suffix to compositeKey (e.g., use createdAt and a UUID) so
compositeKey is unique for concurrent calls to startWriteOffsets and prevents
silent overwrites/races.
- Around line 301-317: getOffsetRecords and getMaximumInfoDate currently query
DynamoDB without handling pagination and use a filterExpression on
ATTR_INFO_DATE, which can miss results when a partition >1MB; update both
methods (getOffsetRecords, getMaximumInfoDate) to iterate pages using the
QueryResponse.lastEvaluatedKey and repeatedly call dynamoDbClient.query until
lastEvaluatedKey is empty, accumulating result.items(); also change the query
keyConditionExpression to use begins_with on the composite sort key (e.g.,
compositeKey begins_with "<infoDate>#") instead of filtering by ATTR_INFO_DATE
to limit data scanned (use expressionAttributeValues for the begins_with prefix)
and remove the post-query filter. Ensure QueryRequest.builder() is built
per-page with ExclusiveStartKey when present and that itemToOffsetRecord is
applied to all accumulated items.
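The pagination loop the comment asks for follows a standard shape: re-issue the query with the previous page's `LastEvaluatedKey` as `ExclusiveStartKey` until no key is returned. A minimal sketch against a stubbed client (Python; `FakeQueryClient` and `query_all` are made up for illustration — the real code uses the AWS SDK v2 `QueryRequest`/`QueryResponse` in Scala):

```python
class FakeQueryClient:
    """Stub that returns results one page at a time, like DynamoDB's 1 MB limit."""

    def __init__(self, items, page_size):
        self.items = items
        self.page_size = page_size

    def query(self, exclusive_start_key=None):
        start = exclusive_start_key or 0
        page = self.items[start : start + self.page_size]
        response = {"Items": page}
        if start + self.page_size < len(self.items):
            # More data remains: hand back a continuation key.
            response["LastEvaluatedKey"] = start + self.page_size
        return response

def query_all(client):
    """Accumulate every item across pages -- the loop the review recommends."""
    items, start_key = [], None
    while True:
        response = client.query(exclusive_start_key=start_key)
        items.extend(response["Items"])
        start_key = response.get("LastEvaluatedKey")
        if start_key is None:  # no more pages
            return items

client = FakeQueryClient(items=list(range(25)), page_size=10)
assert len(query_all(client)) == 25  # three pages: 10 + 10 + 5
```

The same loop applies to the `MetadataManagerDynamoDb` and `JournalDynamoDB` findings below; a single-shot query only ever sees the first page.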

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala`:
- Around line 78-80: The current JournalDynamoDB primary key uses
(ATTR_JOB_NAME, ATTR_FINISHED_AT) causing collisions and preventing efficient
time-range reads; change the write path that builds items (where itemBuilder is
populated and putItem is called) to include a unique per-entry identifier (e.g.,
UUID or nano-timestamp) as a new attribute and include that identifier in the
primary key so simultaneous finishes do not overwrite each other; add a Global
Secondary Index on ATTR_FINISHED_AT (or a numeric finishedAt attribute) so
getEntries can perform a Query over finishedAt ranges instead of a full table
Scan; where getEntries currently performs an unpaginated ScanRequest, replace it
with either a paginated Scan loop that uses ExclusiveStartKey/LastEvaluatedKey
to fetch all pages or — preferably — change getEntries to Query the new GSI with
KeyConditionExpression for the finishedAt range and handle Query pagination
similarly.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala`:
- Around line 51-53: TokenLockFactoryDynamoDb builds locksTableName via
BookkeeperDynamoDb.getFullTableName(tableArn, locksTableBaseName) but subsequent
DynamoDB calls still use locksTableBaseName, which breaks cross-account/region
behavior; update all usages in TokenLockFactoryDynamoDb (including table
creation/reads/writes around the init/ensure block and the sections currently
referencing locksTableBaseName at lines ~58–60 and ~83–85) to use locksTableName
instead so that all table operations consistently use the resolved identifier
returned by BookkeeperDynamoDb.getFullTableName.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerDynamoDb.scala`:
- Around line 83-102: getMetadataFromStorage and deleteMetadataFromStorage
currently run a single QueryRequest and will miss items if DynamoDB returns a
paginated result; update both methods in MetadataManagerDynamoDb to loop on
QueryResponse.lastEvaluatedKey (use ExclusiveStartKey on subsequent
QueryRequest) until null, aggregating all items before building the final Map in
getMetadataFromStorage; for deleteMetadataFromStorage, collect all keys across
pages and perform deletes for every page (or aggregate and use BatchWriteItem in
batches of 25) until lastEvaluatedKey is empty, ensuring you still read
ATTR_COMPOSITE_KEY, ATTR_METADATA_KEY, ATTR_METADATA_VALUE, ATTR_LAST_UPDATED
and use the same AttributeValue shaping as the existing QueryRequest/Item
deletions.

In `@pramen/examples/dynamodb_bookkeeping/dynamodb_with_locks.conf`:
- Around line 125-132: The IAM policy Resource array is missing several
Pramen-created tables; update the Resource list (the commented ARN entries like
"arn:aws:dynamodb:*:*:table/pramen_production_bookkeeping",
"pramen_production_schemas", "pramen_production_locks") to also include the
journal, metadata and offsets tables (e.g., add
"arn:aws:dynamodb:*:*:table/pramen_production_journal",
"arn:aws:dynamodb:*:*:table/pramen_production_metadata",
"arn:aws:dynamodb:*:*:table/pramen_production_offsets") or replace with a safe
prefix/wildcard such as "arn:aws:dynamodb:*:*:table/pramen_production_*" so all
tables created by Pramen are covered and permission errors are avoided.
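Following the wildcard suggestion, the statement might look like the sketch below. Only the `Resource` ARN pattern comes from the review; the `Action` list is an assumed illustration and should be trimmed to what the deployment actually needs:

```json
{
  "Effect": "Allow",
  "Action": [
    "dynamodb:DescribeTable",
    "dynamodb:CreateTable",
    "dynamodb:GetItem",
    "dynamodb:PutItem",
    "dynamodb:UpdateItem",
    "dynamodb:DeleteItem",
    "dynamodb:Query",
    "dynamodb:Scan"
  ],
  "Resource": ["arn:aws:dynamodb:*:*:table/pramen_production_*"]
}
```

The prefix wildcard keeps the policy in sync with every table Pramen auto-creates, at the cost of granting access to any future table sharing the prefix.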

In `@pramen/examples/dynamodb_bookkeeping/README.md`:
- Line 437: The README references a non-existent example file
TokenLockFactoryDynamoDbExample.scala; update the link to point to the actual
implementation TokenLockFactoryDynamoDb.scala (or remove the reference
entirely). Open pramen/examples/dynamodb_bookkeeping/README.md, find the line
mentioning
core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDbExample.scala
and replace it with
core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala
(or delete the sentence if no example reference is desired) so the documentation
points to an existing symbol/class.

In `@pramen/project/Versions.scala`:
- Line 40: The awsSdkVersion constant in Versions.scala is set to a non-existent
Maven version; update the value of awsSdkVersion to a valid released AWS SDK v2
artifact (e.g., "2.34.0" or a later available version) and ensure any places
that reference Versions.awsSdkVersion (dependency declarations or build files)
will pick up the new value so dependency resolution succeeds.

In `@README.md`:
- Around line 2592-2598: The Maven snippet uses the placeholder
${aws.sdk.version} which is defined only in this repo's POM and will break if
users copy-paste; update the README dependency block by replacing
${aws.sdk.version} with the actual AWS SDK version used in this project (or
alternatively add the corresponding <properties> block showing aws.sdk.version)
so the example is copy-pasteable; refer to the dependency XML fragment and the
${aws.sdk.version} token when making the change.

---

Outside diff comments:
In `@pramen/api/src/main/scala/za/co/absa/pramen/api/MetadataManager.scala`:
- Around line 21-74: The trait MetadataManager currently provides a concrete
no-op close() which prevents the compiler from forcing resource-owning
implementations to declare lifecycle behavior; change the signature in
MetadataManager from "override def close(): Unit = {}" to an abstract "def
close(): Unit" (i.e. remove the default body/override) so concrete classes must
explicitly implement/override close; also update any implementations (e.g.
classes extending MetadataManagerBase under JDBC/DynamoDB) that rely on the
implicit no-op to add an explicit close implementation (even if it's a no-op) to
make lifecycle intent explicit.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala`:
- Around line 236-243: The closable AutoCloseable currently omits calling
bookkeeper.close(), which leaks resources for implementations like
BookkeeperDynamoDb and BookkeeperJdbc; update the anonymous AutoCloseable
implementation (closable) to invoke bookkeeper.close() inside its close() method
after closing mongoDbConnection and dbOpt and before/after tokenFactory.close(),
journal.close(), and metadataManager.close(), ensuring bookkeeper's concrete
resources are released.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManager.scala`:
- Around line 36-92: The trait OffsetManager currently provides a no-op default
close() implementation which hides required cleanup; remove the default override
def close(): Unit = {} from the OffsetManager trait so close() is abstract, and
then implement explicit close() methods in concrete classes (e.g.,
OffsetManagerDynamoDb and OffsetManagerJdbc) to ensure resources are cleaned up
or intentionally no-oped; update the implementations to compile by adding
close() where missing.

---

Nitpick comments:
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala`:
- Around line 105-112: The code currently falls back to
BookkeeperDynamoDb.DEFAULT_TABLE_PREFIX when building TokenLockFactoryDynamoDb,
which can diverge from the lock factory's own default; change the fallback to
use TokenLockFactoryDynamoDb.DEFAULT_TABLE_PREFIX instead (i.e. replace the
getOrElse(BookkeeperDynamoDb.DEFAULT_TABLE_PREFIX) usage on
bookkeepingConfig.dynamoDbTablePrefix with
getOrElse(TokenLockFactoryDynamoDb.DEFAULT_TABLE_PREFIX)) so the
TokenLockFactoryDynamoDb builder uses its native default prefix.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockDynamoDb.scala`:
- Around line 142-164: The updateTicket method currently unconditionally sets
ATTR_EXPIRES which can extend another process's lock; modify updateTicket (and
the UpdateItemRequest built there) to include a condition expression that
verifies ownership (e.g., compare ATTR_OWNER or the token holder attribute to
the current owner/id) so the update only succeeds if the lock is still held by
this process, and add handling for the conditional-failure case (do not treat it
as success; log or ignore the ConditionalCheckFailedException) so the watcher
doesn't silently extend someone else's lock; refer to updateTicket,
getNewTicket, escapedToken, ATTR_TOKEN and ATTR_EXPIRES to locate the change.
- Around line 178-185: The DynamoDB item handling in TokenLockDynamoDb assumes
ATTR_EXPIRES, ATTR_OWNER and ATTR_TOKEN are always present; add defensive
null-safety checks before constructing the LockTicket: verify item.containsKey
or wrap item.get(ATTR_...) in Option/nullable checks for ATTR_TOKEN, ATTR_OWNER
and ATTR_EXPIRES and only construct Some(LockTicket(...)) if those required
attributes are present and non-empty; treat ATTR_CREATED_AT as optional (keep
current Option mapping), and if required attrs are missing log a warning and
return None (or otherwise fail gracefully) instead of calling .s()/.n() on null.

In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalDynamoDBBuilderSuite.scala`:
- Around line 25-97: Replace the superficial type-only assertions with one or
two end-to-end smoke tests that call JournalDynamoDB.builder...build() to verify
real behavior: e.g. construct a builder via
JournalDynamoDB.builder.withRegion(...) (and optionally
withTableArn/withTablePrefix/withEndpoint/withCredentialsProvider), call
build(), then assert the resulting JournalDynamoDB instance has the expected
tablePrefix (default when not provided), correctly parsed tableArn handling, and
any endpoint/credentials set; keep the existing negative test that intercepts
IllegalArgumentException from builder.build() when region is missing. Use the
builder methods (withRegion, withTableArn, withTablePrefix,
withCredentialsProvider, withEndpoint) and the build() method to locate where to
change tests.

In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/metadata/MetadataManagerDynamoDbBuilderSuite.scala`:
- Around line 25-97: The tests only check that the fluent builder methods
compile; fix them to exercise build() and assert resulting instance
configuration: call MetadataManagerDynamoDb.builder, chain the relevant setters
(withRegion, withTableArn, withTablePrefix, withCredentialsProvider,
withEndpoint) then call build() and assert the built MetadataManagerDynamoDb (or
its accessible config) contains the expected region, tableArn/tablePrefix,
endpoint and credentials provider values; keep the existing negative test that
intercepts build() without region but change the positive cases to call build()
and verify actual wiring rather than isInstanceOf checks.
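For the `TokenLockDynamoDb.updateTicket` nitpick above, the usual shape is a conditional update that succeeds only while this process still owns the lock. A sketch against the AWS SDK v2 API, assuming the attribute names from the comment (`ATTR_TOKEN`, `ATTR_OWNER`, `ATTR_EXPIRES`) and hypothetical `locksTableName`, `escapedToken`, `ownerId`, `dynamoDbClient`, and `log` members on the lock instance:

```scala
import scala.collection.JavaConverters._
import software.amazon.awssdk.services.dynamodb.model.{
  AttributeValue, ConditionalCheckFailedException, UpdateItemRequest
}

// Sketch only: extend the lock TTL, but only if this process is still the owner.
def updateTicket(newExpires: Long): Unit = {
  val request = UpdateItemRequest.builder()
    .tableName(locksTableName)
    .key(Map(ATTR_TOKEN -> AttributeValue.builder().s(escapedToken).build()).asJava)
    .updateExpression(s"SET $ATTR_EXPIRES = :expires")
    .conditionExpression(s"$ATTR_OWNER = :owner") // ownership guard
    .expressionAttributeValues(Map(
      ":expires" -> AttributeValue.builder().n(newExpires.toString).build(),
      ":owner"   -> AttributeValue.builder().s(ownerId).build()
    ).asJava)
    .build()

  try {
    dynamoDbClient.updateItem(request)
  } catch {
    case _: ConditionalCheckFailedException =>
      // The lock is no longer ours; do not treat the refresh as successful.
      log.warn(s"Lost ownership of lock '$escapedToken'; not extending it.")
  }
}
```

With this guard, the watcher thread can never silently extend a lock that another process has since acquired; the failed condition simply surfaces as a warning.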

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: a93de128-e535-4c03-9bcd-8d89e08cbee7

📥 Commits

Reviewing files that changed from the base of the PR and between 5b38e8b and 143c6bc.

📒 Files selected for processing (30)
  • README.md
  • pramen/api/src/main/scala/za/co/absa/pramen/api/MetadataManager.scala
  • pramen/api/src/main/scala/za/co/absa/pramen/api/lock/TokenLockFactory.scala
  • pramen/core/pom.xml
  • pramen/core/src/main/scala/za/co/absa/pramen/core/app/AppContext.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/BookkeeperConfig.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManager.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerCached.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerDynamoDb.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/journal/Journal.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockDynamoDb.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerDynamoDb.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/BookkeepingConfigFactory.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/model/QueryBuilderSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDynamoDbBuilderSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerDynamoDbBuilderSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalDynamoDBBuilderSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockFactoryDynamoDbBuilderSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/metadata/MetadataManagerDynamoDbBuilderSuite.scala
  • pramen/examples/dynamodb_bookkeeping/README.md
  • pramen/examples/dynamodb_bookkeeping/dynamodb_bookkeeping.conf
  • pramen/examples/dynamodb_bookkeeping/dynamodb_with_locks.conf
  • pramen/extras/src/main/scala/za/co/absa/pramen/extras/notification/mq/SingleMessageProducer.scala
  • pramen/pom.xml
  • pramen/project/Dependencies.scala
  • pramen/project/Versions.scala

Comment on lines +838 to +848
```scala
def getFullTableName(tableArn: Option[String], tableName: String): String = {
  tableArn match {
    case Some(arn) if arn.nonEmpty =>
      // If ARN ends with table/, append the table name, otherwise append /table/tableName
      if (arn.endsWith("/")) {
        s"${arn}table/$tableName"
      } else if (arn.contains("/table/")) {
        arn // ARN already includes table path
      } else {
        s"$arn/table/$tableName"
      }
```

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

find . -name "BookkeeperDynamoDb.scala" -type f

Repository: AbsaOSS/pramen

Length of output: 145


🏁 Script executed:

find . -name "*test*" -o -name "*Test*" | grep -i dynamo | head -20

Repository: AbsaOSS/pramen

Length of output: 40


🏁 Script executed:

cd pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper && head -n 860 BookkeeperDynamoDb.scala | tail -n 50

Repository: AbsaOSS/pramen

Length of output: 1549


🌐 Web query:

DynamoDB ARN format specification AWS

💡 Result:

The ARN format specification for DynamoDB resources follows the general AWS ARN structure: `arn:partition:dynamodb:region:account-id:resource-type/resource-id`. Key DynamoDB resource types and their formats:

- Table: `arn:{Partition}:dynamodb:{Region}:{Account}:table/{TableName}`. Example: `arn:aws:dynamodb:us-east-1:123456789012:table/MusicCollection`
- Stream: `arn:{Partition}:dynamodb:{Region}:{Account}:table/{TableName}/stream/{StreamLabel}`. Example: `arn:aws:dynamodb:us-east-1:123456789012:table/MusicCollection/stream/2024-02-12T18:57:26.492` (StreamLabel is a timestamp in ISO 8601 format, YYYY-MM-DDTHH:MM:SS.mmm)
- Index: `arn:{Partition}:dynamodb:{Region}:{Account}:table/{TableName}/index/{IndexName}`
- Backup: `arn:{Partition}:dynamodb:{Region}:{Account}:table/{TableName}/backup/{BackupName}`
- Global Table: supported via AWS::DynamoDB::GlobalTable in CloudFormation; follows a similar table ARN pattern.

Partition is typically `aws` (standard), `aws-cn` (China), or `aws-us-gov` (GovCloud). Region, account-id (12 digits, no hyphens), and resource identifiers are specific to the resource. These formats are used in IAM policies, API calls, and resource references, and appear consistently across AWS docs, CloudFormation, IAM policies, and API references.


🏁 Script executed:

find . -path "*test*" -name "*BookkeeperDynamoDb*" -o -path "*test*" -name "*Bookkeeper*Dynamo*"

Repository: AbsaOSS/pramen

Length of output: 163


🏁 Script executed:

rg "getFullTableName" -n

Repository: AbsaOSS/pramen

Length of output: 8271


🏁 Script executed:

rg "tableArn" -n -A 3 -B 3 | head -100

Repository: AbsaOSS/pramen

Length of output: 11519


🏁 Script executed:

sed -n '130,155p' pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDynamoDbBuilderSuite.scala

Repository: AbsaOSS/pramen

Length of output: 805


🏁 Script executed:

cat pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDynamoDbBuilderSuite.scala | sed -n '130,160p'

Repository: AbsaOSS/pramen

Length of output: 805


🏁 Script executed:

rg "locksTableName|journalTableFullName|metadataTableFullName|bookkeepingTableName|schemaTableName" -A 5 -B 2 | head -150

Repository: AbsaOSS/pramen

Length of output: 17556


🏁 Script executed:

sed -n '100,120p' pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala

Repository: AbsaOSS/pramen

Length of output: 1141


🏁 Script executed:

sed -n '45,75p' pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala

Repository: AbsaOSS/pramen

Length of output: 851


getFullTableName builds invalid DynamoDB ARNs.

The method's logic is flawed for handling DynamoDB ARN prefixes. When input ends with `/` (e.g., `arn:aws:dynamodb:us-east-1:123456789012:table/`), it appends `table/`, producing the malformed result `arn:aws:dynamodb:us-east-1:123456789012:table/table/<name>`. Additionally, the check for the `/table/` substring does not align with the standard DynamoDB ARN format, which uses `:table/` (colon, not slash, before the table segment), making the logic fragile and error-prone.

This breaks ARN-based configuration for every manager (MetadataManagerDynamoDb, TokenLockFactoryDynamoDb, JournalDynamoDB, OffsetManagerDynamoDb) that depends on this helper to construct valid table identifiers for DynamoDB API calls.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala`
around lines 838 - 848, The getFullTableName method builds incorrect ARNs by
checking "/table/" and appending "table/" when the ARN already ends with a
slash; update the logic in getFullTableName to use the DynamoDB standard token
":table/" (not "/table/"), and handle three cases: if arn contains ":table/" and
endsWith("/") then return arn + tableName (append only the name), if arn
contains ":table/" and does not endWith("/") then assume it already includes the
table name and return arn as-is, otherwise append ":table/" + tableName to the
provided arn; update the matching branches in getFullTableName accordingly to
avoid producing "table/table" sequences.
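The normalization the prompt describes can be sketched as a pure helper. This is a hypothetical version following the review suggestion, not the merged code; the method name and branch order mirror the comment:

```scala
// Sketch of the suggested fix: treat ":table/<name>" as terminal,
// append only when the ARN is a prefix.
def getFullTableName(tableArn: Option[String], tableName: String): String =
  tableArn match {
    case Some(arn) if arn.nonEmpty =>
      if (arn.endsWith(":table/") || arn.endsWith("/table/"))
        s"$arn$tableName"        // ARN prefix ending in table/, append only the name
      else if (arn.contains(":table/"))
        arn                      // already a full table ARN, use as-is
      else
        s"$arn:table/$tableName" // bare ARN prefix, append the standard :table/ segment
    case _ =>
      tableName                  // no ARN configured, plain table name
  }
```

Under this sketch, `arn:aws:dynamodb:us-east-1:123456789012:table/` plus `pramen_locks` yields a single `:table/pramen_locks` segment rather than the `table/table/` duplication the review flags.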

Comment on lines +83 to +102
```scala
override def getMetadataFromStorage(tableName: String, infoDate: LocalDate): Map[String, MetadataValue] = {
  try {
    val compositeKey = s"$tableName#$infoDate"

    val queryRequest = QueryRequest.builder()
      .tableName(metadataTableFullName)
      .keyConditionExpression(s"${MetadataManagerDynamoDb.ATTR_COMPOSITE_KEY} = :composite_key")
      .expressionAttributeValues(Map(
        ":composite_key" -> AttributeValue.builder().s(compositeKey).build()
      ).asJava)
      .build()

    val result = dynamoDbClient.query(queryRequest)

    result.items().asScala.map { item =>
      val key = item.get(MetadataManagerDynamoDb.ATTR_METADATA_KEY).s()
      val value = item.get(MetadataManagerDynamoDb.ATTR_METADATA_VALUE).s()
      val lastUpdated = Instant.ofEpochSecond(item.get(MetadataManagerDynamoDb.ATTR_LAST_UPDATED).n().toLong)
      key -> MetadataValue(value, lastUpdated)
    }.toMap
```

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

cat -n pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerDynamoDb.scala | sed -n '80,170p'

Repository: AbsaOSS/pramen

Length of output: 4532


🏁 Script executed:

rg -n "getMetadataFromStorage|deleteMetadataFromStorage" pramen/core/src/main/scala/ -A 3 -B 1 | head -80

Repository: AbsaOSS/pramen

Length of output: 9502


Add pagination to getMetadataFromStorage and deleteMetadataFromStorage.

The query at line 95 in getMetadataFromStorage(tableName, infoDate) and the query at line 164 in deleteMetadataFromStorage(tableName, infoDate) do not loop on lastEvaluatedKey. If a (tableName, infoDate) partition exceeds DynamoDB's 1 MB default page size, getMetadataFromStorage returns incomplete metadata and deleteMetadataFromStorage leaves stale items without warning.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerDynamoDb.scala`
around lines 83 - 102, getMetadataFromStorage and deleteMetadataFromStorage
currently run a single QueryRequest and will miss items if DynamoDB returns a
paginated result; update both methods in MetadataManagerDynamoDb to loop on
QueryResponse.lastEvaluatedKey (use ExclusiveStartKey on subsequent
QueryRequest) until null, aggregating all items before building the final Map in
getMetadataFromStorage; for deleteMetadataFromStorage, collect all keys across
pages and perform deletes for every page (or aggregate and use BatchWriteItem in
batches of 25) until lastEvaluatedKey is empty, ensuring you still read
ATTR_COMPOSITE_KEY, ATTR_METADATA_KEY, ATTR_METADATA_VALUE, ATTR_LAST_UPDATED
and use the same AttributeValue shaping as the existing QueryRequest/Item
deletions.
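The paging fix the prompt asks for follows the standard DynamoDB pattern: re-issue the query with `exclusiveStartKey` until `lastEvaluatedKey` comes back empty. A hedged sketch against the AWS SDK v2 API, assuming the same `dynamoDbClient` field as the class above; `queryAllPages` is a hypothetical helper name:

```scala
import scala.collection.JavaConverters._
import software.amazon.awssdk.services.dynamodb.model.{AttributeValue, QueryRequest}

// Sketch only: collect every page of a query result so callers never
// silently miss items past DynamoDB's 1 MB page boundary.
def queryAllPages(baseRequest: QueryRequest): Seq[java.util.Map[String, AttributeValue]] = {
  val items = Seq.newBuilder[java.util.Map[String, AttributeValue]]
  var startKey: java.util.Map[String, AttributeValue] = null

  do {
    val request =
      if (startKey == null) baseRequest
      else baseRequest.toBuilder.exclusiveStartKey(startKey).build()

    val response = dynamoDbClient.query(request)
    items ++= response.items().asScala
    startKey = if (response.hasLastEvaluatedKey) response.lastEvaluatedKey() else null
  } while (startKey != null)

  items.result()
}
```

`getMetadataFromStorage` would then build its result map from `queryAllPages(queryRequest)`, and `deleteMetadataFromStorage` would collect keys from all pages before issuing deletes (or `BatchWriteItem` batches of 25).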

Comment on lines +125 to +132
# "Resource": [
# "arn:aws:dynamodb:*:*:table/pramen_production_bookkeeping",
# "arn:aws:dynamodb:*:*:table/pramen_production_schemas",
# "arn:aws:dynamodb:*:*:table/pramen_production_locks"
# ]
# }
# ]
# }

⚠️ Potential issue | 🟡 Minor

IAM policy example is incomplete - missing tables.

The IAM policy Resource list only includes 3 tables (bookkeeping, schemas, locks) but the documentation above mentions 5+ tables that are automatically created: journal, metadata, and offsets. Users following this example may encounter permission errors.

Consider updating to include all tables:

           "Resource": [
             "arn:aws:dynamodb:*:*:table/pramen_production_bookkeeping",
             "arn:aws:dynamodb:*:*:table/pramen_production_schemas",
-            "arn:aws:dynamodb:*:*:table/pramen_production_locks"
+            "arn:aws:dynamodb:*:*:table/pramen_production_locks",
+            "arn:aws:dynamodb:*:*:table/pramen_production_journal",
+            "arn:aws:dynamodb:*:*:table/pramen_production_metadata",
+            "arn:aws:dynamodb:*:*:table/pramen_production_offsets"
           ]
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pramen/examples/dynamodb_bookkeeping/dynamodb_with_locks.conf` around lines
125 - 132, The IAM policy Resource array is missing several Pramen-created
tables; update the Resource list (the commented ARN entries like
"arn:aws:dynamodb:*:*:table/pramen_production_bookkeeping",
"pramen_production_schemas", "pramen_production_locks") to also include the
journal, metadata and offsets tables (e.g., add
"arn:aws:dynamodb:*:*:table/pramen_production_journal",
"arn:aws:dynamodb:*:*:table/pramen_production_metadata",
"arn:aws:dynamodb:*:*:table/pramen_production_offsets") or replace with a safe
prefix/wildcard such as "arn:aws:dynamodb:*:*:table/pramen_production_*" so all
tables created by Pramen are covered and permission errors are avoided.

```scala
val scalatestVersion = "3.2.14"
val mockitoVersion = "2.28.2"
val httpClientVersion = "4.5.14"
val awsSdkVersion = "2.42.23"
```

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

```bash
#!/bin/bash
# Check if AWS SDK version 2.42.23 exists on Maven Central

echo "=== Checking AWS SDK DynamoDB version on Maven Central ==="
curl -s "https://search.maven.org/solrsearch/select?q=g:software.amazon.awssdk+AND+a:dynamodb&rows=5&wt=json" | jq '.response.docs[] | {version: .latestVersion}'

echo ""
echo "=== Check for security advisories ==="
gh api graphql -f query='
{
  securityVulnerabilities(first: 5, ecosystem: MAVEN, package: "software.amazon.awssdk:dynamodb") {
    nodes {
      advisory {
        summary
        severity
        publishedAt
      }
      vulnerableVersionRange
      firstPatchedVersion {
        identifier
      }
    }
  }
}'
```

Repository: AbsaOSS/pramen

Length of output: 342


🏁 Script executed:

```bash
#!/bin/bash
# Query for specific version 2.42.23 and surrounding versions
echo "=== Checking if AWS SDK DynamoDB 2.42.23 exists ==="
curl -s "https://search.maven.org/solrsearch/select?q=g:software.amazon.awssdk+AND+a:dynamodb+AND+v:2.42.23&rows=1&wt=json" | jq '.response'

echo ""
echo "=== Listing all versions in 2.40+ range ==="
curl -s "https://search.maven.org/solrsearch/select?q=g:software.amazon.awssdk+AND+a:dynamodb+AND+v:[2.40+TO+2.50]&rows=20&wt=json" | jq '.response.docs[] | {version: .v}' | sort -u

echo ""
echo "=== Latest 10 versions available ==="
curl -s "https://search.maven.org/solrsearch/select?q=g:software.amazon.awssdk+AND+a:dynamodb&rows=10&wt=json" | jq -r '.response.docs[] | .v' | sort -V | tail -10
```

Repository: AbsaOSS/pramen

Length of output: 245


🏁 Script executed:

```bash
#!/bin/bash
# Get actual available versions
echo "=== Latest 15 AWS SDK DynamoDB versions ==="
curl -s "https://search.maven.org/solrsearch/select?q=g:software.amazon.awssdk+AND+a:dynamodb&rows=100&wt=json" | jq -r '.response.docs | sort_by(.v) | reverse | .[0:15] | .[] | .v'
```

Repository: AbsaOSS/pramen

Length of output: 106


🏁 Script executed:

```bash
#!/bin/bash
# Get the complete JSON response first
echo "=== AWS SDK DynamoDB versions from Maven Central ==="
curl -s "https://search.maven.org/solrsearch/select?q=g:software.amazon.awssdk+AND+a:dynamodb&rows=50&wt=json" | jq '.response.docs[0:15] | map({version: .v, timestamp: .timestamp}) | sort_by(.version) | reverse'
```

Repository: AbsaOSS/pramen

Length of output: 174


🏁 Script executed:

```bash
#!/bin/bash
# Get raw response structure
echo "=== Raw Maven Central response structure ==="
curl -s "https://search.maven.org/solrsearch/select?q=g:software.amazon.awssdk+AND+a:dynamodb&rows=5&wt=json" | jq '.response.docs[0]'
```

Repository: AbsaOSS/pramen

Length of output: 530


🏁 Script executed:

```bash
#!/bin/bash
# Verify the actual version in the Versions.scala file
cat pramen/project/Versions.scala | grep -A 2 -B 2 "awsSdkVersion"
```

Repository: AbsaOSS/pramen

Length of output: 218


AWS SDK version 2.42.23 does not exist in Maven Central.

The specified version does not exist; the latest available version is 2.34.0. This will cause a build failure when the dependency cannot be resolved. Update to an available version, such as 2.34.0 or later.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pramen/project/Versions.scala` at line 40, The awsSdkVersion constant in
Versions.scala is set to a non-existent Maven version; update the value of
awsSdkVersion to a valid released AWS SDK v2 artifact (e.g., "2.34.0" or a later
available version) and ensure any places that reference Versions.awsSdkVersion
(dependency declarations or build files) will pick up the new value so
dependency resolution succeeds.

Comment on lines +2592 to +2598
```xml
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>dynamodb</artifactId>
<version>${aws.sdk.version}</version>
</dependency>
```

⚠️ Potential issue | 🟡 Minor

Make the Maven example copy-pasteable.

`${aws.sdk.version}` is defined in this repository's POM, not in a downstream user's project, so the snippet on Line 2596 will fail as pasted. Either inline the version here or show the matching `<properties>` block.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@README.md` around lines 2592 - 2598, The Maven snippet uses the placeholder
${aws.sdk.version} which is defined only in this repo's POM and will break if
users copy-paste; update the README dependency block by replacing
${aws.sdk.version} with the actual AWS SDK version used in this project (or
alternatively add the corresponding <properties> block showing aws.sdk.version)
so the example is copy-pasteable; refer to the dependency XML fragment and the
${aws.sdk.version} token when making the change.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 6

♻️ Duplicate comments (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala (1)

990-1000: ⚠️ Potential issue | 🟠 Major

getFullTableName() still mis-normalizes already resolved table ARNs.

DynamoDB table ARNs use the ...:table/<name> form, and the APIs this class calls accept a full table ARN in TableName. If tableArn is already resolved to that form, or if it is an ARN prefix without the trailing table/, the current branches produce malformed identifiers such as ...:table/<existing>/table/<name> or .../table/<name>. Treat :table/<name> as terminal and only append when the input really ends with the table/ prefix. (docs.aws.amazon.com)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala`
around lines 990 - 1000, getFullTableName currently appends "table/<name>" even
when the provided ARN already contains a full table segment (":table/<name>"),
producing duplicates; update BookkeeperDynamoDb.getFullTableName to first detect
a terminal table ARN (e.g. matches pattern like ".*:table/<non-slash-chars>$" or
contains ":table/" followed by a table name) and return it unchanged, otherwise
keep the existing normalization logic: if arn ends with "table/" append
tableName, else if arn ends with "/" append "table/<tableName>", else append
"/table/<tableName>".
🧹 Nitpick comments (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala (1)

40-43: Keep the default tablePrefix in one place.

TokenLockFactoryDynamoDb hardcodes "pramen" while the builder uses BookkeeperDynamoDb.DEFAULT_TABLE_PREFIX. They match today, but the public constructor and builder will drift on the next constant change. Reuse the shared constant in both paths.

Suggested fix
```diff
 class TokenLockFactoryDynamoDb(
     dynamoDbClient: DynamoDbClient,
     tableArn: Option[String] = None,
-    tablePrefix: String = "pramen"
+    tablePrefix: String = BookkeeperDynamoDb.DEFAULT_TABLE_PREFIX
 ) extends TokenLockFactory with AutoCloseable {
```

Also applies to: 203-208

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala`
around lines 40 - 43, The constructor for TokenLockFactoryDynamoDb currently
hardcodes the default tablePrefix ("pramen") which can drift from the shared
constant; change the default parameter to reuse
BookkeeperDynamoDb.DEFAULT_TABLE_PREFIX instead of the literal string and apply
the same replacement where the literal is used around the
TokenLockFactoryDynamoDb-related methods (e.g., other constructors/factory
methods near the class, lines referencing tablePrefix) so both the public
constructor and the builder consistently use
BookkeeperDynamoDb.DEFAULT_TABLE_PREFIX.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala`:
- Around line 503-517: The deleteTable method in BookkeeperDynamoDb currently
removes bookkeeping, schema and offsets but omits metadata records, leaving
backend-specific state behind; update deleteTable (the method overriding
deleteTable) to also remove metadata records similar to BookkeeperJdbc by
invoking the DynamoDB metadata deletion routine (e.g., add a call like
deleteTableFromMetadata(tableName) or
MetadataManagerDynamoDb.deleteAllMetadata(tableName, dynamoDbClient)), capture
the returned count/result and append a corresponding message to the results
ListBuffer so the method reports metadata deletion alongside bookkeeping, schema
and offsets.
- Around line 515-516: The call in BookkeeperDynamoDb to
OffsetManagerDynamoDb.deleteAllOffsets(tableName, dynamoDbClient) uses the raw
default name and can delete the wrong namespace; resolve the actual offsets
table using the same resolution logic that considers tablePrefix and tableArn
and pass that resolved name into OffsetManagerDynamoDb.deleteAllOffsets instead.
Concretely, in BookkeeperDynamoDb replace the direct use of tableName with the
value produced by OffsetManagerDynamoDb's resolution helper (use the
resolveOffsetsTableName / equivalent that accepts tablePrefix and tableArn) and
then call OffsetManagerDynamoDb.deleteAllOffsets(resolvedTableName,
dynamoDbClient).
- Around line 89-102: The table-initialization has a race: replace the unchecked
createTable calls by catching AWS ResourceInUseException inside
createBookkeepingTable and createSchemaTable (the functions invoked after
tableExists), so when dynamoDbClient.createTable(...) throws
ResourceInUseException you swallow it and continue by calling
waitForTableActive(...) for that table; keep existing logging but ensure any
other exceptions still propagate. Use the existing symbols
createBookkeepingTable, createSchemaTable, tableExists and waitForTableActive to
locate where to add the try/catch around the createTable invocation.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala`:
- Line 56: The constructor currently calls init() which can throw and leak the
builder-created DynamoDbClient; remove the init() call from the constructor and
instead invoke init() inside the build() method (for TokenLockFactoryDynamoDb
and the analogous builder at lines ~296-302), wrapping the init() call in a
try/catch so that if init() throws you close the DynamoDbClient only when it was
created by the builder (i.e., track whether the client was supplied externally
or created in build()), call client.close() on failure, and then rethrow the
exception; ensure externally supplied clients are never closed by this cleanup
path.
- Around line 126-147: The createLocksTable method can throw a
ResourceInUseException when another process is concurrently creating the same
DynamoDB table; update createLocksTable (in TokenLockFactoryDynamoDb) to catch
the AWS SDK's ResourceInUseException around
dynamoDbClient.createTable(createTableRequest), ignore that specific exception
and continue to call waitForTableActive(tableName), while rethrowing or letting
other exceptions propagate normally so only the "table already being
created/exists" case is swallowed.
- Around line 177-180: The current catch in TokenLockFactoryDynamoDb.scala
swallows all NonFatal exceptions during DescribeTable; change it to only catch
the AWS ResourceNotFoundException
(software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException) and
treat that as the transient case (log.warn and Thread.sleep to retry), while
immediately rethrowing any other exceptions (including NonFatal others) so
credential/permission/validation errors surface; update imports accordingly and
keep the surrounding retry loop logic intact.
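The create-table race described in the prompts above is conventionally handled by treating `ResourceInUseException` as "another process created it first" and falling through to the active-state wait. A sketch, assuming the `createTableRequest` and `waitForTableActive` helpers named in the comments; `createTableIfAbsent` is a hypothetical wrapper name:

```scala
import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException

// Sketch only: tolerate a concurrent creator, propagate everything else.
def createTableIfAbsent(tableName: String): Unit = {
  try {
    dynamoDbClient.createTable(createTableRequest(tableName))
  } catch {
    case _: ResourceInUseException =>
      // Another process created (or is creating) the table; that is fine.
      log.info(s"Table '$tableName' is already being created elsewhere.")
  }
  // Either way, block until the table is usable.
  waitForTableActive(tableName)
}
```

Credential, permission, and validation errors still surface because only the one "already exists / being created" exception type is swallowed, matching the narrow-catch guidance in the DescribeTable comment above.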

---

Duplicate comments:
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala`:
- Around line 990-1000: getFullTableName currently appends "table/<name>" even
when the provided ARN already contains a full table segment (":table/<name>"),
producing duplicates; update BookkeeperDynamoDb.getFullTableName to first detect
a terminal table ARN (e.g. matches pattern like ".*:table/<non-slash-chars>$" or
contains ":table/" followed by a table name) and return it unchanged, otherwise
keep the existing normalization logic: if arn ends with "table/" append
tableName, else if arn ends with "/" append "table/<tableName>", else append
"/table/<tableName>".

---

Nitpick comments:
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala`:
- Around line 40-43: The constructor for TokenLockFactoryDynamoDb currently
hardcodes the default tablePrefix ("pramen") which can drift from the shared
constant; change the default parameter to reuse
BookkeeperDynamoDb.DEFAULT_TABLE_PREFIX instead of the literal string and apply
the same replacement where the literal is used around the
TokenLockFactoryDynamoDb-related methods (e.g., other constructors/factory
methods near the class, lines referencing tablePrefix) so both the public
constructor and the builder consistently use
BookkeeperDynamoDb.DEFAULT_TABLE_PREFIX.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: de80f2a8-dbfe-428a-9935-893528d64f7e

📥 Commits

Reviewing files that changed from the base of the PR and between 143c6bc and d464934.

📒 Files selected for processing (10)
  • pramen/core/pom.xml
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManager.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerCached.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerDynamoDb.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDynamoDbBuilderSuite.scala
  • pramen/examples/dynamodb_bookkeeping/README.md
  • pramen/examples/dynamodb_bookkeeping/dynamodb_with_locks.conf
✅ Files skipped from review due to trivial changes (3)
  • pramen/examples/dynamodb_bookkeeping/dynamodb_with_locks.conf
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDynamoDbBuilderSuite.scala
  • pramen/examples/dynamodb_bookkeeping/README.md
🚧 Files skipped from review as they are similar to previous changes (4)
  • pramen/core/pom.xml
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManager.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerCached.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerDynamoDb.scala

Comment on lines +89 to +102
// Initialize bookkeeping table
if (!tableExists(bookkeepingTableName)) {
log.info(s"Creating DynamoDB bookkeeping table: $bookkeepingTableName")
createBookkeepingTable(bookkeepingTableName)
log.info(s"Successfully created bookkeeping table: $bookkeepingTableName")
} else {
log.info(s"DynamoDB bookkeeping table already exists: $bookkeepingTableName")
}

// Initialize schema table
if (!tableExists(schemaTableName)) {
log.info(s"Creating DynamoDB schema table: $schemaTableName")
createSchemaTable(schemaTableName)
log.info(s"Successfully created schema table: $schemaTableName")

⚠️ Potential issue | 🟠 Major

❓ Verification inconclusive

Script executed:

cat -n pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala | head -120

Repository: AbsaOSS/pramen
Exit code: 0

stdout:

     1	/*
     2	 * Copyright 2022 ABSA Group Limited
     3	 *
     4	 * Licensed under the Apache License, Version 2.0 (the "License");
     5	 * you may not use this file except in compliance with the License.
     6	 * You may obtain a copy of the License at
     7	 *
     8	 *     http://www.apache.org/licenses/LICENSE-2.0
     9	 *
    10	 * Unless required by applicable law or agreed to in writing, software
    11	 * distributed under the License is distributed on an "AS IS" BASIS,
    12	 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	 * See the License for the specific language governing permissions and
    14	 * limitations under the License.
    15	 */
    16	
    17	package za.co.absa.pramen.core.bookkeeper
    18	
    19	import org.apache.spark.sql.types.StructType
    20	import org.slf4j.LoggerFactory
    21	import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
    22	import software.amazon.awssdk.regions.Region
    23	import software.amazon.awssdk.services.dynamodb.DynamoDbClient
    24	import software.amazon.awssdk.services.dynamodb.model._
    25	import za.co.absa.pramen.core.bookkeeper.model.DataAvailability
    26	import za.co.absa.pramen.core.model.{DataChunk, TableSchema}
    27	import za.co.absa.pramen.core.utils.{AlgorithmUtils, TimeUtils}
    28	
    29	import java.net.URI
    30	import java.time.LocalDate
    31	import scala.collection.JavaConverters._
    32	import scala.util.control.NonFatal
    33	
    34	/**
    35	  * DynamoDB-based implementation of the Bookkeeper.
    36	  *
    37	  * Table schema for bookkeeping:
    38	  * - Partition key: tableName (String)
    39	  * - Sort key: infoDateSortKey (String in "yyyy-MM-dd#jobFinishedMillis" format)
    40	  *   The composite sort key allows multiple entries for the same table and date.
    41	  *
    42	  * Table schema for schemas:
    43	  * - Partition key: tableName (String)
    44	  * - Sort key: infoDate (String in yyyy-MM-dd format)
    45	  *
    46	  * @param dynamoDbClient The DynamoDB client to use for operations
    47	  * @param batchId The batch ID for this execution
    48	  * @param tableArn Optional ARN prefix for DynamoDB tables (e.g., "arn:aws:dynamodb:region:account-id:table/")
    49	  * @param tablePrefix Prefix for table names to allow multiple bookkeeping sets in the same account (default: "pramen")
    50	  */
    51	class BookkeeperDynamoDb private (
    52	    dynamoDbClient: DynamoDbClient,
    53	    batchId: Long,
    54	    tableArn: Option[String] = None,
    55	    tablePrefix: String = BookkeeperDynamoDb.DEFAULT_TABLE_PREFIX
    56	) extends BookkeeperBase(isBookkeepingEnabled = true, batchId) {
    57	
    58	  import BookkeeperDynamoDb._
    59	
    60	  private val log = LoggerFactory.getLogger(this.getClass)
    61	  private val queryWarningTimeoutMs = 10000L
    62	
    63	  // Construct table names with prefix
    64	  private val bookkeepingTableBaseName = s"${tablePrefix}_$DEFAULT_BOOKKEEPING_TABLE"
    65	  private val schemaTableBaseName = s"${tablePrefix}_$DEFAULT_SCHEMA_TABLE"
    66	
    67	  // Full table names/ARNs
    68	  private val bookkeepingTableName = getFullTableName(tableArn, bookkeepingTableBaseName)
    69	  private val schemaTableName = getFullTableName(tableArn, schemaTableBaseName)
    70	
    71	  // Offset management
    72	  private val offsetManagement = new OffsetManagerCached(
    73	    new OffsetManagerDynamoDb(dynamoDbClient, batchId, tableArn, tablePrefix, closesClient = false)
    74	  )
    75	
    76	  // Initialize tables on construction
    77	  init()
    78	
    79	  override val bookkeepingEnabled: Boolean = true
    80	
    81	  /**
    82	    * Initializes the DynamoDB tables for bookkeeping and schemas.
    83	    * Checks if tables exist and creates them if they don't.
    84	    */
    85	  def init(): Unit = {
    86	    try {
    87	      log.info(s"Initializing DynamoDB bookkeeper with tables: bookkeeping='$bookkeepingTableName', schemas='$schemaTableName'")
    88	
    89	      // Initialize bookkeeping table
    90	      if (!tableExists(bookkeepingTableName)) {
    91	        log.info(s"Creating DynamoDB bookkeeping table: $bookkeepingTableName")
    92	        createBookkeepingTable(bookkeepingTableName)
    93	        log.info(s"Successfully created bookkeeping table: $bookkeepingTableName")
    94	      } else {
    95	        log.info(s"DynamoDB bookkeeping table already exists: $bookkeepingTableName")
    96	      }
    97	
    98	      // Initialize schema table
    99	      if (!tableExists(schemaTableName)) {
   100	        log.info(s"Creating DynamoDB schema table: $schemaTableName")
   101	        createSchemaTable(schemaTableName)
   102	        log.info(s"Successfully created schema table: $schemaTableName")
   103	      } else {
   104	        log.info(s"DynamoDB schema table already exists: $schemaTableName")
   105	      }
   106	
   107	      log.info(s"DynamoDB bookkeeper initialization complete")
   108	    } catch {
   109	      case NonFatal(ex) =>
   110	        log.error("Error initializing DynamoDB bookkeeper tables", ex)
   111	        throw new RuntimeException("Failed to initialize DynamoDB bookkeeper", ex)
   112	    }
   113	  }
   114	
   115	  /**
   116	    * Checks if a DynamoDB table exists.
   117	    *
    118	    * @param tableName The name of the table to check
    119	    * @return true if the table exists, false otherwise
   120	    */

Script executed:

# Find the implementations of createBookkeepingTable and createSchemaTable
rg -n "def createBookkeepingTable|def createSchemaTable|def tableExists|def waitForTableActive" pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala -A 15

Repository: AbsaOSS/pramen
Exit code: 0

stdout:

121:  private def tableExists(tableName: String): Boolean = {
122-    try {
123-      val describeRequest = DescribeTableRequest.builder()
124-        .tableName(tableName)
125-        .build()
126-
127-      dynamoDbClient.describeTable(describeRequest)
128-      true
129-    } catch {
130-      case _: ResourceNotFoundException => false
131-      case NonFatal(ex) =>
132-        log.warn(s"Error checking if table exists: $tableName", ex)
133-        throw ex
134-    }
135-  }
136-
--
143:  private def createBookkeepingTable(tableName: String): Unit = {
144-    val createTableRequest = CreateTableRequest.builder()
145-      .tableName(tableName)
146-      .keySchema(
147-        KeySchemaElement.builder()
148-          .attributeName(ATTR_TABLE_NAME)
149-          .keyType(KeyType.HASH)
150-          .build(),
151-        KeySchemaElement.builder()
152-          .attributeName(ATTR_INFO_DATE_SORT_KEY)
153-          .keyType(KeyType.RANGE)
154-          .build()
155-      )
156-      .attributeDefinitions(
157-        AttributeDefinition.builder()
158-          .attributeName(ATTR_TABLE_NAME)
--
180:  private def createSchemaTable(tableName: String): Unit = {
181-    val createTableRequest = CreateTableRequest.builder()
182-      .tableName(tableName)
183-      .keySchema(
184-        KeySchemaElement.builder()
185-          .attributeName(ATTR_TABLE_NAME)
186-          .keyType(KeyType.HASH)
187-          .build(),
188-        KeySchemaElement.builder()
189-          .attributeName(ATTR_INFO_DATE)
190-          .keyType(KeyType.RANGE)
191-          .build()
192-      )
193-      .attributeDefinitions(
194-        AttributeDefinition.builder()
195-          .attributeName(ATTR_TABLE_NAME)
--
218:  private def waitForTableActive(tableName: String, maxWaitSeconds: Int = 60): Unit = {
219-    val startTime = System.currentTimeMillis()
220-    val maxWaitMs = maxWaitSeconds * 1000L
221-
222-    var tableActive = false
223-    while (!tableActive && (System.currentTimeMillis() - startTime) < maxWaitMs) {
224-      try {
225-        val describeRequest = DescribeTableRequest.builder()
226-          .tableName(tableName)
227-          .build()
228-
229-        val response = dynamoDbClient.describeTable(describeRequest)
230-        val status = response.table().tableStatus()
231-
232-        if (status == TableStatus.ACTIVE) {
233-          tableActive = true

Script executed:

# Check if there are any try-catch blocks around the create calls
rg -n "try|catch|ResourceInUseException" pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala

Repository: AbsaOSS/pramen
Exit code: 0

stdout:

86:    try {
108:    } catch {
122:    try {
129:    } catch {
224:      try {
239:      } catch {
252:    try {
288:    } catch {
296:    try {
320:    } catch {
328:    try {
360:    } catch {
368:    try {
386:    } catch {
394:    try {
406:    } catch {
422:    try {
438:    } catch {
446:    try {
483:            try {
485:            } catch {
496:    } catch {
504:    try {
521:    } catch {
564:        try {
567:        } catch {
615:        try {
618:        } catch {
631:    try {
656:    } catch {
664:    try {
678:    } catch {
690:    try {
694:    } catch {

Script executed:

sed -n '143,179p' pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala

Repository: AbsaOSS/pramen
Exit code: 0

stdout:

  private def createBookkeepingTable(tableName: String): Unit = {
    val createTableRequest = CreateTableRequest.builder()
      .tableName(tableName)
      .keySchema(
        KeySchemaElement.builder()
          .attributeName(ATTR_TABLE_NAME)
          .keyType(KeyType.HASH)
          .build(),
        KeySchemaElement.builder()
          .attributeName(ATTR_INFO_DATE_SORT_KEY)
          .keyType(KeyType.RANGE)
          .build()
      )
      .attributeDefinitions(
        AttributeDefinition.builder()
          .attributeName(ATTR_TABLE_NAME)
          .attributeType(ScalarAttributeType.S)
          .build(),
        AttributeDefinition.builder()
          .attributeName(ATTR_INFO_DATE_SORT_KEY)
          .attributeType(ScalarAttributeType.S)
          .build()
      )
      .billingMode(BillingMode.PAY_PER_REQUEST) // On-demand billing
      .build()

    dynamoDbClient.createTable(createTableRequest)

    // Wait for table to become active
    waitForTableActive(tableName)
  }

  /**
    * Creates the schema table with the appropriate schema.
    *
    * @param tableName The name of the table to create
    */

Script executed:

sed -n '180,217p' pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala

Repository: AbsaOSS/pramen


Repository: AbsaOSS/pramen
Exit code: 0

stdout:

  private def createSchemaTable(tableName: String): Unit = {
    val createTableRequest = CreateTableRequest.builder()
      .tableName(tableName)
      .keySchema(
        KeySchemaElement.builder()
          .attributeName(ATTR_TABLE_NAME)
          .keyType(KeyType.HASH)
          .build(),
        KeySchemaElement.builder()
          .attributeName(ATTR_INFO_DATE)
          .keyType(KeyType.RANGE)
          .build()
      )
      .attributeDefinitions(
        AttributeDefinition.builder()
          .attributeName(ATTR_TABLE_NAME)
          .attributeType(ScalarAttributeType.S)
          .build(),
        AttributeDefinition.builder()
          .attributeName(ATTR_INFO_DATE)
          .attributeType(ScalarAttributeType.S)
          .build()
      )
      .billingMode(BillingMode.PAY_PER_REQUEST) // On-demand billing
      .build()

    dynamoDbClient.createTable(createTableRequest)

    // Wait for table to become active
    waitForTableActive(tableName)
  }

  /**
    * Waits for a table to become active after creation.
    *
    * @param tableName The name of the table to wait for
    * @param maxWaitSeconds Maximum time to wait in seconds (default: 60)
    */

Catch ResourceInUseException during concurrent table initialization.

The check-then-create pattern on lines 90–92 and 99–101 has a race condition. When two jobs initialize the backend concurrently, both can pass the tableExists() check before either calls createTable(). The second job's createTable() call will then throw ResourceInUseException, which propagates uncaught to the outer try-catch in init() and causes initialization to fail even though the table already exists or is being created.

Wrap the dynamoDbClient.createTable() calls in createBookkeepingTable() and createSchemaTable() to catch ResourceInUseException, then proceed to waitForTableActive(). This follows DynamoDB's documented behavior where CreateTable is asynchronous and returns that exception when the table already exists or is already being created.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala`
around lines 89 - 102, The table-initialization has a race: replace the
unchecked createTable calls by catching AWS ResourceInUseException inside
createBookkeepingTable and createSchemaTable (the functions invoked after
tableExists), so when dynamoDbClient.createTable(...) throws
ResourceInUseException you swallow it and continue by calling
waitForTableActive(...) for that table; keep existing logging but ensure any
other exceptions still propagate. Use the existing symbols
createBookkeepingTable, createSchemaTable, tableExists and waitForTableActive to
locate where to add the try/catch around the createTable invocation.
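
The race-safe pattern the finding asks for can be sketched in a self-contained way. The `TableClient` trait and `ResourceInUseException` class below are hypothetical stand-ins for the AWS SDK types, so the example runs without the SDK; the real fix wraps `dynamoDbClient.createTable(...)` the same way.

```scala
// Hypothetical stand-ins for the AWS SDK types, to keep the sketch self-contained.
class ResourceInUseException(msg: String) extends RuntimeException(msg)

trait TableClient {
  /** Throws ResourceInUseException if the table exists or is being created. */
  def createTable(name: String): Unit
}

object SafeInit {
  /** Create-if-needed that tolerates losing the check-then-create race. */
  def ensureTable(client: TableClient, name: String, waitForActive: String => Unit): Unit = {
    try {
      client.createTable(name)
    } catch {
      case _: ResourceInUseException =>
        // Another initializer won the race; the table exists or is in CREATING.
        ()
    }
    // CreateTable is asynchronous either way, so always wait for ACTIVE.
    waitForActive(name)
  }
}
```

Only the "already exists / already being created" case is swallowed; any other exception still propagates to the caller, matching the behavior the prompt describes.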

Comment on lines +503 to +517
override def deleteTable(tableName: String): Seq[String] = {
try {
val results = scala.collection.mutable.ListBuffer[String]()

// Delete from bookkeeping table
val bookkeepingCount = deleteTableFromBookkeeping(tableName)
results += s"Deleted $bookkeepingCount bookkeeping records for table '$tableName'"

// Delete from schema table
val schemaCount = deleteTableFromSchemas(tableName)
results += s"Deleted $schemaCount schema records for table '$tableName'"

// Delete offsets
val offsetResults = OffsetManagerDynamoDb.deleteAllOffsets(tableName, dynamoDbClient)
results += s"Deleted $offsetResults offset records for table '$tableName'"

⚠️ Potential issue | 🟠 Major

deleteTable() still leaves metadata behind.

This clears bookkeeping, schemas, and offsets only. The JDBC backend also clears metadata as part of the same operation, so Dynamo mode now leaves backend-specific state behind for the same logical table. Based on learnings, BookkeeperJdbc's deleteTable method intentionally spans BookkeepingRecords, SchemaRecords, OffsetRecords, and MetadataRecords, with partial deletion accepted for later retries.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala`
around lines 503 - 517, The deleteTable method in BookkeeperDynamoDb currently
removes bookkeeping, schema and offsets but omits metadata records, leaving
backend-specific state behind; update deleteTable (the method overriding
deleteTable) to also remove metadata records similar to BookkeeperJdbc by
invoking the DynamoDB metadata deletion routine (e.g., add a call like
deleteTableFromMetadata(tableName) or
MetadataManagerDynamoDb.deleteAllMetadata(tableName, dynamoDbClient)), capture
the returned count/result and append a corresponding message to the results
ListBuffer so the method reports metadata deletion alongside bookkeeping, schema
and offsets.

Comment on lines +515 to +516
// Delete offsets
val offsetResults = OffsetManagerDynamoDb.deleteAllOffsets(tableName, dynamoDbClient)

⚠️ Potential issue | 🟠 Major

Delete offsets from the resolved DynamoDB table, not the default one.

Line 516 calls OffsetManagerDynamoDb.deleteAllOffsets(tableName, dynamoDbClient), but that helper resolves the offset table with DEFAULT_TABLE_PREFIX and None for tableArn in pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerDynamoDb.scala:610-650. With a custom tablePrefix or tableArn, deleteTable() will miss the real offsets and can delete another namespace's offsets instead.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala`
around lines 515 - 516, The call in BookkeeperDynamoDb to
OffsetManagerDynamoDb.deleteAllOffsets(tableName, dynamoDbClient) uses the raw
default name and can delete the wrong namespace; resolve the actual offsets
table using the same resolution logic that considers tablePrefix and tableArn
and pass that resolved name into OffsetManagerDynamoDb.deleteAllOffsets instead.
Concretely, in BookkeeperDynamoDb replace the direct use of tableName with the
value produced by OffsetManagerDynamoDb's resolution helper (use the
resolveOffsetsTableName / equivalent that accepts tablePrefix and tableArn) and
then call OffsetManagerDynamoDb.deleteAllOffsets(resolvedTableName,
dynamoDbClient).

private val locksTableName = BookkeeperDynamoDb.getFullTableName(tableArn, locksTableBaseName)

// Initialize table on construction
init()

⚠️ Potential issue | 🟠 Major

Close the builder-owned client when constructor-side init fails.

Line 56 performs network initialization during construction. If init() throws, build() leaks the freshly created DynamoDbClient. Handle the cleanup in build() rather than in the constructor so externally supplied clients are not closed unexpectedly.

Suggested fix
       val client = clientBuilder.build()
 
-      new TokenLockFactoryDynamoDb(
-        dynamoDbClient = client,
-        tableArn = tableArn,
-        tablePrefix = tablePrefix
-      )
+      try {
+        new TokenLockFactoryDynamoDb(
+          dynamoDbClient = client,
+          tableArn = tableArn,
+          tablePrefix = tablePrefix
+        )
+      } catch {
+        case NonFatal(ex) =>
+          try {
+            client.close()
+          } catch {
+            case NonFatal(closeEx) => ex.addSuppressed(closeEx)
+          }
+          throw ex
+      }

Also applies to: 296-302

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala`
at line 56, The constructor currently calls init() which can throw and leak the
builder-created DynamoDbClient; remove the init() call from the constructor and
instead invoke init() inside the build() method (for TokenLockFactoryDynamoDb
and the analogous builder at lines ~296-302), wrapping the init() call in a
try/catch so that if init() throws you close the DynamoDbClient only when it was
created by the builder (i.e., track whether the client was supplied externally
or created in build()), call client.close() on failure, and then rethrow the
exception; ensure externally supplied clients are never closed by this cleanup
path.

Comment on lines +126 to +147
private def createLocksTable(tableName: String): Unit = {
val createTableRequest = CreateTableRequest.builder()
.tableName(tableName)
.keySchema(
KeySchemaElement.builder()
.attributeName(ATTR_TOKEN)
.keyType(KeyType.HASH)
.build()
)
.attributeDefinitions(
AttributeDefinition.builder()
.attributeName(ATTR_TOKEN)
.attributeType(ScalarAttributeType.S)
.build()
)
.billingMode(BillingMode.PAY_PER_REQUEST) // On-demand billing
.build()

dynamoDbClient.createTable(createTableRequest)

// Wait for table to become active
waitForTableActive(tableName)

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

cat -n pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala

Repository: AbsaOSS/pramen

Length of output: 11989


🏁 Script executed:

# Check what exceptions are imported
rg "import.*exception|import.*Exception" pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala -i

# Search for ResourceInUseException usage in the codebase
rg "ResourceInUseException" --type scala

# Search for other DynamoDB exception handling patterns
rg "catch.*Exception|catch.*:.*=>" pramen/core/src/main/scala -A 2 -B 2 --type scala | head -50

Repository: AbsaOSS/pramen

Length of output: 40


🏁 Script executed:

# Search for all exception handling in DynamoDB-related files
find . -name "*.scala" -path "*/lock/*" -o -path "*/dynamodb/*" | xargs grep -l "catch" 2>/dev/null | head -5

# Look for other DynamoDB client usages
rg "dynamoDbClient\." pramen/core/src/main/scala -A 2 --type scala | head -40

Repository: AbsaOSS/pramen

Length of output: 4149


🏁 Script executed:

# Check MetadataManagerDynamoDb for table creation exception handling
rg "createTable|ResourceInUseException|ResourceNotFoundException" pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerDynamoDb.scala -B 3 -A 5 --type scala | head -60

Repository: AbsaOSS/pramen

Length of output: 698


🌐 Web query:

AWS SDK v2 DynamoDB ResourceInUseException Java Scala exception

💡 Result:

In the AWS SDK for Java v2 (usable from Scala), ResourceInUseException from DynamoDB generally means the resource is not in a usable state yet—most commonly:

  • You tried to create a table that already exists, or re-create a table name that’s still being deleted.
  • You tried to delete or update a table while it’s in CREATING or UPDATING (or it’s still DELETING after DeleteTable). [1], [3]
  • Less common: for on-demand tables, DynamoDB can return ResourceInUseException for very high request rates on some data-plane operations. [1]

What to do (best practice)

  1. Wait for the table to reach the right state (instead of immediately retrying blindly). Use SDK v2 waiters:
    • waitUntilTableExists after CreateTable
    • waitUntilTableNotExists after DeleteTable (so you can safely re-create) [2]

Scala example (AWS SDK for Java v2)

import software.amazon.awssdk.services.dynamodb.DynamoDbClient
import software.amazon.awssdk.services.dynamodb.model._
import software.amazon.awssdk.services.dynamodb.waiters.DynamoDbWaiter

val ddb = DynamoDbClient.create()
val waiter = DynamoDbWaiter.create()

try {
  ddb.createTable(CreateTableRequest.builder()
    .tableName("my_table")
    // ... key schema, attrs, billing, etc.
    .build()
  )

  waiter.waitUntilTableExists(b => b.tableName("my_table"))  // wait for ACTIVE
} catch {
  case e: ResourceInUseException =>
    // table name already in use / still deleting / table not in a valid state
    // typically: describe + wait, or backoff + retry, depending on your workflow
    throw e
}

Quick diagnostic tip

Call DescribeTable and check tableStatus; if it’s CREATING, UPDATING, or DELETING, you need to wait before the next step. This is exactly the situation that triggers ResourceInUseException in many workflows. [1], [3]

Sources: DynamoDB error meaning and examples [1], SDK for Java v2 waiters (incl. DynamoDB table wait example) [2], explanation of failures when re-creating during deletion [3].


🏁 Script executed:

# Check if ResourceInUseException is available in the AWS SDK model package
rg "class ResourceInUseException|ResourceInUseException" pramen/ --type scala

# Look at the import statement more carefully
rg "software.amazon.awssdk.services.dynamodb.model" pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala

Repository: AbsaOSS/pramen

Length of output: 113


🏁 Script executed:

# Check the builder.build() method for exception handling
sed -n '282,303p' pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala

Repository: AbsaOSS/pramen

Length of output: 654


Catch ResourceInUseException to handle concurrent table creation.

CreateTable is asynchronous, and AWS DynamoDB throws ResourceInUseException when attempting to create a table that already exists or is still in a transient state. With the current check-then-create pattern (lines 83–85), concurrent initializers can trigger this exception and abort startup even though the table is already being created or exists. Catch the exception and proceed to waitForTableActive(tableName) to allow the waiter to detect the table's ACTIVE state.

Suggested fix
   private def createLocksTable(tableName: String): Unit = {
     val createTableRequest = CreateTableRequest.builder()
       .tableName(tableName)
       .keySchema(
         KeySchemaElement.builder()
           .attributeName(ATTR_TOKEN)
           .keyType(KeyType.HASH)
           .build()
       )
       .attributeDefinitions(
         AttributeDefinition.builder()
           .attributeName(ATTR_TOKEN)
           .attributeType(ScalarAttributeType.S)
           .build()
       )
       .billingMode(BillingMode.PAY_PER_REQUEST) // On-demand billing
       .build()
 
-    dynamoDbClient.createTable(createTableRequest)
+    try {
+      dynamoDbClient.createTable(createTableRequest)
+    } catch {
+      case _: ResourceInUseException =>
+        log.info(s"DynamoDB locks table is already being created: $tableName")
+    }
 
     // Wait for table to become active
     waitForTableActive(tableName)
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala`
around lines 126 - 147, The createLocksTable method can throw a
ResourceInUseException when another process is concurrently creating the same
DynamoDB table; update createLocksTable (in TokenLockFactoryDynamoDb) to catch
the AWS SDK's ResourceInUseException around
dynamoDbClient.createTable(createTableRequest), ignore that specific exception
and continue to call waitForTableActive(tableName), while rethrowing or letting
other exceptions propagate normally so only the "table already being
created/exists" case is swallowed.

Comment on lines +177 to +180
} catch {
case NonFatal(ex) =>
log.warn(s"Error checking table status for $tableName", ex)
Thread.sleep(2000)

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, let's see the full method context around lines 177-180
cat -n pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala | sed -n '160,200p'

Repository: AbsaOSS/pramen

Length of output: 1641


🏁 Script executed:

# Let's also check the imports to understand what AWS SDK is being used
head -50 pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala

Repository: AbsaOSS/pramen

Length of output: 1862


🏁 Script executed:

# Check for any ResourceNotFoundException imports or usage in the file
rg "ResourceNotFoundException|import.*exception|catch.*Exception" pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala -A 2 -B 2

Repository: AbsaOSS/pramen

Length of output: 226


🏁 Script executed:

# Check what DynamoDB exceptions are available in AWS SDK v2
web_search "AWS SDK v2 DynamoDB DescribeTable transient exceptions"

Repository: AbsaOSS/pramen

Length of output: 108


🏁 Script executed:

# Also verify what exceptions are thrown by describeTable in AWS SDK v2
web_search "software.amazon.awssdk DynamoDB describeTable ResourceNotFoundException"

Repository: AbsaOSS/pramen

Length of output: 108


🏁 Script executed:

# Check if there are any other exception catches or handling patterns in the file
rg "case.*:" pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala | head -20

Repository: AbsaOSS/pramen

Length of output: 106


🏁 Script executed:

# Verify the full context of the method signature and where it's called
sed -n '155,187p' pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala

Repository: AbsaOSS/pramen

Length of output: 1194


🌐 Web query:

AWS SDK 2.0 Java DynamoDB DescribeTable ResourceNotFoundException exception

💡 Result:

In AWS SDK for Java v2, DynamoDbClient.describeTable(...) can throw ResourceNotFoundException for a few common reasons:

  1. You’re calling DescribeTable too soon after CreateTable
    DescribeTable is eventually consistent; right after creating a table, DynamoDB may temporarily return ResourceNotFoundException. Wait a few seconds and retry, or use the SDK waiter. [1]
    Example (SDK v2 waiter):
dynamoDbClient.waiter().waitUntilTableExists(b -> b.tableName(tableName));

The enhanced client docs explicitly recommend waitUntilTableExists(...) because table creation is asynchronous. [2]

  2. Wrong AWS Region and/or AWS account (credentials)
    DynamoDB tables are regional. If your client is configured for the wrong region or is using credentials for a different account, the table will “not exist” from that client’s perspective, resulting in ResourceNotFoundException. AWS’s guidance is to confirm the correct Account and Region when you see this error. [3]

  3. Resource name/identifier mismatch
    DescribeTable requires TableName and AWS notes you can also pass the table ARN there (works in AWS, but some local emulators may not fully support ARNs). [1] Ensure the exact table name/ARN you pass matches what exists in that region/account.

  4. Table exists but isn’t ACTIVE yet
    The SDK model for ResourceNotFoundException notes the resource may be unspecified/incorrect or its status may not be ACTIVE. [4]

If you paste your DynamoDbClient builder (region/endpoint/credentials) and the exact table name (and whether you’re using AWS, LocalStack, or dynamodb-local), I can pinpoint which of the above is most likely in your case.

Sources: [1] [2] [3] [4]
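The waiter recommended above is, at its core, a bounded poll loop. The sketch below shows that loop in isolation; `waitUntilActive` and `probeStatus` are illustrative names, not Pramen or AWS SDK symbols, and `probeStatus` stands in for a DescribeTable call that returns `None` while the table metadata is not yet visible.

```scala
import scala.annotation.tailrec

// Generic poll-until-ready loop, a sketch of what the SDK waiter does internally.
// `probeStatus` is a hypothetical function returning Some(status) once table
// metadata is visible, or None while DescribeTable would still throw
// ResourceNotFoundException.
def waitUntilActive(probeStatus: () => Option[String],
                    maxAttempts: Int,
                    sleepMs: Long = 0L): Boolean = {
  @tailrec
  def loop(attempt: Int): Boolean = {
    if (attempt >= maxAttempts) false
    else probeStatus() match {
      case Some("ACTIVE") => true
      case _ =>
        if (sleepMs > 0) Thread.sleep(sleepMs)
        loop(attempt + 1)
    }
  }
  loop(0)
}
```

In real code the probe would wrap `dynamoDbClient.describeTable(...)` and the sleep would be non-zero; the bound keeps a misconfigured region or account from looping forever.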


🌐 Web query:

AWS DynamoDB DescribeTable immediate post-CreateTable visibility delay ResourceNotFoundException

💡 Result:

Yes—this can be expected behavior in DynamoDB.

  • CreateTable is asynchronous: it returns immediately with TableStatus = CREATING, and only later transitions to ACTIVE (at which point the table is fully ready) [1].
  • DescribeTable is eventually consistent for metadata: if you call DescribeTable too soon after CreateTable, DynamoDB can return ResourceNotFoundException because the table metadata may not be visible yet [2]. (Even after it becomes visible, updated values may lag briefly for the same reason [2].)

What to do (recommended)

Wait/retry until the table exists and becomes ACTIVE, instead of calling DescribeTable just once.

  • AWS CLI:

    aws dynamodb wait table-exists --table-name MyTable

    This polls DescribeTable until TableStatus is ACTIVE [3].

  • Boto3 (Python):

    waiter = dynamodb_client.get_waiter('table_exists')
    waiter.wait(TableName='MyTable')

    This polls describe_table() until successful (or times out) [4].

If it keeps happening “too long”

Also confirm you’re using the same AWS account and region for both calls (a region mismatch also produces ResourceNotFoundException, but that’s not a “delay” issue).


Sources

  1. AWS DynamoDB API Reference – CreateTable returns immediately with TableStatus=CREATING, later ACTIVE [1]
  2. DynamoDB Developer Guide / API docs – DescribeTable is eventually consistent; too-early calls can return ResourceNotFoundException [2]
  3. AWS CLI wait table-exists – waits until DescribeTable shows ACTIVE [3]
  4. Boto3 waiter TableExists – polls describe_table() until the table exists [4]

Retry only the transient ResourceNotFoundException you expect after CreateTable.

AWS documents that DescribeTable is eventually consistent and can return ResourceNotFoundException immediately after CreateTable before table metadata becomes visible. However, catching all NonFatal exceptions here will also swallow unrelated errors—such as credential issues, access denied, or validation errors—and turn them into generic timeouts, making the real startup error much harder to diagnose. Catch ResourceNotFoundException specifically to retry the expected transient case, and rethrow other failures immediately. (AWS DynamoDB API Reference)

Suggested fix
       } catch {
-        case NonFatal(ex) =>
-          log.warn(s"Error checking table status for $tableName", ex)
-          Thread.sleep(2000)
+        case _: ResourceNotFoundException =>
+          log.debug(s"Table $tableName metadata is not visible yet, waiting...")
+          Thread.sleep(2000)
+        case NonFatal(ex) =>
+          throw ex
       }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala`
around lines 177 - 180, The current catch in TokenLockFactoryDynamoDb.scala
swallows all NonFatal exceptions during DescribeTable; change it to only catch
the AWS ResourceNotFoundException
(software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException) and
treat that as the transient case (log.warn and Thread.sleep to retry), while
immediately rethrowing any other exceptions (including NonFatal others) so
credential/permission/validation errors surface; update imports accordingly and
keep the surrounding retry loop logic intact.
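The "retry only the expected transient failure" shape suggested above can be sketched generically. `TransientTableNotVisible` below is a stand-in for the AWS SDK's `ResourceNotFoundException`, and `retryOnTransient` is a hypothetical helper, not an existing Pramen method:

```scala
import scala.util.control.NonFatal

// Stand-in for software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException.
final class TransientTableNotVisible(msg: String) extends RuntimeException(msg)

// Retry only the expected transient exception; rethrow everything else so
// credential/permission/validation errors surface immediately.
def retryOnTransient[T](maxAttempts: Int)(op: () => T): T = {
  var attempt = 0
  while (true) {
    try {
      return op()
    } catch {
      case _: TransientTableNotVisible if attempt < maxAttempts - 1 =>
        attempt += 1            // expected right after CreateTable: retry
      case NonFatal(ex) =>
        throw ex                // anything else: fail fast with the real cause
    }
  }
  throw new IllegalStateException("unreachable")
}
```

Ordering matters: the specific case must precede the `NonFatal` case, and the guard lets the final transient failure propagate instead of turning into a generic timeout.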

yruslan added 2 commits April 7, 2026 12:57
…ger and OffsetManager traits to ensure the logic is defined by the implementation.

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

♻️ Duplicate comments (6)
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala (2)

129-140: ⚠️ Potential issue | 🟠 Major

Scan pagination is missing.

scan() only returns one page. Once the journal exceeds a single page, getEntries() silently drops the remaining matching items. Iterate on lastEvaluatedKey() until exhaustion, or switch this to a paginated query on a time-based index.

Suggested fallback if you keep Scan for now
-      val result = dynamoDbClient.scan(scanRequest)
-
-      result.items().asScala.map { item =>
+      @annotation.tailrec
+      def scanAll(
+        startKey: java.util.Map[String, AttributeValue],
+        acc: Vector[java.util.Map[String, AttributeValue]]
+      ): Vector[java.util.Map[String, AttributeValue]] = {
+        val requestBuilder = ScanRequest.builder()
+          .tableName(journalTableFullName)
+          .filterExpression(s"${JournalDynamoDB.ATTR_FINISHED_AT} >= :from_time AND ${JournalDynamoDB.ATTR_FINISHED_AT} <= :to_time")
+          .expressionAttributeValues(Map(
+            ":from_time" -> AttributeValue.builder().n(fromSec.toString).build(),
+            ":to_time" -> AttributeValue.builder().n(toSec.toString).build()
+          ).asJava)
+
+        Option(startKey).filterNot(_.isEmpty).foreach(requestBuilder.exclusiveStartKey)
+
+        val response = dynamoDbClient.scan(requestBuilder.build())
+        val nextAcc = acc ++ response.items().asScala
+        val nextKey = response.lastEvaluatedKey()
+
+        if (nextKey == null || nextKey.isEmpty) nextAcc
+        else scanAll(nextKey, nextAcc)
+      }
+
+      scanAll(null, Vector.empty).map { item =>
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala`
around lines 129 - 140, The ScanRequest usage in getEntries() (building
ScanRequest with tableName=journalTableFullName and filterExpression using
ATTR_FINISHED_AT) calls dynamoDbClient.scan once and ignores pagination, so
results beyond the first page are dropped; fix by iterating using the
LastEvaluatedKey from the ScanResponse (or switch to a Query on a time-based
index) — repeatedly call dynamoDbClient.scan with ExclusiveStartKey until
lastEvaluatedKey is empty, accumulating items into the result list returned by
getEntries(), ensuring expressionAttributeValues and the same filterExpression
are reused for each paginated request.
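The pagination fix asked for in several of these comments follows one protocol: resubmit the request with the previous page's `LastEvaluatedKey` as `ExclusiveStartKey` until no key is returned. A minimal, SDK-free sketch of that loop, where `Page` and `fetchPage` stand in for the DynamoDB response and for `dynamoDbClient.scan`/`query`:

```scala
import scala.annotation.tailrec

// Minimal model of a DynamoDB page: items plus an optional continuation key.
final case class Page[K, A](items: Vector[A], lastEvaluatedKey: Option[K])

// Drain all pages: keep fetching with the previous continuation key until
// the service returns none. `fetchPage` stands in for scan/query.
def fetchAll[K, A](fetchPage: Option[K] => Page[K, A]): Vector[A] = {
  @tailrec
  def loop(startKey: Option[K], acc: Vector[A]): Vector[A] = {
    val page = fetchPage(startKey)
    val nextAcc = acc ++ page.items
    page.lastEvaluatedKey match {
      case None    => nextAcc           // no continuation key: done
      case someKey => loop(someKey, nextAcc)
    }
  }
  loop(None, Vector.empty)
}
```

Each concrete fix (journal scan, metadata query, offset query) is this loop with the original filter/key-condition expressions rebuilt for every request; the AWS SDK v2 also offers `scanPaginator`/`queryPaginator`, which encapsulate the same iteration.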

79-82: ⚠️ Potential issue | 🔴 Critical

Composite key can overwrite distinct completions.

This table is keyed only by (jobName, finishedAt), so two runs of the same job finishing in the same second collapse to the same item and putItem() replaces the earlier entry. It also leaves getEntries() without a key shape that supports cross-job time-range reads efficiently. Add a unique per-entry key component and index the time field for range queries.

Also applies to: 110-115, 204-228

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala`
around lines 79 - 82, The current primary key uses only
JournalDynamoDB.ATTR_JOB_NAME and JournalDynamoDB.ATTR_FINISHED_AT which allows
collisions when two runs finish in the same second and prevents efficient
cross-job time-range queries; change the item key to include a unique per-entry
component (e.g., add JournalDynamoDB.ATTR_ENTRY_ID and make the primary key
(ATTR_JOB_NAME, ATTR_ENTRY_ID) or append a high-resolution unique suffix to
ATTR_FINISHED_AT) so putItem() won’t overwrite concurrent entries, and update
getEntries() to read using a GSI on JournalDynamoDB.ATTR_FINISHED_AT (create a
GSI that projects the attributes needed) to support efficient cross-job range
reads; adjust any code that builds items (where ATTR_JOB_NAME and
ATTR_FINISHED_AT are set) and any DynamoDB key construction logic to include the
new ATTR_ENTRY_ID and GSI usage.
pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerDynamoDb.scala (1)

84-103: ⚠️ Potential issue | 🟠 Major

Still missing DynamoDB query pagination.

Lines 96 and 165 each execute a single QueryRequest and ignore lastEvaluatedKey. Once a (tableName, infoDate) partition grows past DynamoDB's 1 MB page boundary, bulk reads return incomplete metadata and bulk deletes leave stale rows behind.

Also applies to: 152-171

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerDynamoDb.scala`
around lines 84 - 103, getMetadataFromStorage in MetadataManagerDynamoDb
performs a single QueryRequest and ignores pagination; update it to page through
DynamoDB results by looping: make the QueryRequest (the one built with
QueryRequest.builder() and used with dynamoDbClient.query), collect items into a
mutable buffer, check the response.lastEvaluatedKey()/isEmpty, and if present
set ExclusiveStartKey on the next QueryRequest and repeat until no more pages,
then map accumulated items to the Map[String, MetadataValue]; apply the same
pagination loop fix to the other QueryRequest usage around the delete/bulk-read
logic referenced near the second dynamoDbClient.query (around the code at the
other occurrence) so both reads handle lastEvaluatedKey correctly.
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerDynamoDb.scala (3)

135-138: ⚠️ Potential issue | 🟠 Major

Millisecond precision in composite key can cause collisions for concurrent writes.

When two startWriteOffsets calls occur for the same table and infoDate within the same millisecond, they generate identical compositeKey values (infoDate#createdAtMilli). The second PutItem silently overwrites the first, causing data loss. Consider appending a UUID suffix or using the full nanosecond timestamp.

🔧 Proposed fix to ensure uniqueness
+import java.util.UUID
+
 override def startWriteOffsets(table: String, infoDate: LocalDate, offsetType: OffsetType): DataOffsetRequest = {
   val createdAt = Instant.now()
   val createdAtMilli = createdAt.toEpochMilli
-  val compositeKey = s"${infoDate.toString}#${createdAtMilli}"
+  val uniqueSuffix = UUID.randomUUID().toString.take(8)
+  val compositeKey = s"${infoDate.toString}#${createdAtMilli}#${uniqueSuffix}"

Note: This requires corresponding changes to commitOffsets, commitRerun, and rollbackOffsets to reconstruct the key consistently, likely by storing the full compositeKey in DataOffsetRequest or by adding the suffix to createdAt.


🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerDynamoDb.scala`
around lines 135 - 138, startWriteOffsets currently builds compositeKey using
infoDate and millisecond precision createdAtMilli which can collide; change
compositeKey generation in startWriteOffsets to include a uniqueness suffix
(e.g., append a UUID or use nanosecond timestamp) and propagate that full
compositeKey into the returned DataOffsetRequest so downstream methods can
reconstruct keys reliably. Update commitOffsets, commitRerun, and
rollbackOffsets to read the stored compositeKey from DataOffsetRequest (or use
the stored unique createdAt) when performing Put/Get/Delete operations so they
use the exact same key format; ensure DataOffsetRequest contains the new field
(fullCompositeKey or uniqueCreatedAt) and update all usages accordingly.
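The key-uniqueness fix proposed above reduces to a small amount of string logic, sketched here under the review's assumptions. `makeCompositeKey` is a hypothetical helper, not an existing Pramen symbol; the returned key must be stored in the request object and reused verbatim by commit and rollback:

```scala
import java.util.UUID

// Sketch of the proposed collision-free sort key: infoDate#epochMilli#uuid8.
// The random suffix makes concurrent startWriteOffsets calls within the same
// millisecond produce distinct items instead of overwriting each other.
def makeCompositeKey(infoDate: String, createdAtMilli: Long): String = {
  val uniqueSuffix = UUID.randomUUID().toString.take(8)
  s"$infoDate#$createdAtMilli#$uniqueSuffix"
}

// Prefix queries (begins_with on the sort key) still work because the
// info date leads the key.
def matchesInfoDate(compositeKey: String, infoDate: String): Boolean =
  compositeKey.startsWith(s"$infoDate#")
</imports>
```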

313-334: ⚠️ Potential issue | 🟠 Major

Missing pagination in getOffsetRecords can silently drop results.

This method doesn't handle DynamoDB's 1MB result limit. When a partition exceeds this threshold, records beyond the first page are silently omitted. This affects multiple callers: getOffsets, getUncommittedOffsets, commitRerun, postCommittedRecords, and getMinMaxOffsets.

Additionally, using filterExpression on infoDate is inefficient since the sort key is infoDate#createdAtMilli. Using begins_with on the sort key would reduce data transfer.

🔧 Proposed fix with pagination and efficient key condition
 private[core] def getOffsetRecords(table: String, infoDate: LocalDate): Array[OffsetRecord] = {
   try {
-    val queryRequest = QueryRequest.builder()
-      .tableName(offsetTableFullName)
-      .keyConditionExpression(s"${ATTR_PRAMEN_TABLE_NAME} = :table_name")
-      .filterExpression(s"${ATTR_INFO_DATE} = :info_date")
-      .expressionAttributeValues(Map(
-        ":table_name" -> AttributeValue.builder().s(table).build(),
-        ":info_date" -> AttributeValue.builder().s(infoDate.toString).build()
-      ).asJava)
-      .build()
-
-    val result = dynamoDbClient.query(queryRequest)
-
-    result.items().asScala
-      .map(itemToOffsetRecord)
-      .toArray
+    var allItems = Seq.empty[java.util.Map[String, AttributeValue]]
+    var lastEvaluatedKey: java.util.Map[String, AttributeValue] = null
+    val infoDatePrefix = s"${infoDate.toString}#"
+
+    do {
+      val queryRequestBuilder = QueryRequest.builder()
+        .tableName(offsetTableFullName)
+        .keyConditionExpression(s"$ATTR_PRAMEN_TABLE_NAME = :table_name AND begins_with($ATTR_COMPOSITE_KEY, :prefix)")
+        .expressionAttributeValues(Map(
+          ":table_name" -> AttributeValue.builder().s(table).build(),
+          ":prefix" -> AttributeValue.builder().s(infoDatePrefix).build()
+        ).asJava)
+
+      if (lastEvaluatedKey != null) {
+        queryRequestBuilder.exclusiveStartKey(lastEvaluatedKey)
+      }
+
+      val result = dynamoDbClient.query(queryRequestBuilder.build())
+      allItems = allItems ++ result.items().asScala
+      lastEvaluatedKey = result.lastEvaluatedKey()
+    } while (lastEvaluatedKey != null && !lastEvaluatedKey.isEmpty)
+
+    allItems.map(itemToOffsetRecord).toArray
   } catch {


🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerDynamoDb.scala`
around lines 313 - 334, getOffsetRecords currently issues a single QueryRequest
which can miss results due to DynamoDB pagination and inefficiently uses
filterExpression on ATTR_INFO_DATE; update getOffsetRecords to (1) use a
keyConditionExpression combining ATTR_PRAMEN_TABLE_NAME = :table_name and
begins_with(sortKeyName, :info_date_prefix) (where sortKeyName is the actual
sort key like infoDate#createdAtMilli and :info_date_prefix is
infoDate.toString) instead of filterExpression, (2) page through results using
dynamoDbClient.queryPaginator or loop on QueryResponse.lastEvaluatedKey,
collecting and mapping all pages via itemToOffsetRecord into a single Array, and
(3) preserve current error handling and context (use offsetTableFullName in
error messages) so callers like getOffsets, getUncommittedOffsets, commitRerun,
postCommittedRecords, and getMinMaxOffsets receive complete results.

339-366: ⚠️ Potential issue | 🟠 Major

Missing pagination in getMaximumInfoDate can return incorrect results.

This method queries all items for a partition but doesn't handle DynamoDB's pagination. If there are more than 1MB of records, only the first page is processed, potentially returning an incorrect maximum date.

🔧 Proposed fix with pagination
 private[core] def getMaximumInfoDate(table: String): Option[LocalDate] = {
   try {
-    val queryRequest = QueryRequest.builder()
-      .tableName(offsetTableFullName)
-      .keyConditionExpression(s"${ATTR_PRAMEN_TABLE_NAME} = :table_name")
-      .expressionAttributeValues(Map(
-        ":table_name" -> AttributeValue.builder().s(table).build()
-      ).asJava)
-      .projectionExpression(ATTR_INFO_DATE)
-      .build()
-
-    val result = dynamoDbClient.query(queryRequest)
-
-    if (result.items().isEmpty) {
-      None
-    } else {
-      // Use maxBy with compareTo to avoid needing implicit Ordering
-      val maxInfoDate = result.items().asScala
-        .map(item => LocalDate.parse(item.get(ATTR_INFO_DATE).s()))
-        .maxBy(_.toEpochDay)
-
-      Some(maxInfoDate)
-    }
+    var maxDate: Option[LocalDate] = None
+    var lastEvaluatedKey: java.util.Map[String, AttributeValue] = null
+
+    do {
+      val queryRequestBuilder = QueryRequest.builder()
+        .tableName(offsetTableFullName)
+        .keyConditionExpression(s"$ATTR_PRAMEN_TABLE_NAME = :table_name")
+        .expressionAttributeValues(Map(
+          ":table_name" -> AttributeValue.builder().s(table).build()
+        ).asJava)
+        .projectionExpression(ATTR_INFO_DATE)
+
+      if (lastEvaluatedKey != null) {
+        queryRequestBuilder.exclusiveStartKey(lastEvaluatedKey)
+      }
+
+      val result = dynamoDbClient.query(queryRequestBuilder.build())
+      
+      if (!result.items().isEmpty) {
+        val pageMax = result.items().asScala
+          .map(item => LocalDate.parse(item.get(ATTR_INFO_DATE).s()))
+          .maxBy(_.toEpochDay)
+        maxDate = Some(maxDate.map(d => if (d.toEpochDay > pageMax.toEpochDay) d else pageMax).getOrElse(pageMax))
+      }
+      
+      lastEvaluatedKey = result.lastEvaluatedKey()
+    } while (lastEvaluatedKey != null && !lastEvaluatedKey.isEmpty)
+
+    maxDate
   } catch {


🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerDynamoDb.scala`
around lines 339 - 366, getMaximumInfoDate currently only reads the first
DynamoDB query page and can miss later items; update getMaximumInfoDate to
paginate through all query pages (either via
dynamoDbClient.queryPaginator(queryRequest) or a loop using QueryRequest with
ExclusiveStartKey and checking QueryResponse.lastEvaluatedKey) and compute the
maximum LocalDate across all pages (parsing ATTR_INFO_DATE from each item)
before returning Some(max) or None when no items; keep existing error handling
and use the same ATTR_INFO_DATE, offsetTableFullName, and dynamoDbClient symbols
to locate and modify the logic.
🧹 Nitpick comments (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala (1)

67-74: Clarify client ownership semantics.

The close() method unconditionally closes the dynamoDbClient, but the class accepts an externally-provided client via its constructor. If a caller passes a shared client (not created by the builder), close() will shut it down unexpectedly.

Consider either:

  1. Tracking whether the client was builder-created and only closing owned clients, or
  2. Documenting that the factory takes ownership of the client and callers must not share it.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala`
around lines 67 - 74, The close() method in TokenLockFactoryDynamoDb currently
always closes dynamoDbClient which can shutdown externally-provided/shared
clients; change the constructor and class to track client ownership (e.g. add a
boolean flag like clientOwned or wasClientCreated) and only call
dynamoDbClient.close() in close() when that flag is true, or alternatively
update the class API/docs to clearly state that TokenLockFactoryDynamoDb takes
ownership of the provided dynamoDbClient and callers must not share it; touch
the constructor and the close() method (and any builder that creates the client)
to set the ownership flag accordingly so only internally-created clients are
closed.
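Option 1 above (tracking ownership) is a small, mechanical change. A sketch under illustrative names, where `CloseableClient` stands in for `DynamoDbClient` and `LockFactorySketch` for `TokenLockFactoryDynamoDb`; only the ownership flag is the point:

```scala
// Stand-in for software.amazon.awssdk.services.dynamodb.DynamoDbClient.
trait CloseableClient { def close(): Unit }

// close() shuts down the client only when this instance created it, so a
// caller-supplied shared client is never closed out from under its owner.
final class LockFactorySketch(client: CloseableClient, ownsClient: Boolean)
  extends AutoCloseable {
  override def close(): Unit =
    if (ownsClient) client.close()   // leave shared clients alone
}
```

A builder that constructs its own `DynamoDbClient` would pass `ownsClient = true`; the constructor accepting an external client would pass `false`.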
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerDynamoDb.scala`:
- Around line 576-579: The deleteAllOffsets method currently hardcodes
DEFAULT_TABLE_PREFIX and passes None for tableArn, which can target the wrong
table; change the signature of deleteAllOffsets to accept tableArn:
Option[String] and tablePrefix: String (or an Option[String] if you prefer) and
use those values when building offsetTableBaseName and calling
BookkeeperDynamoDb.getFullTableName instead of DEFAULT_TABLE_PREFIX/None; update
all call sites to pass the instance's tableArn and tablePrefix (or defaults) so
deleteAllOffsets operates on the intended table (refer to the deleteAllOffsets
method and BookkeeperDynamoDb.getFullTableName symbols).

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala`:
- Around line 109-119: The catch blocks around dynamoDbClient.putItem (the
NonFatal(ex) that currently calls log.error) must not swallow failures: change
the write path in JournalDynamoDB (the PutItemRequest / dynamoDbClient.putItem
block) to log the error and then propagate it (e.g., rethrow the exception or
return a failed Try/Either/Left) so callers can detect write failures; do the
same for the corresponding read path mentioned (the similar NonFatal at lines
173-176) so a failed read is distinguishable from "no entries"—use a consistent
failure type (e.g., JournalException or scala.util.Failure/Left) and update
method signatures/return types if necessary so callers can react.
- Around line 191-195: The TOCTOU between describeTable() and
createJournalTable() causes failures when two processes race to create the
table; update createJournalTable() (and any other create-table call sites around
JournalDynamoDB.describeTable/createJournalTable) to catch
ResourceInUseException and, instead of failing, poll DescribeTable until the
table status is ACTIVE (with backoff/retries and timeout) before returning;
leave the existing ResourceNotFoundException handling in describeTable() but
ensure createJournalTable() swallows ResourceInUseException and waits for ACTIVE
so concurrent bootstraps succeed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerDynamoDb.scala`:
- Around line 298-317: The builder currently creates a DynamoDbClient (client)
and then calls new MetadataManagerDynamoDb(...), but if the constructor (which
runs DescribeTable/CreateTable) throws, the client is leaked; modify build() to
ensure the DynamoDbClient is closed on failure: wrap the construction of
MetadataManagerDynamoDb in a try { new MetadataManagerDynamoDb(dynamoDbClient =
client, tableArn = tableArn, tablePrefix = tablePrefix) } catch { case e =>
client.close(); throw e } (or equivalent finally logic) so the client is closed
if the constructor throws while still returning the manager when successful.
- Around line 181-230: The code assumes a successful DescribeTable means the
table is usable, but DynamoDB may return CREATING; update
createMetadataTableIfNotExists to call describeTable and inspect the table
status (from the DescribeTableResponse) and if it's not ACTIVE call
waitForTableActive(metadataTableFullName, dynamoDbClient) before proceeding;
also update createMetadataTable() to catch ResourceInUseException (another
process created the table) and in that catch call
waitForTableActive(metadataTableFullName, dynamoDbClient) instead of failing so
concurrent startups wait for ACTIVE state. Ensure you reference the methods
createMetadataTableIfNotExists, createMetadataTable, and waitForTableActive in
your changes.

---

Duplicate comments:
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerDynamoDb.scala`:
- Around line 135-138: startWriteOffsets currently builds compositeKey using
infoDate and millisecond precision createdAtMilli which can collide; change
compositeKey generation in startWriteOffsets to include a uniqueness suffix
(e.g., append a UUID or use nanosecond timestamp) and propagate that full
compositeKey into the returned DataOffsetRequest so downstream methods can
reconstruct keys reliably. Update commitOffsets, commitRerun, and
rollbackOffsets to read the stored compositeKey from DataOffsetRequest (or use
the stored unique createdAt) when performing Put/Get/Delete operations so they
use the exact same key format; ensure DataOffsetRequest contains the new field
(fullCompositeKey or uniqueCreatedAt) and update all usages accordingly.
- Around line 313-334: getOffsetRecords currently issues a single QueryRequest
which can miss results due to DynamoDB pagination and inefficiently uses
filterExpression on ATTR_INFO_DATE; update getOffsetRecords to (1) use a
keyConditionExpression combining ATTR_PRAMEN_TABLE_NAME = :table_name and
begins_with(sortKeyName, :info_date_prefix) (where sortKeyName is the actual
sort key like infoDate#createdAtMilli and :info_date_prefix is
infoDate.toString) instead of filterExpression, (2) page through results using
dynamoDbClient.queryPaginator or loop on QueryResponse.lastEvaluatedKey,
collecting and mapping all pages via itemToOffsetRecord into a single Array, and
(3) preserve current error handling and context (use offsetTableFullName in
error messages) so callers like getOffsets, getUncommittedOffsets, commitRerun,
postCommittedRecords, and getMinMaxOffsets receive complete results.
- Around line 339-366: getMaximumInfoDate currently only reads the first
DynamoDB query page and can miss later items; update getMaximumInfoDate to
paginate through all query pages (either via
dynamoDbClient.queryPaginator(queryRequest) or a loop using QueryRequest with
ExclusiveStartKey and checking QueryResponse.lastEvaluatedKey) and compute the
maximum LocalDate across all pages (parsing ATTR_INFO_DATE from each item)
before returning Some(max) or None when no items; keep existing error handling
and use the same ATTR_INFO_DATE, offsetTableFullName, and dynamoDbClient symbols
to locate and modify the logic.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala`:
- Around line 129-140: The ScanRequest usage in getEntries() (building
ScanRequest with tableName=journalTableFullName and filterExpression using
ATTR_FINISHED_AT) calls dynamoDbClient.scan once and ignores pagination, so
results beyond the first page are dropped; fix by iterating using the
LastEvaluatedKey from the ScanResponse (or switch to a Query on a time-based
index) — repeatedly call dynamoDbClient.scan with ExclusiveStartKey until
lastEvaluatedKey is empty, accumulating items into the result list returned by
getEntries(), ensuring expressionAttributeValues and the same filterExpression
are reused for each paginated request.
- Around line 79-82: The current primary key uses only
JournalDynamoDB.ATTR_JOB_NAME and JournalDynamoDB.ATTR_FINISHED_AT which allows
collisions when two runs finish in the same second and prevents efficient
cross-job time-range queries; change the item key to include a unique per-entry
component (e.g., add JournalDynamoDB.ATTR_ENTRY_ID and make the primary key
(ATTR_JOB_NAME, ATTR_ENTRY_ID) or append a high-resolution unique suffix to
ATTR_FINISHED_AT) so putItem() won’t overwrite concurrent entries, and update
getEntries() to read using a GSI on JournalDynamoDB.ATTR_FINISHED_AT (create a
GSI that projects the attributes needed) to support efficient cross-job range
reads; adjust any code that builds items (where ATTR_JOB_NAME and
ATTR_FINISHED_AT are set) and any DynamoDB key construction logic to include the
new ATTR_ENTRY_ID and GSI usage.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerDynamoDb.scala`:
- Around line 84-103: getMetadataFromStorage in MetadataManagerDynamoDb performs
a single QueryRequest and ignores pagination; update it to page through DynamoDB
results by looping: make the QueryRequest (the one built with
QueryRequest.builder() and used with dynamoDbClient.query), collect items into a
mutable buffer, check the response.lastEvaluatedKey()/isEmpty, and if present
set ExclusiveStartKey on the next QueryRequest and repeat until no more pages,
then map accumulated items to the Map[String, MetadataValue]; apply the same
pagination loop fix to the other QueryRequest usage around the delete/bulk-read
logic referenced near the second dynamoDbClient.query (around the code at the
other occurrence) so both reads handle lastEvaluatedKey correctly.

---

Nitpick comments:
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala`:
- Around line 67-74: The close() method in TokenLockFactoryDynamoDb currently
always closes dynamoDbClient which can shutdown externally-provided/shared
clients; change the constructor and class to track client ownership (e.g. add a
boolean flag like clientOwned or wasClientCreated) and only call
dynamoDbClient.close() in close() when that flag is true, or alternatively
update the class API/docs to clearly state that TokenLockFactoryDynamoDb takes
ownership of the provided dynamoDbClient and callers must not share it; touch
the constructor and the close() method (and any builder that creates the client)
to set the ownership flag accordingly so only internally-created clients are
closed.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: e94bf69d-170a-4827-96a5-61085b0e0c62

📥 Commits

Reviewing files that changed from the base of the PR and between c06a442 and e21c22e.

📒 Files selected for processing (5)
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerDynamoDb.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerDynamoDb.scala
🚧 Files skipped from review as they are similar to previous changes (1)
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala

Comment on lines +576 to +579
def deleteAllOffsets(tableName: String, dynamoDbClient: DynamoDbClient): Int = {
val log = LoggerFactory.getLogger(this.getClass)
val offsetTableBaseName = s"${DEFAULT_TABLE_PREFIX}_${DEFAULT_OFFSET_TABLE}"
val offsetTableFullName = BookkeeperDynamoDb.getFullTableName(None, offsetTableBaseName)


⚠️ Potential issue | 🟡 Minor

deleteAllOffsets ignores tableArn, may target wrong table.

This static method hardcodes DEFAULT_TABLE_PREFIX and passes None for tableArn (line 579). If the OffsetManagerDynamoDb instance was created with a custom tableArn or tablePrefix, this method will attempt to delete from a different table than expected.

Consider accepting tableArn and tablePrefix as parameters:

🔧 Proposed fix
-def deleteAllOffsets(tableName: String, dynamoDbClient: DynamoDbClient): Int = {
+def deleteAllOffsets(
+    tableName: String,
+    dynamoDbClient: DynamoDbClient,
+    tableArn: Option[String] = None,
+    tablePrefix: String = DEFAULT_TABLE_PREFIX
+): Int = {
   val log = LoggerFactory.getLogger(this.getClass)
-  val offsetTableBaseName = s"${DEFAULT_TABLE_PREFIX}_${DEFAULT_OFFSET_TABLE}"
-  val offsetTableFullName = BookkeeperDynamoDb.getFullTableName(None, offsetTableBaseName)
+  val offsetTableBaseName = s"${tablePrefix}_${DEFAULT_OFFSET_TABLE}"
+  val offsetTableFullName = BookkeeperDynamoDb.getFullTableName(tableArn, offsetTableBaseName)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerDynamoDb.scala`
around lines 576 - 579, The deleteAllOffsets method currently hardcodes
DEFAULT_TABLE_PREFIX and passes None for tableArn, which can target the wrong
table; change the signature of deleteAllOffsets to accept tableArn:
Option[String] and tablePrefix: String (or an Option[String] if you prefer) and
use those values when building offsetTableBaseName and calling
BookkeeperDynamoDb.getFullTableName instead of DEFAULT_TABLE_PREFIX/None; update
all call sites to pass the instance's tableArn and tablePrefix (or defaults) so
deleteAllOffsets operates on the intended table (refer to the deleteAllOffsets
method and BookkeeperDynamoDb.getFullTableName symbols).
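For illustration only, a toy model of threading the instance's settings through the name resolution instead of hardcoding defaults. The real semantics live in `BookkeeperDynamoDb.getFullTableName` and may differ; in particular, treating the ARN as a path prefix here is an assumption:

```scala
// Toy sketch: accept tablePrefix and tableArn instead of hardcoding
// DEFAULT_TABLE_PREFIX / None. ASSUMPTION: when an ARN is given it acts
// as a path prefix for the table name; the real getFullTableName in
// BookkeeperDynamoDb may resolve it differently.
def fullTableName(tableArn: Option[String], baseName: String): String =
  tableArn.map(arn => s"$arn/$baseName").getOrElse(baseName)

def offsetTableName(tablePrefix: String, offsetTable: String, tableArn: Option[String]): String =
  fullTableName(tableArn, s"${tablePrefix}_$offsetTable")
```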

Comment on lines +109 to +119
try {
val putRequest = PutItemRequest.builder()
.tableName(journalTableFullName)
.item(itemBuilder.result().asJava)
.build()

dynamoDbClient.putItem(putRequest)
} catch {
case NonFatal(ex) =>
log.error(s"Unable to write to the journal table '$journalTableFullName'.", ex)
}

⚠️ Potential issue | 🟠 Major

Don't turn DynamoDB failures into success or empty history.

Both paths swallow NonFatal and continue. A failed write becomes silent journal loss, and a failed read is indistinguishable from “no entries in range”. For bookkeeping storage, this should bubble up or return an explicit failure so callers can react.

Also applies to: 173-176
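One way to surface these failures without reworking every call site at once is to return `Try` instead of swallowing the exception. A minimal sketch, with the `putItem` and `query` thunks standing in for the real `DynamoDbClient` calls (names are illustrative, not part of the PR):

```scala
import scala.util.{Failure, Try}

// Sketch: a failed write returns Failure instead of silently succeeding,
// so callers can react to journal loss. `putItem` stands in for the real
// dynamoDbClient.putItem(putRequest) call.
def writeEntry(putItem: () => Unit, tableName: String): Try[Unit] =
  Try(putItem()).recoverWith {
    case ex => Failure(new RuntimeException(s"Unable to write to the journal table '$tableName'.", ex))
  }

// Sketch: a failed read is Failure, distinguishable from Success(Nil)
// ("no entries in range"). `query` stands in for the real query call.
def readEntries[A](query: () => Seq[A]): Try[Seq[A]] =
  Try(query())
```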

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala`
around lines 109 - 119, The catch blocks around dynamoDbClient.putItem (the
NonFatal(ex) that currently calls log.error) must not swallow failures: change
the write path in JournalDynamoDB (the PutItemRequest / dynamoDbClient.putItem
block) to log the error and then propagate it (e.g., rethrow the exception or
return a failed Try/Either/Left) so callers can detect write failures; do the
same for the corresponding read path mentioned (the similar NonFatal at lines
173-176) so a failed read is distinguishable from "no entries"—use a consistent
failure type (e.g., JournalException or scala.util.Failure/Left) and update
method signatures/return types if necessary so callers can react.

Comment on lines +191 to +195
} catch {
case _: ResourceNotFoundException =>
log.info(s"Creating journal table '$journalTableFullName'")
createJournalTable()
case NonFatal(ex) =>

⚠️ Potential issue | 🟠 Major

Handle the create-table startup race.

The describeTable()-then-createTable() flow is TOCTOU-racy. If two processes bootstrap together, both can observe ResourceNotFoundException; one creates the table, and the other fails with ResourceInUseException even though the table is now valid. Catch ResourceInUseException in createJournalTable() and just wait for ACTIVE.

Suggested hardening
   private def createJournalTable(): Unit = {
     val createRequest = CreateTableRequest.builder()
       .tableName(journalTableFullName)
       .attributeDefinitions(
         AttributeDefinition.builder()
           .attributeName(JournalDynamoDB.ATTR_JOB_NAME)
           .attributeType(ScalarAttributeType.S)
           .build(),
         AttributeDefinition.builder()
           .attributeName(JournalDynamoDB.ATTR_FINISHED_AT)
           .attributeType(ScalarAttributeType.N)
           .build()
       )
       .keySchema(
         KeySchemaElement.builder()
           .attributeName(JournalDynamoDB.ATTR_JOB_NAME)
           .keyType(KeyType.HASH)
           .build(),
         KeySchemaElement.builder()
           .attributeName(JournalDynamoDB.ATTR_FINISHED_AT)
           .keyType(KeyType.RANGE)
           .build()
       )
       .billingMode(BillingMode.PAY_PER_REQUEST)
       .build()
 
-    dynamoDbClient.createTable(createRequest)
+    try {
+      dynamoDbClient.createTable(createRequest)
+    } catch {
+      case _: ResourceInUseException =>
+        log.info(s"Journal table '$journalTableFullName' was created concurrently")
+    }
     waitForTableActive(journalTableFullName, dynamoDbClient)
     log.info(s"Journal table '$journalTableFullName' created successfully")
   }

Also applies to: 204-231
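The hardening above reduces to "tolerate ResourceInUseException, then poll until ACTIVE". A minimal, self-contained sketch of the polling half; the `status` thunk stands in for a real DescribeTable call, and the retry count and delay are illustrative:

```scala
import scala.annotation.tailrec

// Sketch: poll a table-status thunk until it reports ACTIVE, with a
// bounded number of retries and an optional delay between attempts.
// Returns true if the table became ACTIVE within the retry budget.
@tailrec
def waitForActive(status: () => String, retriesLeft: Int, delayMs: Long = 0L): Boolean = {
  if (status() == "ACTIVE") true
  else if (retriesLeft <= 0) false
  else {
    if (delayMs > 0) Thread.sleep(delayMs)
    waitForActive(status, retriesLeft - 1, delayMs)
  }
}
```

In the real code the thunk would wrap `dynamoDbClient.describeTable(...)` and the timeout would raise rather than return false, but the loop shape is the same.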

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala`
around lines 191 - 195, The TOCTOU between describeTable() and
createJournalTable() causes failures when two processes race to create the
table; update createJournalTable() (and any other create-table call sites around
JournalDynamoDB.describeTable/createJournalTable) to catch
ResourceInUseException and, instead of failing, poll DescribeTable until the
table status is ACTIVE (with backoff/retries and timeout) before returning;
leave the existing ResourceNotFoundException handling in describeTable() but
ensure createJournalTable() swallows ResourceInUseException and waits for ACTIVE
so concurrent bootstraps succeed.

Comment on lines +181 to +230
private def createMetadataTableIfNotExists(): Unit = {
try {
val describeRequest = DescribeTableRequest.builder()
.tableName(metadataTableFullName)
.build()

dynamoDbClient.describeTable(describeRequest)
log.info(s"Metadata table '$metadataTableFullName' already exists")
} catch {
case _: ResourceNotFoundException =>
log.info(s"Creating metadata table '$metadataTableFullName'")
createMetadataTable()
case NonFatal(ex) =>
log.error(s"Error checking if metadata table exists", ex)
throw ex
}
}

/**
* Creates the metadata table in DynamoDB.
*/
private def createMetadataTable(): Unit = {
val createRequest = CreateTableRequest.builder()
.tableName(metadataTableFullName)
.attributeDefinitions(
AttributeDefinition.builder()
.attributeName(MetadataManagerDynamoDb.ATTR_COMPOSITE_KEY)
.attributeType(ScalarAttributeType.S)
.build(),
AttributeDefinition.builder()
.attributeName(MetadataManagerDynamoDb.ATTR_METADATA_KEY)
.attributeType(ScalarAttributeType.S)
.build()
)
.keySchema(
KeySchemaElement.builder()
.attributeName(MetadataManagerDynamoDb.ATTR_COMPOSITE_KEY)
.keyType(KeyType.HASH)
.build(),
KeySchemaElement.builder()
.attributeName(MetadataManagerDynamoDb.ATTR_METADATA_KEY)
.keyType(KeyType.RANGE)
.build()
)
.billingMode(BillingMode.PAY_PER_REQUEST)
.build()

dynamoDbClient.createTable(createRequest)
waitForTableActive(metadataTableFullName, dynamoDbClient)
log.info(s"Metadata table '$metadataTableFullName' created successfully")

⚠️ Potential issue | 🟠 Major

Wait for ACTIVE even when the table already exists.

The happy path at lines 187-188 treats any successful DescribeTable as ready, but DynamoDB can return a table that is still in the CREATING state. In a concurrent startup, this instance can then proceed and issue CRUD calls before the table becomes usable. The same race can also make createTable() throw ResourceInUseException after another process wins the create.

Suggested fix
   private def createMetadataTableIfNotExists(): Unit = {
     try {
       val describeRequest = DescribeTableRequest.builder()
         .tableName(metadataTableFullName)
         .build()

-      dynamoDbClient.describeTable(describeRequest)
-      log.info(s"Metadata table '$metadataTableFullName' already exists")
+      val table = dynamoDbClient.describeTable(describeRequest).table()
+      if (table.tableStatus() != TableStatus.ACTIVE) {
+        waitForTableActive(metadataTableFullName, dynamoDbClient)
+      }
+      log.info(s"Metadata table '$metadataTableFullName' already exists")
     } catch {
       case _: ResourceNotFoundException =>
         log.info(s"Creating metadata table '$metadataTableFullName'")
         createMetadataTable()
       case NonFatal(ex) =>
@@
   private def createMetadataTable(): Unit = {
     val createRequest = CreateTableRequest.builder()
@@
-    dynamoDbClient.createTable(createRequest)
+    try {
+      dynamoDbClient.createTable(createRequest)
+    } catch {
+      case _: ResourceInUseException =>
+        log.info(s"Metadata table '$metadataTableFullName' is already being created")
+    }
     waitForTableActive(metadataTableFullName, dynamoDbClient)
     log.info(s"Metadata table '$metadataTableFullName' created successfully")
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerDynamoDb.scala`
around lines 181 - 230, The code assumes a successful DescribeTable means the
table is usable, but DynamoDB may return CREATING; update
createMetadataTableIfNotExists to call describeTable and inspect the table
status (from the DescribeTableResponse) and if it's not ACTIVE call
waitForTableActive(metadataTableFullName, dynamoDbClient) before proceeding;
also update createMetadataTable() to catch ResourceInUseException (another
process created the table) and in that catch call
waitForTableActive(metadataTableFullName, dynamoDbClient) instead of failing so
concurrent startups wait for ACTIVE state. Ensure you reference the methods
createMetadataTableIfNotExists, createMetadataTable, and waitForTableActive in
your changes.

Comment on lines +298 to +317
def build(): MetadataManagerDynamoDb = {
if (region.isEmpty) {
throw new IllegalArgumentException("Region must be provided")
}

val clientBuilder = DynamoDbClient.builder()
.region(Region.of(region.get))

credentialsProvider.foreach(clientBuilder.credentialsProvider)
endpoint.foreach { ep =>
clientBuilder.endpointOverride(URI.create(ep))
}

val client = clientBuilder.build()

new MetadataManagerDynamoDb(
dynamoDbClient = client,
tableArn = tableArn,
tablePrefix = tablePrefix
)

⚠️ Potential issue | 🟡 Minor

Close the client if constructor-side initialization fails.

Line 311 allocates the DynamoDbClient, and the constructor immediately runs DescribeTable/CreateTable. If that throws, build() exits without closing the client, leaking resources on the failure path.

Suggested fix
       val client = clientBuilder.build()

-      new MetadataManagerDynamoDb(
-        dynamoDbClient = client,
-        tableArn = tableArn,
-        tablePrefix = tablePrefix
-      )
+      try {
+        new MetadataManagerDynamoDb(
+          dynamoDbClient = client,
+          tableArn = tableArn,
+          tablePrefix = tablePrefix
+        )
+      } catch {
+        case NonFatal(ex) =>
+          client.close()
+          throw ex
+      }
     }
   }
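The same close-on-failure pattern applies to any builder that allocates a closeable client before running fallible initialization. A generic sketch (names are illustrative, not part of the PR):

```scala
import scala.util.control.NonFatal

// Sketch: run a fallible initializer against a freshly allocated
// closeable resource; if the initializer throws, close the resource
// before rethrowing so the failure path does not leak it.
def buildOrClose[C <: AutoCloseable, A](client: C)(init: C => A): A =
  try init(client)
  catch {
    case NonFatal(ex) =>
      client.close()
      throw ex
  }
```

In build(), `init` would be the `new MetadataManagerDynamoDb(...)` constructor call that runs DescribeTable/CreateTable.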
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerDynamoDb.scala`
around lines 298 - 317, The builder currently creates a DynamoDbClient (client)
and then calls new MetadataManagerDynamoDb(...), but if the constructor (which
runs DescribeTable/CreateTable) throws, the client is leaked; modify build() to
ensure the DynamoDbClient is closed on failure: wrap the construction of
MetadataManagerDynamoDb in a try { new MetadataManagerDynamoDb(dynamoDbClient =
client, tableArn = tableArn, tablePrefix = tablePrefix) } catch { case e =>
client.close(); throw e } (or equivalent finally logic) so the client is closed
if the constructor throws while still returning the manager when successful.


Development

Successfully merging this pull request may close these issues.

Allow storing bookkeeping on DynamoDB

1 participant