Skip to content

Commit

Permalink
Add producer support for Kafka 0.11 Records
Browse files Browse the repository at this point in the history
This changeset introduces support for producing messages in the new
Kafka 0.11 Record format (including attaching headers).
It doesn't support transactions or idempotent messages.
  • Loading branch information
vlad-arista committed Nov 2, 2017
1 parent 5a79f9f commit 5fd60c2
Show file tree
Hide file tree
Showing 10 changed files with 297 additions and 80 deletions.
23 changes: 20 additions & 3 deletions async_producer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sarama

import (
"encoding/binary"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -119,6 +120,10 @@ type ProducerMessage struct {
// StringEncoder and ByteEncoder.
Value Encoder

// The headers are key-value pairs that are transparently passed
// by Kafka between producers and consumers.
Headers []RecordHeader

// This field is used to hold arbitrary data you wish to include so it
// will be available when receiving on the Successes and Errors channels.
// Sarama completely ignores this field and is only to be used for
Expand Down Expand Up @@ -146,8 +151,16 @@ type ProducerMessage struct {

const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.

func (m *ProducerMessage) byteSize() int {
size := producerMessageOverhead
func (m *ProducerMessage) byteSize(version int) int {
var size int
if version >= 2 {
size = maximumRecordOverhead
for _, h := range m.Headers {
size += len(h.Key) + len(h.Value) + 2*binary.MaxVarintLen32
}
} else {
size = producerMessageOverhead
}
if m.Key != nil {
size += m.Key.Length()
}
Expand Down Expand Up @@ -254,7 +267,11 @@ func (p *asyncProducer) dispatcher() {
p.inFlight.Add(1)
}

if msg.byteSize() > p.conf.Producer.MaxMessageBytes {
version := 1
if p.conf.Version.IsAtLeast(V0_11_0_0) {
version = 2
}
if msg.byteSize(version) > p.conf.Producer.MaxMessageBytes {
p.returnError(msg, ErrMessageSizeTooLarge)
continue
}
Expand Down
2 changes: 1 addition & 1 deletion mockresponses.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KE
func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoder {
req := reqBody.(*ProduceRequest)
res := &ProduceResponse{}
for topic, partitions := range req.msgSets {
for topic, partitions := range req.records {
for partition := range partitions {
res.AddTopicPartition(topic, partition, mr.getError(topic, partition))
}
Expand Down
152 changes: 100 additions & 52 deletions produce_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,71 @@ const (
)

type ProduceRequest struct {
RequiredAcks RequiredAcks
Timeout int32
Version int16 // v1 requires Kafka 0.9, v2 requires Kafka 0.10
msgSets map[string]map[int32]*MessageSet
TransactionalID *string
RequiredAcks RequiredAcks
Timeout int32
Version int16 // v1 requires Kafka 0.9, v2 requires Kafka 0.10, v3 requires Kafka 0.11
records map[string]map[int32]Records
}

func updateMsgSetMetrics(msgSet *MessageSet, compressionRatioMetric metrics.Histogram,
topicCompressionRatioMetric metrics.Histogram) int64 {
var topicRecordCount int64
for _, messageBlock := range msgSet.Messages {
// Is this a fake "message" wrapping real messages?
if messageBlock.Msg.Set != nil {
topicRecordCount += int64(len(messageBlock.Msg.Set.Messages))
} else {
// A single uncompressed message
topicRecordCount++
}
// Better be safe than sorry when computing the compression ratio
if messageBlock.Msg.compressedSize != 0 {
compressionRatio := float64(len(messageBlock.Msg.Value)) /
float64(messageBlock.Msg.compressedSize)
// Histogram do not support decimal values, let's multiple it by 100 for better precision
intCompressionRatio := int64(100 * compressionRatio)
compressionRatioMetric.Update(intCompressionRatio)
topicCompressionRatioMetric.Update(intCompressionRatio)
}
}
return topicRecordCount
}

func updateBatchMetrics(recordBatch *RecordBatch, compressionRatioMetric metrics.Histogram,
topicCompressionRatioMetric metrics.Histogram) int64 {
if recordBatch.compressedRecords != nil {
compressionRatio := int64(float64(recordBatch.recordsLen) / float64(len(recordBatch.compressedRecords)) * 100)
compressionRatioMetric.Update(compressionRatio)
topicCompressionRatioMetric.Update(compressionRatio)
}

return int64(len(recordBatch.Records))
}

func (r *ProduceRequest) encode(pe packetEncoder) error {
if r.Version >= 3 {
if err := pe.putNullableString(r.TransactionalID); err != nil {
return err
}
}
pe.putInt16(int16(r.RequiredAcks))
pe.putInt32(r.Timeout)
err := pe.putArrayLength(len(r.msgSets))
if err != nil {
return err
}
metricRegistry := pe.metricRegistry()
var batchSizeMetric metrics.Histogram
var compressionRatioMetric metrics.Histogram
if metricRegistry != nil {
batchSizeMetric = getOrRegisterHistogram("batch-size", metricRegistry)
compressionRatioMetric = getOrRegisterHistogram("compression-ratio", metricRegistry)
}

totalRecordCount := int64(0)
for topic, partitions := range r.msgSets {

err := pe.putArrayLength(len(r.records))
if err != nil {
return err
}

for topic, partitions := range r.records {
err = pe.putString(topic)
if err != nil {
return err
Expand All @@ -57,11 +99,11 @@ func (r *ProduceRequest) encode(pe packetEncoder) error {
if metricRegistry != nil {
topicCompressionRatioMetric = getOrRegisterTopicHistogram("compression-ratio", topic, metricRegistry)
}
for id, msgSet := range partitions {
for id, records := range partitions {
startOffset := pe.offset()
pe.putInt32(id)
pe.push(&lengthField{})
err = msgSet.encode(pe)
err = records.encode(pe)
if err != nil {
return err
}
Expand All @@ -70,23 +112,10 @@ func (r *ProduceRequest) encode(pe packetEncoder) error {
return err
}
if metricRegistry != nil {
for _, messageBlock := range msgSet.Messages {
// Is this a fake "message" wrapping real messages?
if messageBlock.Msg.Set != nil {
topicRecordCount += int64(len(messageBlock.Msg.Set.Messages))
} else {
// A single uncompressed message
topicRecordCount++
}
// Better be safe than sorry when computing the compression ratio
if messageBlock.Msg.compressedSize != 0 {
compressionRatio := float64(len(messageBlock.Msg.Value)) /
float64(messageBlock.Msg.compressedSize)
// Histogram do not support decimal values, let's multiple it by 100 for better precision
intCompressionRatio := int64(100 * compressionRatio)
compressionRatioMetric.Update(intCompressionRatio)
topicCompressionRatioMetric.Update(intCompressionRatio)
}
if r.Version >= 3 {
topicRecordCount += updateBatchMetrics(records.recordBatch, compressionRatioMetric, topicCompressionRatioMetric)
} else {
topicRecordCount += updateMsgSetMetrics(records.msgSet, compressionRatioMetric, topicCompressionRatioMetric)
}
batchSize := int64(pe.offset() - startOffset)
batchSizeMetric.Update(batchSize)
Expand All @@ -108,6 +137,15 @@ func (r *ProduceRequest) encode(pe packetEncoder) error {
}

func (r *ProduceRequest) decode(pd packetDecoder, version int16) error {
r.Version = version

if version >= 3 {
id, err := pd.getNullableString()
if err != nil {
return err
}
r.TransactionalID = id
}
requiredAcks, err := pd.getInt16()
if err != nil {
return err
Expand All @@ -123,7 +161,8 @@ func (r *ProduceRequest) decode(pd packetDecoder, version int16) error {
if topicCount == 0 {
return nil
}
r.msgSets = make(map[string]map[int32]*MessageSet)

r.records = make(map[string]map[int32]Records)
for i := 0; i < topicCount; i++ {
topic, err := pd.getString()
if err != nil {
Expand All @@ -133,28 +172,34 @@ func (r *ProduceRequest) decode(pd packetDecoder, version int16) error {
if err != nil {
return err
}
r.msgSets[topic] = make(map[int32]*MessageSet)
r.records[topic] = make(map[int32]Records)

for j := 0; j < partitionCount; j++ {
partition, err := pd.getInt32()
if err != nil {
return err
}
messageSetSize, err := pd.getInt32()
size, err := pd.getInt32()
if err != nil {
return err
}
msgSetDecoder, err := pd.getSubset(int(messageSetSize))
recordsDecoder, err := pd.getSubset(int(size))
if err != nil {
return err
}
msgSet := &MessageSet{}
err = msgSet.decode(msgSetDecoder)
if err != nil {
var records Records
if version >= 3 {
records = newDefaultRecords(nil)
} else {
records = newLegacyRecords(nil)
}
if err := records.decode(recordsDecoder); err != nil {
return err
}
r.msgSets[topic][partition] = msgSet
r.records[topic][partition] = records
}
}

return nil
}

Expand All @@ -172,38 +217,41 @@ func (r *ProduceRequest) requiredVersion() KafkaVersion {
return V0_9_0_0
case 2:
return V0_10_0_0
case 3:
return V0_11_0_0
default:
return minVersion
}
}

func (r *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) {
if r.msgSets == nil {
r.msgSets = make(map[string]map[int32]*MessageSet)
func (r *ProduceRequest) ensureRecords(topic string, partition int32) {
if r.records == nil {
r.records = make(map[string]map[int32]Records)
}

if r.msgSets[topic] == nil {
r.msgSets[topic] = make(map[int32]*MessageSet)
if r.records[topic] == nil {
r.records[topic] = make(map[int32]Records)
}
}

set := r.msgSets[topic][partition]
func (r *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) {
r.ensureRecords(topic, partition)
set := r.records[topic][partition].msgSet

if set == nil {
set = new(MessageSet)
r.msgSets[topic][partition] = set
r.records[topic][partition] = newLegacyRecords(set)
}

set.addMessage(msg)
}

func (r *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet) {
if r.msgSets == nil {
r.msgSets = make(map[string]map[int32]*MessageSet)
}

if r.msgSets[topic] == nil {
r.msgSets[topic] = make(map[int32]*MessageSet)
}
r.ensureRecords(topic, partition)
r.records[topic][partition] = newLegacyRecords(set)
}

r.msgSets[topic][partition] = set
func (r *ProduceRequest) AddBatch(topic string, partition int32, batch *RecordBatch) {
r.ensureRecords(topic, partition)
r.records[topic][partition] = newDefaultRecords(batch)
}
56 changes: 56 additions & 0 deletions produce_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sarama

import (
"testing"
"time"
)

var (
Expand Down Expand Up @@ -32,6 +33,41 @@ var (
0x00,
0xFF, 0xFF, 0xFF, 0xFF,
0x00, 0x00, 0x00, 0x02, 0x00, 0xEE}

produceRequestOneRecord = []byte{
0xFF, 0xFF, // Transaction ID
0x01, 0x23, // Required Acks
0x00, 0x00, 0x04, 0x44, // Timeout
0x00, 0x00, 0x00, 0x01, // Number of Topics
0x00, 0x05, 't', 'o', 'p', 'i', 'c', // Topic
0x00, 0x00, 0x00, 0x01, // Number of Partitions
0x00, 0x00, 0x00, 0xAD, // Partition
0x00, 0x00, 0x00, 0x52, // Records length
// recordBatch
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x46,
0x00, 0x00, 0x00, 0x00,
0x02,
0x54, 0x79, 0x61, 0xFD,
0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x01, 0x58, 0x8D, 0xCD, 0x59, 0x38,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x01,
// record
0x28,
0x00,
0x0A,
0x00,
0x08, 0x01, 0x02, 0x03, 0x04,
0x06, 0x05, 0x06, 0x07,
0x02,
0x06, 0x08, 0x09, 0x0A,
0x04, 0x0B, 0x0C,
}
)

func TestProduceRequest(t *testing.T) {
Expand All @@ -44,4 +80,24 @@ func TestProduceRequest(t *testing.T) {

request.AddMessage("topic", 0xAD, &Message{Codec: CompressionNone, Key: nil, Value: []byte{0x00, 0xEE}})
testRequest(t, "one message", request, produceRequestOneMessage)

request.Version = 3
batch := &RecordBatch{
Version: 2,
FirstTimestamp: time.Unix(1479847795, 0),
MaxTimestamp: time.Unix(0, 0),
Records: []*Record{{
TimestampDelta: 5 * time.Millisecond,
Key: []byte{0x01, 0x02, 0x03, 0x04},
Value: []byte{0x05, 0x06, 0x07},
Headers: []*RecordHeader{{
Key: []byte{0x08, 0x09, 0x0A},
Value: []byte{0x0B, 0x0C},
}},
}},
}
request.AddBatch("topic", 0xAD, batch)
packet := testRequestEncode(t, "one record", request, produceRequestOneRecord)
batch.Records[0].length.startOffset = 0
testRequestDecode(t, "one record", request, packet)
}
2 changes: 2 additions & 0 deletions produce_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ func (r *ProduceResponse) requiredVersion() KafkaVersion {
return V0_9_0_0
case 2:
return V0_10_0_0
case 3:
return V0_11_0_0
default:
return minVersion
}
Expand Down
Loading

0 comments on commit 5fd60c2

Please sign in to comment.