Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wrap timestamps in ROWTIME expressions with STRINGTOTIMESTAMP #3160

Merged
merged 5 commits into from
Aug 8, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions docs/developer-guide/syntax-reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1153,6 +1153,17 @@ Example:
WHERE ROWTIME >= 1510923225000
AND ROWTIME <= 1510923228000;

When writing logical expressions using `ROWTIME`, ISO-8061 formatted datestrings can also be used to represent dates.
jzaralim marked this conversation as resolved.
Show resolved Hide resolved
jzaralim marked this conversation as resolved.
Show resolved Hide resolved
For example, the above query is equivalent to the following:

.. code:: sql
SELECT * FROM pageviews
jzaralim marked this conversation as resolved.
Show resolved Hide resolved
WHERE ROWTIME >= '2017-11-17 04:53:45'
AND ROWTIME <= '2017-11-17 04:53:48';

If the datestring is inexact, the rest of the timestamp is assumed to be 0.
For example, `ROWTIME = `2019-07-30 11:00` is equivalent to `ROWTIME = `2019-07-30 11:00:00`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
For example, `ROWTIME = `2019-07-30 11:00` is equivalent to `ROWTIME = `2019-07-30 11:00:00`.
For example, ``ROWTIME = 2019-07-30 11:00`` is equivalent to ``ROWTIME = 2019-07-30 11:00:00``.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i might explicitly call out here that all unspecified elements of the full pattern are right-padded with zeros

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

somewhere in here i would expect to see a mention of timezone handling too, as this always trips people up.... i.e. in what TZ are your iso-8601 strings interpreted ? that of teh running jvm or utc or ....?


A ``LIMIT`` can be used to limit the number of rows returned. Once the limit is reached the query will terminate.

Example:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.function.udf.Kudf;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.parser.rewrite.StatementRewriteForRowtime;
import io.confluent.ksql.parser.tree.Expression;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.util.EngineProcessingLogMessageFactory;
Expand Down Expand Up @@ -54,7 +55,7 @@ class SqlPredicate {
final FunctionRegistry functionRegistry,
final ProcessingLogger processingLogger
) {
this.filterExpression = requireNonNull(filterExpression, "filterExpression");
this.filterExpression = rewriteFilter(requireNonNull(filterExpression, "filterExpression"));
this.schema = requireNonNull(schema, "schema");
this.genericRowValueTypeEnforcer = new GenericRowValueTypeEnforcer(schema);
this.functionRegistry = requireNonNull(functionRegistry, "functionRegistry");
Expand All @@ -63,7 +64,7 @@ class SqlPredicate {

final CodeGenRunner codeGenRunner = new CodeGenRunner(schema, ksqlConfig, functionRegistry);
final Set<CodeGenRunner.ParameterType> parameters
= codeGenRunner.getParameterInfo(filterExpression);
= codeGenRunner.getParameterInfo(this.filterExpression);

final String[] parameterNames = new String[parameters.size()];
final Class[] parameterTypes = new Class[parameters.size()];
Expand All @@ -86,7 +87,7 @@ class SqlPredicate {
final String expressionStr = new SqlToJavaVisitor(
schema,
functionRegistry
).process(filterExpression);
).process(this.filterExpression);

ee.cook(expressionStr);
} catch (final Exception e) {
Expand All @@ -99,6 +100,14 @@ class SqlPredicate {
}
}

private Expression rewriteFilter(final Expression expression) {
if (StatementRewriteForRowtime.requiresRewrite(expression)) {
return new StatementRewriteForRowtime(expression).rewriteForRowtime();
}
return expression;
}


<K> Predicate<K, GenericRow> getPredicate() {
final ExpressionMetadata expressionEvaluator = createExpressionMetadata();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.parser.rewrite;

import io.confluent.ksql.parser.tree.BetweenPredicate;
import io.confluent.ksql.parser.tree.ComparisonExpression;
import io.confluent.ksql.parser.tree.Expression;
import io.confluent.ksql.parser.tree.FunctionCall;
import io.confluent.ksql.parser.tree.Node;
import io.confluent.ksql.parser.tree.QualifiedName;
import io.confluent.ksql.parser.tree.StringLiteral;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class StatementRewriteForRowtime {
private final Expression expression;

public StatementRewriteForRowtime(final Expression expression) {
this.expression = Objects.requireNonNull(expression, "expression");
}

public static boolean requiresRewrite(final Expression expression) {
return expression.toString().contains("ROWTIME");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will match any expression with ROWTIME in it. For instance, it will match the following expression where you should not indeed rewrite the expression:
TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') = '2019-08-08 12:12:12'.
You need to change this such that it only rewrites the cases you have mentioned in the docs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This particular statement remained the same, but the rewriter itself has been updated to only rewrite timestrings when one side of the comparison is exactly ROWTIME.

}

public Expression rewriteForRowtime() {
return (Expression) new RewriteWithTimestampTransform().process(expression, null);
}

private static class TimestampRewriter extends StatementRewriter {
private static final String PATTERN = "yyyy-MM-dd HH:mm:ss.SSS";

@Override
public Expression visitFunctionCall(final FunctionCall node, final Object context) {
return (Expression) new StatementRewriter().process(node, context);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this line do anything other than return a duplicate of the function call node? I think StatementRewriter should be abstract as its meant as a base class for rewrites. In itself it doesn't actually change the query, right?

Copy link
Contributor Author

@jzaralim jzaralim Aug 12, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It stops the rewriter from further processing anything within a function. So, if the expression looks like ROWTIME > FOO('2017-01-01 ' + 100) + '2019-01-01', then '2019-01-01' will get wrapped but '2017-01-01' doesn't. This actually also addresses the first point above, since function calls don't get rewritten at all.

}

@Override
public Node visitStringLiteral(final StringLiteral node, final Object context) {
if (!node.getValue().equals("ROWTIME")) {
final List<Expression> args = new ArrayList<>();
args.add(new StringLiteral(appendZeros(node.getValue())));
args.add(new StringLiteral(PATTERN));
return new FunctionCall(QualifiedName.of("STRINGTOTIMESTAMP"), args);
}
return node;
}

private String appendZeros(final String timestamp) {
if (timestamp.length() >= PATTERN.length()) {
return timestamp;
}
return timestamp + PATTERN.substring(timestamp.length()).replaceAll("[a-zA-Z]", "0");
}
}

private static class RewriteWithTimestampTransform extends StatementRewriter {
@Override
public Expression visitBetweenPredicate(final BetweenPredicate node, final Object context) {
if (StatementRewriteForRowtime.requiresRewrite(node)) {
return new BetweenPredicate(
node.getLocation(),
(Expression) new TimestampRewriter().process(node.getValue(), context),
(Expression) new TimestampRewriter().process(node.getMin(), context),
(Expression) new TimestampRewriter().process(node.getMax(), context));
}
return new BetweenPredicate(
node.getLocation(),
(Expression) process(node.getValue(), context),
(Expression) process(node.getMin(), context),
(Expression) process(node.getMax(), context));
}

@Override
public Expression visitComparisonExpression(
final ComparisonExpression node,
final Object context) {
if (StatementRewriteForRowtime.requiresRewrite(node)) {
return new ComparisonExpression(
node.getLocation(),
node.getType(),
(Expression) new TimestampRewriter().process(node.getLeft(), context),
(Expression) new TimestampRewriter().process(node.getRight(), context));
}
return new ComparisonExpression(
node.getLocation(),
node.getType(),
(Expression) process(node.getLeft(), context),
(Expression) process(node.getRight(), context));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.parser.rewrite;

import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.parser.KsqlParserTestUtil;
import io.confluent.ksql.parser.tree.*;
import io.confluent.ksql.util.MetaStoreFixture;
import org.junit.Before;
import org.junit.Test;

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;

public class StatementRewriteForRowtimeTest {
private MetaStore metaStore;

@Before
public void init() {
metaStore = MetaStoreFixture.getNewMetaStore(mock(FunctionRegistry.class));
}

@Test
public void shouldReplaceStringWithLong() {
final String query = "SELECT * FROM orders where ROWTIME > '2017-01-01 00:00:00.000';";
final Query statement = (Query) KsqlParserTestUtil.buildSingleAst(query, metaStore).getStatement();
final Expression predicate = statement.getWhere().get();
final Expression rewritten = new StatementRewriteForRowtime(predicate).rewriteForRowtime();

assertThat(rewritten.toString(), equalTo("(ORDERS.ROWTIME > STRINGTOTIMESTAMP('2017-01-01 00:00:00.000', 'yyyy-MM-dd HH:mm:ss.SSS'))"));
}

@Test
public void shouldHandleInexactTimestamp() {
final String query = "SELECT * FROM orders where ROWTIME = '2017-01-01';";
final Query statement = (Query) KsqlParserTestUtil.buildSingleAst(query, metaStore).getStatement();
final Expression predicate = statement.getWhere().get();
final Expression rewritten = new StatementRewriteForRowtime(predicate).rewriteForRowtime();

assertThat(rewritten.toString(), equalTo("(ORDERS.ROWTIME = STRINGTOTIMESTAMP('2017-01-01 00:00:00.000', 'yyyy-MM-dd HH:mm:ss.SSS'))"));
}

@Test
public void shouldHandleBetweenExpression() {
final String query = "SELECT * FROM orders where ROWTIME BETWEEN '2017-01-01' AND '2017-02-01';";
final Query statement = (Query) KsqlParserTestUtil.buildSingleAst(query, metaStore).getStatement();
final Expression predicate = statement.getWhere().get();
final Expression rewritten = new StatementRewriteForRowtime(predicate).rewriteForRowtime();

assertThat(rewritten.toString(), equalTo("(ORDERS.ROWTIME BETWEEN"
+ " STRINGTOTIMESTAMP('2017-01-01 00:00:00.000', 'yyyy-MM-dd HH:mm:ss.SSS') AND"
+ " STRINGTOTIMESTAMP('2017-02-01 00:00:00.000', 'yyyy-MM-dd HH:mm:ss.SSS'))"));
}

@Test
public void shouldNotProcessStringsInFunctions() {
final String query = "SELECT * FROM orders where ROWTIME = foo('bar');";
jzaralim marked this conversation as resolved.
Show resolved Hide resolved
final Query statement = (Query) KsqlParserTestUtil.buildSingleAst(query, metaStore).getStatement();
final Expression predicate = statement.getWhere().get();
final Expression rewritten = new StatementRewriteForRowtime(predicate).rewriteForRowtime();

assertThat(rewritten.toString(), equalTo("(ORDERS.ROWTIME = FOO('bar'))"));
}

@Test
public void shouldIgnoreNonRowtimeStrings() {
final String query = "SELECT * FROM orders where ROWTIME > '2017-01-01' AND ROWKEY = '2017-01-01';";
final Query statement = (Query) KsqlParserTestUtil.buildSingleAst(query, metaStore).getStatement();
final Expression predicate = statement.getWhere().get();
final Expression rewritten = new StatementRewriteForRowtime(predicate).rewriteForRowtime();

assertThat(rewritten.toString(), equalTo("((ORDERS.ROWTIME > STRINGTOTIMESTAMP('2017-01-01 00:00:00.000', 'yyyy-MM-dd HH:mm:ss.SSS')) AND (ORDERS.ROWKEY = '2017-01-01'))"));
}

@Test
public void shouldHandleNestedExpressions() {
final String simpleQuery = "SELECT * FROM orders where FOO(ROWTIME + 6000) > '2017-01-01 00:00:00.000' + 35;";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this a bit dangerous? We're assuming FOO is returning a long and that represents a datetime, but this might not be the case, e.g. SELECT * FROM orders where IS_LEAP_YEAR(ROWTIME + 6000) > '2017-01-01 00:00:00.000' + 35; wouldn't make sense!

I don't think we can make any such assumption on the return value from a function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My thoughts were that this is purely a query rewriter, so those kinds of cases would get caught later on. So for example, IS_LEAP_YEAR(ROWTIME + 6000) > 35 passes this step, but would still eventually get caught during evaluation. I did change the overall logic of the rewriter as per @hjafarpour's comment though.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They may get caught later on, but the UX for the user is not great. There's going to get some message that Foo can not be compared to Long or something, which is very confusing for the user.

final Query statement = (Query) KsqlParserTestUtil.buildSingleAst(simpleQuery, metaStore).getStatement();
final Expression predicate = statement.getWhere().get();
final Expression rewritten = new StatementRewriteForRowtime(predicate).rewriteForRowtime();

assertThat(rewritten.toString(), containsString("(FOO((ORDERS.ROWTIME + 6000)) > (STRINGTOTIMESTAMP('2017-01-01 00:00:00.000', 'yyyy-MM-dd HH:mm:ss.SSS') + 35))"));
}
}