Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wrap timestamps in ROWTIME expressions with STRINGTOTIMESTAMP #3160

Merged
merged 5 commits into from
Aug 8, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
refactor: add qtt tests and time zone support, change rowtime detection
  • Loading branch information
Zara Lim committed Aug 8, 2019
commit 7b1352a109d40b4887be60d5106fde335669be93
9 changes: 5 additions & 4 deletions docs/developer-guide/syntax-reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1159,13 +1159,14 @@ For example, the above query is equivalent to the following:
.. code:: sql

SELECT * FROM pageviews
jzaralim marked this conversation as resolved.
Show resolved Hide resolved
WHERE ROWTIME >= '2017-11-17 04:53:45'
AND ROWTIME <= '2017-11-17 04:53:48';
WHERE ROWTIME >= '2017-11-17T04:53:45'
AND ROWTIME <= '2017-11-17T04:53:48';

If the datestring is inexact, the rest of the timestamp is assumed to be padded with 0's.
For example, ``ROWTIME = '2019-07-30 11:00'`` is equivalent to ``ROWTIME = '2019-07-30 11:00:00.0000'``.
For example, ``ROWTIME = '2019-07-30T11:00'`` is equivalent to ``ROWTIME = '2019-07-30T11:00:00.0000'``.

Note that the timestamps are interperted in the UTC timezone.
Timezones can be specified within the datestring. For example, `2017-11-17T04:53:45-0330` is in the Newfoundland time
zone. If no timezone is specified within the datestring, then timestamps are interperted in the UTC timezone.

A ``LIMIT`` can be used to limit the number of rows returned. Once the limit is reached the query will terminate.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.timestamp.StringToTimestampParser;

import java.time.ZoneId;
import java.util.concurrent.ExecutionException;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
{
"comments": [
"Tests covering filters using ROWTIME"
],
"tests": [
{
"name": "test ROWTIME",
"statements": [
"CREATE STREAM TEST (source int) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT source AS THING FROM TEST WHERE ROWTIME>'2018-01-01T00:00:00';"
],
"inputs": [
{"topic": "test_topic", "key": 0, "value": {"source": null}, "timestamp": 0},
{"topic": "test_topic", "key": 1, "value": {"source": 1}, "timestamp": 0},
{"topic": "test_topic", "key": 2, "value": {"source": 2}, "timestamp": 1546300808000},
{"topic": "test_topic", "key": 3, "value": {"source": 3}, "timestamp": 1546300800000},
{"topic": "test_topic", "key": 4, "value": {"source": 4}, "timestamp": 0},
{"topic": "test_topic", "key": 5, "value": {"source": 5}, "timestamp": 0}
],
"outputs": [
{"topic": "OUTPUT", "key": 2, "value": {"THING": 2}, "timestamp": 1546300808000},
{"topic": "OUTPUT", "key": 3, "value": {"THING": 3}, "timestamp": 1546300800000}
]
},
{
"name": "test ROWTIME with BETWEEN",
"statements": [
"CREATE STREAM TEST (source int) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT source AS THING FROM TEST WHERE ROWTIME BETWEEN '2018-01-01T00:00:00' AND '2019-12-31T23:59:59';"
],
"inputs": [
{"topic": "test_topic", "key": 0, "value": {"source": null}, "timestamp": 0},
{"topic": "test_topic", "key": 1, "value": {"source": 1}, "timestamp": 1546300808000},
{"topic": "test_topic", "key": 2, "value": {"source": 2}, "timestamp": 0},
{"topic": "test_topic", "key": 3, "value": {"source": 3}, "timestamp": 1536307808000},
{"topic": "test_topic", "key": 4, "value": {"source": 4}, "timestamp": 0},
{"topic": "test_topic", "key": 5, "value": {"source": 5}, "timestamp": 1600000000000}
],
"outputs": [
{"topic": "OUTPUT", "key": 1, "value": {"THING": 1}, "timestamp": 1546300808000},
{"topic": "OUTPUT", "key": 3, "value": {"THING": 3}, "timestamp": 1536307808000}
]
},
{
"name": "test ROWTIME with timezone",
"statements": [
"CREATE STREAM TEST (source int) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT source AS THING FROM TEST WHERE ROWTIME > '2019-01-01T00:00:00+0445';"
],
"inputs": [
{"topic": "test_topic", "key": 0, "value": {"source": null}, "timestamp": 0},
{"topic": "test_topic", "key": 1, "value": {"source": 1}, "timestamp": 1546300800000},
{"topic": "test_topic", "key": 2, "value": {"source": 2}, "timestamp": 0},
{"topic": "test_topic", "key": 3, "value": {"source": 3}, "timestamp": 0},
{"topic": "test_topic", "key": 4, "value": {"source": 4}, "timestamp": 0},
{"topic": "test_topic", "key": 5, "value": {"source": 5}, "timestamp": 1600000000000}
],
"outputs": [
{"topic": "OUTPUT", "key": 1, "value": {"THING": 1}, "timestamp": 1546300800000},
{"topic": "OUTPUT", "key": 5, "value": {"THING": 5}, "timestamp": 1600000000000}
]
},
{
"name": "test ROWTIME with AND",
"statements": [
"CREATE STREAM TEST (source int) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT source AS THING FROM TEST WHERE ROWTIME >= '2019-01-01T00:00:00' AND SOURCE=5;"
],
"inputs": [
{"topic": "test_topic", "key": 0, "value": {"source": null}, "timestamp": 0},
{"topic": "test_topic", "key": 1, "value": {"source": 1}, "timestamp": 1546300800000},
{"topic": "test_topic", "key": 2, "value": {"source": 2}, "timestamp": 0},
{"topic": "test_topic", "key": 3, "value": {"source": 3}, "timestamp": 0},
{"topic": "test_topic", "key": 4, "value": {"source": 4}, "timestamp": 0},
{"topic": "test_topic", "key": 5, "value": {"source": 5}, "timestamp": 1600000000000}
],
"outputs": [
{"topic": "OUTPUT", "key": 5, "value": {"THING": 5}, "timestamp": 1600000000000}
]
},
{
"name": "test ROWTIME with inexact timestring",
"statements": [
"CREATE STREAM TEST (source int) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT source AS THING FROM TEST WHERE ROWTIME >= '2018';"
],
"inputs": [
{"topic": "test_topic", "key": 0, "value": {"source": null}, "timestamp": 0},
{"topic": "test_topic", "key": 1, "value": {"source": 1}, "timestamp": 1546300800000},
{"topic": "test_topic", "key": 2, "value": {"source": 2}, "timestamp": 0},
{"topic": "test_topic", "key": 3, "value": {"source": 3}, "timestamp": 0},
{"topic": "test_topic", "key": 4, "value": {"source": 4}, "timestamp": 0},
{"topic": "test_topic", "key": 5, "value": {"source": 5}, "timestamp": 1600000000000}
],
"outputs": [
{"topic": "OUTPUT", "key": 1, "value": {"THING": 1}, "timestamp": 1546300800000},
{"topic": "OUTPUT", "key": 5, "value": {"THING": 5}, "timestamp": 1600000000000}
]
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.confluent.ksql.parser.tree.BetweenPredicate;
import io.confluent.ksql.parser.tree.ComparisonExpression;
import io.confluent.ksql.parser.tree.DereferenceExpression;
import io.confluent.ksql.parser.tree.Expression;
import io.confluent.ksql.parser.tree.FunctionCall;
import io.confluent.ksql.parser.tree.Node;
Expand All @@ -43,7 +44,9 @@ public Expression rewriteForRowtime() {
}

private static class TimestampRewriter extends StatementRewriter {
private static final String PATTERN = "yyyy-MM-dd HH:mm:ss.SSS";
private static final String DATE_PATTERN = "yyyy-MM-dd";
private static final String TIME_PATTERN = "HH:mm:ss.SSS";
private static final String PATTERN = DATE_PATTERN + "'T'" + TIME_PATTERN;

@Override
public Expression visitFunctionCall(final FunctionCall node, final Object context) {
Expand All @@ -53,19 +56,69 @@ public Expression visitFunctionCall(final FunctionCall node, final Object contex
@Override
public Node visitStringLiteral(final StringLiteral node, final Object context) {
if (!node.getValue().equals("ROWTIME")) {
final List<Expression> args = new ArrayList<>();
args.add(new StringLiteral(appendZeros(node.getValue())));
args.add(new StringLiteral(PATTERN));
return new FunctionCall(QualifiedName.of("STRINGTOTIMESTAMP"), args);
return new FunctionCall(
QualifiedName.of("STRINGTOTIMESTAMP"),
getFunctionArgs(node.getValue()));
}
return node;
}

private String appendZeros(final String timestamp) {
if (timestamp.length() >= PATTERN.length()) {
return timestamp;
private List<Expression> getFunctionArgs(final String datestring) {
final List<Expression> args = new ArrayList<>();
final String date;
final String time;
final String timezone;
if (datestring.contains("T")) {
date = datestring.substring(0, datestring.indexOf('T'));
final String withTimezone = completeTime(datestring.substring(datestring.indexOf('T') + 1));
timezone = getTimezone(withTimezone);
time = completeTime(withTimezone.substring(0, timezone.length()));
} else {
date = completeDate(datestring);
time = completeTime("");
timezone = "";
}

if (timezone.length() > 0) {
args.add(new StringLiteral(date + "T" + time));
args.add(new StringLiteral(PATTERN));
args.add(new StringLiteral(timezone));
} else {
args.add(new StringLiteral(date + "T" + time));
args.add(new StringLiteral(PATTERN));
}
return timestamp + PATTERN.substring(timestamp.length()).replaceAll("[a-zA-Z]", "0");
return args;
}

private String getTimezone(final String time) {
if (time.contains("+")) {
return time.substring(time.indexOf('+'));
} else if (time.contains("-")) {
return time.substring(time.indexOf('-'));
} else {
return "";
}
}

private String completeDate(final String date) {
final String[] parts = date.split("-");
if (parts.length == 1) {
return date + "-01-01";
} else if (parts.length == 2) {
return date + "-01";
} else {
// It is either a complete date or an incorrectly formatted one.
// In the latter case, we can pass the incorrectly formed string
// to STRINGTITIMESTAMP which will deal with the error handling.
return date;
}
}

private String completeTime(final String time) {
if (time.length() >= TIME_PATTERN.length()) {
return time;
}
return time + TIME_PATTERN.substring(time.length()).replaceAll("[a-zA-Z]", "0");
}
}

Expand All @@ -90,7 +143,7 @@ public Expression visitBetweenPredicate(final BetweenPredicate node, final Objec
public Expression visitComparisonExpression(
final ComparisonExpression node,
final Object context) {
if (StatementRewriteForRowtime.requiresRewrite(node)) {
if (expressionIsRowtime(node.getLeft()) || expressionIsRowtime(node.getRight())) {
return new ComparisonExpression(
node.getLocation(),
node.getType(),
Expand All @@ -104,4 +157,9 @@ public Expression visitComparisonExpression(
(Expression) process(node.getRight(), context));
}
}

private static boolean expressionIsRowtime(final Expression node) {
return (node instanceof DereferenceExpression)
&& ((DereferenceExpression) node).getFieldName().equals("ROWTIME");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,23 @@ public void init() {
}

@Test
public void shouldReplaceStringWithLong() {
final String query = "SELECT * FROM orders where ROWTIME > '2017-01-01 00:00:00.000';";
public void shouldWrapDatestring() {
final String query = "SELECT * FROM orders where ROWTIME > '2017-01-01T00:00:00.000';";
final Query statement = (Query) KsqlParserTestUtil.buildSingleAst(query, metaStore).getStatement();
final Expression predicate = statement.getWhere().get();
final Expression rewritten = new StatementRewriteForRowtime(predicate).rewriteForRowtime();

assertThat(rewritten.toString(), equalTo("(ORDERS.ROWTIME > STRINGTOTIMESTAMP('2017-01-01 00:00:00.000', 'yyyy-MM-dd HH:mm:ss.SSS'))"));
assertThat(rewritten.toString(), equalTo("(ORDERS.ROWTIME > STRINGTOTIMESTAMP('2017-01-01T00:00:00.000', 'yyyy-MM-dd''T''HH:mm:ss.SSS'))"));
}

@Test
public void shouldHandleInexactTimestamp() {
final String query = "SELECT * FROM orders where ROWTIME = '2017-01-01';";
final String query = "SELECT * FROM orders where ROWTIME = '2017';";
final Query statement = (Query) KsqlParserTestUtil.buildSingleAst(query, metaStore).getStatement();
final Expression predicate = statement.getWhere().get();
final Expression rewritten = new StatementRewriteForRowtime(predicate).rewriteForRowtime();

assertThat(rewritten.toString(), equalTo("(ORDERS.ROWTIME = STRINGTOTIMESTAMP('2017-01-01 00:00:00.000', 'yyyy-MM-dd HH:mm:ss.SSS'))"));
assertThat(rewritten.toString(), equalTo("(ORDERS.ROWTIME = STRINGTOTIMESTAMP('2017-01-01T00:00:00.000', 'yyyy-MM-dd''T''HH:mm:ss.SSS'))"));
}

@Test
Expand All @@ -64,18 +64,18 @@ public void shouldHandleBetweenExpression() {
final Expression rewritten = new StatementRewriteForRowtime(predicate).rewriteForRowtime();

assertThat(rewritten.toString(), equalTo("(ORDERS.ROWTIME BETWEEN"
+ " STRINGTOTIMESTAMP('2017-01-01 00:00:00.000', 'yyyy-MM-dd HH:mm:ss.SSS') AND"
+ " STRINGTOTIMESTAMP('2017-02-01 00:00:00.000', 'yyyy-MM-dd HH:mm:ss.SSS'))"));
+ " STRINGTOTIMESTAMP('2017-01-01T00:00:00.000', 'yyyy-MM-dd''T''HH:mm:ss.SSS') AND"
+ " STRINGTOTIMESTAMP('2017-02-01T00:00:00.000', 'yyyy-MM-dd''T''HH:mm:ss.SSS'))"));
}

@Test
public void shouldNotProcessStringsInFunctions() {
final String query = "SELECT * FROM orders where ROWTIME = foo('bar');";
final String query = "SELECT * FROM orders where ROWTIME = foo('2017-01-01');";
final Query statement = (Query) KsqlParserTestUtil.buildSingleAst(query, metaStore).getStatement();
final Expression predicate = statement.getWhere().get();
final Expression rewritten = new StatementRewriteForRowtime(predicate).rewriteForRowtime();

assertThat(rewritten.toString(), equalTo("(ORDERS.ROWTIME = FOO('bar'))"));
assertThat(rewritten.toString(), equalTo("(ORDERS.ROWTIME = FOO('2017-01-01'))"));
}

@Test
Expand All @@ -85,16 +85,16 @@ public void shouldIgnoreNonRowtimeStrings() {
final Expression predicate = statement.getWhere().get();
final Expression rewritten = new StatementRewriteForRowtime(predicate).rewriteForRowtime();

assertThat(rewritten.toString(), equalTo("((ORDERS.ROWTIME > STRINGTOTIMESTAMP('2017-01-01 00:00:00.000', 'yyyy-MM-dd HH:mm:ss.SSS')) AND (ORDERS.ROWKEY = '2017-01-01'))"));
assertThat(rewritten.toString(), equalTo("((ORDERS.ROWTIME > STRINGTOTIMESTAMP('2017-01-01T00:00:00.000', 'yyyy-MM-dd''T''HH:mm:ss.SSS')) AND (ORDERS.ROWKEY = '2017-01-01'))"));
}

@Test
public void shouldHandleNestedExpressions() {
final String simpleQuery = "SELECT * FROM orders where FOO(ROWTIME + 6000) > '2017-01-01 00:00:00.000' + 35;";
public void shouldHandleTimezones() {
final String simpleQuery = "SELECT * FROM orders where ROWTIME = '2017-01-01T00:00:00.000+0100';";
final Query statement = (Query) KsqlParserTestUtil.buildSingleAst(simpleQuery, metaStore).getStatement();
final Expression predicate = statement.getWhere().get();
final Expression rewritten = new StatementRewriteForRowtime(predicate).rewriteForRowtime();

assertThat(rewritten.toString(), containsString("(FOO((ORDERS.ROWTIME + 6000)) > (STRINGTOTIMESTAMP('2017-01-01 00:00:00.000', 'yyyy-MM-dd HH:mm:ss.SSS') + 35))"));
assertThat(rewritten.toString(), containsString("(ORDERS.ROWTIME = STRINGTOTIMESTAMP('2017-01-01T00:00:00.000', 'yyyy-MM-dd''T''HH:mm:ss.SSS', '+0100'))"));
}
}