Skip to content

Commit

Permalink
fix: improve join predictability by increasing max task idle (conflue…
Browse files Browse the repository at this point in the history
…ntinc#5594)

* fix: improve join predictability by increasing max task idle

improvement for: confluentinc#5537

This change increases the Stream's `max.task.idle.ms` from its default `0` to `500`ms, which should provide a better out-of-the-box join experience for anyone starting a join with a large backlog of messages, i.e. a backlog large enough to mean the broker may only return records for one side of the join. See confluentinc#5537 for more info.


Co-authored-by: Andy Coates <big-andy-coates@users.noreply.github.com>
  • Loading branch information
big-andy-coates and big-andy-coates committed Jul 29, 2020
1 parent b39763b commit c643d8f
Show file tree
Hide file tree
Showing 27 changed files with 118 additions and 70 deletions.
5 changes: 5 additions & 0 deletions config/ksql-production-server.properties
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ bootstrap.servers=localhost:9092
# Enable snappy compression for the Kafka producers
compression.type=snappy

# By default, wait up to 500 millis to allow a second poll to Kafka if one side of a join is
# exhausted. This improves join predictability.
# Increasing this setting improves join predictability at the cost of increased end-to-end latency.
# ksql.streams.max.task.idle.ms=?

# uncomment the below to start an embedded Connect worker
# ksql.connect.worker.config=config/connect.properties
# ksql.connect.configs.topic=ksql-connect-configs
Expand Down
5 changes: 5 additions & 0 deletions config/ksql-server.properties
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ bootstrap.servers=localhost:9092
# Enable snappy compression for the Kafka producers
compression.type=snappy

# By default, wait up to 500 millis to allow a second poll to Kafka if one side of a join is
# exhausted. This improves join predictability.
# Increasing this setting improves join predictability at the cost of increased end-to-end latency.
# ksql.streams.max.task.idle.ms=?

#------ Connect -------

# uncomment the below to start an embedded Connect worker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,15 @@
import io.confluent.ksql.api.client.InsertAck;
import io.confluent.ksql.api.client.InsertsPublisher;
import io.confluent.ksql.api.client.KsqlArray;
import io.confluent.ksql.api.client.KsqlObject;
import io.confluent.ksql.api.client.QueryInfo;
import io.confluent.ksql.api.client.QueryInfo.QueryType;
import io.confluent.ksql.api.client.exception.KsqlClientException;
import io.confluent.ksql.api.client.KsqlObject;
import io.confluent.ksql.api.client.Row;
import io.confluent.ksql.api.client.StreamInfo;
import io.confluent.ksql.api.client.StreamedQueryResult;
import io.confluent.ksql.api.client.TableInfo;
import io.confluent.ksql.api.client.TopicInfo;
import io.confluent.ksql.api.client.exception.KsqlClientException;
import io.confluent.ksql.api.client.util.ClientTestUtil.TestSubscriber;
import io.confluent.ksql.api.client.util.RowUtil;
import io.confluent.ksql.engine.KsqlEngine;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.parser.OutputRefinement;

import java.util.Objects;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,14 @@ public class KsqlConfig extends AbstractConfig {
public static final String KSQL_SUPPRESS_ENABLED_DOC =
"Feature flag for suppression, specifically EMIT FINAL";

// Defaults for config NOT defined by this class's ConfigDef:
static final ImmutableMap<String, ?> NON_KSQL_DEFAULTS = ImmutableMap
.<String, Object>builder()
// Improve join predictability by generally allowing a second poll to ensure both sizes
// of a join have data. See https://github.com/confluentinc/ksql/issues/5537.
.put(KSQL_STREAMS_PREFIX + StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 500L)
.build();

private enum ConfigGeneration {
LEGACY,
CURRENT
Expand Down Expand Up @@ -792,7 +800,7 @@ public KsqlConfig(final Map<?, ?> props) {
}

private KsqlConfig(final ConfigGeneration generation, final Map<?, ?> props) {
super(configDef(generation), props);
super(configDef(generation), addNonKsqlDefaults(props));

final Map<String, Object> streamsConfigDefaults = new HashMap<>();
streamsConfigDefaults.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, KsqlConstants
Expand Down Expand Up @@ -821,6 +829,12 @@ private KsqlConfig(final ConfigGeneration generation, final Map<?, ?> props) {
this.ksqlStreamConfigProps = buildStreamingConfig(streamsConfigDefaults, originals());
}

private static Map<?, ?> addNonKsqlDefaults(final Map<?, ?> props) {
final Map<Object, Object> withDefaults = new HashMap<>(props);
NON_KSQL_DEFAULTS.forEach(withDefaults::putIfAbsent);
return withDefaults;
}

private boolean getBooleanConfig(final String config, final boolean defaultValue) {
final Object value = originals().get(config);
if (value == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,9 @@

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;

import com.google.common.testing.EqualsTester;
import com.google.common.testing.NullPointerTester;
import java.util.Optional;

import io.confluent.ksql.parser.OutputRefinement;
import org.junit.Test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,11 +316,13 @@ public void shouldHaveCorrectOriginalsAfterCloneWithOverwrite() {
));

// Then:
assertThat(cloned.originals(), is(ImmutableMap.of(
KsqlConfig.KSQL_SERVICE_ID_CONFIG, "overridden-id",
KsqlConfig.KSQL_WRAP_SINGLE_VALUES, "true",
KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG, "bob"
)));
assertThat(cloned.originals(), is(ImmutableMap.builder()
.put(KsqlConfig.KSQL_SERVICE_ID_CONFIG, "overridden-id")
.put(KsqlConfig.KSQL_WRAP_SINGLE_VALUES, "true")
.put(KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG, "bob")
.putAll(KsqlConfig.NON_KSQL_DEFAULTS)
.build()
));
}

@Test
Expand Down Expand Up @@ -575,4 +577,50 @@ public void shouldFilterProducerConfigs() {
assertThat(ksqlConfig.getProducerClientConfigProps(), hasEntry(ProducerConfig.CLIENT_ID_CONFIG, null));
assertThat(ksqlConfig.getProducerClientConfigProps(), not(hasKey("not.a.config")));
}

@Test
public void shouldDefaultStreamsMinTaskIdleConfig() {
// When:
final KsqlConfig ksqlConfig = new KsqlConfig(ImmutableMap.of());

// Then:
assertThat(
ksqlConfig.getKsqlStreamConfigProps(),
hasEntry(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 500L)
);
}

@Test
public void shouldUsePrefixedStreamsMinTaskIdleConfig() {
// Given:
final ImmutableMap<String, ?> props = ImmutableMap.of(
KsqlConfig.KSQL_STREAMS_PREFIX + StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 100L
);

// When:
final KsqlConfig ksqlConfig = new KsqlConfig(props);

// Then:
assertThat(
ksqlConfig.getKsqlStreamConfigProps(),
hasEntry(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 100L)
);
}

@Test
public void shouldUseNonPrefixedStreamsMinTaskIdleConfig() {
// Given:
final ImmutableMap<String, ?> props = ImmutableMap.of(
StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 1000L
);

// When:
final KsqlConfig ksqlConfig = new KsqlConfig(props);

// Then:
assertThat(
ksqlConfig.getKsqlStreamConfigProps(),
hasEntry(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 1000L)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import io.confluent.ksql.parser.tree.WindowExpression;
import io.confluent.ksql.serde.RefinementInfo;
import io.confluent.ksql.util.KsqlException;

import java.util.List;
import java.util.Objects;
import java.util.Optional;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@

package io.confluent.ksql.analyzer;

import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.when;

import io.confluent.ksql.engine.rewrite.ExpressionTreeRewriter;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.execution.windows.HoppingWindowExpression;
Expand All @@ -27,23 +33,15 @@
import io.confluent.ksql.parser.tree.WindowExpression;
import io.confluent.ksql.serde.RefinementInfo;
import io.confluent.ksql.util.KsqlException;
import java.util.Optional;
import java.util.function.BiFunction;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;

import java.util.Optional;
import java.util.function.BiFunction;

import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;

import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.when;

public class RewrittenAnalysisTest {

@Mock
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,10 @@
import io.confluent.ksql.parser.tree.TableElements;
import io.confluent.ksql.parser.tree.WindowExpression;
import io.confluent.ksql.parser.tree.WithinExpression;
import io.confluent.ksql.serde.RefinementInfo;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.function.BiFunction;

import io.confluent.ksql.serde.RefinementInfo;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import static io.confluent.ksql.planner.plan.JoinNode.JoinType.INNER;
import static io.confluent.ksql.planner.plan.JoinNode.JoinType.LEFT;
import static io.confluent.ksql.planner.plan.JoinNode.JoinType.OUTER;
import static io.confluent.ksql.planner.plan.PlanTestUtil.SOURCE_NODE;
import static io.confluent.ksql.planner.plan.PlanTestUtil.SOURCE_NODE_FORCE_CHANGELOG;
import static io.confluent.ksql.planner.plan.PlanTestUtil.getNodeByName;
import static java.util.Optional.empty;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;

import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.serde.RefinementInfo;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,11 @@
import io.confluent.ksql.schema.ksql.types.SqlDecimal;
import io.confluent.ksql.schema.ksql.types.SqlPrimitiveType;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import io.confluent.ksql.util.KsqlConfig;
import java.math.BigDecimal;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

import io.confluent.ksql.util.KsqlConfig;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public class TestExecutor implements Closeable {
.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 0)
.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0)
.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 0L)
.put(KsqlConfig.KSQL_SERVICE_ID_CONFIG, "some.ksql.service.id")
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.NodeLocation;
import io.confluent.ksql.serde.RefinementInfo;

import java.util.Optional;
import java.util.OptionalInt;

import org.junit.Test;


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.confluent.ksql.rest.server.computation.Command;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.Pair;

import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.File;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,25 @@

package io.confluent.ksql.rest.server;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.server.computation.Command;
import io.confluent.ksql.util.Pair;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class BackupReplayFileTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,18 @@

package io.confluent.ksql.rest.server;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.when;

import com.google.common.base.Ticker;
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.server.computation.Command;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlServerException;
import io.confluent.ksql.util.Pair;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
Expand All @@ -38,13 +35,14 @@
import java.nio.file.attribute.PosixFilePermissions;
import java.util.List;
import java.util.Optional;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.when;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class CommandTopicBackupImplTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.server.computation.Command;
import io.confluent.ksql.rest.server.computation.QueuedCommand;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.FixMethodOrder;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
Expand Down
Loading

0 comments on commit c643d8f

Please sign in to comment.