
Conversation

Contributor

@Yicong-Huang commented Jan 27, 2026

What changes were proposed in this pull request?

Make _create_batch and _create_array in PySpark's Pandas serializers use Spark's DataType as the single source of truth, deriving Arrow types internally when needed.

Before: Callers in worker.py pre-computed arrow_return_type = to_arrow_type(return_type, ...) and passed both arrow_type and spark_type through the serialization pipeline.

After: Callers pass only spark_type (Spark DataType). The serializers derive arrow_type internally via to_arrow_type().

Key changes:

  • ~15 Pandas-based wrapper functions in worker.py updated to yield return_type instead of arrow_return_type
  • Arrow UDF functions (which use ArrowStreamArrowUDFSerializer) unchanged - they still pass arrow_type directly

Why are the changes needed?

  1. Single source of truth: spark_type is the canonical type representation defined by users
  2. Simplified API: Callers no longer need to pre-compute arrow_type
  3. Consistency: Both _create_batch and _create_array now follow the same pattern

Does this PR introduce any user-facing change?

No. This is an internal refactoring with no user-facing API changes.

How was this patch tested?

Existing tests.

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions

JIRA Issue Information

=== Improvement SPARK-55224 ===
Summary: Use Spark DataType as ground truth in Pandas-Arrow serialization
Assignee: None
Status: Open
Affected: ["4.2.0"]


This comment was automatically generated by GitHub Actions

is_struct_type = (
    spark_type is not None
    and isinstance(spark_type, StructType)
    and not isinstance(spark_type, VariantType)
)
Contributor

@zhengruifeng commented Jan 28, 2026

this is not needed; a StructType can never be a VariantType

we have

                and pa.types.is_struct(arrow_type)
                and not is_variant(arrow_type)

because VariantType in arrow is represented by a pa.struct
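The point above can be illustrated with toy stand-ins (the real is_variant in pyspark.sql.pandas.types inspects Arrow field metadata; the dict-based checks below are hypothetical simplifications):

```python
# Sketch: on the Arrow side, Spark's VariantType is encoded as a struct
# with "value" and "metadata" binary fields, so a bare is_struct() check
# would wrongly match variants and an is_variant() exclusion is needed.
# On the Spark-type side no exclusion is needed: a StructType instance
# is never a VariantType instance.

def is_struct(arrow_type):           # stand-in for pa.types.is_struct
    return arrow_type["kind"] == "struct"

def is_variant(arrow_type):          # stand-in for PySpark's is_variant
    return set(arrow_type.get("fields", [])) == {"value", "metadata"}

plain_struct = {"kind": "struct", "fields": ["a", "b"]}
variant = {"kind": "struct", "fields": ["value", "metadata"]}

# The serializer must exclude variants when checking for real structs:
print(is_struct(plain_struct) and not is_variant(plain_struct))  # True
print(is_struct(variant) and not is_variant(variant))            # False
```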

Contributor Author

got it! thanks for the explanation! will change this.

spark_type : DataType, optional
    If None, spark type converted from arrow_type will be used
arrow_cast : bool, optional
    The Spark type to use. If None, pyarrow's inferred type will be used.
Contributor

wait, the spark type here is the return type?
I think it should never be None?

  df: "pd.DataFrame",
  arrow_struct_type: "pa.StructType",
- spark_type: Optional[StructType] = None,
+ spark_type: StructType,
Contributor

shall we rename the return-type spark_type to return_type, to be consistent with worker.py?

We have input_type in this file for the input schema; spark_type is kind of confusing

Contributor Author

ok

Comment on lines 434 to 438
def to_output_format(result):
    # Convert Series of dicts/Rows to DataFrame for struct types
    if isinstance(return_type, StructType):
        return pd.DataFrame(list(result))
    return result
Contributor

Suggested change

- def to_output_format(result):
-     # Convert Series of dicts/Rows to DataFrame for struct types
-     if isinstance(return_type, StructType):
-         return pd.DataFrame(list(result))
-     return result
+ if isinstance(return_type, StructType):
+     # Convert Series of dicts/Rows to DataFrame for struct types
+     def to_output_format(result):
+         return pd.DataFrame(list(result))
+ else:
+     def to_output_format(result):
+         return result
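The suggestion hoists the isinstance check out of the per-call path: the branch is taken once when the wrapper is built rather than on every invocation. A minimal sketch of that pattern, with hypothetical names (make_converter is not a real PySpark function):

```python
# Pattern behind the suggested change: resolve a type check once at
# construction time, returning a specialized function, instead of
# re-checking on every call.

def make_converter(is_struct):
    if is_struct:
        def convert(result):
            # Stand-in for pd.DataFrame(list(result)) in the real code
            return [dict(r) for r in result]
    else:
        def convert(result):
            return result
    return convert

convert = make_converter(is_struct=False)
print(convert([1, 2, 3]))  # [1, 2, 3]
```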
