Idempotent Producer - batch retry logic
Unit test with mock broker

Co-authored-by: Edoardo Comar <ecomar@uk.ibm.com>
Co-authored-by: Mickael Maison <mickael.maison@gmail.com>
edoardocomar and mimaison committed Oct 1, 2018
1 parent 8e2b04b commit 4e5d477
Showing 8 changed files with 264 additions and 143 deletions.
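For context, the batch retry logic in this commit sits behind Sarama's idempotent producer, where each message is given a per-partition sequence number on its first dispatch and the broker deduplicates on (producer ID, epoch, sequence). Below is a minimal usage sketch, not part of the commit; the broker address and topic are placeholders, and the Config fields shown (Producer.Idempotent, Producer.RequiredAcks, Net.MaxOpenRequests, Version) are the settings the idempotent producer is designed to run with.

package main

import (
	"log"

	"github.com/Shopify/sarama" // import path of the library at the time of this commit
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V0_11_0_0 // idempotence needs the v0.11+ produce protocol
	cfg.Producer.Idempotent = true // assign sequence numbers; the broker deduplicates retries
	cfg.Producer.RequiredAcks = sarama.WaitForAll
	cfg.Producer.Retry.Max = 5 // batches are retried up to this count (see retryBatch below)
	cfg.Net.MaxOpenRequests = 1 // one in-flight request keeps sequences ordered
	cfg.Producer.Return.Successes = true

	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatalln(err)
	}
	defer producer.AsyncClose()

	producer.Input() <- &sarama.ProducerMessage{
		Topic: "example-topic",
		Value: sarama.StringEncoder("hello"),
	}
	msg := <-producer.Successes()
	log.Printf("delivered to %s/%d at offset %d", msg.Topic, msg.Partition, msg.Offset)
}
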
129 changes: 86 additions & 43 deletions async_producer.go
@@ -85,7 +85,7 @@ func newTransactionManager(conf *Config, client Client) (*transactionManager, er
txnmgr.sequenceNumbers = make(map[string]int32)
txnmgr.mutex = sync.Mutex{}

Logger.Printf("Obtained a ProducerId: %d epoch: %d\n", txnmgr.producerID, txnmgr.producerEpoch)
Logger.Printf("Obtained a ProducerId: %d and ProducerEpoch: %d\n", txnmgr.producerID, txnmgr.producerEpoch)
}

return txnmgr, nil
@@ -100,8 +100,8 @@ type asyncProducer struct {
input, successes, retries chan *ProducerMessage
inFlight sync.WaitGroup

brokers map[*Broker]chan<- *ProducerMessage
brokerRefs map[chan<- *ProducerMessage]int
brokers map[*Broker]*brokerProducer
brokerRefs map[*brokerProducer]int
brokerLock sync.Mutex

txnmgr *transactionManager
@@ -142,8 +142,8 @@ func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
input: make(chan *ProducerMessage),
successes: make(chan *ProducerMessage),
retries: make(chan *ProducerMessage),
brokers: make(map[*Broker]chan<- *ProducerMessage),
brokerRefs: make(map[chan<- *ProducerMessage]int),
brokers: make(map[*Broker]*brokerProducer),
brokerRefs: make(map[*brokerProducer]int),
txnmgr: txnmgr,
}

@@ -381,9 +381,9 @@ func (tp *topicProducer) dispatch() {
continue
}
}
// All messages being retried (sent or not) have already had their retry count updated
if tp.parent.conf.Producer.Idempotent && msg.retries == 0 {
msg.sequenceNumber = tp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
//Logger.Printf("Message %s for TP %s-%d got sequence number: %d\n", msg.Value, msg.Topic, msg.Partition, msg.sequenceNumber)
}

handler := tp.handlers[msg.Partition]
@@ -451,9 +451,9 @@ type partitionProducer struct {
partition int32
input <-chan *ProducerMessage

leader *Broker
breaker *breaker.Breaker
output chan<- *ProducerMessage
leader *Broker
breaker *breaker.Breaker
brokerProducer *brokerProducer

// highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
// all other messages get buffered in retryState[msg.retries].buf to preserve ordering
@@ -488,9 +488,9 @@ func (pp *partitionProducer) dispatch() {
// on the first message
pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
if pp.leader != nil {
pp.output = pp.parent.getBrokerProducer(pp.leader)
pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
pp.output <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
}

for msg := range pp.input {
@@ -522,7 +522,7 @@ func (pp *partitionProducer) dispatch() {
// if we made it this far then the current msg contains real data, and can be sent to the next goroutine
// without breaking any of our ordering guarantees

if pp.output == nil {
if pp.brokerProducer == nil {
if err := pp.updateLeader(); err != nil {
pp.parent.returnError(msg, err)
time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
@@ -531,11 +531,11 @@ func (pp *partitionProducer) dispatch() {
Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
}

pp.output <- msg
pp.brokerProducer.input <- msg
}

if pp.output != nil {
pp.parent.unrefBrokerProducer(pp.leader, pp.output)
if pp.brokerProducer != nil {
pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
}
}

@@ -547,20 +547,20 @@ func (pp *partitionProducer) newHighWatermark(hwm int) {
// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
pp.retryState[pp.highWatermark].expectChaser = true
pp.parent.inFlight.Add(1) // we're generating a fin message; track it so we don't shut down while it's still inflight
pp.output <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: fin, retries: pp.highWatermark - 1}
pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: fin, retries: pp.highWatermark - 1}

// a new HWM means that our current broker selection is out of date
Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
pp.parent.unrefBrokerProducer(pp.leader, pp.output)
pp.output = nil
pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
pp.brokerProducer = nil
}

func (pp *partitionProducer) flushRetryBuffers() {
Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", pp.topic, pp.partition, pp.highWatermark)
for {
pp.highWatermark--

if pp.output == nil {
if pp.brokerProducer == nil {
if err := pp.updateLeader(); err != nil {
pp.parent.returnErrors(pp.retryState[pp.highWatermark].buf, err)
goto flushDone
@@ -569,7 +569,7 @@ func (pp *partitionProducer) flushRetryBuffers() {
}

for _, msg := range pp.retryState[pp.highWatermark].buf {
pp.output <- msg
pp.brokerProducer.input <- msg
}

flushDone:
@@ -594,16 +594,16 @@ func (pp *partitionProducer) updateLeader() error {
return err
}

pp.output = pp.parent.getBrokerProducer(pp.leader)
pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
pp.output <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}

return nil
})
}

// one per broker; also constructs an associated flusher
func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessage {
func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
var (
input = make(chan *ProducerMessage)
bridge = make(chan *produceSet)
@@ -637,7 +637,7 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessag
close(responses)
})

return input
return bp
}

type brokerProducerResponse struct {
@@ -652,7 +652,7 @@ type brokerProducer struct {
parent *asyncProducer
broker *Broker

input <-chan *ProducerMessage
input chan *ProducerMessage
output chan<- *produceSet
responses <-chan *brokerProducerResponse

@@ -797,62 +797,105 @@ func (bp *brokerProducer) handleResponse(response *brokerProducerResponse) {
func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceResponse) {
// we iterate through the blocks in the request set, not the response, so that we notice
// if the response is missing a block completely
sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
if response == nil {
// this only happens when RequiredAcks is NoResponse, so we have to assume success
bp.parent.returnSuccesses(msgs)
bp.parent.returnSuccesses(pSet.msgs)
return
}

block := response.GetBlock(topic, partition)
if block == nil {
bp.parent.returnErrors(msgs, ErrIncompleteResponse)
bp.parent.returnErrors(pSet.msgs, ErrIncompleteResponse)
return
}
fmt.Printf("response has error %v", block.Err)

switch block.Err {
// Success
case ErrNoError:
if bp.parent.conf.Version.IsAtLeast(V0_10_0_0) && !block.Timestamp.IsZero() {
for _, msg := range msgs {
for _, msg := range pSet.msgs {
msg.Timestamp = block.Timestamp
}
}
for i, msg := range msgs {
for i, msg := range pSet.msgs {
msg.Offset = block.Offset + int64(i)
}
bp.parent.returnSuccesses(msgs)
bp.parent.returnSuccesses(pSet.msgs)
// Duplicate
case ErrDuplicateSequenceNumber:
bp.parent.returnSuccesses(pSet.msgs)
// Retriable errors
case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
bp.broker.ID(), topic, partition, block.Err)
if bp.currentRetries[topic] == nil {
bp.currentRetries[topic] = make(map[int32]error)
}
bp.currentRetries[topic][partition] = block.Err
bp.parent.retryMessages(msgs, block.Err)
// dropping the following messages has the side effect of incrementing their retry count
bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err)
bp.parent.retryBatch(topic, partition, pSet, block.Err)

// Other non-retriable errors
default:
bp.parent.returnErrors(msgs, block.Err)
bp.parent.returnErrors(pSet.msgs, block.Err)
}
})
}

func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitionSet, kerr KError) {
Logger.Printf("Retrying batch for %v-%d because of %s\n", topic, partition, kerr)
produceSet := newProduceSet(p)
produceSet.msgs[topic] = make(map[int32]*partitionSet)
produceSet.msgs[topic][partition] = pSet
produceSet.bufferBytes += pSet.bufferBytes
produceSet.bufferCount += len(pSet.msgs)
for _, msg := range pSet.msgs {
if msg.retries >= p.conf.Producer.Retry.Max {
p.returnError(msg, kerr)
return
}
msg.retries++
}
// extremely pessimistic strategy - refreshing metadata for every batch retried. Should be improved
err := p.client.RefreshMetadata(topic)
if err != nil {
Logger.Printf("Failed retrying batch for %v-%d because of %v while refreshing metadata\n", topic, partition, err)
for _, msg := range pSet.msgs {
p.returnError(msg, kerr)
}
return
}
leader, err := p.client.Leader(topic, partition)
if err != nil {
Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader\n", topic, partition, err)
for _, msg := range pSet.msgs {
p.returnError(msg, kerr)
}
return
}
bp := p.getBrokerProducer(leader)
bp.output <- produceSet
}

func (bp *brokerProducer) handleError(sent *produceSet, err error) {
switch err.(type) {
case PacketEncodingError:
sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
bp.parent.returnErrors(msgs, err)
sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
bp.parent.returnErrors(pSet.msgs, err)
})
default:
Logger.Printf("producer/broker/%d state change to [closing] because %s\n", bp.broker.ID(), err)
bp.parent.abandonBrokerConnection(bp.broker)
_ = bp.broker.Close()
bp.closing = err
sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
bp.parent.retryMessages(msgs, err)
sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
bp.parent.retryMessages(pSet.msgs, err)
})
bp.buffer.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
bp.parent.retryMessages(msgs, err)
bp.buffer.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
bp.parent.retryMessages(pSet.msgs, err)
})
bp.rollOver()
}
@@ -949,7 +992,7 @@ func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) {
}
}

func (p *asyncProducer) getBrokerProducer(broker *Broker) chan<- *ProducerMessage {
func (p *asyncProducer) getBrokerProducer(broker *Broker) *brokerProducer {
p.brokerLock.Lock()
defer p.brokerLock.Unlock()

@@ -966,13 +1009,13 @@ func (p *asyncProducer) getBrokerProducer(broker *Broker) chan<- *ProducerMessag
return bp
}

func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp chan<- *ProducerMessage) {
func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp *brokerProducer) {
p.brokerLock.Lock()
defer p.brokerLock.Unlock()

p.brokerRefs[bp]--
if p.brokerRefs[bp] == 0 {
close(bp)
close(bp.input)
delete(p.brokerRefs, bp)

if p.brokers[broker] == bp {

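The remaining changed files, including the mock-broker unit test mentioned in the commit message, are not shown above. The key addition here is retryBatch: instead of retrying messages one by one through the normal dispatch path, a failed partitionSet is re-submitted to the (possibly new) leader's brokerProducer as a single produceSet, reusing the sequence numbers assigned on the first attempt and bumping each message's retry count. The toy model below illustrates why the batch has to be resent intact; it is editorial illustration only, not Sarama or Kafka broker code, and it simplifies the broker's real duplicate check (which compares against metadata of recently appended batches) down to a single counter.

// Toy model of the broker-side sequence check that motivates whole-batch retry.
package main

import "fmt"

type batch struct {
	firstSeq int32 // assigned by the producer on first send, reused verbatim on retry
	count    int32
}

type partitionState struct {
	nextSeq int32 // sequence the broker expects the next new batch to start at
}

func (p *partitionState) accept(b batch) error {
	switch {
	case b.firstSeq == p.nextSeq: // continues the chain: append it
		p.nextSeq += b.count
		return nil
	case b.firstSeq < p.nextSeq: // already appended: acknowledge as a duplicate
		return nil
	default: // gap in the sequence: reject
		return fmt.Errorf("out of order sequence: got %d, want %d", b.firstSeq, p.nextSeq)
	}
}

func main() {
	state := &partitionState{}
	first := batch{firstSeq: 0, count: 3}
	second := batch{firstSeq: 3, count: 2}

	fmt.Println(state.accept(first))  // <nil>: appended, broker now expects sequence 3
	fmt.Println(state.accept(first))  // <nil>: a resend of the intact batch is treated as a duplicate
	fmt.Println(state.accept(second)) // <nil>: appended, broker now expects sequence 5

	// Splitting or renumbering a failed batch instead of resending it intact
	// would leave a gap in the sequence and be rejected:
	fmt.Println(state.accept(batch{firstSeq: 6, count: 1}))
}

This is also why handleSuccess treats ErrDuplicateSequenceNumber as a success: a batch that was written but whose acknowledgement was lost is resent with the same sequence numbers and acknowledged as a duplicate.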