diff --git a/crates/paimon/src/spec/schema.rs b/crates/paimon/src/spec/schema.rs
index 4266421..1729ea3 100644
--- a/crates/paimon/src/spec/schema.rs
+++ b/crates/paimon/src/spec/schema.rs
@@ -15,10 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::spec::types::DataType;
+use crate::spec::types::{DataType, RowType};
 use serde::{Deserialize, Serialize};
 use serde_with::serde_as;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 
 /// The table schema for paimon table.
 ///
@@ -102,6 +102,344 @@ pub fn escape_single_quotes(text: &str) -> String {
     text.replace('\'', "''")
 }
 
+// ======================= Schema (DDL) ===============================
+
+/// Option key for primary key in table options (same as [CoreOptions.PRIMARY_KEY](https://github.com/apache/paimon/blob/release-1.3/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java)).
+pub const PRIMARY_KEY_OPTION: &str = "primary-key";
+/// Option key for partition in table options (same as [CoreOptions.PARTITION](https://github.com/apache/paimon/blob/release-1.3/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java)).
+pub const PARTITION_OPTION: &str = "partition";
+
+/// Schema of a table (logical DDL schema).
+///
+/// Corresponds to [org.apache.paimon.schema.Schema](https://github.com/apache/paimon/blob/1.3/paimon-api/src/main/java/org/apache/paimon/schema/Schema.java).
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct Schema {
+    fields: Vec<DataField>,
+    partition_keys: Vec<String>,
+    primary_keys: Vec<String>,
+    options: HashMap<String, String>,
+    comment: Option<String>,
+}
+
+impl Schema {
+    /// Build a schema with validation. Normalizes partition/primary keys from table options if present.
+    pub fn new(
+        fields: Vec<DataField>,
+        partition_keys: Vec<String>,
+        primary_keys: Vec<String>,
+        mut options: HashMap<String, String>,
+        comment: Option<String>,
+    ) -> crate::Result<Self> {
+        let primary_keys = Self::normalize_primary_keys(&primary_keys, &mut options)?;
+        let partition_keys = Self::normalize_partition_keys(&partition_keys, &mut options)?;
+        let fields = Self::normalize_fields(&fields, &partition_keys, &primary_keys)?;
+
+        Ok(Self {
+            fields,
+            partition_keys,
+            primary_keys,
+            options,
+            comment,
+        })
+    }
+
+    /// Normalize primary keys: optionally take them from the table options (`primary-key`) and remove that key from the options.
+    /// Corresponds to Java `normalizePrimaryKeys`.
+    fn normalize_primary_keys(
+        primary_keys: &[String],
+        options: &mut HashMap<String, String>,
+    ) -> crate::Result<Vec<String>> {
+        if let Some(pk) = options.remove(PRIMARY_KEY_OPTION) {
+            if !primary_keys.is_empty() {
+                return Err(crate::Error::ConfigInvalid {
+                    message: "Cannot define primary key on DDL and table options at the same time."
+                        .to_string(),
+                });
+            }
+            return Ok(pk
+                .split(',')
+                .map(|s| s.trim().to_string())
+                .filter(|s| !s.is_empty())
+                .collect());
+        }
+        Ok(primary_keys.to_vec())
+    }
+
+    /// Normalize partition keys: optionally take them from the table options (`partition`) and remove that key from the options.
+    /// Corresponds to Java `normalizePartitionKeys`.
+    fn normalize_partition_keys(
+        partition_keys: &[String],
+        options: &mut HashMap<String, String>,
+    ) -> crate::Result<Vec<String>> {
+        if let Some(part) = options.remove(PARTITION_OPTION) {
+            if !partition_keys.is_empty() {
+                return Err(crate::Error::ConfigInvalid {
+                    message: "Cannot define partition on DDL and table options at the same time."
+                        .to_string(),
+                });
+            }
+            return Ok(part
+                .split(',')
+                .map(|s| s.trim().to_string())
+                .filter(|s| !s.is_empty())
+                .collect());
+        }
+        Ok(partition_keys.to_vec())
+    }
+
+    /// Normalize fields: validate them (duplicate/subset checks) and make primary key columns non-nullable.
+    /// Corresponds to Java `normalizeFields`.
+    fn normalize_fields(
+        fields: &[DataField],
+        partition_keys: &[String],
+        primary_keys: &[String],
+    ) -> crate::Result<Vec<DataField>> {
+        let field_names: Vec<String> = fields.iter().map(|f| f.name().to_string()).collect();
+        Self::validate_no_duplicate_fields(&field_names)?;
+        Self::validate_partition_keys(&field_names, partition_keys)?;
+        Self::validate_primary_keys(&field_names, primary_keys)?;
+
+        if primary_keys.is_empty() {
+            return Ok(fields.to_vec());
+        }
+
+        let pk_set: HashSet<&str> = primary_keys.iter().map(String::as_str).collect();
+        let mut new_fields = Vec::with_capacity(fields.len());
+        for f in fields {
+            if pk_set.contains(f.name()) && f.data_type().is_nullable() {
+                new_fields.push(
+                    DataField::new(
+                        f.id(),
+                        f.name().to_string(),
+                        f.data_type().copy_with_nullable(false)?,
+                    )
+                    .with_description(f.description().map(|s| s.to_string())),
+                );
+            } else {
+                new_fields.push(f.clone());
+            }
+        }
+        Ok(new_fields)
+    }
+
+    /// Table columns must not contain duplicate field names.
+    fn validate_no_duplicate_fields(field_names: &[String]) -> crate::Result<()> {
+        let duplicates = Self::duplicate_fields(field_names);
+        if duplicates.is_empty() {
+            Ok(())
+        } else {
+            Err(crate::Error::ConfigInvalid {
+                message: format!(
+                    "Table column {field_names:?} must not contain duplicate fields. Found: {duplicates:?}"
+                ),
+            })
+        }
+    }
+
+    /// The partition key constraint must not contain duplicates, and all partition keys must be table columns.
+    fn validate_partition_keys(
+        field_names: &[String],
+        partition_keys: &[String],
+    ) -> crate::Result<()> {
+        let all_names: HashSet<&str> = field_names.iter().map(String::as_str).collect();
+        let duplicates = Self::duplicate_fields(partition_keys);
+        if !duplicates.is_empty() {
+            return Err(crate::Error::ConfigInvalid {
+                message: format!(
+                    "Partition key constraint {partition_keys:?} must not contain duplicate columns. Found: {duplicates:?}"
+                ),
+            });
+        }
+        if !partition_keys
+            .iter()
+            .all(|k| all_names.contains(k.as_str()))
+        {
+            return Err(crate::Error::ConfigInvalid {
+                message: format!(
+                    "Table column {field_names:?} should include all partition fields {partition_keys:?}"
+                ),
+            });
+        }
+        Ok(())
+    }
+
+    /// The primary key constraint must not contain duplicates, and all primary keys must be table columns.
+    fn validate_primary_keys(field_names: &[String], primary_keys: &[String]) -> crate::Result<()> {
+        if primary_keys.is_empty() {
+            return Ok(());
+        }
+        let all_names: HashSet<&str> = field_names.iter().map(String::as_str).collect();
+        let duplicates = Self::duplicate_fields(primary_keys);
+        if !duplicates.is_empty() {
+            return Err(crate::Error::ConfigInvalid {
+                message: format!(
+                    "Primary key constraint {primary_keys:?} must not contain duplicate columns. Found: {duplicates:?}"
+                ),
+            });
+        }
+        if !primary_keys.iter().all(|k| all_names.contains(k.as_str())) {
+            return Err(crate::Error::ConfigInvalid {
+                message: format!(
+                    "Table column {field_names:?} should include all primary key constraint {primary_keys:?}"
+                ),
+            });
+        }
+        Ok(())
+    }
+
+    /// Returns the set of names that appear more than once.
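+    ///
+    /// A small illustrative example (assumes `Schema` is in scope):
+    ///
+    /// ```ignore
+    /// let names = vec!["a".to_string(), "b".to_string(), "a".to_string()];
+    /// let duplicates = Schema::duplicate_fields(&names);
+    /// assert!(duplicates.contains("a"));
+    /// assert!(!duplicates.contains("b"));
+    /// ```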
+    pub fn duplicate_fields(names: &[String]) -> HashSet<String> {
+        let mut seen = HashMap::new();
+        for n in names {
+            *seen.entry(n.clone()).or_insert(0) += 1;
+        }
+        seen.into_iter()
+            .filter(|(_, count)| *count > 1)
+            .map(|(name, _)| name)
+            .collect()
+    }
+
+    /// Row type with these fields (nullable = false for the table row).
+    pub fn row_type(&self) -> RowType {
+        RowType::with_nullable(false, self.fields.clone())
+    }
+
+    pub fn fields(&self) -> &[DataField] {
+        &self.fields
+    }
+
+    pub fn partition_keys(&self) -> &[String] {
+        &self.partition_keys
+    }
+
+    pub fn primary_keys(&self) -> &[String] {
+        &self.primary_keys
+    }
+
+    pub fn options(&self) -> &HashMap<String, String> {
+        &self.options
+    }
+
+    pub fn comment(&self) -> Option<&str> {
+        self.comment.as_deref()
+    }
+
+    /// Create a new schema with the same keys, options, and comment but a different row type.
+    pub fn copy(&self, row_type: RowType) -> crate::Result<Self> {
+        Self::new(
+            row_type.fields().to_vec(),
+            self.partition_keys.clone(),
+            self.primary_keys.clone(),
+            self.options.clone(),
+            self.comment.clone(),
+        )
+    }
+
+    /// Create a new builder for configuring a schema.
+    pub fn builder() -> SchemaBuilder {
+        SchemaBuilder::new()
+    }
+}
+
+/// Builder for [`Schema`].
+pub struct SchemaBuilder {
+    columns: Vec<DataField>,
+    partition_keys: Vec<String>,
+    primary_keys: Vec<String>,
+    options: HashMap<String, String>,
+    comment: Option<String>,
+    next_field_id: i32,
+}
+
+impl SchemaBuilder {
+    pub fn new() -> Self {
+        Self {
+            columns: Vec::new(),
+            partition_keys: Vec::new(),
+            primary_keys: Vec::new(),
+            options: HashMap::new(),
+            comment: None,
+            next_field_id: 0,
+        }
+    }
+
+    /// Add a column (name, data type).
+    pub fn column(self, column_name: impl Into<String>, data_type: DataType) -> Self {
+        self.column_with_description(column_name, data_type, None)
+    }
+
+    /// Add a column with an optional description.
+    ///
+    /// TODO: Support RowType in schema columns with field ID assignment for nested fields.
+    /// See <https://github.com/apache/paimon/pull/1547>.
+    pub fn column_with_description(
+        mut self,
+        column_name: impl Into<String>,
+        data_type: DataType,
+        description: Option<String>,
+    ) -> Self {
+        if data_type.contains_row_type() {
+            todo!(
+                "Column type containing RowType is not supported yet: field ID assignment for nested row fields is not implemented. See https://github.com/apache/paimon/pull/1547"
+            );
+        }
+        let name = column_name.into();
+        let id = self.next_field_id;
+        self.next_field_id += 1;
+        self.columns
+            .push(DataField::new(id, name, data_type).with_description(description));
+        self
+    }
+
+    /// Set partition keys.
+    pub fn partition_keys(mut self, names: impl IntoIterator<Item = impl Into<String>>) -> Self {
+        self.partition_keys = names.into_iter().map(Into::into).collect();
+        self
+    }
+
+    /// Set primary key columns. Nullable primary key columns are made NOT NULL when the schema is built.
+    pub fn primary_key(mut self, names: impl IntoIterator<Item = impl Into<String>>) -> Self {
+        self.primary_keys = names.into_iter().map(Into::into).collect();
+        self
+    }
+
+    /// Set table options (merged with any existing options).
+    pub fn options(mut self, opts: impl IntoIterator<Item = (String, String)>) -> Self {
+        self.options.extend(opts);
+        self
+    }
+
+    /// Set a single option.
+    pub fn option(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
+        self.options.insert(key.into(), value.into());
+        self
+    }
+
+    /// Set the table comment.
+    pub fn comment(mut self, comment: Option<String>) -> Self {
+        self.comment = comment;
+        self
+    }
+
+    /// Build the schema (validates and normalizes).
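+    ///
+    /// A minimal usage sketch (illustrative; assumes `Schema`, `DataType`, and `IntType` are in scope):
+    ///
+    /// ```ignore
+    /// let schema = Schema::builder()
+    ///     .column("id", DataType::Int(IntType::new()))
+    ///     .column("value", DataType::Int(IntType::new()))
+    ///     .primary_key(["id"])
+    ///     .build()?;
+    /// assert_eq!(schema.primary_keys(), &["id"]);
+    /// ```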
+    pub fn build(self) -> crate::Result<Schema> {
+        Schema::new(
+            self.columns,
+            self.partition_keys,
+            self.primary_keys,
+            self.options,
+            self.comment,
+        )
+    }
+}
+
+impl Default for SchemaBuilder {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use crate::spec::IntType;
@@ -170,4 +508,125 @@ mod tests {
         let escaped_text = escape_single_quotes("text with 'single' quotes");
         assert_eq!(escaped_text, "text with ''single'' quotes");
     }
+
+    #[test]
+    fn test_schema_builder_build() {
+        let schema = Schema::builder()
+            .column("id", DataType::Int(IntType::with_nullable(true)))
+            .column("name", DataType::Int(IntType::new()))
+            .primary_key(["id"])
+            .option("k", "v")
+            .comment(Some("table comment".into()))
+            .build()
+            .unwrap();
+        assert_eq!(schema.fields().len(), 2);
+        assert_eq!(schema.primary_keys(), &["id"]);
+        assert_eq!(schema.options().get("k"), Some(&"v".to_string()));
+        assert_eq!(schema.comment(), Some("table comment"));
+        let id_field = schema.fields().iter().find(|f| f.name() == "id").unwrap();
+        assert!(
+            !id_field.data_type().is_nullable(),
+            "primary key column should be normalized to NOT NULL"
+        );
+    }
+
+    #[test]
+    fn test_schema_validation() {
+        // Duplicate field names
+        let res = Schema::builder()
+            .column("a", DataType::Int(IntType::new()))
+            .column("b", DataType::Int(IntType::new()))
+            .column("a", DataType::Int(IntType::new()))
+            .build();
+        assert!(res.is_err(), "duplicate field names should be rejected");
+
+        // Duplicate partition keys
+        let res = Schema::builder()
+            .column("a", DataType::Int(IntType::new()))
+            .column("b", DataType::Int(IntType::new()))
+            .partition_keys(["a", "a"])
+            .build();
+        assert!(res.is_err(), "duplicate partition keys should be rejected");
+
+        // Partition key not in fields
+        let res = Schema::builder()
+            .column("a", DataType::Int(IntType::new()))
+            .column("b", DataType::Int(IntType::new()))
+            .partition_keys(["c"])
+            .build();
+        assert!(
+            res.is_err(),
+            "partition key not in columns should be rejected"
+        );
+
+        // Duplicate primary keys
+        let res = Schema::builder()
+            .column("a", DataType::Int(IntType::with_nullable(false)))
+            .column("b", DataType::Int(IntType::new()))
+            .primary_key(["a", "a"])
+            .build();
+        assert!(res.is_err(), "duplicate primary keys should be rejected");
+
+        // Primary key not in fields
+        let res = Schema::builder()
+            .column("a", DataType::Int(IntType::with_nullable(false)))
+            .column("b", DataType::Int(IntType::new()))
+            .primary_key(["c"])
+            .build();
+        assert!(
+            res.is_err(),
+            "primary key not in columns should be rejected"
+        );
+
+        // primary-key in options and DDL at the same time
+        let res = Schema::builder()
+            .column("a", DataType::Int(IntType::with_nullable(false)))
+            .column("b", DataType::Int(IntType::new()))
+            .primary_key(["a"])
+            .option(PRIMARY_KEY_OPTION, "a")
+            .build();
+        assert!(
+            res.is_err(),
+            "primary key defined in both DDL and options should be rejected"
+        );
+
+        // partition in options and DDL at the same time
+        let res = Schema::builder()
+            .column("a", DataType::Int(IntType::new()))
+            .column("b", DataType::Int(IntType::new()))
+            .partition_keys(["a"])
+            .option(PARTITION_OPTION, "a")
+            .build();
+        assert!(
+            res.is_err(),
+            "partition defined in both DDL and options should be rejected"
+        );
+
+        // Valid: partition keys and primary key are subsets of the fields
+        let schema = Schema::builder()
+            .column("a", DataType::Int(IntType::with_nullable(false)))
+            .column("b", DataType::Int(IntType::new()))
+            .column("c", DataType::Int(IntType::new()))
+            .partition_keys(["a"])
+            .primary_key(["a", "b"])
+            .build()
+            .unwrap();
+        assert_eq!(schema.partition_keys(), &["a"]);
+        assert_eq!(schema.primary_keys(), &["a", "b"]);
+    }
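+
+    /// Illustrative example: a primary key declared only via the `primary-key` table option is
+    /// picked up by normalization and removed from the options map.
+    #[test]
+    fn test_schema_primary_key_from_options() {
+        let schema = Schema::builder()
+            .column("a", DataType::Int(IntType::new()))
+            .column("b", DataType::Int(IntType::new()))
+            .option(PRIMARY_KEY_OPTION, "a")
+            .build()
+            .unwrap();
+        assert_eq!(schema.primary_keys(), &["a"]);
+        assert!(schema.options().get(PRIMARY_KEY_OPTION).is_none());
+    }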
+
+    /// Adding a column whose type is or contains RowType panics (todo! until field ID assignment
+    /// for nested row fields is implemented).
+    /// See <https://github.com/apache/paimon/pull/1547>.
+    #[test]
+    #[should_panic(expected = "RowType")]
+    fn test_schema_builder_column_row_type_panics() {
+        let row_type = RowType::new(vec![DataField::new(
+            0,
+            "nested".into(),
+            DataType::Int(IntType::new()),
+        )]);
+        Schema::builder()
+            .column("id", DataType::Int(IntType::new()))
+            .column("payload", DataType::Row(row_type));
+    }
 }
diff --git a/crates/paimon/src/spec/types.rs b/crates/paimon/src/spec/types.rs
index ed7f4e8..815c7c6 100644
--- a/crates/paimon/src/spec/types.rs
+++ b/crates/paimon/src/spec/types.rs
@@ -103,9 +103,22 @@ pub enum DataType {
     Row(RowType),
 }
 
-#[allow(dead_code)]
 impl DataType {
-    fn is_nullable(&self) -> bool {
+    /// Returns whether this type is or contains (recursively) a [`RowType`].
+    /// Used to reject schema columns that would require field ID assignment for nested row fields,
+    /// which is not yet implemented (see <https://github.com/apache/paimon/pull/1547>).
+    pub fn contains_row_type(&self) -> bool {
+        match self {
+            DataType::Row(_) => true,
+            DataType::Array(v) => v.element_type.contains_row_type(),
+            DataType::Map(v) => v.key_type.contains_row_type() || v.value_type.contains_row_type(),
+            DataType::Multiset(v) => v.element_type.contains_row_type(),
+            _ => false,
+        }
+    }
+
+    /// Returns whether this type is nullable.
+    pub fn is_nullable(&self) -> bool {
         match self {
             DataType::Boolean(v) => v.nullable,
             DataType::TinyInt(v) => v.nullable,
@@ -129,6 +142,59 @@ impl DataType {
             DataType::Row(v) => v.nullable,
         }
     }
+
+    /// Returns a copy of this type with the given nullability (top-level only).
+    /// Corresponds to Java `DataType.copy(boolean nullable)`.
+    pub fn copy_with_nullable(&self, nullable: bool) -> Result<DataType> {
+        Ok(match self {
+            DataType::Boolean(_) => DataType::Boolean(BooleanType::with_nullable(nullable)),
+            DataType::TinyInt(_) => DataType::TinyInt(TinyIntType::with_nullable(nullable)),
+            DataType::SmallInt(_) => DataType::SmallInt(SmallIntType::with_nullable(nullable)),
+            DataType::Int(_) => DataType::Int(IntType::with_nullable(nullable)),
+            DataType::BigInt(_) => DataType::BigInt(BigIntType::with_nullable(nullable)),
+            DataType::Decimal(v) => DataType::Decimal(DecimalType::with_nullable(
+                nullable,
+                v.precision(),
+                v.scale(),
+            )?),
+            DataType::Double(_) => DataType::Double(DoubleType::with_nullable(nullable)),
+            DataType::Float(_) => DataType::Float(FloatType::with_nullable(nullable)),
+            DataType::Binary(v) => {
+                DataType::Binary(BinaryType::with_nullable(nullable, v.length())?)
+            }
+            DataType::VarBinary(v) => {
+                DataType::VarBinary(VarBinaryType::try_new(nullable, v.length())?)
+            }
+            DataType::Char(v) => DataType::Char(CharType::with_nullable(nullable, v.length())?),
+            DataType::VarChar(v) => {
+                DataType::VarChar(VarCharType::with_nullable(nullable, v.length())?)
+            }
+            DataType::Date(_) => DataType::Date(DateType::with_nullable(nullable)),
+            DataType::LocalZonedTimestamp(v) => DataType::LocalZonedTimestamp(
+                LocalZonedTimestampType::with_nullable(nullable, v.precision())?,
+            ),
+            DataType::Time(v) => DataType::Time(TimeType::with_nullable(nullable, v.precision())?),
+            DataType::Timestamp(v) => {
+                DataType::Timestamp(TimestampType::with_nullable(nullable, v.precision())?)
+            }
+            DataType::Array(v) => DataType::Array(ArrayType::with_nullable(
+                nullable,
+                v.element_type.as_ref().clone(),
+            )),
+            DataType::Map(v) => DataType::Map(MapType::with_nullable(
+                nullable,
+                v.key_type.as_ref().clone(),
+                v.value_type.as_ref().clone(),
+            )),
+            DataType::Multiset(v) => DataType::Multiset(MultisetType::with_nullable(
+                nullable,
+                v.element_type.as_ref().clone(),
+            )),
+            DataType::Row(v) => {
+                DataType::Row(RowType::with_nullable(nullable, v.fields().to_vec()))
+            }
+        })
+    }
 }
 
 /// ArrayType for paimon.
@@ -1329,6 +1395,10 @@ impl RowType {
     pub fn family(&self) -> DataTypeFamily {
         DataTypeFamily::CONSTRUCTED
     }
+
+    pub fn fields(&self) -> &[DataField] {
+        &self.fields
+    }
 }
 
 mod serde_utils {