diff --git a/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/constants/KuduConstants.java b/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/constants/KuduConstants.java new file mode 100644 index 000000000..e694c5d46 --- /dev/null +++ b/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/constants/KuduConstants.java @@ -0,0 +1,29 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.bytedance.bitsail.connector.kudu.constants; + +/** + * Constant values shared by the Kudu connector. + */ +public final class KuduConstants { + /** Default timeout, in milliseconds. */ + public static final Long DEFAULT_TIME_OUT_MILLS = 3000L; + + private KuduConstants() { + // Constant holder; never instantiated. + } +} diff --git a/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/source/split/AbstractKuduSplitConstructor.java b/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/source/split/AbstractKuduSplitConstructor.java index ce116b05d..7351007a6 100644 --- a/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/source/split/AbstractKuduSplitConstructor.java +++ b/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/source/split/AbstractKuduSplitConstructor.java @@ -46,9 +46,9 @@ public AbstractKuduSplitConstructor(BitSailConfiguration jobConf, KuduClient kud public abstract boolean isAvailable(); - protected abstract boolean fillSplitConf(BitSailConfiguration jobConf, KuduClient client) throws IOException; + protected abstract boolean fillSplitConf(BitSailConfiguration jobConf, KuduClient client) throws Exception; - public abstract List construct(KuduClient kuduClient) throws IOException; + public abstract List construct(KuduClient kuduClient) throws Exception; /** * Used for determine parallelism num. 
diff --git a/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/source/split/KuduSourceSplit.java b/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/source/split/KuduSourceSplit.java index cd66b7bb8..8c800e8b8 100644 --- a/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/source/split/KuduSourceSplit.java +++ b/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/source/split/KuduSourceSplit.java @@ -70,6 +70,13 @@ public void addPredicate(KuduPredicate predicate) throws IOException { serializedPredicates.add(KuduPredicate.serialize(Collections.singletonList(predicate))); } + /** Appends an already-serialized predicate blob to this split, creating the backing list on first use. */ public void addSerializedPredicates(byte[] predicates) { + if (serializedPredicates == null) { + serializedPredicates = new ArrayList<>(); + } + serializedPredicates.add(predicates); + } + public void bindScanner(KuduScanner.KuduScannerBuilder builder, Schema schema) { List kuduPredicates = deserializePredicates(schema); kuduPredicates.forEach(builder::addPredicate); diff --git a/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/source/split/KuduSplitFactory.java b/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/source/split/KuduSplitFactory.java index 88a79de28..a193a1c0b 100644 --- a/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/source/split/KuduSplitFactory.java +++ b/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/source/split/KuduSplitFactory.java @@ -20,6 +20,8 @@ import com.bytedance.bitsail.common.configuration.BitSailConfiguration; import com.bytedance.bitsail.connector.kudu.error.KuduErrorCode; import com.bytedance.bitsail.connector.kudu.option.KuduReaderOptions; +import com.bytedance.bitsail.connector.kudu.source.split.strategy.PartitionDivideSplitConstructor; +import 
com.bytedance.bitsail.connector.kudu.source.split.strategy.PredicationDivideSplitConstructor; import com.bytedance.bitsail.connector.kudu.source.split.strategy.SimpleDivideSplitConstructor; import org.apache.kudu.client.KuduClient; @@ -32,7 +34,9 @@ public class KuduSplitFactory { private static final Logger LOG = LoggerFactory.getLogger(KuduSplitFactory.class); public enum KuduSplitStrategy { - SIMPLE_DIVIDE + SIMPLE_DIVIDE, + PARTITION_DIVIDE, /* handled by PartitionDivideSplitConstructor: splits follow the table's range partitions */ + PREDICATION_DIVIDE /* handled by PredicationDivideSplitConstructor: one split per configured serialized predicate */ } @SuppressWarnings("checkstyle:FallThrough") @@ -51,6 +55,24 @@ public static AbstractKuduSplitConstructor getSplitConstructor(BitSailConfigurat } catch (IOException e) { LOG.warn("Failed to create SimpleDivideSplitConstructor, will try the next constructor type.", e); } + case PARTITION_DIVIDE: /* entered directly or by fall-through from the preceding case; only an available constructor breaks out */ + try { + constructor = new PartitionDivideSplitConstructor(jobConf, client); + if (constructor.isAvailable()) { + break; + } + } catch (Exception e) { + LOG.warn("Failed to create PartitionDivideSplitConstructor, will try the next constructor type.", e); + } + case PREDICATION_DIVIDE: /* last strategy tried; if it is unavailable or fails, control falls into default, which throws */ + try { + constructor = new PredicationDivideSplitConstructor(jobConf, client); + if (constructor.isAvailable()) { + break; + } + } catch (IOException e) { + LOG.warn("Failed to create PredicationDivideSplitConstructor, will try the next constructor type.", e); + } default: throw new BitSailException(KuduErrorCode.SPLIT_ERROR, "Cannot create a split constructor."); } diff --git a/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/source/split/coordinator/KuduSourceSplitCoordinator.java b/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/source/split/coordinator/KuduSourceSplitCoordinator.java index 3222c3429..4727eaefa 100644 --- a/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/source/split/coordinator/KuduSourceSplitCoordinator.java +++ 
b/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/source/split/coordinator/KuduSourceSplitCoordinator.java @@ -36,7 +36,6 @@ import javax.annotation.Nullable; -import java.io.IOException; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -65,7 +64,7 @@ public void start() { KuduClient client = kuduFactory.getClient(); AbstractKuduSplitConstructor splitConstructor = KuduSplitFactory.getSplitConstructor(jobConf, client); splitList = splitConstructor.construct(client); - } catch (IOException e) { + } catch (Exception e) { throw new BitSailException(KuduErrorCode.SPLIT_ERROR, "Failed to create splits."); } diff --git a/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/source/split/strategy/PartitionDivideSplitConstructor.java b/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/source/split/strategy/PartitionDivideSplitConstructor.java new file mode 100644 index 000000000..fcc08b40a --- /dev/null +++ b/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/source/split/strategy/PartitionDivideSplitConstructor.java @@ -0,0 +1,266 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.bytedance.bitsail.connector.kudu.source.split.strategy; + +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.kudu.constants.KuduConstants; +import com.bytedance.bitsail.connector.kudu.option.KuduReaderOptions; +import com.bytedance.bitsail.connector.kudu.source.split.AbstractKuduSplitConstructor; +import com.bytedance.bitsail.connector.kudu.source.split.KuduSourceSplit; +import com.bytedance.bitsail.connector.kudu.util.KuduKeyEncoderUtils; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.ToString; +import org.apache.commons.collections.CollectionUtils; +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.Schema; +import org.apache.kudu.client.KuduClient; +import org.apache.kudu.client.KuduPredicate; +import org.apache.kudu.client.KuduTable; +import org.apache.kudu.client.PartialRow; +import org.apache.kudu.client.Partition; +import org.apache.kudu.client.PartitionSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.ArrayList; +import java.util.List; + +/** + * Builds one {@link KuduSourceSplit} per range partition of the Kudu table and bounds each split with + * predicates derived from the decoded lower and upper range keys of that partition. + */ +public class PartitionDivideSplitConstructor extends AbstractKuduSplitConstructor { + private static final Logger LOG = LoggerFactory.getLogger(PartitionDivideSplitConstructor.class); + + private SplitConfiguration splitConf = null; + private boolean available = false; + private transient List<Partition> rangePartitions = null; + + /** + * Parses the split configuration and decides whether this strategy can be used. The instance stays + * unavailable when the configuration is missing or invalid, or when the table has no range partitions. + * + * @param jobConf job configuration holding the table name and the split configuration JSON + * @param client Kudu client used to open the table + * @throws Exception if the configuration cannot be parsed or the table cannot be opened + */ + public PartitionDivideSplitConstructor(BitSailConfiguration jobConf, KuduClient client) throws Exception { + super(jobConf, client); + + if (!jobConf.fieldExists(KuduReaderOptions.SPLIT_CONFIGURATION)) { + LOG.warn("{} cannot work due to lack of split configuration.", this.getClass().getSimpleName()); + return; + } + String splitConfStr = 
jobConf.get(KuduReaderOptions.SPLIT_CONFIGURATION); + this.splitConf = new ObjectMapper().readValue(splitConfStr, SplitConfiguration.class); + KuduTable kuduTable = client.openTable(this.tableName); + this.available = splitConf.isValid(kuduTable); + if (!available) { + LOG.warn("{} cannot work because split configuration is invalid.", this.getClass().getSimpleName()); + return; + } + this.available = fillSplitConf(jobConf, client); + if (!available) { + return; + } + LOG.info("Successfully created ,final split configuration is: {}", splitConf); + } + + @Override + public boolean isAvailable() { + return available; + } + + /** + * Fills defaults into the split configuration and loads the range partitions of the table. + * + * @return {@code false} when the table reports no range partitions, {@code true} otherwise + */ + @Override + @SuppressWarnings("checkstyle:MagicNumber") + protected boolean fillSplitConf(BitSailConfiguration jobConf, KuduClient client) throws Exception { + if (splitConf.getSplitNum() == null || splitConf.getSplitNum() <= 0) { + splitConf.setSplitNum(jobConf.getUnNecessaryOption(KuduReaderOptions.READER_PARALLELISM_NUM, 1)); + } + if (splitConf.getTimeOutMills() == null || splitConf.getTimeOutMills() <= 0) { + splitConf.setTimeOutMills(KuduConstants.DEFAULT_TIME_OUT_MILLS); + } + KuduTable kuduTable = client.openTable(this.tableName); + this.rangePartitions = kuduTable.getRangePartitions(splitConf.getTimeOutMills()); + if (rangePartitions.isEmpty()) { + LOG.warn("Partition cannot be empty."); + return false; + } + if (this.rangePartitions.size() < splitConf.getSplitNum()) { + this.splitConf.setSplitNum(this.rangePartitions.size()); + LOG.info("Resize split num to {}.", splitConf.getSplitNum()); + } + return true; + } + + /** + * Creates one split per range partition loaded by {@link #fillSplitConf}. For every range-key column a + * {@code >=} predicate on the lower bound and a {@code <} predicate on the upper bound are added; a bound + * whose encoded key is empty marks an unbounded side of the range and contributes no predicate. + * + * <p>NOTE(review): keys are decoded as plain range keys - presumably wrong if they carry a hash-bucket prefix + * on hash-partitioned tables; also a fully unbounded partition yields a split without predicates. Confirm both. + * + * @param kuduClient Kudu client used to open the table + * @return the splits, one per range partition and in partition order + * @throws Exception if the table cannot be opened or a predicate cannot be serialized + */ + @Override + public List<KuduSourceSplit> construct(KuduClient kuduClient) throws Exception { + List<KuduSourceSplit> splits = new ArrayList<>(this.rangePartitions.size()); + + KuduTable kuduTable = kuduClient.openTable(this.tableName); + PartitionSchema partitionSchema = kuduTable.getPartitionSchema(); + PartitionSchema.RangeSchema rangeSchema = partitionSchema.getRangeSchema(); + + for (int i = 0; i < this.rangePartitions.size(); i++) { 
+ Partition partition = this.rangePartitions.get(i); + PartialRow lowerPartialRow = decodeBound(partitionSchema, partition.getPartitionKeyStart()); + PartialRow upperPartialRow = decodeBound(partitionSchema, partition.getPartitionKeyEnd()); + + KuduSourceSplit split = new KuduSourceSplit(i); + for (int columnId : rangeSchema.getColumnIds()) { + ColumnSchema column = schema.getColumnByIndex(schema.getColumnIndex(columnId)); + if (lowerPartialRow != null) { + split.addPredicate(KuduPredicate.newComparisonPredicate( + column, + KuduPredicate.ComparisonOp.GREATER_EQUAL, + getColumnValueInPartialRow(schema, columnId, lowerPartialRow))); + } + if (upperPartialRow != null) { + split.addPredicate(KuduPredicate.newComparisonPredicate( + column, + KuduPredicate.ComparisonOp.LESS, + getColumnValueInPartialRow(schema, columnId, upperPartialRow))); + } + } + splits.add(split); + LOG.info(">>> the {}-th split is: {}", i, splits.get(i).toFormatString(schema)); + } + LOG.info("Finally get {} splits.", splits.size()); + return splits; + } + + /** + * Decodes one encoded range bound of a partition. + * + * @return the decoded row, or {@code null} when the key is empty, i.e. that side of the range is unbounded + */ + private PartialRow decodeBound(PartitionSchema partitionSchema, byte[] encodedKey) { + if (encodedKey == null || encodedKey.length == 0) { + return null; + } + return KuduKeyEncoderUtils.decodeRangePartitionKey(schema, partitionSchema, + ByteBuffer.wrap(encodedKey).order(ByteOrder.BIG_ENDIAN)); + } + + /** + * Reads a range-key column from a decoded bound with the getter that matches the column's Kudu type. + * + * @param schema table schema used to resolve the column id + * @param columnId id of the range-key column + * @param partialRow decoded partition bound holding the value + * @return the column value as a boxed Java object ({@code byte[]} for BINARY) + * @throws IllegalArgumentException if the column type is not handled here + */ + private Object getColumnValueInPartialRow(Schema schema, int columnId, PartialRow partialRow) { + ColumnSchema columnSchema = schema.getColumnByIndex(schema.getColumnIndex(columnId)); + String name = columnSchema.getName(); + switch (columnSchema.getType()) { + case INT8: + return partialRow.getByte(name); + case INT16: + return partialRow.getShort(name); + case INT32: + return partialRow.getInt(name); + case DATE: + return partialRow.getDate(name); + case INT64: + case UNIXTIME_MICROS: + return partialRow.getLong(name); + case BINARY: + return partialRow.getBinaryCopy(name); + case VARCHAR: + return partialRow.getVarchar(name); + case STRING: + return partialRow.getString(name); + case DECIMAL: + return partialRow.getDecimal(name); + default: + throw new IllegalArgumentException(String.format( 
+ "The column type %s is not a valid key component type", + columnSchema.getType())); + } + } + + /** + * Returns the configured split number, or 1 when no positive split number is available. + */ + @Override + public int estimateSplitNum() { + int estimatedSplitNum = 1; + if (splitConf != null && splitConf.getSplitNum() != null && splitConf.getSplitNum() > 0) { + estimatedSplitNum = splitConf.getSplitNum(); + } + LOG.info("Estimated split num is: {}", estimatedSplitNum); + return estimatedSplitNum; + } + + /** + * JSON-mapped options of this strategy: {@code split_num} and {@code time_out_mills}. + */ + @NoArgsConstructor + @AllArgsConstructor + @Data + @ToString(of = {"splitNum", "timeOutMills"}) + public static class SplitConfiguration { + @JsonProperty("split_num") + private Integer splitNum; + + @JsonProperty("time_out_mills") + private Long timeOutMills; + + /** + * Checks that the table has a partition schema whose range schema lists at least one column. + */ + public boolean isValid(KuduTable table) { + PartitionSchema partitionSchema = table.getPartitionSchema(); + + if (partitionSchema == null) { + LOG.warn("Partition schema cannot be empty."); + return false; + } + if (partitionSchema.getRangeSchema() == null || CollectionUtils.isEmpty(partitionSchema.getRangeSchema().getColumnIds())) { + LOG.warn("Partition rangeSchema cannot be empty."); + return false; + } + return true; + } + } +} \ No newline at end of file diff --git a/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/source/split/strategy/PredicationDivideSplitConstructor.java b/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/source/split/strategy/PredicationDivideSplitConstructor.java new file mode 100644 index 000000000..c6b6cb226 --- /dev/null +++ b/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/source/split/strategy/PredicationDivideSplitConstructor.java @@ -0,0 +1,142 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.kudu.source.split.strategy; + +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.kudu.option.KuduReaderOptions; +import com.bytedance.bitsail.connector.kudu.source.split.AbstractKuduSplitConstructor; +import com.bytedance.bitsail.connector.kudu.source.split.KuduSourceSplit; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.ToString; +import org.apache.kudu.Schema; +import org.apache.kudu.client.KuduClient; +import org.apache.kudu.client.KuduPredicate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** Builds one split per serialized Kudu predicate listed in the split configuration. */ public class PredicationDivideSplitConstructor extends AbstractKuduSplitConstructor { + private static final Logger LOG = LoggerFactory.getLogger(PredicationDivideSplitConstructor.class); + + private SplitConfiguration splitConf = null; + private boolean available = false; + + /** Parses and validates the split configuration; the instance stays unavailable when it is missing or invalid. */ public PredicationDivideSplitConstructor(BitSailConfiguration jobConf, KuduClient client) throws IOException { + super(jobConf, client); + + if (!jobConf.fieldExists(KuduReaderOptions.SPLIT_CONFIGURATION)) { + LOG.warn("{} cannot work due to lack of split configuration.", this.getClass().getSimpleName()); + return; + } + String splitConfStr = 
jobConf.get(KuduReaderOptions.SPLIT_CONFIGURATION); + this.splitConf = new ObjectMapper().readValue(splitConfStr, SplitConfiguration.class); + this.available = splitConf.isValid(schema); + if (!available) { + LOG.warn("{} cannot work because split configuration is invalid.", this.getClass().getSimpleName()); + return; + } + this.available = fillSplitConf(jobConf, client); + if (!available) { + return; + } + + LOG.info("Successfully created ,final split configuration is: {}", splitConf); + } + + @Override + public boolean isAvailable() { + return available; + } + + /** Defaults split_num to the reader parallelism option (or 1) and caps it at the number of predicates; always returns true. */ @Override + protected boolean fillSplitConf(BitSailConfiguration jobConf, KuduClient client) throws IOException { + if (splitConf.getSplitNum() == null || splitConf.getSplitNum() <= 0) { + splitConf.setSplitNum(jobConf.getUnNecessaryOption(KuduReaderOptions.READER_PARALLELISM_NUM, 1)); + } + if (splitConf.getSplitNum() > splitConf.predications.size()) { + splitConf.setSplitNum(splitConf.predications.size()); + LOG.info("Resize split num to {}.", splitConf.getSplitNum()); + } + return true; + } + + /** Creates one split per configured predicate, in list order; split_num only sizes the result list and does not limit the number of splits. */ @Override + public List construct(KuduClient kuduClient) throws Exception { + List splits = new ArrayList<>(splitConf.getSplitNum()); + + List predications = this.splitConf.predications; + for (int i = 0; i < predications.size(); i++) { + KuduSourceSplit split = new KuduSourceSplit(i); + split.addSerializedPredicates(predications.get(i)); + splits.add(split); + } + + LOG.info("Finally get {} splits.", splits.size()); + for (int i = 0; i < splits.size(); ++i) { + LOG.info(">>> the {}-th split is: {}", i, splits.get(i).toFormatString(schema)); + } + return splits; + } + + /** Returns the configured split number when it is positive, otherwise 1. */ @Override + public int estimateSplitNum() { + int estimatedSplitNum = 1; + if (splitConf.getSplitNum() != null && splitConf.getSplitNum() > 0) { + estimatedSplitNum = splitConf.getSplitNum(); + } + LOG.info("Estimated split num is: {}", estimatedSplitNum); + return estimatedSplitNum; + } + + @NoArgsConstructor + @AllArgsConstructor + @Data + 
@ToString(of = {"predications", "splitNum"}) + public static class SplitConfiguration { + @JsonProperty("predications") + private List predications; /* NOTE(review): raw List here, yet isValid iterates it as byte[] - the element type argument looks lost in extraction; confirm against the real file */ + + @JsonProperty("split_num") + private Integer splitNum; + + /** Valid when at least one predicate is configured and every one of them deserializes against the given schema. */ @JsonIgnore + public boolean isValid(Schema schema) { + if (predications == null || predications.size() == 0) { + LOG.warn("Predications configurations cannot be empty."); + return false; + } + for (byte[] predication : predications) { + try { + KuduPredicate.deserialize(schema, predication); + } catch (Exception e) { + LOG.warn("Predication {} cannot be correctly deserialized, error is {}.", predication, e.getMessage()); + return false; + } + } + return true; + } + } +} \ No newline at end of file diff --git a/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/source/split/strategy/SimpleDivideSplitConstructor.java b/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/source/split/strategy/SimpleDivideSplitConstructor.java index 3c7aa7066..41da2321b 100644 --- a/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/source/split/strategy/SimpleDivideSplitConstructor.java +++ b/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/source/split/strategy/SimpleDivideSplitConstructor.java @@ -140,7 +140,7 @@ protected boolean fillSplitConf(BitSailConfiguration jobConf, KuduClient client) } @Override - public List construct(KuduClient kuduClient) throws IOException { + public List construct(KuduClient kuduClient) throws Exception { List splits = new ArrayList<>(splitConf.getSplitNum()); int index = 0; diff --git a/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/util/KuduKeyEncoderUtils.java b/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/util/KuduKeyEncoderUtils.java new file mode 100644 index 000000000..a53025021 --- /dev/null +++ 
b/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/util/KuduKeyEncoderUtils.java @@ -0,0 +1,191 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.kudu.util; + +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.Schema; +import org.apache.kudu.client.PartialRow; +import org.apache.kudu.client.PartitionSchema; +import org.apache.kudu.util.ByteVec; +import org.apache.kudu.util.DateUtil; +import org.apache.kudu.util.DecimalUtil; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Iterator; + +/** Static helpers that decode Kudu key-encoded bytes back into {@link PartialRow} column values. */ public class KuduKeyEncoderUtils { + @SuppressWarnings("checkstyle:MagicNumber") + private static final BigInteger MIN_VALUE_128 = BigInteger.valueOf(-2).pow(127); /* -2^127; XORed into decoded 128-bit decimal keys */ + + /** + * Decodes an encoded range partition key into a partial row; range columns left without bytes are set to null, and leftover bytes raise IllegalArgumentException. 
+ * + * @param schema the schema of the table + * @param partitionSchema the partition schema of the table + * @param buf the encoded range partition key + * @return the decoded range key + */ + public static PartialRow decodeRangePartitionKey(Schema schema, + PartitionSchema partitionSchema, + ByteBuffer buf) { + PartialRow row = schema.newPartialRow(); + Iterator rangeIds = partitionSchema.getRangeSchema().getColumnIds().iterator(); + while (rangeIds.hasNext()) { + int idx = schema.getColumnIndex(rangeIds.next()); + if (buf.hasRemaining()) { + decodeColumn(buf, row, idx, !rangeIds.hasNext()); + } else { + row.setNull(idx); /* NOTE(review): setNull on a non-nullable key column may be rejected by PartialRow - confirm the behaviour for empty (unbounded) partition keys */ + } + } + + if (buf.hasRemaining()) { + throw new IllegalArgumentException("Unable to decode all partition key bytes"); + } + return row; + } + + /** + * Decodes a key-encoded column and stores it in the row, consuming the column's bytes from the buffer. + * + * @param buf the buffer containing the column + * @param row the row to set the column value in + * @param idx the index of the column to decode + * @param isLast whether the column is the last column in the key + */ + public static void decodeColumn(ByteBuffer buf, PartialRow row, int idx, boolean isLast) { + Schema schema = row.getSchema(); + ColumnSchema column = schema.getColumnByIndex(idx); + switch (column.getType()) { + case INT8: /* integer types: XOR with the type's MIN_VALUE toggles the sign bit of the stored value */ + row.addByte(idx, (byte) (buf.get() ^ Byte.MIN_VALUE)); + break; + case INT16: + row.addShort(idx, (short) (buf.getShort() ^ Short.MIN_VALUE)); + break; + case DATE: { + int days = buf.getInt() ^ Integer.MIN_VALUE; + row.addDate(idx, DateUtil.epochDaysToSqlDate(days)); + break; + } + case INT32: + row.addInt(idx, buf.getInt() ^ Integer.MIN_VALUE); + break; + case INT64: + case UNIXTIME_MICROS: + row.addLong(idx, buf.getLong() ^ Long.MIN_VALUE); + break; + case BINARY: { + byte[] binary = decodeBinaryColumn(buf, isLast); + row.addBinary(idx, binary); + break; + } + case VARCHAR: { + byte[] binary = decodeBinaryColumn(buf, isLast); + row.addVarchar(idx, new String(binary, StandardCharsets.UTF_8)); + break; + } + 
case STRING: { + byte[] binary = decodeBinaryColumn(buf, isLast); + row.addStringUtf8(idx, binary); + break; + } + case DECIMAL: { + int scale = column.getTypeAttributes().getScale(); + int size = column.getTypeSize(); + switch (size) { + case DecimalUtil.DECIMAL32_SIZE: + int intVal = buf.getInt() ^ Integer.MIN_VALUE; + row.addDecimal(idx, BigDecimal.valueOf(intVal, scale)); + break; + case DecimalUtil.DECIMAL64_SIZE: + long longVal = buf.getLong() ^ Long.MIN_VALUE; + row.addDecimal(idx, BigDecimal.valueOf(longVal, scale)); + break; + case DecimalUtil.DECIMAL128_SIZE: + byte[] bytes = new byte[size]; + buf.get(bytes); + BigInteger bigIntVal = new BigInteger(bytes).xor(MIN_VALUE_128); + row.addDecimal(idx, new BigDecimal(bigIntVal, scale)); + break; + default: + throw new IllegalArgumentException("Unsupported decimal type size: " + size); + } + break; + } + default: + throw new IllegalArgumentException(String.format( + "The column type %s is not a valid key component type", + schema.getColumnByIndex(idx).getType())); + } + } + + /** + * Decodes a binary key column and moves the buffer position past the bytes it consumed. + * + * @param key the key bytes + * @param isLast whether the column is the final column in the key. + * @return the binary value. + */ + public static byte[] decodeBinaryColumn(ByteBuffer key, boolean isLast) { + if (isLast) { /* final column: the value is every remaining byte, with no separator or escaping to undo */ + byte[] bytes = Arrays.copyOfRange(key.array(), + key.arrayOffset() + key.position(), + key.arrayOffset() + key.limit()); + key.position(key.limit()); + return bytes; + } + + // When encoding a binary column that is not the final column in the key, a + // 0x0000 separator is used to retain lexicographic comparability. Null + // bytes in the input are escaped as 0x0001. 
+ ByteVec buf = ByteVec.withCapacity(key.remaining()); + for (int i = key.position(); i < key.limit(); i++) { + if (key.get(i) == 0) { + switch (key.get(i + 1)) { /* NOTE(review): i + 1 equals limit when the data ends in a lone 0x00, and the absolute get then throws IndexOutOfBoundsException - confirm keys are always well formed */ + case 0: { /* 0x0000 separator: the column ends just before it */ + buf.append(key.array(), + key.arrayOffset() + key.position(), + i - key.position()); + key.position(i + 2); + return buf.toArray(); + } + case 1: { /* 0x0001 is an escaped zero byte: keep the 0x00, skip the 0x01 */ + buf.append(key.array(), + key.arrayOffset() + key.position(), + i + 1 - key.position()); + i++; + key.position(i + 1); + break; + } + default: throw new IllegalArgumentException("Unexpected binary sequence"); + } + } + } + + buf.append(key.array(), + key.arrayOffset() + key.position(), + key.remaining()); + key.position(key.limit()); + return buf.toArray(); + } + +} diff --git a/bitsail-connectors/connector-kudu/src/test/java/com/bytedance/bitsail/connector/kudu/KuduKeyEncoderUtilsTest.java b/bitsail-connectors/connector-kudu/src/test/java/com/bytedance/bitsail/connector/kudu/KuduKeyEncoderUtilsTest.java new file mode 100644 index 000000000..33cc936ca --- /dev/null +++ b/bitsail-connectors/connector-kudu/src/test/java/com/bytedance/bitsail/connector/kudu/KuduKeyEncoderUtilsTest.java @@ -0,0 +1,73 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.bytedance.bitsail.connector.kudu; + +import com.bytedance.bitsail.connector.kudu.util.KuduKeyEncoderUtils; + +import com.google.common.collect.Lists; +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.Schema; +import org.apache.kudu.Type; +import org.apache.kudu.client.PartialRow; +import org.apache.kudu.client.Partition; +import org.apache.kudu.client.PartitionSchema; +import org.junit.Assert; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.ArrayList; +import java.util.List; + +/** Tests for {@link KuduKeyEncoderUtils}. */ @SuppressWarnings("checkstyle:MagicNumber") +public class KuduKeyEncoderUtilsTest { + /* three-column schema; only the INT32 key column "id" (column id 0) is used in the range schema below */ List columnSchemaList = Lists.newArrayList( + new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build(), + new ColumnSchema.ColumnSchemaBuilder("field_long", Type.INT64).build(), + new ColumnSchema.ColumnSchemaBuilder("field_string", Type.STRING).build() + ); + List columnIds = Lists.newArrayList(0, 1, 2); + Schema schema = new Schema(columnSchemaList, columnIds); + + /** Encodes INT32 lower and upper bounds as partition keys for two partitions and checks that decoding returns the same ints. */ @Test + public void testDecodeRangePartitionKey() { + PartialRow lowerPartialRow1 = new PartialRow(schema); + lowerPartialRow1.addInt(0, 100); + PartialRow upperPartialRow1 = new PartialRow(schema); + upperPartialRow1.addInt(0, 400); + + PartialRow lowerPartialRow2 = new PartialRow(schema); + lowerPartialRow2.addInt(0, 400); + PartialRow upperPartialRow2 = new PartialRow(schema); + upperPartialRow2.addInt(0, 700); + + Partition partition1 = new Partition(lowerPartialRow1.encodePrimaryKey(), upperPartialRow1.encodePrimaryKey(), Lists.newArrayList()); + Partition partition2 = new Partition(lowerPartialRow2.encodePrimaryKey(), upperPartialRow2.encodePrimaryKey(), Lists.newArrayList()); + List partitions = Lists.newArrayList(partition1, partition2); + PartitionSchema.RangeSchema rangeSchema = new PartitionSchema.RangeSchema(Lists.newArrayList(0)); + PartitionSchema partitionSchema = new PartitionSchema(rangeSchema, new ArrayList <>(), schema); + + 
/* expected {lower, upper} rows, one pair per partition, in the same order as the partitions list */ PartialRow[][] partialRows = new PartialRow[][] {{lowerPartialRow1, upperPartialRow1}, {lowerPartialRow2, upperPartialRow2}}; + + for (int i = 0; i < partitions.size(); i++) { + Assert.assertEquals(partialRows[i][0].getInt(0), KuduKeyEncoderUtils.decodeRangePartitionKey(schema, partitionSchema, + ByteBuffer.wrap(partitions.get(i).getPartitionKeyStart()).order(ByteOrder.BIG_ENDIAN)).getInt(0)); + Assert.assertEquals(partialRows[i][1].getInt(0), KuduKeyEncoderUtils.decodeRangePartitionKey(schema, partitionSchema, + ByteBuffer.wrap(partitions.get(i).getPartitionKeyEnd()).order(ByteOrder.BIG_ENDIAN)).getInt(0)); + } + } +} diff --git a/bitsail-connectors/connector-kudu/src/test/java/com/bytedance/bitsail/connector/kudu/source/split/PartitionDivideSplitConstructorTest.java b/bitsail-connectors/connector-kudu/src/test/java/com/bytedance/bitsail/connector/kudu/source/split/PartitionDivideSplitConstructorTest.java new file mode 100644 index 000000000..f479e788c --- /dev/null +++ b/bitsail-connectors/connector-kudu/src/test/java/com/bytedance/bitsail/connector/kudu/source/split/PartitionDivideSplitConstructorTest.java @@ -0,0 +1,113 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package com.bytedance.bitsail.connector.kudu.source.split;
+
+import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
+import com.bytedance.bitsail.connector.kudu.option.KuduReaderOptions;
+import com.bytedance.bitsail.connector.kudu.source.split.strategy.PartitionDivideSplitConstructor;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduPredicate;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.Partition;
+import org.apache.kudu.client.PartitionSchema;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Unit test for {@link PartitionDivideSplitConstructor}.
+ *
+ * <p>A mocked {@link KuduTable} reports two range partitions, [100, 400) and [400, 700), on
+ * column {@code id}. The test checks that one split is produced per partition and that each
+ * split carries the matching {@code >=} / {@code <} predicate pair.
+ */
+@SuppressWarnings("checkstyle:MagicNumber")
+public class PartitionDivideSplitConstructorTest {
+
+  /** Test schema: INT32 key column {@code id} plus two non-key columns. */
+  private final List<ColumnSchema> columnSchemaList = Lists.newArrayList(
+      new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build(),
+      new ColumnSchema.ColumnSchemaBuilder("field_long", Type.INT64).build(),
+      new ColumnSchema.ColumnSchemaBuilder("field_string", Type.STRING).build()
+  );
+  private final List<Integer> columnIds = Lists.newArrayList(0, 1, 2);
+  private final Schema schema = new Schema(columnSchemaList, columnIds);
+
+  private final String tableName = "test_kudu_table";
+  private final KuduTable mockTable = Mockito.mock(KuduTable.class);
+  private final KuduClient mockClient = Mockito.mock(KuduClient.class);
+
+  /** Expected lower-bound predicate of split {@code i}. */
+  private final List<KuduPredicate> lowerPredicates = Lists.newArrayList(
+      KuduPredicate.newComparisonPredicate(columnSchemaList.get(0), KuduPredicate.ComparisonOp.GREATER_EQUAL, 100),
+      KuduPredicate.newComparisonPredicate(columnSchemaList.get(0), KuduPredicate.ComparisonOp.GREATER_EQUAL, 400)
+  );
+  /** Expected upper-bound predicate of split {@code i}. */
+  private final List<KuduPredicate> upperPredicates = Lists.newArrayList(
+      KuduPredicate.newComparisonPredicate(columnSchemaList.get(0), KuduPredicate.ComparisonOp.LESS, 400),
+      KuduPredicate.newComparisonPredicate(columnSchemaList.get(0), KuduPredicate.ComparisonOp.LESS, 700)
+  );
+  /** Range schema over column id 0 only, with no hash bucket schemas. */
+  private final PartitionSchema.RangeSchema rangeSchema = new PartitionSchema.RangeSchema(Lists.newArrayList(0));
+  private final PartitionSchema partitionSchema = new PartitionSchema(rangeSchema, new ArrayList<>(), schema);
+
+  /**
+   * Builds the two range partitions and stubs the mocked client and table to expose them.
+   *
+   * @throws Exception declared because the stubbed Kudu client calls declare checked exceptions
+   */
+  @Before
+  public void init() throws Exception {
+    PartialRow lowerPartialRow1 = new PartialRow(schema);
+    lowerPartialRow1.addInt(0, 100);
+    PartialRow upperPartialRow1 = new PartialRow(schema);
+    upperPartialRow1.addInt(0, 400);
+
+    PartialRow lowerPartialRow2 = new PartialRow(schema);
+    lowerPartialRow2.addInt(0, 400);
+    PartialRow upperPartialRow2 = new PartialRow(schema);
+    upperPartialRow2.addInt(0, 700);
+
+    // The encoded primary key of each row serves as the partition boundary key; the third
+    // constructor argument (hash buckets) is an empty list.
+    Partition partition1 = new Partition(lowerPartialRow1.encodePrimaryKey(), upperPartialRow1.encodePrimaryKey(), Lists.newArrayList());
+    Partition partition2 = new Partition(lowerPartialRow2.encodePrimaryKey(), upperPartialRow2.encodePrimaryKey(), Lists.newArrayList());
+    List<Partition> partitions = Lists.newArrayList(partition1, partition2);
+
+    Mockito.when(mockTable.getSchema()).thenReturn(schema);
+    Mockito.when(mockClient.openTable(tableName)).thenReturn(mockTable);
+    Mockito.when(mockTable.getPartitionSchema()).thenReturn(partitionSchema);
+    // Only a call with exactly this timeout argument returns the partitions.
+    // NOTE(review): 3000L presumably mirrors the connector's default timeout - confirm against the constructor.
+    Mockito.when(mockTable.getRangePartitions(3000L)).thenReturn(partitions);
+  }
+
+  /**
+   * Requests three splits against a table with two range partitions and expects two splits,
+   * each holding the lower and upper bound predicate of its partition, in that order.
+   *
+   * @throws Exception if configuration serialization or split construction fails
+   */
+  @Test
+  public void testParseSplitConf() throws Exception {
+    PartitionDivideSplitConstructor.SplitConfiguration splitConf = new PartitionDivideSplitConstructor.SplitConfiguration();
+    splitConf.setSplitNum(3);
+    BitSailConfiguration jobConf = BitSailConfiguration.newDefault();
+    jobConf.set(KuduReaderOptions.SPLIT_STRATEGY, "partition_divide");
+    jobConf.set(KuduReaderOptions.KUDU_TABLE_NAME, tableName);
+    jobConf.set(KuduReaderOptions.SPLIT_CONFIGURATION, new ObjectMapper().writeValueAsString(splitConf));
+    jobConf.set(KuduReaderOptions.READER_PARALLELISM_NUM, 3);
+
+    PartitionDivideSplitConstructor constructor = new PartitionDivideSplitConstructor(jobConf, mockClient);
+    Assert.assertEquals(2, constructor.estimateSplitNum());
+
+    // NOTE(review): element type assumed to be KuduSourceSplit from this package - confirm.
+    List<KuduSourceSplit> splits = constructor.construct(mockClient);
+    Assert.assertEquals(2, splits.size());
+
+    for (int i = 0; i < splits.size(); ++i) {
+      List<KuduPredicate> predicates = splits.get(i).deserializePredicates(schema);
+      Assert.assertEquals(2, predicates.size());
+      Assert.assertEquals(lowerPredicates.get(i), predicates.get(0));
+      Assert.assertEquals(upperPredicates.get(i), predicates.get(1));
+    }
+  }
+}
diff --git a/bitsail-connectors/connector-kudu/src/test/java/com/bytedance/bitsail/connector/kudu/source/split/PredicationDivideSplitConstructorTest.java b/bitsail-connectors/connector-kudu/src/test/java/com/bytedance/bitsail/connector/kudu/source/split/PredicationDivideSplitConstructorTest.java
new file mode 100644
index 000000000..3beedc094
--- /dev/null
+++ b/bitsail-connectors/connector-kudu/src/test/java/com/bytedance/bitsail/connector/kudu/source/split/PredicationDivideSplitConstructorTest.java
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.kudu.source.split;
+
+import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
+import com.bytedance.bitsail.connector.kudu.option.KuduReaderOptions;
+import com.bytedance.bitsail.connector.kudu.source.split.strategy.PredicationDivideSplitConstructor;
+import com.bytedance.bitsail.connector.kudu.source.split.strategy.PredicationDivideSplitConstructor.SplitConfiguration;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduPredicate;
+import org.apache.kudu.client.KuduTable;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Unit test for {@link PredicationDivideSplitConstructor}.
+ *
+ * <p>Serialized {@link KuduPredicate} groups are passed in through the split configuration.
+ * The tests check that one split is produced per group and that each split deserializes back
+ * to the predicates it was configured with.
+ */
+@SuppressWarnings("checkstyle:MagicNumber")
+public class PredicationDivideSplitConstructorTest {
+
+  /** Test schema: INT32 key column {@code key} plus two non-key columns. */
+  private final List<ColumnSchema> columnSchemaList = Lists.newArrayList(
+      new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build(),
+      new ColumnSchema.ColumnSchemaBuilder("field_long", Type.INT64).build(),
+      new ColumnSchema.ColumnSchemaBuilder("field_string", Type.STRING).build()
+  );
+  private final Schema schema = new Schema(columnSchemaList);
+  private final String tableName = "test_kudu_table";
+  private final KuduTable mockTable = Mockito.mock(KuduTable.class);
+  private final KuduClient mockClient = Mockito.mock(KuduClient.class);
+
+  /** One equality predicate per split, used by the single-predicate test. */
+  private final List<KuduPredicate> singlePredicates = Lists.newArrayList(
+      KuduPredicate.newComparisonPredicate(columnSchemaList.get(0), KuduPredicate.ComparisonOp.EQUAL, 334),
+      KuduPredicate.newComparisonPredicate(columnSchemaList.get(0), KuduPredicate.ComparisonOp.EQUAL, 668),
+      KuduPredicate.newComparisonPredicate(columnSchemaList.get(0), KuduPredicate.ComparisonOp.EQUAL, 1002)
+  );
+
+  /** Lower-bound predicate of split {@code i} in the multi-predicate test. */
+  private final List<KuduPredicate> lowerPredicates = Lists.newArrayList(
+      KuduPredicate.newComparisonPredicate(columnSchemaList.get(0), KuduPredicate.ComparisonOp.GREATER_EQUAL, 0),
+      KuduPredicate.newComparisonPredicate(columnSchemaList.get(0), KuduPredicate.ComparisonOp.GREATER_EQUAL, 334),
+      KuduPredicate.newComparisonPredicate(columnSchemaList.get(0), KuduPredicate.ComparisonOp.GREATER_EQUAL, 668)
+  );
+  /** Upper-bound predicate of split {@code i} in the multi-predicate test. */
+  private final List<KuduPredicate> upperPredicates = Lists.newArrayList(
+      KuduPredicate.newComparisonPredicate(columnSchemaList.get(0), KuduPredicate.ComparisonOp.LESS, 334),
+      KuduPredicate.newComparisonPredicate(columnSchemaList.get(0), KuduPredicate.ComparisonOp.LESS, 668),
+      KuduPredicate.newComparisonPredicate(columnSchemaList.get(0), KuduPredicate.ComparisonOp.LESS, 1002)
+  );
+
+  /**
+   * Stubs the mocked client to open the mocked table, and the table to return the test schema.
+   *
+   * @throws Exception declared because the stubbed Kudu client call declares a checked exception
+   */
+  @Before
+  public void init() throws Exception {
+    Mockito.when(mockTable.getSchema()).thenReturn(schema);
+    Mockito.when(mockClient.openTable(tableName)).thenReturn(mockTable);
+  }
+
+  /**
+   * Configures three predications of one predicate each and expects three splits, each
+   * deserializing to exactly its configured predicate.
+   *
+   * @throws Exception if predicate serialization or split construction fails
+   */
+  @Test
+  public void testSingleParseSplitConf() throws Exception {
+    SplitConfiguration splitConf = new SplitConfiguration();
+    List<byte[]> predications = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      predications.add(KuduPredicate.serialize(Collections.singletonList(singlePredicates.get(i))));
+    }
+    splitConf.setPredications(predications);
+    splitConf.setSplitNum(3);
+
+    BitSailConfiguration jobConf = BitSailConfiguration.newDefault();
+    jobConf.set(KuduReaderOptions.KUDU_TABLE_NAME, tableName);
+    jobConf.set(KuduReaderOptions.SPLIT_CONFIGURATION, new ObjectMapper().writeValueAsString(splitConf));
+    jobConf.set(KuduReaderOptions.READER_PARALLELISM_NUM, 3);
+
+    PredicationDivideSplitConstructor constructor = new PredicationDivideSplitConstructor(jobConf, mockClient);
+    Assert.assertEquals(3, constructor.estimateSplitNum());
+    // NOTE(review): element type assumed to be KuduSourceSplit from this package - confirm.
+    List<KuduSourceSplit> splits = constructor.construct(mockClient);
+    Assert.assertEquals(3, splits.size());
+
+    for (int i = 0; i < 3; ++i) {
+      List<KuduPredicate> predicates = splits.get(i).deserializePredicates(schema);
+      Assert.assertEquals(1, predicates.size());
+      Assert.assertEquals(singlePredicates.get(i), predicates.get(0));
+    }
+  }
+
+  /**
+   * Configures three predications of two predicates each (lower and upper bound) with
+   * {@code splitNum} set to 4, and expects three splits whose predicates match the configured
+   * pair in order.
+   *
+   * @throws Exception if predicate serialization or split construction fails
+   */
+  @Test
+  public void testParseMutilSplitConf() throws Exception {
+    SplitConfiguration splitConf = new SplitConfiguration();
+    List<byte[]> predications = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      predications.add(KuduPredicate.serialize(Lists.newArrayList(lowerPredicates.get(i), upperPredicates.get(i))));
+    }
+    splitConf.setPredications(predications);
+    // splitNum is 4 while only three predications are supplied; the assertions expect three splits.
+    splitConf.setSplitNum(4);
+
+    BitSailConfiguration jobConf = BitSailConfiguration.newDefault();
+    jobConf.set(KuduReaderOptions.KUDU_TABLE_NAME, tableName);
+    jobConf.set(KuduReaderOptions.SPLIT_CONFIGURATION, new ObjectMapper().writeValueAsString(splitConf));
+
+    PredicationDivideSplitConstructor constructor = new PredicationDivideSplitConstructor(jobConf, mockClient);
+    Assert.assertEquals(3, constructor.estimateSplitNum());
+
+    // NOTE(review): element type assumed to be KuduSourceSplit from this package - confirm.
+    List<KuduSourceSplit> splits = constructor.construct(mockClient);
+    Assert.assertEquals(3, splits.size());
+
+    for (int i = 0; i < 3; ++i) {
+      List<KuduPredicate> predicates = splits.get(i).deserializePredicates(schema);
+      Assert.assertEquals(2, predicates.size());
+      // Both bounds must round-trip; asserting them separately means a failure names the bad one.
+      Assert.assertEquals(lowerPredicates.get(i), predicates.get(0));
+      Assert.assertEquals(upperPredicates.get(i), predicates.get(1));
+    }
+  }
+}