Skip to content

Commit

Permalink
Merge pull request #280 from pradithya/remove_transaction
Browse files Browse the repository at this point in the history
Remove redis transaction
  • Loading branch information
pradithya aria committed Oct 30, 2019
2 parents c471073 + 74bff10 commit 82569b0
Showing 1 changed file with 7 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,16 @@ public static class WriteDoFn extends DoFn<RedisMutation, Void> {
}

public WriteDoFn withBatchSize(int batchSize) {
if (batchSize > 0) this.batchSize = batchSize;
if (batchSize > 0) {
this.batchSize = batchSize;
}
return this;
}

public WriteDoFn withTimeout(int timeout) {
if (timeout > 0) this.timeout = timeout;
if (timeout > 0) {
this.timeout = timeout;
}
return this;
}

Expand All @@ -155,7 +159,6 @@ public void setup() {
@StartBundle
public void startBundle() {
pipeline = jedis.pipelined();
pipeline.multi();
batchCount = 0;
}

Expand All @@ -168,9 +171,7 @@ public void processElement(ProcessContext context) {
}
batchCount++;
if (batchCount >= batchSize) {
pipeline.exec();
pipeline.sync();
pipeline.multi();
batchCount = 0;
}
}
Expand All @@ -197,10 +198,7 @@ private Response<?> writeRecord(RedisMutation mutation) {

@FinishBundle
public void finishBundle() {
if (pipeline.isInMulti()) {
pipeline.exec();
pipeline.sync();
}
pipeline.sync();
batchCount = 0;
}

Expand Down

0 comments on commit 82569b0

Please sign in to comment.