Skip to content

Commit

Permalink
mobile grpc: expose cancel method on a stream (#24780)
Browse files Browse the repository at this point in the history
Commit Message: Expose `cancel` method on a gRPC stream.
Additional Description:
Risk Level: Low, new API
Testing: Unit/integration tests
Docs Changes:
Release Notes:
Platform Specific Features:
[Optional Runtime guard:]
[Optional Fixes #Issue]
[Optional Fixes commit #PR or SHA]
[Optional Deprecated:]
[Optional [API Considerations](https://github.com/envoyproxy/envoy/blob/main/api/review_checklist.md):]
  • Loading branch information
Augustyniak committed Jan 6, 2023
1 parent bfa849b commit 2cdaac4
Show file tree
Hide file tree
Showing 9 changed files with 354 additions and 2 deletions.
1 change: 1 addition & 0 deletions mobile/docs/root/intro/version_history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Breaking changes:
- kotlin: always use ``getaddrinfo`` DNS resolver. Remove ``addDNSFallbackNameservers``, ``enableDNSFilterUnroutableFamilies``, and ``enableDNSUseSystemResolver`` methods from the Kotlin engine builder. (:issue:`#2618 <2618>`)
- Envoy Mobile's release builds compile without admin support by default. (``--define=admin_functionality=disabled``) (:issue`#2693 <2693>`)
- swift/kotlin: remove `gauge`, `timer`, and `distribution` methods from the PulseClient.
- swift/kotlin: add `cancel` method to `GRPCStream`` type (:issue:`#24780 <24780>`).

Bugfixes:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,23 @@ class GRPCStream(
}

/**
* Close this connection.
* Close stream by sending the "end stream" signal to the peer and
* then waiting for the peer to finish before actually closing the stream.
*/
fun close() {
// TODO(Augustyniak): Remove the method once `cancel` method is proved
// to work fine.

// The gRPC protocol requires the client stream to close with a DATA frame.
// More information here:
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
underlyingStream.close(ByteBuffer.allocate(0))
}

/**
* Cancel the stream forcefully regardless of whether the peer has more data to send.
*/
fun cancel() {
underlyingStream.cancel()
}
}
11 changes: 10 additions & 1 deletion mobile/library/swift/grpc/GRPCStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,20 @@ public final class GRPCStream: NSObject {
return self
}

/// Close this connection.
/// Close stream by sending the "end stream" signal to the peer and
/// then waiting for the peer to finish before actually closing the stream.
public func close() {
// TODO(Augustyniak): Remove the method once `cancel` method is proved
// to work fine.

// The gRPC protocol requires the client stream to close with a DATA frame.
// More information here:
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
self.underlyingStream.close(data: Data())
}

/// Cancel the stream forcefully regardless of whether the peer has more data to send.
public func cancel() {
self.underlyingStream.cancel()
}
}
14 changes: 14 additions & 0 deletions mobile/test/kotlin/integration/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,20 @@ envoy_mobile_jni_kt_test(
],
)

envoy_mobile_jni_kt_test(
name = "cancel_grpc_stream_test",
srcs = [
"CancelGRPCStreamTest.kt",
],
native_deps = [
"//library/common/jni:libjava_jni_lib.so",
"//library/common/jni:java_jni_lib.jnilib",
],
deps = [
"//library/kotlin/io/envoyproxy/envoymobile:envoy_interfaces_lib",
],
)

envoy_mobile_jni_kt_test(
name = "reset_connectivity_state_test",
srcs = [
Expand Down
149 changes: 149 additions & 0 deletions mobile/test/kotlin/integration/CancelGRPCStreamTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package test.kotlin.integration

import io.envoyproxy.envoymobile.Custom
import io.envoyproxy.envoymobile.EngineBuilder
import io.envoyproxy.envoymobile.EnvoyError
import io.envoyproxy.envoymobile.FilterDataStatus
import io.envoyproxy.envoymobile.FilterHeadersStatus
import io.envoyproxy.envoymobile.FilterTrailersStatus
import io.envoyproxy.envoymobile.FinalStreamIntel
import io.envoyproxy.envoymobile.GRPCClient
import io.envoyproxy.envoymobile.GRPCRequestHeadersBuilder
import io.envoyproxy.envoymobile.RequestMethod
import io.envoyproxy.envoymobile.ResponseFilter
import io.envoyproxy.envoymobile.ResponseHeaders
import io.envoyproxy.envoymobile.ResponseTrailers
import io.envoyproxy.envoymobile.StreamIntel
import io.envoyproxy.envoymobile.engine.JniLibrary
import java.nio.ByteBuffer
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import org.assertj.core.api.Assertions.assertThat
import org.junit.Test

private val filterName = "cancel_validation_filter"
private val config =
"""
static_resources:
listeners:
- name: base_api_listener
address:
socket_address: { protocol: TCP, address: 0.0.0.0, port_value: 10000 }
api_listener:
api_listener:
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.EnvoyMobileHttpConnectionManager
config:
stat_prefix: api_hcm
route_config:
name: api_router
virtual_hosts:
- name: api
domains: ["*"]
routes:
- match: { prefix: "/" }
route: { cluster: fake_remote }
http_filters:
- name: envoy.filters.http.local_error
typed_config:
"@type": type.googleapis.com/envoymobile.extensions.filters.http.local_error.LocalError
- name: envoy.filters.http.platform_bridge
typed_config:
"@type": type.googleapis.com/envoymobile.extensions.filters.http.platform_bridge.PlatformBridge
platform_filter_name: $filterName
- name: envoy.router
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
clusters:
- name: fake_remote
connect_timeout: 0.25s
type: STATIC
lb_policy: ROUND_ROBIN
load_assignment:
cluster_name: fake_remote
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address: { address: 127.0.0.1, port_value: ${(10001..11000).random()} }
"""

class CancelGRPCStreamTest {

init {
JniLibrary.loadTestLibrary()
}

private val filterExpectation = CountDownLatch(1)
private val onCancelCallbackExpectation = CountDownLatch(1)

class CancelValidationFilter(
private val latch: CountDownLatch
) : ResponseFilter {
override fun onResponseHeaders(
headers: ResponseHeaders,
endStream: Boolean,
streamIntel: StreamIntel
): FilterHeadersStatus<ResponseHeaders> {
return FilterHeadersStatus.Continue(headers)
}

override fun onResponseData(
body: ByteBuffer,
endStream: Boolean,
streamIntel: StreamIntel
): FilterDataStatus<ResponseHeaders> {
return FilterDataStatus.Continue(body)
}

override fun onResponseTrailers(
trailers: ResponseTrailers,
streamIntel: StreamIntel
): FilterTrailersStatus<ResponseHeaders, ResponseTrailers> {
return FilterTrailersStatus.Continue(trailers)
}

override fun onError(error: EnvoyError, finalStreamIntel: FinalStreamIntel) {}
override fun onComplete(finalStreamIntel: FinalStreamIntel) {}

override fun onCancel(finalStreamIntel: FinalStreamIntel) {
latch.countDown()
}
}

@Test
fun `cancel grpc stream calls onCancel callback`() {
val engine = EngineBuilder(Custom(config))
.addPlatformFilter(
name = filterName,
factory = { CancelValidationFilter(filterExpectation) }
)
.setOnEngineRunning {}
.build()

val client = GRPCClient(engine.streamClient())

val requestHeaders = GRPCRequestHeadersBuilder(
scheme = "https",
authority = "example.com",
path = "/test"
)
.build()

client.newGRPCStreamPrototype()
.setOnCancel { _ ->
onCancelCallbackExpectation.countDown()
}
.start(Executors.newSingleThreadExecutor())
.sendHeaders(requestHeaders, false)
.cancel()

filterExpectation.await(10, TimeUnit.SECONDS)
onCancelCallbackExpectation.await(10, TimeUnit.SECONDS)

engine.terminate()

assertThat(filterExpectation.count).isEqualTo(0)
assertThat(onCancelCallbackExpectation.count).isEqualTo(0)
}
}
18 changes: 18 additions & 0 deletions mobile/test/kotlin/io/envoyproxy/envoymobile/GRPCStreamTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import java.nio.ByteBuffer
import java.nio.ByteOrder
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor
import java.util.concurrent.TimeUnit
import org.assertj.core.api.Assertions.assertThat
import org.junit.Test

Expand Down Expand Up @@ -92,6 +93,23 @@ class GRPCStreamTest {
assertThat(closedData).isEqualTo(ByteBuffer.allocate(0))
}

@Test
fun `cancel calls a stream callback`() {
val countDownLatch = CountDownLatch(1)
val streamClient = MockStreamClient { stream ->
stream.onCancel = {
countDownLatch.countDown()
}
}

GRPCClient(streamClient)
.newGRPCStreamPrototype()
.start(Executor {})
.cancel()

assertThat(countDownLatch.await(2000, TimeUnit.MILLISECONDS)).isTrue()
}

// Response tests

@Test(timeout = 1000L)
Expand Down
14 changes: 14 additions & 0 deletions mobile/test/swift/GRPCStreamTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,20 @@ final class GRPCStreamTests: XCTestCase {
XCTAssertEqual(Data(), closedData)
}

func testCancelCallsStreamCallback() {
let expectation = self.expectation(description: "onCancel callback is called")
let streamClient = MockStreamClient { stream in
stream.onCancel = expectation.fulfill
}

GRPCClient(streamClient: streamClient)
.newGRPCStreamPrototype()
.start()
.cancel()

self.waitForExpectations(timeout: 0.1)
}

// MARK: - Response tests

func testHeadersCallbackPassesHeaders() {
Expand Down
15 changes: 15 additions & 0 deletions mobile/test/swift/integration/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,21 @@ envoy_mobile_swift_test(
],
)

envoy_mobile_swift_test(
name = "cancel_grpc_stream_test",
srcs = [
"CancelGRPCStreamTest.swift",
],
# TODO(jpsim): Fix remote execution for these tests
tags = [
"no-remote-exec",
],
visibility = ["//visibility:public"],
deps = [
"//library/objective-c:envoy_engine_objc_lib",
],
)

envoy_cc_library(
name = "test_extensions_cc",
srcs = [
Expand Down
Loading

0 comments on commit 2cdaac4

Please sign in to comment.