Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/data/sql_functions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,11 @@ string:
description:
Decodes a given string in 'application/x-www-form-urlencoded' format using the UTF-8 encoding scheme.
If the input is NULL, or there is an issue with the decoding process(such as encountering an illegal escape pattern), or the encoding scheme is not supported, the function returns NULL.
- sql: URL_DECODE_RECURSIVE(string[, integer])
table: STRING.urlDecodeRecursive() / STRING.urlDecodeRecursive(INTEGER)
description:
Recursively decodes a given string in 'application/x-www-form-urlencoded' format using the UTF-8 encoding scheme until no further decoding is possible or the optional max depth (default 10) is reached.
If the input is NULL, or there is an issue with the decoding process(such as encountering an illegal escape pattern), or the encoding scheme is not supported, the function returns NULL.
- sql: URL_ENCODE(string)
table: STRING.urlEncode()
description:
Expand Down
7 changes: 6 additions & 1 deletion docs/data/sql_functions_zh.yml
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,12 @@ string:
- sql: URL_DECODE(string)
table: STRING.urlDecode()
description:
使用 UTF-8 编码方案对“application/x-www-form-urlencoded”格式的给定字符串进行解码。
使用 UTF-8 编码方案对"application/x-www-form-urlencoded"格式的给定字符串进行解码。
如果输入为 NULL,或者解码过程出现问题(例如遇到非法转义模式或者不支持该编码方案),该函数将返回 NULL。
- sql: URL_DECODE_RECURSIVE(string[, integer])
table: STRING.urlDecodeRecursive() / STRING.urlDecodeRecursive(INTEGER)
description:
使用 UTF-8 编码方案对"application/x-www-form-urlencoded"格式的给定字符串进行递归解码,直到无法进一步解码或达到可选的最大深度(默认为 10)为止。
如果输入为 NULL,或者解码过程出现问题(例如遇到非法转义模式或者不支持该编码方案),该函数将返回 NULL。
- sql: URL_ENCODE(string)
table: STRING.urlEncode()
Expand Down
13 changes: 13 additions & 0 deletions flink-python/pyflink/table/expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -1439,6 +1439,19 @@ def url_decode(self) -> 'Expression[str]':
"""
return _unary_op("urlDecode")(self)

def url_decode_recursive(self, max_depth: int = None) -> 'Expression[str]':
"""
Recursively decodes a URL-encoded string from 'application/x-www-form-urlencoded'
format using the UTF-8 encoding scheme until no further decoding is possible or the
max depth is reached. If the input is null, or there is an issue with the decoding
process, or the encoding scheme is not supported, returns null.

:param max_depth: the maximum number of decoding iterations (default: 10)
"""
if max_depth is None:
return _unary_op("urlDecodeRecursive")(self)
return _binary_op("urlDecodeRecursive")(self, max_depth)

def url_encode(self) -> 'Expression[str]':
"""
Translates a string into 'application/x-www-form-urlencoded' format using the UTF-8
Expand Down
80 changes: 80 additions & 0 deletions flink-python/pyflink/table/tests/test_calc.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,86 @@ def test_from_element_expression(self):
expected = ['+I[1, abc, 2.0]', '+I[2, def, 3.0]']
self.assert_equals(actual, expected)

def test_url_decode_recursive(self):
t_env = self.t_env

# Create sink table
sink_table_ddl = """
CREATE TABLE Results_url_decode_recursive(
decoded STRING,
decoded_with_depth2 STRING,
decoded_with_depth1 STRING,
decoded_null STRING
)
WITH ('connector'='test-sink')
"""
self.t_env.execute_sql(sink_table_ddl)

# Test URL_DECODE_RECURSIVE with various scenarios
# Scenario 1: Simple URL decode (Hello%20World -> Hello World)
# Scenario 2: Double encoded URL (Hello%2520World -> Hello%20World -> Hello World)
# with maxDepth=2
# Scenario 3: Double encoded URL with maxDepth=1 (should only decode once)
# Scenario 4: NULL input
t = t_env.from_elements([
("Hello%20World", "Hello%2520World", "Hello%2520World", None),
], DataTypes.ROW([
DataTypes.FIELD('a', DataTypes.STRING()),
DataTypes.FIELD('b', DataTypes.STRING()),
DataTypes.FIELD('c', DataTypes.STRING()),
DataTypes.FIELD('d', DataTypes.STRING()),
]))

t.select(
t.a.url_decode_recursive(), # Simple decode
t.b.url_decode_recursive(2), # Double decode with depth=2
t.c.url_decode_recursive(1), # Double decode with depth=1
t.d.url_decode_recursive() # NULL input
).execute_insert("Results_url_decode_recursive").wait()

actual = source_sink_utils.results()

# Expected results:
# - Hello%20World -> Hello World (default maxDepth=10)
# - Hello%2520World -> Hello%20World -> Hello World (maxDepth=2)
# - Hello%2520World -> Hello%20World (maxDepth=1, only one decode)
# - NULL -> NULL
expected = [
'+I[Hello World, Hello World, Hello%20World, null]'
]
self.assert_equals(actual, expected)

# Test with triple encoded URL
sink_table_ddl2 = """
CREATE TABLE Results_url_decode_recursive2(
decoded_default STRING,
decoded_depth3 STRING,
decoded_depth2 STRING
)
WITH ('connector'='test-sink')
"""
self.t_env.execute_sql(sink_table_ddl2)

# Triple encoded: Hello%252520World
# Depth 3: Hello%252520World -> Hello%2520World -> Hello%20World -> Hello World
# Depth 2: Hello%252520World -> Hello%2520World -> Hello%20World
t2 = t_env.from_elements([
("Hello%252520World", "Hello%252520World"),
], ['e', 'f'])

t2.select(
t2.e.url_decode_recursive(), # Default maxDepth=10, should decode all
t2.e.url_decode_recursive(3), # Depth=3, should decode all
t2.f.url_decode_recursive(2) # Depth=2, should stop at 2 decodes
).execute_insert("Results_url_decode_recursive2").wait()

actual2 = source_sink_utils.results()

expected2 = [
'+I[Hello World, Hello World, Hello%20World]'
]
self.assert_equals(actual2, expected2)


if __name__ == '__main__':
import unittest
Expand Down
4 changes: 4 additions & 0 deletions flink-python/pyflink/table/tests/test_expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ def test_expression(self):
self.assertEqual('instr(a, b)', str(expr1.instr(expr2)))
self.assertEqual('locate(a, b)', str(expr1.locate(expr2)))
self.assertEqual('locate(a, b, 2)', str(expr1.locate(expr2, 2)))
self.assertEqual('URL_DECODE(a)', str(expr1.url_decode()))
self.assertEqual('URL_DECODE_RECURSIVE(a)', str(expr1.url_decode_recursive()))
self.assertEqual('URL_DECODE_RECURSIVE(a, 5)', str(expr1.url_decode_recursive(5)))
self.assertEqual('URL_ENCODE(a)', str(expr1.url_encode()))
self.assertEqual('parseUrl(a, b)', str(expr1.parse_url(expr2)))
self.assertEqual("parseUrl(a, b, 'query')", str(expr1.parse_url(expr2, 'query')))
self.assertEqual('ltrim(a)', str(expr1.ltrim()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.UNHEX;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.UPPER;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.URL_DECODE;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.URL_DECODE_RECURSIVE;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.URL_ENCODE;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.VAR_POP;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.VAR_SAMP;
Expand Down Expand Up @@ -1430,6 +1431,31 @@ public OutType urlDecode() {
return toApiSpecificExpression(unresolvedCall(URL_DECODE, toExpr()));
}

/**
* Recursively decodes a URL-encoded string from 'application/x-www-form-urlencoded' format
* using the UTF-8 encoding scheme until no further decoding is possible or the default max
* depth of 10 is reached. If the input is null, or there is an issue with the decoding
* process(such as encountering an illegal escape pattern), or the encoding scheme is not
* supported, will return null.
*/
public OutType urlDecodeRecursive() {
return toApiSpecificExpression(unresolvedCall(URL_DECODE_RECURSIVE, toExpr()));
}

/**
* Recursively decodes a URL-encoded string from 'application/x-www-form-urlencoded' format
* using the UTF-8 encoding scheme until no further decoding is possible or the specified max
* depth is reached. If the input is null, or there is an issue with the decoding process(such
* as encountering an illegal escape pattern), or the encoding scheme is not supported, will
* return null.
*
* @param maxDepth the maximum number of decoding iterations (must be a positive literal)
*/
public OutType urlDecodeRecursive(InType maxDepth) {
return toApiSpecificExpression(
unresolvedCall(URL_DECODE_RECURSIVE, toExpr(), objectToExpression(maxDepth)));
}

/**
* Translates a string into 'application/x-www-form-urlencoded' format using the UTF-8 encoding
* scheme. If the input is null, or there is an issue with the encoding process, or the encoding
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.flink.table.types.inference.StaticArgumentTrait;
import org.apache.flink.table.types.inference.TypeStrategies;
import org.apache.flink.table.types.inference.strategies.ArrayOfStringArgumentTypeStrategy;
import org.apache.flink.table.types.inference.strategies.MaxDepthArgumentTypeStrategy;
import org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies;
import org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
Expand Down Expand Up @@ -455,6 +456,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
"org.apache.flink.table.runtime.functions.scalar.UrlDecodeFunction")
.build();

public static final BuiltInFunctionDefinition URL_DECODE_RECURSIVE =
BuiltInFunctionDefinition.newBuilder()
.name("URL_DECODE_RECURSIVE")
.kind(SCALAR)
.inputTypeStrategy(
or(
sequence(logical(LogicalTypeFamily.CHARACTER_STRING)),
sequence(
logical(LogicalTypeFamily.CHARACTER_STRING),
new MaxDepthArgumentTypeStrategy(false))))
.outputTypeStrategy(explicit(DataTypes.STRING().nullable()))
.runtimeClass(
"org.apache.flink.table.runtime.functions.scalar.UrlDecodeFunction")
.build();

public static final BuiltInFunctionDefinition URL_ENCODE =
BuiltInFunctionDefinition.newBuilder()
.name("URL_ENCODE")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.types.inference.strategies;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
import org.apache.flink.table.types.inference.CallContext;
import org.apache.flink.table.types.inference.Signature;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.flink.table.types.logical.LogicalTypeRoot;

import java.util.Optional;

/**
* An {@link ArgumentTypeStrategy} that expects a positive integer literal for the {@code maxDepth}
* parameter of {@code URL_DECODE_RECURSIVE}.
*
* <p>The argument must be a compile-time literal (not a dynamic/runtime value), and its value must
* be strictly positive ({@code > 0}).
*
* <p>This strategy uses {@link Number#intValue()} to handle all integer types uniformly, similar to
* {@link PercentageArgumentTypeStrategy}.
*/
@Internal
public final class MaxDepthArgumentTypeStrategy implements ArgumentTypeStrategy {

private final boolean expectedNullability;

public MaxDepthArgumentTypeStrategy(boolean nullable) {
this.expectedNullability = nullable;
}

@Override
public Optional<DataType> inferArgumentType(
CallContext callContext, int argumentPos, boolean throwOnFailure) {
final LogicalType actualType =
callContext.getArgumentDataTypes().get(argumentPos).getLogicalType();

if (!actualType.is(LogicalTypeFamily.INTEGER_NUMERIC)) {
Comment thread
featzhang marked this conversation as resolved.
return callContext.fail(throwOnFailure, "maxDepth must be of INTEGER type.");
}
if (!expectedNullability && actualType.isNullable()) {
return callContext.fail(throwOnFailure, "maxDepth must be of NOT NULL type.");
}

// maxDepth must be a compile-time literal; dynamic (runtime) arguments are not accepted.
if (!callContext.isArgumentLiteral(argumentPos)) {
return callContext.fail(
throwOnFailure,
"maxDepth must be a literal integer, but was a dynamic argument.");
}

// Use Number.class to handle all integer types uniformly
Optional<Number> literalVal = callContext.getArgumentValue(argumentPos, Number.class);

Integer maxDepth = null;
if (literalVal.isPresent()) {
maxDepth = literalVal.get().intValue();
}

if (maxDepth == null || maxDepth <= 0) {
return callContext.fail(
throwOnFailure, "maxDepth must be a positive integer, but was: %s.", maxDepth);
}

// Preserve the actual integer type to avoid type casting issues at planning time
return Optional.of(resolveOutputType(actualType.getTypeRoot(), expectedNullability));
}

@Override
public Signature.Argument getExpectedArgument(
FunctionDefinition functionDefinition, int argumentPos) {
return Signature.Argument.of(expectedNullability ? "<INTEGER>" : "<INTEGER NOT NULL>");
}

/**
* Returns the output {@link DataType} that corresponds to the given integer {@link
* LogicalTypeRoot}, with the requested nullability.
*/
private static DataType resolveOutputType(LogicalTypeRoot root, boolean nullable) {
final DataType base;
switch (root) {
case TINYINT:
base = DataTypes.TINYINT();
break;
case SMALLINT:
base = DataTypes.SMALLINT();
break;
case BIGINT:
base = DataTypes.BIGINT();
break;
case INTEGER:
default:
base = DataTypes.INT();
break;
}
return nullable ? base : base.notNull();
}
}
Loading