Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix broken error handling around Idempotent producer + Ensure strict ordering when Net.MaxOpenRequests = 1 #2943

Open
wants to merge 14 commits into
base: main
Choose a base branch
from

Conversation

richardartoul
Copy link
Contributor

There have been numerous issues filed lately that:

  1. Strict ordering no longer works after request pipelining was introduced even when Net.MaxOpenRequests is set to 1.
  2. Requests can still end up failing / mis-sequenced when Idempotent producer is enabled.

I was able to reproduce these issues both locally and in a staging environment and have fixed both of them. P.R has many comments explaining the changes.

Relevant issues:

  1. AsyncProducer produces messages in out-of-order when retries happen #2619
  2. Sarama Async Producer Encounters 'Out of Order' Error: what are the reasons? #2803
  3. Does sarama still guarantee message ordering? #2860

Signed-off-by: Richard Artoul <richardartoul@gmail.com>
Signed-off-by: Richard Artoul <richardartoul@gmail.com>
Signed-off-by: Richard Artoul <richardartoul@gmail.com>
Signed-off-by: Richard Artoul <richardartoul@gmail.com>
Signed-off-by: Richard Artoul <richardartoul@gmail.com>
Signed-off-by: Richard Artoul <richardartoul@gmail.com>
Signed-off-by: Richard Artoul <richardartoul@gmail.com>
Signed-off-by: Richard Artoul <richardartoul@gmail.com>
Signed-off-by: Richard Artoul <richardartoul@gmail.com>
Signed-off-by: Richard Artoul <richardartoul@gmail.com>
Signed-off-by: Richard Artoul <richardartoul@gmail.com>
@richardartoul
Copy link
Contributor Author

richardartoul commented Jul 20, 2024

Hmm... I think it may still be possible for concurrent requests to be issued sendResponse writes to a channel, and once the response is picked up off that channel there could be an inflight request from the next request + an inflight retry actually. I'm not sure how to resolve that other then somehow "waiting" for a response to be processed (either failing as an error or being retried)

Copy link
Contributor

@puellanivis puellanivis left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’m somewhat concerned by the number of panic()s in the code. Are these really so serious that they need to potentially crash the whole program? and/or kill off a processing goroutine, without any information up-the-chain that processing has terminated?

@@ -249,6 +250,19 @@ func (pe ProducerError) Unwrap() error {
type ProducerErrors []*ProducerError

func (pe ProducerErrors) Error() string {
if len(pe) > 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this ever actually be produced with zero messages?

If it’s unlikely to ever happen with len(pe) == 0 then that should be the guard condition, and then the complex error message should be the unindented path.

@@ -695,6 +709,9 @@ func (pp *partitionProducer) dispatch() {
// All messages being retried (sent or not) have already had their retry count updated
// Also, ignore "special" syn/fin messages used to sync the brokerProducer and the topicProducer.
if pp.parent.conf.Producer.Idempotent && msg.retries == 0 && msg.flags == 0 {
if msg.hasSequence {
panic("assertion failure: reassigning producer epoch and sequence number to message that already has them")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://go.dev/wiki/CodeReviewComments#dont-panic

Is the condition here so bad that we need to panic? (That is, is it entirely unrecoverable?)

Comment on lines +89 to +103
Logger.Println(
"assertion failed: message out of sequence added to batch",
"producer_id",
ps.producerID,
set.recordsToSend.RecordBatch.ProducerID,
"producer_epoch",
ps.producerEpoch,
set.recordsToSend.RecordBatch.ProducerEpoch,
"sequence_number",
msg.sequenceNumber,
set.recordsToSend.RecordBatch.FirstSequence,
"buffer_count",
ps.bufferCount,
"msg_has_sequence",
msg.hasSequence)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would recommend leaving the log message on the same line as the method call, so that it’s easily findable via grep, and otherwise one-line isolates well.

Additionally, if the line is long enough to break up, then the possibility of adding even more fields is high, so each entry should end with a comma and a newline, so adding new fiels to the end of the call don’t produce unnecessary line changes, where the only change is in punctuation due to syntax requirements.

Then I would pair up each log field name with the log field value, all together:

			Logger.Println("assertion failed: message out of sequence added to batch",
				"producer_id", ps.producerID, set.recordsToSend.RecordBatch.ProducerID,
				"producer_epoch", ps.producerEpoch, set.recordsToSend.RecordBatch.ProducerEpoch,
				"sequence_number", msg.sequenceNumber, set.recordsToSend.RecordBatch.FirstSequence,
				"buffer_count", ps.bufferCount,
				"msg_has_sequence", msg.hasSequence,
			)

}

if !succeeded {
Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader, no more retries\n", topic, partition)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Newlines at the end should be unnecessary for loggers? (I mean, this is generally the case, but I don’t know if that is specifically true here.)

Three % verbs are specified but only two arguments are given.

// as expected. This retry loop is very important since prematurely (and unnecessarily) failing
// an idempotent batch is ~equivalent to data loss.
succeeded := false
for i := 0; i < p.conf.Producer.Retry.Max; i++ {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest using a different variable name if we’re trying retries/tries rather than indices.

[off-by-one smell] Are we counting retries, or tries? That is, if I’ve asked for 5 retries max, then that’s 6 total tries.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants