-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: CREATE IF NOT EXISTS does not work at all #6073
Conversation
@confluentinc It looks like @hemantgs just signed our Contributor License Agreement. 👍 Always at your service, clabot |
There's a bug in the CREATE STREAM IF NOT EXISTS command. It still returns an error if the stream does exists. We should fix the above and the doc. |
@spena Is someone looking at it ? else I can try 👍 |
760f5d8
to
04546cc
Compare
@@ -75,6 +75,13 @@ public DdlCommandResult executeCreateStream(final CreateStreamCommand createStre | |||
withQuery, | |||
getKsqlTopic(createStream) | |||
); | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@big-andy-coates @spena
I noticed that CreateStreamCommand
did not have the boolean flag for ifNotExists set here , I thought if this approach is ok , I will add that too and fail the query if its not set and source exists
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The place to put the check is in the CreateSourceFactory
. You'll need to pass the MetaStore
in to the factory's constructor.
See DropSourceFactory
for an example of the pattern.
What out for merge clashes with #6041. Would hate to lose some of your doc changes in a merge error. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Quick review before I sign off. Thanks!
If the IF NOT EXISTS clause is present, the statement doesn't fail if the | ||
stream doesn't exist. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the IF NOT EXISTS clause is present, the statement doesn't fail if the | |
stream doesn't exist. | |
If the IF NOT EXISTS clause is present, the statement won't fail if a | |
stream with the same name already exists. | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed this
If the IF NOT EXISTS clause is present, the statement doesn't fail if the | ||
table doesn't exist. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the IF NOT EXISTS clause is present, the statement doesn't fail if the | |
table doesn't exist. | |
If the IF NOT EXISTS clause is present, the statement won't fail if a | |
table with the same name already exists. | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this too
@@ -75,6 +75,13 @@ public DdlCommandResult executeCreateStream(final CreateStreamCommand createStre | |||
withQuery, | |||
getKsqlTopic(createStream) | |||
); | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The place to put the check is in the CreateSourceFactory
. You'll need to pass the MetaStore
in to the factory's constructor.
See DropSourceFactory
for an example of the pattern.
@@ -77,22 +77,20 @@ public DataSource getSource(final SourceName sourceName) { | |||
} | |||
|
|||
@Override | |||
public void putSource(final DataSource dataSource, final boolean allowReplace) { | |||
public boolean validateSource(final DataSource dataSource, final boolean allowReplace) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We shouldn't be splitting this into two methods, as this makes it much more likely someone will call putSource
and inadvertently overwrite a source that they shouldn't.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a good point,
I looked at DropSourceFactory
, even there
if (dataSource == null) {
if (!ifExists) {
throw new KsqlException("Source " + sourceName.text() + " does not exist.");
}
}
We throw an Exception which will Fail the command which should not be the outcome right ,
The only place I could find other than DdlCommandExec
that had capability to pass along a warning was in KsqlResource.handleKsqlStatements()
where if an exception is thrown in MetaStoreImpl
we could possibly catch that and throw a warning like
catch (final KsqlStatementException e) {
LOG.info("Processed unsuccessfully: " + request + ", reason: " + e.getMessage());
Errors.badStatement("",e.getSqlStatement(),new KsqlEntityList(
Arrays.asList(new WarningEntity(e.getSqlStatement(),"Source Exists"))
));
Even here the first parameter to badStatement() seems it needs to be an empty string , else the msg is printed in the client in red
. What do you suggest we do
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We throw an Exception which will Fail the command which should not be the outcome right ,
That code looks correct to me for dropping a source. It means if I say DROP TABLE IF EXISTS FOO
that it won't throw if FOO
doesn't exist, which is what we want.
As to where to put this new check - you'll need two changes:
-
as above, put a check in
CreateSourceFactory
so that it throws an exception ifIF NOT EXIST
is not present and a source with the same name already exists. (Following a similar pattern toDropSourceFactory
). -
In
DdlCommandExec.executeCreateStream
andexecuteCreateTable
you will need to add a check to see if a source with the same name already exists andisOrReplace()
is false, e.g.
@Override
public DdlCommandResult executeCreateStream(final CreateStreamCommand createStream) {
final SourceName sourceName = createStream.getSourceName();
final DataSource existing = metaStore.getSource(sourceName);
if (existing != null && !createStream.isOrReplace()) {
return new DdlCommandResult(true, "Stream " + sourceName + " does not exist.");
}
final KsqlStream<?> ksqlStream = new KsqlStream<>(
sql,
sourceName,
createStream.getSchema(),
createStream.getFormats().getOptions(),
createStream.getTimestampColumn(),
withQuery,
getKsqlTopic(createStream)
);
metaStore.putSource(ksqlStream, createStream.isOrReplace());
return new DdlCommandResult(true, "Stream created");
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That code looks correct to me for dropping a source. It means if I say DROP TABLE IF EXISTS FOO that it won't throw if FOO doesn't exist, which is what we want.
@big-andy-coates I always seem to confuse myself with that part :| , so anyway I have made the changes
i have made changes to KsqlEngineTest.shouldNotThrowWhenExecutingDuplicateStream()
and KsqlEngineTest.shouldThrowWhenExecutingDuplicateTable()
, these tests were failing when the query plans for both create source queries had different Kafka topics one had FOO(stream name) and other had BAR, before this PR the test would have failed much before getting to the planning phase, so I have made changes to the queries in the tests , I am unsure if that is alright
Also build in Jenkins has failed I get 404 when I try to open the link ,
When I try mvn verify
I get
[INFO] [ERROR] Failed to execute goal on project my-first-test-udf: Could not resolve dependencies for project com.acme.ksql.functions:my-first-test-udf:jar:0.1.0-SNAPSHOT: Could not find artifact io.confluent.ksql:ksqldb-udf:jar:6.1.0-SNAPSHOT in confluent (https://packages.confluent.io/maven/) -> [Help 1]
[INFO] org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal on project my-first-test-udf: Could not resolve dependencies for project com.acme.ksql.functions:my-first-test-udf:jar:0.1.0-SNAPSHOT: Could not find artifact io.confluent.ksql:ksqldb-udf:jar:6.1.0-SNAPSHOT in confluent (https://packages.confluent.io/maven/)
and it fails
I am a bit unsure as to how to run all the tests
2559b8c
to
fa37637
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @hemantgs, this is looking close, except looks like you've still got some changes in there from the old approach.
Also, would you mind adding some json QTT style tests to cover adding tables/streams when a source (or the opposite type) already exists with the same name:
- Positive tests: i.e. with
IF NOT EXISTS
, then ensure selecting from the source is selecting from the pre-existing source, not the new source, i.e. testing that the source was NOT replaced. - Negative tests: i.e. without
IF NOT EXISTS
and testing the exception is thrown and the error message is correct.
Thanks!
) { | ||
this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext"); | ||
this.serdeOptionsSupplier = | ||
Objects.requireNonNull(serdeOptionsSupplier, "serdeOptionsSupplier"); | ||
this.keySerdeFactory = Objects.requireNonNull(keySerdeFactory, "keySerdeFactory"); | ||
this.valueSerdeFactory = Objects.requireNonNull(valueSerdeFactory, "valueSerdeFactory"); | ||
this.metaStore = metaStore; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: validate params that will be stored in object state; ensuring object does not get into an invalid state. i.e. use requireNonNull
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is done
@@ -115,6 +131,17 @@ public CreateTableCommand createTableCommand( | |||
final KsqlConfig ksqlConfig | |||
) { | |||
final SourceName sourceName = statement.getName(); | |||
final DataSource dataSource = metaStore.getSource(sourceName); | |||
|
|||
if (dataSource != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (dataSource != null) { | |
if (dataSource != null && !statement.isOrReplace()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is done
String.format("Cannot add %s '%s': A %s with the same name already exists", | ||
sourceType, sourceName.text(), sourceType)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
String.format("Cannot add %s '%s': A %s with the same name already exists", | |
sourceType, sourceName.text(), sourceType)); | |
String.format("Cannot add stream '%s': A %s with the same name already exists", | |
sourceName.text(), sourceType.toLowerCase())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is done
String.format("Cannot add %s '%s': A %s with the same name already exists", | ||
sourceType, sourceName.text(), sourceType)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
String.format("Cannot add %s '%s': A %s with the same name already exists", | |
sourceType, sourceName.text(), sourceType)); | |
String.format("Cannot add table '%s': A %s with the same name already exists", | |
sourceName.text(), sourceType.toLowerCase())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is done
if (dataSource != null && !createStream.isOrReplace()) { | ||
return new DdlCommandResult(true, | ||
String.format("Cannot add stream %s: A stream with the same name " | ||
+ "already exists.", | ||
sourceName)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't this check now redundant, given there is code in CreateSourceFactory
to do this now? We should let the code do what it used to do, which is throw an exception, not return a success code!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@big-andy-coates The code in CreateSourceFactory
checks if a dataSource
already exists , if isOrReplace()
is not set and ifNotExists
is also not set and throws exception , which seems right
If the code passes the checks in that gate ,it means in DdlCommandExec
it would mean the ifNotExists flag is set to true and we need to ensure that the statement does NOT fail ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems redundant, but it is what DROP IF NOT EXIST does too. It has a check in the DropSourceCommand and another in this class, which returns the DdlCommandResult message without causing a failure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I figured out why we need this repeated code. Turns out that CreateSourceFactory
is only called for CREATE
without queries statements. For CREATE AS
, a different code path is called, which creates the Create command similar to the CreateSourceFactory
, but without the factory validations and with more parameters. That path is EngineExecutor.maybeCreateSinkDdl
.
I was thinking on refactor the code, but seems it requires several changes. For now, I leave this temporary and do a check inside the maybeCreateSinkDdl
for the isNotExists
value. I'll do the refactor in another PR.
+ "CREATE TABLE FOO WITH (KAFKA_TOPIC='BAR') AS SELECT * FROM TEST2;"); | ||
|
||
givenStatementAlreadyExecuted(parsed.get(0)); | ||
|
||
final PreparedStatement<?> prepared = prepare(parsed.get(1)); | ||
final PreparedStatement<?> prepared = ksqlEngine.prepare(parsed.get(1)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change should be reverted.
ExecuteResult executeResult = ksqlEngine.execute( | ||
serviceContext, | ||
of(prepared, new HashMap<>(), KSQL_CONFIG) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change should be reverted.
"Cannot add table 'FOO': A table with the same name already exists"))); | ||
assertThat(e, statementText(is( | ||
"CREATE TABLE FOO WITH (KAFKA_TOPIC='BAR') AS SELECT * FROM TEST2;"))); | ||
assertThat(executeResult.getQuery(), is(not(Optional.empty()))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change should be reverted.
public void shouldNotThrowWhenExecutingDuplicateStream() { | ||
// Given: | ||
final List<ParsedStatement> parsed = ksqlEngine.parse( | ||
"CREATE STREAM FOO AS SELECT * FROM ORDERS; " | ||
"CREATE STREAM FOO WITH (KAFKA_TOPIC='BAR') AS SELECT * FROM ORDERS; " | ||
+ "CREATE STREAM FOO WITH (KAFKA_TOPIC='BAR') AS SELECT * FROM ORDERS;"); | ||
|
||
givenStatementAlreadyExecuted(parsed.get(0)); | ||
|
||
final PreparedStatement<?> prepared = ksqlEngine.prepare(parsed.get(1)); | ||
|
||
// When: | ||
final KsqlStatementException e = assertThrows( | ||
KsqlStatementException.class, | ||
() -> ksqlEngine.execute( | ||
serviceContext, | ||
of(prepared, new HashMap<>(), KSQL_CONFIG) | ||
) | ||
ExecuteResult executeResult = ksqlEngine.execute( | ||
serviceContext, | ||
of(prepared, new HashMap<>(), KSQL_CONFIG) | ||
); | ||
|
||
// Then: | ||
assertThat(e, rawMessage( | ||
is( | ||
"Cannot add stream 'FOO': A stream with the same name already exists"))); | ||
assertThat(e, statementText( | ||
is( | ||
"CREATE STREAM FOO WITH (KAFKA_TOPIC='BAR') AS SELECT * FROM ORDERS;"))); | ||
assertThat(executeResult.getQuery(), is(not(Optional.empty()))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change should be reverted.
@@ -63,6 +63,7 @@ | |||
import io.confluent.ksql.version.metrics.ActivenessRegistrar; | |||
import java.net.URL; | |||
import java.time.Duration; | |||
import java.util.Arrays; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change should be reverted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
fa37637
to
6781631
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hemantgs Sorry for the late review. I left 3 comments only. Are you still working in this, btw?
if (dataSource != null && !createStream.isOrReplace()) { | ||
return new DdlCommandResult(true, | ||
String.format("Cannot add stream %s: A stream with the same name " | ||
+ "already exists.", | ||
sourceName)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems redundant, but it is what DROP IF NOT EXIST does too. It has a check in the DropSourceCommand and another in this class, which returns the DdlCommandResult message without causing a failure.
@@ -911,6 +929,41 @@ public void shouldThrowIfTableIsMissingPrimaryKey() { | |||
containsString("Tables require a PRIMARY KEY. Please define the PRIMARY KEY.")); | |||
} | |||
|
|||
@Test | |||
public void shouldThrowIfStreamExits() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add two more tests shouldNotThrowWhenCreateStreamIfNotExistsIsSet and same for a table?
@@ -109,11 +109,11 @@ public void init() { | |||
|
|||
final String endpointHttp = supportConfig.getEndpointHttp(); | |||
final String endpointHttps = supportConfig.getEndpointHttps(); | |||
final String proxyURI = supportConfig.getProxy(); | |||
final String proxyUri = supportConfig.getProxy(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this part of the changes? Should it be reverted?
7e21ffd
to
b7aada8
Compare
b7aada8
to
b1ea153
Compare
createSourceCommand.getSchema(), | ||
createSourceCommand.getTopicName(), | ||
createSourceCommand.getFormats().getKeyFormat(), | ||
createSourceCommand.getFormats().getKeyFeatures(), | ||
createSourceCommand.getFormats().getValueFormat(), | ||
createSourceCommand.getFormats().getValueFeatures(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had to build the query information from the plan instead of executing it. This avoids failing with an error if the IF NOT EXISTS
keyword is used in the CREATE_AS statement. Also, this seems a better approach than execution 'cause this injector is only registering the schema in SR.
@@ -981,27 +981,21 @@ public void shouldNotThrowWhenPreparingDuplicateTable() { | |||
public void shouldThrowWhenExecutingDuplicateTable() { | |||
// Given: | |||
final List<ParsedStatement> parsed = ksqlEngine.parse( | |||
"CREATE TABLE FOO AS SELECT * FROM TEST2; " | |||
"CREATE TABLE FOO WITH (KAFKA_TOPIC='BAR') AS SELECT * FROM TEST2; " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a new test now that verifies the IF NOT EXISTS
works.
b1ea153
to
a8a1850
Compare
a8a1850
to
9a1c440
Compare
|
||
if (dataSource != null && !createStream.isOrReplace()) { | ||
return new DdlCommandResult(true, | ||
String.format("Cannot add stream %s: A stream with the same name " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: i think we should use dataSource.getDataSourceType().getKsqlType()
here and in executeCreateTable
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. I updated the PR.
Changes reverted and added to CommandFactories Fixed and added tests Fixed and added tests Fixed tests
9a1c440
to
8a84700
Compare
Description
IF NOT EXISTS were not added to the docs for Streams , even though support has been provided in code.
This came up in discussion of #6036
Added docs for the same
Testing done
None
Reviewer checklist