From 81d2168925e73b51bca3accef152175e1c2f6507 Mon Sep 17 00:00:00 2001 From: ColdL Date: Thu, 29 Jan 2026 10:26:08 +0800 Subject: [PATCH 1/2] add vector type definition, add APIs in DataGetters/DataTypeVisitor, add basic impls --- .../apache/paimon/types/DataTypeChecks.java | 5 + .../paimon/types/DataTypeDefaultVisitor.java | 5 + .../paimon/types/DataTypeJsonParser.java | 18 + .../org/apache/paimon/types/DataTypeRoot.java | 2 + .../apache/paimon/types/DataTypeVisitor.java | 2 + .../org/apache/paimon/types/DataTypes.java | 9 + .../apache/paimon/types/ReassignFieldId.java | 8 + .../org/apache/paimon/types/VectorType.java | 180 ++++++++ .../arrow/ArrowFieldTypeConversion.java | 7 + .../Arrow2PaimonVectorConverter.java | 6 + .../ArrowFieldWriterFactoryVisitor.java | 6 + .../apache/paimon/codegen/GenerateUtils.scala | 18 +- .../paimon/codegen/ScalarOperatorGens.scala | 11 +- .../codegen/EqualiserCodeGeneratorTest.java | 11 + .../org/apache/paimon/PartitionSettedRow.java | 8 + .../apache/paimon/casting/CastedArray.java | 6 + .../org/apache/paimon/casting/CastedRow.java | 6 + .../apache/paimon/casting/CastedVector.java | 113 +++++ .../paimon/casting/DefaultValueRow.java | 9 + .../paimon/casting/FallbackMappingRow.java | 9 + .../paimon/data/AbstractBinaryWriter.java | 36 ++ .../org/apache/paimon/data/BinaryArray.java | 5 + .../org/apache/paimon/data/BinaryRow.java | 6 + .../org/apache/paimon/data/BinaryVector.java | 393 ++++++++++++++++++ .../org/apache/paimon/data/BinaryWriter.java | 14 + .../org/apache/paimon/data/DataGetters.java | 2 + .../org/apache/paimon/data/GenericArray.java | 5 + .../org/apache/paimon/data/GenericRow.java | 5 + .../org/apache/paimon/data/InternalRow.java | 3 + .../apache/paimon/data/InternalVector.java | 90 ++++ .../org/apache/paimon/data/JoinedRow.java | 9 + .../apache/paimon/data/LazyGenericRow.java | 5 + .../org/apache/paimon/data/NestedRow.java | 5 + .../paimon/data/columnar/ColumnarArray.java | 6 + .../paimon/data/columnar/ColumnarRow.java | 6 + .../data/columnar/RowToColumnConverter.java | 6 + .../data/columnar/VectorizedColumnBatch.java | 5 + .../paimon/data/safe/SafeBinaryArray.java | 6 + .../paimon/data/safe/SafeBinaryRow.java | 6 + .../data/serializer/InternalSerializers.java | 5 + .../serializer/InternalVectorSerializer.java | 113 +++++ .../fileindex/bitmap/BitmapTypeVisitor.java | 6 + .../fileindex/bloomfilter/FastHash.java | 6 + .../paimon/memory/MemorySegmentUtils.java | 20 + .../paimon/reader/DataEvolutionArray.java | 6 + .../paimon/reader/DataEvolutionRow.java | 6 + .../paimon/sort/hilbert/HilbertIndexer.java | 6 + .../apache/paimon/sort/zorder/ZIndexer.java | 6 + .../types/InternalRowToSizeVisitor.java | 23 + .../apache/paimon/utils/InternalRowUtils.java | 2 + .../apache/paimon/utils/KeyProjectedRow.java | 6 + .../apache/paimon/utils/ProjectedArray.java | 8 +- .../org/apache/paimon/utils/ProjectedRow.java | 6 + .../apache/paimon/utils/ProjectedVector.java | 128 ++++++ .../apache/paimon/utils/TypeCheckUtils.java | 7 + .../org/apache/paimon/utils/TypeUtils.java | 19 + .../paimon/utils/VectorMappingUtils.java | 6 + .../org/apache/paimon/data/BinaryRowTest.java | 34 ++ .../apache/paimon/data/BinaryVectorTest.java | 119 ++++++ .../InternalVectorSerializerTest.java | 92 ++++ .../datagen/RandomGeneratorVisitor.java | 6 + .../paimon/types/DataTypeChecksTest.java | 7 + .../apache/paimon/utils/TypeUtilsTest.java | 9 + .../paimon/stats/SimpleStatsEvolution.java | 6 + .../org/apache/paimon/utils/OffsetRow.java | 6 + .../org/apache/paimon/utils/PartialRow.java | 6 + .../paimon/append/VectorTypeTableTest.java | 97 +++++ .../paimon/codegen/CodeGenUtilsTest.java | 19 + .../paimon/schema/DataTypeJsonParserTest.java | 10 + .../paimon/flink/DataTypeToLogicalType.java | 6 + .../apache/paimon/flink/FlinkRowWrapper.java | 11 + .../paimon/format/json/JsonFileFormat.java | 1 + .../paimon/format/json/JsonFileReader.java | 11 + .../paimon/format/json/JsonFormatWriter.java | 13 + .../format/orc/writer/FieldWriterFactory.java | 6 + .../reader/ParquetVectorUpdaterFactory.java | 6 + .../filter2/predicate/ParquetFilters.java | 6 + .../format/json/JsonFileFormatTest.java | 16 + .../hive/objectinspector/HivePaimonArray.java | 6 + .../paimon/format/lance/LanceFileFormat.java | 6 + .../paimon/spark/SparkInternalRowWrapper.java | 11 + .../org/apache/paimon/spark/SparkRow.java | 11 + 82 files changed, 1944 insertions(+), 8 deletions(-) create mode 100644 paimon-api/src/main/java/org/apache/paimon/types/VectorType.java create mode 100644 paimon-common/src/main/java/org/apache/paimon/casting/CastedVector.java create mode 100644 paimon-common/src/main/java/org/apache/paimon/data/BinaryVector.java create mode 100644 paimon-common/src/main/java/org/apache/paimon/data/InternalVector.java create mode 100644 paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalVectorSerializer.java create mode 100644 paimon-common/src/main/java/org/apache/paimon/utils/ProjectedVector.java create mode 100644 paimon-common/src/test/java/org/apache/paimon/data/BinaryVectorTest.java create mode 100644 paimon-common/src/test/java/org/apache/paimon/data/serializer/InternalVectorSerializerTest.java create mode 100644 paimon-core/src/test/java/org/apache/paimon/append/VectorTypeTableTest.java diff --git a/paimon-api/src/main/java/org/apache/paimon/types/DataTypeChecks.java b/paimon-api/src/main/java/org/apache/paimon/types/DataTypeChecks.java index 20eed4d03842..5f11c2e95d7b 100644 --- a/paimon-api/src/main/java/org/apache/paimon/types/DataTypeChecks.java +++ b/paimon-api/src/main/java/org/apache/paimon/types/DataTypeChecks.java @@ -240,6 +240,11 @@ public List visit(ArrayType arrayType) { return Collections.singletonList(arrayType.getElementType()); } + @Override + public List visit(VectorType vectorType) { + return Collections.singletonList(vectorType.getElementType()); + } + @Override public List visit(MultisetType multisetType) { return Collections.singletonList(multisetType.getElementType()); diff --git a/paimon-api/src/main/java/org/apache/paimon/types/DataTypeDefaultVisitor.java b/paimon-api/src/main/java/org/apache/paimon/types/DataTypeDefaultVisitor.java index b3dc8a3cd994..af680ede62e2 100644 --- a/paimon-api/src/main/java/org/apache/paimon/types/DataTypeDefaultVisitor.java +++ b/paimon-api/src/main/java/org/apache/paimon/types/DataTypeDefaultVisitor.java @@ -124,6 +124,11 @@ public R visit(ArrayType arrayType) { return defaultMethod(arrayType); } + @Override + public R visit(VectorType vectorType) { + return defaultMethod(vectorType); + } + @Override public R visit(MultisetType multisetType) { return defaultMethod(multisetType); diff --git a/paimon-api/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java b/paimon-api/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java index 13637265d45f..4079dd8c47c0 100644 --- a/paimon-api/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java +++ b/paimon-api/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java @@ -78,6 +78,10 @@ public static DataType parseDataType(JsonNode json, AtomicInteger fieldId) { if (typeString.startsWith("ARRAY")) { DataType element = parseDataType(json.get("element"), fieldId); return new ArrayType(!typeString.contains("NOT NULL"), element); + } else if (typeString.startsWith("VECTOR")) { + DataType element = parseDataType(json.get("element"), fieldId); + int length = json.get("length").asInt(); + return new VectorType(!typeString.contains("NOT NULL"), length, element); } else if (typeString.startsWith("MULTISET")) { DataType element = parseDataType(json.get("element"), fieldId); return new MultisetType(!typeString.contains("NOT NULL"), element); @@ -318,6 +322,7 @@ private enum Keyword { SECOND, TO, ARRAY, + VECTOR, MULTISET, MAP, ROW, @@ -544,6 +549,8 @@ private DataType parseTypeByKeyword() { return new VariantType(); case BLOB: return new BlobType(); + case VECTOR: + return parseVectorType(); default: throw parsingError("Unsupported type: " + token().value); } @@ -665,5 +672,16 @@ private int parseOptionalPrecision(int defaultPrecision) { } return precision; } + + private DataType parseVectorType() { + // VECTOR + nextToken(TokenType.BEGIN_SUBTYPE); + DataType elementType = parseTypeWithNullability(); + nextToken(TokenType.LIST_SEPARATOR); + nextToken(TokenType.LITERAL_INT); + int length = tokenAsInt(); + nextToken(TokenType.END_SUBTYPE); + return DataTypes.VECTOR(length, elementType); + } } } diff --git a/paimon-api/src/main/java/org/apache/paimon/types/DataTypeRoot.java b/paimon-api/src/main/java/org/apache/paimon/types/DataTypeRoot.java index 1b339765986c..f55da9c4706f 100644 --- a/paimon-api/src/main/java/org/apache/paimon/types/DataTypeRoot.java +++ b/paimon-api/src/main/java/org/apache/paimon/types/DataTypeRoot.java @@ -106,6 +106,8 @@ public enum DataTypeRoot { ARRAY(DataTypeFamily.CONSTRUCTED, DataTypeFamily.COLLECTION), + VECTOR(DataTypeFamily.CONSTRUCTED, DataTypeFamily.COLLECTION), + MULTISET(DataTypeFamily.CONSTRUCTED, DataTypeFamily.COLLECTION), MAP(DataTypeFamily.CONSTRUCTED, DataTypeFamily.EXTENSION), diff --git a/paimon-api/src/main/java/org/apache/paimon/types/DataTypeVisitor.java b/paimon-api/src/main/java/org/apache/paimon/types/DataTypeVisitor.java index cdeb4204b17c..6e377309f237 100644 --- a/paimon-api/src/main/java/org/apache/paimon/types/DataTypeVisitor.java +++ b/paimon-api/src/main/java/org/apache/paimon/types/DataTypeVisitor.java @@ -68,6 +68,8 @@ public interface DataTypeVisitor { R visit(ArrayType arrayType); + R visit(VectorType vectorType); + R visit(MultisetType multisetType); R visit(MapType mapType); diff --git a/paimon-api/src/main/java/org/apache/paimon/types/DataTypes.java b/paimon-api/src/main/java/org/apache/paimon/types/DataTypes.java index 33a5c9e5e442..39b180651ef5 100644 --- a/paimon-api/src/main/java/org/apache/paimon/types/DataTypes.java +++ b/paimon-api/src/main/java/org/apache/paimon/types/DataTypes.java @@ -59,6 +59,10 @@ public static ArrayType ARRAY(DataType element) { return new ArrayType(element); } + public static VectorType VECTOR(int length, DataType element) { + return new VectorType(length, element); + } + public static CharType CHAR(int length) { return new CharType(length); } @@ -221,6 +225,11 @@ public OptionalInt visit(VarBinaryType varBinaryType) { return OptionalInt.of(varBinaryType.getLength()); } + @Override + public OptionalInt visit(VectorType vectorType) { + return OptionalInt.of(vectorType.getLength()); + } + @Override protected OptionalInt defaultMethod(DataType dataType) { return OptionalInt.empty(); diff --git a/paimon-api/src/main/java/org/apache/paimon/types/ReassignFieldId.java b/paimon-api/src/main/java/org/apache/paimon/types/ReassignFieldId.java index 2aacfeaf889a..85f104d99320 100644 --- a/paimon-api/src/main/java/org/apache/paimon/types/ReassignFieldId.java +++ b/paimon-api/src/main/java/org/apache/paimon/types/ReassignFieldId.java @@ -38,6 +38,14 @@ public DataType visit(ArrayType arrayType) { return new ArrayType(arrayType.isNullable(), arrayType.getElementType().accept(this)); } + @Override + public DataType visit(VectorType vectorType) { + return new VectorType( + vectorType.isNullable(), + vectorType.getLength(), + vectorType.getElementType().accept(this)); + } + @Override public DataType visit(MultisetType multisetType) { return new MultisetType( diff --git a/paimon-api/src/main/java/org/apache/paimon/types/VectorType.java b/paimon-api/src/main/java/org/apache/paimon/types/VectorType.java new file mode 100644 index 000000000000..a98bd6fa30d5 --- /dev/null +++ b/paimon-api/src/main/java/org/apache/paimon/types/VectorType.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.types; + +import org.apache.paimon.annotation.Public; +import org.apache.paimon.utils.Preconditions; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonGenerator; + +import java.io.IOException; +import java.util.Objects; +import java.util.Set; + +/** + * Data type of fixed-size vector type. The elements are densely stored. + * + * @since 2.0.0 + */ +@Public +public class VectorType extends DataType { + + private static final long serialVersionUID = 1L; + + public static final int MIN_LENGTH = 1; + + public static final int MAX_LENGTH = Integer.MAX_VALUE; + + public static final String FORMAT = "VECTOR<%s, %d>"; + + private final DataType elementType; + + private final int length; + + public VectorType(boolean isNullable, int length, DataType elementType) { + super(isNullable, DataTypeRoot.VECTOR); + // TODO: should we support nullable for vector type? + // Preconditions.checkArgument(!isNullable, "Nullable is not supported for VectorType."); + this.elementType = + Preconditions.checkNotNull(elementType, "Element type must not be null."); + Preconditions.checkArgument( + isValidElementType(elementType), "Invalid element type for vector: " + elementType); + if (length < MIN_LENGTH) { + throw new IllegalArgumentException( + String.format( + "Vector length must be between %d and %d (both inclusive).", + MIN_LENGTH, MAX_LENGTH)); + } + this.length = length; + } + + public VectorType(int length, DataType elementType) { + this(false, length, elementType); // For vector type we prefer NOT NULL + } + + public int getLength() { + return length; + } + + public DataType getElementType() { + return elementType; + } + + public static boolean isValidElementType(DataType elementType) { + switch (elementType.getTypeRoot()) { + case BOOLEAN: + case TINYINT: + case SMALLINT: + case INTEGER: + case BIGINT: + case FLOAT: + case DOUBLE: + return true; + default: + return false; + } + } + + @Override + public int defaultSize() { + return elementType.defaultSize() * length; + } + + @Override + public DataType copy(boolean isNullable) { + return new VectorType(isNullable, length, elementType.copy()); + } + + @Override + public String asSQLString() { + return withNullability(FORMAT, elementType.asSQLString(), length); + } + + @Override + public void serializeJson(JsonGenerator generator) throws IOException { + generator.writeStartObject(); + generator.writeStringField("type", isNullable() ? "VECTOR" : "VECTOR NOT NULL"); + generator.writeFieldName("element"); + elementType.serializeJson(generator); + generator.writeFieldName("length"); + generator.writeNumber(length); + generator.writeEndObject(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + VectorType vectorType = (VectorType) o; + return elementType.equals(vectorType.elementType) && length == vectorType.length; + } + + @Override + public boolean equalsIgnoreFieldId(DataType o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + VectorType vectorType = (VectorType) o; + return elementType.equalsIgnoreFieldId(vectorType.elementType) + && length == vectorType.length; + } + + @Override + public boolean isPrunedFrom(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + VectorType vectorType = (VectorType) o; + return elementType.isPrunedFrom(vectorType.elementType); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), elementType, length); + } + + @Override + public R accept(DataTypeVisitor visitor) { + return visitor.visit(this); + } + + @Override + public void collectFieldIds(Set fieldIds) { + elementType.collectFieldIds(fieldIds); + } +} diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java index e6bc1281a74d..33defc8f9a01 100644 --- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java @@ -42,6 +42,7 @@ import org.apache.paimon.types.VarBinaryType; import org.apache.paimon.types.VarCharType; import org.apache.paimon.types.VariantType; +import org.apache.paimon.types.VectorType; import org.apache.arrow.vector.types.TimeUnit; import org.apache.arrow.vector.types.Types; @@ -179,6 +180,12 @@ public FieldType visit(ArrayType arrayType) { return new FieldType(arrayType.isNullable(), Types.MinorType.LIST.getType(), null); } + @Override + public FieldType visit(VectorType vectorType) { + ArrowType arrowType = new ArrowType.FixedSizeList(vectorType.getLength()); + return new FieldType(vectorType.isNullable(), arrowType, null); + } + @Override public FieldType visit(MultisetType multisetType) { throw new UnsupportedOperationException("Doesn't support MultisetType."); diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/Arrow2PaimonVectorConverter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/Arrow2PaimonVectorConverter.java index 4afd976c1747..c78aa324ae27 100644 --- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/Arrow2PaimonVectorConverter.java +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/Arrow2PaimonVectorConverter.java @@ -66,6 +66,7 @@ import org.apache.paimon.types.VarBinaryType; import org.apache.paimon.types.VarCharType; import org.apache.paimon.types.VariantType; +import org.apache.paimon.types.VectorType; import org.apache.arrow.vector.BigIntVector; import org.apache.arrow.vector.BitVector; @@ -482,6 +483,11 @@ public ColumnVector getColumnVector() { }; } + @Override + public Arrow2PaimonVectorConverter visit(VectorType vectorType) { + throw new UnsupportedOperationException("Doesn't support VectorType."); + } + @Override public Arrow2PaimonVectorConverter visit(MultisetType multisetType) { throw new UnsupportedOperationException("Doesn't support MultisetType."); diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java index a20e6fc4814c..d07762106ebc 100644 --- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java @@ -41,6 +41,7 @@ import org.apache.paimon.types.VarBinaryType; import org.apache.paimon.types.VarCharType; import org.apache.paimon.types.VariantType; +import org.apache.paimon.types.VectorType; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.complex.ListVector; @@ -165,6 +166,11 @@ public ArrowFieldWriterFactory visit(ArrayType arrayType) { isNullable); } + @Override + public ArrowFieldWriterFactory visit(VectorType vectorType) { + throw new UnsupportedOperationException("Doesn't support VectorType."); + } + @Override public ArrowFieldWriterFactory visit(MultisetType multisetType) { throw new UnsupportedOperationException("Doesn't support MultisetType."); diff --git a/paimon-codegen/src/main/scala/org/apache/paimon/codegen/GenerateUtils.scala b/paimon-codegen/src/main/scala/org/apache/paimon/codegen/GenerateUtils.scala index 6897e5424857..967d58ad30db 100644 --- a/paimon-codegen/src/main/scala/org/apache/paimon/codegen/GenerateUtils.scala +++ b/paimon-codegen/src/main/scala/org/apache/paimon/codegen/GenerateUtils.scala @@ -127,10 +127,13 @@ object GenerateUtils { s"$sortUtil.compareBinary($leftTerm, $rightTerm)" case TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE | DATE | TIME_WITHOUT_TIME_ZONE => s"($leftTerm > $rightTerm ? 1 : $leftTerm < $rightTerm ? -1 : 0)" - case ARRAY => - val at = t.asInstanceOf[ArrayType] + case ARRAY | VECTOR => + val elementType = t.getTypeRoot match { + case ARRAY => t.asInstanceOf[ArrayType].getElementType + case VECTOR => t.asInstanceOf[VectorType].getElementType + } val compareFunc = newName("compareArray") - val compareCode = generateArrayCompare(ctx, nullsIsLast = false, at, "a", "b") + val compareCode = generateArrayCompare(ctx, nullsIsLast = false, elementType, "a", "b") val funcCode: String = s""" public int $compareFunc($ARRAY_DATA a, $ARRAY_DATA b) { @@ -188,11 +191,10 @@ object GenerateUtils { def generateArrayCompare( ctx: CodeGeneratorContext, nullsIsLast: Boolean, - arrayType: ArrayType, + elementType: DataType, leftTerm: String, rightTerm: String): String = { val nullIsLastRet = if (nullsIsLast) 1 else -1 - val elementType = arrayType.getElementType val fieldA = newName("fieldA") val isNullA = newName("isNullA") val lengthA = newName("lengthA") @@ -379,6 +381,7 @@ object GenerateUtils { case DOUBLE => className[JDouble] case TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_WITH_LOCAL_TIME_ZONE => className[Timestamp] case ARRAY => className[InternalArray] + case VECTOR => className[InternalVector] case MULTISET | MAP => className[InternalMap] case ROW => className[InternalRow] case VARIANT => className[Variant] @@ -417,6 +420,8 @@ object GenerateUtils { s"$rowTerm.getTimestamp($indexTerm, ${getPrecision(t)})" case ARRAY => s"$rowTerm.getArray($indexTerm)" + case VECTOR => + s"$rowTerm.getVector($indexTerm)" case MULTISET | MAP => s"$rowTerm.getMap($indexTerm)" case ROW => @@ -606,6 +611,9 @@ object GenerateUtils { case ARRAY => val ser = addSerializer(t) s"$writerTerm.writeArray($indexTerm, $fieldValTerm, $ser)" + case VECTOR => + val ser = addSerializer(t) + s"$writerTerm.writeVector($indexTerm, $fieldValTerm, $ser)" case MULTISET | MAP => val ser = addSerializer(t) s"$writerTerm.writeMap($indexTerm, $fieldValTerm, $ser)" diff --git a/paimon-codegen/src/main/scala/org/apache/paimon/codegen/ScalarOperatorGens.scala b/paimon-codegen/src/main/scala/org/apache/paimon/codegen/ScalarOperatorGens.scala index 5dfa4bff6835..46217b1c4cf1 100644 --- a/paimon-codegen/src/main/scala/org/apache/paimon/codegen/ScalarOperatorGens.scala +++ b/paimon-codegen/src/main/scala/org/apache/paimon/codegen/ScalarOperatorGens.scala @@ -60,7 +60,13 @@ object ScalarOperatorGens { } // array types else if (isArray(left.resultType) && canEqual) { - generateArrayComparison(ctx, left, right, resultType) + val elementType = left.resultType.asInstanceOf[ArrayType].getElementType + generateArrayComparison(ctx, left, right, elementType, resultType) + } + // vector type + else if (isVector(left.resultType) && canEqual) { + val elementType = left.resultType.asInstanceOf[VectorType].getElementType + generateArrayComparison(ctx, left, right, elementType, resultType) } // map types else if (isMap(left.resultType) && canEqual) { @@ -196,6 +202,7 @@ object ScalarOperatorGens { ctx: CodeGeneratorContext, left: GeneratedExpression, right: GeneratedExpression, + elementType: DataType, resultType: DataType): GeneratedExpression = { generateCallWithStmtIfArgsNotNull(ctx, resultType, Seq(left, right)) { args => @@ -204,7 +211,6 @@ object ScalarOperatorGens { val resultTerm = newName("compareResult") - val elementType = left.resultType.asInstanceOf[ArrayType].getElementType val elementCls = primitiveTypeTermForType(elementType) val elementDefault = primitiveDefaultValue(elementType) @@ -225,6 +231,7 @@ object ScalarOperatorGens { rightElementExpr, new BooleanType(elementType.isNullable)) + // TODO: With BinaryVector available, we can use it here. val stmt = s""" |boolean $resultTerm; diff --git a/paimon-codegen/src/test/java/org/apache/paimon/codegen/EqualiserCodeGeneratorTest.java b/paimon-codegen/src/test/java/org/apache/paimon/codegen/EqualiserCodeGeneratorTest.java index e662449858e7..7e977291e81f 100644 --- a/paimon-codegen/src/test/java/org/apache/paimon/codegen/EqualiserCodeGeneratorTest.java +++ b/paimon-codegen/src/test/java/org/apache/paimon/codegen/EqualiserCodeGeneratorTest.java @@ -30,6 +30,7 @@ import org.apache.paimon.data.serializer.InternalArraySerializer; import org.apache.paimon.data.serializer.InternalMapSerializer; import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.data.serializer.InternalVectorSerializer; import org.apache.paimon.data.serializer.Serializer; import org.apache.paimon.data.variant.GenericVariant; import org.apache.paimon.types.DataType; @@ -133,6 +134,16 @@ public class EqualiserCodeGeneratorTest { castFromString("[1,2,3]", DataTypes.ARRAY(new VarCharType())), castFromString("[4,5,6]", DataTypes.ARRAY(new VarCharType()))), new InternalArraySerializer(DataTypes.VARCHAR(1)))); + TEST_DATA.put( + DataTypeRoot.VECTOR, + new GeneratedData( + DataTypes.VECTOR(3, DataTypes.FLOAT()), + Pair.of( + castFromString( + "[1.1,2.2,3.3]", DataTypes.VECTOR(3, DataTypes.FLOAT())), + castFromString( + "[4.4,5.5,6.6]", DataTypes.VECTOR(3, DataTypes.FLOAT()))), + new InternalVectorSerializer(DataTypes.FLOAT(), 3))); TEST_DATA.put( DataTypeRoot.MULTISET, new GeneratedData( diff --git a/paimon-common/src/main/java/org/apache/paimon/PartitionSettedRow.java b/paimon-common/src/main/java/org/apache/paimon/PartitionSettedRow.java index 01b3ae48a2da..a62464d1981f 100644 --- a/paimon-common/src/main/java/org/apache/paimon/PartitionSettedRow.java +++ b/paimon-common/src/main/java/org/apache/paimon/PartitionSettedRow.java @@ -25,6 +25,7 @@ import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.InternalVector; import org.apache.paimon.data.PartitionInfo; import org.apache.paimon.data.Timestamp; import org.apache.paimon.data.variant.Variant; @@ -176,6 +177,13 @@ public InternalArray getArray(int pos) { : row.getArray(partitionInfo.getRealIndex(pos)); } + @Override + public InternalVector getVector(int pos) { + return partitionInfo.inPartitionRow(pos) + ? partition.getVector(partitionInfo.getRealIndex(pos)) + : row.getVector(partitionInfo.getRealIndex(pos)); + } + @Override public InternalMap getMap(int pos) { return partitionInfo.inPartitionRow(pos) diff --git a/paimon-common/src/main/java/org/apache/paimon/casting/CastedArray.java b/paimon-common/src/main/java/org/apache/paimon/casting/CastedArray.java index 821be67e2a55..4e95c9db8dcf 100644 --- a/paimon-common/src/main/java/org/apache/paimon/casting/CastedArray.java +++ b/paimon-common/src/main/java/org/apache/paimon/casting/CastedArray.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.InternalVector; import org.apache.paimon.data.Timestamp; import org.apache.paimon.data.variant.Variant; @@ -201,6 +202,11 @@ public InternalArray getArray(int pos) { return castElementGetter.getElementOrNull(array, pos); } + @Override + public InternalVector getVector(int pos) { + return castElementGetter.getElementOrNull(array, pos); + } + @Override public InternalMap getMap(int pos) { return castElementGetter.getElementOrNull(array, pos); diff --git a/paimon-common/src/main/java/org/apache/paimon/casting/CastedRow.java b/paimon-common/src/main/java/org/apache/paimon/casting/CastedRow.java index a60ea635cf45..76a1366d4784 100644 --- a/paimon-common/src/main/java/org/apache/paimon/casting/CastedRow.java +++ b/paimon-common/src/main/java/org/apache/paimon/casting/CastedRow.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.InternalVector; import org.apache.paimon.data.Timestamp; import org.apache.paimon.data.variant.Variant; import org.apache.paimon.types.RowKind; @@ -148,6 +149,11 @@ public InternalArray getArray(int pos) { return castMapping[pos].getFieldOrNull(row); } + @Override + public InternalVector getVector(int pos) { + return castMapping[pos].getFieldOrNull(row); + } + @Override public InternalMap getMap(int pos) { return castMapping[pos].getFieldOrNull(row); diff --git a/paimon-common/src/main/java/org/apache/paimon/casting/CastedVector.java b/paimon-common/src/main/java/org/apache/paimon/casting/CastedVector.java new file mode 100644 index 000000000000..c7bf0303467a --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/casting/CastedVector.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.casting; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Blob; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.InternalArray; +import org.apache.paimon.data.InternalMap; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.InternalVector; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.data.variant.Variant; + +/** + * An implementation of {@link InternalVector} which provides a casted view of the underlying {@link + * InternalVector}. + * + *

It reads data from underlying {@link InternalVector} according to source logical type and + * casts it with specific {@link CastExecutor}. + */ +public class CastedVector extends CastedArray implements InternalVector { + + protected CastedVector(CastElementGetter castElementGetter) { + super(castElementGetter); + } + + /** + * Replaces the underlying {@link InternalVector} backing this {@link CastedVector}. + * + *

This method replaces the vector in place and does not return a new object. This is done + * for performance reasons. + */ + public static CastedVector from(CastElementGetter castElementGetter) { + return new CastedVector(castElementGetter); + } + + public CastedVector replaceVector(InternalVector vector) { + super.replaceArray(vector); + return this; + } + + @Override + public CastedArray replaceArray(InternalArray array) { + throw new IllegalArgumentException("CastedVector does not support replaceArray."); + } + + @Override + public BinaryString getString(int pos) { + throw new UnsupportedOperationException("CastedVector does not support String."); + } + + @Override + public Decimal getDecimal(int pos, int precision, int scale) { + throw new UnsupportedOperationException("CastedVector does not support Decimal."); + } + + @Override + public Timestamp getTimestamp(int pos, int precision) { + throw new UnsupportedOperationException("CastedVector does not support Timestamp."); + } + + @Override + public byte[] getBinary(int pos) { + throw new UnsupportedOperationException("CastedVector does not support Binary."); + } + + @Override + public Variant getVariant(int pos) { + throw new UnsupportedOperationException("CastedVector does not support Variant."); + } + + @Override + public Blob getBlob(int pos) { + throw new UnsupportedOperationException("CastedVector does not support Blob."); + } + + @Override + public InternalArray getArray(int pos) { + throw new UnsupportedOperationException("CastedVector does not support nested Array."); + } + + @Override + public InternalVector getVector(int pos) { + throw new UnsupportedOperationException("CastedVector does not support nested VectorType."); + } + + @Override + public InternalMap getMap(int pos) { + throw new UnsupportedOperationException("CastedVector does not support Map."); + } + + @Override + public InternalRow getRow(int pos, int numFields) { + throw new UnsupportedOperationException("CastedVector does not support nested Row."); + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/casting/DefaultValueRow.java b/paimon-common/src/main/java/org/apache/paimon/casting/DefaultValueRow.java index 25b7453861dd..555f065c7031 100644 --- a/paimon-common/src/main/java/org/apache/paimon/casting/DefaultValueRow.java +++ b/paimon-common/src/main/java/org/apache/paimon/casting/DefaultValueRow.java @@ -25,6 +25,7 @@ import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.InternalVector; import org.apache.paimon.data.Timestamp; import org.apache.paimon.data.variant.Variant; import org.apache.paimon.types.DataField; @@ -176,6 +177,14 @@ public InternalArray getArray(int pos) { return defaultValueRow.getArray(pos); } + @Override + public InternalVector getVector(int pos) { + if (!row.isNullAt(pos)) { + return row.getVector(pos); + } + return defaultValueRow.getVector(pos); + } + @Override public InternalMap getMap(int pos) { if (!row.isNullAt(pos)) { diff --git a/paimon-common/src/main/java/org/apache/paimon/casting/FallbackMappingRow.java b/paimon-common/src/main/java/org/apache/paimon/casting/FallbackMappingRow.java index b4bf853230fc..b981d552876d 100644 --- a/paimon-common/src/main/java/org/apache/paimon/casting/FallbackMappingRow.java +++ b/paimon-common/src/main/java/org/apache/paimon/casting/FallbackMappingRow.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.InternalVector; import org.apache.paimon.data.Timestamp; import org.apache.paimon.data.variant.Variant; import org.apache.paimon.types.RowKind; @@ -174,6 +175,14 @@ public InternalArray getArray(int pos) { return main.getArray(pos); } + @Override + public InternalVector getVector(int pos) { + if (mappings[pos] != -1 && main.isNullAt(pos)) { + return fallbackRow.getVector(mappings[pos]); + } + return main.getVector(pos); + } + @Override public InternalMap getMap(int pos) { if (mappings[pos] != -1 && main.isNullAt(pos)) { diff --git a/paimon-common/src/main/java/org/apache/paimon/data/AbstractBinaryWriter.java b/paimon-common/src/main/java/org/apache/paimon/data/AbstractBinaryWriter.java index 85d044594851..bb632d5b31ed 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/AbstractBinaryWriter.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/AbstractBinaryWriter.java @@ -21,6 +21,7 @@ import org.apache.paimon.data.serializer.InternalArraySerializer; import org.apache.paimon.data.serializer.InternalMapSerializer; import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.data.serializer.InternalVectorSerializer; import org.apache.paimon.data.variant.Variant; import org.apache.paimon.memory.MemorySegment; import org.apache.paimon.memory.MemorySegmentUtils; @@ -91,6 +92,12 @@ public void writeArray(int pos, InternalArray input, InternalArraySerializer ser pos, binary.getSegments(), binary.getOffset(), binary.getSizeInBytes()); } + @Override + public void writeVector(int pos, InternalVector input, InternalVectorSerializer serializer) { + BinaryVector binary = serializer.toBinaryVector(input); + writeVectorToVarLenPart(pos, binary); + } + @Override public void writeMap(int pos, InternalMap input, InternalMapSerializer serializer) { BinaryMap binary = serializer.toBinaryMap(input); @@ -270,6 +277,35 @@ private void writeBytesToVarLenPart(int pos, byte[] bytes, int offset, int len) cursor += roundedSize; } + private void writeVectorToVarLenPart(int pos, BinaryVector vector) { + // Memory layout: [numElements][segments] + final int numElementsWidth = 4; + final int size = vector.getSizeInBytes(); + + final int roundedSize = roundNumberOfBytesToNearestWord(size + numElementsWidth); + + // grow the global buffer before writing data. + ensureCapacity(roundedSize); + + zeroOutPaddingBytes(size + numElementsWidth); + + // write numElements value first + segment.putInt(cursor, vector.size()); + cursor += numElementsWidth; + + // then vector values + if (vector.getSegments().length == 1) { + vector.getSegments()[0].copyTo(vector.getOffset(), segment, cursor, size); + } else { + writeMultiSegmentsToVarLenPart(vector.getSegments(), vector.getOffset(), size); + } + + setOffsetAndSize(pos, cursor - numElementsWidth, size + numElementsWidth); + + // move the cursor forward. + cursor += (roundedSize - numElementsWidth); + } + /** Increases the capacity to ensure that it can hold at least the minimum capacity argument. */ private void grow(int minCapacity) { int oldCapacity = segment.size(); diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BinaryArray.java b/paimon-common/src/main/java/org/apache/paimon/data/BinaryArray.java index d8b649d4a64c..afac3f8d3d35 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/BinaryArray.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/BinaryArray.java @@ -256,6 +256,11 @@ public InternalArray getArray(int pos) { return MemorySegmentUtils.readArrayData(segments, offset, getLong(pos)); } + @Override + public InternalVector getVector(int pos) { + throw new IllegalArgumentException("Unsupported type: VectorType"); + } + @Override public InternalMap getMap(int pos) { assertIndexIsValid(pos); diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BinaryRow.java b/paimon-common/src/main/java/org/apache/paimon/data/BinaryRow.java index 70d2ec7a01a8..ff5406f7b326 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/BinaryRow.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/BinaryRow.java @@ -355,6 +355,12 @@ public InternalArray getArray(int pos) { return MemorySegmentUtils.readArrayData(segments, offset, getLong(pos)); } + @Override + public InternalVector getVector(int pos) { + assertIndexIsValid(pos); + return MemorySegmentUtils.readVectorData(segments, offset, getLong(pos)); + } + @Override public InternalMap getMap(int pos) { assertIndexIsValid(pos); diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BinaryVector.java b/paimon-common/src/main/java/org/apache/paimon/data/BinaryVector.java new file mode 100644 index 000000000000..7ef07a85a45c --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/data/BinaryVector.java @@ -0,0 +1,393 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data; + +import org.apache.paimon.annotation.Public; +import org.apache.paimon.data.variant.Variant; +import org.apache.paimon.memory.MemorySegment; +import org.apache.paimon.memory.MemorySegmentUtils; +import org.apache.paimon.types.DataType; + +import java.util.Objects; + +import static org.apache.paimon.memory.MemorySegment.UNSAFE; + +/** + * A binary implementation of {@link InternalVector} which is backed by {@link MemorySegment}s. + * + *

Compared to {@link BinaryArray}, {@link BinaryVector} stores only primitive types, and + * nullable is not supported yet. Thus, the memory layout is very simple. + * + *

The binary layout of {@link BinaryVector}: + * + *

+ * [values]
+ * 
+ * + * @since 2.0.0 + */ +@Public +public final class BinaryVector extends BinarySection implements InternalVector, DataSetters { + + private static final long serialVersionUID = 1L; + + /** Offset for Arrays. */ + private static final int BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); + + private static final int BOOLEAN_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(boolean[].class); + private static final int SHORT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(short[].class); + private static final int INT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(int[].class); + private static final int LONG_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(long[].class); + private static final int FLOAT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(float[].class); + private static final int DOUBLE_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(double[].class); + + public static int getPrimitiveElementSize(DataType type) { + // ordered by type root definition + switch (type.getTypeRoot()) { + case BOOLEAN: + case TINYINT: + return 1; + case SMALLINT: + return 2; + case INTEGER: + case FLOAT: + return 4; + case BIGINT: + case DOUBLE: + return 8; + default: + throw new IllegalArgumentException("Unsupported type: " + type); + } + } + + // The number of elements in this vector + private final int size; + + public BinaryVector(int size) { + this.size = size; + } + + private void assertIndexIsValid(int ordinal) { + assert ordinal >= 0 : "ordinal (" + ordinal + ") should >= 0"; + assert ordinal < size : "ordinal (" + ordinal + ") should < " + size; + } + + private int getElementOffset(int ordinal, int elementSize) { + return offset + ordinal * elementSize; + } + + @Override + public int size() { + return size; + } + + @Override + public boolean isNullAt(int pos) { + assertIndexIsValid(pos); + return false; + } + + @Override + public void setNullAt(int pos) { + assertIndexIsValid(pos); + throw new UnsupportedOperationException("Nullable is not supported yet for VectorType."); + } + + @Override + public long getLong(int pos) { + assertIndexIsValid(pos); + return MemorySegmentUtils.getLong(segments, getElementOffset(pos, 8)); + } + + @Override + public void setLong(int pos, long value) { + assertIndexIsValid(pos); + MemorySegmentUtils.setLong(segments, getElementOffset(pos, 8), value); + } + + @Override + public int getInt(int pos) { + assertIndexIsValid(pos); + return MemorySegmentUtils.getInt(segments, getElementOffset(pos, 4)); + } + + @Override + public void setInt(int pos, int value) { + assertIndexIsValid(pos); + MemorySegmentUtils.setInt(segments, getElementOffset(pos, 4), value); + } + + @Override + public BinaryString getString(int pos) { + throw new UnsupportedOperationException("BinaryVector does not support String."); + } + + @Override + public Decimal getDecimal(int pos, int precision, int scale) { + throw new UnsupportedOperationException("BinaryVector does not support Decimal."); + } + + @Override + public Timestamp getTimestamp(int pos, int precision) { + throw new UnsupportedOperationException("BinaryVector does not support Timestamp."); + } + + @Override + public byte[] getBinary(int pos) { + throw new UnsupportedOperationException("BinaryVector does not support Binary."); + } + + @Override + public Variant getVariant(int pos) { + throw new UnsupportedOperationException("BinaryVector does not support Variant."); + } + + @Override + public Blob getBlob(int pos) { + throw new UnsupportedOperationException("BinaryVector does not support Blob."); + } + + @Override + public InternalArray getArray(int pos) { + throw new UnsupportedOperationException("BinaryVector does not support nested Array."); + } + + @Override + public InternalVector getVector(int pos) { + throw new UnsupportedOperationException("BinaryVector does not support nested Vector."); + } + + @Override + public InternalMap getMap(int pos) { + throw new UnsupportedOperationException("BinaryVector does not support Map."); + } + + @Override + public InternalRow getRow(int pos, int numFields) { + throw new UnsupportedOperationException("BinaryVector does not support nested Row."); + } + + @Override + public boolean getBoolean(int pos) { + assertIndexIsValid(pos); + return MemorySegmentUtils.getBoolean(segments, getElementOffset(pos, 1)); + } + + @Override + public void setBoolean(int pos, boolean value) { + assertIndexIsValid(pos); + MemorySegmentUtils.setBoolean(segments, getElementOffset(pos, 1), value); + } + + @Override + public byte getByte(int pos) { + assertIndexIsValid(pos); + return MemorySegmentUtils.getByte(segments, getElementOffset(pos, 1)); + } + + @Override + public void setByte(int pos, byte value) { + assertIndexIsValid(pos); + MemorySegmentUtils.setByte(segments, getElementOffset(pos, 1), value); + } + + @Override + public short getShort(int pos) { + assertIndexIsValid(pos); + return MemorySegmentUtils.getShort(segments, getElementOffset(pos, 2)); + } + + @Override + public void setShort(int pos, short value) { + assertIndexIsValid(pos); + MemorySegmentUtils.setShort(segments, getElementOffset(pos, 2), value); + } + + @Override + public float getFloat(int pos) { + assertIndexIsValid(pos); + return MemorySegmentUtils.getFloat(segments, getElementOffset(pos, 4)); + } + + @Override + public void setFloat(int pos, float value) { + assertIndexIsValid(pos); + MemorySegmentUtils.setFloat(segments, getElementOffset(pos, 4), value); + } + + @Override + public double getDouble(int pos) { + assertIndexIsValid(pos); + return MemorySegmentUtils.getDouble(segments, getElementOffset(pos, 8)); + } + + @Override + public void setDouble(int pos, double value) { + assertIndexIsValid(pos); + MemorySegmentUtils.setDouble(segments, getElementOffset(pos, 8), value); + } + + @Override + public void setDecimal(int pos, Decimal value, int precision) { + throw new UnsupportedOperationException("BinaryVector does not support Decimal."); + } + + @Override + public void setTimestamp(int pos, Timestamp value, int precision) { + throw new UnsupportedOperationException("BinaryVector does not support Timestamp."); + } + + @Override + public boolean[] toBooleanArray() { + boolean[] values = new boolean[size]; + MemorySegmentUtils.copyToUnsafe(segments, offset, values, BOOLEAN_ARRAY_OFFSET, size); + return values; + } + + @Override + public byte[] toByteArray() { + byte[] values = new byte[size]; + MemorySegmentUtils.copyToUnsafe(segments, offset, values, BYTE_ARRAY_BASE_OFFSET, size); + return values; + } + + @Override + public short[] toShortArray() { + short[] values = new short[size]; + MemorySegmentUtils.copyToUnsafe(segments, offset, values, SHORT_ARRAY_OFFSET, size * 2); + return values; + } + + @Override + public int[] toIntArray() { + int[] values = new int[size]; + MemorySegmentUtils.copyToUnsafe(segments, offset, values, INT_ARRAY_OFFSET, size * 4); + return values; + } + + @Override + public long[] toLongArray() { + long[] values = new long[size]; + MemorySegmentUtils.copyToUnsafe(segments, offset, values, LONG_ARRAY_OFFSET, size * 8); + return values; + } + + @Override + public float[] toFloatArray() { + float[] values = new float[size]; + MemorySegmentUtils.copyToUnsafe(segments, offset, values, FLOAT_ARRAY_OFFSET, size * 4); + return values; + } + + @Override + public double[] toDoubleArray() { + double[] values = new double[size]; + MemorySegmentUtils.copyToUnsafe(segments, offset, values, DOUBLE_ARRAY_OFFSET, size * 8); + return values; + } + + public BinaryVector copy() { + return copy(new BinaryVector(size)); + } + + public BinaryVector copy(BinaryVector reuse) { + byte[] bytes = MemorySegmentUtils.copyToBytes(segments, offset, sizeInBytes); + reuse.pointTo(MemorySegment.wrap(bytes), 0, sizeInBytes); + return reuse; + } + + @Override + public int hashCode() { + return Objects.hash(MemorySegmentUtils.hashByWords(segments, offset, sizeInBytes), size); + } + + // ------------------------------------------------------------------------------------------ + // Construction Utilities + // ------------------------------------------------------------------------------------------ + + public static BinaryVector fromPrimitiveArray(boolean[] arr) { + return fromPrimitiveArray(arr, BOOLEAN_ARRAY_OFFSET, arr.length, 1); + } + + public static BinaryVector fromPrimitiveArray(byte[] arr) { + return fromPrimitiveArray(arr, BYTE_ARRAY_BASE_OFFSET, arr.length, 1); + } + + public static BinaryVector fromPrimitiveArray(short[] arr) { + return fromPrimitiveArray(arr, SHORT_ARRAY_OFFSET, arr.length, 2); + } + + public static BinaryVector fromPrimitiveArray(int[] arr) { + return fromPrimitiveArray(arr, INT_ARRAY_OFFSET, arr.length, 4); + } + + public static BinaryVector fromPrimitiveArray(long[] arr) { + return fromPrimitiveArray(arr, LONG_ARRAY_OFFSET, arr.length, 8); + } + + public static BinaryVector fromPrimitiveArray(float[] arr) { + return fromPrimitiveArray(arr, FLOAT_ARRAY_OFFSET, arr.length, 4); + } + + public static BinaryVector fromPrimitiveArray(double[] arr) { + return fromPrimitiveArray(arr, DOUBLE_ARRAY_OFFSET, arr.length, 8); + } + + private static BinaryVector fromPrimitiveArray( + Object arr, int offset, int length, int elementSize) { + // must align by 8 bytes + final long valueRegionInBytes = (long) elementSize * length; + long totalSizeInLongs = (valueRegionInBytes + 7) / 8; + if (totalSizeInLongs > Integer.MAX_VALUE / 8) { + throw new UnsupportedOperationException( + "Cannot convert this vector to unsafe format as " + "it's too big."); + } + long totalSize = totalSizeInLongs * 8; + + final byte[] data = new byte[(int) totalSize]; + + UNSAFE.copyMemory(arr, offset, data, BYTE_ARRAY_BASE_OFFSET, valueRegionInBytes); + + BinaryVector result = new BinaryVector(length); + result.pointTo(MemorySegment.wrap(data), 0, (int) totalSize); + return result; + } + + public static BinaryVector fromInternalArray(InternalArray array, DataType elementType) { + switch (elementType.getTypeRoot()) { + case BOOLEAN: + return BinaryVector.fromPrimitiveArray(array.toBooleanArray()); + case TINYINT: + return BinaryVector.fromPrimitiveArray(array.toByteArray()); + case SMALLINT: + return BinaryVector.fromPrimitiveArray(array.toShortArray()); + case INTEGER: + return BinaryVector.fromPrimitiveArray(array.toIntArray()); + case BIGINT: + return BinaryVector.fromPrimitiveArray(array.toLongArray()); + case FLOAT: + return BinaryVector.fromPrimitiveArray(array.toFloatArray()); + case DOUBLE: + return BinaryVector.fromPrimitiveArray(array.toDoubleArray()); + default: + throw new UnsupportedOperationException( + "Unsupported element type for vector " + elementType); + } + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java b/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java index 06b763a0eac1..2e0cd5701b71 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java @@ -22,6 +22,7 @@ import org.apache.paimon.data.serializer.InternalMapSerializer; import org.apache.paimon.data.serializer.InternalRowSerializer; import org.apache.paimon.data.serializer.InternalSerializers; +import org.apache.paimon.data.serializer.InternalVectorSerializer; import org.apache.paimon.data.serializer.Serializer; import org.apache.paimon.data.variant.Variant; import org.apache.paimon.types.DataType; @@ -74,6 +75,8 @@ public interface BinaryWriter { void writeArray(int pos, InternalArray value, InternalArraySerializer serializer); + void writeVector(int pos, InternalVector value, InternalVectorSerializer serializer); + void writeMap(int pos, InternalMap value, InternalMapSerializer serializer); void writeRow(int pos, InternalRow value, InternalRowSerializer serializer); @@ -133,6 +136,9 @@ static void write( case ARRAY: writer.writeArray(pos, (InternalArray) o, (InternalArraySerializer) serializer); break; + case VECTOR: + writer.writeVector(pos, (InternalVector) o, (InternalVectorSerializer) serializer); + break; case MAP: case MULTISET: writer.writeMap(pos, (InternalMap) o, (InternalMapSerializer) serializer); @@ -210,6 +216,14 @@ static ValueSetter createValueSetter(DataType elementType, Serializer seriali pos, (InternalArray) value, (InternalArraySerializer) arraySerializer); + case VECTOR: + final Serializer vectorSerializer = + serializer == null ? InternalSerializers.create(elementType) : serializer; + return (writer, pos, value) -> + writer.writeVector( + pos, + (InternalVector) value, + (InternalVectorSerializer) vectorSerializer); case MULTISET: case MAP: final Serializer mapSerializer = diff --git a/paimon-common/src/main/java/org/apache/paimon/data/DataGetters.java b/paimon-common/src/main/java/org/apache/paimon/data/DataGetters.java index d60ca316f7f2..1043b7e3ba4f 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/DataGetters.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/DataGetters.java @@ -84,6 +84,8 @@ public interface DataGetters { /** Returns the array value at the given position. */ InternalArray getArray(int pos); + InternalVector getVector(int pos); + /** Returns the map value at the given position. */ InternalMap getMap(int pos); diff --git a/paimon-common/src/main/java/org/apache/paimon/data/GenericArray.java b/paimon-common/src/main/java/org/apache/paimon/data/GenericArray.java index 0d0898ea4edc..8c1ba4e28ac9 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/GenericArray.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/GenericArray.java @@ -240,6 +240,11 @@ public InternalArray getArray(int pos) { return (InternalArray) getObject(pos); } + @Override + public InternalVector getVector(int pos) { + return (InternalVector) getObject(pos); + } + @Override public InternalMap getMap(int pos) { return (InternalMap) getObject(pos); diff --git a/paimon-common/src/main/java/org/apache/paimon/data/GenericRow.java b/paimon-common/src/main/java/org/apache/paimon/data/GenericRow.java index 37be2386b44b..10aefbafdd07 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/GenericRow.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/GenericRow.java @@ -202,6 +202,11 @@ public InternalArray getArray(int pos) { return (InternalArray) this.fields[pos]; } + @Override + public InternalVector getVector(int pos) { + return (InternalVector) this.fields[pos]; + } + @Override public InternalMap getMap(int pos) { return (InternalMap) this.fields[pos]; diff --git a/paimon-common/src/main/java/org/apache/paimon/data/InternalRow.java b/paimon-common/src/main/java/org/apache/paimon/data/InternalRow.java index 9ec838cd273b..3bbb85f49963 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/InternalRow.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/InternalRow.java @@ -213,6 +213,9 @@ static FieldGetter createFieldGetter(DataType fieldType, int fieldPos) { case ARRAY: fieldGetter = row -> row.getArray(fieldPos); break; + case VECTOR: + fieldGetter = row -> row.getVector(fieldPos); + break; case MULTISET: case MAP: fieldGetter = row -> row.getMap(fieldPos); diff --git a/paimon-common/src/main/java/org/apache/paimon/data/InternalVector.java b/paimon-common/src/main/java/org/apache/paimon/data/InternalVector.java new file mode 100644 index 000000000000..4b4cedeb0279 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/data/InternalVector.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data; + +import org.apache.paimon.annotation.Public; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.VectorType; + +/** + * Base interface of an internal data structure representing data of {@link VectorType}. + * + *

Note: All elements of this data structure must be internal data structures and must be of the + * same type. See {@link InternalRow} for more information about internal data structures. + * + * @see BinaryVector + * @since 2.0.0 + */ +@Public +public interface InternalVector extends InternalArray { + + // ------------------------------------------------------------------------------------------ + // Access Utilities + // ------------------------------------------------------------------------------------------ + + /** + * Creates an accessor for getting elements in an internal vector data structure at the given + * position. + * + * @param elementType the element type of the vector + */ + static ElementGetter createElementGetter(DataType elementType) { + final ElementGetter elementGetter; + // ordered by type root definition + switch (elementType.getTypeRoot()) { + case BOOLEAN: + elementGetter = InternalArray::getBoolean; + break; + case TINYINT: + elementGetter = InternalArray::getByte; + break; + case SMALLINT: + elementGetter = InternalArray::getShort; + break; + case INTEGER: + elementGetter = InternalArray::getInt; + break; + case BIGINT: + elementGetter = InternalArray::getLong; + break; + case FLOAT: + elementGetter = InternalArray::getFloat; + break; + case DOUBLE: + elementGetter = InternalArray::getDouble; + break; + default: + String msg = + String.format( + "type %s not support in %s", + elementType.getTypeRoot().toString(), + InternalVector.class.getName()); + throw new IllegalArgumentException(msg); + } + if (!elementType.isNullable()) { + return elementGetter; + } + return (vector, pos) -> { + if (vector.isNullAt(pos)) { + return null; + } + return elementGetter.getElementOrNull(vector, pos); + }; + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/data/JoinedRow.java b/paimon-common/src/main/java/org/apache/paimon/data/JoinedRow.java index 62aa7358082b..fee5552f8c5c 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/JoinedRow.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/JoinedRow.java @@ -252,6 +252,15 @@ public InternalArray getArray(int pos) { } } + @Override + public InternalVector getVector(int pos) { + if (pos < row1.getFieldCount()) { + return row1.getVector(pos); + } else { + return row2.getVector(pos - row1.getFieldCount()); + } + } + @Override public InternalMap getMap(int pos) { if (pos < row1.getFieldCount()) { diff --git a/paimon-common/src/main/java/org/apache/paimon/data/LazyGenericRow.java b/paimon-common/src/main/java/org/apache/paimon/data/LazyGenericRow.java index 680e3be0df4d..6d2e8b141f57 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/LazyGenericRow.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/LazyGenericRow.java @@ -158,6 +158,11 @@ public InternalArray getArray(int pos) { return (InternalArray) getField(pos); } + @Override + public InternalVector getVector(int pos) { + return (InternalVector) getField(pos); + } + @Override public InternalMap getMap(int pos) { return (InternalMap) getField(pos); diff --git a/paimon-common/src/main/java/org/apache/paimon/data/NestedRow.java b/paimon-common/src/main/java/org/apache/paimon/data/NestedRow.java index 708c2bc60ce6..afc4f0c47fb0 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/NestedRow.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/NestedRow.java @@ -306,6 +306,11 @@ public InternalArray getArray(int pos) { return MemorySegmentUtils.readArrayData(segments, offset, getLong(pos)); } + @Override + public InternalVector getVector(int pos) { + throw new IllegalArgumentException("Unsupported type: VectorType"); + } + @Override public InternalMap getMap(int pos) { assertIndexIsValid(pos); diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarArray.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarArray.java index e91fa26b4e56..ad04f647b977 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarArray.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarArray.java @@ -26,6 +26,7 @@ import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.InternalVector; import org.apache.paimon.data.Timestamp; import org.apache.paimon.data.variant.GenericVariant; import org.apache.paimon.data.variant.Variant; @@ -143,6 +144,11 @@ public InternalArray getArray(int pos) { return ((ArrayColumnVector) data).getArray(offset + pos); } + @Override + public InternalVector getVector(int pos) { + throw new UnsupportedOperationException("Unsupported type: VectorType"); + } + @Override public InternalMap getMap(int pos) { return ((MapColumnVector) data).getMap(offset + pos); diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java index 468c44188a69..d7423608af45 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java @@ -26,6 +26,7 @@ import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.InternalVector; import org.apache.paimon.data.Timestamp; import org.apache.paimon.data.variant.Variant; import org.apache.paimon.types.RowKind; @@ -163,6 +164,11 @@ public InternalArray getArray(int pos) { return vectorizedColumnBatch.getArray(rowId, pos); } + @Override + public InternalVector getVector(int pos) { + return vectorizedColumnBatch.getVector(rowId, pos); + } + @Override public InternalMap getMap(int pos) { return vectorizedColumnBatch.getMap(rowId, pos); diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/RowToColumnConverter.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/RowToColumnConverter.java index d2a378846c6c..de962ad86a39 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/RowToColumnConverter.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/RowToColumnConverter.java @@ -62,6 +62,7 @@ import org.apache.paimon.types.VarBinaryType; import org.apache.paimon.types.VarCharType; import org.apache.paimon.types.VariantType; +import org.apache.paimon.types.VectorType; import java.io.Serializable; import java.util.List; @@ -281,6 +282,11 @@ public TypeConverter visit(ArrayType arrayType) { }); } + @Override + public TypeConverter visit(VectorType vectorType) { + throw new UnsupportedOperationException(); + } + @Override public TypeConverter visit(MultisetType multisetType) { throw new UnsupportedOperationException(); diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedColumnBatch.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedColumnBatch.java index 18eecd29ed59..297d0b0f9082 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedColumnBatch.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedColumnBatch.java @@ -23,6 +23,7 @@ import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.InternalVector; import org.apache.paimon.data.Timestamp; import org.apache.paimon.data.columnar.BytesColumnVector.Bytes; import org.apache.paimon.data.variant.GenericVariant; @@ -124,6 +125,10 @@ public InternalArray getArray(int rowId, int colId) { return ((ArrayColumnVector) columns[colId]).getArray(rowId); } + public InternalVector getVector(int rowId, int colId) { + throw new UnsupportedOperationException("Unsupported type: VectorType"); + } + public InternalRow getRow(int rowId, int colId) { return ((RowColumnVector) columns[colId]).getRow(rowId); } diff --git a/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryArray.java b/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryArray.java index c2ac7291c211..78d717ee1b0b 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryArray.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryArray.java @@ -26,6 +26,7 @@ import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.InternalVector; import org.apache.paimon.data.Timestamp; import org.apache.paimon.data.variant.Variant; import org.apache.paimon.memory.BytesUtils; @@ -164,6 +165,11 @@ public InternalArray getArray(int pos) { throw new UnsupportedOperationException(); } + @Override + public InternalVector getVector(int pos) { + throw new UnsupportedOperationException(); + } + @Override public InternalMap getMap(int pos) { throw new UnsupportedOperationException(); diff --git a/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryRow.java b/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryRow.java index bdc864dfcb4f..2c285c30a6b4 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryRow.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryRow.java @@ -26,6 +26,7 @@ import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.InternalVector; import org.apache.paimon.data.Timestamp; import org.apache.paimon.data.variant.Variant; import org.apache.paimon.memory.BytesUtils; @@ -186,6 +187,11 @@ private static InternalRow readNestedRow( return new SafeBinaryRow(numFields, bytes, offset + baseOffset); } + @Override + public InternalVector getVector(int pos) { + throw new UnsupportedOperationException(); + } + @Override public InternalMap getMap(int pos) { throw new UnsupportedOperationException(); diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalSerializers.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalSerializers.java index 38e9c4678e49..6669f347ff27 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalSerializers.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalSerializers.java @@ -24,6 +24,7 @@ import org.apache.paimon.types.MapType; import org.apache.paimon.types.MultisetType; import org.apache.paimon.types.RowType; +import org.apache.paimon.types.VectorType; import static org.apache.paimon.types.DataTypeChecks.getFieldTypes; import static org.apache.paimon.types.DataTypeChecks.getPrecision; @@ -75,6 +76,10 @@ private static Serializer createInternal(DataType type) { return new TimestampSerializer(getPrecision(type)); case ARRAY: return new InternalArraySerializer(((ArrayType) type).getElementType()); + case VECTOR: + VectorType vectorType = (VectorType) type; + return new InternalVectorSerializer( + vectorType.getElementType(), vectorType.getLength()); case MULTISET: return new InternalMapSerializer( ((MultisetType) type).getElementType(), new IntType(false)); diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalVectorSerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalVectorSerializer.java new file mode 100644 index 000000000000..6ae5f23a9547 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalVectorSerializer.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data.serializer; + +import org.apache.paimon.data.BinaryVector; +import org.apache.paimon.data.InternalVector; +import org.apache.paimon.io.DataInputView; +import org.apache.paimon.io.DataOutputView; +import org.apache.paimon.memory.MemorySegment; +import org.apache.paimon.memory.MemorySegmentUtils; +import org.apache.paimon.types.DataType; + +import java.io.IOException; + +/** Serializer for {@link InternalVector}. */ +public class InternalVectorSerializer implements Serializer { + private static final long serialVersionUID = 1L; + + private final DataType eleType; + private final Serializer eleSer; + private final int length; + + public InternalVectorSerializer(DataType eleType, int length) { + this(eleType, InternalSerializers.create(eleType), length); + } + + private InternalVectorSerializer(DataType eleType, Serializer eleSer, int length) { + this.eleType = eleType; + this.eleSer = eleSer; + this.length = length; + } + + @Override + public InternalVectorSerializer duplicate() { + return new InternalVectorSerializer(eleType, eleSer.duplicate(), length); + } + + @Override + public InternalVector copy(InternalVector from) { + if (from instanceof BinaryVector) { + return ((BinaryVector) from).copy(); + } else { + return toBinaryVector(from).copy(); + } + } + + @Override + public void serialize(InternalVector record, DataOutputView target) throws IOException { + if (record.size() != length) { + throw new IOException("Invalid size to serialize: " + record.size()); + } + BinaryVector binaryVector = toBinaryVector(record); + target.writeInt(binaryVector.getSizeInBytes()); + MemorySegmentUtils.copyToView( + binaryVector.getSegments(), + binaryVector.getOffset(), + binaryVector.getSizeInBytes(), + target); + } + + @Override + public InternalVector deserialize(DataInputView source) throws IOException { + int sizeInBytes = source.readInt(); + byte[] bytes = new byte[sizeInBytes]; + source.readFully(bytes); + BinaryVector vector = new BinaryVector(length); + vector.pointTo(MemorySegment.wrap(bytes), 0, bytes.length); + return vector; + } + + public BinaryVector toBinaryVector(InternalVector from) { + if (from instanceof BinaryVector) { + return (BinaryVector) from; + } else { + return BinaryVector.fromInternalArray(from, eleType); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + InternalVectorSerializer that = (InternalVectorSerializer) o; + + return eleType.equals(that.eleType) && length == that.length; + } + + @Override + public int hashCode() { + return eleType.hashCode(); + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapTypeVisitor.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapTypeVisitor.java index fbcd06d8dbda..4183bfbb2bf8 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapTypeVisitor.java +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapTypeVisitor.java @@ -41,6 +41,7 @@ import org.apache.paimon.types.VarBinaryType; import org.apache.paimon.types.VarCharType; import org.apache.paimon.types.VariantType; +import org.apache.paimon.types.VectorType; /** Simplified visitor for bitmap index. */ public abstract class BitmapTypeVisitor implements DataTypeVisitor { @@ -146,6 +147,11 @@ public final R visit(ArrayType arrayType) { throw new UnsupportedOperationException("Does not support type array"); } + @Override + public final R visit(VectorType vectorType) { + throw new UnsupportedOperationException("Does not support type vector"); + } + @Override public final R visit(MultisetType multisetType) { throw new UnsupportedOperationException("Does not support type mutiset"); diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/FastHash.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/FastHash.java index 91330bae460d..322847f849ab 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/FastHash.java +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/FastHash.java @@ -44,6 +44,7 @@ import org.apache.paimon.types.VarBinaryType; import org.apache.paimon.types.VarCharType; import org.apache.paimon.types.VariantType; +import org.apache.paimon.types.VectorType; import net.openhft.hashing.LongHashFunction; @@ -176,6 +177,11 @@ public FastHash visit(ArrayType arrayType) { throw new UnsupportedOperationException("Does not support type array"); } + @Override + public FastHash visit(VectorType vectorType) { + throw new UnsupportedOperationException("Does not support type vector"); + } + @Override public FastHash visit(MultisetType multisetType) { throw new UnsupportedOperationException("Does not support type mutiset"); diff --git a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentUtils.java b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentUtils.java index 691c1e60e37d..a1f181b04d7f 100644 --- a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentUtils.java @@ -22,16 +22,19 @@ import org.apache.paimon.data.BinaryArray; import org.apache.paimon.data.BinaryMap; import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.BinaryVector; import org.apache.paimon.data.Decimal; import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.InternalVector; import org.apache.paimon.data.NestedRow; import org.apache.paimon.data.Timestamp; import org.apache.paimon.data.variant.GenericVariant; import org.apache.paimon.data.variant.Variant; import org.apache.paimon.io.DataOutputView; import org.apache.paimon.utils.MurmurHashUtils; +import org.apache.paimon.utils.Preconditions; import java.io.IOException; @@ -1153,6 +1156,23 @@ public static InternalArray readArrayData( return array; } + public static InternalVector readVectorData( + MemorySegment[] segments, int baseOffset, long offsetAndSize) { + final int numElementsWidth = 4; + final int size = ((int) offsetAndSize); + int offset = (int) (offsetAndSize >> 32); + Preconditions.checkArgument( + size >= numElementsWidth, + "size (" + size + ") should >= numElementsWidth (" + numElementsWidth + ")"); + + int vectorSize = MemorySegmentUtils.getInt(segments, offset + baseOffset); + Preconditions.checkArgument(vectorSize >= 0, "Invalid vector size: %s", vectorSize); + + BinaryVector vector = new BinaryVector(vectorSize); + vector.pointTo(segments, offset + baseOffset + numElementsWidth, size - numElementsWidth); + return vector; + } + /** Gets an instance of {@link InternalRow} from underlying {@link MemorySegment}. */ public static InternalRow readRowData( MemorySegment[] segments, int numFields, int baseOffset, long offsetAndSize) { diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionArray.java b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionArray.java index 7ebccba6b2c8..8b89c1ce7f8a 100644 --- a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionArray.java +++ b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionArray.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.InternalVector; import org.apache.paimon.data.Timestamp; import org.apache.paimon.data.variant.Variant; @@ -148,6 +149,11 @@ public InternalArray getArray(int pos) { return chooseArray(pos).getArray(offsetInRow(pos)); } + @Override + public InternalVector getVector(int pos) { + return chooseArray(pos).getVector(offsetInRow(pos)); + } + @Override public InternalMap getMap(int pos) { return chooseArray(pos).getMap(offsetInRow(pos)); diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionRow.java b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionRow.java index cc5cf2b18be3..08c6d24d2b79 100644 --- a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionRow.java +++ b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionRow.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.InternalVector; import org.apache.paimon.data.Timestamp; import org.apache.paimon.data.variant.Variant; import org.apache.paimon.types.RowKind; @@ -172,6 +173,11 @@ public InternalArray getArray(int pos) { return chooseRow(pos).getArray(offsetInRow(pos)); } + @Override + public InternalVector getVector(int pos) { + return chooseRow(pos).getVector(offsetInRow(pos)); + } + @Override public InternalMap getMap(int pos) { return chooseRow(pos).getMap(offsetInRow(pos)); diff --git a/paimon-common/src/main/java/org/apache/paimon/sort/hilbert/HilbertIndexer.java b/paimon-common/src/main/java/org/apache/paimon/sort/hilbert/HilbertIndexer.java index a65455c04524..241dc6100379 100644 --- a/paimon-common/src/main/java/org/apache/paimon/sort/hilbert/HilbertIndexer.java +++ b/paimon-common/src/main/java/org/apache/paimon/sort/hilbert/HilbertIndexer.java @@ -47,6 +47,7 @@ import org.apache.paimon.types.VarBinaryType; import org.apache.paimon.types.VarCharType; import org.apache.paimon.types.VariantType; +import org.apache.paimon.types.VectorType; import org.apache.paimon.utils.ConvertBinaryUtil; import org.davidmoten.hilbert.HilbertCurve; @@ -276,6 +277,11 @@ public HProcessFunction visit(ArrayType arrayType) { throw new RuntimeException("Unsupported type"); } + @Override + public HProcessFunction visit(VectorType vectorType) { + throw new RuntimeException("Unsupported type"); + } + @Override public HProcessFunction visit(MultisetType multisetType) { throw new RuntimeException("Unsupported type"); diff --git a/paimon-common/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java b/paimon-common/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java index e6ce1d3e3e55..1d40fe75e776 100644 --- a/paimon-common/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java +++ b/paimon-common/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java @@ -48,6 +48,7 @@ import org.apache.paimon.types.VarBinaryType; import org.apache.paimon.types.VarCharType; import org.apache.paimon.types.VariantType; +import org.apache.paimon.types.VectorType; import java.io.Serializable; import java.nio.ByteBuffer; @@ -364,6 +365,11 @@ public ZProcessFunction visit(ArrayType arrayType) { throw new RuntimeException("Unsupported type"); } + @Override + public ZProcessFunction visit(VectorType vectorType) { + throw new RuntimeException("Unsupported type"); + } + @Override public ZProcessFunction visit(MultisetType multisetType) { throw new RuntimeException("Unsupported type"); diff --git a/paimon-common/src/main/java/org/apache/paimon/types/InternalRowToSizeVisitor.java b/paimon-common/src/main/java/org/apache/paimon/types/InternalRowToSizeVisitor.java index dd4548572f35..dbac55a07dde 100644 --- a/paimon-common/src/main/java/org/apache/paimon/types/InternalRowToSizeVisitor.java +++ b/paimon-common/src/main/java/org/apache/paimon/types/InternalRowToSizeVisitor.java @@ -22,6 +22,7 @@ import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.InternalVector; import java.util.List; import java.util.function.BiFunction; @@ -253,6 +254,28 @@ public BiFunction visit(ArrayType arrayType) { }; } + @Override + public BiFunction visit(VectorType vectorType) { + return (row, index) -> { + if (row.isNullAt(index)) { + return NULL_SIZE; + } else { + // If it is ensured that the element type + // must be primitive type, then this can be simplified. + BiFunction function = + vectorType.getElementType().accept(this); + InternalVector internalVector = row.getVector(index); + + int size = 0; + for (int i = 0; i < internalVector.size(); i++) { + size += function.apply(internalVector, i); + } + + return size; + } + }; + } + @Override public BiFunction visit(MultisetType multisetType) { return (row, index) -> { diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowUtils.java index 14751217d4cc..4cfe35e39851 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowUtils.java @@ -318,6 +318,8 @@ public static Object get(DataGetters dataGetters, int pos, DataType fieldType) { pos, decimalType.getPrecision(), decimalType.getScale()); case ARRAY: return dataGetters.getArray(pos); + case VECTOR: + return dataGetters.getVector(pos); case MAP: case MULTISET: return dataGetters.getMap(pos); diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/KeyProjectedRow.java b/paimon-common/src/main/java/org/apache/paimon/utils/KeyProjectedRow.java index 7eedd7b8f24c..d4999dcf57dc 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/KeyProjectedRow.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/KeyProjectedRow.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.InternalVector; import org.apache.paimon.data.Timestamp; import org.apache.paimon.data.variant.Variant; import org.apache.paimon.types.RowKind; @@ -140,6 +141,11 @@ public InternalArray getArray(int pos) { return row.getArray(indexMapping[pos]); } + @Override + public InternalVector getVector(int pos) { + return row.getVector(indexMapping[pos]); + } + @Override public InternalMap getMap(int pos) { return row.getMap(indexMapping[pos]); diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedArray.java b/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedArray.java index b05914271be6..015fb022edbb 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedArray.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedArray.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.InternalVector; import org.apache.paimon.data.Timestamp; import org.apache.paimon.data.variant.Variant; import org.apache.paimon.types.DataType; @@ -42,7 +43,7 @@ public class ProjectedArray implements InternalArray { private InternalArray array; - private ProjectedArray(int[] indexMapping) { + protected ProjectedArray(int[] indexMapping) { this.indexMapping = indexMapping; } @@ -142,6 +143,11 @@ public InternalArray getArray(int pos) { return array.getArray(indexMapping[pos]); } + @Override + public InternalVector getVector(int pos) { + return array.getVector(indexMapping[pos]); + } + @Override public InternalMap getMap(int pos) { return array.getMap(indexMapping[pos]); diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedRow.java b/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedRow.java index 18c7e5db798d..a9dc3e9253d7 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedRow.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedRow.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.InternalVector; import org.apache.paimon.data.Timestamp; import org.apache.paimon.data.variant.Variant; import org.apache.paimon.types.DataType; @@ -157,6 +158,11 @@ public InternalArray getArray(int pos) { return row.getArray(indexMapping[pos]); } + @Override + public InternalVector getVector(int pos) { + return row.getVector(indexMapping[pos]); + } + @Override public InternalMap getMap(int pos) { return row.getMap(indexMapping[pos]); diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedVector.java b/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedVector.java new file mode 100644 index 000000000000..889c444ec40c --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedVector.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Blob; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.InternalArray; +import org.apache.paimon.data.InternalMap; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.InternalVector; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.data.variant.Variant; +import org.apache.paimon.types.DataType; + +/** + * An implementation of {@link InternalVector} which provides a projected view of the underlying + * {@link InternalVector}. + * + *

Projection includes both reducing the accessible fields and reordering them. + * + *

Note: This class supports only top-level projections, not nested projections. + */ +public class ProjectedVector extends ProjectedArray implements InternalVector { + + protected ProjectedVector(int[] indexMapping) { + super(indexMapping); + } + + /** + * Replaces the underlying {@link InternalVector} backing this {@link ProjectedVector}. + * + *

This method replaces the row data in place and does not return a new object. This is done + * for performance reasons. + */ + public ProjectedVector replaceVector(InternalVector vector) { + super.replaceArray(vector); + return this; + } + + @Override + public ProjectedArray replaceArray(InternalArray array) { + throw new IllegalArgumentException("ProjectedVector does not support replaceArray."); + } + + // --------------------------------------------------------------------------------------------- + + @Override + public BinaryString getString(int pos) { + throw new UnsupportedOperationException("ProjectedVector does not support String."); + } + + @Override + public Decimal getDecimal(int pos, int precision, int scale) { + throw new UnsupportedOperationException("ProjectedVector does not support Decimal."); + } + + @Override + public Timestamp getTimestamp(int pos, int precision) { + throw new UnsupportedOperationException("ProjectedVector does not support Timestamp."); + } + + @Override + public byte[] getBinary(int pos) { + throw new UnsupportedOperationException("ProjectedVector does not support Binary."); + } + + @Override + public Variant getVariant(int pos) { + throw new UnsupportedOperationException("ProjectedVector does not support Variant."); + } + + @Override + public Blob getBlob(int pos) { + throw new UnsupportedOperationException("ProjectedVector does not support Blob."); + } + + @Override + public InternalArray getArray(int pos) { + throw new UnsupportedOperationException("ProjectedVector does not support nested Array."); + } + + @Override + public InternalVector getVector(int pos) { + throw new UnsupportedOperationException( + "ProjectedVector does not support nested VectorType."); + } + + @Override + public InternalMap getMap(int pos) { + throw new UnsupportedOperationException("ProjectedVector does not support Map."); + } + + @Override + public InternalRow getRow(int pos, int numFields) { + throw new UnsupportedOperationException("ProjectedVector does not support nested Row."); + } + + /** + * Create an empty {@link ProjectedVector} starting from a {@code projection} vector. + * + *

The vector represents the mapping of the fields of the original {@link DataType}. For + * example, {@code [0, 2, 1]} specifies to include in the following order the 1st field, the 3rd + * field and the 2nd field of the row. + * + * @see Projection + * @see ProjectedVector + */ + public static ProjectedVector from(int[] projection) { + return new ProjectedVector(projection); + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/TypeCheckUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/TypeCheckUtils.java index 68450f8712b3..c1520be34107 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/TypeCheckUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/TypeCheckUtils.java @@ -33,6 +33,7 @@ import static org.apache.paimon.types.DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE; import static org.apache.paimon.types.DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE; import static org.apache.paimon.types.DataTypeRoot.VARIANT; +import static org.apache.paimon.types.DataTypeRoot.VECTOR; /** Utils for type. */ public class TypeCheckUtils { @@ -85,6 +86,10 @@ public static boolean isArray(DataType type) { return type.getTypeRoot() == ARRAY; } + public static boolean isVector(DataType type) { + return type.getTypeRoot() == VECTOR; + } + public static boolean isMap(DataType type) { return type.getTypeRoot() == MAP; } @@ -110,6 +115,7 @@ public static boolean isComparable(DataType type) { && !isMultiset(type) && !isRow(type) && !isArray(type) + && !isVector(type) && !isVariant(type) && !isBlob(type); } @@ -120,6 +126,7 @@ public static boolean isMutable(DataType type) { case CHAR: case VARCHAR: // the internal representation of String is BinaryString which is mutable case ARRAY: + case VECTOR: case MULTISET: case MAP: case ROW: diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java index e80b6621e9b1..bcb3f91b76df 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java @@ -19,21 +19,25 @@ package org.apache.paimon.utils; import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.BinaryVector; import org.apache.paimon.data.Decimal; import org.apache.paimon.data.GenericArray; import org.apache.paimon.data.GenericMap; import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalArray; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypeChecks; import org.apache.paimon.types.DataTypeRoot; +import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.DecimalType; import org.apache.paimon.types.LocalZonedTimestampType; import org.apache.paimon.types.MapType; import org.apache.paimon.types.RowType; import org.apache.paimon.types.TimestampType; import org.apache.paimon.types.VarCharType; +import org.apache.paimon.types.VectorType; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode; @@ -212,6 +216,20 @@ public static Object castFromStringInternal(String s, DataType type, boolean isC throw new RuntimeException( String.format("Failed to parse Json String %s", s), e); } + case VECTOR: + // Step 1: parse the string to an array + VectorType vectorType = (VectorType) type; + DataType vectorElementType = vectorType.getElementType(); + Object vectorArrayObject = + castFromStringInternal(s, DataTypes.ARRAY(vectorElementType), isCdcValue); + if (!(vectorArrayObject instanceof InternalArray)) { + throw new RuntimeException( + "Unexpected parsed type during building a vector: " + + vectorArrayObject.getClass()); + } + // Step 2: build a vector + return BinaryVector.fromInternalArray( + (InternalArray) vectorArrayObject, vectorElementType); case MAP: MapType mapType = (MapType) type; DataType keyType = mapType.getKeyType(); @@ -333,6 +351,7 @@ public static boolean isInteroperable(DataType t1, DataType t2) { switch (t1.getTypeRoot()) { case ARRAY: + case VECTOR: case MAP: case MULTISET: case ROW: diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/VectorMappingUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/VectorMappingUtils.java index b57eff100ac0..b7f92d06d95c 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/VectorMappingUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/VectorMappingUtils.java @@ -64,6 +64,7 @@ import org.apache.paimon.types.VarBinaryType; import org.apache.paimon.types.VarCharType; import org.apache.paimon.types.VariantType; +import org.apache.paimon.types.VectorType; /** * This is a util about how to expand the {@link ColumnVector}s with the partition row and index @@ -354,6 +355,11 @@ public ColumnVector getColumnVector() { }; } + @Override + public ColumnVector visit(VectorType vectorType) { + throw new UnsupportedOperationException("VectorType is not supported."); + } + @Override public ColumnVector visit(MultisetType multisetType) { return new MapColumnVector() { diff --git a/paimon-common/src/test/java/org/apache/paimon/data/BinaryRowTest.java b/paimon-common/src/test/java/org/apache/paimon/data/BinaryRowTest.java index 04181eb2a881..6dd3c23bdb87 100644 --- a/paimon-common/src/test/java/org/apache/paimon/data/BinaryRowTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/data/BinaryRowTest.java @@ -23,6 +23,7 @@ import org.apache.paimon.data.serializer.InternalMapSerializer; import org.apache.paimon.data.serializer.InternalRowSerializer; import org.apache.paimon.data.serializer.InternalSerializers; +import org.apache.paimon.data.serializer.InternalVectorSerializer; import org.apache.paimon.data.serializer.Serializer; import org.apache.paimon.data.variant.GenericVariant; import org.apache.paimon.memory.MemorySegment; @@ -518,6 +519,39 @@ public void testGenericArray() { assertThat(array2.getInt(2)).isEqualTo(666); } + @Test + public void testBinaryVector() { + // 1. vector test + final Random rnd = new Random(System.currentTimeMillis()); + float[] vectorValues = new float[rnd.nextInt(128) + 1]; + { + byte[] bytes = new byte[vectorValues.length]; + rnd.nextBytes(bytes); + for (int i = 0; i < vectorValues.length; i++) { + vectorValues[i] = bytes[i]; + } + } + BinaryVector vector = BinaryVector.fromPrimitiveArray(vectorValues); + + assertThat(vectorValues.length).isEqualTo(vector.size()); + int[] checkIndexList = {0, rnd.nextInt(vectorValues.length), vectorValues.length - 1}; + for (int checkIndex : checkIndexList) { + assertThat(vectorValues[checkIndex]).isEqualTo(vector.getFloat(checkIndex)); + } + + // 2. test write vector to binary row + BinaryRow row = new BinaryRow(1); + BinaryRowWriter rowWriter = new BinaryRowWriter(row); + InternalVectorSerializer serializer = + new InternalVectorSerializer(DataTypes.FLOAT(), vector.size()); + rowWriter.writeVector(0, vector, serializer); + rowWriter.complete(); + + InternalVector vector2 = row.getVector(0); + assertThat(vector2.size()).isEqualTo(vector.size()); + assertThat(vector2.toFloatArray()).isEqualTo(vector.toFloatArray()); + } + @Test public void testBinaryMap() { BinaryArray array1 = new BinaryArray(); diff --git a/paimon-common/src/test/java/org/apache/paimon/data/BinaryVectorTest.java b/paimon-common/src/test/java/org/apache/paimon/data/BinaryVectorTest.java new file mode 100644 index 000000000000..ff3ea0caeb0a --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/data/BinaryVectorTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data; + +import org.apache.paimon.types.DataTypes; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link BinaryVector}. */ +public class BinaryVectorTest { + + @Test + public void testFromPrimitiveArray() { + int[] ints = new int[] {1, 2, 3}; + BinaryVector intVector = BinaryVector.fromPrimitiveArray(ints); + assertThat(intVector.size()).isEqualTo(ints.length); + assertThat(intVector.getInt(1)).isEqualTo(2); + assertThat(intVector.toIntArray()).isEqualTo(ints); + + float[] floats = new float[] {1.0f, 2.5f}; + BinaryVector floatVector = BinaryVector.fromPrimitiveArray(floats); + assertThat(floatVector.size()).isEqualTo(floats.length); + assertThat(floatVector.getFloat(0)).isEqualTo(1.0f); + assertThat(floatVector.toFloatArray()).isEqualTo(floats); + + boolean[] booleans = new boolean[] {true, false, true}; + BinaryVector booleanVector = BinaryVector.fromPrimitiveArray(booleans); + assertThat(booleanVector.size()).isEqualTo(booleans.length); + assertThat(booleanVector.getBoolean(2)).isTrue(); + assertThat(booleanVector.toBooleanArray()).isEqualTo(booleans); + + byte[] bytes = new byte[] {1, 2, 3}; + BinaryVector byteVector = BinaryVector.fromPrimitiveArray(bytes); + assertThat(byteVector.size()).isEqualTo(bytes.length); + assertThat(byteVector.getByte(2)).isEqualTo((byte) 3); + assertThat(byteVector.toByteArray()).isEqualTo(bytes); + + short[] shorts = new short[] {4, 5, 6}; + BinaryVector shortVector = BinaryVector.fromPrimitiveArray(shorts); + assertThat(shortVector.size()).isEqualTo(shorts.length); + assertThat(shortVector.getShort(1)).isEqualTo((short) 5); + assertThat(shortVector.toShortArray()).isEqualTo(shorts); + + long[] longs = new long[] {7L, 8L, 9L}; + BinaryVector longVector = BinaryVector.fromPrimitiveArray(longs); + assertThat(longVector.size()).isEqualTo(longs.length); + assertThat(longVector.getLong(0)).isEqualTo(7L); + assertThat(longVector.toLongArray()).isEqualTo(longs); + + double[] doubles = new double[] {1.2d, 3.4d}; + BinaryVector doubleVector = BinaryVector.fromPrimitiveArray(doubles); + assertThat(doubleVector.size()).isEqualTo(doubles.length); + assertThat(doubleVector.getDouble(1)).isEqualTo(3.4d); + assertThat(doubleVector.toDoubleArray()).isEqualTo(doubles); + } + + @Test + public void testFromGenericArray() { + float[] values = new float[] {1.0f, -2.0f, 3.5f}; + GenericArray array = new GenericArray(values); + BinaryVector vector = BinaryVector.fromInternalArray(array, DataTypes.FLOAT()); + assertThat(vector.size()).isEqualTo(values.length); + assertThat(vector.toFloatArray()).isEqualTo(values); + } + + @Test + public void testFromBinaryArray() { + float[] values = new float[] {1.0f, -2.0f, 3.5f}; + BinaryArray array = BinaryArray.fromPrimitiveArray(values); + BinaryVector vector = BinaryVector.fromInternalArray(array, DataTypes.FLOAT()); + assertThat(vector.size()).isEqualTo(values.length); + assertThat(vector.toFloatArray()).isEqualTo(values); + } + + @Test + public void testCopiedBinaryVector() { + float[] values = new float[] {1.0f, -2.0f, 3.5f}; + BinaryVector vector = BinaryVector.fromPrimitiveArray(values); + BinaryVector copied = vector.copy(); + + // Assert that the copied vector is a new object + assertThat(copied == vector).isFalse(); + assertThat(copied.getSegments() == vector.getSegments()).isFalse(); + assertThat(copied.getSegments()[0] == vector.getSegments()[0]).isFalse(); + + // Assert that the copied vector has the same values as the original vector + assertThat(copied.toFloatArray()).isEqualTo(values); + assertThat(copied.equals(vector)).isTrue(); + } + + @Test + public void testPrimitiveElementSize() { + assertThat(BinaryVector.getPrimitiveElementSize(DataTypes.BOOLEAN())).isEqualTo(1); + assertThat(BinaryVector.getPrimitiveElementSize(DataTypes.TINYINT())).isEqualTo(1); + assertThat(BinaryVector.getPrimitiveElementSize(DataTypes.SMALLINT())).isEqualTo(2); + assertThat(BinaryVector.getPrimitiveElementSize(DataTypes.INT())).isEqualTo(4); + assertThat(BinaryVector.getPrimitiveElementSize(DataTypes.BIGINT())).isEqualTo(8); + assertThat(BinaryVector.getPrimitiveElementSize(DataTypes.FLOAT())).isEqualTo(4); + assertThat(BinaryVector.getPrimitiveElementSize(DataTypes.DOUBLE())).isEqualTo(8); + } +} diff --git a/paimon-common/src/test/java/org/apache/paimon/data/serializer/InternalVectorSerializerTest.java b/paimon-common/src/test/java/org/apache/paimon/data/serializer/InternalVectorSerializerTest.java new file mode 100644 index 000000000000..972b2cf480f5 --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/data/serializer/InternalVectorSerializerTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data.serializer; + +import org.apache.paimon.data.BinaryVector; +import org.apache.paimon.data.InternalVector; +import org.apache.paimon.io.DataOutputSerializer; +import org.apache.paimon.types.DataTypes; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.lang.reflect.Proxy; +import java.util.Arrays; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link InternalVectorSerializer}. */ +class InternalVectorSerializerTest extends SerializerTestBase { + + @Override + protected InternalVectorSerializer createSerializer() { + return new InternalVectorSerializer(DataTypes.FLOAT(), 3); + } + + @Override + protected boolean deepEquals(InternalVector vector1, InternalVector vector2) { + if (vector1.size() != vector2.size()) { + return false; + } + float[] left = vector1.toFloatArray(); + float[] right = vector2.toFloatArray(); + if (left.length != right.length) { + return false; + } + for (int i = 0; i < left.length; i++) { + if (Float.compare(left[i], right[i]) != 0) { + return false; + } + } + return true; + } + + @Override + protected InternalVector[] getTestData() { + return new InternalVector[] { + BinaryVector.fromPrimitiveArray(new float[] {1.0f, 2.0f, 3.0f}), + BinaryVector.fromPrimitiveArray(new float[] {-1.0f, 0.5f, 2.0f}), + createCustomVector(new float[] {0.0f, -2.0f, 4.5f}) + }; + } + + @Override + protected InternalVector[] getSerializableTestData() { + InternalVector[] testData = getTestData(); + return Arrays.copyOfRange(testData, 0, testData.length - 1); + } + + @Test + public void testSerializeWithInvalidSize() { + InternalVectorSerializer serializer = new InternalVectorSerializer(DataTypes.FLOAT(), 2); + InternalVector vector = BinaryVector.fromPrimitiveArray(new float[] {1.0f, 2.0f, 3.0f}); + DataOutputSerializer out = new DataOutputSerializer(32); + assertThatThrownBy(() -> serializer.serialize(vector, out)).isInstanceOf(IOException.class); + } + + private static InternalVector createCustomVector(float[] values) { + BinaryVector vector = BinaryVector.fromPrimitiveArray(values); + Object proxy = + Proxy.newProxyInstance( + InternalVectorSerializerTest.class.getClassLoader(), + new Class[] {InternalVector.class}, + (obj, method, args) -> method.invoke(vector, args)); + return (InternalVector) proxy; + } +} diff --git a/paimon-common/src/test/java/org/apache/paimon/datagen/RandomGeneratorVisitor.java b/paimon-common/src/test/java/org/apache/paimon/datagen/RandomGeneratorVisitor.java index c2cc190f733c..b64e034593c3 100644 --- a/paimon-common/src/test/java/org/apache/paimon/datagen/RandomGeneratorVisitor.java +++ b/paimon-common/src/test/java/org/apache/paimon/datagen/RandomGeneratorVisitor.java @@ -45,6 +45,7 @@ import org.apache.paimon.types.TinyIntType; import org.apache.paimon.types.VarBinaryType; import org.apache.paimon.types.VarCharType; +import org.apache.paimon.types.VectorType; import org.apache.paimon.utils.Preconditions; import java.math.BigDecimal; @@ -236,6 +237,11 @@ public DataGeneratorContainer visit(ArrayType arrayType) { container.getOptions().toArray(new ConfigOption[0])); } + @Override + public DataGeneratorContainer visit(VectorType vectorType) { + throw new RuntimeException("RandomGenerator for VectorType has not been implemented yet."); + } + @Override public DataGeneratorContainer visit(MultisetType multisetType) { ConfigOption lenOption = diff --git a/paimon-common/src/test/java/org/apache/paimon/types/DataTypeChecksTest.java b/paimon-common/src/test/java/org/apache/paimon/types/DataTypeChecksTest.java index aa2cae53885e..7fedf3a30d8f 100644 --- a/paimon-common/src/test/java/org/apache/paimon/types/DataTypeChecksTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/types/DataTypeChecksTest.java @@ -62,4 +62,11 @@ void testFieldCountExtraction() { new DataField(1, "f1", STRING_TYPE))); assertThat(DataTypeChecks.getFieldCount(dataType)).isEqualTo(2); } + + @Test + void testVectorNestedTypesAndLength() { + DataType vectorType = DataTypes.VECTOR(4, DataTypes.FLOAT()); + assertThat(DataTypeChecks.getNestedTypes(vectorType)).containsExactly(DataTypes.FLOAT()); + assertThat(DataTypeChecks.getLength(vectorType)).isEqualTo(4); + } } diff --git a/paimon-common/src/test/java/org/apache/paimon/utils/TypeUtilsTest.java b/paimon-common/src/test/java/org/apache/paimon/utils/TypeUtilsTest.java index d7dab1614a48..54c1af27d8c4 100644 --- a/paimon-common/src/test/java/org/apache/paimon/utils/TypeUtilsTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/utils/TypeUtilsTest.java @@ -19,6 +19,7 @@ package org.apache.paimon.utils; import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.BinaryVector; import org.apache.paimon.data.Decimal; import org.apache.paimon.data.GenericArray; import org.apache.paimon.data.GenericMap; @@ -116,6 +117,14 @@ public void testArrayStringCastFromString() { assertThat(result).isEqualTo(expected); } + @Test + public void testVectorCastFromString() { + String value = "[1.0, 2.5, 3.5]"; + Object result = TypeUtils.castFromString(value, DataTypes.VECTOR(3, DataTypes.FLOAT())); + BinaryVector vector = (BinaryVector) result; + assertThat(vector.toFloatArray()).isEqualTo(new float[] {1.0f, 2.5f, 3.5f}); + } + @Test public void testLongCastFromString() { String value = "12"; diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java index 346cae52b3ec..906d97501f4b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java +++ b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java @@ -28,6 +28,7 @@ import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.InternalVector; import org.apache.paimon.data.Timestamp; import org.apache.paimon.data.variant.Variant; import org.apache.paimon.format.SimpleColStats; @@ -281,6 +282,11 @@ public InternalArray getArray(int pos) { throw new UnsupportedOperationException(); } + @Override + public InternalVector getVector(int pos) { + throw new UnsupportedOperationException(); + } + @Override public InternalMap getMap(int pos) { throw new UnsupportedOperationException(); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/OffsetRow.java b/paimon-core/src/main/java/org/apache/paimon/utils/OffsetRow.java index 4858eebeb73c..70b7e24514db 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/OffsetRow.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/OffsetRow.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.InternalVector; import org.apache.paimon.data.Timestamp; import org.apache.paimon.data.variant.Variant; import org.apache.paimon.types.RowKind; @@ -141,6 +142,11 @@ public InternalArray getArray(int pos) { return row.getArray(offset + pos); } + @Override + public InternalVector getVector(int pos) { + return row.getVector(offset + pos); + } + @Override public InternalMap getMap(int pos) { return row.getMap(offset + pos); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/PartialRow.java b/paimon-core/src/main/java/org/apache/paimon/utils/PartialRow.java index 18b22886ce3e..2e172dadf38a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/PartialRow.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/PartialRow.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.InternalVector; import org.apache.paimon.data.Timestamp; import org.apache.paimon.data.variant.Variant; import org.apache.paimon.types.RowKind; @@ -139,6 +140,11 @@ public InternalArray getArray(int pos) { return row.getArray(pos); } + @Override + public InternalVector getVector(int pos) { + return row.getVector(pos); + } + @Override public InternalMap getMap(int pos) { return row.getMap(pos); diff --git a/paimon-core/src/test/java/org/apache/paimon/append/VectorTypeTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/VectorTypeTableTest.java new file mode 100644 index 000000000000..afa2e1db1826 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/append/VectorTypeTableTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.append; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.BinaryVector; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.TableTestBase; +import org.apache.paimon.types.DataTypes; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +/** Tests for table with vector. */ +public class VectorTypeTableTest extends TableTestBase { + + private final float[] testVector = randomVector(); + + @Test + public void testBasic() throws Exception { + createTableDefault(); + + commitDefault(writeDataDefault(100, 1)); + + AtomicInteger integer = new AtomicInteger(0); + + readDefault( + row -> { + integer.incrementAndGet(); + if (integer.get() % 50 == 0) { + Assertions.assertArrayEquals( + row.getVector(2).toFloatArray(), testVector, 0); + } + }); + + assertThat(integer.get()).isEqualTo(100); + } + + @Override + protected Schema schemaDefault() { + Schema.Builder schemaBuilder = Schema.newBuilder(); + schemaBuilder.column("f0", DataTypes.INT()); + schemaBuilder.column("f1", DataTypes.STRING()); + schemaBuilder.column("f2", DataTypes.VECTOR(testVector.length, DataTypes.FLOAT())); + // schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB"); + schemaBuilder.option(CoreOptions.FILE_FORMAT.key(), "json"); + schemaBuilder.option(CoreOptions.FILE_COMPRESSION.key(), "none"); + return schemaBuilder.build(); + } + + @Override + protected InternalRow dataDefault(int time, int size) { + return GenericRow.of( + RANDOM.nextInt(), + BinaryString.fromBytes(randomBytes()), + BinaryVector.fromPrimitiveArray(testVector)); + } + + @Override + protected byte[] randomBytes() { + byte[] binary = new byte[RANDOM.nextInt(1024) + 1]; + RANDOM.nextBytes(binary); + return binary; + } + + private float[] randomVector() { + byte[] randomBytes = randomBytes(); + float[] vector = new float[randomBytes.length]; + for (int i = 0; i < vector.length; i++) { + vector[i] = randomBytes[i]; + } + return vector; + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/codegen/CodeGenUtilsTest.java b/paimon-core/src/test/java/org/apache/paimon/codegen/CodeGenUtilsTest.java index c0da56f54454..5872750dd788 100644 --- a/paimon-core/src/test/java/org/apache/paimon/codegen/CodeGenUtilsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/codegen/CodeGenUtilsTest.java @@ -30,8 +30,10 @@ import static org.apache.paimon.codegen.CodeGenUtils.newRecordComparator; import static org.apache.paimon.codegen.CodeGenUtils.newRecordEqualiser; import static org.apache.paimon.types.DataTypes.DOUBLE; +import static org.apache.paimon.types.DataTypes.FLOAT; import static org.apache.paimon.types.DataTypes.INT; import static org.apache.paimon.types.DataTypes.STRING; +import static org.apache.paimon.types.DataTypes.VECTOR; import static org.assertj.core.api.Assertions.assertThat; class CodeGenUtilsTest { @@ -54,6 +56,15 @@ public void testProjectionCodegenCacheMiss() { new int[] {0, 1, 2})); } + @Test + public void testProjectionWithVector() { + assertClassEquals( + () -> + newProjection( + RowType.builder().fields(INT(), VECTOR(3, FLOAT())).build(), + new int[] {1})); + } + @Test public void testNormalizedKeyComputerCodegenCache() { assertClassEquals( @@ -74,6 +85,14 @@ public void testRecordComparatorCodegenCache() { () -> newRecordComparator(Arrays.asList(STRING(), INT()), new int[] {0, 1}, true)); } + @Test + public void testRecordComparatorCodegenCacheWithVector() { + assertClassEquals( + () -> + newRecordComparator( + Arrays.asList(STRING(), VECTOR(3, INT())), new int[] {0, 1}, true)); + } + @Test public void testRecordComparatorCodegenCacheMiss() { assertClassNotEquals( diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/DataTypeJsonParserTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/DataTypeJsonParserTest.java index 0ca5159782b6..5ec613e2f696 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/DataTypeJsonParserTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/DataTypeJsonParserTest.java @@ -44,6 +44,7 @@ import org.apache.paimon.types.VarBinaryType; import org.apache.paimon.types.VarCharType; import org.apache.paimon.types.VariantType; +import org.apache.paimon.types.VectorType; import org.apache.paimon.utils.JsonSerdeUtil; import org.junit.jupiter.api.Test; @@ -114,6 +115,15 @@ private static Stream testData() { TestSpec.forString("TIMESTAMP_LTZ(3)").expectType(new LocalZonedTimestampType(3)), TestSpec.forString("VARIANT").expectType(new VariantType()), TestSpec.forString("BLOB").expectType(new BlobType()), + TestSpec.forString("VECTOR NOT NULL") + .expectType(DataTypes.VECTOR(3, DataTypes.FLOAT())), + TestSpec.forString("VECTOR") + .expectType(new VectorType(true, 5, DataTypes.INT())), + TestSpec.forString( + "{\"type\":\"VECTOR NOT NULL\",\"element\":\"BOOLEAN\",\"length\":7}") + .expectType(DataTypes.VECTOR(7, DataTypes.BOOLEAN())), + TestSpec.forString("{\"type\":\"VECTOR\",\"element\":\"TINYINT\",\"length\":11}") + .expectType(new VectorType(true, 11, DataTypes.TINYINT())), TestSpec.forString( "{\"type\":\"ARRAY\",\"element\":\"TIMESTAMP(3) WITH LOCAL TIME ZONE\"}") .expectType(new ArrayType(new LocalZonedTimestampType(3))), diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataTypeToLogicalType.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataTypeToLogicalType.java index ff9c7151d25d..6fc3016d4f95 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataTypeToLogicalType.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataTypeToLogicalType.java @@ -43,6 +43,7 @@ import org.apache.paimon.types.VarBinaryType; import org.apache.paimon.types.VarCharType; import org.apache.paimon.types.VariantType; +import org.apache.paimon.types.VectorType; import org.apache.flink.table.types.logical.LogicalType; @@ -159,6 +160,11 @@ public LogicalType visit(ArrayType arrayType) { arrayType.isNullable(), arrayType.getElementType().accept(this)); } + @Override + public LogicalType visit(VectorType vectorType) { + throw new UnsupportedOperationException("Not support VectorType yet."); + } + @Override public LogicalType visit(MultisetType multisetType) { return new org.apache.flink.table.types.logical.MultisetType( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java index 1f632212d506..ae8a4fc59211 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java @@ -27,6 +27,7 @@ import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.InternalVector; import org.apache.paimon.data.Timestamp; import org.apache.paimon.data.variant.GenericVariant; import org.apache.paimon.data.variant.Variant; @@ -160,6 +161,11 @@ public InternalArray getArray(int pos) { return new FlinkArrayWrapper(row.getArray(pos)); } + @Override + public InternalVector getVector(int pos) { + throw new UnsupportedOperationException("Not support VectorType yet."); + } + @Override public InternalMap getMap(int pos) { return new FlinkMapWrapper(row.getMap(pos)); @@ -260,6 +266,11 @@ public InternalArray getArray(int pos) { return new FlinkArrayWrapper(array.getArray(pos)); } + @Override + public InternalVector getVector(int pos) { + throw new UnsupportedOperationException("Not support VectorType yet."); + } + @Override public InternalMap getMap(int pos) { return new FlinkMapWrapper(array.getMap(pos)); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileFormat.java index c1892a944eb9..3abbb6882c97 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileFormat.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileFormat.java @@ -91,6 +91,7 @@ private void validateDataType(DataType dataType) { case TIMESTAMP_WITHOUT_TIME_ZONE: case TIMESTAMP_WITH_LOCAL_TIME_ZONE: case ARRAY: + case VECTOR: case MAP: case ROW: // All types are supported in JSON diff --git a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java index 7f9b7a1545f8..d5ecf3dc8fd2 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java @@ -21,6 +21,7 @@ import org.apache.paimon.casting.CastExecutor; import org.apache.paimon.casting.CastExecutors; import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.BinaryVector; import org.apache.paimon.data.GenericArray; import org.apache.paimon.data.GenericMap; import org.apache.paimon.data.GenericRow; @@ -34,6 +35,7 @@ import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.MapType; import org.apache.paimon.types.RowType; +import org.apache.paimon.types.VectorType; import org.apache.paimon.utils.JsonSerdeUtil; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException; @@ -102,6 +104,8 @@ private Object convertJsonValue(JsonNode node, DataType dataType, JsonOptions op } case ARRAY: return convertJsonArray(node, (ArrayType) dataType, options); + case VECTOR: + return convertJsonVector(node, (VectorType) dataType, options); case MAP: return convertJsonMap(node, (MapType) dataType, options); case ROW: @@ -136,6 +140,13 @@ private GenericArray convertJsonArray( return new GenericArray(elements.toArray()); } + private BinaryVector convertJsonVector( + JsonNode vectorNode, VectorType vectorType, JsonOptions options) { + ArrayType arrayType = DataTypes.ARRAY(vectorType.getElementType()); + GenericArray array = convertJsonArray(vectorNode, arrayType, options); + return BinaryVector.fromInternalArray(array, vectorType.getElementType()); + } + private GenericMap convertJsonMap(JsonNode objectNode, MapType mapType, JsonOptions options) { if (!objectNode.isObject()) { return handleParseError( diff --git a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java index 01a55e9ae735..2aeb9a5c1bce 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java @@ -24,14 +24,17 @@ import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.InternalVector; import org.apache.paimon.format.text.AbstractTextFileWriter; import org.apache.paimon.fs.PositionOutputStream; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypeRoot; +import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.MapType; import org.apache.paimon.types.RowType; +import org.apache.paimon.types.VectorType; import org.apache.paimon.utils.InternalRowUtils; import org.apache.paimon.utils.JsonSerdeUtil; @@ -112,6 +115,8 @@ private Object convertRowValue(Object value, DataType dataType) { return BASE64_ENCODER.encodeToString((byte[]) value); case ARRAY: return convertRowArray((InternalArray) value, (ArrayType) dataType); + case VECTOR: + return convertRowVector((InternalVector) value, (VectorType) dataType); case MAP: return convertRowMap((InternalMap) value, (MapType) dataType); case ROW: @@ -133,6 +138,14 @@ private List convertRowArray(InternalArray array, ArrayType arrayType) { return result; } + private List convertRowVector(InternalVector vector, VectorType vectorType) { + if (vector.size() != vectorType.getLength()) { + throw new IllegalArgumentException( + "Size " + vector.size() + " != " + vectorType.getLength() + " in JsonWriter"); + } + return convertRowArray(vector, DataTypes.ARRAY(vectorType.getElementType())); + } + private Map convertRowMap(InternalMap map, MapType mapType) { int size = map.size(); Map result = new LinkedHashMap<>(size); // Pre-allocate capacity diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java index 6d8e771a616e..762037deb215 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java @@ -46,6 +46,7 @@ import org.apache.paimon.types.VarBinaryType; import org.apache.paimon.types.VarCharType; import org.apache.paimon.types.VariantType; +import org.apache.paimon.types.VectorType; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; @@ -293,6 +294,11 @@ public FieldWriter visit(ArrayType arrayType) { }; } + @Override + public FieldWriter visit(VectorType vectorType) { + throw new UnsupportedOperationException("Unsupported type: " + vectorType); + } + @Override public FieldWriter visit(MapType mapType) { FieldWriter keyWriter = mapType.getKeyType().accept(this); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetVectorUpdaterFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetVectorUpdaterFactory.java index 79c343ed64be..880af82af181 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetVectorUpdaterFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetVectorUpdaterFactory.java @@ -57,6 +57,7 @@ import org.apache.paimon.types.VarBinaryType; import org.apache.paimon.types.VarCharType; import org.apache.paimon.types.VariantType; +import org.apache.paimon.types.VectorType; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.Dictionary; @@ -226,6 +227,11 @@ public UpdaterFactory visit(ArrayType arrayType) { throw new RuntimeException("Array type is not supported"); } + @Override + public UpdaterFactory visit(VectorType vectorType) { + throw new RuntimeException("Vector type is not supported"); + } + @Override public UpdaterFactory visit(MultisetType multisetType) { throw new RuntimeException("Multiset type is not supported"); diff --git a/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java b/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java index 6641b9304eef..dacd12f492c1 100644 --- a/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java +++ b/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java @@ -49,6 +49,7 @@ import org.apache.paimon.types.VarBinaryType; import org.apache.paimon.types.VarCharType; import org.apache.paimon.types.VariantType; +import org.apache.paimon.types.VectorType; import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.io.api.Binary; @@ -420,6 +421,11 @@ public Operators.Column visit(ArrayType arrayType) { throw new UnsupportedOperationException(); } + @Override + public Operators.Column visit(VectorType vectorType) { + throw new UnsupportedOperationException(); + } + @Override public Operators.Column visit(MultisetType multisetType) { throw new UnsupportedOperationException(); diff --git a/paimon-format/src/test/java/org/apache/paimon/format/json/JsonFileFormatTest.java b/paimon-format/src/test/java/org/apache/paimon/format/json/JsonFileFormatTest.java index 92ad9c009819..40234094bb09 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/json/JsonFileFormatTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/json/JsonFileFormatTest.java @@ -20,6 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.BinaryVector; import org.apache.paimon.data.GenericMap; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; @@ -378,6 +379,21 @@ public void testWithCustomLineDelimiters() throws IOException { } } + @Test + public void testVectorTypeReadWrite() throws IOException { + RowType rowType = DataTypes.ROW(DataTypes.INT(), DataTypes.VECTOR(3, DataTypes.FLOAT())); + + float[] values = new float[] {1.0f, 2.0f, 3.0f}; + List testData = + Arrays.asList(GenericRow.of(1, BinaryVector.fromPrimitiveArray(values))); + + List result = writeThenRead(new Options(), rowType, testData, "test_vector"); + + assertThat(result).hasSize(1); + assertThat(result.get(0).getInt(0)).isEqualTo(1); + assertThat(result.get(0).getVector(1).toFloatArray()).isEqualTo(values); + } + @Override public boolean supportDataFileWithoutExtension() { return true; diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/HivePaimonArray.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/HivePaimonArray.java index 291664851ad5..02d14d360625 100644 --- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/HivePaimonArray.java +++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/HivePaimonArray.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.InternalVector; import org.apache.paimon.data.Timestamp; import org.apache.paimon.data.variant.Variant; import org.apache.paimon.types.ArrayType; @@ -133,6 +134,11 @@ public InternalArray getArray(int i) { ((HivePaimonArray) this.getAs(i)).getList()); } + @Override + public InternalVector getVector(int i) { + throw new UnsupportedOperationException("Not support VectorType yet."); + } + @Override public InternalMap getMap(int i) { return getAs(i); diff --git a/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceFileFormat.java b/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceFileFormat.java index 1592b6100cb3..64b4e2887f82 100644 --- a/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceFileFormat.java +++ b/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceFileFormat.java @@ -48,6 +48,7 @@ import org.apache.paimon.types.VarBinaryType; import org.apache.paimon.types.VarCharType; import org.apache.paimon.types.VariantType; +import org.apache.paimon.types.VectorType; import javax.annotation.Nullable; @@ -192,6 +193,11 @@ public Void visit(ArrayType arrayType) { return null; } + @Override + public Void visit(VectorType vectorType) { + return null; + } + @Override public Void visit(MultisetType multisetType) { return null; diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java index 7d0f051756a9..3ad3666e9760 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java @@ -27,6 +27,7 @@ import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.InternalVector; import org.apache.paimon.data.Timestamp; import org.apache.paimon.data.variant.Variant; import org.apache.paimon.spark.util.shim.TypeUtils$; @@ -262,6 +263,11 @@ public InternalArray getArray(int pos) { ((ArrayType) (tableSchema.fields()[pos].dataType())).elementType()); } + @Override + public InternalVector getVector(int pos) { + throw new UnsupportedOperationException("Not support VectorType yet."); + } + @Override public InternalMap getMap(int pos) { int actualPos = getActualFieldPosition(pos); @@ -427,6 +433,11 @@ public InternalArray getArray(int pos) { arrayData.getArray(pos), ((ArrayType) elementType).elementType()); } + @Override + public InternalVector getVector(int pos) { + throw new UnsupportedOperationException("Not support VectorType yet."); + } + @Override public InternalMap getMap(int pos) { MapType mapType = (MapType) elementType; diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java index 9ac1e5999422..cc947c7ea04b 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java @@ -27,6 +27,7 @@ import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.InternalVector; import org.apache.paimon.data.Timestamp; import org.apache.paimon.data.variant.Variant; import org.apache.paimon.spark.util.shim.TypeUtils; @@ -181,6 +182,11 @@ public InternalArray getArray(int i) { return new PaimonArray(((ArrayType) type.getTypeAt(i)).getElementType(), row.getList(i)); } + @Override + public InternalVector getVector(int pos) { + throw new UnsupportedOperationException("Not support VectorType yet."); + } + @Override public InternalMap getMap(int i) { return toPaimonMap((MapType) type.getTypeAt(i), row.getJavaMap(i)); @@ -353,6 +359,11 @@ public InternalArray getArray(int i) { return new PaimonArray(((ArrayType) elementType).getElementType(), array); } + @Override + public InternalVector getVector(int pos) { + throw new UnsupportedOperationException("Not support VectorType yet."); + } + @Override public InternalMap getMap(int i) { Object o = getAs(i); From 20ee66122a69591637454f7a16f1840062f2f9ee Mon Sep 17 00:00:00 2001 From: ColdL Date: Wed, 4 Feb 2026 19:34:52 +0800 Subject: [PATCH 2/2] vector type elements must be nonNull --- .../org/apache/paimon/types/DataTypes.java | 6 ++++- .../org/apache/paimon/types/VectorType.java | 5 ++-- .../paimon/types/DataTypeChecksTest.java | 24 +++++++++++++++++-- .../paimon/schema/DataTypeJsonParserTest.java | 14 +++++------ 4 files changed, 37 insertions(+), 12 deletions(-) diff --git a/paimon-api/src/main/java/org/apache/paimon/types/DataTypes.java b/paimon-api/src/main/java/org/apache/paimon/types/DataTypes.java index 39b180651ef5..6782509440e2 100644 --- a/paimon-api/src/main/java/org/apache/paimon/types/DataTypes.java +++ b/paimon-api/src/main/java/org/apache/paimon/types/DataTypes.java @@ -60,7 +60,11 @@ public static ArrayType ARRAY(DataType element) { } public static VectorType VECTOR(int length, DataType element) { - return new VectorType(length, element); + // The element type of vector should currently be nonNull. + // However, most other types default to nullable. + // For conciseness, we accommodate a nullable input element type here + // and copy the nonNull version when needed. + return new VectorType(length, element.isNullable() ? element.notNull() : element); } public static CharType CHAR(int length) { diff --git a/paimon-api/src/main/java/org/apache/paimon/types/VectorType.java b/paimon-api/src/main/java/org/apache/paimon/types/VectorType.java index a98bd6fa30d5..195fcafc7025 100644 --- a/paimon-api/src/main/java/org/apache/paimon/types/VectorType.java +++ b/paimon-api/src/main/java/org/apache/paimon/types/VectorType.java @@ -49,12 +49,13 @@ public class VectorType extends DataType { public VectorType(boolean isNullable, int length, DataType elementType) { super(isNullable, DataTypeRoot.VECTOR); - // TODO: should we support nullable for vector type? - // Preconditions.checkArgument(!isNullable, "Nullable is not supported for VectorType."); this.elementType = Preconditions.checkNotNull(elementType, "Element type must not be null."); Preconditions.checkArgument( isValidElementType(elementType), "Invalid element type for vector: " + elementType); + // Currently we do not support nullable elements. + Preconditions.checkArgument( + !elementType.isNullable(), "Element type must be nonNull for vector."); if (length < MIN_LENGTH) { throw new IllegalArgumentException( String.format( diff --git a/paimon-common/src/test/java/org/apache/paimon/types/DataTypeChecksTest.java b/paimon-common/src/test/java/org/apache/paimon/types/DataTypeChecksTest.java index 7fedf3a30d8f..c1b4e148cdc6 100644 --- a/paimon-common/src/test/java/org/apache/paimon/types/DataTypeChecksTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/types/DataTypeChecksTest.java @@ -24,6 +24,7 @@ import static org.apache.paimon.types.VarCharType.STRING_TYPE; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; /** Tests for {@link DataTypeChecks}. */ class DataTypeChecksTest { @@ -65,8 +66,27 @@ void testFieldCountExtraction() { @Test void testVectorNestedTypesAndLength() { - DataType vectorType = DataTypes.VECTOR(4, DataTypes.FLOAT()); - assertThat(DataTypeChecks.getNestedTypes(vectorType)).containsExactly(DataTypes.FLOAT()); + DataType vectorType = DataTypes.VECTOR(4, DataTypes.FLOAT().notNull()); + assertThat(DataTypeChecks.getNestedTypes(vectorType)) + .containsExactly(DataTypes.FLOAT().notNull()); assertThat(DataTypeChecks.getLength(vectorType)).isEqualTo(4); + + // VECTOR accepts a nullable type and internally converted to nonNull + vectorType = DataTypes.VECTOR(5, DataTypes.BIGINT()); + assertThat(DataTypeChecks.getNestedTypes(vectorType)) + .containsExactly(DataTypes.BIGINT().notNull()); + assertThat(DataTypeChecks.getLength(vectorType)).isEqualTo(5); + } + + @Test + void testVectorElementMustBeNonNull() { + // If nullable element types are supported, this unit test needs to be corrected. + DataType vectorType = new VectorType(true, 4, new FloatType(false)); + try { + vectorType = new VectorType(true, 4, new FloatType(true)); + fail("Should throw IllegalArgumentException when element type is nullable."); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage()).contains("Element type must be nonNull for vector."); + } } } diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/DataTypeJsonParserTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/DataTypeJsonParserTest.java index 5ec613e2f696..4b469f23f121 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/DataTypeJsonParserTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/DataTypeJsonParserTest.java @@ -44,7 +44,6 @@ import org.apache.paimon.types.VarBinaryType; import org.apache.paimon.types.VarCharType; import org.apache.paimon.types.VariantType; -import org.apache.paimon.types.VectorType; import org.apache.paimon.utils.JsonSerdeUtil; import org.junit.jupiter.api.Test; @@ -115,15 +114,16 @@ private static Stream testData() { TestSpec.forString("TIMESTAMP_LTZ(3)").expectType(new LocalZonedTimestampType(3)), TestSpec.forString("VARIANT").expectType(new VariantType()), TestSpec.forString("BLOB").expectType(new BlobType()), - TestSpec.forString("VECTOR NOT NULL") + TestSpec.forString("VECTOR NOT NULL") .expectType(DataTypes.VECTOR(3, DataTypes.FLOAT())), - TestSpec.forString("VECTOR") - .expectType(new VectorType(true, 5, DataTypes.INT())), + TestSpec.forString("VECTOR") + .expectType(DataTypes.VECTOR(5, DataTypes.INT()).nullable()), TestSpec.forString( - "{\"type\":\"VECTOR NOT NULL\",\"element\":\"BOOLEAN\",\"length\":7}") + "{\"type\":\"VECTOR NOT NULL\",\"element\":\"BOOLEAN NOT NULL\",\"length\":7}") .expectType(DataTypes.VECTOR(7, DataTypes.BOOLEAN())), - TestSpec.forString("{\"type\":\"VECTOR\",\"element\":\"TINYINT\",\"length\":11}") - .expectType(new VectorType(true, 11, DataTypes.TINYINT())), + TestSpec.forString( + "{\"type\":\"VECTOR\",\"element\":\"TINYINT NOT NULL\",\"length\":11}") + .expectType(DataTypes.VECTOR(11, DataTypes.TINYINT()).nullable()), TestSpec.forString( "{\"type\":\"ARRAY\",\"element\":\"TIMESTAMP(3) WITH LOCAL TIME ZONE\"}") .expectType(new ArrayType(new LocalZonedTimestampType(3))),