Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BinaryEncoder, B
import org.apache.spark.sql.errors.DataTypeErrorsBase
import org.apache.spark.sql.internal.SqlApiConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.ops.EncodeTypeOps
import org.apache.spark.util.ArrayImplicits._

/**
Expand Down Expand Up @@ -71,6 +72,9 @@ object RowEncoder extends DataTypeErrorsBase {

private[sql] def encoderForDataType(dataType: DataType, lenient: Boolean): AgnosticEncoder[_] =
dataType match {
// Types Framework: delegate to EncodeTypeOps for supported types when enabled
case _ if SqlApiConf.get.typesFrameworkEnabled && EncodeTypeOps.supports(dataType) =>
EncodeTypeOps(dataType).getEncoder
case NullType => NullEncoder
case BooleanType => BoxedBooleanEncoder
case ByteType => BoxedByteEncoder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ private[sql] trait SqlApiConf {
def parserDfaCacheFlushRatio: Double
def legacyParameterSubstitutionConstantsOnly: Boolean
def legacyIdentifierClauseOnly: Boolean
def typesFrameworkEnabled: Boolean
}

private[sql] object SqlApiConf {
Expand Down Expand Up @@ -110,4 +111,5 @@ private[sql] object DefaultSqlApiConf extends SqlApiConf {
override def parserDfaCacheFlushRatio: Double = -1.0
override def legacyParameterSubstitutionConstantsOnly: Boolean = false
override def legacyIdentifierClauseOnly: Boolean = false
override def typesFrameworkEnabled: Boolean = false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.types.ops

import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
import org.apache.spark.sql.types.DataType

/**
 * Operations for row encoding and decoding.
 *
 * PURPOSE:
 * Provides the encoder needed for Dataset[T] operations. The encoder handles
 * serialization and deserialization of user objects to/from internal rows.
 *
 * USAGE CONTEXT:
 * Instances are obtained through the companion object's `apply`/`supports`
 * factory methods, which delegate to [[TypeApiOps]]. Intended callers include:
 * - RowEncoder.scala - creates encoders for schema fields
 * - EncoderUtils.scala - encoder utility functions
 * - Spark Connect - client-side encoding
 * - Dataset[T] API - all typed dataset operations
 *
 * @see TimeTypeApiOps for the reference implementation
 * @since 4.1.0
 */
trait EncodeTypeOps extends TypeApiOps {
  /**
   * Returns the AgnosticEncoder for this type.
   *
   * The encoder handles serialization (external -> internal) and deserialization
   * (internal -> external) for Dataset[T] operations.
   *
   * @return AgnosticEncoder instance (e.g., LocalTimeEncoder for TimeType)
   * @example TimeType -> LocalTimeEncoder (handles java.time.LocalTime)
   * @note Further mappings (e.g. DateType) are illustrative of future types;
   *       only TimeType currently has an ops implementation.
   */
  def getEncoder: AgnosticEncoder[_]
}

/**
 * Companion object providing factory methods for EncodeTypeOps.
 */
object EncodeTypeOps {
  /**
   * Creates an EncodeTypeOps instance for the given DataType.
   *
   * @param dt The DataType to get encoding operations for
   * @return EncodeTypeOps instance
   * @throws SparkException if the type doesn't support EncodeTypeOps
   */
  def apply(dt: DataType): EncodeTypeOps = {
    import org.apache.spark.SparkException
    // Pattern match instead of asInstanceOf so an ops implementation that does
    // not mix in EncodeTypeOps yields the documented SparkException with a
    // descriptive message rather than an opaque ClassCastException.
    TypeApiOps(dt) match {
      case ops: EncodeTypeOps => ops
      case ops => throw SparkException.internalError(
        s"${ops.getClass.getName} for ${dt.typeName} does not support EncodeTypeOps.")
    }
  }

  /**
   * Checks if a DataType supports EncodeTypeOps operations.
   *
   * Verifies that the ops implementation actually mixes in EncodeTypeOps, not
   * merely that the type is covered by the Types Framework. This keeps
   * `supports` consistent with `apply` once types with partial ops support
   * are introduced.
   *
   * @param dt The DataType to check
   * @return true if the type supports EncodeTypeOps
   */
  def supports(dt: DataType): Boolean =
    TypeApiOps.supports(dt) && TypeApiOps(dt).isInstanceOf[EncodeTypeOps]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.types.ops

import org.apache.spark.sql.types.DataType
import org.apache.spark.unsafe.types.UTF8String

/**
 * Operations for formatting values as strings.
 *
 * PURPOSE:
 * Handles string formatting for display and SQL output. This includes formatting
 * values for CAST to STRING, EXPLAIN output, SHOW commands, and SQL literals.
 *
 * USAGE CONTEXT:
 * Intended callers include:
 * - ToStringBase.scala - CAST(x AS STRING) expressions
 * - EXPLAIN output - displaying literal values
 * - SHOW commands - displaying column values
 * - SQL generation - creating SQL literal representations
 * - HiveResult.scala - formatting results for Thrift/JDBC
 *
 * @see TimeTypeApiOps for the reference implementation
 * @since 4.1.0
 */
trait FormatTypeOps extends TypeApiOps {
  /**
   * Formats an internal value as a display string.
   *
   * Converts the internal Catalyst representation to a human-readable string
   * suitable for display or CAST to STRING operations.
   *
   * @param v The internal value (e.g., Long nanoseconds for TimeType)
   * @return Formatted string (e.g., "10:30:45.123456")
   * @example 37800000000000L -> "10:30:00" (for TIME)
   */
  def format(v: Any): String

  /**
   * Formats an internal value as a UTF8String.
   *
   * Convenience wrapper around [[format]] for expressions that need
   * UTF8String output directly; inherits format()'s behavior exactly.
   *
   * @param v The internal value
   * @return UTF8String representation
   */
  def formatUTF8(v: Any): UTF8String = UTF8String.fromString(format(v))

  /**
   * Formats an internal value as a SQL literal string.
   *
   * Produces a string usable in SQL statements, including the type prefix
   * where appropriate (e.g., "TIME '10:30:00'").
   *
   * @param v The internal value
   * @return SQL literal string (e.g., "TIME '10:30:00'")
   * @example 37800000000000L -> "TIME '10:30:00'" (for TIME)
   */
  def toSQLValue(v: Any): String
}

/**
 * Companion object providing factory methods for FormatTypeOps.
 */
object FormatTypeOps {
  /**
   * Creates a FormatTypeOps instance for the given DataType.
   *
   * @param dt The DataType to get formatting operations for
   * @return FormatTypeOps instance
   * @throws SparkException if the type doesn't support FormatTypeOps
   */
  def apply(dt: DataType): FormatTypeOps = {
    import org.apache.spark.SparkException
    // Pattern match instead of asInstanceOf so an ops implementation that does
    // not mix in FormatTypeOps yields the documented SparkException with a
    // descriptive message rather than an opaque ClassCastException.
    TypeApiOps(dt) match {
      case ops: FormatTypeOps => ops
      case ops => throw SparkException.internalError(
        s"${ops.getClass.getName} for ${dt.typeName} does not support FormatTypeOps.")
    }
  }

  /**
   * Checks if a DataType supports FormatTypeOps operations.
   *
   * Verifies that the ops implementation actually mixes in FormatTypeOps, not
   * merely that the type is covered by the Types Framework. This keeps
   * `supports` consistent with `apply` once types with partial ops support
   * are introduced.
   *
   * @param dt The DataType to check
   * @return true if the type supports FormatTypeOps
   */
  def supports(dt: DataType): Boolean =
    TypeApiOps.supports(dt) && TypeApiOps(dt).isInstanceOf[FormatTypeOps]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.types.ops

import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.LocalTimeEncoder
import org.apache.spark.sql.catalyst.util.{FractionTimeFormatter, TimeFormatter}
import org.apache.spark.sql.types.{DataType, TimeType}

/**
 * Client-side (spark-api) operations for TimeType.
 *
 * Bundles every client-side capability the Types Framework needs for TIME:
 * - String formatting (FormatTypeOps)
 * - Row encoding/decoding (EncodeTypeOps)
 *
 * IMPLEMENTATION NOTES:
 * - Formatting goes through FractionTimeFormatter for consistency with
 *   ToStringBase; SQL literals take the form TIME 'HH:mm:ss.ffffff'.
 * - Dataset[T] operations use LocalTimeEncoder (java.time.LocalTime).
 *
 * RELATIONSHIP TO TimeTypeOps:
 * TimeTypeOps (catalyst package) extends this class, inheriting the
 * client-side operations and layering server-side ones on top.
 *
 * @param t The TimeType carrying the precision information
 * @since 4.1.0
 */
class TimeTypeApiOps(val t: TimeType)
  extends TypeApiOps
  with FormatTypeOps
  with EncodeTypeOps {

  override def dataType: DataType = t

  // ==================== FormatTypeOps ====================

  // Shared formatter for TIME values. FractionTimeFormatter emits HH:mm:ss
  // plus a fractional part without trailing zeros (e.g. "15:00:01.123400"
  // renders as "15:00:01.1234"). Transient + lazy: rebuilt on demand after
  // deserialization rather than serialized with the instance.
  @transient
  private lazy val fractionFormatter: TimeFormatter = new FractionTimeFormatter()

  /**
   * Formats a TIME value (Long nanoseconds since midnight) as a display
   * string, e.g. "10:30:45.123456".
   */
  override def format(v: Any): String = fractionFormatter.format(v.asInstanceOf[Long])

  /**
   * Formats a TIME value as a SQL literal, e.g. "TIME '10:30:45.123456'".
   */
  override def toSQLValue(v: Any): String = "TIME '" + format(v) + "'"

  // ==================== EncodeTypeOps ====================

  /**
   * Encoder for java.time.LocalTime; converts between LocalTime objects and
   * the internal Long nanosecond representation.
   */
  override def getEncoder: AgnosticEncoder[_] = LocalTimeEncoder
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.types.ops

import org.apache.spark.SparkException
import org.apache.spark.sql.types.{DataType, TimeType}

/**
 * Base trait for client-side (spark-api) type operations.
 *
 * PURPOSE:
 * TypeApiOps handles operations that require spark-api internals (e.g., AgnosticEncoder)
 * that are not available in the catalyst package. This separation prevents circular
 * dependencies between sql/api and sql/catalyst modules.
 *
 * USAGE:
 * Instances are created through the companion object's `apply` factory; check
 * availability first with `supports`. Capability sub-traits layered on top:
 * - Row encoding/decoding (EncodeTypeOps)
 * - String formatting (FormatTypeOps)
 *
 * RELATIONSHIP TO TypeOps:
 * - TypeOps (catalyst): Server-side operations - physical types, literals, conversions
 * - TypeApiOps (spark-api): Client-side operations - encoding, formatting
 *
 * For TimeType, TimeTypeOps extends TimeTypeApiOps to inherit both sets of operations.
 *
 * Extends Serializable so ops instances can travel with serialized plans/tasks.
 *
 * @see TimeTypeApiOps for a reference implementation
 * @since 4.1.0
 */
trait TypeApiOps extends Serializable {
  /** The DataType this Ops instance handles (e.g. the TimeType, with precision). */
  def dataType: DataType
}

/**
 * Factory object for creating TypeApiOps instances.
 */
object TypeApiOps {
  /**
   * Single source of truth for the DataType -> ops mapping.
   *
   * Both `apply` and `supports` delegate here, so the two can never drift out
   * of sync as new types are added to the framework (previously each method
   * carried its own copy of the type list).
   *
   * @param dt The DataType to look up
   * @return Some(ops) if the type is supported, None otherwise
   */
  private def create(dt: DataType): Option[TypeApiOps] = dt match {
    case tt: TimeType => Some(new TimeTypeApiOps(tt))
    // Future types will be added here
    case _ => None
  }

  /**
   * Creates a TypeApiOps instance for the given DataType.
   *
   * @param dt The DataType to get operations for
   * @return TypeApiOps instance for the type
   * @throws SparkException if no TypeApiOps implementation exists for the type
   */
  def apply(dt: DataType): TypeApiOps = create(dt).getOrElse {
    throw SparkException.internalError(
      s"No TypeApiOps implementation for ${dt.typeName}. " +
      "This type is not yet supported by the Types Framework.")
  }

  /**
   * Checks if a DataType is supported by the Types Framework (client-side).
   *
   * @param dt The DataType to check
   * @return true if the type is supported by the framework
   */
  def supports(dt: DataType): Boolean = create(dt).isDefined
}
Loading