Enable support for MQTT Parser in stirling #1756

Merged
13 commits merged on Dec 5, 2023

Contributor

@ChinmayaSharma-hue ChinmayaSharma-hue commented Oct 31, 2023

Summary: This PR adds the parser component of MQTT (v5), a newly added protocol.

Related issues: #341

Type of change: /kind feature

Test Plan: Added tests

@ChinmayaSharma-hue ChinmayaSharma-hue changed the title from "Extend ExtractUVarInt to support customizable kMaxVarintLen64" to "Enable support for MQTT Parser in stirling" on Oct 31, 2023
Member

@ddelnano ddelnano left a comment

@ChinmayaSharma-hue really appreciate all your hard work on this and very excited to have MQTT support within Pixie! I still haven't made it the whole way through this since I'm new to MQTT. However, I wanted to post the feedback I have so far.

@@ -82,6 +82,7 @@ pl_cc_test(
"ENABLE_NATS_TRACING=true",
"ENABLE_MONGO_TRACING=true",
"ENABLE_AMQP_TRACING=true",
"ENABLE_MQTT_TRACING=true",
],
deps = [
"//src/stirling/bpf_tools/bcc_bpf:headers",
Member

We should separate out any changes that aren't within the parsing code. This will be easier to review if we only have the following in this PR: protocols/mqtt/{types.h,parse.h,parser.cc} and any related build changes.

Contributor Author

Sure, Noted! Will update in the next commit.

Comment on lines 38 to 39
"//src/common/json:cc_library",
"//src/common/zlib:cc_library",
Member

Are these dependencies needed? I don't see any reference to json parsing or zlib

Contributor Author

@ChinmayaSharma-hue ChinmayaSharma-hue Nov 6, 2023

No, they are not. I just forgot to remove them when I copied the BUILD.bazel file from a different protocol. Will update in the next commit.

Comment on lines 105 to 113
struct State {
bool conn_closed = false;
};

struct StateWrapper {
State global;
std::monostate send;
std::monostate recv;
};
Member

Can you share more details on how this state structure will be used? Below NoState is indicated, so I wasn't sure if you see value in using state or if this was accidental.

Contributor Author

I did this in the beginning when I wasn't sure if this was needed, will remove this.

@@ -112,6 +112,9 @@ DEFINE_int32(stirling_enable_mux_tracing,
DEFINE_int32(stirling_enable_amqp_tracing,
gflags::Int32FromEnv("PX_STIRLING_ENABLE_AMQP_TRACING", px::stirling::TraceMode::On),
"If true, stirling will trace and process AMQP messages.");
DEFINE_int32(stirling_enable_mqtt_tracing,
gflags::Int32FromEnv("PX_STIRLING_ENABLE_MQTT_TRACING", px::stirling::TraceMode::On),
Member

We should split this out to a later PR, but this should be px::stirling::TraceMode::OnForNewerKernels. Our 4.14 kernel build is close to the max BPF program instruction count and so new protocols can't be enabled wholesale.

ParseState ParseFrame(message_type_t type, std::string_view* buf,
Message* result) {
CTX_DCHECK(type == message_type_t::kRequest || type == message_type_t::kResponse);
if (buf->size() < 2) {
Member

I assume this is 2 because we could have a 1 byte control header and 1 byte packet length? It might be easier to make this < 5 so that our remaining code can assume that the entire packet length will be accessible. Otherwise, we need to handle ExtractUVarInt errors differently depending on the context:

  1. Buffer size: 3 bytes, Expected Packet length: 4 bytes -- binary_decoder->ExtractUVarInt will fail and we need to return kNeedsMoreData
  2. Buffer size: 5 bytes, Packet length payload: bogus UVarInt encoding -- binary_decoder->ExtractUVarInt will fail and we need to return kInvalid

If we change the logic as I described above, any binary_decoder->ExtractUVarInt error means that the UVarInt was larger than 4 bytes and is malformed or not MQTT data, so it can always be treated as kInvalid. We still need to check that the decoded value is within kMaxVarint28 that we discussed before, but it should simplify discerning the cases mentioned above.
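
The two cases above can also be told apart by looking at the continuation bit of each length byte, instead of relying on a generic varint decoder. The following is a minimal sketch of that idea, not the code in this PR: `ParseState` is a stand-in for the parser's enum and `DecodeVarByteInt` is a hypothetical helper. It assumes the MQTT rule that a Variable Byte Integer uses at most 4 bytes, 7 value bits per byte, least significant group first.

```cpp
#include <cstddef>
#include <cstdint>
#include <string_view>

// Stand-in for the parser's ParseState enum (assumption for this sketch).
enum class ParseState { kSuccess, kNeedsMoreData, kInvalid };

// Hypothetical helper: decodes an MQTT Variable Byte Integer from the front
// of buf. On success, *value holds the decoded number and *consumed the
// number of bytes that were read.
inline ParseState DecodeVarByteInt(std::string_view buf, uint32_t* value, size_t* consumed) {
  constexpr size_t kMaxBytes = 4;
  uint32_t result = 0;
  for (size_t i = 0; i < kMaxBytes; ++i) {
    if (i >= buf.size()) {
      // Every byte seen so far had its continuation bit set and fewer than
      // four bytes were available, so the encoding may still turn out valid.
      return ParseState::kNeedsMoreData;
    }
    const uint8_t byte = static_cast<uint8_t>(buf[i]);
    result |= static_cast<uint32_t>(byte & 0x7F) << (7 * i);
    if ((byte & 0x80) == 0) {
      *value = result;
      *consumed = i + 1;
      return ParseState::kSuccess;
    }
  }
  // Four bytes were read and the fourth still asks for more: malformed.
  return ParseState::kInvalid;
}
```

With this shape, a 3-byte buffer whose bytes all carry the continuation bit yields kNeedsMoreData, while four such bytes yield kInvalid regardless of how much data follows.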

Contributor Author

Would it not work to return kNeedsMoreData for all cases of ExtractUVarInt errors? If the buffer size is 3 bytes and the expected packet length is 4, then ExtractUVarInt would return an error, and based on this kNeedsMoreData would be returned. I have added another function that checks whether the number returned by ExtractUVarInt is over 4 bytes, which would cause the function to return kInvalid. So both cases would be taken care of this way, right?
Also, I am not sure why it would be easier to change <2 to <5. Wouldn't this eliminate cases where the buffer size is 2? (PINGREQ and PINGRESP are only 2 bytes, with remaining length set to 0.)

Contributor Author

Are you talking about the case when ExtractUVarInt would return insufficient number of bytes error which would happen as it goes over 4 bytes to parse (as it does not know that the limit is 4 bytes), which would prompt my code to return kNeedsMoreData when in fact it is kInvalid?

Member

Would it not work to return kNeedsMoreData for all cases of ExtractUVarInt errors?

I don't think it will work, because if only 2 bytes are available a UVarInt decode could return a complete value or it could return an insufficient number of bytes error. Since UVarInts of 2-4 bytes in length are valid in MQTT, we can't use that error as an indicator unless we know that a 4 byte UVarInt is guaranteed to parse.

Also, I am not sure why it would be easier to change <2 to <5. Wouldn't this eliminate cases where the buffer size is 2? (PINGREQ and PINGRESP are only 2 bytes, with remaining length set to 0.)

Good call. Since PINGREQ and PINGRESP are a maximum of 2 bytes, that would be a problem and is another case we need to handle.

Are you talking about the case when ExtractUVarInt would return insufficient number of bytes error which would happen as it goes over 4 bytes to parse

Correct, that would be similar to case 1 from my original comment. If we check for 5 bytes, that would allow us to treat any UVarInt decoding error as kInvalid, since it would guarantee that the encoded value is 5+ bytes in size.

Could we handle the PINGREQ and PINGRESP cases once the control_packet_code_flags variable is populated? Then we can check to see if the buffer contains 5 bytes? I'm thinking something like the following:

    if (buf->size() < 2) {
      return ParseState::kNeedsMoreData;
    }

    PX_ASSIGN_OR_RETURN_ERROR(uint8_t control_packet_code_flags, decoder.ExtractBEInt<uint8_t>());
    uint8_t control_packet_code = control_packet_code_flags >> 4;
    uint8_t control_packet_flags = control_packet_code_flags & 0x0F;

    MqttControlPacketType control_packet_type = GetControlPacketType(control_packet_code);
    result->control_packet_type = ControlPacketTypeStrings[control_packet_type];

    if (control_packet_type == MqttControlPacketType::PINGREQ || 
        control_packet_type == MqttControlPacketType::PINGRESP) {
          // Decode UVarInt, check it's 1 byte in length and return success
          // if decoding fails or it is > 1 byte, return kInvalid
    }
    
    // With the control messages less than 5 bytes in size handled, we can do the following validation.
    // This would then allow us to treat any insufficient byte errors from UVarInt decoding as kInvalid since
    // there shouldn't be a UVarInt with more than 4 bytes returned.
    if (buf->size() < 5) {
      return ParseState::kNeedsMoreData;
    }
    PX_ASSIGN_OR(size_t remaining_length, decoder.ExtractUVarInt(), return ParseState::kInvalid);

Contributor Author

@ChinmayaSharma-hue ChinmayaSharma-hue Nov 9, 2023

Ok, now I seem to have understood your point. Just to confirm, I have drawn a diagram to find out if we are on the same page.
[Image: UVarInt diagram]
You're talking about the circled cases in the figure, where ExtractUVarInt returns an insufficient number of bytes error even though the buffer is complete (meaning that the full packet is present), because an incorrect encoding leads ExtractUVarInt to think there is more variable encoded data than there is. (I guess this would cause kNeedsMoreData to be returned repeatedly, so the buffer would keep filling with data for a packet that is actually invalid.)
Also, returning kNeedsMoreData when buf->size() < 5 would work if the UVarInt is more than one byte, because a remaining length of 2 bytes or more means the complete packet is definitely bigger than 5 bytes.
So what you would be doing is eliminating all cases of kNeedsMoreData before parsing the UVarInt, so that all the remaining insufficient number of bytes errors can be treated as kInvalid.

Contributor Author

@ChinmayaSharma-hue ChinmayaSharma-hue Nov 9, 2023

This leaves all the cases where remaining length is one byte and value of remaining length is less than 4. Like for PUBACK remaining length is 3, and the remaining length itself takes one byte, so it is less than 5 bytes which would cause the code to return kNeedsMoreData.
One way to resolve this is after checking if buffer size is less than 5, we can extract the UVarInt remaining length and check if it is one byte and then proceed.

  1. If ExtractUVarInt returns an integer more than one byte (but less than 4 bytes), that can be one of two things,
    • Encoding is wrong, kInvalid needs to be returned.
    • Buffer is incomplete, kNeedsMoreData needs to be returned.
  2. If ExtractUVarInt returns insufficient number of bytes error then it is definitely kInvalid as it is trying to consume more than 4 bytes for UVarInt.

So in the first point there needs to be a differentiation for the two cases. At this point I am at a loss as to how to do this. If the remaining length bytes itself are wrong (encoding is wrong) then there is no way to validate whether or not the buffer is incomplete or the remaining length bytes are wrong.

Contributor Author

@ChinmayaSharma-hue ChinmayaSharma-hue Nov 10, 2023

So currently I have decided to do this,

  // Decoding the variable encoding of remaining length field
  size_t remaining_length;
  if (control_packet_type == MqttControlPacketType::PINGREQ ||
      control_packet_type == MqttControlPacketType::PINGRESP) {
    PX_ASSIGN_OR_RETURN_INVALID(remaining_length, decoder.ExtractUVarInt());
    if (remaining_length > 0) {
      return ParseState::kInvalid;
    }
  }

  // Eliminating cases where kNeedsMoreData needs to be returned
  // If buffer size is less than 4, there are chances that the remaining length is not present in its entirety
  if (decoder.BufSize() < 4) {
      // Checking if buffer is complete
      PX_ASSIGN_OR_RETURN_NEEDS_MORE_DATA(remaining_length, decoder.ExtractUVarInt());
      // if remaining length is greater than 3 (4 if remaining length is included), then incomplete buffer, otherwise buffer is complete
      if (remaining_length > 3) {
          return ParseState::kNeedsMoreData;
      }
  } else {
      PX_ASSIGN_OR_RETURN_INVALID(remaining_length, decoder.ExtractUVarInt());
      if (!VariableEncodingNumBytes(remaining_length).ok()) {
          return ParseState::kInvalid;
      }
  }

  // Making sure buffer is complete according to remaining length
  if (decoder.BufSize() < remaining_length) {
    return ParseState::kNeedsMoreData;
  }

This is not perfect since it can cause problems in this section,

    // if remaining length is greater than 3 (4 if remaining length is included), then incomplete buffer, otherwise buffer is complete
    if (remaining_length > 3) {
        return ParseState::kNeedsMoreData;
    }

where I am deciding that buffer is incomplete if remaining length is greater than 3, when it could be that remaining length is greater than 3 simply because of an encoding error and the full data is still present in the buffer. But this works for most of the cases discussed.

Also <5 is replaced with <4 because the first byte is already extracted and I am using decoder's buffer.

Member

I see. Thanks for the detailed explanation on those cases.

If we can special case any control codes that are known to be short in length (like we did with PINGREQ and PINGRESP), it may reduce the cases of treating some of these incorrectly. For now I'm fine with keeping the implementation as is unless you see an opportunity for special casing any of them.

Contributor Author

Special casing only works for PINGREQ and PINGRESP as we can guarantee their sizes to be below 5. With all the other control packets there are chances that properties are present in variable header that can make them bigger.

if (result->header_fields["password_flag"]) {
PX_ASSIGN_OR_RETURN_ERROR(size_t password_length, decoder->ExtractBEInt<uint16_t>());
PX_ASSIGN_OR_RETURN_ERROR(std::string_view password, decoder->ExtractString(password_length));
result->payload["password"] = std::string(password);
Member

Do you think this field is important to capture? Since we are capable of capturing everything within a connection, I think it's best to avoid sensitive information when it's easily detected.

I think we should either replace it with a string of the same length "XXXXX" or just skip over the field after advancing the buffer.
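
Both options from the comment above can be sketched in one helper: advance past the field and hand back a mask of the same length rather than the contents. This is only an illustration under assumptions, not the code in this PR. `ConsumeAndRedact` is a hypothetical name, `std::optional` stands in for the decoder's status type, and the field layout assumed is the MQTT length-prefixed form (2-byte big-endian length followed by that many bytes).

```cpp
#include <cstddef>
#include <cstdint>
#include <optional>
#include <string>
#include <string_view>

// Hypothetical helper: reads a length-prefixed field from the front of *buf
// and advances *buf past it. Returns a mask with the same length as the
// field instead of its contents, or std::nullopt (leaving *buf untouched)
// if the buffer is too short.
inline std::optional<std::string> ConsumeAndRedact(std::string_view* buf) {
  if (buf->size() < 2) return std::nullopt;
  const size_t len = (static_cast<uint8_t>((*buf)[0]) << 8) | static_cast<uint8_t>((*buf)[1]);
  if (buf->size() < 2 + len) return std::nullopt;
  buf->remove_prefix(2 + len);
  return std::string(len, 'X');
}
```

A caller that prefers to skip the field entirely can ignore the returned mask; the buffer has been advanced either way.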

Comment on lines 637 to 659
result->header_fields["dup"] = (control_packet_flags >> 3) != 0;
result->header_fields["retain"] = (control_packet_flags & 0x1) != 0;
result->header_fields["qos"] = (control_packet_flags >> 1) & 0x3;
Member

@ddelnano ddelnano Nov 1, 2023

Since these are expected to be on every message, I think these would be better as individual bools on the Message struct.

Contributor Author

Except for QOS right? Because it would be helpful to know the exact QOS (0,1 or 2) instead of just knowing whether or not the qos is 0 or not.

Member

Yea, I glossed over that the qos field wasn't a bool. The data type should fit whatever we need to store for each field.
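
As a rough illustration of where this thread lands, the flags can live in typed fields, with booleans for dup and retain and a small integer for qos. The struct and function names below are hypothetical and not taken from the PR; the bit layout is the one MQTT defines for PUBLISH (bit 3 DUP, bits 2-1 QoS, bit 0 RETAIN).

```cpp
#include <cstdint>

// Hypothetical struct; field names follow the discussion above.
struct PublishFlags {
  bool dup = false;
  bool retain = false;
  uint8_t qos = 0;  // 0, 1 or 2; the value 3 is reserved in MQTT.
};

// Splits the low nibble of the first fixed-header byte of a PUBLISH packet
// into its three fields.
inline PublishFlags DecodePublishFlags(uint8_t first_byte) {
  const uint8_t flags = first_byte & 0x0F;
  PublishFlags out;
  out.dup = (flags & 0x08) != 0;
  out.qos = (flags >> 1) & 0x03;
  out.retain = (flags & 0x01) != 0;
  return out;
}
```

A parser using this shape would still need to reject qos == 3, which the sketch leaves to the caller.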

inline RecordsWithErrorCount<mqtt::Record> StitchFrames(std::deque<mqtt::Message>* req_messages,
std::deque<mqtt::Message>* resp_messages,
NoState* /*state*/) {
return StitchMessagesWithTimestampOrder<mqtt::Record>(req_messages, resp_messages);
Member

The stitcher should be the next PR after the parser changes, so I don't want to dive into this too much now. However, it seems that message ordering is only guaranteed within a given QoS level. I think using StitchMessagesWithTimestampOrder will result in invalid protocol traces if a client used multiple QoS levels.

[Image: screenshot, 2023-11-01]

In the next PR we will add tests for the Stitcher and we can model that situation. We will likely need to perform a similar process as StitchMessagesWithTimestampOrder, making sure that we only match frames within the same QoS since that will guarantee its assumptions hold. This is why it doesn't work with HTTP pipelining.

We can either use the QoS field as the map key to our new stitcher interface (this hasn't been documented yet, as it's in the process of being merged in #1716), or we can leverage the protocol state to make sure we have the ordering correct. My initial thinking is that the former would be best, but we will have to see.

Contributor Author

Makes sense. I had initially used StitchMessagesWithTimestampOrder as a placeholder as I was not sure what would work. I will focus more on the stitcher in the stitcher PR. Thanks for the additional context, it's very helpful.

ParseState result_state;
std::string_view frame_view;

uint8_t payload_format_indicator_publish[] = {
Member

Did these payloads come from real packet captures or are they handcrafted?

Contributor Author

They came from real packet captures. I used mosquitto to generate packets with different properties and payloads.

Comment on lines 793 to 800
EXPECT_EQ(frame.header_fields["remaining_length"], (size_t) 16);
EXPECT_EQ(frame.header_fields["username_flag"], (unsigned long) 0);
EXPECT_EQ(frame.header_fields["password_flag"], (unsigned long) 0);
EXPECT_EQ(frame.header_fields["will_retain"], (unsigned long) 0);
EXPECT_EQ(frame.header_fields["will_qos"], (unsigned long) 0);
EXPECT_EQ(frame.header_fields["will_flag"], (unsigned long) 0);
EXPECT_EQ(frame.header_fields["clean_start"], (unsigned long) 1);
EXPECT_EQ(frame.header_fields["keep_alive"], (unsigned long) 60);
Member

Please use UL to form the appropriate literals or remove the casting entirely. The tests seem to pass without these, so my preference is the latter. This applies to the other casting done in this file.

@@ -62,7 +62,7 @@ DEFINE_string(trace, "",
"Dynamic trace to deploy. Either (1) the path to a file containing PxL or IR trace "
"spec, or (2) <path to object file>:<symbol_name> for full-function tracing.");
DEFINE_string(print_record_batches,
- "http_events,mysql_events,pgsql_events,redis_events,cql_events,dns_events",
+ "http_events,mysql_events,pgsql_events,redis_events,cql_events,dns_events,mqtt_events",
Member

We should hold off on updating this until the MQTT changes are released (one of the final changes).

@@ -45,5 +45,6 @@ pl_cc_library(
"//src/stirling/source_connectors/socket_tracer/protocols/nats:cc_library",
"//src/stirling/source_connectors/socket_tracer/protocols/pgsql:cc_library",
"//src/stirling/source_connectors/socket_tracer/protocols/redis:cc_library",
"//src/stirling/source_connectors/socket_tracer/protocols/mqtt:cc_library",
Member

This should be saved for a later change. Most likely once we introduce the "mqtt trace bpf test"

@@ -65,6 +65,7 @@ DECLARE_int32(stirling_enable_nats_tracing);
DECLARE_int32(stirling_enable_kafka_tracing);
DECLARE_int32(stirling_enable_mux_tracing);
DECLARE_int32(stirling_enable_amqp_tracing);
DECLARE_int32(stirling_enable_mqtt_tracing);
Member

Changes to this file should also come in a later PR (when the MQTT trace bpf test is added).

@@ -49,7 +50,8 @@ using FrameDequeVariant = std::variant<std::monostate,
std::deque<redis::Message>,
std::deque<kafka::Packet>,
std::deque<nats::Message>,
- std::deque<amqp::Frame>>;
+ std::deque<amqp::Frame>,
+ std::deque<mqtt::Message>>;
Member

Changes to this file should also come in a later PR (when the MQTT trace bpf test is added).

//-----------------------------------------------------------------------------

/**
* Record is the primary output of the http stitcher.
Member

Suggested change
- * Record is the primary output of the http stitcher.
+ * Record is the primary output of the MQTT stitcher.

@@ -30,3 +30,4 @@
#include "src/stirling/source_connectors/socket_tracer/protocols/nats/stitcher.h" // IWYU pragma: export
#include "src/stirling/source_connectors/socket_tracer/protocols/pgsql/stitcher.h" // IWYU pragma: export
#include "src/stirling/source_connectors/socket_tracer/protocols/redis/stitcher.h" // IWYU pragma: export
#include "src/stirling/source_connectors/socket_tracer/protocols/mqtt/stitcher.h" // IWYU pragma: export
Member

We should save this for the stitcher PR.

namespace protocols {

/**
* Parses a single HTTP message from the input string.
Member

Suggested change
- * Parses a single HTTP message from the input string.
+ * Parses a single MQTT message from the input string.

#include "src/stirling/utils/binary_decoder.h"
#include "src/stirling/source_connectors/socket_tracer/protocols/mqtt/types.h"

#define PX_ASSIGN_OR_RETURN_ERROR(expr, val_or) \
Member

@ddelnano ddelnano Nov 8, 2023

Can we rename this to PX_ASSIGN_OR_RETURN_NEEDS_MORE_DATA? Imo kNeedsMoreData isn't an error because it's an indicator that we should continue to retry. We also have the same macro defined in the pgsql parser and this would ensure our naming is consistent (source).

In addition to this, I believe we are returning kNeedsMoreData in places that we shouldn't. Ideally we would validate that the parser has the entire payload as early as possible and return kInvalid for any subsequent decoding. These later decodings shouldn't fail because we've validated that the buffer contains enough bytes, but imo it's the correct ParseState to return. The MQTT case is a little complex because we have multiple "payload" lengths -- remaining_length and variable_header_length.

My understanding is that remaining length will include the size of the entire MQTT frame. Assuming that's correct, we should only consider returning kNeedsMoreData until we can validate that the buffer is greater than or equal to remaining length. After that point, any decoding errors should be kInvalid.

It appears the variable length header can contain optional fields. We only decode these fields when we know that they should be present in order to maintain the assumption I mentioned in the previous paragraph. It seems we are already accomplishing that (as seen in the PUBCOMP case), so that shouldn't require any changes.

Contributor Author

So what that would mean is only in the main parsing function before parsing the variable header or the payload, checks are done to make sure remaining length is equal to the buffer size and then for any subsequent bit extractions, kInvalid should be returned?
Right now for every extraction error kNeedsMoreData is being returned. I realise that extraction error happening is unlikely (unless remaining length field is incorrect) because we have already validated that the remaining length is equal to the buffer size so the full packet is already in the buffer. Would you suggest having PX_ASSIGN_OR_RETURN_INVALID to return kInvalid in variable header parsing and payload parsing? Because an extraction error in variable header/payload parsing, after we have already validated remaining length to be equal to buffer size, means that the remaining length field was wrong, which would make it an invalid MQTT packet.

Member

Your latest changes are consistent with what I described above, so this lgtm.
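
The rule agreed on in this thread (kNeedsMoreData only until the buffer is known to hold the whole packet, kInvalid for any later decoding failure) can be sketched as follows. This is an illustration under assumptions, not the merged implementation: `ParseState` is a stand-in enum, and `SplitPacket` and `ReadUint16` are hypothetical helpers. Note that in MQTT the remaining length counts the bytes after the fixed header, so the completeness check compares it against what follows the length field.

```cpp
#include <cstddef>
#include <cstdint>
#include <string_view>

// Stand-in for the parser's ParseState enum (assumption for this sketch).
enum class ParseState { kSuccess, kNeedsMoreData, kInvalid };

// Hypothetical helper: checks that buf holds one complete MQTT packet and,
// if so, sets *body to the bytes covered by the remaining length field.
inline ParseState SplitPacket(std::string_view buf, std::string_view* body) {
  if (buf.size() < 2) return ParseState::kNeedsMoreData;
  uint32_t remaining_length = 0;
  size_t pos = 1;  // Skip the control packet type/flags byte.
  for (int shift = 0;; shift += 7) {
    if (shift > 21) return ParseState::kInvalid;  // More than 4 length bytes.
    if (pos >= buf.size()) return ParseState::kNeedsMoreData;
    const uint8_t byte = static_cast<uint8_t>(buf[pos++]);
    remaining_length |= static_cast<uint32_t>(byte & 0x7F) << shift;
    if ((byte & 0x80) == 0) break;
  }
  if (buf.size() - pos < remaining_length) return ParseState::kNeedsMoreData;
  *body = buf.substr(pos, remaining_length);
  return ParseState::kSuccess;
}

// After SplitPacket succeeds, running out of bytes inside body means the
// packet contradicts its own remaining length, so this reports kInvalid
// rather than kNeedsMoreData.
inline ParseState ReadUint16(std::string_view* body, uint16_t* out) {
  if (body->size() < 2) return ParseState::kInvalid;
  *out = static_cast<uint16_t>((static_cast<uint8_t>((*body)[0]) << 8) |
                               static_cast<uint8_t>((*body)[1]));
  body->remove_prefix(2);
  return ParseState::kSuccess;
}
```

Restricting later reads to the `body` view is one way to keep the invariant explicit: nothing after the split can ask for more data.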

struct Message: public FrameBase {
message_type_t type = message_type_t::kUnknown;

std::string control_packet_type = "UNKNOWN";
Member

We should try to avoid storing strings in the data table unless it's necessary. Since this field can be modeled as an 8 bit int we should use that instead (MqttControlPacketType / uint8_t).

We typically add a pxl function to map this int back to a string for queries (docs). This allows us to minimize the storage needed for the data type while still allowing the human readable name to be used for visualizations.
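
A minimal sketch of the storage idea: keep the control packet type as an 8-bit code and map it to a name only when a human-readable form is needed. The enum name appears in the review snippets above; the numeric values are the control packet type codes from the MQTT specification. `ControlPacketTypeName` and the lookup table are hypothetical, and the pxl function mentioned above is not shown here.

```cpp
#include <cstdint>
#include <string_view>

// Control packet type codes as defined by MQTT (high nibble of the first byte).
enum class MqttControlPacketType : uint8_t {
  CONNECT = 1, CONNACK = 2, PUBLISH = 3, PUBACK = 4, PUBREC = 5,
  PUBREL = 6, PUBCOMP = 7, SUBSCRIBE = 8, SUBACK = 9, UNSUBSCRIBE = 10,
  UNSUBACK = 11, PINGREQ = 12, PINGRESP = 13, DISCONNECT = 14, AUTH = 15,
};

// Hypothetical lookup table indexed by the numeric code; code 0 is reserved.
inline constexpr std::string_view kControlPacketTypeNames[16] = {
    "RESERVED", "CONNECT", "CONNACK",  "PUBLISH",     "PUBREC_PLACEHOLDER", "PUBREC",
    "PUBREL",   "PUBCOMP", "SUBSCRIBE", "SUBACK",     "UNSUBSCRIBE",        "UNSUBACK",
    "PINGREQ",  "PINGRESP", "DISCONNECT", "AUTH",
};

// Maps a stored 8-bit code back to a display name.
inline std::string_view ControlPacketTypeName(uint8_t code) {
  if (code == static_cast<uint8_t>(MqttControlPacketType::PUBACK)) return "PUBACK";
  return code < 16 ? kControlPacketTypeNames[code] : std::string_view("UNKNOWN");
}
```

Storing one byte per record and resolving the name at query time keeps the table compact, which is the trade-off the comment above describes.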

Comment on lines 43 to 60
template<typename KeyType, typename ValueType>
static std::string MapToString(const std::map<KeyType, ValueType>& inputMap) {
std::string result = "{";
for (const auto& entry : inputMap) {
result += entry.first + ": ";
if constexpr (std::is_same_v<ValueType, uint32_t>) {
result += std::to_string(entry.second);
} else if constexpr (std::is_same_v<ValueType, std::string>) {
result += entry.second;
}
result += ", ";
}
if (!inputMap.empty()) {
result = result.substr(0, result.size() - 2); // Remove the trailing ", "
}
result += "}";
return result;
}
Member

Could this be replaced with the ToJSONString function? It seems like this is similar to json encoding and we should use rapidjson rather than building it by hand.

Comment on lines 67 to 83
std::string header_fields_str = "{";
for (const auto& entry : properties) {
header_fields_str += entry.first + ": " + std::string(entry.second) + ", ";
}
header_fields_str += "}";

std::string properties_str = "{";
for (const auto& entry : properties) {
properties_str += entry.first + ": " + entry.second + ", ";
}
properties_str += "}";

std::string payload_str = "{";
for (const auto& entry : properties) {
payload_str += entry.first + ": " + entry.second + ", ";
}
payload_str += "}";
Member

Same question regarding json encoding here.

@ddelnano
Member

ddelnano commented Nov 8, 2023

@ChinmayaSharma-hue just posted my second round of feedback and this is shaping up very nicely! Thanks again for all the hard work on this!

In addition to the comments, can you please update the PR description to match our GitHub template? This would have been surfaced once the GitHub actions for this PR are permitted to run (I don't have that permission, but I can get that triggered today).

@ChinmayaSharma-hue
Contributor Author

@ChinmayaSharma-hue I believe this will be my last round of comments. I know there have been a variety of things to address, but making this initial implementation solid will be worth it in the long run!

Also please check the linter build errors. Those will need to be resolved as well.

I haven't yet gotten to making all the required changes, including the linter error fixes; I will let you know as soon as all the changes are made. There were some instances where I made silly errors that I should have fixed without you having to point them out. I hope to make fewer of these in the future! Thanks a lot for the guidance and support.

Member

@ddelnano ddelnano left a comment

One comment regarding one of the changes from the previous round of feedback and two suggestions for fixing the runtime/int linter warnings.

Comment on lines 292 to 293
PX_ASSIGN_OR_RETURN_INVALID(uint16_t topic_alias, decoder->ExtractBEInt<uint8_t>());
result->properties["maximum_qos"] = std::to_string(topic_alias);
Member

Suggested change
- PX_ASSIGN_OR_RETURN_INVALID(uint16_t topic_alias, decoder->ExtractBEInt<uint8_t>());
- result->properties["maximum_qos"] = std::to_string(topic_alias);
+ PX_ASSIGN_OR_RETURN_INVALID(uint8_t max_qos, decoder->ExtractBEInt<uint8_t>());
+ result->properties["maximum_qos"] = std::to_string(max_qos);

constexpr int kMaxVarInt24 = 2097152;
constexpr int kMaxVarInt32 = 268435456;

static inline StatusOr<size_t> VariableEncodingNumBytes(unsigned long integer) {
Member

Suggested change
- static inline StatusOr<size_t> VariableEncodingNumBytes(unsigned long integer) {
+ static inline StatusOr<size_t> VariableEncodingNumBytes(uint64_t integer) {
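
For context, a function with this signature could be written with fixed-width types throughout, which is what the runtime/int lint rule asks for. The body below is a guess at the intent, not the PR's code. `VarIntNumBytes` is a hypothetical name, `std::optional` stands in for `StatusOr`, and `kMaxVarInt8` and `kMaxVarInt16` are assumed by analogy with the two constants visible in the snippet above.

```cpp
#include <cstddef>
#include <cstdint>
#include <optional>

// Exclusive upper bounds for values that fit in 1, 2, 3 and 4 encoded bytes
// (7 value bits per byte). The first two names are assumptions.
constexpr uint64_t kMaxVarInt8 = 128;
constexpr uint64_t kMaxVarInt16 = 16384;
constexpr uint64_t kMaxVarInt24 = 2097152;
constexpr uint64_t kMaxVarInt32 = 268435456;

// Hypothetical helper: returns how many bytes an MQTT Variable Byte Integer
// needs to encode value, or std::nullopt if the value is too large.
inline std::optional<size_t> VarIntNumBytes(uint64_t value) {
  if (value < kMaxVarInt8) return 1;
  if (value < kMaxVarInt16) return 2;
  if (value < kMaxVarInt24) return 3;
  if (value < kMaxVarInt32) return 4;
  return std::nullopt;
}
```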

break;
}
case PropertyCode::SubscriptionIdentifier: {
PX_ASSIGN_OR_RETURN_INVALID(unsigned long subscription_id, decoder->ExtractUVarInt());
Member

Suggested change
- PX_ASSIGN_OR_RETURN_INVALID(unsigned long subscription_id, decoder->ExtractUVarInt());
+ PX_ASSIGN_OR_RETURN_INVALID(uint64_t subscription_id, decoder->ExtractUVarInt());

@ChinmayaSharma-hue
Contributor Author

I have fixed the linting errors (I ran arc lint locally), and I have fixed the other issues with the code. Sorry for the delay.

@ddelnano
Member

@pixie-io/maintainers can we kick off the github actions for this PR? This is ready for its final review and I will be approving once the build passes.

@ddelnano ddelnano requested a review from a team November 21, 2023 18:37
Member

@ddelnano ddelnano left a comment

Thanks so much for this contribution @ChinmayaSharma-hue and looking forward to working together on the upcoming ones!

@JamesMBartlett
Member

Thanks for the contribution @ChinmayaSharma-hue. We require all of our commits to be GPG signed. Could you please follow this guide and sign your commits: https://docs.github.com/en/authentication/managing-commit-signature-verification/signing-commits

Signed-off-by: Chinmay <chinmaysharma1020@gmail.com>
…e tests

@ChinmayaSharma-hue
Contributor Author

The commits are now gpg signed and verified.

@JamesMBartlett JamesMBartlett merged commit b1aa1a6 into pixie-io:main Dec 5, 2023
29 checks passed