#181 Add support for DynamoDB as the storage of bookkeeping, schemas, locks, journal and offset information #730
Walkthrough

Adds optional DynamoDB-backed bookkeeping: new DynamoDB implementations for Bookkeeper components (OffsetManager, Journal, MetadataManager, TokenLock/Factory), traits extended to AutoCloseable, builders and tests, example configs/docs, and AWS SDK DynamoDB dependency updates.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant App as Application
    participant Config as BookkeeperConfig
    participant Factory as Bookkeeper.fromConfig
    participant Offset as OffsetManagerDynamoDb
    participant Journal as JournalDynamoDB
    participant LockFactory as TokenLockFactoryDynamoDb
    participant Dynamo as AWS DynamoDB
    App->>Config: load config (dynamoDbRegion/table.prefix/arn)
    App->>Factory: Bookkeeper.fromConfig(config)
    Factory->>Offset: build OffsetManagerDynamoDb (builder)
    Offset->>Dynamo: DescribeTable / CreateTable / Poll until ACTIVE
    Factory->>Journal: build JournalDynamoDB (builder)
    Journal->>Dynamo: DescribeTable / CreateTable / Poll until ACTIVE
    Factory->>LockFactory: build TokenLockFactoryDynamoDb (builder)
    LockFactory->>Dynamo: DescribeTable / CreateTable / Poll until ACTIVE
    Factory-->>App: Bookkeeper (components ready)
    App->>Factory: close()
    Factory->>Offset: close()
    Factory->>Journal: close()
    Factory->>LockFactory: close()
    Offset->>Dynamo: client.close()
    Journal->>Dynamo: client.close()
    LockFactory->>Dynamo: client.close()
```
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
🚥 Pre-merge checks | ✅ Passed checks (5 passed)
Actionable comments posted: 15
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManager.scala (1)
36-92: 🛠️ Refactor suggestion | 🟠 Major

Don't hide offset-manager cleanup behind a trait-level no-op.

`OffsetManagerDynamoDb` now has real shutdown logic, while `OffsetManagerJdbc` will silently inherit this default. Keeping `close()` abstract here (or forcing explicit no-op overrides where ownership is delegated) makes resource semantics intentional instead of accidental.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManager.scala` around lines 36-92: the trait `OffsetManager` currently provides a no-op default `close()` implementation which hides required cleanup. Remove the default `override def close(): Unit = {}` from the trait so `close()` is abstract, then implement explicit `close()` methods in concrete classes (e.g., `OffsetManagerDynamoDb` and `OffsetManagerJdbc`) so resources are cleaned up or intentionally no-oped; update the implementations to compile by adding `close()` where missing.

pramen/api/src/main/scala/za/co/absa/pramen/api/MetadataManager.scala (1)
21-74: 🛠️ Refactor suggestion | 🟠 Major

Keep `close()` explicit on resource-owning implementations.

Making `close()` a concrete no-op here removes the compiler check that concrete managers must opt into their lifecycle semantics. With `pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerBase.scala:1-50` sitting under both JDBC and DynamoDB managers, a missing override now silently degrades into "do nothing" instead of failing fast at compile time.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/api/src/main/scala/za/co/absa/pramen/api/MetadataManager.scala` around lines 21-74: the trait `MetadataManager` currently provides a concrete no-op `close()` which prevents the compiler from forcing resource-owning implementations to declare lifecycle behavior. Change `override def close(): Unit = {}` to an abstract `def close(): Unit` (i.e. remove the default body) so concrete classes must explicitly implement `close()`; also update any implementations (e.g. classes extending `MetadataManagerBase` under JDBC/DynamoDB) that rely on the implicit no-op to add an explicit `close()` implementation (even if it is a no-op) to make lifecycle intent explicit.

pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala (1)
236-243: ⚠️ Potential issue | 🔴 Critical

Add `bookkeeper.close()` to the aggregate closer.

The `closable` block at lines 236-243 should also call `bookkeeper.close()`. Concrete implementations like `BookkeeperDynamoDb` and `BookkeeperJdbc` have meaningful `close()` logic that manages critical resources (DynamoDB client, database connections). Without this call, these resources will leak when `closable.close()` is invoked.

Relevant code

```scala
val closable = new AutoCloseable {
  override def close(): Unit = {
    mongoDbConnection.foreach(_.close())
    dbOpt.foreach(_.close())
    tokenFactory.close()
    journal.close()
    metadataManager.close()
    bookkeeper.close() // Add this line
  }
}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala` around lines 236-243: the `closable` AutoCloseable currently omits calling `bookkeeper.close()`, which leaks resources for implementations like `BookkeeperDynamoDb` and `BookkeeperJdbc`. Update the anonymous AutoCloseable implementation to invoke `bookkeeper.close()` inside its `close()` method alongside `tokenFactory.close()`, `journal.close()`, and `metadataManager.close()`, ensuring the bookkeeper's concrete resources are released.
🧹 Nitpick comments (5)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/metadata/MetadataManagerDynamoDbBuilderSuite.scala (1)
25-97: These tests only prove the fluent API compiles.

Each positive case asserts `isInstanceOf[MetadataManagerDynamoDbBuilder]`, which the method signatures already guarantee. Because none of them call `build()`, malformed values like the ARN on Line 43 still pass, and the default-prefix / endpoint / credentials wiring stays untested.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/metadata/MetadataManagerDynamoDbBuilderSuite.scala` around lines 25-97: the tests only check that the fluent builder methods compile. Change them to exercise `build()` and assert the resulting instance configuration: call `MetadataManagerDynamoDb.builder`, chain the relevant setters (`withRegion`, `withTableArn`, `withTablePrefix`, `withCredentialsProvider`, `withEndpoint`), then call `build()` and assert the built `MetadataManagerDynamoDb` (or its accessible config) contains the expected region, table ARN/prefix, endpoint and credentials provider values. Keep the existing negative test that intercepts `build()` without a region, but change the positive cases to call `build()` and verify actual wiring rather than `isInstanceOf` checks.

pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala (1)
105-112: Use the lock factory's own default prefix constant here.

Line 106 is building `TokenLockFactoryDynamoDb` but falls back to `BookkeeperDynamoDb.DEFAULT_TABLE_PREFIX`. If those defaults ever diverge, the lock backend will quietly start using a different default naming convention from the one defined by the lock implementation itself.

Suggested fix

```diff
- val tablePrefix = bookkeepingConfig.dynamoDbTablePrefix.getOrElse(BookkeeperDynamoDb.DEFAULT_TABLE_PREFIX)
+ val tablePrefix = bookkeepingConfig.dynamoDbTablePrefix.getOrElse(TokenLockFactoryDynamoDb.DEFAULT_TABLE_PREFIX)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala` around lines 105-112: the code falls back to `BookkeeperDynamoDb.DEFAULT_TABLE_PREFIX` when building `TokenLockFactoryDynamoDb`, which can diverge from the lock factory's own default. Change the fallback to `TokenLockFactoryDynamoDb.DEFAULT_TABLE_PREFIX` (i.e. replace the `getOrElse(BookkeeperDynamoDb.DEFAULT_TABLE_PREFIX)` usage on `bookkeepingConfig.dynamoDbTablePrefix`) so the builder uses its native default prefix.

pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalDynamoDBBuilderSuite.scala (1)
25-97: These assertions don't exercise the journal builder.

The positive cases only check the static return type, so they won't catch broken default-prefix wiring or malformed inputs such as the ARN on Line 43. Prefer one or two end-to-end `build()` smoke tests and keep the negative validation case.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalDynamoDBBuilderSuite.scala` around lines 25-97: replace the superficial type-only assertions with one or two end-to-end smoke tests that call `JournalDynamoDB.builder...build()` to verify real behavior. For example, construct a builder via `JournalDynamoDB.builder.withRegion(...)` (and optionally `withTableArn`/`withTablePrefix`/`withEndpoint`/`withCredentialsProvider`), call `build()`, then assert the resulting `JournalDynamoDB` instance has the expected table prefix (default when not provided), correct table ARN handling, and any endpoint/credentials set. Keep the existing negative test that intercepts `IllegalArgumentException` from `builder.build()` when the region is missing.

pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockDynamoDb.scala (2)
142-164: Consider adding owner verification in `updateTicket`.

The `updateTicket` method updates the expiration without verifying ownership. If another process has taken over the lock (e.g., after expiration), this could inadvertently extend the other process's lock. Adding a condition expression would make this safer:

🛡️ Suggested improvement

```diff
 val updateRequest = UpdateItemRequest.builder()
   .tableName(tableName)
   .key(Map(
     ATTR_TOKEN -> AttributeValue.builder().s(escapedToken).build()
   ).asJava)
   .updateExpression(s"SET $ATTR_EXPIRES = :expires")
+  .conditionExpression(s"$ATTR_OWNER = :owner")
   .expressionAttributeValues(Map(
-    ":expires" -> AttributeValue.builder().n(newTicket.toString).build()
+    ":expires" -> AttributeValue.builder().n(newTicket.toString).build(),
+    ":owner" -> AttributeValue.builder().s(owner).build()
   ).asJava)
   .build()
```

This prevents a rare race condition where a lock expires, is taken by another process, and the original holder's watcher thread updates the expiration.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockDynamoDb.scala` around lines 142-164: the `updateTicket` method currently unconditionally sets `ATTR_EXPIRES`, which can extend another process's lock. Modify `updateTicket` (and the `UpdateItemRequest` built there) to include a condition expression that verifies ownership (e.g., compare `ATTR_OWNER` to the current owner) so the update only succeeds if the lock is still held by this process, and handle the conditional-failure case (do not treat it as success; log or ignore the `ConditionalCheckFailedException`) so the watcher does not silently extend someone else's lock.
178-185: Add null-safety checks for required DynamoDB item attributes.

The code assumes `ATTR_EXPIRES` and `ATTR_OWNER` will always be present in the item. If the DynamoDB item is malformed (e.g., created by a different version or manually edited), accessing these attributes could throw `NullPointerException`. Consider adding defensive null checks:

🛡️ Proposed defensive fix

```diff
 if (response.hasItem && !response.item().isEmpty) {
   val item = response.item()
+  val tokenAttr = item.get(ATTR_TOKEN)
+  val ownerAttr = item.get(ATTR_OWNER)
+  val expiresAttr = item.get(ATTR_EXPIRES)
+
+  if (tokenAttr == null || ownerAttr == null || expiresAttr == null) {
+    log.warn(s"Malformed lock item for $escapedToken - missing required attributes")
+    return None
+  }
+
   Some(LockTicket(
-    token = item.get(ATTR_TOKEN).s(),
-    owner = item.get(ATTR_OWNER).s(),
-    expires = item.get(ATTR_EXPIRES).n().toLong,
+    token = tokenAttr.s(),
+    owner = ownerAttr.s(),
+    expires = expiresAttr.n().toLong,
     createdAt = Option(item.get(ATTR_CREATED_AT)).map(_.n().toLong)
   ))
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockDynamoDb.scala` around lines 178-185: the item handling assumes `ATTR_TOKEN`, `ATTR_OWNER` and `ATTR_EXPIRES` are always present. Add defensive null-safety checks before constructing the `LockTicket`: wrap `item.get(ATTR_...)` in null checks and only construct `Some(LockTicket(...))` if the required attributes are present; treat `ATTR_CREATED_AT` as optional (keep the current `Option` mapping); if required attributes are missing, log a warning and return `None` instead of calling `.s()`/`.n()` on null.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pramen/core/pom.xml`:
- Around line 147-151: The Maven dependency for the AWS SDK DynamoDB
(groupId=software.amazon.awssdk, artifactId=dynamodb) is missing a scope and
defaults to compile, causing Maven builds to bundle the SDK; update that
dependency entry to include <scope>provided</scope> so it matches the SBT
Dependencies.scala Provided scope and prevents packaging the AWS SDK into the
artifact.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala`:
- Around line 495-499: The BookkeeperDynamoDb.deleteTable currently throws
UnsupportedOperationException which violates the Bookkeeper contract; implement
wildcard deletion inside BookkeeperDynamoDb.deleteTable by scanning the DynamoDB
bookkeeper table for items whose table/name attribute matches the provided
wildcard (convert wildcard to a prefix/regex or Dynamo-compatible
FilterExpression), paginate through Scan results, delete matching items using
DeleteItem or BatchWriteItem for efficiency, collect and return the deleted item
names as Seq[String], and ensure proper error handling and logging;
alternatively, if you prefer to disallow wildcards for DynamoDB, move this check
to the BookkeeperDynamoDb constructor/factory and throw a configuration-time
IllegalArgumentException with a clear message so callers fail fast.
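The scan-and-delete approach described above can be sketched as follows. Everything here is illustrative, not the project's actual code: the attribute name `table_name`, the `wildcardToPrefix` helper, and the per-item `DeleteItem` call are assumptions, and a production version would batch deletes and project only the key attributes.

```scala
import scala.collection.JavaConverters._
import software.amazon.awssdk.services.dynamodb.DynamoDbClient
import software.amazon.awssdk.services.dynamodb.model._

// Hypothetical helper: only trailing-star wildcards map cleanly to begins_with.
def wildcardToPrefix(pattern: String): Option[String] =
  if (!pattern.contains('*')) Some(pattern)
  else if (pattern.indexOf('*') == pattern.length - 1) Some(pattern.dropRight(1))
  else None // mid-pattern stars would need a regex filter applied client-side

// Paginated Scan + delete sketch (names are illustrative).
def deleteByWildcard(client: DynamoDbClient, tableName: String, pattern: String): Seq[String] = {
  val prefix = wildcardToPrefix(pattern)
    .getOrElse(throw new IllegalArgumentException(s"Unsupported wildcard: $pattern"))
  val deleted = scala.collection.mutable.ArrayBuffer.empty[String]
  var lastKey: java.util.Map[String, AttributeValue] = null
  do {
    val builder = ScanRequest.builder()
      .tableName(tableName)
      .filterExpression("begins_with(#n, :p)")
      .expressionAttributeNames(Map("#n" -> "table_name").asJava)
      .expressionAttributeValues(Map(":p" -> AttributeValue.builder().s(prefix).build()).asJava)
    if (lastKey != null) builder.exclusiveStartKey(lastKey)
    val response = client.scan(builder.build())
    response.items().asScala.foreach { item =>
      client.deleteItem(DeleteItemRequest.builder()
        .tableName(tableName)
        .key(item) // real code must pass only the primary-key attributes here
        .build())
      deleted += item.get("table_name").s()
    }
    lastKey = if (response.hasLastEvaluatedKey) response.lastEvaluatedKey() else null
  } while (lastKey != null)
  deleted
}
```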
- Around line 66-68: The init() method currently creates/checks tables using
bookkeepingTableBaseName and schemaTableBaseName which mismatches the resolved
names used by all CRUDs; update init() to use the resolved private vals
bookkeepingTableName and schemaTableName instead (replace usages of
bookkeepingTableBaseName and schemaTableBaseName in init()), ensuring any calls
that pass names into ensureTable/describe/createTable use bookkeepingTableName
and schemaTableName so the created tables match the identifiers used by the CRUD
methods.
- Around line 141-165: The createBookkeepingTable function currently builds a
DynamoDB key schema with ATTR_TABLE_NAME (HASH) and ATTR_INFO_DATE (RANGE),
which enforces one item per table/date and causes overwrites; modify
createBookkeepingTable to include ATTR_BATCH_ID as an additional sort key
component (or change the sort key to a composite that includes ATTR_INFO_DATE
and ATTR_BATCH_ID) so the primary key uniquely identifies (tableName, infoDate,
batchId). Update attributeDefinitions to include ATTR_BATCH_ID with
ScalarAttributeType.S and adjust any code that assumes the old two-part key
accordingly (references: createBookkeepingTable, ATTR_TABLE_NAME,
ATTR_INFO_DATE, ATTR_BATCH_ID).
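The composite-key change suggested above could look roughly like this. This is a sketch against the AWS SDK v2 builder API; the attribute names `table_name` and `info_date_batch` and the billing mode are illustrative, not the project's actual schema.

```scala
import software.amazon.awssdk.services.dynamodb.model._

// Sketch: HASH on the table name plus a RANGE key combining info date and
// batch id, so (tableName, infoDate, batchId) is uniquely identified.
def bookkeepingTableRequest(tableName: String): CreateTableRequest =
  CreateTableRequest.builder()
    .tableName(tableName)
    .attributeDefinitions(
      AttributeDefinition.builder().attributeName("table_name").attributeType(ScalarAttributeType.S).build(),
      AttributeDefinition.builder().attributeName("info_date_batch").attributeType(ScalarAttributeType.S).build()
    )
    .keySchema(
      KeySchemaElement.builder().attributeName("table_name").keyType(KeyType.HASH).build(),
      KeySchemaElement.builder().attributeName("info_date_batch").keyType(KeyType.RANGE).build()
    )
    .billingMode(BillingMode.PAY_PER_REQUEST)
    .build()
```

Writers would then store the sort key as e.g. `s"$infoDate#$batchId"`, which also keeps `begins_with` queries by date possible.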
- Around line 838-848: The getFullTableName method builds incorrect ARNs by
checking "/table/" and appending "table/" when the ARN already ends with a
slash; update the logic in getFullTableName to use the DynamoDB standard token
":table/" (not "/table/"), and handle three cases: if arn contains ":table/" and
endsWith("/") then return arn + tableName (append only the name), if arn
contains ":table/" and does not endWith("/") then assume it already includes the
table name and return arn as-is, otherwise append ":table/" + tableName to the
provided arn; update the matching branches in getFullTableName accordingly to
avoid producing "table/table" sequences.
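The corrected branching can be sketched as a standalone function (a hypothetical signature for illustration; the project's actual method lives on `BookkeeperDynamoDb`):

```scala
// Sketch of the three cases described above, using the DynamoDB ARN token ":table/".
def getFullTableName(tableArn: Option[String], tableName: String): String =
  tableArn match {
    case Some(arn) if arn.nonEmpty =>
      if (arn.contains(":table/") && arn.endsWith("/")) arn + tableName // append only the name
      else if (arn.contains(":table/")) arn                            // already includes the table name
      else s"$arn:table/$tableName"                                    // bare ARN prefix
    case _ => tableName
  }
```

For example, `Some("arn:aws:dynamodb:eu-west-1:123456789012:table/")` with `"pramen_locks"` yields `arn:aws:dynamodb:eu-west-1:123456789012:table/pramen_locks`, and no `table/table` sequence can be produced.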
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerDynamoDb.scala`:
- Around line 123-126: The composite primary key built in startWriteOffsets uses
createdAt.toEpochMilli (createdAtMilli) which can collide for concurrent writes;
change the key generation so it cannot repeat within the same millisecond—for
example, stop reducing Instant to millis and instead incorporate the full
Instant string (createdAt.toString which includes nanoseconds) or append a short
random/UUID suffix to compositeKey (e.g., use createdAt and a UUID) so
compositeKey is unique for concurrent calls to startWriteOffsets and prevents
silent overwrites/races.
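A minimal sketch of a collision-free key, with illustrative names (the project's actual key layout may differ):

```scala
import java.time.Instant
import java.util.UUID

// Appending a UUID suffix makes the sort key unique even when two writers
// start within the same millisecond.
def makeCompositeKey(infoDate: String, createdAt: Instant): String =
  s"$infoDate#${createdAt.toEpochMilli}#${UUID.randomUUID()}"
```

Two concurrent calls with identical `infoDate` and `createdAt` still produce distinct keys, so neither write silently overwrites the other.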
- Around line 301-317: getOffsetRecords and getMaximumInfoDate currently query
DynamoDB without handling pagination and use a filterExpression on
ATTR_INFO_DATE, which can miss results when a partition >1MB; update both
methods (getOffsetRecords, getMaximumInfoDate) to iterate pages using the
QueryResponse.lastEvaluatedKey and repeatedly call dynamoDbClient.query until
lastEvaluatedKey is empty, accumulating result.items(); also change the query
keyConditionExpression to use begins_with on the composite sort key (e.g.,
compositeKey begins_with "<infoDate>#") instead of filtering by ATTR_INFO_DATE
to limit data scanned (use expressionAttributeValues for the begins_with prefix)
and remove the post-query filter. Ensure QueryRequest.builder() is built
per-page with ExclusiveStartKey when present and that itemToOffsetRecord is
applied to all accumulated items.
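The pagination-plus-`begins_with` pattern described above can be sketched like this (AWS SDK v2 for Java called from Scala; `table_name` and `composite_key` are placeholder attribute names, not the project's actual schema):

```scala
import scala.collection.JavaConverters._
import software.amazon.awssdk.services.dynamodb.DynamoDbClient
import software.amazon.awssdk.services.dynamodb.model.{AttributeValue, QueryRequest}

// Query all pages for one Pramen table and info date, narrowing by sort-key
// prefix instead of a post-query filter.
def queryAllPages(client: DynamoDbClient,
                  tableName: String,
                  pramenTable: String,
                  infoDate: String): Seq[java.util.Map[String, AttributeValue]] = {
  val results = scala.collection.mutable.ArrayBuffer.empty[java.util.Map[String, AttributeValue]]
  var lastKey: java.util.Map[String, AttributeValue] = null
  do {
    val builder = QueryRequest.builder()
      .tableName(tableName)
      .keyConditionExpression("table_name = :t AND begins_with(composite_key, :d)")
      .expressionAttributeValues(Map(
        ":t" -> AttributeValue.builder().s(pramenTable).build(),
        ":d" -> AttributeValue.builder().s(s"$infoDate#").build()
      ).asJava)
    if (lastKey != null) builder.exclusiveStartKey(lastKey) // resume from previous page
    val response = client.query(builder.build())
    results ++= response.items().asScala
    lastKey = if (response.hasLastEvaluatedKey) response.lastEvaluatedKey() else null
  } while (lastKey != null)
  results
}
```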
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala`:
- Around line 78-80: The current JournalDynamoDB primary key uses
(ATTR_JOB_NAME, ATTR_FINISHED_AT) causing collisions and preventing efficient
time-range reads; change the write path that builds items (where itemBuilder is
populated and putItem is called) to include a unique per-entry identifier (e.g.,
UUID or nano-timestamp) as a new attribute and include that identifier in the
primary key so simultaneous finishes do not overwrite each other; add a Global
Secondary Index on ATTR_FINISHED_AT (or a numeric finishedAt attribute) so
getEntries can perform a Query over finishedAt ranges instead of a full table
Scan; where getEntries currently performs an unpaginated ScanRequest, replace it
with either a paginated Scan loop that uses ExclusiveStartKey/LastEvaluatedKey
to fetch all pages or — preferably — change getEntries to Query the new GSI with
KeyConditionExpression for the finishedAt range and handle Query pagination
similarly.
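A GSI enabling range queries over the finish timestamp might be declared roughly as below. This is a sketch: the index name, the coarse `journal_partition` hash attribute, and `ProjectionType.ALL` are all assumptions, and a numeric `finished_at` attribute would also have to be registered in the table's attribute definitions.

```scala
import software.amazon.awssdk.services.dynamodb.model._

// Illustrative GSI: a constant-valued partition attribute plus a numeric
// finished_at sort key lets getEntries issue a Query over a time range
// instead of a full table Scan.
val finishedAtIndex: GlobalSecondaryIndex = GlobalSecondaryIndex.builder()
  .indexName("finished_at_idx")
  .keySchema(
    KeySchemaElement.builder().attributeName("journal_partition").keyType(KeyType.HASH).build(),
    KeySchemaElement.builder().attributeName("finished_at").keyType(KeyType.RANGE).build()
  )
  .projection(Projection.builder().projectionType(ProjectionType.ALL).build())
  .build()
```

Note that funneling all entries into one GSI partition trades write scalability for simple range queries, which is usually acceptable for a low-volume journal.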
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala`:
- Around line 51-53: TokenLockFactoryDynamoDb builds locksTableName via
BookkeeperDynamoDb.getFullTableName(tableArn, locksTableBaseName) but subsequent
DynamoDB calls still use locksTableBaseName, which breaks cross-account/region
behavior; update all usages in TokenLockFactoryDynamoDb (including table
creation/reads/writes around the init/ensure block and the sections currently
referencing locksTableBaseName at lines ~58–60 and ~83–85) to use locksTableName
instead so that all table operations consistently use the resolved identifier
returned by BookkeeperDynamoDb.getFullTableName.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerDynamoDb.scala`:
- Around line 83-102: getMetadataFromStorage and deleteMetadataFromStorage
currently run a single QueryRequest and will miss items if DynamoDB returns a
paginated result; update both methods in MetadataManagerDynamoDb to loop on
QueryResponse.lastEvaluatedKey (use ExclusiveStartKey on subsequent
QueryRequest) until null, aggregating all items before building the final Map in
getMetadataFromStorage; for deleteMetadataFromStorage, collect all keys across
pages and perform deletes for every page (or aggregate and use BatchWriteItem in
batches of 25) until lastEvaluatedKey is empty, ensuring you still read
ATTR_COMPOSITE_KEY, ATTR_METADATA_KEY, ATTR_METADATA_VALUE, ATTR_LAST_UPDATED
and use the same AttributeValue shaping as the existing QueryRequest/Item
deletions.
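The 25-item batching mentioned above can be sketched as follows (illustrative names; production code should also retry `unprocessedItems()` returned by `batchWriteItem`):

```scala
import scala.collection.JavaConverters._
import software.amazon.awssdk.services.dynamodb.DynamoDbClient
import software.amazon.awssdk.services.dynamodb.model._

// BatchWriteItem accepts at most 25 requests per call, so keys collected
// across query pages are deleted in groups of 25.
def deleteKeysInBatches(client: DynamoDbClient,
                        tableName: String,
                        keys: Seq[java.util.Map[String, AttributeValue]]): Unit =
  keys.grouped(25).foreach { batch =>
    val writes = batch.map { key =>
      WriteRequest.builder()
        .deleteRequest(DeleteRequest.builder().key(key).build())
        .build()
    }.asJava
    client.batchWriteItem(
      BatchWriteItemRequest.builder()
        .requestItems(Map(tableName -> writes).asJava)
        .build())
  }
```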
In `@pramen/examples/dynamodb_bookkeeping/dynamodb_with_locks.conf`:
- Around line 125-132: The IAM policy Resource array is missing several
Pramen-created tables; update the Resource list (the commented ARN entries like
"arn:aws:dynamodb:*:*:table/pramen_production_bookkeeping",
"pramen_production_schemas", "pramen_production_locks") to also include the
journal, metadata and offsets tables (e.g., add
"arn:aws:dynamodb:*:*:table/pramen_production_journal",
"arn:aws:dynamodb:*:*:table/pramen_production_metadata",
"arn:aws:dynamodb:*:*:table/pramen_production_offsets") or replace with a safe
prefix/wildcard such as "arn:aws:dynamodb:*:*:table/pramen_production_*" so all
tables created by Pramen are covered and permission errors are avoided.
In `@pramen/examples/dynamodb_bookkeeping/README.md`:
- Line 437: The README references a non-existent example file
TokenLockFactoryDynamoDbExample.scala; update the link to point to the actual
implementation TokenLockFactoryDynamoDb.scala (or remove the reference
entirely). Open pramen/examples/dynamodb_bookkeeping/README.md, find the line
mentioning
core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDbExample.scala
and replace it with
core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala
(or delete the sentence if no example reference is desired) so the documentation
points to an existing symbol/class.
In `@pramen/project/Versions.scala`:
- Line 40: The awsSdkVersion constant in Versions.scala is set to a non-existent
Maven version; update the value of awsSdkVersion to a valid released AWS SDK v2
artifact (e.g., "2.34.0" or a later available version) and ensure any places
that reference Versions.awsSdkVersion (dependency declarations or build files)
will pick up the new value so dependency resolution succeeds.
In `@README.md`:
- Around line 2592-2598: The Maven snippet uses the placeholder
${aws.sdk.version} which is defined only in this repo's POM and will break if
users copy-paste; update the README dependency block by replacing
${aws.sdk.version} with the actual AWS SDK version used in this project (or
alternatively add the corresponding <properties> block showing aws.sdk.version)
so the example is copy-pasteable; refer to the dependency XML fragment and the
${aws.sdk.version} token when making the change.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: a93de128-e535-4c03-9bcd-8d89e08cbee7
📒 Files selected for processing (30)
- README.md
- pramen/api/src/main/scala/za/co/absa/pramen/api/MetadataManager.scala
- pramen/api/src/main/scala/za/co/absa/pramen/api/lock/TokenLockFactory.scala
- pramen/core/pom.xml
- pramen/core/src/main/scala/za/co/absa/pramen/core/app/AppContext.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/BookkeeperConfig.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManager.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerCached.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerDynamoDb.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/journal/Journal.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockDynamoDb.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerDynamoDb.scala
- pramen/core/src/test/scala/za/co/absa/pramen/core/BookkeepingConfigFactory.scala
- pramen/core/src/test/scala/za/co/absa/pramen/core/model/QueryBuilderSuite.scala
- pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDynamoDbBuilderSuite.scala
- pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerDynamoDbBuilderSuite.scala
- pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalDynamoDBBuilderSuite.scala
- pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockFactoryDynamoDbBuilderSuite.scala
- pramen/core/src/test/scala/za/co/absa/pramen/core/tests/metadata/MetadataManagerDynamoDbBuilderSuite.scala
- pramen/examples/dynamodb_bookkeeping/README.md
- pramen/examples/dynamodb_bookkeeping/dynamodb_bookkeeping.conf
- pramen/examples/dynamodb_bookkeeping/dynamodb_with_locks.conf
- pramen/extras/src/main/scala/za/co/absa/pramen/extras/notification/mq/SingleMessageProducer.scala
- pramen/pom.xml
- pramen/project/Dependencies.scala
- pramen/project/Versions.scala
```scala
def getFullTableName(tableArn: Option[String], tableName: String): String = {
  tableArn match {
    case Some(arn) if arn.nonEmpty =>
      // If ARN ends with table/, append the table name, otherwise append /table/tableName
      if (arn.endsWith("/")) {
        s"${arn}table/$tableName"
      } else if (arn.contains("/table/")) {
        arn // ARN already includes table path
      } else {
        s"$arn/table/$tableName"
      }
```
🧩 Analysis chain

🏁 Scripts executed (repository: AbsaOSS/pramen; outputs truncated):

```bash
find . -name "BookkeeperDynamoDb.scala" -type f
find . -name "*test*" -o -name "*Test*" | grep -i dynamo | head -20
cd pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper && head -n 860 BookkeeperDynamoDb.scala | tail -n 50
find . -path "*test*" -name "*BookkeeperDynamoDb*" -o -path "*test*" -name "*Bookkeeper*Dynamo*"
rg "getFullTableName" -n
rg "tableArn" -n -A 3 -B 3 | head -100
sed -n '130,155p' pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDynamoDbBuilderSuite.scala
rg "locksTableName|journalTableFullName|metadataTableFullName|bookkeepingTableName|schemaTableName" -A 5 -B 2 | head -150
sed -n '100,120p' pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala
sed -n '45,75p' pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala
```

🌐 Web query: DynamoDB ARN format specification

💡 Result: DynamoDB resource ARNs follow the general AWS structure `arn:{Partition}:dynamodb:{Region}:{Account}:{resource-type}/{resource-id}`. Key resource types:

- Table: `arn:{Partition}:dynamodb:{Region}:{Account}:table/{TableName}` (e.g., `arn:aws:dynamodb:us-east-1:123456789012:table/MusicCollection`)
- Stream: `...:table/{TableName}/stream/{StreamLabel}`, where `StreamLabel` is a timestamp in ISO 8601 format
- Index: `...:table/{TableName}/index/{IndexName}`
- Backup: `...:table/{TableName}/backup/{BackupName}`

The partition is typically `aws` (standard), `aws-cn` (China), or `aws-us-gov` (GovCloud); the account ID is a 12-digit number without hyphens. These formats are used in IAM policies, API calls, and resource references, and appear consistently across AWS docs, CloudFormation, and API references.

Citations:
- 1: https://docs.aws.amazon.com/AWSCloudFormation/latest/TemplateReference/aws-resource-dynamodb-table.html
- 2: https://docs.aws.amazon.com/IAM/latest/UserGuide/reference-arns.html
- 3: https://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html
- 4: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/rbac-examples.html
- 5: https://docs.aws.amazon.com/r53recovery/latest/dg/recovery-readiness.resource-types-arns.html
- 6: https://docs.aws.amazon.com/service-authorization/latest/reference/list_amazondynamodb.html
- 7: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html
`getFullTableName` builds invalid DynamoDB ARNs.

The method's logic for handling DynamoDB ARN prefixes is flawed. When the input ends with `/` (e.g., `arn:aws:dynamodb:us-east-1:123456789012:table/`), it appends `table/`, producing the malformed result `arn:aws:dynamodb:us-east-1:123456789012:table/table/<name>`. Additionally, the check for the `/table/` substring does not match the standard DynamoDB ARN format, which uses `:table/` (colon, then slash), making the logic fragile and error-prone.

This breaks ARN-based configuration for every manager (`MetadataManagerDynamoDb`, `TokenLockFactoryDynamoDb`, `JournalDynamoDB`, `OffsetManagerDynamoDb`) that depends on this helper to construct valid table identifiers for DynamoDB API calls.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala`
around lines 838 - 848, The getFullTableName method builds incorrect ARNs by
checking "/table/" and appending "table/" when the ARN already ends with a
slash; update the logic in getFullTableName to use the DynamoDB standard token
":table/" (not "/table/"), and handle three cases: if arn contains ":table/" and
endsWith("/") then return arn + tableName (append only the name), if arn
contains ":table/" and does not endsWith("/") then assume it already includes the
table name and return arn as-is, otherwise append ":table/" + tableName to the
provided arn; update the matching branches in getFullTableName accordingly to
avoid producing "table/table" sequences.
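The normalization described in the prompt can be sketched as a small pure function. This is an illustrative Python sketch of the intended logic, not the actual Pramen code, which is Scala:

```python
from typing import Optional

def get_full_table_name(table_arn: Optional[str], table_name: str) -> str:
    """Resolve a DynamoDB table identifier from an optional ARN prefix and a base name.

    An ARN containing the standard ':table/' token is treated as (almost) resolved:
    the name is appended only when the ARN ends with the 'table/' prefix itself.
    """
    if not table_arn:
        return table_name
    if ":table/" in table_arn:
        # ARN already has the table segment; append the name only for a trailing slash
        return table_arn + table_name if table_arn.endswith("/") else table_arn
    # ARN prefix without a table segment: add the standard ':table/' token
    sep = "table/" if table_arn.endswith(":") else ":table/"
    return table_arn + sep + table_name

print(get_full_table_name("arn:aws:dynamodb:us-east-1:123456789012:table/", "pramen_journal"))
# -> arn:aws:dynamodb:us-east-1:123456789012:table/pramen_journal
```

This avoids the `table/table` duplication for prefix-style ARNs and leaves fully resolved ARNs untouched.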
```scala
override def getMetadataFromStorage(tableName: String, infoDate: LocalDate): Map[String, MetadataValue] = {
  try {
    val compositeKey = s"$tableName#$infoDate"

    val queryRequest = QueryRequest.builder()
      .tableName(metadataTableFullName)
      .keyConditionExpression(s"${MetadataManagerDynamoDb.ATTR_COMPOSITE_KEY} = :composite_key")
      .expressionAttributeValues(Map(
        ":composite_key" -> AttributeValue.builder().s(compositeKey).build()
      ).asJava)
      .build()

    val result = dynamoDbClient.query(queryRequest)

    result.items().asScala.map { item =>
      val key = item.get(MetadataManagerDynamoDb.ATTR_METADATA_KEY).s()
      val value = item.get(MetadataManagerDynamoDb.ATTR_METADATA_VALUE).s()
      val lastUpdated = Instant.ofEpochSecond(item.get(MetadataManagerDynamoDb.ATTR_LAST_UPDATED).n().toLong)
      key -> MetadataValue(value, lastUpdated)
    }.toMap
```
🧩 Analysis chain

🏁 Scripts executed (repository: AbsaOSS/pramen; outputs truncated):

```bash
cat -n pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerDynamoDb.scala | sed -n '80,170p'
rg -n "getMetadataFromStorage|deleteMetadataFromStorage" pramen/core/src/main/scala/ -A 3 -B 1 | head -80
```
Add pagination to getMetadataFromStorage and deleteMetadataFromStorage.

The query at line 95 in `getMetadataFromStorage(tableName, infoDate)` and the query at line 164 in `deleteMetadataFromStorage(tableName, infoDate)` do not loop on `lastEvaluatedKey`. If a `(tableName, infoDate)` partition exceeds DynamoDB's 1 MB query page limit, `getMetadataFromStorage` returns incomplete metadata and `deleteMetadataFromStorage` leaves stale items behind without warning.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerDynamoDb.scala`
around lines 83 - 102, getMetadataFromStorage and deleteMetadataFromStorage
currently run a single QueryRequest and will miss items if DynamoDB returns a
paginated result; update both methods in MetadataManagerDynamoDb to loop on
QueryResponse.lastEvaluatedKey (use ExclusiveStartKey on subsequent
QueryRequest) until null, aggregating all items before building the final Map in
getMetadataFromStorage; for deleteMetadataFromStorage, collect all keys across
pages and perform deletes for every page (or aggregate and use BatchWriteItem in
batches of 25) until lastEvaluatedKey is empty, ensuring you still read
ATTR_COMPOSITE_KEY, ATTR_METADATA_KEY, ATTR_METADATA_VALUE, ATTR_LAST_UPDATED
and use the same AttributeValue shaping as the existing QueryRequest/Item
deletions.
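The pagination loop the prompt describes follows one standard pattern, sketched here independently of the AWS SDK (the `fetch_page` callback is hypothetical and stands in for `DynamoDbClient.query`; the real `QueryResponse` exposes `lastEvaluatedKey`):

```python
def query_all_items(fetch_page):
    """Drain a paginated query: re-issue the request with the previous page's
    last_evaluated_key as the exclusive start key until the server stops
    returning one (DynamoDB omits it on the final page)."""
    items, start_key = [], None
    while True:
        page = fetch_page(exclusive_start_key=start_key)  # one QueryRequest per iteration
        items.extend(page["items"])
        start_key = page.get("last_evaluated_key")
        if not start_key:  # absent or empty key means the final page was reached
            return items

# Simulated three-page result set, mimicking DynamoDB's 1 MB page limit
pages = {None: {"items": [1, 2], "last_evaluated_key": "k1"},
         "k1": {"items": [3], "last_evaluated_key": "k2"},
         "k2": {"items": [4, 5]}}

print(query_all_items(lambda exclusive_start_key: pages[exclusive_start_key]))  # -> [1, 2, 3, 4, 5]
```

For the delete path, the same loop would collect keys from every page before (or while) issuing the per-item or batched deletes.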
```
# "Resource": [
#   "arn:aws:dynamodb:*:*:table/pramen_production_bookkeeping",
#   "arn:aws:dynamodb:*:*:table/pramen_production_schemas",
#   "arn:aws:dynamodb:*:*:table/pramen_production_locks"
# ]
# }
# ]
# }
```
IAM policy example is incomplete: it omits several tables.

The IAM policy `Resource` list names only three tables (bookkeeping, schemas, locks), but the documentation above describes additional tables that are created automatically: journal, metadata, and offsets. Users following this example may encounter permission errors.
Consider updating to include all tables:

```diff
 "Resource": [
   "arn:aws:dynamodb:*:*:table/pramen_production_bookkeeping",
   "arn:aws:dynamodb:*:*:table/pramen_production_schemas",
-  "arn:aws:dynamodb:*:*:table/pramen_production_locks"
+  "arn:aws:dynamodb:*:*:table/pramen_production_locks",
+  "arn:aws:dynamodb:*:*:table/pramen_production_journal",
+  "arn:aws:dynamodb:*:*:table/pramen_production_metadata",
+  "arn:aws:dynamodb:*:*:table/pramen_production_offsets"
 ]
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pramen/examples/dynamodb_bookkeeping/dynamodb_with_locks.conf` around lines
125 - 132, The IAM policy Resource array is missing several Pramen-created
tables; update the Resource list (the commented ARN entries like
"arn:aws:dynamodb:*:*:table/pramen_production_bookkeeping",
"pramen_production_schemas", "pramen_production_locks") to also include the
journal, metadata and offsets tables (e.g., add
"arn:aws:dynamodb:*:*:table/pramen_production_journal",
"arn:aws:dynamodb:*:*:table/pramen_production_metadata",
"arn:aws:dynamodb:*:*:table/pramen_production_offsets") or replace with a safe
prefix/wildcard such as "arn:aws:dynamodb:*:*:table/pramen_production_*" so all
tables created by Pramen are covered and permission errors are avoided.
```scala
val scalatestVersion = "3.2.14"
val mockitoVersion = "2.28.2"
val httpClientVersion = "4.5.14"
val awsSdkVersion = "2.42.23"
```
🧩 Analysis chain

🏁 Scripts executed (repository: AbsaOSS/pramen; outputs truncated):

```bash
# Check the AWS SDK DynamoDB version on Maven Central
curl -s "https://search.maven.org/solrsearch/select?q=g:software.amazon.awssdk+AND+a:dynamodb&rows=5&wt=json" \
  | jq '.response.docs[] | {version: .latestVersion}'

# Check whether version 2.42.23 exists
curl -s "https://search.maven.org/solrsearch/select?q=g:software.amazon.awssdk+AND+a:dynamodb+AND+v:2.42.23&rows=1&wt=json" \
  | jq '.response'

# List the latest available versions
curl -s "https://search.maven.org/solrsearch/select?q=g:software.amazon.awssdk+AND+a:dynamodb&rows=50&wt=json" \
  | jq -r '.response.docs[] | .v' | sort -V | tail -10

# Check for security advisories
gh api graphql -f query='
{
  securityVulnerabilities(first: 5, ecosystem: MAVEN, package: "software.amazon.awssdk:dynamodb") {
    nodes {
      advisory { summary severity publishedAt }
      vulnerableVersionRange
      firstPatchedVersion { identifier }
    }
  }
}'

# Verify the actual version in the Versions.scala file
grep -A 2 -B 2 "awsSdkVersion" pramen/project/Versions.scala
```
AWS SDK version 2.42.23 does not exist in Maven Central.
The specified version does not exist; the latest available version is 2.34.0. This will cause a build failure when the dependency cannot be resolved. Update to an available version, such as 2.34.0 or later.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pramen/project/Versions.scala` at line 40, The awsSdkVersion constant in
Versions.scala is set to a non-existent Maven version; update the value of
awsSdkVersion to a valid released AWS SDK v2 artifact (e.g., "2.34.0" or a later
available version) and ensure any places that reference Versions.awsSdkVersion
(dependency declarations or build files) will pick up the new value so
dependency resolution succeeds.
```xml
<dependency>
  <groupId>software.amazon.awssdk</groupId>
  <artifactId>dynamodb</artifactId>
  <version>${aws.sdk.version}</version>
</dependency>
```
Make the Maven example copy-pasteable.
${aws.sdk.version} is defined in this repository’s POM, not in a downstream user’s project, so the snippet on Line 2596 will fail as pasted. Either inline the version here or show the matching <properties> block.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@README.md` around lines 2592 - 2598, The Maven snippet uses the placeholder
${aws.sdk.version} which is defined only in this repo's POM and will break if
users copy-paste; update the README dependency block by replacing
${aws.sdk.version} with the actual AWS SDK version used in this project (or
alternatively add the corresponding <properties> block showing aws.sdk.version)
so the example is copy-pasteable; refer to the dependency XML fragment and the
${aws.sdk.version} token when making the change.
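For example, the README could show the matching `<properties>` block so the placeholder resolves in a downstream project (the version here is illustrative; use whichever released AWS SDK v2 version the project pins):

```xml
<properties>
  <aws.sdk.version>2.34.0</aws.sdk.version>
</properties>

<dependency>
  <groupId>software.amazon.awssdk</groupId>
  <artifactId>dynamodb</artifactId>
  <version>${aws.sdk.version}</version>
</dependency>
```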
…ey for multiple entries per table and date
…ecord deletion from DynamoDB (Thanks @coderabbitai).
Actionable comments posted: 6
♻️ Duplicate comments (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala (1)

990-1000: ⚠️ Potential issue | 🟠 Major

`getFullTableName()` still mis-normalizes already resolved table ARNs.

DynamoDB table ARNs use the `...:table/<name>` form, and the APIs this class calls accept a full table ARN in `TableName`. If `tableArn` is already resolved to that form, or if it is an ARN prefix without the trailing `table/`, the current branches produce malformed identifiers such as `...:table/<existing>/table/<name>` or `.../table/<name>`. Treat `:table/<name>` as terminal and only append when the input really ends with the `table/` prefix. (docs.aws.amazon.com)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala` around lines 990 - 1000, getFullTableName currently appends "table/<name>" even when the provided ARN already contains a full table segment (":table/<name>"), producing duplicates; update BookkeeperDynamoDb.getFullTableName to first detect a terminal table ARN (e.g. matches pattern like ".*:table/<non-slash-chars>$" or contains ":table/" followed by a table name) and return it unchanged, otherwise keep the existing normalization logic: if arn ends with "table/" append tableName, else if arn ends with "/" append "table/<tableName>", else append "/table/<tableName>".
🧹 Nitpick comments (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala (1)

40-43: Keep the default `tablePrefix` in one place.

`TokenLockFactoryDynamoDb` hardcodes `"pramen"` while the builder uses `BookkeeperDynamoDb.DEFAULT_TABLE_PREFIX`. They match today, but the public constructor and builder will drift on the next constant change. Reuse the shared constant in both paths.

Suggested fix:

```diff
 class TokenLockFactoryDynamoDb(
   dynamoDbClient: DynamoDbClient,
   tableArn: Option[String] = None,
-  tablePrefix: String = "pramen"
+  tablePrefix: String = BookkeeperDynamoDb.DEFAULT_TABLE_PREFIX
 ) extends TokenLockFactory with AutoCloseable {
```

Also applies to: 203-208
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala` around lines 40 - 43, The constructor for TokenLockFactoryDynamoDb currently hardcodes the default tablePrefix ("pramen") which can drift from the shared constant; change the default parameter to reuse BookkeeperDynamoDb.DEFAULT_TABLE_PREFIX instead of the literal string and apply the same replacement where the literal is used around the TokenLockFactoryDynamoDb-related methods (e.g., other constructors/factory methods near the class, lines referencing tablePrefix) so both the public constructor and the builder consistently use BookkeeperDynamoDb.DEFAULT_TABLE_PREFIX.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala`:
- Around line 503-517: The deleteTable method in BookkeeperDynamoDb currently
removes bookkeeping, schema and offsets but omits metadata records, leaving
backend-specific state behind; update deleteTable (the method overriding
deleteTable) to also remove metadata records similar to BookkeeperJdbc by
invoking the DynamoDB metadata deletion routine (e.g., add a call like
deleteTableFromMetadata(tableName) or
MetadataManagerDynamoDb.deleteAllMetadata(tableName, dynamoDbClient)), capture
the returned count/result and append a corresponding message to the results
ListBuffer so the method reports metadata deletion alongside bookkeeping, schema
and offsets.
- Around line 515-516: The call in BookkeeperDynamoDb to
OffsetManagerDynamoDb.deleteAllOffsets(tableName, dynamoDbClient) uses the raw
default name and can delete the wrong namespace; resolve the actual offsets
table using the same resolution logic that considers tablePrefix and tableArn
and pass that resolved name into OffsetManagerDynamoDb.deleteAllOffsets instead.
Concretely, in BookkeeperDynamoDb replace the direct use of tableName with the
value produced by OffsetManagerDynamoDb's resolution helper (use the
resolveOffsetsTableName / equivalent that accepts tablePrefix and tableArn) and
then call OffsetManagerDynamoDb.deleteAllOffsets(resolvedTableName,
dynamoDbClient).
- Around line 89-102: The table-initialization has a race: replace the unchecked
createTable calls by catching AWS ResourceInUseException inside
createBookkeepingTable and createSchemaTable (the functions invoked after
tableExists), so when dynamoDbClient.createTable(...) throws
ResourceInUseException you swallow it and continue by calling
waitForTableActive(...) for that table; keep existing logging but ensure any
other exceptions still propagate. Use the existing symbols
createBookkeepingTable, createSchemaTable, tableExists and waitForTableActive to
locate where to add the try/catch around the createTable invocation.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala`:
- Line 56: The constructor currently calls init() which can throw and leak the
builder-created DynamoDbClient; remove the init() call from the constructor and
instead invoke init() inside the build() method (for TokenLockFactoryDynamoDb
and the analogous builder at lines ~296-302), wrapping the init() call in a
try/catch so that if init() throws you close the DynamoDbClient only when it was
created by the builder (i.e., track whether the client was supplied externally
or created in build()), call client.close() on failure, and then rethrow the
exception; ensure externally supplied clients are never closed by this cleanup
path.
- Around line 126-147: The createLocksTable method can throw a
ResourceInUseException when another process is concurrently creating the same
DynamoDB table; update createLocksTable (in TokenLockFactoryDynamoDb) to catch
the AWS SDK's ResourceInUseException around
dynamoDbClient.createTable(createTableRequest), ignore that specific exception
and continue to call waitForTableActive(tableName), while rethrowing or letting
other exceptions propagate normally so only the "table already being
created/exists" case is swallowed.
- Around line 177-180: The current catch in TokenLockFactoryDynamoDb.scala
swallows all NonFatal exceptions during DescribeTable; change it to only catch
the AWS ResourceNotFoundException
(software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException) and
treat that as the transient case (log.warn and Thread.sleep to retry), while
immediately rethrowing any other exceptions (including NonFatal others) so
credential/permission/validation errors surface; update imports accordingly and
keep the surrounding retry loop logic intact.
---
Duplicate comments:
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala`:
- Around line 990-1000: getFullTableName currently appends "table/<name>" even
when the provided ARN already contains a full table segment (":table/<name>"),
producing duplicates; update BookkeeperDynamoDb.getFullTableName to first detect
a terminal table ARN (e.g. matches pattern like ".*:table/<non-slash-chars>$" or
contains ":table/" followed by a table name) and return it unchanged, otherwise
keep the existing normalization logic: if arn ends with "table/" append
tableName, else if arn ends with "/" append "table/<tableName>", else append
"/table/<tableName>".
---
Nitpick comments:
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala`:
- Around line 40-43: The constructor for TokenLockFactoryDynamoDb currently
hardcodes the default tablePrefix ("pramen") which can drift from the shared
constant; change the default parameter to reuse
BookkeeperDynamoDb.DEFAULT_TABLE_PREFIX instead of the literal string and apply
the same replacement where the literal is used around the
TokenLockFactoryDynamoDb-related methods (e.g., other constructors/factory
methods near the class, lines referencing tablePrefix) so both the public
constructor and the builder consistently use
BookkeeperDynamoDb.DEFAULT_TABLE_PREFIX.
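Several of the prompts above describe the same create-if-absent race on table creation. The idempotent pattern they ask for can be sketched in isolation (this Python sketch is illustrative; `ResourceInUse` stands in for the AWS SDK's `ResourceInUseException`, and `wait_for_active` is assumed to poll `DescribeTable`):

```python
class ResourceInUse(Exception):
    """Stands in for software.amazon.awssdk's ResourceInUseException."""

def ensure_table(exists, create, wait_for_active):
    """Idempotently ensure a table exists: a concurrent creator winning the race
    is treated as success, after which we simply wait for the table to be ACTIVE."""
    if not exists():
        try:
            create()
        except ResourceInUse:
            pass  # another process created the table first; that is fine
    wait_for_active()

# Simulate two processes racing to create the same table
created = {"flag": False}

def create():
    if created["flag"]:
        raise ResourceInUse()  # DynamoDB raises this if creation is already in progress
    created["flag"] = True

ensure_table(lambda: False, create, lambda: None)  # first creator succeeds
ensure_table(lambda: False, create, lambda: None)  # loser swallows ResourceInUse
print(created["flag"])  # -> True
```

Only the "table already being created/exists" case is swallowed; any other exception from `create` would still propagate.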
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: de80f2a8-dbfe-428a-9935-893528d64f7e
📒 Files selected for processing (10)
- pramen/core/pom.xml
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManager.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerCached.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerDynamoDb.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala
- pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDynamoDbBuilderSuite.scala
- pramen/examples/dynamodb_bookkeeping/README.md
- pramen/examples/dynamodb_bookkeeping/dynamodb_with_locks.conf
✅ Files skipped from review due to trivial changes (3)
- pramen/examples/dynamodb_bookkeeping/dynamodb_with_locks.conf
- pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDynamoDbBuilderSuite.scala
- pramen/examples/dynamodb_bookkeeping/README.md
🚧 Files skipped from review as they are similar to previous changes (4)
- pramen/core/pom.xml
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManager.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerCached.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerDynamoDb.scala
```scala
// Initialize bookkeeping table
if (!tableExists(bookkeepingTableName)) {
  log.info(s"Creating DynamoDB bookkeeping table: $bookkeepingTableName")
  createBookkeepingTable(bookkeepingTableName)
  log.info(s"Successfully created bookkeeping table: $bookkeepingTableName")
} else {
  log.info(s"DynamoDB bookkeeping table already exists: $bookkeepingTableName")
}

// Initialize schema table
if (!tableExists(schemaTableName)) {
  log.info(s"Creating DynamoDB schema table: $schemaTableName")
  createSchemaTable(schemaTableName)
  log.info(s"Successfully created schema table: $schemaTableName")
```
❓ Verification inconclusive
Script executed:

```bash
cat -n pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala | head -120
```

Repository: AbsaOSS/pramen
Exit code: 0
stdout:
1 /*
2 * Copyright 2022 ABSA Group Limited
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package za.co.absa.pramen.core.bookkeeper
18
19 import org.apache.spark.sql.types.StructType
20 import org.slf4j.LoggerFactory
21 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
22 import software.amazon.awssdk.regions.Region
23 import software.amazon.awssdk.services.dynamodb.DynamoDbClient
24 import software.amazon.awssdk.services.dynamodb.model._
25 import za.co.absa.pramen.core.bookkeeper.model.DataAvailability
26 import za.co.absa.pramen.core.model.{DataChunk, TableSchema}
27 import za.co.absa.pramen.core.utils.{AlgorithmUtils, TimeUtils}
28
29 import java.net.URI
30 import java.time.LocalDate
31 import scala.collection.JavaConverters._
32 import scala.util.control.NonFatal
33
34 /**
35 * DynamoDB-based implementation of the Bookkeeper.
36 *
37 * Table schema for bookkeeping:
38 * - Partition key: tableName (String)
39 * - Sort key: infoDateSortKey (String in "yyyy-MM-dd#jobFinishedMillis" format)
40 * The composite sort key allows multiple entries for the same table and date.
41 *
42 * Table schema for schemas:
43 * - Partition key: tableName (String)
44 * - Sort key: infoDate (String in yyyy-MM-dd format)
45 *
46 * `@param` dynamoDbClient The DynamoDB client to use for operations
47 * `@param` batchId The batch ID for this execution
48 * `@param` tableArn Optional ARN prefix for DynamoDB tables (e.g., "arn:aws:dynamodb:region:account-id:table/")
49 * `@param` tablePrefix Prefix for table names to allow multiple bookkeeping sets in the same account (default: "pramen")
50 */
51 class BookkeeperDynamoDb private (
52 dynamoDbClient: DynamoDbClient,
53 batchId: Long,
54 tableArn: Option[String] = None,
55 tablePrefix: String = BookkeeperDynamoDb.DEFAULT_TABLE_PREFIX
56 ) extends BookkeeperBase(isBookkeepingEnabled = true, batchId) {
57
58 import BookkeeperDynamoDb._
59
60 private val log = LoggerFactory.getLogger(this.getClass)
61 private val queryWarningTimeoutMs = 10000L
62
63 // Construct table names with prefix
64 private val bookkeepingTableBaseName = s"${tablePrefix}_$DEFAULT_BOOKKEEPING_TABLE"
65 private val schemaTableBaseName = s"${tablePrefix}_$DEFAULT_SCHEMA_TABLE"
66
67 // Full table names/ARNs
68 private val bookkeepingTableName = getFullTableName(tableArn, bookkeepingTableBaseName)
69 private val schemaTableName = getFullTableName(tableArn, schemaTableBaseName)
70
71 // Offset management
72 private val offsetManagement = new OffsetManagerCached(
73 new OffsetManagerDynamoDb(dynamoDbClient, batchId, tableArn, tablePrefix, closesClient = false)
74 )
75
76 // Initialize tables on construction
77 init()
78
79 override val bookkeepingEnabled: Boolean = true
80
81 /**
82 * Initializes the DynamoDB tables for bookkeeping and schemas.
83 * Checks if tables exist and creates them if they don't.
84 */
85 def init(): Unit = {
86 try {
87 log.info(s"Initializing DynamoDB bookkeeper with tables: bookkeeping='$bookkeepingTableName', schemas='$schemaTableName'")
88
89 // Initialize bookkeeping table
90 if (!tableExists(bookkeepingTableName)) {
91 log.info(s"Creating DynamoDB bookkeeping table: $bookkeepingTableName")
92 createBookkeepingTable(bookkeepingTableName)
93 log.info(s"Successfully created bookkeeping table: $bookkeepingTableName")
94 } else {
95 log.info(s"DynamoDB bookkeeping table already exists: $bookkeepingTableName")
96 }
97
98 // Initialize schema table
99 if (!tableExists(schemaTableName)) {
100 log.info(s"Creating DynamoDB schema table: $schemaTableName")
101 createSchemaTable(schemaTableName)
102 log.info(s"Successfully created schema table: $schemaTableName")
103 } else {
104 log.info(s"DynamoDB schema table already exists: $schemaTableName")
105 }
106
107 log.info(s"DynamoDB bookkeeper initialization complete")
108 } catch {
109 case NonFatal(ex) =>
110 log.error("Error initializing DynamoDB bookkeeper tables", ex)
111 throw new RuntimeException("Failed to initialize DynamoDB bookkeeper", ex)
112 }
113 }
114
115 /**
116 * Checks if a DynamoDB table exists.
117 *
118 * `@param` tableName The name of the table to check
119 * `@return` true if the table exists, false otherwise
120 */
Script executed:

```bash
# Find the implementations of createBookkeepingTable and createSchemaTable
rg -n "def createBookkeepingTable|def createSchemaTable|def tableExists|def waitForTableActive" pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala -A 15
```

Repository: AbsaOSS/pramen
Exit code: 0
stdout:
121: private def tableExists(tableName: String): Boolean = {
122- try {
123- val describeRequest = DescribeTableRequest.builder()
124- .tableName(tableName)
125- .build()
126-
127- dynamoDbClient.describeTable(describeRequest)
128- true
129- } catch {
130- case _: ResourceNotFoundException => false
131- case NonFatal(ex) =>
132- log.warn(s"Error checking if table exists: $tableName", ex)
133- throw ex
134- }
135- }
136-
--
143: private def createBookkeepingTable(tableName: String): Unit = {
144- val createTableRequest = CreateTableRequest.builder()
145- .tableName(tableName)
146- .keySchema(
147- KeySchemaElement.builder()
148- .attributeName(ATTR_TABLE_NAME)
149- .keyType(KeyType.HASH)
150- .build(),
151- KeySchemaElement.builder()
152- .attributeName(ATTR_INFO_DATE_SORT_KEY)
153- .keyType(KeyType.RANGE)
154- .build()
155- )
156- .attributeDefinitions(
157- AttributeDefinition.builder()
158- .attributeName(ATTR_TABLE_NAME)
--
180: private def createSchemaTable(tableName: String): Unit = {
181- val createTableRequest = CreateTableRequest.builder()
182- .tableName(tableName)
183- .keySchema(
184- KeySchemaElement.builder()
185- .attributeName(ATTR_TABLE_NAME)
186- .keyType(KeyType.HASH)
187- .build(),
188- KeySchemaElement.builder()
189- .attributeName(ATTR_INFO_DATE)
190- .keyType(KeyType.RANGE)
191- .build()
192- )
193- .attributeDefinitions(
194- AttributeDefinition.builder()
195- .attributeName(ATTR_TABLE_NAME)
--
218: private def waitForTableActive(tableName: String, maxWaitSeconds: Int = 60): Unit = {
219- val startTime = System.currentTimeMillis()
220- val maxWaitMs = maxWaitSeconds * 1000L
221-
222- var tableActive = false
223- while (!tableActive && (System.currentTimeMillis() - startTime) < maxWaitMs) {
224- try {
225- val describeRequest = DescribeTableRequest.builder()
226- .tableName(tableName)
227- .build()
228-
229- val response = dynamoDbClient.describeTable(describeRequest)
230- val status = response.table().tableStatus()
231-
232- if (status == TableStatus.ACTIVE) {
233- tableActive = true
Script executed:
# Check if there are any try-catch blocks around the create calls
rg -n "try|catch|ResourceInUseException" pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala

Repository: AbsaOSS/pramen
Exit code: 0
stdout:
86: try {
108: } catch {
122: try {
129: } catch {
224: try {
239: } catch {
252: try {
288: } catch {
296: try {
320: } catch {
328: try {
360: } catch {
368: try {
386: } catch {
394: try {
406: } catch {
422: try {
438: } catch {
446: try {
483: try {
485: } catch {
496: } catch {
504: try {
521: } catch {
564: try {
567: } catch {
615: try {
618: } catch {
631: try {
656: } catch {
664: try {
678: } catch {
690: try {
694: } catch {
Script executed:
sed -n '143,179p' pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala

Repository: AbsaOSS/pramen
Exit code: 0
stdout:
private def createBookkeepingTable(tableName: String): Unit = {
val createTableRequest = CreateTableRequest.builder()
.tableName(tableName)
.keySchema(
KeySchemaElement.builder()
.attributeName(ATTR_TABLE_NAME)
.keyType(KeyType.HASH)
.build(),
KeySchemaElement.builder()
.attributeName(ATTR_INFO_DATE_SORT_KEY)
.keyType(KeyType.RANGE)
.build()
)
.attributeDefinitions(
AttributeDefinition.builder()
.attributeName(ATTR_TABLE_NAME)
.attributeType(ScalarAttributeType.S)
.build(),
AttributeDefinition.builder()
.attributeName(ATTR_INFO_DATE_SORT_KEY)
.attributeType(ScalarAttributeType.S)
.build()
)
.billingMode(BillingMode.PAY_PER_REQUEST) // On-demand billing
.build()
dynamoDbClient.createTable(createTableRequest)
// Wait for table to become active
waitForTableActive(tableName)
}
/**
* Creates the schema table with the appropriate schema.
*
 * @param tableName The name of the table to create
*/
Script executed:
sed -n '180,217p' pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala

Repository: AbsaOSS/pramen
Exit code: 0
stdout:
private def createSchemaTable(tableName: String): Unit = {
val createTableRequest = CreateTableRequest.builder()
.tableName(tableName)
.keySchema(
KeySchemaElement.builder()
.attributeName(ATTR_TABLE_NAME)
.keyType(KeyType.HASH)
.build(),
KeySchemaElement.builder()
.attributeName(ATTR_INFO_DATE)
.keyType(KeyType.RANGE)
.build()
)
.attributeDefinitions(
AttributeDefinition.builder()
.attributeName(ATTR_TABLE_NAME)
.attributeType(ScalarAttributeType.S)
.build(),
AttributeDefinition.builder()
.attributeName(ATTR_INFO_DATE)
.attributeType(ScalarAttributeType.S)
.build()
)
.billingMode(BillingMode.PAY_PER_REQUEST) // On-demand billing
.build()
dynamoDbClient.createTable(createTableRequest)
// Wait for table to become active
waitForTableActive(tableName)
}
/**
* Waits for a table to become active after creation.
*
 * @param tableName The name of the table to wait for
 * @param maxWaitSeconds Maximum time to wait in seconds (default: 60)
*/
Catch ResourceInUseException during concurrent table initialization.
The check-then-create pattern on lines 90–92 and 99–101 has a race condition. When two jobs initialize the backend concurrently, both can pass the tableExists() check before either calls createTable(). The second job's createTable() call will then throw ResourceInUseException, which propagates uncaught to the outer try-catch in init() and causes initialization to fail even though the table already exists or is being created.
Wrap the dynamoDbClient.createTable() calls in createBookkeepingTable() and createSchemaTable() in a try-catch that swallows ResourceInUseException, then proceed to waitForTableActive(). This matches DynamoDB's documented behavior: CreateTable is asynchronous and throws that exception when the table already exists or is already being created.
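A sketch of what the guarded create could look like for the bookkeeping table, reusing the builder shown in the verification output above. It assumes the class's existing `dynamoDbClient`, `log`, attribute constants, and `waitForTableActive` helper, plus `ResourceInUseException` from `software.amazon.awssdk.services.dynamodb.model` — it is an illustration, not the merged implementation:

```scala
import software.amazon.awssdk.services.dynamodb.model._

private def createBookkeepingTable(tableName: String): Unit = {
  val createTableRequest = CreateTableRequest.builder()
    .tableName(tableName)
    .keySchema(
      KeySchemaElement.builder().attributeName(ATTR_TABLE_NAME).keyType(KeyType.HASH).build(),
      KeySchemaElement.builder().attributeName(ATTR_INFO_DATE_SORT_KEY).keyType(KeyType.RANGE).build()
    )
    .attributeDefinitions(
      AttributeDefinition.builder().attributeName(ATTR_TABLE_NAME).attributeType(ScalarAttributeType.S).build(),
      AttributeDefinition.builder().attributeName(ATTR_INFO_DATE_SORT_KEY).attributeType(ScalarAttributeType.S).build()
    )
    .billingMode(BillingMode.PAY_PER_REQUEST) // On-demand billing
    .build()

  try {
    dynamoDbClient.createTable(createTableRequest)
  } catch {
    case _: ResourceInUseException =>
      // A concurrent initializer won the race: the table already exists or
      // is being created. Falling through to the waiter is safe either way.
      log.info(s"DynamoDB bookkeeping table is already being created: $tableName")
  }

  // Safe in both paths: polls until the table reaches ACTIVE.
  waitForTableActive(tableName)
}
```

Any other exception still propagates to the outer try-catch in init(), so only the documented "already exists / already creating" case is swallowed.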
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala`
around lines 89 - 102, The table-initialization has a race: replace the
unchecked createTable calls by catching AWS ResourceInUseException inside
createBookkeepingTable and createSchemaTable (the functions invoked after
tableExists), so when dynamoDbClient.createTable(...) throws
ResourceInUseException you swallow it and continue by calling
waitForTableActive(...) for that table; keep existing logging but ensure any
other exceptions still propagate. Use the existing symbols
createBookkeepingTable, createSchemaTable, tableExists and waitForTableActive to
locate where to add the try/catch around the createTable invocation.
override def deleteTable(tableName: String): Seq[String] = {
  try {
    val results = scala.collection.mutable.ListBuffer[String]()

    // Delete from bookkeeping table
    val bookkeepingCount = deleteTableFromBookkeeping(tableName)
    results += s"Deleted $bookkeepingCount bookkeeping records for table '$tableName'"

    // Delete from schema table
    val schemaCount = deleteTableFromSchemas(tableName)
    results += s"Deleted $schemaCount schema records for table '$tableName'"

    // Delete offsets
    val offsetResults = OffsetManagerDynamoDb.deleteAllOffsets(tableName, dynamoDbClient)
    results += s"Deleted $offsetResults offset records for table '$tableName'"
deleteTable() still leaves metadata behind.
This clears bookkeeping, schemas, and offsets only. The JDBC backend also clears metadata as part of the same operation, so Dynamo mode now leaves backend-specific state behind for the same logical table. Based on learnings, BookkeeperJdbc's deleteTable method intentionally spans BookkeepingRecords, SchemaRecords, OffsetRecords, and MetadataRecords, with partial deletion accepted for later retries.
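A sketch of one way to bring the DynamoDB path in line with the JDBC backend. The `deleteAllMetadata` helper shown here is hypothetical — no such method exists yet, and its real name and signature depend on how `MetadataManagerDynamoDb` is extended:

```scala
// Inside BookkeeperDynamoDb.deleteTable(), after the offset deletion.
// NOTE: deleteAllMetadata is a hypothetical helper, not a method that exists
// today. Partial deletion stays acceptable for later retries, matching
// BookkeeperJdbc's behavior.
val metadataCount = MetadataManagerDynamoDb.deleteAllMetadata(tableName, dynamoDbClient)
results += s"Deleted $metadataCount metadata records for table '$tableName'"
```

This keeps the reporting pattern consistent with the bookkeeping, schema, and offset messages already appended to the results buffer.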
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala`
around lines 503 - 517, The deleteTable method in BookkeeperDynamoDb currently
removes bookkeeping, schema and offsets but omits metadata records, leaving
backend-specific state behind; update deleteTable (the method overriding
deleteTable) to also remove metadata records similar to BookkeeperJdbc by
invoking the DynamoDB metadata deletion routine (e.g., add a call like
deleteTableFromMetadata(tableName) or
MetadataManagerDynamoDb.deleteAllMetadata(tableName, dynamoDbClient)), capture
the returned count/result and append a corresponding message to the results
ListBuffer so the method reports metadata deletion alongside bookkeeping, schema
and offsets.
// Delete offsets
val offsetResults = OffsetManagerDynamoDb.deleteAllOffsets(tableName, dynamoDbClient)
Delete offsets from the resolved DynamoDB table, not the default one.
Line 516 calls OffsetManagerDynamoDb.deleteAllOffsets(tableName, dynamoDbClient), but that helper resolves the offset table with DEFAULT_TABLE_PREFIX and None for tableArn in pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerDynamoDb.scala:610-650. With a custom tablePrefix or tableArn, deleteTable() will miss the real offsets and can delete another namespace's offsets instead.
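A sketch of the call-site fix. The `"offsets"` suffix and the extended `deleteAllOffsets` signature are assumptions for illustration — the actual resolution helper would have to come from `OffsetManagerDynamoDb` itself:

```scala
// In BookkeeperDynamoDb.deleteTable(): resolve the offsets table with the
// same tablePrefix/tableArn this bookkeeper was configured with, instead of
// letting the helper fall back to DEFAULT_TABLE_PREFIX and tableArn = None.
// NOTE: the "offsets" suffix and the extra parameter on deleteAllOffsets are
// illustrative; the real names depend on OffsetManagerDynamoDb's internals.
val offsetsTableFullName =
  BookkeeperDynamoDb.getFullTableName(tableArn, s"${tablePrefix}offsets")
val offsetResults =
  OffsetManagerDynamoDb.deleteAllOffsets(offsetsTableFullName, tableName, dynamoDbClient)
results += s"Deleted $offsetResults offset records for table '$tableName'"
```

The key point is that the DynamoDB table name is resolved once, from the bookkeeper's own configuration, and passed down rather than re-derived with defaults inside the helper.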
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala`
around lines 515 - 516, The call in BookkeeperDynamoDb to
OffsetManagerDynamoDb.deleteAllOffsets(tableName, dynamoDbClient) uses the raw
default name and can delete the wrong namespace; resolve the actual offsets
table using the same resolution logic that considers tablePrefix and tableArn
and pass that resolved name into OffsetManagerDynamoDb.deleteAllOffsets instead.
Concretely, in BookkeeperDynamoDb replace the direct use of tableName with the
value produced by OffsetManagerDynamoDb's resolution helper (use the
resolveOffsetsTableName / equivalent that accepts tablePrefix and tableArn) and
then call OffsetManagerDynamoDb.deleteAllOffsets(resolvedTableName,
dynamoDbClient).
private val locksTableName = BookkeeperDynamoDb.getFullTableName(tableArn, locksTableBaseName)

// Initialize table on construction
init()
Close the builder-owned client when constructor-side init fails.
Line 56 performs network initialization during construction. If init() throws, build() leaks the freshly created DynamoDbClient. Handle the cleanup in build() rather than in the constructor so externally supplied clients are not closed unexpectedly.
Suggested fix
val client = clientBuilder.build()
- new TokenLockFactoryDynamoDb(
- dynamoDbClient = client,
- tableArn = tableArn,
- tablePrefix = tablePrefix
- )
+ try {
+ new TokenLockFactoryDynamoDb(
+ dynamoDbClient = client,
+ tableArn = tableArn,
+ tablePrefix = tablePrefix
+ )
+ } catch {
+ case NonFatal(ex) =>
+ try {
+ client.close()
+ } catch {
+ case NonFatal(closeEx) => ex.addSuppressed(closeEx)
+ }
+ throw ex
+ }

Also applies to: 296-302
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala`
at line 56, The constructor currently calls init() which can throw and leak the
builder-created DynamoDbClient; remove the init() call from the constructor and
instead invoke init() inside the build() method (for TokenLockFactoryDynamoDb
and the analogous builder at lines ~296-302), wrapping the init() call in a
try/catch so that if init() throws you close the DynamoDbClient only when it was
created by the builder (i.e., track whether the client was supplied externally
or created in build()), call client.close() on failure, and then rethrow the
exception; ensure externally supplied clients are never closed by this cleanup
path.
private def createLocksTable(tableName: String): Unit = {
  val createTableRequest = CreateTableRequest.builder()
    .tableName(tableName)
    .keySchema(
      KeySchemaElement.builder()
        .attributeName(ATTR_TOKEN)
        .keyType(KeyType.HASH)
        .build()
    )
    .attributeDefinitions(
      AttributeDefinition.builder()
        .attributeName(ATTR_TOKEN)
        .attributeType(ScalarAttributeType.S)
        .build()
    )
    .billingMode(BillingMode.PAY_PER_REQUEST) // On-demand billing
    .build()

  dynamoDbClient.createTable(createTableRequest)

  // Wait for table to become active
  waitForTableActive(tableName)
🧩 Analysis chain
🏁 Script executed:
cat -n pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala

Repository: AbsaOSS/pramen
Length of output: 11989
🏁 Script executed:
# Check what exceptions are imported
rg "import.*exception|import.*Exception" pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala -i
# Search for ResourceInUseException usage in the codebase
rg "ResourceInUseException" --type scala
# Search for other DynamoDB exception handling patterns
rg "catch.*Exception|catch.*:.*=>" pramen/core/src/main/scala -A 2 -B 2 --type scala | head -50

Repository: AbsaOSS/pramen
Length of output: 40
🏁 Script executed:
# Search for all exception handling in DynamoDB-related files
find . -name "*.scala" -path "*/lock/*" -o -path "*/dynamodb/*" | xargs grep -l "catch" 2>/dev/null | head -5
# Look for other DynamoDB client usages
rg "dynamoDbClient\." pramen/core/src/main/scala -A 2 --type scala | head -40

Repository: AbsaOSS/pramen
Length of output: 4149
🏁 Script executed:
# Check MetadataManagerDynamoDb for table creation exception handling
rg "createTable|ResourceInUseException|ResourceNotFoundException" pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerDynamoDb.scala -B 3 -A 5 --type scala | head -60

Repository: AbsaOSS/pramen
Length of output: 698
🌐 Web query:
AWS SDK v2 DynamoDB ResourceInUseException Java Scala exception
💡 Result:
In the AWS SDK for Java v2 (usable from Scala), ResourceInUseException from DynamoDB generally means the resource is not in a usable state yet—most commonly:
- You tried to create a table that already exists, or re-create a table name that’s still being deleted.
- You tried to delete or update a table while it's in CREATING or UPDATING (or it's still DELETING after DeleteTable). [1], [3]
- Less common: for on-demand tables, DynamoDB can return ResourceInUseException for very high request rates on some data-plane operations. [1]
What to do (best practice)
- Wait for the table to reach the right state (instead of immediately retrying blindly). Use SDK v2 waiters:
  - waitUntilTableExists after CreateTable
  - waitUntilTableNotExists after DeleteTable (so you can safely re-create) [2]
Scala example (AWS SDK for Java v2)
import software.amazon.awssdk.services.dynamodb.DynamoDbClient
import software.amazon.awssdk.services.dynamodb.model._
import software.amazon.awssdk.services.dynamodb.waiters.DynamoDbWaiter
val ddb = DynamoDbClient.create()
val waiter = DynamoDbWaiter.create()
try {
ddb.createTable(CreateTableRequest.builder()
.tableName("my_table")
// ... key schema, attrs, billing, etc.
.build()
)
waiter.waitUntilTableExists(b => b.tableName("my_table")) // wait for ACTIVE
} catch {
case e: ResourceInUseException =>
// table name already in use / still deleting / table not in a valid state
// typically: describe + wait, or backoff + retry, depending on your workflow
throw e
}

Quick diagnostic tip
Call DescribeTable and check tableStatus; if it’s CREATING, UPDATING, or DELETING, you need to wait before the next step. This is exactly the situation that triggers ResourceInUseException in many workflows. [1], [3]
Sources: DynamoDB error meaning and examples [1], SDK for Java v2 waiters (incl. DynamoDB table wait example) [2], explanation of failures when re-creating during deletion [3].
🏁 Script executed:
# Check if ResourceInUseException is available in the AWS SDK model package
rg "class ResourceInUseException|ResourceInUseException" pramen/ --type scala
# Look at the import statement more carefully
rg "software.amazon.awssdk.services.dynamodb.model" pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala

Repository: AbsaOSS/pramen
Length of output: 113
🏁 Script executed:
# Check the builder.build() method for exception handling
sed -n '282,303p' pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala

Repository: AbsaOSS/pramen
Length of output: 654
Catch ResourceInUseException to handle concurrent table creation.
CreateTable is asynchronous, and AWS DynamoDB throws ResourceInUseException when attempting to create a table that already exists or is still in a transient state. With the current check-then-create pattern (lines 83–85), concurrent initializers can trigger this exception and abort startup even though the table is already being created or exists. Catch the exception and proceed to waitForTableActive(tableName) to allow the waiter to detect the table's ACTIVE state.
Suggested fix
private def createLocksTable(tableName: String): Unit = {
val createTableRequest = CreateTableRequest.builder()
.tableName(tableName)
.keySchema(
KeySchemaElement.builder()
.attributeName(ATTR_TOKEN)
.keyType(KeyType.HASH)
.build()
)
.attributeDefinitions(
AttributeDefinition.builder()
.attributeName(ATTR_TOKEN)
.attributeType(ScalarAttributeType.S)
.build()
)
.billingMode(BillingMode.PAY_PER_REQUEST) // On-demand billing
.build()
- dynamoDbClient.createTable(createTableRequest)
+ try {
+ dynamoDbClient.createTable(createTableRequest)
+ } catch {
+ case _: ResourceInUseException =>
+ log.info(s"DynamoDB locks table is already being created: $tableName")
+ }
// Wait for table to become active
waitForTableActive(tableName)
}

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala`
around lines 126 - 147, The createLocksTable method can throw a
ResourceInUseException when another process is concurrently creating the same
DynamoDB table; update createLocksTable (in TokenLockFactoryDynamoDb) to catch
the AWS SDK's ResourceInUseException around
dynamoDbClient.createTable(createTableRequest), ignore that specific exception
and continue to call waitForTableActive(tableName), while rethrowing or letting
other exceptions propagate normally so only the "table already being
created/exists" case is swallowed.
} catch {
  case NonFatal(ex) =>
    log.warn(s"Error checking table status for $tableName", ex)
    Thread.sleep(2000)
🧩 Analysis chain
🏁 Script executed:
# First, let's see the full method context around lines 177-180
cat -n pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala | sed -n '160,200p'

Repository: AbsaOSS/pramen
Length of output: 1641
🏁 Script executed:
# Let's also check the imports to understand what AWS SDK is being used
head -50 pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala

Repository: AbsaOSS/pramen
Length of output: 1862
🏁 Script executed:
# Check for any ResourceNotFoundException imports or usage in the file
rg "ResourceNotFoundException|import.*exception|catch.*Exception" pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala -A 2 -B 2

Repository: AbsaOSS/pramen
Length of output: 226
🏁 Script executed:
# Check what DynamoDB exceptions are available in AWS SDK v2
web_search "AWS SDK v2 DynamoDB DescribeTable transient exceptions"

Repository: AbsaOSS/pramen
Length of output: 108
🏁 Script executed:
# Also verify what exceptions are thrown by describeTable in AWS SDK v2
web_search "software.amazon.awssdk DynamoDB describeTable ResourceNotFoundException"

Repository: AbsaOSS/pramen
Length of output: 108
🏁 Script executed:
# Check if there are any other exception catches or handling patterns in the file
rg "case.*:" pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala | head -20

Repository: AbsaOSS/pramen
Length of output: 106
🏁 Script executed:
# Verify the full context of the method signature and where it's called
sed -n '155,187p' pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala

Repository: AbsaOSS/pramen
Length of output: 1194
🌐 Web query:
AWS SDK 2.0 Java DynamoDB DescribeTable ResourceNotFoundException exception
💡 Result:
In AWS SDK for Java v2, DynamoDbClient.describeTable(...) can throw ResourceNotFoundException for a few common reasons:
- You're calling DescribeTable too soon after CreateTable
  DescribeTable is eventually consistent; right after creating a table, DynamoDB may temporarily return ResourceNotFoundException. Wait a few seconds and retry, or use the SDK waiter. [1]
Example (SDK v2 waiter):
dynamoDbClient.waiter().waitUntilTableExists(b -> b.tableName(tableName));

The enhanced client docs explicitly recommend waitUntilTableExists(...) because table creation is asynchronous. [2]
-
Wrong AWS Region and/or AWS account (credentials)
  DynamoDB tables are regional. If your client is configured for the wrong region or is using credentials for a different account, the table will "not exist" from that client's perspective, resulting in ResourceNotFoundException. AWS's guidance is to confirm the correct Account and Region when you see this error. [3]
DynamoDB tables are regional. If your client is configured for the wrong region or is using credentials for a different account, the table will “not exist” from that client’s perspective, resulting inResourceNotFoundException. AWS’s guidance is to confirm the correct Account and Region when you see this error. [3] -
Resource name/identifier mismatch
  DescribeTable requires TableName, and AWS notes you can also pass the table ARN there (works in AWS, but some local emulators may not fully support ARNs). [1] Ensure the exact table name/ARN you pass matches what exists in that region/account.
DescribeTablerequiresTableNameand AWS notes you can also pass the table ARN there (works in AWS, but some local emulators may not fully support ARNs). [1] Ensure the exact table name/ARN you pass matches what exists in that region/account. -
Table exists but isn't ACTIVE yet
  The SDK model for ResourceNotFoundException notes the resource may be unspecified/incorrect or its status may not be ACTIVE. [4]
If you paste your DynamoDbClient builder (region/endpoint/credentials) and the exact table name (and whether you’re using AWS, LocalStack, or dynamodb-local), I can pinpoint which of the above is most likely in your case.
Sources: [1] [2] [3] [4]
🌐 Web query:
AWS DynamoDB DescribeTable immediate post-CreateTable visibility delay ResourceNotFoundException
💡 Result:
Yes—this can be expected behavior in DynamoDB.
- CreateTable is asynchronous: it returns immediately with TableStatus = CREATING, and only later transitions to ACTIVE (at which point the table is fully ready) [1].
- DescribeTable is eventually consistent for metadata: if you call DescribeTable too soon after CreateTable, DynamoDB can return ResourceNotFoundException because the table metadata may not be visible yet [2]. (Even after it becomes visible, updated values may lag briefly for the same reason [2].)
What to do (recommended)
Wait/retry until the table exists and becomes ACTIVE, instead of calling DescribeTable just once.
-
AWS CLI:
aws dynamodb wait table-exists --table-name MyTable

This polls DescribeTable until TableStatus is ACTIVE [3].
Boto3 (Python):
waiter = dynamodb_client.get_waiter('table_exists') waiter.wait(TableName='MyTable')
This polls
describe_table()until successful (or times out) [4].
If it keeps happening “too long”
Also confirm you’re using the same AWS account and region for both calls (a region mismatch also produces ResourceNotFoundException, but that’s not a “delay” issue).
Sources
- AWS DynamoDB API Reference –
CreateTablereturns immediately withTableStatus=CREATING, laterACTIVE[1] - DynamoDB Developer Guide / API docs –
DescribeTableis eventually consistent; too-early calls can returnResourceNotFoundException[2] - AWS CLI
wait table-exists– waits untilDescribeTableshowsACTIVE[3] - Boto3 waiter
TableExists– pollsdescribe_table()until the table exists [4]
Retry only the transient ResourceNotFoundException you expect after CreateTable.
AWS documents that DescribeTable is eventually consistent and can return ResourceNotFoundException immediately after CreateTable before table metadata becomes visible. However, catching all NonFatal exceptions here will also swallow unrelated errors—such as credential issues, access denied, or validation errors—and turn them into generic timeouts, making the real startup error much harder to diagnose. Catch ResourceNotFoundException specifically to retry the expected transient case, and rethrow other failures immediately. (AWS DynamoDB API Reference)
Suggested fix
} catch {
- case NonFatal(ex) =>
- log.warn(s"Error checking table status for $tableName", ex)
- Thread.sleep(2000)
+ case _: ResourceNotFoundException =>
+ log.debug(s"Table $tableName metadata is not visible yet, waiting...")
+ Thread.sleep(2000)
+ case NonFatal(ex) =>
+ throw ex
}

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
Original:

} catch {
  case NonFatal(ex) =>
    log.warn(s"Error checking table status for $tableName", ex)
    Thread.sleep(2000)

Suggested:

} catch {
  case _: ResourceNotFoundException =>
    log.debug(s"Table $tableName metadata is not visible yet, waiting...")
    Thread.sleep(2000)
  case NonFatal(ex) =>
    throw ex
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala`
around lines 177 - 180, The current catch in TokenLockFactoryDynamoDb.scala
swallows all NonFatal exceptions during DescribeTable; change it to only catch
the AWS ResourceNotFoundException
(software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException) and
treat that as the transient case (log.warn and Thread.sleep to retry), while
immediately rethrowing any other exceptions (including NonFatal others) so
credential/permission/validation errors surface; update imports accordingly and
keep the surrounding retry loop logic intact.
…ger and OffsetManager traits to ensure the logic is defined by the implementation.
Actionable comments posted: 5
♻️ Duplicate comments (6)
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala (2)
129-140: ⚠️ Potential issue | 🟠 Major

Scan pagination is missing.

scan() only returns one page. Once the journal exceeds a single page, getEntries() silently drops the remaining matching items. Iterate on lastEvaluatedKey() until exhaustion, or switch this to a paginated query on a time-based index.

Suggested fallback if you keep Scan for now
- val result = dynamoDbClient.scan(scanRequest)
-
- result.items().asScala.map { item =>
+ @annotation.tailrec
+ def scanAll(
+   startKey: java.util.Map[String, AttributeValue],
+   acc: Vector[java.util.Map[String, AttributeValue]]
+ ): Vector[java.util.Map[String, AttributeValue]] = {
+   val requestBuilder = ScanRequest.builder()
+     .tableName(journalTableFullName)
+     .filterExpression(s"${JournalDynamoDB.ATTR_FINISHED_AT} >= :from_time AND ${JournalDynamoDB.ATTR_FINISHED_AT} <= :to_time")
+     .expressionAttributeValues(Map(
+       ":from_time" -> AttributeValue.builder().n(fromSec.toString).build(),
+       ":to_time" -> AttributeValue.builder().n(toSec.toString).build()
+     ).asJava)
+
+   Option(startKey).filterNot(_.isEmpty).foreach(requestBuilder.exclusiveStartKey)
+
+   val response = dynamoDbClient.scan(requestBuilder.build())
+   val nextAcc = acc ++ response.items().asScala
+   val nextKey = response.lastEvaluatedKey()
+
+   if (nextKey == null || nextKey.isEmpty) nextAcc
+   else scanAll(nextKey, nextAcc)
+ }
+
+ scanAll(null, Vector.empty).map { item =>

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala` around lines 129 - 140, The ScanRequest usage in getEntries() (building ScanRequest with tableName=journalTableFullName and filterExpression using ATTR_FINISHED_AT) calls dynamoDbClient.scan once and ignores pagination, so results beyond the first page are dropped; fix by iterating using the LastEvaluatedKey from the ScanResponse (or switch to a Query on a time-based index) — repeatedly call dynamoDbClient.scan with ExclusiveStartKey until lastEvaluatedKey is empty, accumulating items into the result list returned by getEntries(), ensuring expressionAttributeValues and the same filterExpression are reused for each paginated request.
79-82: ⚠️ Potential issue | 🔴 Critical

Composite key can overwrite distinct completions.

This table is keyed only by (jobName, finishedAt), so two runs of the same job finishing in the same second collapse to the same item and putItem() replaces the earlier entry. It also leaves getEntries() without a key shape that supports cross-job time-range reads efficiently. Add a unique per-entry key component and index the time field for range queries.

Also applies to: 110-115, 204-228
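One way the unique key component could look, as a sketch. The `epochSeconds#uuid` layout is illustrative and not the current JournalDynamoDB schema:

```scala
import java.util.UUID

object JournalKeySketch {
  // Sketch: a sort key that keeps time ordering but stays unique per entry,
  // so two completions of the same job in the same second cannot collide.
  // The time component would still be carried as a plain numeric attribute
  // (indexed by a GSI) for cross-job time-range queries.
  def uniqueSortKey(finishedAtEpochSec: Long): String =
    s"$finishedAtEpochSec#${UUID.randomUUID().toString}"

  def main(args: Array[String]): Unit = {
    val a = uniqueSortKey(1700000000L)
    val b = uniqueSortKey(1700000000L)
    // Same second, but the keys never collide:
    println(a != b) // prints true
  }
}
```

With a key like this, putItem() on two concurrent completions writes two distinct items instead of silently replacing one with the other.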
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala` around lines 79 - 82, The current primary key uses only JournalDynamoDB.ATTR_JOB_NAME and JournalDynamoDB.ATTR_FINISHED_AT which allows collisions when two runs finish in the same second and prevents efficient cross-job time-range queries; change the item key to include a unique per-entry component (e.g., add JournalDynamoDB.ATTR_ENTRY_ID and make the primary key (ATTR_JOB_NAME, ATTR_ENTRY_ID) or append a high-resolution unique suffix to ATTR_FINISHED_AT) so putItem() won't overwrite concurrent entries, and update getEntries() to read using a GSI on JournalDynamoDB.ATTR_FINISHED_AT (create a GSI that projects the attributes needed) to support efficient cross-job range reads; adjust any code that builds items (where ATTR_JOB_NAME and ATTR_FINISHED_AT are set) and any DynamoDB key construction logic to include the new ATTR_ENTRY_ID and GSI usage.

pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerDynamoDb.scala (1)
84-103: ⚠️ Potential issue | 🟠 Major

Still missing DynamoDB query pagination.

Lines 96 and 165 each execute a single QueryRequest and ignore lastEvaluatedKey. Once a (tableName, infoDate) partition grows past DynamoDB's 1 MB page boundary, bulk reads return incomplete metadata and bulk deletes leave stale rows behind.

Also applies to: 152-171
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerDynamoDb.scala` around lines 84 - 103, getMetadataFromStorage in MetadataManagerDynamoDb performs a single QueryRequest and ignores pagination; update it to page through DynamoDB results by looping: make the QueryRequest (the one built with QueryRequest.builder() and used with dynamoDbClient.query), collect items into a mutable buffer, check the response.lastEvaluatedKey()/isEmpty, and if present set ExclusiveStartKey on the next QueryRequest and repeat until no more pages, then map accumulated items to the Map[String, MetadataValue]; apply the same pagination loop fix to the other QueryRequest usage around the delete/bulk-read logic referenced near the second dynamoDbClient.query (around the code at the other occurrence) so both reads handle lastEvaluatedKey correctly.

pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerDynamoDb.scala (3)
135-138: ⚠️ Potential issue | 🟠 Major

Millisecond precision in composite key can cause collisions for concurrent writes.

When two startWriteOffsets calls occur for the same table and infoDate within the same millisecond, they generate identical compositeKey values (infoDate#createdAtMilli). The second PutItem silently overwrites the first, causing data loss. Consider appending a UUID suffix or using the full nanosecond timestamp.

🔧 Proposed fix to ensure uniqueness
+import java.util.UUID
+
 override def startWriteOffsets(table: String, infoDate: LocalDate, offsetType: OffsetType): DataOffsetRequest = {
   val createdAt = Instant.now()
   val createdAtMilli = createdAt.toEpochMilli
-  val compositeKey = s"${infoDate.toString}#${createdAtMilli}"
+  val uniqueSuffix = UUID.randomUUID().toString.take(8)
+  val compositeKey = s"${infoDate.toString}#${createdAtMilli}#${uniqueSuffix}"
commitOffsets,commitRerun, androllbackOffsetsto reconstruct the key consistently, likely by storing the fullcompositeKeyinDataOffsetRequestor by adding the suffix tocreatedAt.,
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerDynamoDb.scala` around lines 135 - 138, startWriteOffsets currently builds compositeKey using infoDate and millisecond precision createdAtMilli which can collide; change compositeKey generation in startWriteOffsets to include a uniqueness suffix (e.g., append a UUID or use nanosecond timestamp) and propagate that full compositeKey into the returned DataOffsetRequest so downstream methods can reconstruct keys reliably. Update commitOffsets, commitRerun, and rollbackOffsets to read the stored compositeKey from DataOffsetRequest (or use the stored unique createdAt) when performing Put/Get/Delete operations so they use the exact same key format; ensure DataOffsetRequest contains the new field (fullCompositeKey or uniqueCreatedAt) and update all usages accordingly.
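The collision is easy to reproduce: in a tight loop, thousands of writes share a handful of millisecond timestamps, so millisecond-only keys deduplicate while suffixed keys stay unique. A sketch of the two key schemes (the key format mirrors the Scala `infoDate#createdAtMilli` string; the helper names are illustrative, not from the codebase):

```python
import time
import uuid

def key_millis(info_date: str) -> str:
    # mirrors s"${infoDate}#${createdAtMilli}" — collides within one millisecond
    return f"{info_date}#{int(time.time() * 1000)}"

def key_unique(info_date: str) -> str:
    # appends a short random suffix, as the proposed fix suggests
    return f"{info_date}#{int(time.time() * 1000)}#{uuid.uuid4().hex[:8]}"

millis_keys = {key_millis("2024-01-15") for _ in range(1000)}
unique_keys = {key_unique("2024-01-15") for _ in range(1000)}

# 1000 iterations span only a few milliseconds, so the millisecond-only
# scheme produces far fewer than 1000 distinct keys; in DynamoDB each
# collision would mean a silent PutItem overwrite.
print(len(millis_keys) < 1000)
print(len(unique_keys) == 1000)
```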
313-334: ⚠️ Potential issue | 🟠 Major — Missing pagination in `getOffsetRecords` can silently drop results.

This method doesn't handle DynamoDB's 1 MB result limit. When a partition exceeds this threshold, records beyond the first page are silently omitted. This affects multiple callers: `getOffsets`, `getUncommittedOffsets`, `commitRerun`, `postCommittedRecords`, and `getMinMaxOffsets`.

Additionally, using `filterExpression` on `infoDate` is inefficient since the sort key is `infoDate#createdAtMilli`. Using `begins_with` on the sort key would reduce data transfer.

🔧 Proposed fix with pagination and efficient key condition
 private[core] def getOffsetRecords(table: String, infoDate: LocalDate): Array[OffsetRecord] = {
   try {
-    val queryRequest = QueryRequest.builder()
-      .tableName(offsetTableFullName)
-      .keyConditionExpression(s"${ATTR_PRAMEN_TABLE_NAME} = :table_name")
-      .filterExpression(s"${ATTR_INFO_DATE} = :info_date")
-      .expressionAttributeValues(Map(
-        ":table_name" -> AttributeValue.builder().s(table).build(),
-        ":info_date" -> AttributeValue.builder().s(infoDate.toString).build()
-      ).asJava)
-      .build()
-
-    val result = dynamoDbClient.query(queryRequest)
-
-    result.items().asScala
-      .map(itemToOffsetRecord)
-      .toArray
+    var allItems = Seq.empty[java.util.Map[String, AttributeValue]]
+    var lastEvaluatedKey: java.util.Map[String, AttributeValue] = null
+    val infoDatePrefix = s"${infoDate.toString}#"
+
+    do {
+      val queryRequestBuilder = QueryRequest.builder()
+        .tableName(offsetTableFullName)
+        .keyConditionExpression(s"$ATTR_PRAMEN_TABLE_NAME = :table_name AND begins_with($ATTR_COMPOSITE_KEY, :prefix)")
+        .expressionAttributeValues(Map(
+          ":table_name" -> AttributeValue.builder().s(table).build(),
+          ":prefix" -> AttributeValue.builder().s(infoDatePrefix).build()
+        ).asJava)
+
+      if (lastEvaluatedKey != null) {
+        queryRequestBuilder.exclusiveStartKey(lastEvaluatedKey)
+      }
+
+      val result = dynamoDbClient.query(queryRequestBuilder.build())
+      allItems = allItems ++ result.items().asScala
+      lastEvaluatedKey = result.lastEvaluatedKey()
+    } while (lastEvaluatedKey != null && !lastEvaluatedKey.isEmpty)
+
+    allItems.map(itemToOffsetRecord).toArray
   } catch {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerDynamoDb.scala` around lines 313 - 334, getOffsetRecords currently issues a single QueryRequest which can miss results due to DynamoDB pagination and inefficiently uses filterExpression on ATTR_INFO_DATE; update getOffsetRecords to (1) use a keyConditionExpression combining ATTR_PRAMEN_TABLE_NAME = :table_name and begins_with(sortKeyName, :info_date_prefix) (where sortKeyName is the actual sort key like infoDate#createdAtMilli and :info_date_prefix is infoDate.toString) instead of filterExpression, (2) page through results using dynamoDbClient.queryPaginator or loop on QueryResponse.lastEvaluatedKey, collecting and mapping all pages via itemToOffsetRecord into a single Array, and (3) preserve current error handling and context (use offsetTableFullName in error messages) so callers like getOffsets, getUncommittedOffsets, commitRerun, postCommittedRecords, and getMinMaxOffsets receive complete results.
339-366: ⚠️ Potential issue | 🟠 Major — Missing pagination in `getMaximumInfoDate` can return incorrect results.

This method queries all items for a partition but doesn't handle DynamoDB's pagination. If there are more than 1 MB of records, only the first page is processed, potentially returning an incorrect maximum date.
🔧 Proposed fix with pagination
 private[core] def getMaximumInfoDate(table: String): Option[LocalDate] = {
   try {
-    val queryRequest = QueryRequest.builder()
-      .tableName(offsetTableFullName)
-      .keyConditionExpression(s"${ATTR_PRAMEN_TABLE_NAME} = :table_name")
-      .expressionAttributeValues(Map(
-        ":table_name" -> AttributeValue.builder().s(table).build()
-      ).asJava)
-      .projectionExpression(ATTR_INFO_DATE)
-      .build()
-
-    val result = dynamoDbClient.query(queryRequest)
-
-    if (result.items().isEmpty) {
-      None
-    } else {
-      // Use maxBy with compareTo to avoid needing implicit Ordering
-      val maxInfoDate = result.items().asScala
-        .map(item => LocalDate.parse(item.get(ATTR_INFO_DATE).s()))
-        .maxBy(_.toEpochDay)
-
-      Some(maxInfoDate)
-    }
+    var maxDate: Option[LocalDate] = None
+    var lastEvaluatedKey: java.util.Map[String, AttributeValue] = null
+
+    do {
+      val queryRequestBuilder = QueryRequest.builder()
+        .tableName(offsetTableFullName)
+        .keyConditionExpression(s"$ATTR_PRAMEN_TABLE_NAME = :table_name")
+        .expressionAttributeValues(Map(
+          ":table_name" -> AttributeValue.builder().s(table).build()
+        ).asJava)
+        .projectionExpression(ATTR_INFO_DATE)
+
+      if (lastEvaluatedKey != null) {
+        queryRequestBuilder.exclusiveStartKey(lastEvaluatedKey)
+      }
+
+      val result = dynamoDbClient.query(queryRequestBuilder.build())
+
+      if (!result.items().isEmpty) {
+        val pageMax = result.items().asScala
+          .map(item => LocalDate.parse(item.get(ATTR_INFO_DATE).s()))
+          .maxBy(_.toEpochDay)
+        maxDate = Some(maxDate.map(d => if (d.toEpochDay > pageMax.toEpochDay) d else pageMax).getOrElse(pageMax))
+      }
+
+      lastEvaluatedKey = result.lastEvaluatedKey()
+    } while (lastEvaluatedKey != null && !lastEvaluatedKey.isEmpty)
+
+    maxDate
   } catch {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerDynamoDb.scala` around lines 339 - 366, getMaximumInfoDate currently only reads the first DynamoDB query page and can miss later items; update getMaximumInfoDate to paginate through all query pages (either via dynamoDbClient.queryPaginator(queryRequest) or a loop using QueryRequest with ExclusiveStartKey and checking QueryResponse.lastEvaluatedKey) and compute the maximum LocalDate across all pages (parsing ATTR_INFO_DATE from each item) before returning Some(max) or None when no items; keep existing error handling and use the same ATTR_INFO_DATE, offsetTableFullName, and dynamoDbClient symbols to locate and modify the logic.
🧹 Nitpick comments (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala (1)
67-74: Clarify client ownership semantics.

The `close()` method unconditionally closes the `dynamoDbClient`, but the class accepts an externally-provided client via its constructor. If a caller passes a shared client (not created by the builder), `close()` will shut it down unexpectedly. Consider either:

- Tracking whether the client was builder-created and only closing owned clients, or
- Documenting that the factory takes ownership of the client and callers must not share it.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala` around lines 67 - 74, The close() method in TokenLockFactoryDynamoDb currently always closes dynamoDbClient which can shutdown externally-provided/shared clients; change the constructor and class to track client ownership (e.g. add a boolean flag like clientOwned or wasClientCreated) and only call dynamoDbClient.close() in close() when that flag is true, or alternatively update the class API/docs to clearly state that TokenLockFactoryDynamoDb takes ownership of the provided dynamoDbClient and callers must not share it; touch the constructor and the close() method (and any builder that creates the client) to set the ownership flag accordingly so only internally-created clients are closed.
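The ownership-flag option can be sketched in a few lines (Python; `FakeClient`, `LockFactory`, and the `owns_client` flag name are illustrative assumptions, not the actual Pramen API):

```python
class FakeClient:
    """Stand-in for a closeable DynamoDB client."""
    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True

class LockFactory:
    def __init__(self, client, owns_client):
        # owns_client is True only when the builder created the client itself
        self._client = client
        self._owns_client = owns_client

    def close(self):
        if self._owns_client:  # never close a caller-supplied shared client
            self._client.close()

shared = FakeClient()
LockFactory(shared, owns_client=False).close()
print(shared.closed)  # False — the shared client survives

owned = FakeClient()
LockFactory(owned, owns_client=True).close()
print(owned.closed)   # True — the builder-created client is released
```

The builder would set the flag to `True` on the instances it constructs, while the constructor that accepts an external client would set it to `False`.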
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerDynamoDb.scala`:
- Around line 576-579: The deleteAllOffsets method currently hardcodes
DEFAULT_TABLE_PREFIX and passes None for tableArn, which can target the wrong
table; change the signature of deleteAllOffsets to accept tableArn:
Option[String] and tablePrefix: String (or an Option[String] if you prefer) and
use those values when building offsetTableBaseName and calling
BookkeeperDynamoDb.getFullTableName instead of DEFAULT_TABLE_PREFIX/None; update
all call sites to pass the instance's tableArn and tablePrefix (or defaults) so
deleteAllOffsets operates on the intended table (refer to the deleteAllOffsets
method and BookkeeperDynamoDb.getFullTableName symbols).
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala`:
- Around line 109-119: The catch blocks around dynamoDbClient.putItem (the
NonFatal(ex) that currently calls log.error) must not swallow failures: change
the write path in JournalDynamoDB (the PutItemRequest / dynamoDbClient.putItem
block) to log the error and then propagate it (e.g., rethrow the exception or
return a failed Try/Either/Left) so callers can detect write failures; do the
same for the corresponding read path mentioned (the similar NonFatal at lines
173-176) so a failed read is distinguishable from "no entries"—use a consistent
failure type (e.g., JournalException or scala.util.Failure/Left) and update
method signatures/return types if necessary so callers can react.
- Around line 191-195: The TOCTOU between describeTable() and
createJournalTable() causes failures when two processes race to create the
table; update createJournalTable() (and any other create-table call sites around
JournalDynamoDB.describeTable/createJournalTable) to catch
ResourceInUseException and, instead of failing, poll DescribeTable until the
table status is ACTIVE (with backoff/retries and timeout) before returning;
leave the existing ResourceNotFoundException handling in describeTable() but
ensure createJournalTable() swallows ResourceInUseException and waits for ACTIVE
so concurrent bootstraps succeed.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerDynamoDb.scala`:
- Around line 298-317: The builder currently creates a DynamoDbClient (client)
and then calls new MetadataManagerDynamoDb(...), but if the constructor (which
runs DescribeTable/CreateTable) throws, the client is leaked; modify build() to
ensure the DynamoDbClient is closed on failure: wrap the construction of
MetadataManagerDynamoDb in a try { new MetadataManagerDynamoDb(dynamoDbClient =
client, tableArn = tableArn, tablePrefix = tablePrefix) } catch { case e =>
client.close(); throw e } (or equivalent finally logic) so the client is closed
if the constructor throws while still returning the manager when successful.
- Around line 181-230: The code assumes a successful DescribeTable means the
table is usable, but DynamoDB may return CREATING; update
createMetadataTableIfNotExists to call describeTable and inspect the table
status (from the DescribeTableResponse) and if it's not ACTIVE call
waitForTableActive(metadataTableFullName, dynamoDbClient) before proceeding;
also update createMetadataTable() to catch ResourceInUseException (another
process created the table) and in that catch call
waitForTableActive(metadataTableFullName, dynamoDbClient) instead of failing so
concurrent startups wait for ACTIVE state. Ensure you reference the methods
createMetadataTableIfNotExists, createMetadataTable, and waitForTableActive in
your changes.
---
Duplicate comments:
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerDynamoDb.scala`:
- Around line 135-138: startWriteOffsets currently builds compositeKey using
infoDate and millisecond precision createdAtMilli which can collide; change
compositeKey generation in startWriteOffsets to include a uniqueness suffix
(e.g., append a UUID or use nanosecond timestamp) and propagate that full
compositeKey into the returned DataOffsetRequest so downstream methods can
reconstruct keys reliably. Update commitOffsets, commitRerun, and
rollbackOffsets to read the stored compositeKey from DataOffsetRequest (or use
the stored unique createdAt) when performing Put/Get/Delete operations so they
use the exact same key format; ensure DataOffsetRequest contains the new field
(fullCompositeKey or uniqueCreatedAt) and update all usages accordingly.
- Around line 313-334: getOffsetRecords currently issues a single QueryRequest
which can miss results due to DynamoDB pagination and inefficiently uses
filterExpression on ATTR_INFO_DATE; update getOffsetRecords to (1) use a
keyConditionExpression combining ATTR_PRAMEN_TABLE_NAME = :table_name and
begins_with(sortKeyName, :info_date_prefix) (where sortKeyName is the actual
sort key like infoDate#createdAtMilli and :info_date_prefix is
infoDate.toString) instead of filterExpression, (2) page through results using
dynamoDbClient.queryPaginator or loop on QueryResponse.lastEvaluatedKey,
collecting and mapping all pages via itemToOffsetRecord into a single Array, and
(3) preserve current error handling and context (use offsetTableFullName in
error messages) so callers like getOffsets, getUncommittedOffsets, commitRerun,
postCommittedRecords, and getMinMaxOffsets receive complete results.
- Around line 339-366: getMaximumInfoDate currently only reads the first
DynamoDB query page and can miss later items; update getMaximumInfoDate to
paginate through all query pages (either via
dynamoDbClient.queryPaginator(queryRequest) or a loop using QueryRequest with
ExclusiveStartKey and checking QueryResponse.lastEvaluatedKey) and compute the
maximum LocalDate across all pages (parsing ATTR_INFO_DATE from each item)
before returning Some(max) or None when no items; keep existing error handling
and use the same ATTR_INFO_DATE, offsetTableFullName, and dynamoDbClient symbols
to locate and modify the logic.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala`:
- Around line 129-140: The ScanRequest usage in getEntries() (building
ScanRequest with tableName=journalTableFullName and filterExpression using
ATTR_FINISHED_AT) calls dynamoDbClient.scan once and ignores pagination, so
results beyond the first page are dropped; fix by iterating using the
LastEvaluatedKey from the ScanResponse (or switch to a Query on a time-based
index) — repeatedly call dynamoDbClient.scan with ExclusiveStartKey until
lastEvaluatedKey is empty, accumulating items into the result list returned by
getEntries(), ensuring expressionAttributeValues and the same filterExpression
are reused for each paginated request.
- Around line 79-82: The current primary key uses only
JournalDynamoDB.ATTR_JOB_NAME and JournalDynamoDB.ATTR_FINISHED_AT which allows
collisions when two runs finish in the same second and prevents efficient
cross-job time-range queries; change the item key to include a unique per-entry
component (e.g., add JournalDynamoDB.ATTR_ENTRY_ID and make the primary key
(ATTR_JOB_NAME, ATTR_ENTRY_ID) or append a high-resolution unique suffix to
ATTR_FINISHED_AT) so putItem() won’t overwrite concurrent entries, and update
getEntries() to read using a GSI on JournalDynamoDB.ATTR_FINISHED_AT (create a
GSI that projects the attributes needed) to support efficient cross-job range
reads; adjust any code that builds items (where ATTR_JOB_NAME and
ATTR_FINISHED_AT are set) and any DynamoDB key construction logic to include the
new ATTR_ENTRY_ID and GSI usage.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerDynamoDb.scala`:
- Around line 84-103: getMetadataFromStorage in MetadataManagerDynamoDb performs
a single QueryRequest and ignores pagination; update it to page through DynamoDB
results by looping: make the QueryRequest (the one built with
QueryRequest.builder() and used with dynamoDbClient.query), collect items into a
mutable buffer, check the response.lastEvaluatedKey()/isEmpty, and if present
set ExclusiveStartKey on the next QueryRequest and repeat until no more pages,
then map accumulated items to the Map[String, MetadataValue]; apply the same
pagination loop fix to the other QueryRequest usage around the delete/bulk-read
logic referenced near the second dynamoDbClient.query (around the code at the
other occurrence) so both reads handle lastEvaluatedKey correctly.
---
Nitpick comments:
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala`:
- Around line 67-74: The close() method in TokenLockFactoryDynamoDb currently
always closes dynamoDbClient which can shutdown externally-provided/shared
clients; change the constructor and class to track client ownership (e.g. add a
boolean flag like clientOwned or wasClientCreated) and only call
dynamoDbClient.close() in close() when that flag is true, or alternatively
update the class API/docs to clearly state that TokenLockFactoryDynamoDb takes
ownership of the provided dynamoDbClient and callers must not share it; touch
the constructor and the close() method (and any builder that creates the client)
to set the ownership flag accordingly so only internally-created clients are
closed.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: e94bf69d-170a-4827-96a5-61085b0e0c62
📒 Files selected for processing (5)
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerDynamoDb.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerDynamoDb.scala
🚧 Files skipped from review as they are similar to previous changes (1)
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala
def deleteAllOffsets(tableName: String, dynamoDbClient: DynamoDbClient): Int = {
  val log = LoggerFactory.getLogger(this.getClass)
  val offsetTableBaseName = s"${DEFAULT_TABLE_PREFIX}_${DEFAULT_OFFSET_TABLE}"
  val offsetTableFullName = BookkeeperDynamoDb.getFullTableName(None, offsetTableBaseName)
deleteAllOffsets ignores tableArn, may target wrong table.
This static method hardcodes DEFAULT_TABLE_PREFIX and passes None for tableArn (line 579). If the OffsetManagerDynamoDb instance was created with a custom tableArn or tablePrefix, this method will attempt to delete from a different table than expected.
Consider accepting tableArn and tablePrefix as parameters:
🔧 Proposed fix
-def deleteAllOffsets(tableName: String, dynamoDbClient: DynamoDbClient): Int = {
+def deleteAllOffsets(
+ tableName: String,
+ dynamoDbClient: DynamoDbClient,
+ tableArn: Option[String] = None,
+ tablePrefix: String = DEFAULT_TABLE_PREFIX
+): Int = {
val log = LoggerFactory.getLogger(this.getClass)
- val offsetTableBaseName = s"${DEFAULT_TABLE_PREFIX}_${DEFAULT_OFFSET_TABLE}"
- val offsetTableFullName = BookkeeperDynamoDb.getFullTableName(None, offsetTableBaseName)
+ val offsetTableBaseName = s"${tablePrefix}_${DEFAULT_OFFSET_TABLE}"
+ val offsetTableFullName = BookkeeperDynamoDb.getFullTableName(tableArn, offsetTableBaseName)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
def deleteAllOffsets(
  tableName: String,
  dynamoDbClient: DynamoDbClient,
  tableArn: Option[String] = None,
  tablePrefix: String = DEFAULT_TABLE_PREFIX
): Int = {
  val log = LoggerFactory.getLogger(this.getClass)
  val offsetTableBaseName = s"${tablePrefix}_${DEFAULT_OFFSET_TABLE}"
  val offsetTableFullName = BookkeeperDynamoDb.getFullTableName(tableArn, offsetTableBaseName)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerDynamoDb.scala`
around lines 576 - 579, The deleteAllOffsets method currently hardcodes
DEFAULT_TABLE_PREFIX and passes None for tableArn, which can target the wrong
table; change the signature of deleteAllOffsets to accept tableArn:
Option[String] and tablePrefix: String (or an Option[String] if you prefer) and
use those values when building offsetTableBaseName and calling
BookkeeperDynamoDb.getFullTableName instead of DEFAULT_TABLE_PREFIX/None; update
all call sites to pass the instance's tableArn and tablePrefix (or defaults) so
deleteAllOffsets operates on the intended table (refer to the deleteAllOffsets
method and BookkeeperDynamoDb.getFullTableName symbols).
try {
  val putRequest = PutItemRequest.builder()
    .tableName(journalTableFullName)
    .item(itemBuilder.result().asJava)
    .build()

  dynamoDbClient.putItem(putRequest)
} catch {
  case NonFatal(ex) =>
    log.error(s"Unable to write to the journal table '$journalTableFullName'.", ex)
}
Don't turn DynamoDB failures into success or empty history.
Both paths swallow NonFatal and continue. A failed write becomes silent journal loss, and a failed read is indistinguishable from “no entries in range”. For bookkeeping storage, this should bubble up or return an explicit failure so callers can react.
Also applies to: 173-176
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala`
around lines 109 - 119, The catch blocks around dynamoDbClient.putItem (the
NonFatal(ex) that currently calls log.error) must not swallow failures: change
the write path in JournalDynamoDB (the PutItemRequest / dynamoDbClient.putItem
block) to log the error and then propagate it (e.g., rethrow the exception or
return a failed Try/Either/Left) so callers can detect write failures; do the
same for the corresponding read path mentioned (the similar NonFatal at lines
173-176) so a failed read is distinguishable from "no entries"—use a consistent
failure type (e.g., JournalException or scala.util.Failure/Left) and update
method signatures/return types if necessary so callers can react.
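One way to make the failure visible, as the prompt suggests, is to log and then re-raise a typed error so callers can distinguish "write failed" from "nothing happened". A sketch of the pattern (Python; `put_entry`, `failing_put`, and `JournalWriteError` are hypothetical illustrations, not names from the codebase):

```python
class JournalWriteError(Exception):
    """Typed failure raised when a journal write fails, so callers can react."""

def put_entry(client_put, item):
    try:
        client_put(item)
    except Exception as ex:
        # log-and-swallow would hide data loss; instead log, then propagate
        print(f"Unable to write journal entry: {item!r}")
        raise JournalWriteError(f"journal write failed for {item!r}") from ex

def failing_put(item):
    # stand-in for a DynamoDB putItem that throws (e.g. throughput exceeded)
    raise RuntimeError("throughput exceeded")

try:
    put_entry(failing_put, {"job": "daily"})
except JournalWriteError as e:
    print("caller saw the failure:", e)
```

Returning `Try`/`Either` instead of throwing would achieve the same visibility; the key point is that the caller's happy path no longer proceeds as if the write succeeded.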
} catch {
  case _: ResourceNotFoundException =>
    log.info(s"Creating journal table '$journalTableFullName'")
    createJournalTable()
  case NonFatal(ex) =>
Handle the create-table startup race.
The describeTable() → createTable() flow is TOCTOU-racy. If two processes bootstrap together, both can observe ResourceNotFoundException; one creates the table, and the other fails on ResourceInUseException even though the table is now valid. Catch ResourceInUseException in createJournalTable() and just wait for ACTIVE.
Suggested hardening
private def createJournalTable(): Unit = {
val createRequest = CreateTableRequest.builder()
.tableName(journalTableFullName)
.attributeDefinitions(
AttributeDefinition.builder()
.attributeName(JournalDynamoDB.ATTR_JOB_NAME)
.attributeType(ScalarAttributeType.S)
.build(),
AttributeDefinition.builder()
.attributeName(JournalDynamoDB.ATTR_FINISHED_AT)
.attributeType(ScalarAttributeType.N)
.build()
)
.keySchema(
KeySchemaElement.builder()
.attributeName(JournalDynamoDB.ATTR_JOB_NAME)
.keyType(KeyType.HASH)
.build(),
KeySchemaElement.builder()
.attributeName(JournalDynamoDB.ATTR_FINISHED_AT)
.keyType(KeyType.RANGE)
.build()
)
.billingMode(BillingMode.PAY_PER_REQUEST)
.build()
- dynamoDbClient.createTable(createRequest)
+ try {
+ dynamoDbClient.createTable(createRequest)
+ } catch {
+ case _: ResourceInUseException =>
+ log.info(s"Journal table '$journalTableFullName' was created concurrently")
+ }
waitForTableActive(journalTableFullName, dynamoDbClient)
log.info(s"Journal table '$journalTableFullName' created successfully")
}

Also applies to: 204-231
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala`
around lines 191 - 195, The TOCTOU between describeTable() and
createJournalTable() causes failures when two processes race to create the
table; update createJournalTable() (and any other create-table call sites around
JournalDynamoDB.describeTable/createJournalTable) to catch
ResourceInUseException and, instead of failing, poll DescribeTable until the
table status is ACTIVE (with backoff/retries and timeout) before returning;
leave the existing ResourceNotFoundException handling in describeTable() but
ensure createJournalTable() swallows ResourceInUseException and waits for ACTIVE
so concurrent bootstraps succeed.
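The race and its fix can be exercised locally with threads: several workers race to create the same table, exactly one `create` wins, and the losers treat "already in use" as benign and poll until the table is active. A sketch under assumed names (the in-memory `TableRegistry` and `ResourceInUse` are stand-ins for DynamoDB's catalog and `ResourceInUseException`):

```python
import threading
import time

class ResourceInUse(Exception):
    pass

class TableRegistry:
    """In-memory stand-in for DynamoDB's table catalog."""
    def __init__(self):
        self._lock = threading.Lock()
        self._status = {}  # name -> "CREATING" | "ACTIVE"

    def create(self, name):
        with self._lock:
            if name in self._status:
                raise ResourceInUse(name)  # another process won the race
            self._status[name] = "CREATING"
        time.sleep(0.05)                   # simulate slow provisioning
        self._status[name] = "ACTIVE"

    def status(self, name):
        return self._status.get(name)

def ensure_table(registry, name):
    try:
        registry.create(name)
    except ResourceInUse:
        pass                               # benign: wait for the winner's table
    while registry.status(name) != "ACTIVE":  # poll until ACTIVE
        time.sleep(0.01)

registry = TableRegistry()
workers = [threading.Thread(target=ensure_table, args=(registry, "journal"))
           for _ in range(4)]
for w in workers:
    w.start()
for w in workers:
    w.join()
print(registry.status("journal"))  # ACTIVE — all four workers succeeded
```

A production version would add a retry limit and timeout around the polling loop, as the prompt notes, rather than spinning forever if the table never becomes active.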
private def createMetadataTableIfNotExists(): Unit = {
  try {
    val describeRequest = DescribeTableRequest.builder()
      .tableName(metadataTableFullName)
      .build()

    dynamoDbClient.describeTable(describeRequest)
    log.info(s"Metadata table '$metadataTableFullName' already exists")
  } catch {
    case _: ResourceNotFoundException =>
      log.info(s"Creating metadata table '$metadataTableFullName'")
      createMetadataTable()
    case NonFatal(ex) =>
      log.error(s"Error checking if metadata table exists", ex)
      throw ex
  }
}

/**
 * Creates the metadata table in DynamoDB.
 */
private def createMetadataTable(): Unit = {
  val createRequest = CreateTableRequest.builder()
    .tableName(metadataTableFullName)
    .attributeDefinitions(
      AttributeDefinition.builder()
        .attributeName(MetadataManagerDynamoDb.ATTR_COMPOSITE_KEY)
        .attributeType(ScalarAttributeType.S)
        .build(),
      AttributeDefinition.builder()
        .attributeName(MetadataManagerDynamoDb.ATTR_METADATA_KEY)
        .attributeType(ScalarAttributeType.S)
        .build()
    )
    .keySchema(
      KeySchemaElement.builder()
        .attributeName(MetadataManagerDynamoDb.ATTR_COMPOSITE_KEY)
        .keyType(KeyType.HASH)
        .build(),
      KeySchemaElement.builder()
        .attributeName(MetadataManagerDynamoDb.ATTR_METADATA_KEY)
        .keyType(KeyType.RANGE)
        .build()
    )
    .billingMode(BillingMode.PAY_PER_REQUEST)
    .build()

  dynamoDbClient.createTable(createRequest)
  waitForTableActive(metadataTableFullName, dynamoDbClient)
  log.info(s"Metadata table '$metadataTableFullName' created successfully")
Wait for ACTIVE even when the table already exists.
The happy path at Lines 187-188 treats any successful DescribeTable as ready, but DynamoDB can return a table in CREATING. In a concurrent startup, this instance can proceed and issue CRUD calls before the table becomes usable. The same race can also make createTable() throw ResourceInUseException after another process wins the create.
Suggested fix
private def createMetadataTableIfNotExists(): Unit = {
try {
val describeRequest = DescribeTableRequest.builder()
.tableName(metadataTableFullName)
.build()
- dynamoDbClient.describeTable(describeRequest)
- log.info(s"Metadata table '$metadataTableFullName' already exists")
+ val table = dynamoDbClient.describeTable(describeRequest).table()
+ if (table.tableStatus() != TableStatus.ACTIVE) {
+ waitForTableActive(metadataTableFullName, dynamoDbClient)
+ }
+ log.info(s"Metadata table '$metadataTableFullName' already exists")
} catch {
case _: ResourceNotFoundException =>
log.info(s"Creating metadata table '$metadataTableFullName'")
createMetadataTable()
case NonFatal(ex) =>
@@
private def createMetadataTable(): Unit = {
val createRequest = CreateTableRequest.builder()
@@
- dynamoDbClient.createTable(createRequest)
+ try {
+ dynamoDbClient.createTable(createRequest)
+ } catch {
+ case _: ResourceInUseException =>
+ log.info(s"Metadata table '$metadataTableFullName' is already being created")
+ }
waitForTableActive(metadataTableFullName, dynamoDbClient)
log.info(s"Metadata table '$metadataTableFullName' created successfully")
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
private def createMetadataTableIfNotExists(): Unit = {
  try {
    val describeRequest = DescribeTableRequest.builder()
      .tableName(metadataTableFullName)
      .build()

    val table = dynamoDbClient.describeTable(describeRequest).table()
    if (table.tableStatus() != TableStatus.ACTIVE) {
      waitForTableActive(metadataTableFullName, dynamoDbClient)
    }
    log.info(s"Metadata table '$metadataTableFullName' already exists")
  } catch {
    case _: ResourceNotFoundException =>
      log.info(s"Creating metadata table '$metadataTableFullName'")
      createMetadataTable()
    case NonFatal(ex) =>
      log.error(s"Error checking if metadata table exists", ex)
      throw ex
  }
}

/**
 * Creates the metadata table in DynamoDB.
 */
private def createMetadataTable(): Unit = {
  val createRequest = CreateTableRequest.builder()
    .tableName(metadataTableFullName)
    .attributeDefinitions(
      AttributeDefinition.builder()
        .attributeName(MetadataManagerDynamoDb.ATTR_COMPOSITE_KEY)
        .attributeType(ScalarAttributeType.S)
        .build(),
      AttributeDefinition.builder()
        .attributeName(MetadataManagerDynamoDb.ATTR_METADATA_KEY)
        .attributeType(ScalarAttributeType.S)
        .build()
    )
    .keySchema(
      KeySchemaElement.builder()
        .attributeName(MetadataManagerDynamoDb.ATTR_COMPOSITE_KEY)
        .keyType(KeyType.HASH)
        .build(),
      KeySchemaElement.builder()
        .attributeName(MetadataManagerDynamoDb.ATTR_METADATA_KEY)
        .keyType(KeyType.RANGE)
        .build()
    )
    .billingMode(BillingMode.PAY_PER_REQUEST)
    .build()

  try {
    dynamoDbClient.createTable(createRequest)
  } catch {
    case _: ResourceInUseException =>
      log.info(s"Metadata table '$metadataTableFullName' is already being created")
  }
  waitForTableActive(metadataTableFullName, dynamoDbClient)
  log.info(s"Metadata table '$metadataTableFullName' created successfully")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerDynamoDb.scala`
around lines 181 - 230, The code assumes a successful DescribeTable means the
table is usable, but DynamoDB may return CREATING; update
createMetadataTableIfNotExists to call describeTable and inspect the table
status (from the DescribeTableResponse) and if it's not ACTIVE call
waitForTableActive(metadataTableFullName, dynamoDbClient) before proceeding;
also update createMetadataTable() to catch ResourceInUseException (another
process created the table) and in that catch call
waitForTableActive(metadataTableFullName, dynamoDbClient) instead of failing so
concurrent startups wait for ACTIVE state. Ensure you reference the methods
createMetadataTableIfNotExists, createMetadataTable, and waitForTableActive in
your changes.
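The suggestion above leans on `waitForTableActive`, which is referenced but not shown in this diff. As an illustrative sketch only (the helper name, parameters, and string-based status below are assumptions, not code from this PR), the poll-until-ACTIVE logic can be expressed as a generic retry loop that abstracts the `DescribeTable` call behind a function, which also makes it unit-testable:

```scala
// Hypothetical stand-in for waitForTableActive: polls a status function until
// it reports "ACTIVE", failing after a bounded number of attempts.
// `describeStatus` abstracts the DescribeTable call.
def waitUntilActive(describeStatus: () => String,
                    maxAttempts: Int = 30,
                    delayMs: Long = 1000L): Unit = {
  var attempt = 0
  while (describeStatus() != "ACTIVE") {
    attempt += 1
    if (attempt >= maxAttempts)
      throw new IllegalStateException(s"Table not ACTIVE after $maxAttempts attempts")
    Thread.sleep(delayMs)
  }
}
```

Note that AWS SDK for Java 2.x also ships a built-in waiter (`dynamoDbClient.waiter().waitUntilTableExists(...)`), which may be preferable to a hand-rolled loop if the project's SDK version supports it.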
```scala
  def build(): MetadataManagerDynamoDb = {
    if (region.isEmpty) {
      throw new IllegalArgumentException("Region must be provided")
    }

    val clientBuilder = DynamoDbClient.builder()
      .region(Region.of(region.get))

    credentialsProvider.foreach(clientBuilder.credentialsProvider)
    endpoint.foreach { ep =>
      clientBuilder.endpointOverride(URI.create(ep))
    }

    val client = clientBuilder.build()

    new MetadataManagerDynamoDb(
      dynamoDbClient = client,
      tableArn = tableArn,
      tablePrefix = tablePrefix
    )
```
Close the client if constructor-side initialization fails.
Line 311 allocates the DynamoDbClient, and the constructor immediately runs DescribeTable/CreateTable. If that throws, build() exits without closing the client, leaking resources on the failure path.
Suggested fix:

```diff
   val client = clientBuilder.build()

-  new MetadataManagerDynamoDb(
-    dynamoDbClient = client,
-    tableArn = tableArn,
-    tablePrefix = tablePrefix
-  )
+  try {
+    new MetadataManagerDynamoDb(
+      dynamoDbClient = client,
+      tableArn = tableArn,
+      tablePrefix = tablePrefix
+    )
+  } catch {
+    case NonFatal(ex) =>
+      client.close()
+      throw ex
+  }
 }
}
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
Before:

```scala
  def build(): MetadataManagerDynamoDb = {
    if (region.isEmpty) {
      throw new IllegalArgumentException("Region must be provided")
    }
    val clientBuilder = DynamoDbClient.builder()
      .region(Region.of(region.get))
    credentialsProvider.foreach(clientBuilder.credentialsProvider)
    endpoint.foreach { ep =>
      clientBuilder.endpointOverride(URI.create(ep))
    }
    val client = clientBuilder.build()
    new MetadataManagerDynamoDb(
      dynamoDbClient = client,
      tableArn = tableArn,
      tablePrefix = tablePrefix
    )
```

After:

```scala
  def build(): MetadataManagerDynamoDb = {
    if (region.isEmpty) {
      throw new IllegalArgumentException("Region must be provided")
    }
    val clientBuilder = DynamoDbClient.builder()
      .region(Region.of(region.get))
    credentialsProvider.foreach(clientBuilder.credentialsProvider)
    endpoint.foreach { ep =>
      clientBuilder.endpointOverride(URI.create(ep))
    }
    val client = clientBuilder.build()
    try {
      new MetadataManagerDynamoDb(
        dynamoDbClient = client,
        tableArn = tableArn,
        tablePrefix = tablePrefix
      )
    } catch {
      case NonFatal(ex) =>
        client.close()
        throw ex
    }
  }
}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerDynamoDb.scala`
around lines 298 - 317, The builder currently creates a DynamoDbClient (client)
and then calls new MetadataManagerDynamoDb(...), but if the constructor (which
runs DescribeTable/CreateTable) throws, the client is leaked; modify build() to
ensure the DynamoDbClient is closed on failure: wrap the construction of
MetadataManagerDynamoDb in a try { new MetadataManagerDynamoDb(dynamoDbClient =
client, tableArn = tableArn, tablePrefix = tablePrefix) } catch { case e =>
client.close(); throw e } (or equivalent finally logic) so the client is closed
if the constructor throws while still returning the manager when successful.
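The close-on-failure pattern recommended above generalizes to any `AutoCloseable` resource. As a hedged sketch (the helper `buildOrClose` is an illustrative name, not part of this PR or the AWS SDK), the same logic can be factored into a small reusable loan-style function:

```scala
import scala.util.control.NonFatal

// Illustrative helper: run `init` against a freshly built resource and close
// the resource if `init` throws, mirroring the suggested fix for build().
// On success the resource stays open and ownership passes to the result.
def buildOrClose[C <: AutoCloseable, A](resource: C)(init: C => A): A = {
  try init(resource)
  catch {
    case NonFatal(ex) =>
      resource.close()
      throw ex
  }
}
```

With such a helper, `build()` would become `buildOrClose(clientBuilder.build()) { client => new MetadataManagerDynamoDb(...) }`, keeping the leak-free behavior in one place for all four DynamoDB-backed components.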
Closes #181
Summary by CodeRabbit
New Features
Documentation
Chores
Tests