Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand Down Expand Up @@ -667,7 +668,8 @@ public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
class CreateOrReplaceTests {

@Test
@DisplayName("Should create new document and return true")
@DisplayName(
"Should create new document and return true. Cols not specified should be set of default NULL")
void testCreateOrReplaceNewDocument() throws Exception {

String docId = getRandomDocId(4);
Expand All @@ -691,6 +693,10 @@ void testCreateOrReplaceNewDocument() throws Exception {
assertEquals("New Upsert Item", rs.getString("item"));
assertEquals(500, rs.getInt("price"));
assertEquals(25, rs.getInt("quantity"));
// assert on some fields that they're set to null correctly
assertNull(rs.getObject("sales"));
assertNull(rs.getObject("categoryTags"));
assertNull(rs.getObject("date"));
});
}

Expand All @@ -714,7 +720,6 @@ void testCreateOrReplaceExistingDocument() throws Exception {
ObjectNode updatedNode = OBJECT_MAPPER.createObjectNode();
updatedNode.put("id", docId);
updatedNode.put("item", "Updated Item");
updatedNode.put("price", 999);
updatedNode.put("quantity", 50);
Document updatedDoc = new JSONDocument(updatedNode);

Expand All @@ -727,7 +732,8 @@ void testCreateOrReplaceExistingDocument() throws Exception {
rs -> {
assertTrue(rs.next());
assertEquals("Updated Item", rs.getString("item"));
assertEquals(999, rs.getInt("price"));
// this should be the default since price is not present in the updated document
assertNull(rs.getObject("price"));
assertEquals(50, rs.getInt("quantity"));
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,14 @@ private boolean createOrReplaceWithRetry(Key key, Document document, boolean isR
PostgresDataType pkType = getPrimaryKeyType(tableName, pkColumn);
parsed.add(quotedPkColumn, key.toString(), pkType, false);

String sql = buildUpsertSql(parsed.getColumns(), quotedPkColumn);
List<String> docColumns = parsed.getColumns();
List<String> allColumns =
schemaRegistry.getSchema(tableName).values().stream()
.map(PostgresColumnMetadata::getName)
.map(PostgresUtils::wrapFieldNamesWithDoubleQuotes)
.collect(Collectors.toList());

String sql = buildCreateOrReplaceSql(allColumns, docColumns, quotedPkColumn);
LOGGER.debug("Upsert SQL: {}", sql);

return executeUpsert(sql, parsed);
Expand All @@ -882,15 +889,16 @@ private boolean createOrReplaceWithRetry(Key key, Document document, boolean isR
*
* <ul>
* <li>Inserts a new row if no conflict on the primary key
* <li>Updates all non-PK columns if a row with the same PK already exists
* <li>If the row with that PK already exists, it is replaced in entirety. Cols not present in
* the latest upsert are set to their default values (as defined in the schema)
* </ul>
*
* <p><b>Generated SQL pattern:</b>
*
* <pre>{@code
* INSERT INTO table (col1, col2, pk_col)
* INSERT INTO table (col1, col2,, col3, pk_col)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit:

Suggested change
* INSERT INTO table (col1, col2,, col3, pk_col)
* INSERT INTO table (col1, col2, col3, pk_col)

* VALUES (?, ?, ?)
* ON CONFLICT (pk_col) DO UPDATE SET col1 = EXCLUDED.col1, col2 = EXCLUDED.col2
* ON CONFLICT (pk_col) DO UPDATE SET col1 = EXCLUDED.col1, col2 = EXCLUDED.col2, col3 = DEFAULT
* RETURNING (xmax = 0) AS is_insert
* }</pre>
*
Expand All @@ -909,19 +917,30 @@ private boolean createOrReplaceWithRetry(Key key, Document document, boolean isR
* <li>Thus, {@code is_insert = true} means INSERT, {@code is_insert = false} means UPDATE
* </ul>
*
* @param columns List of quoted column names to include in the upsert (including PK)
* @param allTableColumns all cols present in the table
* @param docColumns cols present in the document
* @param pkColumn The quoted primary key column name used for conflict detection
* @return The complete upsert SQL statement with placeholders for values
*/
private String buildUpsertSql(List<String> columns, String pkColumn) {
String columnList = String.join(", ", columns);
String placeholders = String.join(", ", columns.stream().map(c -> "?").toArray(String[]::new));

// Build SET clause for non-PK columns: col = EXCLUDED.col
private String buildCreateOrReplaceSql(
List<String> allTableColumns, List<String> docColumns, String pkColumn) {
String columnList = String.join(", ", docColumns);
String placeholders =
String.join(", ", docColumns.stream().map(c -> "?").toArray(String[]::new));
Set<String> docColumnsSet = new HashSet<>(docColumns);

// Build SET clause for non-PK columns.
String setClause =
columns.stream()
allTableColumns.stream()
.filter(col -> !col.equals(pkColumn))
.map(col -> col + " = EXCLUDED." + col)
.map(
col -> {
if (docColumnsSet.contains(col)) {
return col + " = EXCLUDED." + col;
} else {
return col + " = DEFAULT";
}
})
.collect(Collectors.joining(", "));

return String.format(
Expand Down
Loading