Skip to content

Commit

Permalink
add a stateless (sessionId-less) version of ACK (#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
bsideup committed Sep 28, 2018
1 parent 5faeceb commit c38ea6f
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -233,17 +233,31 @@ public Flux<ReceiveReply> receive(Mono<ReceiveRequest> requestMono) {
public Mono<Empty> ack(Mono<AckRequest> request) {
return request
.flatMap(ack -> {
val subscription = subscriptions.get(ack.getAssignment().getSessionId());
String topic;
GroupId groupId;
int partition;

if (subscription == null) {
log.warn("Subscription is null, returning empty Publisher. Request: {}", ack.toString().replace("\n", "\\n"));
return Mono.empty();
if (ack.hasAssignment()) {
val subscription = subscriptions.get(ack.getAssignment().getSessionId());

if (subscription == null) {
log.warn("Subscription is null, returning empty Publisher. Request: {}", ack.toString().replace("\n", "\\n"));
return Mono.empty();
}

topic = subscription.getTopic();
groupId = subscription.getGroupId();
partition = ack.getAssignment().getPartition();
} else {
topic = ack.getTopic();
groupId = GroupId.of(ack.getGroup(), ack.getGroupVersion());
partition = ack.getPartition();
}

return Mono.fromCompletionStage(positionsStorage.update(
subscription.getTopic(),
subscription.getGroupId(),
ack.getAssignment().getPartition(),
topic,
groupId,
partition,
ack.getOffset()
));
})
Expand Down
30 changes: 30 additions & 0 deletions app/src/test/java/com/github/bsideup/liiklus/AckTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,36 @@ public void testManualAck() throws Exception {
.containsEntry(partition, 100L);
}

@Test
public void testStatelessAck() throws Exception {
int partition = 1;
int groupVersion = 1;
AckRequest ackRequest = AckRequest.newBuilder()
.setTopic(subscribeRequest.getTopic())
.setGroup(subscribeRequest.getGroup())
.setGroupVersion(groupVersion)
.setPartition(partition)
.setOffset(100)
.build();

stub.ack(ackRequest).block(Duration.ofSeconds(10));

Map<Integer, Long> positions = stub
.getOffsets(
GetOffsetsRequest.newBuilder()
.setTopic(subscribeRequest.getTopic())
.setGroup(subscribeRequest.getGroup())
.setGroupVersion(groupVersion)
.build()
)
.map(GetOffsetsReply::getOffsetsMap)
.block(Duration.ofSeconds(10));

assertThat(positions)
.isNotNull()
.containsEntry(partition, 100L);
}

@Test
public void testAlwaysLatest() throws Exception {
Integer partition = stub.subscribe(subscribeRequest)
Expand Down
25 changes: 11 additions & 14 deletions app/src/test/java/com/github/bsideup/liiklus/GroupVersionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

public class GroupVersionTest extends AbstractIntegrationTest {

private static final int PARTITION = 0;
private static final int PARTITION = 1;

public static final int NUM_OF_RECORDS_PER_PARTITION = 10;

Expand Down Expand Up @@ -133,18 +133,15 @@ private void ackOffset(int groupVersion, long offset) {
}

private void ackOffset(String groupName, int groupVersion, long offset) {
stub
.subscribe(
SubscribeRequest.newBuilder()
.setTopic(topic)
.setGroup(groupName)
.setGroupVersion(groupVersion)
.build()
)
.map(SubscribeReply::getAssignment)
.filter(it -> it.getPartition() == 0)
.delayUntil(assignment -> stub.ack(AckRequest.newBuilder().setAssignment(assignment).setOffset(offset).build()))
.blockFirst(Duration.ofSeconds(10));
val ackRequest = AckRequest.newBuilder()
.setTopic(topic)
.setGroup(groupName)
.setGroupVersion(groupVersion)
.setOffset(offset)
.setPartition(PARTITION)
.build();

stub.ack(ackRequest).block(Duration.ofSeconds(10));
}

private List<Record> getAllRecords(Integer groupVersion) {
Expand All @@ -168,7 +165,7 @@ private Flux<Record> getRecords(String groupName, Optional<Integer> groupVersion
.build()
)
.map(SubscribeReply::getAssignment)
.filter(it -> it.getPartition() == 0)
.filter(it -> it.getPartition() == PARTITION)
.flatMap(assignment -> stub.receive(ReceiveRequest.newBuilder().setAssignment(assignment).build()))
.map(ReceiveReply::getRecord);
}
Expand Down
7 changes: 6 additions & 1 deletion protocol/src/main/proto/LiiklusService.proto
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,12 @@ message SubscribeReply {
}

message AckRequest {
Assignment assignment = 1;
Assignment assignment = 1 [deprecated=true];

string topic = 3;
string group = 4;
uint32 groupVersion = 5;
uint32 partition = 6;

uint64 offset = 2;
}
Expand Down

0 comments on commit c38ea6f

Please sign in to comment.