Skip to content

Commit ef2209c

Browse files
committed
binder: Add unit tests for ClientInbound's message reassembly
1 parent b02268f commit ef2209c

File tree

1 file changed

+97
-0
lines changed

1 file changed

+97
-0
lines changed

binder/src/test/java/io/grpc/binder/internal/RobolectricBinderTransportTest.java

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import io.grpc.binder.internal.SettableAsyncSecurityPolicy.AuthRequest;
6161
import io.grpc.internal.AbstractTransportTest;
6262
import io.grpc.internal.ClientStream;
63+
import io.grpc.internal.ClientStreamListenerBase;
6364
import io.grpc.internal.ClientTransport;
6465
import io.grpc.internal.ClientTransportFactory.ClientTransportOptions;
6566
import io.grpc.internal.ConnectionClientTransport;
@@ -547,6 +548,102 @@ public void msgFragmentsDeliveredToServerOutOfOrder() throws Exception {
547548
assertThat(methodDescriptor.parseResponse(msgStream)).isEqualTo(largeMessage);
548549
}
549550

551+
@Test
552+
public void singleTxnMsgsDeliveredToClientOutOfOrder() throws Exception {
553+
server = newServerBuilder().setClientBinderDecorator(blockingDecorator).build();
554+
registerServerWithRobolectric((BinderServer) server);
555+
server.start(serverListener);
556+
557+
client = newClientTransportBuilder().build();
558+
runIfNotNull(client.start(mockClientTransportListener));
559+
560+
OneWayBinderProxy clientProxy = blockingDecorator.takeNextRequest(TIMEOUT_MS, MILLISECONDS);
561+
assertThat(clientProxy).isNotNull();
562+
QueueingOneWayBinderProxy proxy = new QueueingOneWayBinderProxy(clientProxy);
563+
blockingDecorator.putNextResult(proxy);
564+
565+
// Deliver the handshake transaction!
566+
QueueingOneWayBinderProxy.Transaction setupTx = takeNextTransaction(proxy);
567+
proxy.deliver(setupTx);
568+
569+
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady();
570+
571+
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
572+
ClientStream stream =
573+
client.newStream(methodDescriptor, new Metadata(), CallOptions.DEFAULT, noopTracers);
574+
stream.start(clientStreamListener);
575+
stream.request(2);
576+
577+
MockServerTransportListener serverTransportListener =
578+
serverListener.takeListenerOrFail(TIMEOUT_MS, MILLISECONDS);
579+
MockServerTransportListener.StreamCreation streamCreation =
580+
serverTransportListener.takeStreamOrFail(TIMEOUT_MS, MILLISECONDS);
581+
582+
streamCreation.stream.writeMessage(methodDescriptor.streamResponse("one"));
583+
streamCreation.stream.writeMessage(methodDescriptor.streamResponse("two"));
584+
streamCreation.stream.flush();
585+
586+
QueueingOneWayBinderProxy.Transaction tx1 = takeNextTransaction(proxy);
587+
QueueingOneWayBinderProxy.Transaction tx2 = takeNextTransaction(proxy);
588+
589+
proxy.deliver(tx2);
590+
proxy.deliver(tx1);
591+
592+
InputStream msg1 = clientStreamListener.messageQueue.poll(TIMEOUT_MS, MILLISECONDS);
593+
assertThat(msg1).isNotNull();
594+
assertThat(methodDescriptor.parseResponse(msg1)).isEqualTo("one");
595+
596+
InputStream msg2 = clientStreamListener.messageQueue.poll(TIMEOUT_MS, MILLISECONDS);
597+
assertThat(msg2).isNotNull();
598+
assertThat(methodDescriptor.parseResponse(msg2)).isEqualTo("two");
599+
}
600+
601+
@Test
602+
public void msgFragmentsDeliveredToClientOutOfOrder() throws Exception {
603+
server = newServerBuilder().setClientBinderDecorator(blockingDecorator).build();
604+
registerServerWithRobolectric((BinderServer) server);
605+
server.start(serverListener);
606+
607+
client = newClientTransportBuilder().build();
608+
runIfNotNull(client.start(mockClientTransportListener));
609+
610+
OneWayBinderProxy clientProxy = blockingDecorator.takeNextRequest(TIMEOUT_MS, MILLISECONDS);
611+
assertThat(clientProxy).isNotNull();
612+
QueueingOneWayBinderProxy proxy = new QueueingOneWayBinderProxy(clientProxy);
613+
blockingDecorator.putNextResult(proxy);
614+
615+
// Deliver the handshake transaction!
616+
QueueingOneWayBinderProxy.Transaction setupTx = takeNextTransaction(proxy);
617+
proxy.deliver(setupTx);
618+
619+
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady();
620+
621+
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
622+
ClientStream stream =
623+
client.newStream(methodDescriptor, new Metadata(), CallOptions.DEFAULT, noopTracers);
624+
stream.start(clientStreamListener);
625+
stream.request(1);
626+
627+
MockServerTransportListener serverTransportListener =
628+
serverListener.takeListenerOrFail(TIMEOUT_MS, MILLISECONDS);
629+
MockServerTransportListener.StreamCreation streamCreation =
630+
serverTransportListener.takeStreamOrFail(TIMEOUT_MS, MILLISECONDS);
631+
632+
String largeMessage = newStringOfLength(20 * 1024);
633+
streamCreation.stream.writeMessage(methodDescriptor.streamResponse(largeMessage));
634+
streamCreation.stream.flush();
635+
636+
QueueingOneWayBinderProxy.Transaction tx1 = takeNextTransaction(proxy);
637+
QueueingOneWayBinderProxy.Transaction tx2 = takeNextTransaction(proxy);
638+
639+
proxy.deliver(tx2);
640+
proxy.deliver(tx1);
641+
642+
InputStream msgStream = clientStreamListener.messageQueue.poll(TIMEOUT_MS, MILLISECONDS);
643+
assertThat(msgStream).isNotNull();
644+
assertThat(methodDescriptor.parseResponse(msgStream)).isEqualTo(largeMessage);
645+
}
646+
550647
private static QueueingOneWayBinderProxy.Transaction takeNextTransaction(
551648
QueueingOneWayBinderProxy proxy) throws InterruptedException {
552649
QueueingOneWayBinderProxy.Transaction tx = proxy.pollNextTransaction(TIMEOUT_MS, MILLISECONDS);

0 commit comments

Comments
 (0)