Skip to content

Commit

Permalink
[Add Local Addr & Port 2/3] Add columns to socket tracer (pixie-io#1807)
Browse files Browse the repository at this point in the history
Summary: Adds data columns for local IP address and port to the socket
tracer, which are populated by pixie-io#1808 and pixie-io#1809. This will support
standalone pem entity relationships.

Type of change: /kind feature

Test Plan: Existing targets

Signed-off-by: Benjamin Kilimnik <bkilimnik@pixielabs.ai>
  • Loading branch information
benkilimnik committed Jan 25, 2024
1 parent 0ec2c3a commit fdce3fd
Show file tree
Hide file tree
Showing 15 changed files with 86 additions and 0 deletions.
10 changes: 10 additions & 0 deletions src/carnot/goplanner/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ relation_map {
column_type: INT64
column_semantic_type: ST_PORT
}
columns {
column_name: "local_addr"
column_type: STRING
column_semantic_type: ST_IP_ADDRESS
}
columns {
column_name: "local_port"
column_type: INT64
column_semantic_type: ST_PORT
}
columns {
column_name: "protocol"
column_type: INT64
Expand Down
2 changes: 2 additions & 0 deletions src/stirling/source_connectors/socket_tracer/amqp_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ static constexpr DataElement kAMQPElements[] = {
canonical_data_elements::kUPID,
canonical_data_elements::kRemoteAddr,
canonical_data_elements::kRemotePort,
canonical_data_elements::kLocalAddr,
canonical_data_elements::kLocalPort,
canonical_data_elements::kTraceRole,
{
"frame_type", "AMQP request command",
Expand Down
16 changes: 16 additions & 0 deletions src/stirling/source_connectors/socket_tracer/canonical_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,22 @@ constexpr DataElement kRemotePort = {
types::PatternType::GENERAL,
};

constexpr DataElement kLocalAddr = {
"local_addr",
"IP address of the local endpoint.",
types::DataType::STRING,
types::SemanticType::ST_IP_ADDRESS,
types::PatternType::GENERAL,
};

constexpr DataElement kLocalPort = {
"local_port",
"Port of the local endpoint.",
types::DataType::INT64,
types::SemanticType::ST_PORT,
types::PatternType::GENERAL,
};

constexpr DataElement kTraceRole = {
"trace_role",
"The role (client-or-server) of the process that owns the connections.",
Expand Down
2 changes: 2 additions & 0 deletions src/stirling/source_connectors/socket_tracer/cass_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ static constexpr DataElement kCQLElements[] = {
canonical_data_elements::kUPID,
canonical_data_elements::kRemoteAddr,
canonical_data_elements::kRemotePort,
canonical_data_elements::kLocalAddr,
canonical_data_elements::kLocalPort,
canonical_data_elements::kTraceRole,
{"req_op", "Request opcode",
types::DataType::INT64,
Expand Down
7 changes: 7 additions & 0 deletions src/stirling/source_connectors/socket_tracer/conn_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,11 @@ class ConnTracker : NotCopyMoveable {
*/
const SockAddr& remote_endpoint() const { return open_info_.remote_addr; }

/**
* Get local IP endpoint of the connection.
*/
const SockAddr& local_endpoint() const { return open_info_.local_addr; }

/**
* Get the connection information (e.g. remote IP, port, PID, etc.) for this connection.
*/
Expand Down Expand Up @@ -801,6 +806,8 @@ std::string DebugString(const ConnTracker& c, std::string_view prefix) {
info += absl::Substitute("state=$0\n", magic_enum::enum_name(c.state()));
info += absl::Substitute("$0remote_addr=$1:$2\n", prefix, c.remote_endpoint().AddrStr(),
c.remote_endpoint().port());
info += absl::Substitute("$0local_addr=$1:$2\n", prefix, c.local_endpoint().AddrStr(),
c.local_endpoint().port());
info += absl::Substitute("$0protocol=$1\n", prefix, magic_enum::enum_name(c.protocol()));
if constexpr (std::is_same_v<TFrameType, protocols::http2::Stream>) {
info += c.http2_client_streams_.DebugString(absl::StrCat(prefix, " "));
Expand Down
2 changes: 2 additions & 0 deletions src/stirling/source_connectors/socket_tracer/dns_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ static constexpr DataElement kDNSElements[] = {
canonical_data_elements::kUPID,
canonical_data_elements::kRemoteAddr,
canonical_data_elements::kRemotePort,
canonical_data_elements::kLocalAddr,
canonical_data_elements::kLocalPort,
canonical_data_elements::kTraceRole,
{"req_header", "Request header",
types::DataType::STRING,
Expand Down
4 changes: 4 additions & 0 deletions src/stirling/source_connectors/socket_tracer/http_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ constexpr DataElement kHTTPElements[] = {
canonical_data_elements::kUPID,
canonical_data_elements::kRemoteAddr,
canonical_data_elements::kRemotePort,
canonical_data_elements::kLocalAddr,
canonical_data_elements::kLocalPort,
canonical_data_elements::kTraceRole,
{"major_version", "HTTP major version, can be 1 or 2",
types::DataType::INT64,
Expand Down Expand Up @@ -112,6 +114,8 @@ constexpr int kHTTPTimeIdx = kHTTPTable.ColIndex("time_");
constexpr int kHTTPUPIDIdx = kHTTPTable.ColIndex("upid");
constexpr int kHTTPRemoteAddrIdx = kHTTPTable.ColIndex("remote_addr");
constexpr int kHTTPRemotePortIdx = kHTTPTable.ColIndex("remote_port");
constexpr int kHTTPLocalAddrIdx = kHTTPTable.ColIndex("local_addr");
constexpr int kHTTPLocalPortIdx = kHTTPTable.ColIndex("local_port");
constexpr int kHTTPTraceRoleIdx = kHTTPTable.ColIndex("trace_role");
constexpr int kHTTPMajorVersionIdx = kHTTPTable.ColIndex("major_version");
constexpr int kHTTPMinorVersionIdx = kHTTPTable.ColIndex("minor_version");
Expand Down
2 changes: 2 additions & 0 deletions src/stirling/source_connectors/socket_tracer/kafka_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ static constexpr DataElement kKafkaElements[] = {
canonical_data_elements::kUPID,
canonical_data_elements::kRemoteAddr,
canonical_data_elements::kRemotePort,
canonical_data_elements::kLocalAddr,
canonical_data_elements::kLocalPort,
canonical_data_elements::kTraceRole,
{"req_cmd", "Kafka request command",
types::DataType::INT64,
Expand Down
2 changes: 2 additions & 0 deletions src/stirling/source_connectors/socket_tracer/mongodb_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ static constexpr DataElement kMongoDBElements[] = {
canonical_data_elements::kUPID,
canonical_data_elements::kRemoteAddr,
canonical_data_elements::kRemotePort,
canonical_data_elements::kLocalAddr,
canonical_data_elements::kLocalPort,
canonical_data_elements::kTraceRole,
{"req_cmd", "MongoDB request command",
types::DataType::STRING,
Expand Down
2 changes: 2 additions & 0 deletions src/stirling/source_connectors/socket_tracer/mux_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ static constexpr DataElement kMuxElements[] = {
canonical_data_elements::kUPID,
canonical_data_elements::kRemoteAddr,
canonical_data_elements::kRemotePort,
canonical_data_elements::kLocalAddr,
canonical_data_elements::kLocalPort,
canonical_data_elements::kTraceRole,
{"req_type", "Mux message request type",
types::DataType::INT64,
Expand Down
2 changes: 2 additions & 0 deletions src/stirling/source_connectors/socket_tracer/mysql_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ static constexpr DataElement kMySQLElements[] = {
canonical_data_elements::kUPID,
canonical_data_elements::kRemoteAddr,
canonical_data_elements::kRemotePort,
canonical_data_elements::kLocalAddr,
canonical_data_elements::kLocalPort,
canonical_data_elements::kTraceRole,
{"req_cmd", "MySQL request command",
types::DataType::INT64,
Expand Down
4 changes: 4 additions & 0 deletions src/stirling/source_connectors/socket_tracer/nats_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ constexpr DataElement kNATSElements[] = {
canonical_data_elements::kUPID,
canonical_data_elements::kRemoteAddr,
canonical_data_elements::kRemotePort,
canonical_data_elements::kLocalAddr,
canonical_data_elements::kLocalPort,
canonical_data_elements::kTraceRole,
{"cmd", "The name of the command.",
types::DataType::STRING, types::SemanticType::ST_NONE, types::PatternType::GENERAL},
Expand All @@ -55,6 +57,8 @@ constexpr int kTime = kNATSTable.ColIndex("time_");
constexpr int kUPID = kNATSTable.ColIndex("upid");
constexpr int kRemoteAddr = kNATSTable.ColIndex("remote_addr");
constexpr int kRemotePort = kNATSTable.ColIndex("remote_port");
constexpr int kLocalAddr = kNATSTable.ColIndex("local_addr");
constexpr int kLocalPort = kNATSTable.ColIndex("local_port");
constexpr int kRemoteRole = kNATSTable.ColIndex("trace_role");
constexpr int kCMD = kNATSTable.ColIndex("cmd");
constexpr int kOptions = kNATSTable.ColIndex("body");
Expand Down
2 changes: 2 additions & 0 deletions src/stirling/source_connectors/socket_tracer/pgsql_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ static constexpr DataElement kPGSQLElements[] = {
canonical_data_elements::kUPID,
canonical_data_elements::kRemoteAddr,
canonical_data_elements::kRemotePort,
canonical_data_elements::kLocalAddr,
canonical_data_elements::kLocalPort,
canonical_data_elements::kTraceRole,
{"req_cmd", "PostgreSQL request command code",
types::DataType::STRING,
Expand Down
2 changes: 2 additions & 0 deletions src/stirling/source_connectors/socket_tracer/redis_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ static constexpr DataElement kRedisElements[] = {
canonical_data_elements::kUPID,
canonical_data_elements::kRemoteAddr,
canonical_data_elements::kRemotePort,
canonical_data_elements::kLocalAddr,
canonical_data_elements::kLocalPort,
canonical_data_elements::kTraceRole,
{"req_cmd", "Request command. See https://redis.io/commands.",
types::DataType::STRING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1271,6 +1271,8 @@ void SocketTraceConnector::AppendMessage(ConnectorContext* ctx, const ConnTracke
// But std::move is not allowed because we re-use conn object.
r.Append<r.ColIndex("remote_addr")>(conn_tracker.remote_endpoint().AddrStr());
r.Append<r.ColIndex("remote_port")>(conn_tracker.remote_endpoint().port());
r.Append<r.ColIndex("local_addr")>(conn_tracker.local_endpoint().AddrStr());
r.Append<r.ColIndex("local_port")>(conn_tracker.local_endpoint().port());
r.Append<r.ColIndex("trace_role")>(conn_tracker.role());
r.Append<r.ColIndex("major_version")>(1);
r.Append<r.ColIndex("minor_version")>(resp_message.minor_version);
Expand Down Expand Up @@ -1333,6 +1335,8 @@ void SocketTraceConnector::AppendMessage(ConnectorContext* ctx, const ConnTracke
r.Append<r.ColIndex("upid")>(upid.value());
r.Append<r.ColIndex("remote_addr")>(conn_tracker.remote_endpoint().AddrStr());
r.Append<r.ColIndex("remote_port")>(conn_tracker.remote_endpoint().port());
r.Append<r.ColIndex("local_addr")>(conn_tracker.local_endpoint().AddrStr());
r.Append<r.ColIndex("local_port")>(conn_tracker.local_endpoint().port());
r.Append<r.ColIndex("trace_role")>(conn_tracker.role());
r.Append<r.ColIndex("major_version")>(2);
// HTTP2 does not define minor version.
Expand Down Expand Up @@ -1374,6 +1378,8 @@ void SocketTraceConnector::AppendMessage(ConnectorContext* ctx, const ConnTracke
r.Append<r.ColIndex("upid")>(upid.value());
r.Append<r.ColIndex("remote_addr")>(conn_tracker.remote_endpoint().AddrStr());
r.Append<r.ColIndex("remote_port")>(conn_tracker.remote_endpoint().port());
r.Append<r.ColIndex("local_addr")>(conn_tracker.local_endpoint().AddrStr());
r.Append<r.ColIndex("local_port")>(conn_tracker.local_endpoint().port());
r.Append<r.ColIndex("trace_role")>(conn_tracker.role());
r.Append<r.ColIndex("req_cmd")>(static_cast<uint64_t>(entry.req.cmd));
r.Append<r.ColIndex("req_body")>(std::move(entry.req.msg), FLAGS_max_body_bytes);
Expand All @@ -1397,6 +1403,8 @@ void SocketTraceConnector::AppendMessage(ConnectorContext* ctx, const ConnTracke
r.Append<r.ColIndex("upid")>(upid.value());
r.Append<r.ColIndex("remote_addr")>(conn_tracker.remote_endpoint().AddrStr());
r.Append<r.ColIndex("remote_port")>(conn_tracker.remote_endpoint().port());
r.Append<r.ColIndex("local_addr")>(conn_tracker.local_endpoint().AddrStr());
r.Append<r.ColIndex("local_port")>(conn_tracker.local_endpoint().port());
r.Append<r.ColIndex("trace_role")>(conn_tracker.role());
r.Append<r.ColIndex("req_op")>(static_cast<uint64_t>(entry.req.op));
r.Append<r.ColIndex("req_body")>(std::move(entry.req.msg), FLAGS_max_body_bytes);
Expand All @@ -1420,6 +1428,8 @@ void SocketTraceConnector::AppendMessage(ConnectorContext* ctx, const ConnTracke
r.Append<r.ColIndex("upid")>(upid.value());
r.Append<r.ColIndex("remote_addr")>(conn_tracker.remote_endpoint().AddrStr());
r.Append<r.ColIndex("remote_port")>(conn_tracker.remote_endpoint().port());
r.Append<r.ColIndex("local_addr")>(conn_tracker.local_endpoint().AddrStr());
r.Append<r.ColIndex("local_port")>(conn_tracker.local_endpoint().port());
r.Append<r.ColIndex("trace_role")>(conn_tracker.role());
r.Append<r.ColIndex("req_header")>(entry.req.header);
r.Append<r.ColIndex("req_body")>(entry.req.query);
Expand All @@ -1443,6 +1453,8 @@ void SocketTraceConnector::AppendMessage(ConnectorContext* ctx, const ConnTracke
r.Append<r.ColIndex("upid")>(upid.value());
r.Append<r.ColIndex("remote_addr")>(conn_tracker.remote_endpoint().AddrStr());
r.Append<r.ColIndex("remote_port")>(conn_tracker.remote_endpoint().port());
r.Append<r.ColIndex("local_addr")>(conn_tracker.local_endpoint().AddrStr());
r.Append<r.ColIndex("local_port")>(conn_tracker.local_endpoint().port());
r.Append<r.ColIndex("trace_role")>(conn_tracker.role());
r.Append<r.ColIndex("req")>(std::move(entry.req.payload));
r.Append<r.ColIndex("resp")>(std::move(entry.resp.payload));
Expand All @@ -1465,6 +1477,8 @@ void SocketTraceConnector::AppendMessage(ConnectorContext* ctx, const ConnTracke
r.Append<r.ColIndex("upid")>(upid.value());
r.Append<r.ColIndex("remote_addr")>(conn_tracker.remote_endpoint().AddrStr());
r.Append<r.ColIndex("remote_port")>(conn_tracker.remote_endpoint().port());
r.Append<r.ColIndex("local_addr")>(conn_tracker.local_endpoint().AddrStr());
r.Append<r.ColIndex("local_port")>(conn_tracker.local_endpoint().port());
r.Append<r.ColIndex("trace_role")>(conn_tracker.role());
r.Append<r.ColIndex("req_type")>(entry.req.type);
r.Append<r.ColIndex("latency")>(
Expand All @@ -1486,6 +1500,8 @@ void SocketTraceConnector::AppendMessage(ConnectorContext* ctx, const ConnTracke
r.Append<r.ColIndex("upid")>(upid.value());
r.Append<r.ColIndex("remote_addr")>(conn_tracker.remote_endpoint().AddrStr());
r.Append<r.ColIndex("remote_port")>(conn_tracker.remote_endpoint().port());
r.Append<r.ColIndex("local_addr")>(conn_tracker.local_endpoint().AddrStr());
r.Append<r.ColIndex("local_port")>(conn_tracker.local_endpoint().port());
r.Append<r.ColIndex("trace_role")>(conn_tracker.role());

size_t frame_type = std::max(entry.req.frame_type, entry.resp.frame_type);
Expand Down Expand Up @@ -1537,6 +1553,8 @@ void SocketTraceConnector::AppendMessage(ConnectorContext* ctx, const ConnTracke
r.Append<r.ColIndex("upid")>(upid.value());
r.Append<r.ColIndex("remote_addr")>(conn_tracker.remote_endpoint().AddrStr());
r.Append<r.ColIndex("remote_port")>(conn_tracker.remote_endpoint().port());
r.Append<r.ColIndex("local_addr")>(conn_tracker.local_endpoint().AddrStr());
r.Append<r.ColIndex("local_port")>(conn_tracker.local_endpoint().port());
r.Append<r.ColIndex("trace_role")>(role);
r.Append<r.ColIndex("req_cmd")>(std::string(entry.req.command));
r.Append<r.ColIndex("req_args")>(std::string(entry.req.payload));
Expand All @@ -1560,6 +1578,8 @@ void SocketTraceConnector::AppendMessage(ConnectorContext* ctx, const ConnTracke
r.Append<r.ColIndex("upid")>(upid.value());
r.Append<r.ColIndex("remote_addr")>(conn_tracker.remote_endpoint().AddrStr());
r.Append<r.ColIndex("remote_port")>(conn_tracker.remote_endpoint().port());
r.Append<r.ColIndex("local_addr")>(conn_tracker.local_endpoint().AddrStr());
r.Append<r.ColIndex("local_port")>(conn_tracker.local_endpoint().port());
r.Append<r.ColIndex("trace_role")>(role);
r.Append<r.ColIndex("cmd")>(record.req.command);
r.Append<r.ColIndex("body")>(record.req.options);
Expand All @@ -1583,6 +1603,8 @@ void SocketTraceConnector::AppendMessage(ConnectorContext* ctx, const ConnTracke
r.Append<r.ColIndex("upid")>(upid.value());
r.Append<r.ColIndex("remote_addr")>(conn_tracker.remote_endpoint().AddrStr());
r.Append<r.ColIndex("remote_port")>(conn_tracker.remote_endpoint().port());
r.Append<r.ColIndex("local_addr")>(conn_tracker.local_endpoint().AddrStr());
r.Append<r.ColIndex("local_port")>(conn_tracker.local_endpoint().port());
r.Append<r.ColIndex("trace_role")>(role);
r.Append<r.ColIndex("req_cmd")>(static_cast<int64_t>(record.req.api_key));
r.Append<r.ColIndex("client_id")>(std::move(record.req.client_id), FLAGS_max_body_bytes);
Expand All @@ -1607,6 +1629,8 @@ void SocketTraceConnector::AppendMessage(ConnectorContext* ctx, const ConnTracke
r.Append<r.ColIndex("upid")>(upid.value());
r.Append<r.ColIndex("remote_addr")>(conn_tracker.remote_endpoint().AddrStr());
r.Append<r.ColIndex("remote_port")>(conn_tracker.remote_endpoint().port());
r.Append<r.ColIndex("local_addr")>(conn_tracker.local_endpoint().AddrStr());
r.Append<r.ColIndex("local_port")>(conn_tracker.local_endpoint().port());
r.Append<r.ColIndex("trace_role")>(role);
r.Append<r.ColIndex("req_cmd")>(std::move(record.req.op_msg_type));
r.Append<r.ColIndex("req_body")>(std::move(record.req.frame_body));
Expand Down Expand Up @@ -1742,6 +1766,9 @@ void SocketTraceConnector::TransferConnStats(ConnectorContext* ctx, DataTable* d
r.Append<idx::kUPID>(upid.value());
r.Append<idx::kRemoteAddr>(key.remote_addr);
r.Append<idx::kRemotePort>(key.remote_port);
// TODO(@benkilimnik: uncomment when we have local addr/port in the key)
// r.Append<idx::kLocalAddr>(key.local_addr);
// r.Append<idx::kLocalPort>(key.local_port);
r.Append<idx::kAddrFamily>(static_cast<int>(stats.addr_family));
r.Append<idx::kProtocol>(stats.protocol);
r.Append<idx::kRole>(stats.role);
Expand Down

0 comments on commit fdce3fd

Please sign in to comment.