Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: CREATE IF NOT EXISTS does not work at all #6073

Merged
merged 9 commits into from
Dec 8, 2020

Conversation

hemantgs
Copy link
Contributor

Description

IF NOT EXISTS were not added to the docs for Streams , even though support has been provided in code.
This came up in discussion of #6036
Added docs for the same

Testing done

None

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@hemantgs hemantgs requested review from JimGalasyn and a team as code owners August 21, 2020 15:38
@ghost
Copy link

ghost commented Aug 21, 2020

@confluentinc It looks like @hemantgs just signed our Contributor License Agreement. 👍

Always at your service,

clabot

@spena
Copy link
Member

spena commented Aug 31, 2020

There's a bug in the CREATE STREAM IF NOT EXISTS command. It still returns an error if the stream does exists.
#6050

We should fix the above and the doc.

@hemantgs
Copy link
Contributor Author

hemantgs commented Sep 1, 2020

@spena Is someone looking at it ? else I can try 👍

@big-andy-coates
Copy link
Contributor

@spena Is someone looking at it ? else I can try 👍

May as well fix in this PR if you have the time @hemantgs

@hemantgs
Copy link
Contributor Author

hemantgs commented Sep 1, 2020

@spena Is someone looking at it ? else I can try +1

May as well fix in this PR if you have the time @hemantgs

Sure 👍

@@ -75,6 +75,13 @@ public DdlCommandResult executeCreateStream(final CreateStreamCommand createStre
withQuery,
getKsqlTopic(createStream)
);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@big-andy-coates @spena
I noticed that CreateStreamCommand did not have the boolean flag for ifNotExists set here , I thought if this approach is ok , I will add that too and fail the query if its not set and source exists

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The place to put the check is in the CreateSourceFactory. You'll need to pass the MetaStore in to the factory's constructor.

See DropSourceFactory for an example of the pattern.

@hemantgs
Copy link
Contributor Author

hemantgs commented Sep 1, 2020

@spena Is someone looking at it ? else I can try +1

May as well fix in this PR if you have the time @hemantgs

I took a stab at this 😄 , can you check if I am in the right path , if yes then I will fix the tests and add more , and add the ifNotExists boolean to CreateStreamCommand as well

@big-andy-coates
Copy link
Contributor

What out for merge clashes with #6041. Would hate to lose some of your doc changes in a merge error.

Copy link
Contributor

@big-andy-coates big-andy-coates left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Quick review before I sign off. Thanks!

Comment on lines 25 to 26
If the IF NOT EXISTS clause is present, the statement doesn't fail if the
stream doesn't exist.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
If the IF NOT EXISTS clause is present, the statement doesn't fail if the
stream doesn't exist.
If the IF NOT EXISTS clause is present, the statement won't fail if a
stream with the same name already exists.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed this

Comment on lines 22 to 23
If the IF NOT EXISTS clause is present, the statement doesn't fail if the
table doesn't exist.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
If the IF NOT EXISTS clause is present, the statement doesn't fail if the
table doesn't exist.
If the IF NOT EXISTS clause is present, the statement won't fail if a
table with the same name already exists.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this too

@@ -75,6 +75,13 @@ public DdlCommandResult executeCreateStream(final CreateStreamCommand createStre
withQuery,
getKsqlTopic(createStream)
);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The place to put the check is in the CreateSourceFactory. You'll need to pass the MetaStore in to the factory's constructor.

See DropSourceFactory for an example of the pattern.

@@ -77,22 +77,20 @@ public DataSource getSource(final SourceName sourceName) {
}

@Override
public void putSource(final DataSource dataSource, final boolean allowReplace) {
public boolean validateSource(final DataSource dataSource, final boolean allowReplace) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't be splitting this into two methods, as this makes it much more likely someone will call putSource and inadvertently overwrite a source that they shouldn't.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point,
I looked at DropSourceFactory , even there

 if (dataSource == null) {
      if (!ifExists) {
        throw new KsqlException("Source " + sourceName.text() + " does not exist.");
      }
    }

We throw an Exception which will Fail the command which should not be the outcome right ,

The only place I could find other than DdlCommandExec that had capability to pass along a warning was in KsqlResource.handleKsqlStatements()
where if an exception is thrown in MetaStoreImpl we could possibly catch that and throw a warning like

catch (final KsqlStatementException e) {
      LOG.info("Processed unsuccessfully: " + request + ", reason: " + e.getMessage());
      Errors.badStatement("",e.getSqlStatement(),new KsqlEntityList(
          Arrays.asList(new WarningEntity(e.getSqlStatement(),"Source Exists"))
      ));

Even here the first parameter to badStatement() seems it needs to be an empty string , else the msg is printed in the client in red . What do you suggest we do

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We throw an Exception which will Fail the command which should not be the outcome right ,

That code looks correct to me for dropping a source. It means if I say DROP TABLE IF EXISTS FOO that it won't throw if FOO doesn't exist, which is what we want.

As to where to put this new check - you'll need two changes:

  1. as above, put a check in CreateSourceFactory so that it throws an exception if IF NOT EXIST is not present and a source with the same name already exists. (Following a similar pattern to DropSourceFactory).

  2. In DdlCommandExec.executeCreateStream and executeCreateTable you will need to add a check to see if a source with the same name already exists and isOrReplace() is false, e.g.

    @Override
    public DdlCommandResult executeCreateStream(final CreateStreamCommand createStream) {
      final SourceName sourceName = createStream.getSourceName();
      final DataSource existing = metaStore.getSource(sourceName);
      if (existing != null && !createStream.isOrReplace()) {
        return new DdlCommandResult(true, "Stream " + sourceName + " does not exist.");
      }

      final KsqlStream<?> ksqlStream = new KsqlStream<>(
          sql,
          sourceName,
          createStream.getSchema(),
          createStream.getFormats().getOptions(),
          createStream.getTimestampColumn(),
          withQuery,
          getKsqlTopic(createStream)
      );
      
      metaStore.putSource(ksqlStream, createStream.isOrReplace());
      return new DdlCommandResult(true, "Stream created");
    }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That code looks correct to me for dropping a source. It means if I say DROP TABLE IF EXISTS FOO that it won't throw if FOO doesn't exist, which is what we want.

@big-andy-coates I always seem to confuse myself with that part :| , so anyway I have made the changes
i have made changes to KsqlEngineTest.shouldNotThrowWhenExecutingDuplicateStream() and KsqlEngineTest.shouldThrowWhenExecutingDuplicateTable() , these tests were failing when the query plans for both create source queries had different Kafka topics one had FOO(stream name) and other had BAR, before this PR the test would have failed much before getting to the planning phase, so I have made changes to the queries in the tests , I am unsure if that is alright

Also build in Jenkins has failed I get 404 when I try to open the link ,
When I try mvn verify I get

[INFO] [ERROR] Failed to execute goal on project my-first-test-udf: Could not resolve dependencies for project com.acme.ksql.functions:my-first-test-udf:jar:0.1.0-SNAPSHOT: Could not find artifact io.confluent.ksql:ksqldb-udf:jar:6.1.0-SNAPSHOT in confluent (https://packages.confluent.io/maven/) -> [Help 1]
[INFO] org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal on project my-first-test-udf: Could not resolve dependencies for project com.acme.ksql.functions:my-first-test-udf:jar:0.1.0-SNAPSHOT: Could not find artifact io.confluent.ksql:ksqldb-udf:jar:6.1.0-SNAPSHOT in confluent (https://packages.confluent.io/maven/)

and it fails
I am a bit unsure as to how to run all the tests

Copy link
Contributor

@big-andy-coates big-andy-coates left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @hemantgs, this is looking close, except looks like you've still got some changes in there from the old approach.

Also, would you mind adding some json QTT style tests to cover adding tables/streams when a source (or the opposite type) already exists with the same name:

  • Positive tests: i.e. with IF NOT EXISTS, then ensure selecting from the source is selecting from the pre-existing source, not the new source, i.e. testing that the source was NOT replaced.
  • Negative tests: i.e. without IF NOT EXISTS and testing the exception is thrown and the error message is correct.

Thanks!

) {
this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext");
this.serdeOptionsSupplier =
Objects.requireNonNull(serdeOptionsSupplier, "serdeOptionsSupplier");
this.keySerdeFactory = Objects.requireNonNull(keySerdeFactory, "keySerdeFactory");
this.valueSerdeFactory = Objects.requireNonNull(valueSerdeFactory, "valueSerdeFactory");
this.metaStore = metaStore;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: validate params that will be stored in object state; ensuring object does not get into an invalid state. i.e. use requireNonNull.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is done

@@ -115,6 +131,17 @@ public CreateTableCommand createTableCommand(
final KsqlConfig ksqlConfig
) {
final SourceName sourceName = statement.getName();
final DataSource dataSource = metaStore.getSource(sourceName);

if (dataSource != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (dataSource != null) {
if (dataSource != null && !statement.isOrReplace()) {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is done

Comment on lines 97 to 98
String.format("Cannot add %s '%s': A %s with the same name already exists",
sourceType, sourceName.text(), sourceType));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
String.format("Cannot add %s '%s': A %s with the same name already exists",
sourceType, sourceName.text(), sourceType));
String.format("Cannot add stream '%s': A %s with the same name already exists",
sourceName.text(), sourceType.toLowerCase()));

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is done

Comment on lines 140 to 141
String.format("Cannot add %s '%s': A %s with the same name already exists",
sourceType, sourceName.text(), sourceType));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
String.format("Cannot add %s '%s': A %s with the same name already exists",
sourceType, sourceName.text(), sourceType));
String.format("Cannot add table '%s': A %s with the same name already exists",
sourceName.text(), sourceType.toLowerCase()));

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is done

Comment on lines 72 to 93
if (dataSource != null && !createStream.isOrReplace()) {
return new DdlCommandResult(true,
String.format("Cannot add stream %s: A stream with the same name "
+ "already exists.",
sourceName));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this check now redundant, given there is code in CreateSourceFactory to do this now? We should let the code do what it used to do, which is throw an exception, not return a success code!

Copy link
Contributor Author

@hemantgs hemantgs Sep 26, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@big-andy-coates The code in CreateSourceFactory checks if a dataSource already exists , if isOrReplace() is not set and ifNotExists is also not set and throws exception , which seems right
If the code passes the checks in that gate ,it means in DdlCommandExec it would mean the ifNotExists flag is set to true and we need to ensure that the statement does NOT fail ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems redundant, but it is what DROP IF NOT EXIST does too. It has a check in the DropSourceCommand and another in this class, which returns the DdlCommandResult message without causing a failure.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I figured out why we need this repeated code. Turns out that CreateSourceFactory is only called for CREATE without queries statements. For CREATE AS, a different code path is called, which creates the Create command similar to the CreateSourceFactory, but without the factory validations and with more parameters. That path is EngineExecutor.maybeCreateSinkDdl.

I was thinking on refactor the code, but seems it requires several changes. For now, I leave this temporary and do a check inside the maybeCreateSinkDdl for the isNotExists value. I'll do the refactor in another PR.

+ "CREATE TABLE FOO WITH (KAFKA_TOPIC='BAR') AS SELECT * FROM TEST2;");

givenStatementAlreadyExecuted(parsed.get(0));

final PreparedStatement<?> prepared = prepare(parsed.get(1));
final PreparedStatement<?> prepared = ksqlEngine.prepare(parsed.get(1));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change should be reverted.

Comment on lines 992 to 994
ExecuteResult executeResult = ksqlEngine.execute(
serviceContext,
of(prepared, new HashMap<>(), KSQL_CONFIG)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change should be reverted.

"Cannot add table 'FOO': A table with the same name already exists")));
assertThat(e, statementText(is(
"CREATE TABLE FOO WITH (KAFKA_TOPIC='BAR') AS SELECT * FROM TEST2;")));
assertThat(executeResult.getQuery(), is(not(Optional.empty())));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change should be reverted.

Comment on lines 1033 to 1326
public void shouldNotThrowWhenExecutingDuplicateStream() {
// Given:
final List<ParsedStatement> parsed = ksqlEngine.parse(
"CREATE STREAM FOO AS SELECT * FROM ORDERS; "
"CREATE STREAM FOO WITH (KAFKA_TOPIC='BAR') AS SELECT * FROM ORDERS; "
+ "CREATE STREAM FOO WITH (KAFKA_TOPIC='BAR') AS SELECT * FROM ORDERS;");

givenStatementAlreadyExecuted(parsed.get(0));

final PreparedStatement<?> prepared = ksqlEngine.prepare(parsed.get(1));

// When:
final KsqlStatementException e = assertThrows(
KsqlStatementException.class,
() -> ksqlEngine.execute(
serviceContext,
of(prepared, new HashMap<>(), KSQL_CONFIG)
)
ExecuteResult executeResult = ksqlEngine.execute(
serviceContext,
of(prepared, new HashMap<>(), KSQL_CONFIG)
);

// Then:
assertThat(e, rawMessage(
is(
"Cannot add stream 'FOO': A stream with the same name already exists")));
assertThat(e, statementText(
is(
"CREATE STREAM FOO WITH (KAFKA_TOPIC='BAR') AS SELECT * FROM ORDERS;")));
assertThat(executeResult.getQuery(), is(not(Optional.empty())));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change should be reverted.

@@ -63,6 +63,7 @@
import io.confluent.ksql.version.metrics.ActivenessRegistrar;
import java.net.URL;
import java.time.Duration;
import java.util.Arrays;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change should be reverted.

Copy link
Member

@JimGalasyn JimGalasyn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

Copy link
Member

@spena spena left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hemantgs Sorry for the late review. I left 3 comments only. Are you still working in this, btw?

Comment on lines 72 to 93
if (dataSource != null && !createStream.isOrReplace()) {
return new DdlCommandResult(true,
String.format("Cannot add stream %s: A stream with the same name "
+ "already exists.",
sourceName));
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems redundant, but it is what DROP IF NOT EXIST does too. It has a check in the DropSourceCommand and another in this class, which returns the DdlCommandResult message without causing a failure.

@@ -911,6 +929,41 @@ public void shouldThrowIfTableIsMissingPrimaryKey() {
containsString("Tables require a PRIMARY KEY. Please define the PRIMARY KEY."));
}

@Test
public void shouldThrowIfStreamExits() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add two more tests shouldNotThrowWhenCreateStreamIfNotExistsIsSet and same for a table?

@@ -109,11 +109,11 @@ public void init() {

final String endpointHttp = supportConfig.getEndpointHttp();
final String endpointHttps = supportConfig.getEndpointHttps();
final String proxyURI = supportConfig.getProxy();
final String proxyUri = supportConfig.getProxy();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this part of the changes? Should it be reverted?

Comment on lines +128 to +134
createSourceCommand.getSchema(),
createSourceCommand.getTopicName(),
createSourceCommand.getFormats().getKeyFormat(),
createSourceCommand.getFormats().getKeyFeatures(),
createSourceCommand.getFormats().getValueFormat(),
createSourceCommand.getFormats().getValueFeatures(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to build the query information from the plan instead of executing it. This avoids failing with an error if the IF NOT EXISTS keyword is used in the CREATE_AS statement. Also, this seems a better approach than execution 'cause this injector is only registering the schema in SR.

@@ -981,27 +981,21 @@ public void shouldNotThrowWhenPreparingDuplicateTable() {
public void shouldThrowWhenExecutingDuplicateTable() {
// Given:
final List<ParsedStatement> parsed = ksqlEngine.parse(
"CREATE TABLE FOO AS SELECT * FROM TEST2; "
"CREATE TABLE FOO WITH (KAFKA_TOPIC='BAR') AS SELECT * FROM TEST2; "
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a new test now that verifies the IF NOT EXISTS works.

@spena spena requested a review from a team November 13, 2020 22:06
@spena spena added this to the 0.15.0 milestone Dec 3, 2020
@spena spena changed the title docs: Added missing IF NOT EXISTS to Streams and Tables doc fix: CREATE IF NOT EXISTS does not work at all Dec 4, 2020
@spena spena requested review from a team and removed request for a team December 4, 2020 20:47

if (dataSource != null && !createStream.isOrReplace()) {
return new DdlCommandResult(true,
String.format("Cannot add stream %s: A stream with the same name "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: i think we should use dataSource.getDataSourceType().getKsqlType() here and in executeCreateTable

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I updated the PR.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants