Skip to content

Commit

Permalink
Dependency updates: grpc-spring-boot-starter 2.1.5->2.3.2, grpc 1.11.…
Browse files Browse the repository at this point in the history
…0 -> 1.12.0, reactor-grpc-stub 0.8.1 -> 0.8.2
  • Loading branch information
bsideup committed Jun 2, 2018
1 parent dee3613 commit 20fd7ef
Show file tree
Hide file tree
Showing 11 changed files with 94 additions and 93 deletions.
11 changes: 5 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,15 @@ Example code using [Project Reactor](http://projectreactor.io) and [reactive-grp
```java
val stub = ReactorLiiklusServiceGrpc.newReactorStub(channel);
stub
.subscribe(Mono.just(
.subscribe(
SubscribeRequest.newBuilder()
.setTopic("user-events")
.setGroup("analytics")
.setAutoOffsetReset(AutoOffsetReset.EARLIEST)
.build()
))
)
.flatMap(reply -> stub
.receive(Mono.just(ReceiveRequest.newBuilder().setAssignment(reply.getAssignment()).build()))
.receive(ReceiveRequest.newBuilder().setAssignment(reply.getAssignment()).build())
.window(1000) // ACK every 1000th records
.concatMap(
batch -> batch
Expand All @@ -76,13 +76,12 @@ stub
.concatMap(record -> Mono.delay(Duration.ofMillis(100)))
.sample(Duration.ofSeconds(5)) // ACK every 5 seconds
.onBackpressureLatest()
.delayUntil(record -> stub.ack(Mono.just(
.delayUntil(record -> stub.ack(
AckRequest.newBuilder()
.setAssignment(reply.getAssignment())
.setOffset(record.getOffset())
.build()
))
),
)),
1
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public Mono<Empty> ack(Mono<AckRequest> request) {
ack.getOffset()
));
})
.then(Mono.just(Empty.getDefaultInstance()))
.thenReturn(Empty.getDefaultInstance())
.log("ack", Level.SEVERE, SignalType.ON_ERROR);
}

Expand Down
57 changes: 29 additions & 28 deletions app/src/test/java/com/github/bsideup/liiklus/AckTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.HashMap;
Expand All @@ -28,28 +27,29 @@ public void setUpAckTest() throws Exception {

// Will create a topic
stub
.publish(Mono.just(
.publish(
PublishRequest.newBuilder()
.setTopic(subscribeRequest.getTopic())
.setValue(ByteString.copyFromUtf8("bar"))
.build()
))
)
.block();
}

@Test
public void testManualAck() throws Exception {
Integer partition = stub.subscribe(Mono.just(subscribeRequest))
Integer partition = stub.subscribe(subscribeRequest)
.take(1)
.delayUntil(it -> stub.ack(Mono.just(AckRequest.newBuilder().setAssignment(it.getAssignment()).setOffset(100).build())))
.delayUntil(it -> stub.ack(AckRequest.newBuilder().setAssignment(it.getAssignment()).setOffset(100).build()))
.map(it -> it.getAssignment().getPartition())
.blockFirst(Duration.ofSeconds(30));

Map<Integer, Long> positions = stub
.getOffsets(Mono.just(GetOffsetsRequest.newBuilder()
.setTopic(subscribeRequest.getTopic())
.setGroup(subscribeRequest.getGroup())
.build())
.getOffsets(
GetOffsetsRequest.newBuilder()
.setTopic(subscribeRequest.getTopic())
.setGroup(subscribeRequest.getGroup())
.build()
)
.map(GetOffsetsReply::getOffsetsMap)
.block(Duration.ofSeconds(10));
Expand All @@ -61,23 +61,23 @@ public void testManualAck() throws Exception {

@Test
public void testAlwaysLatest() throws Exception {
Integer partition = stub.subscribe(Mono.just(subscribeRequest))
Integer partition = stub.subscribe(subscribeRequest)
.map(SubscribeReply::getAssignment)
.concatMap(assignment ->
stub.ack(Mono.just(AckRequest.newBuilder().setAssignment(assignment).setOffset(10).build()))
.then(stub.ack(Mono.just(AckRequest.newBuilder().setAssignment(assignment).setOffset(200).build())))
.then(stub.ack(Mono.just(AckRequest.newBuilder().setAssignment(assignment).setOffset(100).build())))
.then(Mono.just(assignment))
.delayUntil(assignment ->
stub.ack(AckRequest.newBuilder().setAssignment(assignment).setOffset(10).build())
.then(stub.ack(AckRequest.newBuilder().setAssignment(assignment).setOffset(200).build()))
.then(stub.ack(AckRequest.newBuilder().setAssignment(assignment).setOffset(100).build()))
)
.take(1)
.map(Assignment::getPartition)
.blockFirst(Duration.ofSeconds(10));

Map<Integer, Long> positions = stub
.getOffsets(Mono.just(GetOffsetsRequest.newBuilder()
.setTopic(subscribeRequest.getTopic())
.setGroup(subscribeRequest.getGroup())
.build())
.getOffsets(
GetOffsetsRequest.newBuilder()
.setTopic(subscribeRequest.getTopic())
.setGroup(subscribeRequest.getGroup())
.build()
)
.map(GetOffsetsReply::getOffsetsMap)
.block(Duration.ofSeconds(10));
Expand All @@ -94,28 +94,29 @@ public void testInterruption() throws Exception {
ByteString keyBytes = ByteString.copyFromUtf8(key);

Map<String, Integer> receiveStatus = Flux.range(0, 10)
.concatMap(i -> stub.publish(Mono.just(
.concatMap(i -> stub.publish(
PublishRequest.newBuilder()
.setTopic(subscribeRequest.getTopic())
.setKey(keyBytes)
.setValue(ByteString.copyFromUtf8("foo-" + i))
.build()
)))
))
.thenMany(
Flux
.defer(() -> stub
.subscribe(Mono.just(subscribeRequest))
.subscribe(subscribeRequest)
.filter(it -> it.getAssignment().getPartition() == partition)
.flatMap(it -> stub
.receive(Mono.just(ReceiveRequest.newBuilder().setAssignment(it.getAssignment()).build()))
.receive(ReceiveRequest.newBuilder().setAssignment(it.getAssignment()).build())
.map(ReceiveReply::getRecord)
.buffer(5)
.delayUntil(batch -> stub
.ack(Mono.just(AckRequest.newBuilder()
.setAssignment(it.getAssignment())
.setOffset(batch.get(batch.size() - 1).getOffset())
.build()
))
.ack(
AckRequest.newBuilder()
.setAssignment(it.getAssignment())
.setOffset(batch.get(batch.size() - 1).getOffset())
.build()
)
)
)
.take(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.Collection;
Expand All @@ -31,13 +30,13 @@ public void setUpConsumerGroupsTest() throws Exception {
// Will create a topic and initialize every partition
Flux.fromIterable(PARTITION_UNIQUE_KEYS)
.flatMap(key -> stub
.publish(Mono.just(
.publish(
PublishRequest.newBuilder()
.setTopic(subscribeRequest.getTopic())
.setKey(ByteString.copyFromUtf8(key))
.setValue(ByteString.copyFromUtf8("bar"))
.build()
))
)
)
.blockLast();
}
Expand All @@ -46,8 +45,8 @@ public void setUpConsumerGroupsTest() throws Exception {
public void testConsumerGroups() {
Map<String, Collection<SubscribeReply>> assignments = Flux
.merge(
stub.subscribe(Mono.just(subscribeRequest)),
stub.subscribe(Mono.just(subscribeRequest))
stub.subscribe(subscribeRequest),
stub.subscribe(subscribeRequest)
)
.distinct(it -> it.getAssignment().getPartition())
.take(NUM_PARTITIONS)
Expand Down
51 changes: 27 additions & 24 deletions app/src/test/java/com/github/bsideup/liiklus/PositionsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ public void setUpConsumerGroupsTest() throws Exception {
Flux.fromIterable(PARTITION_UNIQUE_KEYS)
.flatMap(key -> Mono
.defer(() -> stub
.publish(Mono.just(
.publish(
PublishRequest.newBuilder()
.setTopic(subscribeRequest.getTopic())
.setKey(ByteString.copyFromUtf8(key))
.setValue(ByteString.copyFromUtf8("bar"))
.build()
))
)
)
.repeat(10)
)
Expand All @@ -63,22 +63,22 @@ public void testExternalPositions() {
}

ReceiveReply reply = stub
.subscribe(Mono.just(subscribeRequest))
.subscribe(subscribeRequest)
.map(SubscribeReply::getAssignment)
.filter(it -> it.getPartition() == topicPartition.partition())
.flatMap(assignment -> stub
.receive(Mono.just(ReceiveRequest.newBuilder().setAssignment(assignment).build()))
.delayUntil(it -> stub.ack(Mono.just(AckRequest.newBuilder().setAssignment(assignment).setOffset(it.getRecord().getOffset()).build())))
.receive(ReceiveRequest.newBuilder().setAssignment(assignment).build())
.delayUntil(it -> stub.ack(AckRequest.newBuilder().setAssignment(assignment).setOffset(it.getRecord().getOffset()).build()))
)
.blockFirst(Duration.ofSeconds(10));

assertThat(reply.getRecord().getOffset())
.isEqualTo(5);

reply = stub
.subscribe(Mono.just(subscribeRequest))
.subscribe(subscribeRequest)
.filter(it -> it.getAssignment().getPartition() == topicPartition.partition())
.flatMap(it -> stub.receive(Mono.just(ReceiveRequest.newBuilder().setAssignment(it.getAssignment()).build())))
.flatMap(it -> stub.receive(ReceiveRequest.newBuilder().setAssignment(it.getAssignment()).build()))
.blockFirst(Duration.ofSeconds(10));

assertThat(reply.getRecord().getOffset())
Expand All @@ -90,38 +90,40 @@ public void testGetOffsets() throws Exception {
val key = UUID.randomUUID().toString();
val partition = getPartitionByKey(key);

val publishReply = stub.publish(Mono.just(
val publishReply = stub.publish(
PublishRequest.newBuilder()
.setTopic(subscribeRequest.getTopic())
.setKey(ByteString.copyFromUtf8(key))
.setValue(ByteString.copyFromUtf8("bar"))
.build()
)).block(Duration.ofSeconds(10));
).block(Duration.ofSeconds(10));

assertThat(publishReply)
.hasFieldOrPropertyWithValue("partition", partition);

val reportedOffset = publishReply.getOffset();

stub
.subscribe(Mono.just(subscribeRequest))
.subscribe(subscribeRequest)
.filter(it -> it.getAssignment().getPartition() == partition)
.flatMap(it -> stub.receive(Mono.just(ReceiveRequest.newBuilder().setAssignment(it.getAssignment()).build()))
.flatMap(it -> stub.receive(ReceiveRequest.newBuilder().setAssignment(it.getAssignment()).build())
.map(ReceiveReply::getRecord)
.filter(record -> key.equals(record.getKey().toStringUtf8()))
.delayUntil(record -> stub.ack(Mono.just(AckRequest.newBuilder()
.setAssignment(it.getAssignment())
.setOffset(record.getOffset())
.build()
)))
.delayUntil(record -> stub.ack(
AckRequest.newBuilder()
.setAssignment(it.getAssignment())
.setOffset(record.getOffset())
.build()
))
)
.blockFirst(Duration.ofSeconds(10));

val getOffsetsReply = stub
.getOffsets(Mono.just(GetOffsetsRequest.newBuilder()
.setTopic(subscribeRequest.getTopic())
.setGroup(subscribeRequest.getGroup())
.build())
.getOffsets(
GetOffsetsRequest.newBuilder()
.setTopic(subscribeRequest.getTopic())
.setGroup(subscribeRequest.getGroup())
.build()
)
.block(Duration.ofSeconds(10));

Expand All @@ -132,10 +134,11 @@ public void testGetOffsets() throws Exception {
@Test
public void testGetEmptyOffsets() throws Exception {
val getOffsetsReply = stub
.getOffsets(Mono.just(GetOffsetsRequest.newBuilder()
.setTopic(subscribeRequest.getTopic())
.setGroup(UUID.randomUUID().toString())
.build())
.getOffsets(
GetOffsetsRequest.newBuilder()
.setTopic(subscribeRequest.getTopic())
.setGroup(UUID.randomUUID().toString())
.build()
)
.block(Duration.ofSeconds(10));

Expand Down
21 changes: 10 additions & 11 deletions app/src/test/java/com/github/bsideup/liiklus/SmokeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import org.assertj.core.api.Condition;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;

import java.time.Duration;
Expand All @@ -33,20 +32,20 @@ public void testPublishSubscribe() throws Exception {
String key = "foo";
List<String> values = IntStream.range(0, 10).mapToObj(i -> "bar-" + i).collect(Collectors.toList());
List<ReceiveReply> records = Flux.fromIterable(values)
.concatMap(it -> stub.publish(Mono.just(PublishRequest.newBuilder()
.setTopic(subscribeAction.getTopic())
.setKey(ByteString.copyFromUtf8(key))
.setValue(ByteString.copyFromUtf8(it))
.build()
)))
.concatMap(it -> stub.publish(
PublishRequest.newBuilder()
.setTopic(subscribeAction.getTopic())
.setKey(ByteString.copyFromUtf8(key))
.setValue(ByteString.copyFromUtf8(it))
.build()
))
.thenMany(
stub.subscribe(Mono.just(subscribeAction))
.flatMap(it -> stub.receive(Mono.just(
stub.subscribe(subscribeAction)
.flatMap(it -> stub.receive(
ReceiveRequest.newBuilder()
.setAssignment(it.getAssignment())
.build()
))
)
))
)
.take(values.size())
.collectList()
Expand Down
6 changes: 3 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ configure(subprojects.findAll { !it.name.startsWith("examples/") }) {
}

dependencies {
dependency 'org.lognet:grpc-spring-boot-starter:2.1.5'
dependency 'org.lognet:grpc-spring-boot-starter:2.3.2'

dependencySet(group: 'com.google.protobuf', version: '3.5.1') {
entry 'protoc'
Expand All @@ -43,7 +43,7 @@ configure(subprojects.findAll { !it.name.startsWith("examples/") }) {

dependency 'io.projectreactor.kafka:reactor-kafka:1.0.0.RELEASE'

dependencySet(group: 'io.grpc', version: '1.11.0') {
dependencySet(group: 'io.grpc', version: '1.12.0') {
entry 'grpc-netty'
entry 'grpc-core'
entry 'grpc-services'
Expand All @@ -52,7 +52,7 @@ configure(subprojects.findAll { !it.name.startsWith("examples/") }) {
entry 'protoc-gen-grpc-java'
}

dependency 'com.salesforce.servicelibs:reactor-grpc-stub:0.8.1'
dependency 'com.salesforce.servicelibs:reactor-grpc-stub:0.8.2'

dependency 'org.awaitility:awaitility:3.0.0'
}
Expand Down
Loading

0 comments on commit 20fd7ef

Please sign in to comment.