Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eds: Adding eds caching support to grpc-mux #28273

Merged
merged 9 commits into from
Aug 2, 2023
Prev Previous commit
Next Next commit
add tests to increase coverage, and test unified_mux
Signed-off-by: Adi Suissa-Peleg <adip@google.com>
  • Loading branch information
adisuissa committed Jul 24, 2023
commit 1556694fd1d2c129e6feb56134eeaac4b7a1e903
13 changes: 13 additions & 0 deletions test/extensions/config_subscription/grpc/grpc_mux_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -992,6 +992,19 @@ TEST_F(GrpcMuxImplTest, BadLocalInfoEmptyNodeName) {
"--service-node and --service-cluster options.");
}

// Validates that the EDS cache getter returns the cache.
TEST_F(GrpcMuxImplTest, EdsResourcesCacheForEds) {
eds_resources_cache_ = new NiceMock<MockEdsResourcesCache>();
setup();
EXPECT_NE({}, grpc_mux_->edsResourcesCache());
}

// Validates that the EDS cache getter returns empty if there is no cache.
TEST_F(GrpcMuxImplTest, EdsResourcesCacheForEdsNoCache) {
setup();
EXPECT_EQ({}, grpc_mux_->edsResourcesCache());
}

// Validate that an EDS resource is cached if there's a cache.
TEST_F(GrpcMuxImplTest, CacheEdsResource) {
// Create the cache that will also be passed to the GrpcMux object via setup().
Expand Down
13 changes: 13 additions & 0 deletions test/extensions/config_subscription/grpc/new_grpc_mux_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,19 @@ TEST_P(NewGrpcMuxImplTest, Shutdown) {
// There won't be any unsubscribe messages for the legacy mux either for the same reason
}

// Validates that the EDS cache getter returns the cache.
TEST_P(NewGrpcMuxImplTest, EdsResourcesCacheForEds) {
eds_resources_cache_ = new NiceMock<MockEdsResourcesCache>();
setup();
EXPECT_NE({}, grpc_mux_->edsResourcesCache());
}

// Validates that the EDS cache getter returns empty if there is no cache.
TEST_P(NewGrpcMuxImplTest, EdsResourcesCacheForEdsNoCache) {
setup();
EXPECT_EQ({}, grpc_mux_->edsResourcesCache());
}

// Validate that an EDS resource is cached if there's a cache.
TEST_P(NewGrpcMuxImplTest, CacheEdsResource) {
htuch marked this conversation as resolved.
Show resolved Hide resolved
// Create the cache that will also be passed to the GrpcMux object via setup().
Expand Down
191 changes: 179 additions & 12 deletions test/extensions/config_subscription/grpc/xds_grpc_mux_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "test/config/v2_link_hacks.h"
#include "test/mocks/common.h"
#include "test/mocks/config/custom_config_validators.h"
#include "test/mocks/config/eds_resources_cache.h"
#include "test/mocks/config/mocks.h"
#include "test/mocks/event/mocks.h"
#include "test/mocks/grpc/mocks.h"
Expand Down Expand Up @@ -58,17 +59,7 @@ class GrpcMuxImplTestBase : public testing::Test {
control_plane_pending_requests_(stats_.gauge("control_plane.pending_requests",
Stats::Gauge::ImportMode::NeverImport)) {}

void setup() {
grpc_mux_ = std::make_unique<XdsMux::GrpcMuxSotw>(
std::unique_ptr<Grpc::MockAsyncClient>(async_client_), dispatcher_,
*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
"envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources"),
*stats_.rootScope(), rate_limit_settings_, local_info_, true, std::move(config_validators_),
std::make_unique<JitteredExponentialBackOffStrategy>(
SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs,
random_),
/*xds_config_tracker=*/XdsConfigTrackerOptRef());
}
void setup() { setup(rate_limit_settings_); }

void setup(const RateLimitSettings& custom_rate_limit_settings) {
grpc_mux_ = std::make_unique<XdsMux::GrpcMuxSotw>(
Expand All @@ -80,7 +71,9 @@ class GrpcMuxImplTestBase : public testing::Test {
std::make_unique<JitteredExponentialBackOffStrategy>(
SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs,
random_),
/*xds_config_tracker=*/XdsConfigTrackerOptRef());
/*xds_config_tracker=*/XdsConfigTrackerOptRef(),
/*xds_resources_delegate=*/XdsResourcesDelegateOptRef(),
std::unique_ptr<MockEdsResourcesCache>(eds_resources_cache_), /*target_xds_authority=*/"");
}

void expectSendMessage(const std::string& type_url,
Expand Down Expand Up @@ -138,6 +131,7 @@ class GrpcMuxImplTestBase : public testing::Test {
Envoy::Config::RateLimitSettings rate_limit_settings_;
Stats::Gauge& control_plane_connected_state_;
Stats::Gauge& control_plane_pending_requests_;
MockEdsResourcesCache* eds_resources_cache_{nullptr};
};

class GrpcMuxImplTest : public GrpcMuxImplTestBase {
Expand Down Expand Up @@ -1045,6 +1039,177 @@ TEST_F(GrpcMuxImplTest, AllMuxesStateTest) {
EXPECT_TRUE(grpc_mux_1->isShutdown());
}

// Validates that the EDS cache getter returns the cache.
TEST_F(GrpcMuxImplTest, EdsResourcesCacheForEds) {
eds_resources_cache_ = new NiceMock<MockEdsResourcesCache>();
setup();
EXPECT_NE({}, grpc_mux_->edsResourcesCache());
}

// Validates that the EDS cache getter returns empty if there is no cache.
TEST_F(GrpcMuxImplTest, EdsResourcesCacheForEdsNoCache) {
setup();
EXPECT_EQ({}, grpc_mux_->edsResourcesCache());
}

// Validate that an EDS resource is cached if there's a cache.
TEST_F(GrpcMuxImplTest, CacheEdsResource) {
// Create the cache that will also be passed to the GrpcMux object via setup().
eds_resources_cache_ = new NiceMock<MockEdsResourcesCache>();
setup();

OpaqueResourceDecoderSharedPtr resource_decoder(
std::make_shared<TestUtility::TestOpaqueResourceDecoderImpl<
envoy::config::endpoint::v3::ClusterLoadAssignment>>("cluster_name"));
const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment;
InSequence s;
auto eds_sub = makeWatch(type_url, {"x"});

EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_));
expectSendMessage(type_url, {"x"}, "", true);
grpc_mux_->start();

// Reply with the resource, it will be added to the cache.
{
auto response = std::make_unique<envoy::service::discovery::v3::DiscoveryResponse>();
response->set_type_url(type_url);
response->set_version_info("1");
envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment;
load_assignment.set_cluster_name("x");
response->add_resources()->PackFrom(load_assignment);

EXPECT_CALL(callbacks_, onConfigUpdate(_, "1"))
.WillOnce(Invoke([](const std::vector<DecodedResourceRef>& resources, const std::string&) {
EXPECT_EQ(1, resources.size());
}));
EXPECT_CALL(*eds_resources_cache_, setResource("x", ProtoEq(load_assignment)));
expectSendMessage(type_url, {"x"}, "1");
grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_);
}

// Envoy will unsubscribe from all resources.
EXPECT_CALL(*eds_resources_cache_, removeResource("x"));
expectSendMessage(type_url, {}, "1");
}

// Validate that an update to an EDS resource watcher is reflected in the cache,
// if there's a cache.
TEST_F(GrpcMuxImplTest, UpdateCacheEdsResource) {
// Create the cache that will also be passed to the GrpcMux object via setup().
eds_resources_cache_ = new NiceMock<MockEdsResourcesCache>();
setup();

OpaqueResourceDecoderSharedPtr resource_decoder(
std::make_shared<TestUtility::TestOpaqueResourceDecoderImpl<
envoy::config::endpoint::v3::ClusterLoadAssignment>>("cluster_name"));
const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment;
InSequence s;
auto eds_sub = makeWatch(type_url, {"x"});

EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_));
expectSendMessage(type_url, {"x"}, "", true);
grpc_mux_->start();

// Reply with the resource, it will be added to the cache.
{
auto response = std::make_unique<envoy::service::discovery::v3::DiscoveryResponse>();
response->set_type_url(type_url);
response->set_version_info("1");
envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment;
load_assignment.set_cluster_name("x");
response->add_resources()->PackFrom(load_assignment);

EXPECT_CALL(callbacks_, onConfigUpdate(_, "1"))
.WillOnce(Invoke([](const std::vector<DecodedResourceRef>& resources, const std::string&) {
EXPECT_EQ(1, resources.size());
}));
EXPECT_CALL(*eds_resources_cache_, setResource("x", ProtoEq(load_assignment)));
expectSendMessage(type_url, {"x"}, "1");
grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_);
}

// Update the cache to another resource.
EXPECT_CALL(*eds_resources_cache_, removeResource("x"));
expectSendMessage(type_url, {"y"}, "1");
eds_sub->update({"y"});

// Envoy will unsubscribe from all resources.
EXPECT_CALL(*eds_resources_cache_, removeResource("y"));
expectSendMessage(type_url, {}, "1");
}

// Validate that adding and removing watchers reflects on the cache changes,
// if there's a cache.
TEST_F(GrpcMuxImplTest, AddRemoveSubscriptions) {
// Create the cache that will also be passed to the GrpcMux object via setup().
eds_resources_cache_ = new NiceMock<MockEdsResourcesCache>();
setup();

OpaqueResourceDecoderSharedPtr resource_decoder(
std::make_shared<TestUtility::TestOpaqueResourceDecoderImpl<
envoy::config::endpoint::v3::ClusterLoadAssignment>>("cluster_name"));
const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment;
InSequence s;

{
auto eds_sub = makeWatch(type_url, {"x"});

EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_));
expectSendMessage(type_url, {"x"}, "", true);
grpc_mux_->start();

// Reply with the resource, it will be added to the cache.
{
auto response = std::make_unique<envoy::service::discovery::v3::DiscoveryResponse>();
response->set_type_url(type_url);
response->set_version_info("1");
envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment;
load_assignment.set_cluster_name("x");
response->add_resources()->PackFrom(load_assignment);

EXPECT_CALL(callbacks_, onConfigUpdate(_, "1"))
.WillOnce(Invoke([](const std::vector<DecodedResourceRef>& resources,
const std::string&) { EXPECT_EQ(1, resources.size()); }));
EXPECT_CALL(*eds_resources_cache_, setResource("x", ProtoEq(load_assignment)));
expectSendMessage(type_url, {"x"}, "1"); // Ack.
grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_);
}

// Watcher (eds_sub) going out of scope, the resource should be removed, as well as
// the interest.
EXPECT_CALL(*eds_resources_cache_, removeResource("x"));
expectSendMessage(type_url, {}, "1");
}

// Update to a new resource interest.
{
expectSendMessage(type_url, {"y"}, "1");
auto eds_sub2 = makeWatch(type_url, {"y"});

// Reply with the resource, it will be added to the cache.
{
auto response = std::make_unique<envoy::service::discovery::v3::DiscoveryResponse>();
response->set_type_url(type_url);
response->set_version_info("2");
envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment;
load_assignment.set_cluster_name("y");
response->add_resources()->PackFrom(load_assignment);

EXPECT_CALL(callbacks_, onConfigUpdate(_, "2"))
.WillOnce(Invoke([](const std::vector<DecodedResourceRef>& resources,
const std::string&) { EXPECT_EQ(1, resources.size()); }));
EXPECT_CALL(*eds_resources_cache_, setResource("y", ProtoEq(load_assignment)));
expectSendMessage(type_url, {"y"}, "2"); // Ack.
grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_);
}

// Watcher (eds_sub2) going out of scope, the resource should be removed, as well as
// the interest.
EXPECT_CALL(*eds_resources_cache_, removeResource("y"));
expectSendMessage(type_url, {}, "2");
}
}

class NullGrpcMuxImplTest : public testing::Test {
public:
NullGrpcMuxImplTest() : null_mux_(std::make_unique<Config::XdsMux::NullGrpcMuxImpl>()) {}
Expand Down Expand Up @@ -1080,6 +1245,8 @@ TEST_F(NullGrpcMuxImplTest, AddWatchRaisesException) {
EnvoyException, "ADS must be configured to support an ADS config source");
}

TEST_F(NullGrpcMuxImplTest, NoEdsResourcesCache) { EXPECT_EQ({}, null_mux_->edsResourcesCache()); }

} // namespace
} // namespace XdsMux
} // namespace Config
Expand Down
Loading