Skip to content

Commit

Permalink
Stop instances that are not doing anything useful anymore (taskcluste…
Browse files Browse the repository at this point in the history
…r#6333)

* feat(w-m): Stop instances that are not doing anything useful anymore

This can happen when docker/generic worker process dies and stops
calling queue.claimWork/queue.reclaimTask
Such workers would either be not known to the queue, or would have their
last_date_active way in the past.
In such cases killing instance is the only option to free up resources.

* Moved `claimTimeout` to `lifecycle` parameter of worker pool config.

* Apply suggestions from code review

Co-authored-by: Matt Boris <92693437+matt-boris@users.noreply.github.com>

* Set minimum queue inactivity timeout to 30 minutes.

* Set minimum  queueInactivityTimeout to 1 minute
  • Loading branch information
lotas committed Jun 26, 2023
1 parent 66dc79c commit 0f4ed17
Show file tree
Hide file tree
Showing 23 changed files with 592 additions and 57 deletions.
7 changes: 7 additions & 0 deletions changelog/issue-6142.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
audience: admins
level: minor
reference: issue 6142
---

Worker manager stops instances that are not active in queue after short timeout.
This is to prevent instances from running when worker fails to start claiming work or dies and does not reclaims task.
18 changes: 13 additions & 5 deletions db/fns.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@
* [`expire_worker_pool_errors`](#expire_worker_pool_errors)
* [`expire_worker_pools`](#expire_worker_pools)
* [`expire_workers`](#expire_workers)
* [`get_non_stopped_workers_quntil_providers`](#get_non_stopped_workers_quntil_providers)
* [`get_non_stopped_workers_scanner`](#get_non_stopped_workers_scanner)
* [`get_queue_worker_with_wm_join_2`](#get_queue_worker_with_wm_join_2)
* [`get_queue_workers_with_wm_join`](#get_queue_workers_with_wm_join)
* [`get_queue_workers_with_wm_join_quarantined_2`](#get_queue_workers_with_wm_join_quarantined_2)
Expand Down Expand Up @@ -2501,7 +2501,7 @@ If the hashed session id does not exist, then an error code `P0002` will be thro
* [`expire_worker_pool_errors`](#expire_worker_pool_errors)
* [`expire_worker_pools`](#expire_worker_pools)
* [`expire_workers`](#expire_workers)
* [`get_non_stopped_workers_quntil_providers`](#get_non_stopped_workers_quntil_providers)
* [`get_non_stopped_workers_scanner`](#get_non_stopped_workers_scanner)
* [`get_queue_worker_with_wm_join_2`](#get_queue_worker_with_wm_join_2)
* [`get_queue_workers_with_wm_join`](#get_queue_workers_with_wm_join)
* [`get_queue_workers_with_wm_join_quarantined_2`](#get_queue_workers_with_wm_join_quarantined_2)
Expand Down Expand Up @@ -2641,7 +2641,7 @@ no previous_provider_ids. Returns the worker pool ids that it deletes.
Expire workers that come before `expires_in`.
Returns a count of rows that have been deleted.

### get_non_stopped_workers_quntil_providers
### get_non_stopped_workers_scanner

* *Mode*: read
* *Arguments*:
Expand All @@ -2667,15 +2667,19 @@ Returns a count of rows that have been deleted.
* `secret jsonb`
* `etag uuid`
* `quarantine_until timestamptz`
* *Last defined on version*: 71
* `first_claim timestamptz`
* `last_date_active timestamptz`
* *Last defined on version*: 87

Get non-stopped workers filtered by the optional arguments,
ordered by `worker_pool_id`, `worker_group`, and `worker_id`.
If the pagination arguments are both NULL, all rows are returned.
Otherwise, page_size rows are returned at offset `page_offset`.
The `quaratine_until` contains NULL or a date in the past if the
worker is not quarantined, otherwise the date until which it is
quaratined. `providers_filter_cond` and `providers_filter_value` used to
quaratined. `first_claim` and `last_date_active` contains information
known to the queue service about the worker.
`providers_filter_cond` and `providers_filter_value` used to
filter `=` or `<>` provider by value.

### get_queue_worker_with_wm_join_2
Expand Down Expand Up @@ -3088,3 +3092,7 @@ is added to previous_provider_ids. The return value contains values
required for an API response and previous_provider_id (singular) containing
the provider_id found before the update. If no such worker pool exists,
the return value is an empty set.

### deprecated methods

* `get_non_stopped_workers_quntil_providers(worker_pool_id_in text, worker_group_in text, worker_id_in text, providers_filter_cond text, providers_filter_value text, page_size_in integer, page_offset_in integer)` (compatibility guaranteed until v55.0.0)
72 changes: 70 additions & 2 deletions db/test/fns/worker_manager_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,8 @@ suite(testing.suiteName(), function() {
}
});

const rows = await db.fns.get_non_stopped_workers_quntil_providers(null, null, null, null, null, null, null);
const rows = await db.deprecatedFns.get_non_stopped_workers_quntil_providers(
null, null, null, null, null, null, null);

assert.equal(rows.length, 6);

Expand All @@ -609,6 +610,73 @@ suite(testing.suiteName(), function() {
}
});

helper.dbTest('get non-stopped workers with queue view timestamps ', async function(db) {
const now = new Date();

let i = 0;
// we are randomly ordering the ids to make sure rows are actually coming back ordered accordingly
const randomOrderIds = [4, 6, 5, 3, 2, 7, 0, 1];
for (let state of ["requested", "running", "stopping", "stopped", "requested", "running", "stopping", "stopped"]) {
await create_worker(db, {
worker_pool_id: `wp/${randomOrderIds[i]}`,
worker_group: `group${randomOrderIds[i]}`,
worker_id: `id${randomOrderIds[i]}`,
created: now,
last_modified: now,
last_checked: now,
expires: now,
state,
});
i++;
}

const quarantineUntil = fromNow('1 hour');
const firstClaim = fromNow('-1 hour');
const lastDateActive = fromNow('-1 minute');
await helper.withDbClient(async client => {
// worker 4 is quarantined, and worker 6 has the same workerGroup/workerId as a quarantined worker
// in another pool, and thus should not appear as quarantined here
for (const [workerPoolId, workerGroup, workerId] of [
['wp/4', 'group4', 'id4'],
['wp/6', 'group6', 'id6'],
]) {
await client.query(`
insert
into queue_workers
(task_queue_id, worker_group, worker_id, recent_tasks, quarantine_until, expires, first_claim, last_date_active) values
($1, $2, $3, jsonb_build_array(), $4, now() + interval '1 hour', $5, $6)
`, [workerPoolId, workerGroup, workerId, quarantineUntil, firstClaim, lastDateActive]);
}
});

const rows = await db.fns.get_non_stopped_workers_scanner(
null, null, null, null, null, null, null);

assert.equal(rows.length, 6);

i = 0;
const nonStoppedIds = [0, 2, 4, 5, 6, 7];
for (let row of rows) {
assert.equal(row.worker_pool_id, `wp/${nonStoppedIds[i]}`);
assert.equal(row.worker_group, `group${nonStoppedIds[i]}`);
assert.equal(row.worker_id, `id${nonStoppedIds[i]}`);
assert.equal(row.provider_id, 'provider');
assert(row.state !== 'stopped');
assert.equal(row.created.toJSON(), now.toJSON());
assert.equal(row.expires.toJSON(), now.toJSON());
assert.equal(row.last_modified.toJSON(), now.toJSON());
assert.equal(row.last_checked.toJSON(), now.toJSON());
assert.equal(row.capacity, 1);
assert.deepEqual(row.provider_data, { providerdata: true });
assert(row.secret !== undefined);
assert(row.etag !== undefined);
assert.deepEqual(row.quarantine_until, ['id4', 'id6'].includes(row.worker_id) ? quarantineUntil : null);
assert.equal(row.first_claim?.toJSON(), ['id4', 'id6'].includes(row.worker_id) ? firstClaim.toJSON() : undefined);
assert.equal(row.last_date_active?.toJSON(), ['id4', 'id6'].includes(row.worker_id) ? lastDateActive.toJSON() : undefined);
i++;
}
});

helper.dbTest('get non-stopped workers by provider', async function(db) {
const now = new Date();

Expand Down Expand Up @@ -640,7 +708,7 @@ suite(testing.suiteName(), function() {
];

for (const run of testRuns) {
const rows = await db.fns.get_non_stopped_workers_quntil_providers(
const rows = await db.deprecatedFns.get_non_stopped_workers_quntil_providers(
null, null, null, run.providers_filter_cond, run.providers_filter_value, null, null);

assert.equal(rows.length, run.expected_count);
Expand Down
6 changes: 6 additions & 0 deletions db/test/versions/0087_test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
const testing = require('taskcluster-lib-testing');

suite(testing.suiteName(), function() {
// this version only updates method,
// to trigger index usage, so no tests are needed
});
64 changes: 64 additions & 0 deletions db/versions/0087.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
version: 87
description: add queue information to get_non_stopped_workers_quntil_providers
methods:
get_non_stopped_workers_quntil_providers:
deprecated: true
get_non_stopped_workers_scanner:
description: |-
Get non-stopped workers filtered by the optional arguments,
ordered by `worker_pool_id`, `worker_group`, and `worker_id`.
If the pagination arguments are both NULL, all rows are returned.
Otherwise, page_size rows are returned at offset `page_offset`.
The `quaratine_until` contains NULL or a date in the past if the
worker is not quarantined, otherwise the date until which it is
quaratined. `first_claim` and `last_date_active` contains information
known to the queue service about the worker.
`providers_filter_cond` and `providers_filter_value` used to
filter `=` or `<>` provider by value.
mode: read
serviceName: worker_manager
args: worker_pool_id_in text, worker_group_in text, worker_id_in text, providers_filter_cond text, providers_filter_value text, page_size_in integer, page_offset_in integer
returns: table(worker_pool_id text, worker_group text, worker_id text, provider_id text, created timestamptz, expires timestamptz, state text, provider_data jsonb, capacity integer, last_modified timestamptz, last_checked timestamptz, secret jsonb, etag uuid, quarantine_until timestamptz, first_claim timestamptz, last_date_active timestamptz)
body: |-
begin
return query
select
workers.worker_pool_id,
workers.worker_group,
workers.worker_id,
workers.provider_id,
workers.created,
workers.expires,
workers.state,
workers.provider_data,
workers.capacity,
workers.last_modified,
workers.last_checked,
workers.secret,
workers.etag,
queue_workers.quarantine_until,
queue_workers.first_claim,
queue_workers.last_date_active
from
workers
left join queue_workers on
workers.worker_pool_id = queue_workers.task_queue_id and
workers.worker_id = queue_workers.worker_id and
workers.worker_group = queue_workers.worker_group
where
(workers.worker_pool_id = worker_pool_id_in or worker_pool_id_in is null) and
(workers.worker_group = worker_group_in or worker_group_in is null) and
(workers.worker_id = worker_id_in or worker_id_in is null) and
(workers.state <> 'stopped') and
(providers_filter_cond is null or providers_filter_value is null or
case
when providers_filter_cond = '='
then workers.provider_id = ANY(string_to_array(providers_filter_value, ','))
when providers_filter_cond = '<>'
then workers.provider_id <> ALL(string_to_array(providers_filter_value, ','))
end
)
order by worker_pool_id, worker_group, worker_id
limit get_page_limit(page_size_in)
offset get_page_offset(page_offset_in);
end
1 change: 1 addition & 0 deletions db/versions/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,5 @@ If this invariant is violated, it must be indicated clearly in the changelog.
| [0084](./0084.yml) | v49.1.0 | Fetch expired artifacts |
| [0085](./0085.yml) | v50.1.0 | Store pull request number in github builds table |
| [0086](./0086.yml) | v50.1.2 | Filter github builds in db |
| [0087](./0087.yml) | (pending release) | add queue information to get_non_stopped_workers_quntil_providers |
<!-- AUTOGENERATED DO NOT EDIT - END -->
3 changes: 2 additions & 1 deletion dev-docs/creating-a-worker-ami.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ Be sure to select the AWS provider that we created earlier.
{
"lifecycle": {
"registrationTimeout": 1800,
"reregistrationTimeout": 345600
"reregistrationTimeout": 345600,
"queueInactivityTimeout": 7200
},
"maxCapacity": 5, # The maximum amount of capacity this pool may have.
"minCapacity": 0, # The minimum amount of capacity this pool will have.
Expand Down
18 changes: 18 additions & 0 deletions generated/db-schema.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions generated/docs-search.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions generated/references.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions services/worker-manager/schemas/v1/worker-lifecycle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,22 @@ properties:
will be terminated. This value also dictates the lifetime of the
temporary credentials granted to the worker, meaning that it must be
less than 30 days. The default value is 4 days.
queueInactivityTimeout:
title: Queue Inactivity Timeout
type: integer
minimum: 60 # 1 minute
default: 7200 # 2 hours
description: |
In order to prevent workers from being stuck without doing any work,
`queueInactivityTimeout` controls how long a worker can do something other than
working on a task. If worker process dies, or it stops calling `claimWork`
or `reclaimTask` it should be considered dead and terminated.
Minimum allowed value is 1 minute, to prevent workers from being terminated
while they are starting up or rebooting between the tasks.
This timeout has no affect quarantined workers,
as they are still calling `claimWork`.
Static workers are also unaffected.
additionalProperties: false
required: []
9 changes: 8 additions & 1 deletion services/worker-manager/src/providers/aws.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ class AwsProvider extends Provider {
return; // Nothing to do
}

const { terminateAfter, reregistrationTimeout } = Provider.interpretLifecycle(workerPool.config);
const {
terminateAfter, reregistrationTimeout, queueInactivityTimeout,
} = Provider.interpretLifecycle(workerPool.config);

const toSpawnPerConfig = Math.ceil(toSpawn / workerPool.config.launchConfigs.length);
const shuffledConfigs = _.shuffle(workerPool.config.launchConfigs);
Expand Down Expand Up @@ -260,6 +262,7 @@ class AwsProvider extends Provider {
stateReason: i.StateReason.Message,
terminateAfter,
reregistrationTimeout,
queueInactivityTimeout,
workerConfig: config.workerConfig || {},
},
});
Expand Down Expand Up @@ -359,6 +362,10 @@ class AwsProvider extends Provider {
await this.removeWorker({ worker, reason: 'terminateAfter time exceeded' });
}
}
const { isZombie, reason } = Provider.isZombie({ worker });
if (isZombie) {
await this.removeWorker({ worker, reason });
}
} catch (e) {
if (e.code !== 'InvalidInstanceID.NotFound') { // aws throws this error for instances that had been terminated, too
throw e;
Expand Down
11 changes: 10 additions & 1 deletion services/worker-manager/src/providers/azure/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,9 @@ class AzureProvider extends Provider {
return; // Nothing to do
}

const { terminateAfter, reregistrationTimeout } = Provider.interpretLifecycle(workerPool.config);
const {
terminateAfter, reregistrationTimeout, queueInactivityTimeout,
} = Provider.interpretLifecycle(workerPool.config);

const cfgs = [];
while (toSpawn > 0) {
Expand Down Expand Up @@ -443,6 +445,7 @@ class AzureProvider extends Provider {
...providerData,
terminateAfter,
reregistrationTimeout,
queueInactivityTimeout,
},
});
await worker.create(this.db);
Expand Down Expand Up @@ -1088,6 +1091,12 @@ class AzureProvider extends Provider {
}
}

const { isZombie, reason } = Provider.isZombie({ worker });
if (isZombie) {
await this.removeWorker({ worker, reason });
break;
}

// Call provisionResources to allow it to finish up gathering data about the
// vm. This becomes a no-op once all required operations are complete.
await this.provisionResources({ worker, monitor });
Expand Down
Loading

0 comments on commit 0f4ed17

Please sign in to comment.