diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java index 45ce51cc..f2eead58 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java @@ -4,6 +4,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -667,7 +668,8 @@ public Stream provideArguments(ExtensionContext context) { class CreateOrReplaceTests { @Test - @DisplayName("Should create new document and return true") + @DisplayName( + "Should create new document and return true. Cols not specified should be set of default NULL") void testCreateOrReplaceNewDocument() throws Exception { String docId = getRandomDocId(4); @@ -691,6 +693,10 @@ void testCreateOrReplaceNewDocument() throws Exception { assertEquals("New Upsert Item", rs.getString("item")); assertEquals(500, rs.getInt("price")); assertEquals(25, rs.getInt("quantity")); + // assert on some fields that they're set to null correctly + assertNull(rs.getObject("sales")); + assertNull(rs.getObject("categoryTags")); + assertNull(rs.getObject("date")); }); } @@ -714,7 +720,6 @@ void testCreateOrReplaceExistingDocument() throws Exception { ObjectNode updatedNode = OBJECT_MAPPER.createObjectNode(); updatedNode.put("id", docId); updatedNode.put("item", "Updated Item"); - updatedNode.put("price", 999); updatedNode.put("quantity", 50); Document updatedDoc = new JSONDocument(updatedNode); @@ -727,7 +732,8 @@ void testCreateOrReplaceExistingDocument() throws Exception { rs -> { assertTrue(rs.next()); assertEquals("Updated Item", rs.getString("item")); - assertEquals(999, rs.getInt("price")); + // this should be the default since price is not present in the updated document + assertNull(rs.getObject("price")); assertEquals(50, rs.getInt("quantity")); }); } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java index 0de58c79..246db7df 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java @@ -862,7 +862,14 @@ private boolean createOrReplaceWithRetry(Key key, Document document, boolean isR PostgresDataType pkType = getPrimaryKeyType(tableName, pkColumn); parsed.add(quotedPkColumn, key.toString(), pkType, false); - String sql = buildUpsertSql(parsed.getColumns(), quotedPkColumn); + List docColumns = parsed.getColumns(); + List allColumns = + schemaRegistry.getSchema(tableName).values().stream() + .map(PostgresColumnMetadata::getName) + .map(PostgresUtils::wrapFieldNamesWithDoubleQuotes) + .collect(Collectors.toList()); + + String sql = buildCreateOrReplaceSql(allColumns, docColumns, quotedPkColumn); LOGGER.debug("Upsert SQL: {}", sql); return executeUpsert(sql, parsed); @@ -882,15 +889,16 @@ private boolean createOrReplaceWithRetry(Key key, Document document, boolean isR * * * *

Generated SQL pattern: * *

{@code
-   * INSERT INTO table (col1, col2, pk_col)
+   * INSERT INTO table (col1, col2,, col3, pk_col)
    * VALUES (?, ?, ?)
-   * ON CONFLICT (pk_col) DO UPDATE SET col1 = EXCLUDED.col1, col2 = EXCLUDED.col2
+   * ON CONFLICT (pk_col) DO UPDATE SET col1 = EXCLUDED.col1, col2 = EXCLUDED.col2, col3 = DEFAULT
    * RETURNING (xmax = 0) AS is_insert
    * }
* @@ -909,19 +917,30 @@ private boolean createOrReplaceWithRetry(Key key, Document document, boolean isR *
  • Thus, {@code is_insert = true} means INSERT, {@code is_insert = false} means UPDATE * * - * @param columns List of quoted column names to include in the upsert (including PK) + * @param allTableColumns all cols present in the table + * @param docColumns cols present in the document * @param pkColumn The quoted primary key column name used for conflict detection * @return The complete upsert SQL statement with placeholders for values */ - private String buildUpsertSql(List columns, String pkColumn) { - String columnList = String.join(", ", columns); - String placeholders = String.join(", ", columns.stream().map(c -> "?").toArray(String[]::new)); - - // Build SET clause for non-PK columns: col = EXCLUDED.col + private String buildCreateOrReplaceSql( + List allTableColumns, List docColumns, String pkColumn) { + String columnList = String.join(", ", docColumns); + String placeholders = + String.join(", ", docColumns.stream().map(c -> "?").toArray(String[]::new)); + Set docColumnsSet = new HashSet<>(docColumns); + + // Build SET clause for non-PK columns. String setClause = - columns.stream() + allTableColumns.stream() .filter(col -> !col.equals(pkColumn)) - .map(col -> col + " = EXCLUDED." + col) + .map( + col -> { + if (docColumnsSet.contains(col)) { + return col + " = EXCLUDED." + col; + } else { + return col + " = DEFAULT"; + } + }) .collect(Collectors.joining(", ")); return String.format(