Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(3525): SET should only affect statements after it #3529

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
fix(3525): sET should only affect statements after it
Fixes #3525

This commit fixes a regression that sees an old issue reappearing where by a SET statement affects not just statements that follow it, but also statements before it. The primary cause of this is the fact that `ConfiguredStatement` contains a reference to a mutable map containing the property overrides. This map is mutated by subsequent SET statements.  The fix is to make for `ConfiguredStatement` to take a immutable defensive copy, as is good programming practice.  This involves adding explicit handling of SET and UNSET statements, which can no long mutate the overrides in `ConfiguredStatement`.
  • Loading branch information
big-andy-coates committed Oct 10, 2019
commit cc4e2dd861c9c6a43356c8a3421c769d6e97b62c
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -123,23 +123,25 @@ public List<QueryMetadata> sql(final String sql) {
return sql(sql, Collections.emptyMap());
}

public List<QueryMetadata> sql(final String sql, final Map<String, Object> overriddenProperties) {
public List<QueryMetadata> sql(final String sql, final Map<String, ?> overriddenProperties) {
final List<ParsedStatement> statements = ksqlEngine.parse(sql);

final KsqlExecutionContext sandbox = ksqlEngine.createSandbox(ksqlEngine.getServiceContext());
final Map<String, Object> validationOverrides = new HashMap<>(overriddenProperties);
for (ParsedStatement stmt : statements) {
execute(
sandbox,
stmt,
ksqlConfig,
overriddenProperties,
validationOverrides,
injectorFactory.apply(sandbox, sandbox.getServiceContext()));
}

final List<QueryMetadata> queries = new ArrayList<>();
final Injector injector = injectorFactory.apply(ksqlEngine, serviceContext);
final Map<String, Object> executionOverrides = new HashMap<>(overriddenProperties);
for (final ParsedStatement parsed : statements) {
execute(ksqlEngine, parsed, ksqlConfig, overriddenProperties, injector)
execute(ksqlEngine, parsed, ksqlConfig, executionOverrides, injector)
.getQuery()
.ifPresent(queries::add);
}
Expand Down Expand Up @@ -181,32 +183,42 @@ private static ExecuteResult execute(
final KsqlExecutionContext executionContext,
final ParsedStatement stmt,
final KsqlConfig ksqlConfig,
final Map<String, Object> overriddenProperties,
final Injector injector) {
final Map<String, Object> mutableSessionPropertyOverrides,
final Injector injector
) {
final PreparedStatement<?> prepared = executionContext.prepare(stmt);
final ConfiguredStatement<?> configured =
injector.inject(ConfiguredStatement.of(prepared, overriddenProperties, ksqlConfig));

final ConfiguredStatement<?> configured = injector.inject(ConfiguredStatement.of(
prepared,
mutableSessionPropertyOverrides,
ksqlConfig
));

final CustomExecutor executor =
CustomExecutors.EXECUTOR_MAP.getOrDefault(
configured.getStatement().getClass(),
executionContext::execute);
(s, props) -> executionContext.execute(s));

return executor.apply(configured);
return executor.apply(configured, mutableSessionPropertyOverrides);
}

@FunctionalInterface
private interface CustomExecutor extends Function<ConfiguredStatement<?>, ExecuteResult> { }
private interface CustomExecutor {
ExecuteResult apply(
ConfiguredStatement<?> statement,
Map<String, Object> mutableSessionPropertyOverrides
);
}

@SuppressWarnings("unchecked")
private enum CustomExecutors {

SET_PROPERTY(SetProperty.class, stmt -> {
PropertyOverrider.set((ConfiguredStatement<SetProperty>) stmt);
SET_PROPERTY(SetProperty.class, (stmt, props) -> {
PropertyOverrider.set((ConfiguredStatement<SetProperty>) stmt, props);
return ExecuteResult.of("Successfully executed " + stmt.getStatement());
}),
UNSET_PROPERTY(UnsetProperty.class, stmt -> {
PropertyOverrider.unset((ConfiguredStatement<UnsetProperty>) stmt);
UNSET_PROPERTY(UnsetProperty.class, (stmt, props) -> {
PropertyOverrider.unset((ConfiguredStatement<UnsetProperty>) stmt, props);
return ExecuteResult.of("Successfully executed " + stmt.getStatement());
})
;
Expand Down Expand Up @@ -238,9 +250,11 @@ private CustomExecutor getExecutor() {
return this::execute;
}

public ExecuteResult execute(final ConfiguredStatement<?> statement) {
return executor.apply(statement);
public ExecuteResult execute(
final ConfiguredStatement<?> statement,
final Map<String, Object> mutableSessionPropertyOverrides
) {
return executor.apply(statement, mutableSessionPropertyOverrides);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ private InsertValuesExecutor(

public void execute(
final ConfiguredStatement<InsertValues> statement,
final Map<String, ?> sessionProperties,
final KsqlExecutionContext executionContext,
final ServiceContext serviceContext
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,29 @@
import io.confluent.ksql.parser.tree.UnsetProperty;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlStatementException;
import java.util.Map;

public final class PropertyOverrider {

private PropertyOverrider() { }

public static void set(final ConfiguredStatement<SetProperty> statement) {
public static void set(
final ConfiguredStatement<SetProperty> statement,
final Map<String, Object> mutableProperties
) {
final SetProperty setProperty = statement.getStatement();
throwIfInvalidProperty(setProperty.getPropertyName(), statement.getStatementText());
throwIfInvalidPropertyValues(setProperty, statement);
statement.getOverrides().put(setProperty.getPropertyName(), setProperty.getPropertyValue());
mutableProperties.put(setProperty.getPropertyName(), setProperty.getPropertyValue());
}

public static void unset(final ConfiguredStatement<UnsetProperty> statement) {
public static void unset(
final ConfiguredStatement<UnsetProperty> statement,
final Map<String, Object> mutableProperties
) {
final UnsetProperty unsetProperty = statement.getStatement();
throwIfInvalidProperty(unsetProperty.getPropertyName(), statement.getStatementText());
statement.getOverrides().remove(unsetProperty.getPropertyName());
mutableProperties.remove(unsetProperty.getPropertyName());
}

@SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_INFERRED") // clone has side-effects
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@

package io.confluent.ksql.statement;

import static java.util.Objects.requireNonNull;

import com.google.common.collect.ImmutableMap;
import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.util.KsqlConfig;
Expand All @@ -25,6 +29,7 @@
* A prepared statement paired with the configurations needed to fully
* execute it.
*/
@Immutable
public final class ConfiguredStatement<T extends Statement> {

private final PreparedStatement<T> statement;
Expand All @@ -33,20 +38,20 @@ public final class ConfiguredStatement<T extends Statement> {

public static <S extends Statement> ConfiguredStatement<S> of(
final PreparedStatement<S> statement,
final Map<String, Object> overrides,
final Map<String, ?> overrides,
final KsqlConfig config
) {
return new ConfiguredStatement<>(statement, overrides, config);
}

private ConfiguredStatement(
final PreparedStatement<T> statement,
final Map<String, Object> overrides,
final Map<String, ?> overrides,
final KsqlConfig config
) {
this.statement = Objects.requireNonNull(statement, "statement");
this.overrides = Objects.requireNonNull(overrides, "overrides");
this.config = Objects.requireNonNull(config, "config");
this.statement = requireNonNull(statement, "statement");
this.overrides = ImmutableMap.copyOf(requireNonNull(overrides, "overrides"));
this.config = requireNonNull(config, "config");
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@

package io.confluent.ksql.embedded;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.not;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.inOrder;
Expand Down Expand Up @@ -48,7 +45,6 @@
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.TransientQueryMetadata;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerConfig;
Expand Down Expand Up @@ -364,43 +360,81 @@ public void shouldInferTopicAfterInferringSchema() {
ksqlContext.sql("Some SQL", SOME_PROPERTIES);

// Then:
verify(ksqlEngine).execute(eq(STMT_1_WITH_TOPIC));
verify(ksqlEngine).execute(STMT_1_WITH_TOPIC);
}

@SuppressWarnings("unchecked")
@Test
public void shouldSetProperty() {
// Given:
final Map<String, Object> properties = new HashMap<>();
when(ksqlEngine.parse(any())).thenReturn(ImmutableList.of(PARSED_STMT_0, PARSED_STMT_0));

final PreparedStatement<SetProperty> set = PreparedStatement.of(
"SET SOMETHING",
new SetProperty(Optional.empty(), ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
);

when(ksqlEngine.prepare(any()))
.thenReturn(
(PreparedStatement) PreparedStatement.of(
"SET SOMETHING",
new SetProperty(Optional.empty(), ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")));
.thenReturn((PreparedStatement) set)
.thenReturn(PREPARED_STMT_0);

// When:
ksqlContext.sql("SQL;", properties);
ksqlContext.sql("SQL;", ImmutableMap.of());

// Then:
verify(ksqlEngine).execute(ConfiguredStatement.of(
PREPARED_STMT_0, ImmutableMap.of(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"),
SOME_CONFIG
));
}

@SuppressWarnings("unchecked")
@Test
public void shouldSetPropertyOnlyOnCommandsFollowingTheSetStatement() {
// Given:
when(ksqlEngine.parse(any())).thenReturn(ImmutableList.of(PARSED_STMT_0, PARSED_STMT_0));

final PreparedStatement<SetProperty> set = PreparedStatement.of(
"SET SOMETHING",
new SetProperty(Optional.empty(), ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
);

when(ksqlEngine.prepare(any()))
.thenReturn((PreparedStatement) PREPARED_STMT_0)
.thenReturn(set);

// When:
ksqlContext.sql("SQL;", ImmutableMap.of());

// Then:
assertThat(properties, hasEntry(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"));
verify(ksqlEngine).execute(ConfiguredStatement.of(
PREPARED_STMT_0, ImmutableMap.of(), SOME_CONFIG
));
}

@SuppressWarnings("unchecked")
@Test
public void shouldUnsetProperty() {
// Given:
final Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
when(ksqlEngine.parse(any())).thenReturn(ImmutableList.of(PARSED_STMT_0, PARSED_STMT_0));

final Map<String, Object> properties = ImmutableMap
.of(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

final PreparedStatement<UnsetProperty> unset = PreparedStatement.of(
"UNSET SOMETHING",
new UnsetProperty(Optional.empty(), ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));

when(ksqlEngine.prepare(any()))
.thenReturn(
(PreparedStatement) PreparedStatement.of(
"UNSET SOMETHING",
new UnsetProperty(Optional.empty(), ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)));
.thenReturn((PreparedStatement) unset)
.thenReturn(PREPARED_STMT_0);

// When:
ksqlContext.sql("SQL;", properties);

// Then:
assertThat(properties, not(hasEntry(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")));
verify(ksqlEngine).execute(ConfiguredStatement.of(
PREPARED_STMT_0, ImmutableMap.of(), SOME_CONFIG
));
}
}
Loading