[FLINK-35515][hive] Add Hive 4 support to the Flink Hive connector#37
jlalwani-amazon wants to merge 6 commits into
|
Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html) |
|
FYI, CI fails because it tries to download the Flink 1.20.0 binary, which is no longer available. Bump the Flink version to 1.20.3 to fix it. I'll try to review the changes in the upcoming days. |
|
Thanks @ferenc-csaky! Bumped to 1.20.3 in 0af4f50. Looking forward to your review. |
7f17c7f to 07e2969
3f5b1f8 to d7507d0
Hive 4 renamed many ConfVars enum constants (e.g. HIVEMAPREDMODE -> HIVE_MAPRED_MODE). Add a HiveConfVars compatibility layer that resolves constants by name at runtime, allowing the same code to work across Hive 2.x, 3.x, and 4.x. Update all source and test files to use HiveConfVars instead of HiveConf.ConfVars for constants that were renamed in Hive 4.
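A minimal sketch of resolving an enum constant by name at runtime, in the spirit of the compatibility layer described above. `DemoConfVars` and `ConfVarResolver` are hypothetical stand-ins; the real `HiveConf.ConfVars` enum and the actual `HiveConfVars` class are not reproduced here, and the PR's implementation may differ.

```java
import java.util.Arrays;

/** Stand-in for a Hive 4-style enum; this is NOT the real HiveConf.ConfVars. */
enum DemoConfVars {
    HIVE_MAPRED_MODE,
    METASTORE_URIS
}

/** Hypothetical helper: picks whichever candidate name the loaded enum defines. */
final class ConfVarResolver {
    private ConfVarResolver() {}

    /** Tries the candidate names in order and returns the first constant that exists. */
    static <E extends Enum<E>> E resolve(Class<E> enumClass, String... candidateNames) {
        for (String name : candidateNames) {
            try {
                return Enum.valueOf(enumClass, name);
            } catch (IllegalArgumentException ignored) {
                // This name is not defined in the loaded enum version; try the next one.
            }
        }
        throw new IllegalStateException(
                "None of " + Arrays.toString(candidateNames)
                        + " found in " + enumClass.getName());
    }
}

final class ConfVarResolverDemo {
    public static void main(String[] args) {
        // Old (Hive 2/3) spelling first, new (Hive 4) spelling as the fallback.
        DemoConfVars mode =
                ConfVarResolver.resolve(
                        DemoConfVars.class, "HIVEMAPREDMODE", "HIVE_MAPRED_MODE");
        System.out.println(mode);
    }
}
```

Resolving once and storing the result in a static field keeps the lookup cost out of hot paths; a missing constant then fails at class-load time with a message naming every spelling that was tried.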
Add flink-connector-hive-4.0.0 Maven module with HiveShimV400 compiled against Hive 4 jars. This avoids reflection for Hive 4 API changes:
- Removed classes (SerDeUtils, GenericUDFNvl)
- Changed return types (ArrayList -> List on StructTypeInfo)
- Relocated packages (PrincipalDesc, CreateTableDesc, CreateViewDesc)
- Added required parameters (engine param on column statistics)
- PreOrderWalker now requires SemanticDispatcher instead of Dispatcher
HiveShim interface additions:
- getStructFieldNames/getStructFieldTypeInfos (ArrayList -> List)
- initializeSerDe (SerDeUtils removed in Hive 4)
- getTableColumnStatistics/getPartitionColumnStatistics (engine param)
- createPrincipalDesc (PrincipalDesc relocated)
- walkExpressionTree (SemanticDispatcher vs Dispatcher)
HiveShimLoader updated to load HiveShimV400 by reflection. Runtime-scoped Maven dependency replaces surefire classpath hack.
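As an illustration of loading a shim by reflection, the sketch below instantiates an implementation from its class name so the caller needs no compile-time reference to it. `DemoShim`, `DemoShimV400`, and `ReflectiveShimLoader` are hypothetical names; the real `HiveShim` interface and `HiveShimLoader` are much larger and may be structured differently.

```java
/** Hypothetical minimal shim contract; the real HiveShim interface is far larger. */
interface DemoShim {
    String hiveVersion();
}

/** Stand-in for a shim that lives in a separate module and is only present at runtime. */
final class DemoShimV400 implements DemoShim {
    @Override
    public String hiveVersion() {
        return "4.0.0";
    }
}

final class ReflectiveShimLoader {
    private ReflectiveShimLoader() {}

    /** Instantiates a shim from its class name via the no-arg constructor. */
    static DemoShim load(String className) {
        try {
            Class<?> clazz = Class.forName(className);
            return (DemoShim) clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot load shim " + className, e);
        }
    }
}

final class ReflectiveShimLoaderDemo {
    public static void main(String[] args) {
        // A real loader would hold the name as a string constant; the class literal
        // is used here only so the demo works regardless of its package.
        DemoShim shim = ReflectiveShimLoader.load(DemoShimV400.class.getName());
        System.out.println(shim.hiveVersion());
    }
}
```

The trade-off is that a missing or misnamed class surfaces at runtime rather than at compile time, so the error message should name the class that could not be loaded.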
Update source files for Hive 4 API changes:
- HiveParserRuleDispatcher: adapter for DefaultRuleDispatcher renamed to SemanticNodeProcessor in Hive 4
- HiveParserDDLSemanticAnalyzer: use shim walkExpressionTree, reflection for relocated CreateTableDesc/CreateViewDesc
- HiveParserSemanticAnalyzer: CreateViewDesc -> Object for relocation
- HiveParserAuthorizationParseUtils: PrincipalDesc via shim
- HiveParserRexNodeConverter: GenericUDFCase string check (removed 4.1+)
- HiveParserCalcitePlanner/Utils/BaseSemanticAnalyzer: hiveShim plumbing
- HiveMapredSplitReader/HiveWriterFactory: use shim for SerDeUtils
- HiveMetastoreClientWrapper: use shim for column statistics
- HiveReflectionUtils: StatsSetupConst.TABLE_PARAMS_STATS_KEYS reflection
- HiveTypeUtil/HiveInspectors: getStructFieldNames/TypeInfos via shim
- HiveScriptTransformOperator: GenericUDFNvl string check
Add hive4 Maven profile with test dependency overrides:
- protobuf 3.x (Hive 4 bundles unshaded protobuf 3)
- DataNucleus 5.x (Hive 4 embedded metastore)
- Hadoop 3.x hdfs jars
- flink-connector-hive-4.0.0 runtime dependency
Add --add-opens JVM flags for Java 17+ (Hive reflective access).
Update test infrastructure:
- HiveVersionTestUtil: add HIVE_4 version flag
- HiveGenericUDFTest: skip GenericUDFNvl test on Hive 4 (class removed)
- HiveModuleTest: adjust function count for Hive 4
Add integration test compatibility for Hive 4:
- Add HiveShim.getGenericWindowingEvaluator() for Hive 4's 5-arg API
- Add HiveShimV400.loadTable() for Hive 4's 11-arg Hive.loadTable()
- Add HiveShimV400 unit tests (getGenericWindowingEvaluator, initializeSerDe)
- Add default fetch size to HiveServer2Endpoint OpenSession response
(Hive 4 JDBC client requires hive.server2.thrift.resultset.default.fetch.size)
- Add Hive 4 versions to HiveRunnerShimLoader switch statement
- Fix HiveTemporalJoinITCase hardcoded Hive 3.1.3 version override
- Adjust test expectations for Hive 4 behavioral changes:
- testInsertDirectory: collection delimiter defaults to \u0002
- Skip testNullLiteralAsArgument: numeric-to-timestamp cast removed
- Skip testLoadData: OrcShim doesn't support Hive 4 yet
- Add hive4 Maven profile test dependency overrides (antlr-runtime 3.5.2)
- Fix checkstyle import ordering across modified files
- Fix hive4 profile artifactId (add _${scala.binary.version} suffix)
bf9b825 to b56cc94
|
Note on the separate Maven module and cyclic dependency: Previous shims (V230–V313) compile against Hive 2.3 because API differences between 2.3 and 3.1 were minor. Hive 4 has far more breaking changes (removed classes, changed return types, relocated packages), making it impossible to compile against Hive 2.3 jars. The dedicated This creates a Maven cycle: the shim depends on the main connector (for the Considered alternative: Extract |
cac2a63 to 20bf42d
- Break cyclic Maven dependency: shim module included via activeByDefault profile, excluded when -Phive4 is active
- Two-phase CI build: Phase 1 installs shim (no -Phive4), Phase 2 runs tests with -Phive4 -Dmaven.main.skip=true -DforkCount=1
- Skip enforcer on shim module (all deps provided-scope)
- Hive 4 CI runs on JDK 11 only (Hive 4 requires Java 11+)
- Align shim module dependency artifactId with renamed connector module
- Add -am to Phase 1 shim install for transitive dependency resolution
20bf42d to b0ac9fd
|
|
||
| @Override | ||
| public Object createPrincipalDesc(String principalName, PrincipalType principalType) { | ||
| return new org.apache.hadoop.hive.ql.plan.PrincipalDesc(principalName, principalType); |
There was a problem hiding this comment.
was it generated with claude?
why do we need absolute path here?
There was a problem hiding this comment.
Hive 2 and Hive 3 had PrincipalDesc in org.apache.hadoop.hive.ql.plan. Hive 4 moved it to org.apache.hadoop.hive.ql.ddl.privilege. I used the FQN here to make it clear that this returns the Hive 2/3 class, not the Hive 4 class. I can change it to an import at the top. I thought this would make the code clearer.
Also, I realized that I'm not doing the same thing in the Hive 4 shim, which is inconsistent. I'll move the class to an import in all shims.
| <!-- Hive uses illegal reflective access (StringInternUtils, FunctionRegistry); | ||
| required on Java 17+ until upstream Hive removes setAccessible calls. --> |
There was a problem hiding this comment.
can you explain what this comment really means?
There was a problem hiding this comment.
Sorry, this comment was left over from some refactoring I did. It doesn't make sense here. I'll remove it.
| <!-- | ||
| hive-exec 4.x bundles protobuf 3.x (unshaded). Override the transitive | ||
| protobuf 2.5.0 from Hadoop so the bundled ORC/protobuf code works on | ||
| the flat test classpath. The production fat jar handles this via shading. | ||
| --> | ||
| <dependency> | ||
| <groupId>com.google.protobuf</groupId> | ||
| <artifactId>protobuf-java</artifactId> | ||
| <version>3.3.0</version> | ||
| <scope>test</scope> |
There was a problem hiding this comment.
can we have a test in a special test module where all the artifacts are shaded then?
There was a problem hiding this comment.
flink-connector-hive-e2e-tests tests with shaded jars. I'll double-check whether we are testing with Hive 4 shaded jars and add tests if necessary.
| <dependency> | ||
| <groupId>org.apache.hadoop</groupId> | ||
| <artifactId>hadoop-hdfs-client</artifactId> | ||
| <version>3.3.6</version> |
There was a problem hiding this comment.
- why not 3.4.3+?
- can it be extracted into a property?
There was a problem hiding this comment.
Hive 4.0.0 uses 3.3.6. I didn't want to introduce an incompatibility. I'll extract it to a property.
| hive-exec 4.x bundles ANTLR 3.5.2 unshaded. The default classpath has | ||
| antlr-runtime 3.3 which is binary-incompatible (CommonTree.insertChild). | ||
| Override to 3.5.2 so the standalone jar and bundled classes agree. | ||
| --> | ||
| <dependency> | ||
| <groupId>org.antlr</groupId> | ||
| <artifactId>antlr-runtime</artifactId> | ||
| <version>3.5.2</version> | ||
| <scope>test</scope> | ||
| </dependency> |
There was a problem hiding this comment.
we need e2e test for that then
| <scope>test</scope> | ||
| </dependency> | ||
| <!-- Hive 4 requires compatible DataNucleus for the embedded metastore. | ||
| 4.0.x uses 5.x, 4.1+ uses 6.x. Override via -Dhive4.datanucleus-api-jdo.version=6.0.3 etc. --> |
There was a problem hiding this comment.
why do we need this comment?
isn't it clear from property declaration?
who will remember to update/check that this comment is updated next time?
| sessionConfig.put(TABLE_SQL_DIALECT.key(), SqlDialect.HIVE.name()); | ||
| sessionConfig.put(RUNTIME_MODE.key(), RuntimeExecutionMode.BATCH.name()); | ||
| sessionConfig.put(TABLE_DML_SYNC.key(), "true"); | ||
| // Hive 4 JDBC client requires this in the OpenSession response |
There was a problem hiding this comment.
can we put a link in comment to the requirement?
| private int numSubQueryPredicates; | ||
|
|
||
| private CreateTableDesc createTableDesc; | ||
| private Object createTableDesc; |
There was a problem hiding this comment.
why do we need this change?
There was a problem hiding this comment.
This is similar to the PrincipalDesc change in HiveShim. Hive 4 moved the class. I'm not sure where this field is set, but I'm assuming that whoever sets it will pass Hive 4's CreateTableDesc, so this field needs to accept both classes.
There was a problem hiding this comment.
Hive4 moved the class,
how about having a separate hive4 module then where we can work with hive4 classes without such hacks?
Similar to how it is done for elastic and opensearch connectors
There was a problem hiding this comment.
That's the idea behind HiveShimV400 being in a separate module. I can refactor the semantic analyzer to move parts of it to the other module. I don't want to duplicate the entire code. Can we do this refactoring in a follow-up PR, to avoid holding up this PR? The build also needs refactoring.
There was a problem hiding this comment.
in case there is a way to make it without changing type to Object then yes, otherwise I disagree with current approach, we can also ask what other people from the community think about this
There was a problem hiding this comment.
Community engagement is really low. I have created a Discussion thread on mailing list about Hive4 support, and pinged multiple times. I didn't get any engagement, until I went ahead and implemented it. Are there any specific members that I can pull into a discussion?
Otherwise, I will just go ahead and refactor SemanticAnalyzer
| import org.apache.calcite.plan.RelOptCluster; | ||
| import org.apache.calcite.tools.FrameworkConfig; | ||
| import org.apache.hadoop.hive.common.ObjectPair; | ||
| import org.apache.commons.lang3.tuple.MutablePair; |
There was a problem hiding this comment.
ObjectPair is mutable. MutablePair is the closest match.
We can use ImmutablePair. It would require larger change. All instances of MutablePair are local. There shouldn't be any concurrency concerns with use of MutablePair
There was a problem hiding this comment.
is there a place where we use/need this mutability?
There was a problem hiding this comment.
HiveParserCalcitePlanner.java, HiveParserBaseSemanticAnalyzer.java and HiveParserQBSubQuery.java use this. Of these, HiveParserCalcitePlanner and HiveParserBaseSemanticAnalyzer mutate it. We can of course refactor them to copy an ImmutablePair instead.
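For reference, a minimal sketch of what replacing in-place mutation with an immutable pair could look like. `DemoPair` is a hypothetical stand-in written only for this illustration; in the connector, commons-lang3's ImmutablePair would play this role, and the actual call sites may need different handling.

```java
/** Hypothetical immutable pair used only for illustration. */
final class DemoPair<L, R> {
    private final L left;
    private final R right;

    DemoPair(L left, R right) {
        this.left = left;
        this.right = right;
    }

    L getLeft() {
        return left;
    }

    R getRight() {
        return right;
    }

    /** Returns a new pair with the right element replaced; this instance is unchanged. */
    DemoPair<L, R> withRight(R newRight) {
        return new DemoPair<>(left, newRight);
    }
}

final class DemoPairDemo {
    public static void main(String[] args) {
        DemoPair<String, Integer> original = new DemoPair<>("col", 1);
        // Instead of mutating the pair in place, rebind the local to a fresh copy.
        DemoPair<String, Integer> updated = original.withRight(2);
        System.out.println(original.getRight() + " -> " + updated.getRight());
    }
}
```

Each former in-place update becomes a reassignment of the variable that holds the pair, which is why the change touches every mutating call site.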
| if (qb.getTableDesc() != null | ||
| && qb.getTableDesc().getLocation() != null) { | ||
| location = new Path(qb.getTableDesc().getLocation()); | ||
| // Note: getTableDesc() returns Object (Hive 4 relocated |
There was a problem hiding this comment.
well it returns not Object, it returns TableDesc
It started returning Object in this PR and I still don't understand why
There was a problem hiding this comment.
I'll add more comments. Hive 4 moved the class. So, we have changed the code to put this in an object and use reflection to call methods
There was a problem hiding this comment.
Alternatively, we can implement shims for PrincipalDesc and CreateTableDesc. That should make this code more understandable but will add another level of indirection
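To make the reflection approach concrete, here is a minimal sketch of calling a getter on a value typed as Object. `DemoTableDesc` and `ReflectiveGetter` are hypothetical; the real CreateTableDesc classes and the PR's reflection helpers are not shown and may differ.

```java
import java.lang.reflect.Method;

/** Stand-in for a relocated descriptor class; NOT the real Hive CreateTableDesc. */
final class DemoTableDesc {
    private final String location;

    DemoTableDesc(String location) {
        this.location = location;
    }

    public String getLocation() {
        return location;
    }
}

/** Hypothetical helper that invokes a public no-arg method by name. */
final class ReflectiveGetter {
    private ReflectiveGetter() {}

    static Object invokeGetter(Object target, String methodName) {
        try {
            Method method = target.getClass().getMethod(methodName);
            return method.invoke(target);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                    "Cannot call " + methodName + " on " + target.getClass().getName(), e);
        }
    }
}

final class ReflectiveGetterDemo {
    public static void main(String[] args) {
        // The static type is Object, as it would be for a class whose package
        // differs between Hive versions.
        Object desc = new DemoTableDesc("/warehouse/t1");
        System.out.println(ReflectiveGetter.invokeGetter(desc, "getLocation"));
    }
}
```

This works for either package location of the class, but the compiler no longer checks the method name or return type, which is the readability cost raised in the review. A shim method returning a typed wrapper would restore that check at the price of one more indirection.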
| fTypes, | ||
| HiveShimLoader.loadHiveShim(HiveShimLoader.getHiveVersion()) | ||
| .getStructFieldNames(structType)); | ||
| } |
There was a problem hiding this comment.
do I understand it correctly: we load Hive shim multiple times?
what is the reason for that?
There was a problem hiding this comment.
loadHiveShim caches the instance of shim. It's not loaded multiple times
There was a problem hiding this comment.
good if so
however is there a reason we can not store in a local var and reuse the code?
There was a problem hiding this comment.
loadHiveShim stores shims in a map and calls computeIfAbsent. This should load the class only once.
There was a problem hiding this comment.
yes, thanks for confirmation
but how about this
however is there a reason we can not store in a local var and reuse the code?
can we just reuse?
There was a problem hiding this comment.
"good if so however is there a reason we can not store in a local var and reuse the code?"
No reason. I'll change it.
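A minimal sketch of the caching behavior discussed in this thread, combined with the suggested local-variable reuse. `CachedShimLookup` is a hypothetical stand-in, the placeholder `Object` replaces a real shim instance, and the actual `HiveShimLoader` may differ in detail.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical loader that creates one shim per version and caches it. */
final class CachedShimLookup {
    private static final Map<String, Object> SHIMS = new ConcurrentHashMap<>();

    /** Counts how many shim instances were actually created (for the demo only). */
    static final AtomicInteger CREATED = new AtomicInteger();

    private CachedShimLookup() {}

    /** Creates the shim for a version at most once; later calls return the cached one. */
    static Object loadShim(String version) {
        return SHIMS.computeIfAbsent(
                version,
                v -> {
                    CREATED.incrementAndGet();
                    return new Object(); // placeholder for the real shim instance
                });
    }
}

final class CachedShimLookupDemo {
    public static void main(String[] args) {
        // Look the shim up once and reuse the local instead of repeating the call.
        Object shim = CachedShimLookup.loadShim("4.0.0");
        Object again = CachedShimLookup.loadShim("4.0.0");
        System.out.println(shim == again);
        System.out.println(CachedShimLookup.CREATED.get());
    }
}
```

Repeated lookups are cheap because of the cache, so holding the shim in a local variable is mainly a readability improvement rather than a performance fix.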
| HiveParserBaseSemanticAnalyzer.getUnescapedName( | ||
| (HiveParserASTNode) ast.getChild(0)); | ||
| PrincipalDesc principalDesc = | ||
| Object principalDesc = |
There was a problem hiding this comment.
another strange change that I don't understand
There was a problem hiding this comment.
I'll add more comments about class name changes.
| private static boolean isGenericUDFCase(Object udf) { | ||
| return udf.getClass() | ||
| .getName() | ||
| .equals("org.apache.hadoop.hive.ql.udf.generic.GenericUDFCase"); |
There was a problem hiding this comment.
I didn't get this: initially there was a check for instanceof why are we checking for class name equals now?
There was a problem hiding this comment.
The challenge is that this class is removed from Hive 4. So, when you are running it with Hive4, the class fails to load if you are doing a static import of GenericUDFCase. We could move this code into the shim to avoid this
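A minimal sketch of a name-based type check that avoids referencing a class that may be absent at runtime. `ClassNameCheck` is a hypothetical helper. Unlike the exact-name comparison in the diff, this variant also walks the superclass chain so that subclasses match, which is closer to instanceof; it does not cover interfaces.

```java
/** Hypothetical helper: an instanceof-like check that only needs the class name. */
final class ClassNameCheck {
    private ClassNameCheck() {}

    /** True if obj's class, or any of its superclasses, has exactly the given name. */
    static boolean isInstanceOfByName(Object obj, String className) {
        if (obj == null) {
            return false;
        }
        for (Class<?> c = obj.getClass(); c != null; c = c.getSuperclass()) {
            if (c.getName().equals(className)) {
                return true;
            }
        }
        return false;
    }
}

final class ClassNameCheckDemo {
    public static void main(String[] args) {
        Object value = new java.util.ArrayList<String>();
        // Matches through the superclass chain without a compile-time reference.
        System.out.println(ClassNameCheck.isInstanceOfByName(value, "java.util.AbstractList"));
        System.out.println(ClassNameCheck.isInstanceOfByName(value, "java.lang.String"));
    }
}
```

Because the class name is only a string, nothing tries to load the class, so the check simply returns false when the class does not exist in the running Hive version.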
What is the purpose of the change
Add Apache Hive 4.x compatibility to the Flink Hive connector. Hive 4 is a major cleanup release with significant API changes (removed classes, changed return types, relocated packages, added required parameters). This PR introduces a dedicated HiveShimV400 module and extends the existing HiveShim abstraction to handle all version-specific differences.
JIRA: FLINK-35515
Brief change log
Commit 1: Add HiveConfVars for Hive 4 ConfVars enum renames
Hive 4 renamed all HiveConf.ConfVars enum constants to snake_case (HIVE-27925, e.g. METASTOREURIS → METASTORE_URIS). Created HiveConfVars.java that resolves the correct enum at class-load time via valueOf() with fallback. 59 constants across 35 files.
Commit 2: Add Hive 4 shim module
New flink-connector-hive-4.0.0 Maven module compiled against Hive 4 jars. HiveShimV400 extends HiveShimV313 and overrides methods for Hive 4 API changes. HiveShimLoader loads it by reflection to avoid cyclic Maven dependencies. Supports Hive 4.0.0, 4.0.1, 4.1.0, 4.2.0.
Commit 3: Add Hive 4 source compatibility fixes
21 files changed for Hive 4 API compatibility:
- HiveFileFormatUtils.getRecordWriter() removed → HiveShimV400.getHiveRecordWriter()
- StructTypeInfo.getAllStructFieldNames/TypeInfos() return type ArrayList → List → HiveShim.getStructFieldNames/TypeInfos()
- SerDeUtils.initializeSerDe() removed → HiveShimV400.initializeSerDe()
- IMetaStoreClient.getTableColumnStatistics/getPartitionColumnStatistics added engine parameter
- PrincipalDesc relocated from ql.plan to ql.ddl.privilege → HiveShim.createPrincipalDesc()
- PreOrderWalker changed to require SemanticDispatcher → HiveShim.walkExpressionTree()
- HiveMetaStoreUtils.getDeserializer/getFieldsFromDeserializer signature changes
- RowSchema.getSignature() return type ArrayList → List
Commit 4: Add unit test support for Hive 4
hive4 Maven profile with test dependency overrides: protobuf 3.3.0 (hive-exec bundles it unshaded), DataNucleus 5.x, Hadoop 3.3.6, antlr-runtime 3.5.2, --add-opens JVM flags.
Commit 5: Add integration test support and fixes for Hive 4
- HiveShim.getGenericWindowingEvaluator() for Hive 4's 5-arg API
- HiveShimV400.loadTable() for Hive 4's 11-arg Hive.loadTable()
- HiveServer2Endpoint includes hive.server2.thrift.resultset.default.fetch.size in OpenSession response
- HiveRunnerShimLoader extended with Hive 4 version cases
- HiveShimV400Test unit tests
Commit 6: Add hive4 CI profile and fix cyclic dependency
activeByDefault profile, excluded when -Phive4 is active
-Phive4), Phase 2 runs tests with -Phive4 -Dmaven.main.skip=true -DforkCount=1
-am to Phase 1 shim install for transitive dependency resolution
Why a separate Maven module?
Previous shims (V310–V313) compile against Hive 2.3 because API changes between 2.3 and 3.1 were minor. Hive 4 has far more breaking changes: removed classes, changed return types, relocated packages. Without a separate module, every API call requires reflection. The dedicated module compiles HiveShimV400 against Hive 4 jars so all calls are direct.
Verifying this change
Build
Unit tests
Integration tests
Known limitations
- flink-orc does not support Hive 4 yet. OrcShim.createShim() in flink-formats/flink-orc (upstream apache/flink repo) throws UnsupportedOperationException for Hive 4. ORC batch writes work (use OrcBulkWriterFactory directly), but ORC reads via HiveInputFormat fail. Fix: add hiveVersion.startsWith("4.") mapping to OrcShimV230; this needs a separate PR to apache/flink.
- GenericUDFTimestamp rejects numeric input entirely. Tests relying on this behavior are skipped on Hive 4.
- INSERT OVERWRITE DIRECTORY ignores COLLECTION ITEMS TERMINATED BY. Uses default \u0002 delimiter regardless. Test expectations adjusted.
- HiveServer2EndpointITCase excluded from CI. This test is flaky on resource-constrained runners (FLINK-31351).
Does this pull request potentially affect one of the following parts?
flink-connector-hive-4.0.0 module with Hive 4 compile dependency; hive4 test profile adds protobuf, DataNucleus, Hadoop overrides)
@Public(Evolving): no
Documentation