The reference Java client for querying and writing to InfluxDB 2.0 in a reactive way.
- Querying data using Flux language
- Writing data using
  - Line Protocol
  - Data Point
  - POJO
- Advanced Usage
For querying data, we use QueryReactiveApi, which uses the Reactive Streams pattern to stream query results and also supports querying the raw response.
The following example demonstrates querying using the Flux language:
package example;

import com.influxdb.client.reactive.InfluxDBClientReactive;
import com.influxdb.client.reactive.InfluxDBClientReactiveFactory;
import com.influxdb.client.reactive.QueryReactiveApi;

public class InfluxDB2ReactiveExample {

    private static char[] token = "my-token".toCharArray();

    public static void main(final String[] args) {

        InfluxDBClientReactive influxDBClient = InfluxDBClientReactiveFactory.create("http://localhost:9999", token);

        //
        // Query data
        //
        String flux = "from(bucket:\"my-bucket\") |> range(start: 0)";

        QueryReactiveApi queryApi = influxDBClient.getQueryReactiveApi();

        queryApi
                .query(flux, "my-org")
                //
                // Filter records by measurement name
                //
                .filter(it -> "temperature".equals(it.getMeasurement()))
                //
                // Take first 10 records
                //
                .take(10)
                .subscribe(fluxRecord -> {
                    //
                    // The callback to consume a FluxRecord.
                    //
                    System.out.println(fluxRecord.getTime() + ": " + fluxRecord.getValueByKey("_value"));
                });

        influxDBClient.close();
    }
}
The raw query allows direct processing of the original CSV response:
package example;

import com.influxdb.client.reactive.InfluxDBClientReactive;
import com.influxdb.client.reactive.InfluxDBClientReactiveFactory;
import com.influxdb.client.reactive.QueryReactiveApi;

public class InfluxDB2ReactiveExampleRaw {

    private static char[] token = "my-token".toCharArray();

    public static void main(final String[] args) {

        InfluxDBClientReactive influxDBClient = InfluxDBClientReactiveFactory.create("http://localhost:9999", token);

        //
        // Query data
        //
        String flux = "from(bucket:\"my-bucket\") |> range(start: 0)";

        QueryReactiveApi queryApi = influxDBClient.getQueryReactiveApi();

        queryApi
                .queryRaw(flux, "my-org")
                //
                // Take first 10 records
                //
                .take(10)
                .subscribe(line -> {
                    //
                    // The callback to consume a line of CSV response
                    //
                    System.out.println("Response: " + line);
                });

        influxDBClient.close();
    }
}
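Each line delivered by the raw query is one row of the CSV response, so the callback has to pick out cells itself. The following is a minimal sketch of one way to do that, using only the JDK. The class RawCsvSketch, its methods and the sample header and row are hypothetical and are not part of the client; the split is naive and assumes values never contain quoted commas.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the InfluxDB client.
class RawCsvSketch {

    /** Annotation rows (for example "#datatype,...") start with '#'. */
    static boolean isAnnotation(String line) {
        return line.startsWith("#");
    }

    /** Naive split that keeps empty cells; quoted commas are not handled. */
    static List<String> cells(String line) {
        return Arrays.asList(line.split(",", -1));
    }

    /** Looks up a cell by column name, or returns null when the column is absent. */
    static String valueOf(String headerLine, String dataLine, String column) {
        int index = cells(headerLine).indexOf(column);
        List<String> row = cells(dataLine);
        return index >= 0 && index < row.size() ? row.get(index) : null;
    }

    public static void main(String[] args) {
        // Made-up sample lines in the shape of a CSV response.
        String header = ",result,table,_time,_value";
        String row = ",_result,0,2019-05-02T16:12:41Z,21.5";
        if (!isAnnotation(row)) {
            System.out.println(valueOf(header, row, "_value"));
        }
    }
}
```

For anything beyond a quick inspection, a real CSV parser is the safer choice.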
Mapping the result to a POJO is also supported:
package example;

import java.time.Instant;

import com.influxdb.annotations.Column;
import com.influxdb.annotations.Measurement;
import com.influxdb.client.reactive.InfluxDBClientReactive;
import com.influxdb.client.reactive.InfluxDBClientReactiveFactory;
import com.influxdb.client.reactive.QueryReactiveApi;

public class InfluxDB2ReactiveExamplePojo {

    private static char[] token = "my-token".toCharArray();

    public static void main(final String[] args) {

        InfluxDBClientReactive influxDBClient = InfluxDBClientReactiveFactory.create("http://localhost:9999", token);

        //
        // Query data
        //
        String flux = "from(bucket:\"my-bucket\") |> range(start: 0) |> filter(fn: (r) => r._measurement == \"temperature\")";

        QueryReactiveApi queryApi = influxDBClient.getQueryReactiveApi();

        queryApi
                .query(flux, "my-org", Temperature.class)
                //
                // Take first 10 records
                //
                .take(10)
                .subscribe(temperature -> {
                    //
                    // The callback to consume a FluxRecord mapped to POJO.
                    //
                    System.out.println(temperature.location + ": " + temperature.value + " at " + temperature.time);
                });

        influxDBClient.close();
    }

    @Measurement(name = "temperature")
    public static class Temperature {

        @Column(tag = true)
        String location;

        @Column
        Double value;

        @Column(timestamp = true)
        Instant time;
    }
}
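To give an intuition for what mapping a record to a POJO involves, here is a minimal JDK-only sketch that copies values from a map of column names into fields of the same name via reflection. It is an illustration under simplifying assumptions, not the client's actual mapper: PojoMappingSketch and its map method are hypothetical, the client annotations are left out, and column names are assumed to equal field names.

```java
import java.lang.reflect.Field;
import java.util.Map;

// Hypothetical illustration, not the client's mapper.
class PojoMappingSketch {

    // Same shape as the Temperature class above, minus the client annotations.
    static class Temperature {
        String location;
        Double value;
    }

    /** Copies every record entry whose key matches a declared field name into a new instance. */
    static <T> T map(Map<String, Object> record, Class<T> type) {
        try {
            T instance = type.getDeclaredConstructor().newInstance();
            for (Field field : type.getDeclaredFields()) {
                Object value = record.get(field.getName());
                if (value != null) {
                    field.setAccessible(true);
                    field.set(instance, value);
                }
            }
            return instance;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot map record to " + type.getName(), e);
        }
    }

    public static void main(String[] args) {
        Temperature t = map(Map.of("location", "Prague", "value", 21.5), Temperature.class);
        System.out.println(t.location + ": " + t.value);
    }
}
```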
For writing data, we use WriteReactiveApi, which supports the same configuration as the non-reactive client:
- writing data using Line Protocol, Data Point, POJO
- use batching for writes
- use client backpressure strategy
- produces events that allow the user to be notified and react to them
  - WriteSuccessEvent: published when a success response arrives from the Platform server
  - BackpressureEvent: published when client backpressure is applied
  - WriteErrorEvent: published when an unhandled exception occurs
  - WriteRetriableErrorEvent: published when a retriable error occurs
- use GZIP compression for data
The following example demonstrates how to write measurements every 10 seconds:
package example;

import java.time.Instant;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import com.influxdb.annotations.Column;
import com.influxdb.annotations.Measurement;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.reactive.InfluxDBClientReactive;
import com.influxdb.client.reactive.InfluxDBClientReactiveFactory;
import com.influxdb.client.reactive.WriteReactiveApi;

import io.reactivex.Flowable;

public class InfluxDB2ReactiveExampleWriteEveryTenSeconds {

    private static char[] token = "my-token".toCharArray();

    public static void main(final String[] args) throws InterruptedException {

        InfluxDBClientReactive influxDBClient = InfluxDBClientReactiveFactory.create("http://localhost:9999", token);

        //
        // Write data
        //
        WriteReactiveApi writeApi = influxDBClient.getWriteReactiveApi();

        // Emit one measurement every 10 seconds
        Flowable<Temperature> measurements = Flowable.interval(10, TimeUnit.SECONDS)
                .map(time -> {

                    Temperature temperature = new Temperature();
                    temperature.location = getLocation();
                    temperature.value = getValue();
                    temperature.time = Instant.now();
                    return temperature;
                });

        writeApi.writeMeasurements("my-bucket", "my-org", WritePrecision.NS, measurements);

        // Keep the JVM alive long enough for a few measurements to be written
        Thread.sleep(30_000);

        influxDBClient.close();
    }

    private static Double getValue() {
        Random r = new Random();

        return -20 + 70 * r.nextDouble();
    }

    private static String getLocation() {
        return "Prague";
    }

    @Measurement(name = "temperature")
    private static class Temperature {

        @Column(tag = true)
        String location;

        @Column
        Double value;

        @Column(timestamp = true)
        Instant time;
    }
}
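Whichever input form is used (Line Protocol, Data Point or POJO), the data is sent to the server as Line Protocol. As a rough illustration of what a Temperature measurement like the one above looks like on the wire, the sketch below builds such a line by hand with nanosecond precision. LineProtocolSketch and its methods are hypothetical names, not part of the client, and only tag values are escaped here.

```java
import java.time.Instant;

// Hypothetical illustration of the Line Protocol text format, not client code.
class LineProtocolSketch {

    /** Escapes the characters that are special inside a tag value: comma, equals sign and space. */
    static String escapeTag(String raw) {
        return raw.replace(",", "\\,").replace("=", "\\=").replace(" ", "\\ ");
    }

    /** Formats: measurement,tagKey=tagValue fieldKey=fieldValue timestampInNanoseconds */
    static String temperatureLine(String location, double value, Instant time) {
        long nanos = time.getEpochSecond() * 1_000_000_000L + time.getNano();
        return "temperature,location=" + escapeTag(location) + " value=" + value + " " + nanos;
    }

    public static void main(String[] args) {
        System.out.println(temperatureLine("Prague", 21.5, Instant.now()));
    }
}
```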
A client can be configured via a configuration file. The file must be named influx2.properties
and must be located at the root of the classpath.
The following options are supported:
Property name | default | description |
---|---|---|
influx2.url | - | the url to connect to InfluxDB |
influx2.org | - | default destination organization for writes and queries |
influx2.bucket | - | default destination bucket for writes |
influx2.token | - | the token to use for the authorization |
influx2.logLevel | NONE | rest client verbosity level |
influx2.readTimeout | 10000 ms | read timeout |
influx2.writeTimeout | 10000 ms | write timeout |
influx2.connectTimeout | 10000 ms | socket timeout |
The influx2.readTimeout, influx2.writeTimeout and influx2.connectTimeout options support ms, s and m as units. The default unit is milliseconds.
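The unit rule can be sketched in a few lines of plain Java. This is an illustration of the rule as described here (ms, s or m suffix, bare numbers read as milliseconds), not the client's own parsing code; TimeoutSketch and parseTimeout are hypothetical names.

```java
import java.time.Duration;

// Hypothetical helper illustrating the unit rule, not part of the client.
class TimeoutSketch {

    /** Parses values such as "10000", "500ms", "5s" or "2m"; a bare number means milliseconds. */
    static Duration parseTimeout(String raw) {
        String value = raw.trim();
        // "ms" must be checked before "s", because "500ms" also ends with "s".
        if (value.endsWith("ms")) {
            return Duration.ofMillis(Long.parseLong(value.substring(0, value.length() - 2)));
        }
        if (value.endsWith("s")) {
            return Duration.ofSeconds(Long.parseLong(value.substring(0, value.length() - 1)));
        }
        if (value.endsWith("m")) {
            return Duration.ofMinutes(Long.parseLong(value.substring(0, value.length() - 1)));
        }
        return Duration.ofMillis(Long.parseLong(value));
    }

    public static void main(String[] args) {
        for (String sample : new String[] {"10000", "500ms", "5s", "2m"}) {
            System.out.println(sample + " = " + parseTimeout(sample).toMillis() + " ms");
        }
    }
}
```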
influx2.url=http://localhost:9999
influx2.org=my-org
influx2.bucket=my-bucket
influx2.token=my-token
influx2.logLevel=BODY
influx2.readTimeout=5s
influx2.writeTimeout=10s
influx2.connectTimeout=5s
and then:
InfluxDBClientReactive influxDBClient = InfluxDBClientReactiveFactory.create();
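To see how such a file resolves to values and defaults, the sketch below reads the same keys with java.util.Properties. It only illustrates the file format and the documented NONE default for influx2.logLevel; it does not reproduce how the client loads the file. PropertiesSketch and its methods are hypothetical names.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of the client.
class PropertiesSketch {

    /** Parses properties-file text into a Properties object. */
    static Properties parse(String text) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return properties;
    }

    /** Returns influx2.logLevel, falling back to the documented default NONE. */
    static String logLevel(Properties properties) {
        return properties.getProperty("influx2.logLevel", "NONE");
    }

    public static void main(String[] args) {
        Properties properties = parse("influx2.url=http://localhost:9999\ninflux2.org=my-org\n");
        System.out.println(properties.getProperty("influx2.url"));
        System.out.println(logLevel(properties));
    }
}
```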
A client can be constructed using a connection string that can contain the InfluxDBClientOptions parameters encoded into the URL.
InfluxDBClientReactive influxDBClient = InfluxDBClientReactiveFactory
        .create("http://localhost:9999?readTimeout=5000&connectTimeout=5000&logLevel=BASIC", token);
The following options are supported:
Property name | default | description |
---|---|---|
org | - | default destination organization for writes and queries |
bucket | - | default destination bucket for writes |
token | - | the token to use for the authorization |
logLevel | NONE | rest client verbosity level |
readTimeout | 10000 ms | read timeout |
writeTimeout | 10000 ms | write timeout |
connectTimeout | 10000 ms | socket timeout |
The readTimeout, writeTimeout and connectTimeout options support ms, s and m as units. The default unit is milliseconds.
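The options in a connection string are ordinary URL query parameters. As a minimal sketch of how they can be read back out of the URL, the following uses java.net.URI from the JDK; it is not the client's own parsing code, and ConnectionStringSketch and queryOptions are hypothetical names.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the client.
class ConnectionStringSketch {

    /** Returns the query parameters of the connection string in the order they appear. */
    static Map<String, String> queryOptions(String connectionString) {
        Map<String, String> options = new LinkedHashMap<>();
        String query = URI.create(connectionString).getQuery();
        if (query == null || query.isEmpty()) {
            return options;
        }
        for (String pair : query.split("&")) {
            // Split on the first '=' only, so values may themselves contain '='.
            String[] keyValue = pair.split("=", 2);
            options.put(keyValue[0], keyValue.length > 1 ? keyValue[1] : "");
        }
        return options;
    }

    public static void main(String[] args) {
        String url = "http://localhost:9999?readTimeout=5000&connectTimeout=5000&logLevel=BASIC";
        queryOptions(url).forEach((name, value) -> System.out.println(name + " = " + value));
    }
}
```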
InfluxDBClientReactive does not enable gzip compression for HTTP requests by default. To enable gzip and reduce the size of transferred data, call:
influxDBClient.enableGzip();
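To get a feel for how much gzip can save on a typical write payload, the JDK-only sketch below compresses a batch of repetitive Line Protocol lines and prints both sizes. It is independent of the client: GzipSketch, its methods and the sample payload are hypothetical, and real savings depend on the data.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical illustration, not part of the client.
class GzipSketch {

    /** Builds a made-up payload of the given number of Line Protocol lines. */
    static String samplePayload(int lines) {
        StringBuilder payload = new StringBuilder();
        for (int i = 0; i < lines; i++) {
            payload.append("temperature,location=Prague value=21.5 ").append(1_000_000_000L + i).append('\n');
        }
        return payload.toString();
    }

    /** Compresses UTF-8 text with gzip. */
    static byte[] gzip(String text) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(buffer)) {
            out.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Decompresses gzip data back into UTF-8 text. */
    static String gunzip(byte[] compressed) {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String payload = samplePayload(1000);
        System.out.println("plain bytes:   " + payload.getBytes(StandardCharsets.UTF_8).length);
        System.out.println("gzipped bytes: " + gzip(payload).length);
    }
}
```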
Requests and responses can be logged by changing the LogLevel. LogLevel values are NONE, BASIC, HEADERS and BODY. Note that
applying the BODY LogLevel will disable chunking while streaming and will load the whole response into memory.
influxDBClient.setLogLevel(LogLevel.HEADERS);
Server availability can be checked using the influxDBClient.health() endpoint.
Queries can be constructed using the flux-dsl query builder:
package example;

import java.time.temporal.ChronoUnit;

import com.influxdb.client.reactive.InfluxDBClientReactive;
import com.influxdb.client.reactive.InfluxDBClientReactiveFactory;
import com.influxdb.client.reactive.QueryReactiveApi;
import com.influxdb.query.dsl.Flux;
import com.influxdb.query.dsl.functions.restriction.Restrictions;

public class InfluxDB2ReactiveExampleDSL {

    private static char[] token = "my-token".toCharArray();

    public static void main(final String[] args) {

        InfluxDBClientReactive influxDBClient = InfluxDBClientReactiveFactory.create("http://localhost:9999", token);

        //
        // Query data
        //
        Flux flux = Flux.from("my-bucket")
                .range(-30L, ChronoUnit.MINUTES)
                .filter(Restrictions.and(Restrictions.measurement().equal("temperature")));

        QueryReactiveApi queryApi = influxDBClient.getQueryReactiveApi();

        queryApi
                .query(flux.toString(), "my-org")
                .subscribe(fluxRecord -> {
                    //
                    // The callback to consume a FluxRecord.
                    //
                    System.out.println(fluxRecord.getTime() + ": " + fluxRecord.getValueByKey("_value"));
                });

        influxDBClient.close();
    }
}
The latest version as a Maven dependency:
<dependency>
  <groupId>com.influxdb</groupId>
  <artifactId>influxdb-client-reactive</artifactId>
  <version>1.6.0</version>
</dependency>
Or when using Gradle:
dependencies {
    compile "com.influxdb:influxdb-client-reactive:1.6.0"
}
Snapshots are deployed to the OSS Snapshot repository.
<repository>
  <id>ossrh</id>
  <name>OSS Snapshot repository</name>
  <url>https://oss.sonatype.org/content/repositories/snapshots/</url>
  <releases>
    <enabled>false</enabled>
  </releases>
  <snapshots>
    <enabled>true</enabled>
  </snapshots>
</repository>
Or when using Gradle:
repositories {
    maven { url "https://oss.sonatype.org/content/repositories/snapshots" }
}