Skip to content

Commit

Permalink
[FLINK-25771][connectors][Cassandra][test] Raise all read/write/misce…
Browse files Browse the repository at this point in the history
…llaneous requests timeouts
  • Loading branch information
echauchot authored and fapaul committed Jan 31, 2022
1 parent c5483c6 commit 9d44bc0
Showing 1 changed file with 38 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,16 @@
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.CassandraContainer;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
Expand All @@ -81,6 +86,8 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.testcontainers.containers.output.Slf4jLogConsumer;

import scala.collection.JavaConverters;
import scala.collection.Seq;

Expand All @@ -97,15 +104,17 @@ public class CassandraConnectorITCase
Tuple3<String, Integer, Integer>,
CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> {

@ClassRule
public static final CassandraContainer CASSANDRA_CONTAINER = createCassandraContainer();

private static final int MAX_CONNECTION_RETRY = 3;
private static final long CONNECTION_RETRY_DELAY = 500L;
private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorITCase.class);
private static final Slf4jLogConsumer LOG_CONSUMER = new Slf4jLogConsumer(LOG);
private static final String TABLE_POJO = "test";
private static final String TABLE_POJO_NO_ANNOTATED_KEYSPACE = "testPojoNoAnnotatedKeyspace";

@ClassRule
public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
@ClassRule
public static final CassandraContainer CASSANDRA_CONTAINER = createCassandraContainer();
@Rule public final RetryRule retryRule = new RetryRule();

private static final int PORT = 9042;
Expand Down Expand Up @@ -199,11 +208,13 @@ protected Cluster buildCluster(Cluster.Builder builder) {
public static CassandraContainer createCassandraContainer() {
CassandraContainer cassandra = new CassandraContainer(DockerImageVersions.CASSANDRA_3);
cassandra.withJmxReporting(false);
cassandra.withLogConsumer(LOG_CONSUMER);
return cassandra;
}

@BeforeClass
public static void startAndInitializeCassandra() {
raiseCassandraRequestsTimeouts();
// CASSANDRA_CONTAINER#start() already contains retrials
CASSANDRA_CONTAINER.start();
cluster = CASSANDRA_CONTAINER.getCluster();
Expand Down Expand Up @@ -237,6 +248,30 @@ public static void startAndInitializeCassandra() {
CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + "initial"));
}

private static void raiseCassandraRequestsTimeouts() {
try {
final Path configurationPath = TEMPORARY_FOLDER.newFile().toPath();
CASSANDRA_CONTAINER.copyFileFromContainer(
"/etc/cassandra/cassandra.yaml", configurationPath.toAbsolutePath().toString());
String configuration =
new String(Files.readAllBytes(configurationPath), StandardCharsets.UTF_8);
String patchedConfiguration =
configuration
.replaceAll("request_timeout_in_ms: [0-9]+", "request_timeout_in_ms: 30000")
.replaceAll(
"read_request_timeout_in_ms: [0-9]+",
"read_request_timeout_in_ms: 15000")
.replaceAll(
"write_request_timeout_in_ms: [0-9]+",
"write_request_timeout_in_ms: 6000");
Files.write(configurationPath, patchedConfiguration.getBytes(StandardCharsets.UTF_8));
CASSANDRA_CONTAINER.withConfigurationOverride(
configurationPath.toAbsolutePath().toString());
} catch (IOException e) {
throw new RuntimeException("Unable to open Cassandra configuration file ", e);
}
}

@Before
public void createTable() {
tableID = random.nextInt(Integer.MAX_VALUE);
Expand Down

0 comments on commit 9d44bc0

Please sign in to comment.