Skip to content

Commit

Permalink
Revert "[ package:dds ] Add locking when modifying DDS state via clie…
Browse files Browse the repository at this point in the history
…nt requests"

This reverts commit 0ecfc7d.

Reason for revert: dart-lang#46826

Original change's description:
> [ package:dds ] Add locking when modifying DDS state via client requests
>
> Fixes dart-lang#46696
>
> Change-Id: I666b59a0661f4df3b1f0a47aba52096133f5fbb7
> Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/209140
> Reviewed-by: Anna Gringauze <annagrin@google.com>

TBR=bkonyi@google.com,annagrin@google.com

Change-Id: Iec89181372a2fc1b8a461e616bbcd23dd6bbd72d
No-Presubmit: true
No-Tree-Checks: true
No-Try: true
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/209280
Reviewed-by: Ben Konyi <bkonyi@google.com>
Commit-Queue: Ben Konyi <bkonyi@google.com>
  • Loading branch information
bkonyi authored and commit-bot@chromium.org committed Aug 5, 2021
1 parent 11fafd0 commit 0b0bb99
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 218 deletions.
4 changes: 2 additions & 2 deletions .dart_tool/package_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"constraint, update this by running tools/generate_package_config.dart."
],
"configVersion": 2,
"generated": "2021-08-05T11:33:04.746536",
"generated": "2021-08-04T16:42:24.433381",
"generator": "tools/generate_package_config.dart",
"packages": [
{
Expand Down Expand Up @@ -256,7 +256,7 @@
"name": "dds",
"rootUri": "../pkg/dds",
"packageUri": "lib/",
"languageVersion": "2.14"
"languageVersion": "2.12"
},
{
"name": "dev_compiler",
Expand Down
115 changes: 50 additions & 65 deletions pkg/dds/lib/src/isolate_manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'package:dds/src/utils/mutex.dart';
import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;

import 'client.dart';
Expand Down Expand Up @@ -140,67 +139,58 @@ class IsolateManager {
}

void _updateIsolateState(String id, String name, String eventKind) {
_mutex.runGuarded(
() {
switch (eventKind) {
case ServiceEvents.isolateStart:
isolateStarted(id, name);
break;
case ServiceEvents.isolateExit:
isolateExited(id);
break;
default:
final isolate = isolates[id];
switch (eventKind) {
case ServiceEvents.isolateStart:
isolateStarted(id, name);
case ServiceEvents.pauseExit:
isolate!.pausedOnExit();
break;
case ServiceEvents.isolateExit:
isolateExited(id);
case ServiceEvents.pausePostRequest:
isolate!.pausedPostRequest();
break;
case ServiceEvents.pauseStart:
isolate!.pausedOnStart();
break;
case ServiceEvents.resume:
isolate!.resumed();
break;
default:
final isolate = isolates[id];
switch (eventKind) {
case ServiceEvents.pauseExit:
isolate!.pausedOnExit();
break;
case ServiceEvents.pausePostRequest:
isolate!.pausedPostRequest();
break;
case ServiceEvents.pauseStart:
isolate!.pausedOnStart();
break;
case ServiceEvents.resume:
isolate!.resumed();
break;
default:
break;
}
break;
}
},
);
}
}

/// Initializes the set of running isolates.
Future<void> initialize() async {
if (_initialized) {
return;
}
await _mutex.runGuarded(
() async {
final vm = await dds.vmServiceClient.sendRequest('getVM');
final List<Map> isolateRefs =
vm['isolates'].cast<Map<String, dynamic>>();
// Check the pause event for each isolate to determine whether or not the
// isolate is already paused.
for (final isolateRef in isolateRefs) {
final id = isolateRef['id'];
final isolate = await dds.vmServiceClient.sendRequest('getIsolate', {
'isolateId': id,
});
final name = isolate['name'];
if (isolate.containsKey('pauseEvent')) {
isolates[id] = _RunningIsolate(this, id, name);
final eventKind = isolate['pauseEvent']['kind'];
_updateIsolateState(id, name, eventKind);
} else {
// If the isolate doesn't have a pauseEvent, assume it's running.
isolateStarted(id, name);
}
}
},
);
final vm = await dds.vmServiceClient.sendRequest('getVM');
final List<Map> isolateRefs = vm['isolates'].cast<Map<String, dynamic>>();
// Check the pause event for each isolate to determine whether or not the
// isolate is already paused.
for (final isolateRef in isolateRefs) {
final id = isolateRef['id'];
final isolate = await dds.vmServiceClient.sendRequest('getIsolate', {
'isolateId': id,
});
final name = isolate['name'];
if (isolate.containsKey('pauseEvent')) {
isolates[id] = _RunningIsolate(this, id, name);
final eventKind = isolate['pauseEvent']['kind'];
_updateIsolateState(id, name, eventKind);
} else {
// If the isolate doesn't have a pauseEvent, assume it's running.
isolateStarted(id, name);
}
}
_initialized = true;
}

Expand Down Expand Up @@ -228,20 +218,16 @@ class IsolateManager {
DartDevelopmentServiceClient client,
json_rpc.Parameters parameters,
) async {
return await _mutex.runGuarded(
() async {
final isolateId = parameters['isolateId'].asString;
final isolate = isolates[isolateId];
if (isolate == null) {
return RPCResponses.collectedSentinel;
}
if (isolate.shouldResume(resumingClient: client)) {
isolate.clearResumeApprovals();
return await _sendResumeRequest(isolateId, parameters);
}
return RPCResponses.success;
},
);
final isolateId = parameters['isolateId'].asString;
final isolate = isolates[isolateId];
if (isolate == null) {
return RPCResponses.collectedSentinel;
}
if (isolate.shouldResume(resumingClient: client)) {
isolate.clearResumeApprovals();
return await _sendResumeRequest(isolateId, parameters);
}
return RPCResponses.success;
}

/// Forwards a `resume` request to the VM service.
Expand All @@ -262,6 +248,5 @@ class IsolateManager {

bool _initialized = false;
final DartDevelopmentServiceImpl dds;
final _mutex = Mutex();
final Map<String, _RunningIsolate> isolates = {};
}
138 changes: 63 additions & 75 deletions pkg/dds/lib/src/stream_manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import 'client.dart';
import 'dds_impl.dart';
import 'logging_repository.dart';
import 'rpc_error_codes.dart';
import 'utils/mutex.dart';

class StreamManager {
StreamManager(this.dds);
Expand Down Expand Up @@ -134,56 +133,51 @@ class StreamManager {
DartDevelopmentServiceClient? client,
String stream,
) async {
await _mutex.runGuarded(
() async {
assert(stream.isNotEmpty);
if (!streamListeners.containsKey(stream)) {
// Initialize the list of clients for the new stream before we do
// anything else to ensure multiple clients registering for the same
// stream in quick succession doesn't result in multiple streamListen
// requests being sent to the VM service.
streamListeners[stream] = <DartDevelopmentServiceClient>[];
if ((stream == kDebugStream && client == null) ||
stream != kDebugStream) {
// This will return an RPC exception if the stream doesn't exist. This
// will throw and the exception will be forwarded to the client.
final result =
await dds.vmServiceClient.sendRequest('streamListen', {
'streamId': stream,
});
assert(result['type'] == 'Success');
assert(stream.isNotEmpty);
if (!streamListeners.containsKey(stream)) {
// Initialize the list of clients for the new stream before we do
// anything else to ensure multiple clients registering for the same
// stream in quick succession doesn't result in multiple streamListen
// requests being sent to the VM service.
streamListeners[stream] = <DartDevelopmentServiceClient>[];
if ((stream == kDebugStream && client == null) ||
stream != kDebugStream) {
// This will return an RPC exception if the stream doesn't exist. This
// will throw and the exception will be forwarded to the client.
final result = await dds.vmServiceClient.sendRequest('streamListen', {
'streamId': stream,
});
assert(result['type'] == 'Success');
}
}
if (streamListeners[stream]!.contains(client)) {
throw kStreamAlreadySubscribedException;
}
if (client != null) {
streamListeners[stream]!.add(client);
if (loggingRepositories.containsKey(stream)) {
loggingRepositories[stream]!.sendHistoricalLogs(client);
} else if (stream == kServiceStream) {
// Send all previously registered service extensions when a client
// subscribes to the Service stream.
for (final c in dds.clientManager.clients) {
if (c == client) {
continue;
}
}
if (streamListeners[stream]!.contains(client)) {
throw kStreamAlreadySubscribedException;
}
if (client != null) {
streamListeners[stream]!.add(client);
if (loggingRepositories.containsKey(stream)) {
loggingRepositories[stream]!.sendHistoricalLogs(client);
} else if (stream == kServiceStream) {
// Send all previously registered service extensions when a client
// subscribes to the Service stream.
for (final c in dds.clientManager.clients) {
if (c == client) {
continue;
}
final namespace = dds.getNamespace(c);
for (final service in c.services.keys) {
client.sendNotification(
'streamNotify',
_buildStreamRegisteredEvent(
namespace!,
service,
c.services[service]!,
),
);
}
}
final namespace = dds.getNamespace(c);
for (final service in c.services.keys) {
client.sendNotification(
'streamNotify',
_buildStreamRegisteredEvent(
namespace!,
service,
c.services[service]!,
),
);
}
}
},
);
}
}
}

List<Map<String, dynamic>>? getStreamHistory(String stream) {
Expand All @@ -204,32 +198,27 @@ class StreamManager {
String stream, {
bool cancelCoreStream = false,
}) async {
await _mutex.runGuarded(
() async {
assert(stream.isNotEmpty);
final listeners = streamListeners[stream];
if (listeners == null ||
client != null && !listeners.contains(client)) {
throw kStreamNotSubscribedException;
}
listeners.remove(client);
// Don't cancel streams DDS needs to function.
if (listeners.isEmpty &&
(!ddsCoreStreams.contains(stream) || cancelCoreStream)) {
streamListeners.remove(stream);
// Ensure the VM service hasn't shutdown.
if (dds.vmServiceClient.isClosed) {
return;
}
final result = await dds.vmServiceClient.sendRequest('streamCancel', {
'streamId': stream,
});
assert(result['type'] == 'Success');
} else {
streamListeners[stream] = listeners;
}
},
);
assert(stream.isNotEmpty);
final listeners = streamListeners[stream];
if (listeners == null || client != null && !listeners.contains(client)) {
throw kStreamNotSubscribedException;
}
listeners.remove(client);
// Don't cancel streams DDS needs to function.
if (listeners.isEmpty &&
(!ddsCoreStreams.contains(stream) || cancelCoreStream)) {
streamListeners.remove(stream);
// Ensure the VM service hasn't shutdown.
if (dds.vmServiceClient.isClosed) {
return;
}
final result = await dds.vmServiceClient.sendRequest('streamCancel', {
'streamId': stream,
});
assert(result['type'] == 'Success');
} else {
streamListeners[stream] = listeners;
}
}

/// Cleanup stream subscriptions for `client` when it has disconnected.
Expand Down Expand Up @@ -291,5 +280,4 @@ class StreamManager {

final DartDevelopmentServiceImpl dds;
final streamListeners = <String, List<DartDevelopmentServiceClient>>{};
final _mutex = Mutex();
}
48 changes: 0 additions & 48 deletions pkg/dds/lib/src/utils/mutex.dart

This file was deleted.

2 changes: 1 addition & 1 deletion pkg/dds/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ version: 2.0.2
homepage: https://github.com/dart-lang/sdk/tree/master/pkg/dds

environment:
sdk: '>=2.14.0 <3.0.0'
sdk: '>=2.12.0 <3.0.0'

dependencies:
async: ^2.4.1
Expand Down
Loading

0 comments on commit 0b0bb99

Please sign in to comment.