Commit
[FLINK-16200][table] Support JSON_EXISTS in SQL & Table API
This is partially based on the work done by @XuQianJin-Stars in
apache#11186.

Supersedes apache#11186.
Airblader authored and twalthr committed Aug 12, 2021
1 parent a901173 commit 334082d
Showing 16 changed files with 462 additions and 4 deletions.
21 changes: 21 additions & 0 deletions docs/content.zh/docs/dev/table/functions/systemFunctions.md
@@ -68,6 +68,27 @@ Flink Table API & SQL 为用户提供了一组内置的数据转换函数。本

{{< sql_functions_zh "collection" >}}

### JSON Functions

JSON functions make use of JSON path expressions as described in ISO/IEC TR 19075-6 of the SQL
standard. Their syntax is inspired by and adopts many features of ECMAScript, but is neither a
subset nor superset thereof.

Path expressions come in two flavors, lax and strict. When the mode is omitted, strict mode is used.
Strict mode is intended to examine data from a schema perspective and throws an error whenever
the data does not adhere to the path expression. However, functions like `JSON_VALUE` allow defining
fallback behavior if an error is encountered. Lax mode, on the other hand, is more forgiving and
converts errors to empty sequences.

The special character `$` denotes the root node in a JSON path. Paths can access properties (`$.a`),
array elements (`$.a[0].b`), or branch over all elements in an array (`$.a[*].b`).

Known Limitations:
* Not all features of lax mode are currently supported correctly because of an upstream bug
(CALCITE-4717). Behavior that deviates from the standard is not guaranteed and should not be relied on.

{{< sql_functions "json" >}}

### 值构建函数

{{< sql_functions_zh "valueconstruction" >}}
21 changes: 21 additions & 0 deletions docs/content/docs/dev/table/functions/systemFunctions.md
@@ -67,6 +67,27 @@ The scalar functions take zero, one or more values as the input and return a sin

{{< sql_functions "collection" >}}

### JSON Functions

JSON functions make use of JSON path expressions as described in ISO/IEC TR 19075-6 of the SQL
standard. Their syntax is inspired by and adopts many features of ECMAScript, but is neither a
subset nor superset thereof.

Path expressions come in two flavors, lax and strict. When the mode is omitted, strict mode is used.
Strict mode is intended to examine data from a schema perspective and throws an error whenever
the data does not adhere to the path expression. However, functions like `JSON_VALUE` allow defining
fallback behavior if an error is encountered. Lax mode, on the other hand, is more forgiving and
converts errors to empty sequences.
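
The difference between the two modes, and the way an error fallback can turn a strict-mode failure into a result, can be modeled in a few lines of plain Python. This is a minimal sketch and not Flink's implementation: `OnError`, `PathError`, and `json_exists` are hypothetical names, and the path is passed as a list of member names rather than a parsed path expression.

```python
import json
from enum import Enum
from typing import Optional


class OnError(Enum):
    """Hypothetical stand-in for the `... ON ERROR` clause."""
    TRUE = "TRUE"
    FALSE = "FALSE"
    UNKNOWN = "UNKNOWN"
    ERROR = "ERROR"


class PathError(Exception):
    """Raised in strict mode when the data does not match the path."""


def json_exists(document: str, members: list, strict: bool = True,
                on_error: OnError = OnError.FALSE) -> Optional[bool]:
    """Check whether following `members` from the root reaches a node."""
    try:
        node = json.loads(document)
        for member in members:
            if isinstance(node, dict) and member in node:
                node = node[member]
            elif strict:
                # Strict mode: a structural mismatch is an error.
                raise PathError(f"no member {member!r}")
            else:
                # Lax mode: the error becomes an empty sequence, so nothing exists.
                return False
        return True
    except PathError:
        # The fallback decides what a strict-mode error evaluates to.
        if on_error is OnError.ERROR:
            raise
        if on_error is OnError.UNKNOWN:
            return None  # models SQL NULL
        return on_error is OnError.TRUE
```

Under these assumptions, a missing member in strict mode yields whatever the fallback dictates, while the same lookup in lax mode never reaches the fallback at all.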

The special character `$` denotes the root node in a JSON path. Paths can access properties (`$.a`),
array elements (`$.a[0].b`), or branch over all elements in an array (`$.a[*].b`).
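
To make the three kinds of path steps concrete, here is a small sketch of an evaluator for just that subset (member access, array index, and `[*]`). It is an illustration only: `evaluate_path` and `_STEP` are hypothetical names, misses are treated in a lax-like way (they select nothing), and no other path feature is supported.

```python
import json
import re

# Hypothetical tokenizer for a tiny subset of JSON path: .name, [index], [*]
_STEP = re.compile(r"\.(\w+)|\[(\d+)\]|\[(\*)\]")


def evaluate_path(document: str, path: str) -> list:
    """Return the sequence of nodes selected by `path`, starting at the root `$`."""
    if not path.startswith("$"):
        raise ValueError("path must start with the root '$'")
    rest = path[1:]
    pos = 0
    nodes = [json.loads(document)]  # the sequence initially holds only the root
    while pos < len(rest):
        match = _STEP.match(rest, pos)
        if match is None:
            raise ValueError(f"unsupported path syntax at: {rest[pos:]!r}")
        name, index, wildcard = match.groups()
        selected = []
        for node in nodes:
            if name is not None and isinstance(node, dict) and name in node:
                selected.append(node[name])        # property access: $.a
            elif index is not None and isinstance(node, list) and int(index) < len(node):
                selected.append(node[int(index)])  # array element: $.a[0]
            elif wildcard is not None and isinstance(node, list):
                selected.extend(node)              # branch over all elements: $.a[*]
        nodes = selected
        pos = match.end()
    return nodes
```

With this sketch, `evaluate_path('{"a": [{"b": 1}, {"b": 2}]}', '$.a[*].b')` returns `[1, 2]`, and a path that matches nothing returns an empty list.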

Known Limitations:
* Not all features of lax mode are currently supported correctly because of an upstream bug
(CALCITE-4717). Behavior that deviates from the standard is not guaranteed and should not be relied on.

{{< sql_functions "json" >}}

### Value Construction Functions

{{< sql_functions "valueconstruction" >}}
25 changes: 25 additions & 0 deletions docs/data/sql_functions.yml
@@ -571,6 +571,31 @@ collection:
table: MAP.at(ANY)
description: Returns the value specified by key value in map.

json:
  - sql: JSON_EXISTS(jsonValue, path [ { TRUE | FALSE | UNKNOWN | ERROR } ON ERROR ])
    table: STRING.jsonExists(STRING path [, JsonExistsOnError onError])
    description: |
      Determines whether a JSON string satisfies a given path search criterion.
      If the error behavior is omitted, `FALSE ON ERROR` is assumed as the default.
      ```
      -- TRUE
      SELECT JSON_EXISTS('{"a": true}', '$.a');
      -- FALSE
      SELECT JSON_EXISTS('{"a": true}', '$.b');
      -- TRUE
      SELECT JSON_EXISTS('{"a": [{ "b": 1 }]}',
        '$.a[0].b');
      -- TRUE
      SELECT JSON_EXISTS('{"a": true}',
        'strict $.b' TRUE ON ERROR);
      -- FALSE
      SELECT JSON_EXISTS('{"a": true}',
        'strict $.b' FALSE ON ERROR);
      ```
valueconstruction:
- sql: |
-- implicit constructor with parenthesis
25 changes: 25 additions & 0 deletions docs/data/sql_functions_zh.yml
@@ -703,6 +703,31 @@ collection:
table: MAP.at(ANY)
description: 返回 map 中指定 key 对应的值。

json:
  - sql: JSON_EXISTS(jsonValue, path [ { TRUE | FALSE | UNKNOWN | ERROR } ON ERROR ])
    table: STRING.jsonExists(STRING path [, JsonExistsOnError onError])
    description: |
      Determines whether a JSON string satisfies a given path search criterion.
      If the error behavior is omitted, `FALSE ON ERROR` is assumed as the default.
      ```
      -- TRUE
      SELECT JSON_EXISTS('{"a": true}', '$.a');
      -- FALSE
      SELECT JSON_EXISTS('{"a": true}', '$.b');
      -- TRUE
      SELECT JSON_EXISTS('{"a": [{ "b": 1 }]}',
        '$.a[0].b');
      -- TRUE
      SELECT JSON_EXISTS('{"a": true}',
        'strict $.b' TRUE ON ERROR);
      -- FALSE
      SELECT JSON_EXISTS('{"a": true}',
        'strict $.b' FALSE ON ERROR);
      ```
valueconstruction:
- sql: |
-- implicit constructor with parenthesis
41 changes: 40 additions & 1 deletion flink-python/pyflink/table/expression.py
@@ -23,7 +23,7 @@
from pyflink.table.types import DataType, _to_java_data_type
from pyflink.util.java_utils import to_jarray

-__all__ = ['Expression', 'TimeIntervalUnit', 'TimePointUnit']
+__all__ = ['Expression', 'TimeIntervalUnit', 'TimePointUnit', 'JsonExistsOnError']


_aggregation_doc = """
@@ -352,6 +352,22 @@ def _to_j_time_point_unit(self):
return getattr(JTimePointUnit, self.name)


class JsonExistsOnError(Enum):
    """
    Behavior in case of errors for json_exists().
    """

    TRUE = 0
    FALSE = 1
    UNKNOWN = 2
    ERROR = 3

    def _to_j_json_exists_on_error(self):
        gateway = get_gateway()
        JJsonExistsOnError = gateway.jvm.org.apache.flink.table.api.JsonExistsOnError
        return getattr(JJsonExistsOnError, self.name)


T = TypeVar('T')


@@ -1354,6 +1370,29 @@ def sha2(self, hash_length: Union[int, 'Expression[int]']) -> 'Expression[str]':
"""
return _binary_op("sha2")(self, hash_length)

    # ---------------------------- JSON functions -----------------------------

    def json_exists(self, path: str, on_error: JsonExistsOnError = None) -> 'Expression[bool]':
        """
        Determines whether a JSON string satisfies a given search criterion.
        This follows the ISO/IEC TR 19075-6 specification for JSON support in SQL.
        Example:
        ::
            >>> lit('{"a": true}').json_exists('$.a')  # true
            >>> lit('{"a": true}').json_exists('$.b')  # false
            >>> lit('{"a": [{ "b": 1 }]}').json_exists('$.a[0].b')  # true
            >>> lit('{"a": true}').json_exists('strict $.b', JsonExistsOnError.TRUE)  # true
            >>> lit('{"a": true}').json_exists('strict $.b', JsonExistsOnError.FALSE)  # false
        """
        if on_error is None:
            return _binary_op("jsonExists")(self, path)
        else:
            return _ternary_op("jsonExists")(self, path, on_error._to_j_json_exists_on_error())


# add the docs
_make_math_log_doc()
@@ -22,6 +22,7 @@
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.JsonExistsOnError;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.expressions.ApiExpressionUtils;
import org.apache.flink.table.expressions.Expression;
@@ -85,6 +86,7 @@
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.IS_NOT_TRUE;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.IS_NULL;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.IS_TRUE;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_EXISTS;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.LESS_THAN;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.LIKE;
@@ -1261,4 +1263,64 @@ public OutType sha2(InType hashLength) {
return toApiSpecificExpression(
unresolvedCall(SHA2, toExpr(), objectToExpression(hashLength)));
}

// JSON functions

/**
* Returns whether a JSON string satisfies a given search criterion.
*
* <p>This follows the ISO/IEC TR 19075-6 specification for JSON support in SQL.
*
* <p>Examples:
*
* <pre>{@code
* // true
* lit("{\"a\": true}").jsonExists("$.a")
* // false
* lit("{\"a\": true}").jsonExists("$.b")
* // true
* lit("{\"a\": [{ \"b\": 1 }]}").jsonExists("$.a[0].b")
*
* // true
* lit("{\"a\": true}").jsonExists("strict $.b", JsonExistsOnError.TRUE)
* // false
* lit("{\"a\": true}").jsonExists("strict $.b", JsonExistsOnError.FALSE)
* }</pre>
*
* @param path JSON path to search for.
* @param onError Behavior in case of an error.
* @return {@code true} if the JSON string satisfies the search criterion.
*/
public OutType jsonExists(String path, JsonExistsOnError onError) {
return toApiSpecificExpression(
unresolvedCall(JSON_EXISTS, toExpr(), valueLiteral(path), valueLiteral(onError)));
}

/**
* Determines whether a JSON string satisfies a given search criterion.
*
* <p>This follows the ISO/IEC TR 19075-6 specification for JSON support in SQL.
*
* <p>Examples:
*
* <pre>{@code
* // true
* lit("{\"a\": true}").jsonExists("$.a")
* // false
* lit("{\"a\": true}").jsonExists("$.b")
* // true
* lit("{\"a\": [{ \"b\": 1 }]}").jsonExists("$.a[0].b")
*
* // true
* lit("{\"a\": true}").jsonExists("strict $.b", JsonExistsOnError.TRUE)
* // false
* lit("{\"a\": true}").jsonExists("strict $.b", JsonExistsOnError.FALSE)
* }</pre>
*
* @param path JSON path to search for.
* @return {@code true} if the JSON string satisfies the search criterion.
*/
public OutType jsonExists(String path) {
return toApiSpecificExpression(unresolvedCall(JSON_EXISTS, toExpr(), valueLiteral(path)));
}
}
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.api;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.expressions.TableSymbol;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;

/** Behavior in case of errors for {@link BuiltInFunctionDefinitions#JSON_EXISTS}. */
@PublicEvolving
public enum JsonExistsOnError implements TableSymbol {
TRUE,
FALSE,
UNKNOWN,
ERROR
}
@@ -195,16 +195,31 @@ public Builder notDeterministic() {
return this;
}

/**
* Specifies that this {@link BuiltInFunctionDefinition} is implemented during code
* generation.
*/
public Builder runtimeProvided() {
this.isRuntimeProvided = true;
return this;
}

/** Specifies the runtime class implementing this {@link BuiltInFunctionDefinition}. */
public Builder runtimeClass(String runtimeClass) {
this.runtimeClass = runtimeClass;
return this;
}

/**
* Specifies that this {@link BuiltInFunctionDefinition} will be mapped to a Calcite
* function.
*/
public Builder runtimeDeferred() {
// This method is just a marker method for clarity. It is equivalent to calling
// neither {@link #runtimeProvided} nor {@link #runtimeClass}.
return this;
}

public BuiltInFunctionDefinition build() {
return new BuiltInFunctionDefinition(
name,
@@ -20,6 +20,7 @@

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.JsonExistsOnError;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
import org.apache.flink.table.types.inference.ConstantArgumentCount;
@@ -55,6 +56,7 @@
import static org.apache.flink.table.types.inference.InputTypeStrategies.logical;
import static org.apache.flink.table.types.inference.InputTypeStrategies.or;
import static org.apache.flink.table.types.inference.InputTypeStrategies.sequence;
import static org.apache.flink.table.types.inference.InputTypeStrategies.symbol;
import static org.apache.flink.table.types.inference.InputTypeStrategies.varyingSequence;
import static org.apache.flink.table.types.inference.InputTypeStrategies.wildcardWithCount;
import static org.apache.flink.table.types.inference.TypeStrategies.COMMON;
@@ -1477,6 +1479,31 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
.outputTypeStrategy(TypeStrategies.MISSING)
.build();

// --------------------------------------------------------------------------------------------
// JSON functions
// --------------------------------------------------------------------------------------------

public static final BuiltInFunctionDefinition JSON_EXISTS =
BuiltInFunctionDefinition.newBuilder()
.name("JSON_EXISTS")
.kind(SCALAR)
.inputTypeStrategy(
or(
sequence(
logical(LogicalTypeFamily.CHARACTER_STRING),
and(
logical(LogicalTypeFamily.CHARACTER_STRING),
LITERAL)),
sequence(
logical(LogicalTypeFamily.CHARACTER_STRING),
and(
logical(LogicalTypeFamily.CHARACTER_STRING),
LITERAL),
symbol(JsonExistsOnError.class))))
.outputTypeStrategy(explicit(DataTypes.BOOLEAN().nullable()))
.runtimeDeferred()
.build();

// --------------------------------------------------------------------------------------------
// Other functions
// --------------------------------------------------------------------------------------------