Skip to content

Commit

Permalink
[Core] Cancel lease requests before returning a PG bundle (ray-projec…
Browse files Browse the repository at this point in the history
…t#46116)

Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com>
  • Loading branch information
jjyao committed Jun 20, 2024
1 parent 3cca541 commit a50fc17
Show file tree
Hide file tree
Showing 19 changed files with 415 additions and 239 deletions.
84 changes: 84 additions & 0 deletions python/ray/tests/test_gcs_fault_tolerance.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import sys
import asyncio
import os
import threading
from time import sleep
Expand All @@ -22,6 +23,8 @@
)
from ray.job_submission import JobSubmissionClient, JobStatus
from ray._raylet import GcsClient
from ray._private.runtime_env.plugin import RuntimeEnvPlugin
from ray.util.state import list_placement_groups

import psutil

Expand Down Expand Up @@ -1213,6 +1216,87 @@ def spawn(self, name, namespace):
raise ValueError(f"Unknown case: {case}")


MyPlugin = "MyPlugin"
MY_PLUGIN_CLASS_PATH = "ray.tests.test_gcs_fault_tolerance.HangPlugin"


class HangPlugin(RuntimeEnvPlugin):
name = MyPlugin

async def create(
self,
uri,
runtime_env,
ctx,
logger, # noqa: F821
) -> float:
while True:
await asyncio.sleep(1)

@staticmethod
def validate(runtime_env_dict: dict) -> str:
return 1


@pytest.mark.parametrize(
"ray_start_regular_with_external_redis",
[
generate_system_config_map(
gcs_rpc_server_reconnect_timeout_s=60,
testing_asio_delay_us="NodeManagerService.grpc_server.CancelResourceReserve=500000000:500000000", # noqa: E501
),
],
indirect=True,
)
@pytest.mark.parametrize(
"set_runtime_env_plugins",
[
'[{"class":"' + MY_PLUGIN_CLASS_PATH + '"}]',
],
indirect=True,
)
def test_placement_group_removal_after_gcs_restarts(
set_runtime_env_plugins, ray_start_regular_with_external_redis
):
@ray.remote
def task():
pass

pg = ray.util.placement_group(bundles=[{"CPU": 1}])
_ = task.options(
max_retries=0,
num_cpus=1,
scheduling_strategy=PlacementGroupSchedulingStrategy(
placement_group=pg,
),
runtime_env={
MyPlugin: {"name": "f2"},
"config": {"setup_timeout_seconds": -1},
},
).remote()

# The task should be popping worker
# TODO(jjyao) Use a more determinstic way to
# decide whether the task is popping worker
sleep(5)

ray.util.remove_placement_group(pg)
# The PG is marked as REMOVED in redis but not removed yet from raylet
# due to the injected delay of CancelResourceReserve rpc
wait_for_condition(lambda: list_placement_groups()[0].state == "REMOVED")

ray._private.worker._global_node.kill_gcs_server()
# After GCS restarts, it will try to remove the PG resources
# again via ReleaseUnusedBundles rpc
ray._private.worker._global_node.start_gcs_server()

def verify_pg_resources_cleaned():
r_keys = ray.available_resources().keys()
return all("group" not in k for k in r_keys)

wait_for_condition(verify_pg_resources_cleaned, timeout=30)


if __name__ == "__main__":

import pytest
Expand Down
7 changes: 3 additions & 4 deletions python/ray/tests/test_placement_group_5.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,10 +470,9 @@ async def create(
) -> float:
await asyncio.sleep(PLUGIN_TIMEOUT)


@staticmethod
def validate(runtime_env_dict: dict) -> str:
return 1
@staticmethod
def validate(runtime_env_dict: dict) -> str:
return 1


@pytest.mark.parametrize(
Expand Down
36 changes: 36 additions & 0 deletions python/ray/tests/test_placement_group_failover.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import pytest
import sys
import ray
import time
import ray.cluster_utils
from ray._private.test_utils import get_other_nodes, wait_for_condition
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

MB = 1024 * 1024

Expand All @@ -16,6 +18,40 @@ def value(self):
return self.n


def test_placement_group_recover_prepare_failure(monkeypatch, ray_start_cluster):
# Test to make sure that gcs can handle the prepare pg failure
# by retrying on other nodes.
cluster = ray_start_cluster
cluster.add_node(num_cpus=1)
ray.init(address=cluster.address)

monkeypatch.setenv(
"RAY_testing_asio_delay_us",
"NodeManagerService.grpc_server.PrepareBundleResources=500000000:500000000",
)
worker1 = cluster.add_node(num_cpus=1)
pg = ray.util.placement_group(
strategy="STRICT_SPREAD", bundles=[{"CPU": 1}, {"CPU": 1}]
)
# actor will wait for the pg to be created
actor = Actor.options(
scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=pg)
).remote()

# wait for the prepare rpc to be sent
time.sleep(1)

# prepare will fail
cluster.remove_node(worker1)

monkeypatch.delenv("RAY_testing_asio_delay_us")
# prepare will retry on this node
cluster.add_node(num_cpus=1)

# pg can be created successfully
ray.get(actor.value.remote())


# Test whether the bundles spread on two nodes can be rescheduled successfully
# when both nodes die at the same time.
def test_placement_group_failover_when_two_nodes_die(monkeypatch, ray_start_cluster):
Expand Down
1 change: 1 addition & 0 deletions src/ray/core_worker/core_worker_process.cc
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ void CoreWorkerProcessImpl::InitializeSystemConfig() {
thread.join();

RayConfig::instance().initialize(promise.get_future().get());
ray::asio::testing::init();
}

void CoreWorkerProcessImpl::RunWorkerTaskExecutionLoop() {
Expand Down
6 changes: 4 additions & 2 deletions src/ray/gcs/gcs_server/gcs_actor_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1321,8 +1321,10 @@ void GcsActorManager::OnActorSchedulingFailed(
ray::rpc::ActorDeathCause death_cause;
switch (failure_type) {
case rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_PLACEMENT_GROUP_REMOVED:
error_msg =
"Could not create the actor because its associated placement group was removed.";
error_msg = absl::StrCat(
"Could not create the actor because its associated placement group was "
"removed.\n",
scheduling_failure_message);
death_cause.mutable_actor_unschedulable_context()->set_error_message(error_msg);
break;
case rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_RUNTIME_ENV_SETUP_FAILED:
Expand Down
39 changes: 15 additions & 24 deletions src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,9 @@ void GcsPlacementGroupScheduler::CancelResourceReserve(
auto node_id = NodeID::FromBinary(node.value()->node_id());

if (max_retry == current_retry_cnt) {
RAY_LOG(INFO) << "Failed to cancel resource reserved for bundle because the max "
"retry count is reached. "
<< bundle_spec->DebugString() << " at node " << node_id;
RAY_LOG(ERROR) << "Failed to cancel resource reserved for bundle because the max "
"retry count is reached. "
<< bundle_spec->DebugString() << " at node " << node_id;
return;
}

Expand All @@ -261,11 +261,10 @@ void GcsPlacementGroupScheduler::CancelResourceReserve(
RAY_LOG(INFO) << "Finished cancelling the resource reserved for bundle: "
<< bundle_spec->DebugString() << " at node " << node_id;
} else {
// We couldn't delete the pg resources either becuase it is in use
// or network issue. Retry.
RAY_LOG(INFO) << "Failed to cancel the resource reserved for bundle: "
<< bundle_spec->DebugString() << " at node " << node_id
<< ". Status: " << status;
// We couldn't delete the pg resources because of network issue. Retry.
RAY_LOG(WARNING) << "Failed to cancel the resource reserved for bundle: "
<< bundle_spec->DebugString() << " at node " << node_id
<< ". Status: " << status;
execute_after(
io_context_,
[this, bundle_spec, node, max_retry, current_retry_cnt] {
Expand Down Expand Up @@ -568,14 +567,10 @@ void GcsPlacementGroupScheduler::DestroyPlacementGroupPreparedBundleResources(
for (const auto &iter : *(leasing_bundle_locations)) {
auto &bundle_spec = iter.second.second;
auto &node_id = iter.second.first;
CancelResourceReserve(
bundle_spec,
gcs_node_manager_.GetAliveNode(node_id),
// Retry 10 * worker registeration timeout to avoid race condition.
// See https://github.com/ray-project/ray/pull/42942
// for more details.
/*max_retry*/ RayConfig::instance().worker_register_timeout_seconds() * 10,
/*num_retry*/ 0);
CancelResourceReserve(bundle_spec,
gcs_node_manager_.GetAliveNode(node_id),
/*max_retry*/ 5,
/*num_retry*/ 0);
}
}
}
Expand All @@ -594,14 +589,10 @@ void GcsPlacementGroupScheduler::DestroyPlacementGroupCommittedBundleResources(
for (const auto &iter : *(committed_bundle_locations)) {
auto &bundle_spec = iter.second.second;
auto &node_id = iter.second.first;
CancelResourceReserve(
bundle_spec,
gcs_node_manager_.GetAliveNode(node_id),
// Retry 10 * worker registeration timeout to avoid race condition.
// See https://github.com/ray-project/ray/pull/42942
// for more details.
/*max_retry*/ RayConfig::instance().worker_register_timeout_seconds() * 10,
/*num_retry*/ 0);
CancelResourceReserve(bundle_spec,
gcs_node_manager_.GetAliveNode(node_id),
/*max_retry*/ 5,
/*num_retry*/ 0);
}
committed_bundle_location_index_.Erase(placement_group_id);
cluster_resource_scheduler_.GetClusterResourceManager()
Expand Down
1 change: 1 addition & 0 deletions src/ray/gcs/gcs_server/gcs_server_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ int main(int argc, char *argv[]) {
gflags::ShutDownCommandLineFlags();

RayConfig::instance().initialize(config_list);
ray::asio::testing::init();

// IO Service for main loop.
instrumented_io_context main_service;
Expand Down
Loading

0 comments on commit a50fc17

Please sign in to comment.