Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
50b0efd
CE-135 okhttp -> jetty http client
nofateg Feb 8, 2026
b0fdd5d
CE-135 okhttp -> jetty http client
nofateg Feb 8, 2026
61c831e
CE-135 okhttp -> jetty http client
nofateg Feb 8, 2026
c0c60cf
CE-135 okhttp -> jetty http client
nofateg Feb 8, 2026
9239195
CE-135 okhttp -> jetty http client
nofateg Feb 8, 2026
61a2d68
CE-135 okhttp -> jetty http client
nofateg Feb 8, 2026
c391c0b
CE-135 okhttp -> jetty http client
nofateg Feb 8, 2026
2ea0e51
CE-135 okhttp -> jetty http client
nofateg Feb 9, 2026
1864b99
CE-135 okhttp -> jetty http client
nofateg Feb 9, 2026
9dc4197
CE-135 okhttp -> jetty http client
nofateg Feb 9, 2026
3840d51
CE-135 okhttp -> jetty http client
nofateg Feb 9, 2026
9af3119
CE-135 okhttp -> jetty http client
nofateg Feb 9, 2026
7b41708
CE-135 okhttp -> jetty http client
nofateg Feb 9, 2026
d6d82c9
CE-135 okhttp -> jetty http client
nofateg Feb 9, 2026
42b8f57
CE-135 okhttp -> jetty http client
nofateg Feb 9, 2026
0362b4d
CE-135 okhttp -> jetty http client
nofateg Feb 9, 2026
ba2373a
CE-135 okhttp -> jetty http client
nofateg Feb 9, 2026
3b0b5d4
CE-135 okhttp -> jetty http client
nofateg Feb 9, 2026
dc8de49
CE-135 okhttp -> jetty http client
nofateg Feb 9, 2026
5d1f54c
CE-135 okhttp -> jetty http client
nofateg Feb 11, 2026
69a3548
CE-135 okhttp -> jetty http client
nofateg Feb 12, 2026
db3c0d2
CE-135 okhttp -> jetty http client
nofateg Feb 12, 2026
069f20a
CE-135 okhttp -> jetty http client
nofateg Feb 12, 2026
014ed34
CE-135 okhttp -> jetty http client
nofateg Feb 13, 2026
aed8883
CE-135 okhttp -> jetty http client
nofateg Feb 13, 2026
7e36124
CE-135 okhttp -> jetty http client
nofateg Feb 13, 2026
a11f518
CE-135 okhttp -> jetty http client
nofateg Feb 13, 2026
4b4d942
CE-135 okhttp -> jetty http client
nofateg Feb 13, 2026
e8887d5
CE-135 okhttp -> jetty http client
nofateg Feb 13, 2026
79d4032
CE-135 okhttp -> jetty http client
nofateg Feb 13, 2026
6f9e743
CE-135 okhttp -> jetty http client
nofateg Feb 13, 2026
73692ce
CE-135 okhttp -> jetty http client
nofateg Feb 13, 2026
63c6701
CE-135 okhttp -> jetty http client
nofateg Feb 13, 2026
5ba467e
CE-135 okhttp -> jetty http client
nofateg Feb 13, 2026
2f829c1
CE-135 okhttp -> jetty http client
nofateg Feb 13, 2026
db3b15c
CE-135 okhttp -> jetty http client
nofateg Feb 13, 2026
33ca723
CE-135 okhttp -> jetty http client
nofateg Feb 13, 2026
5a133b4
CE-135 okhttp -> jetty http client
nofateg Feb 13, 2026
4e5c5f6
CE-135 okhttp -> jetty http client
nofateg Feb 16, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,14 @@
import oap.application.module.Module;
import oap.testng.Fixtures;
import oap.testng.Ports;
import oap.util.Dates;
import org.assertj.core.api.Assertions;
import org.testng.annotations.Test;

import java.net.ConnectException;
import java.net.URL;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import static oap.testng.Asserts.urlOfTestResource;
import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -49,6 +48,7 @@
public class RemoteTest extends Fixtures {
@Test
public void invoke() {
Assertions.setMaxStackTraceElementsDisplayed( 1024 );
int port = Ports.getFreePort( getClass() );

List<URL> modules = Module.CONFIGURATION.urlsFromClassPath();
Expand Down Expand Up @@ -78,33 +78,10 @@ public void invoke() {
assertThat( kernel.<RemoteClient>service( "*.remote-client-unreachable" ) )
.isPresent()
.get()
.satisfies( remote -> assertThatThrownBy( remote::accessible ).isInstanceOf( IllegalArgumentException.class ) );
}
}

@Test
public void testAsync() {
int port = Ports.getFreePort( getClass() );

List<URL> modules = Module.CONFIGURATION.urlsFromClassPath();
modules.add( urlOfTestResource( getClass(), "module.oap" ) );
try( Kernel kernel = new Kernel( modules ) ) {
kernel.start( ApplicationConfiguration.load( urlOfTestResource( RemoteTest.class, "application-remote.conf" ),
List.of(),
Map.of( "HTTP_PORT", port ) ) );

Optional<RemoteClient> service = kernel.service( "*.remote-client" );
assertThat( service ).isPresent();
assertThat( service )
.get()
.satisfies( remote -> {
CompletableFuture<Boolean> actual = remote.accessibleAsync();
long timeStart = System.currentTimeMillis();
assertThat( actual ).succeedsWithin( Duration.ofSeconds( 10 ) ).isEqualTo( true );
long timeEnd = System.currentTimeMillis();

assertThat( timeEnd - timeStart ).isGreaterThanOrEqualTo( Dates.s( 2 ) );
} );
.satisfies( remote -> assertThatThrownBy( remote::accessible )
.isInstanceOf( RuntimeException.class )
.hasCauseInstanceOf( ConnectException.class )
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,66 +24,46 @@
package oap.application.remote;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.SimpleTimeLimiter;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tags;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import oap.application.ServiceKernelCommand;
import oap.application.module.Reference;
import oap.http.client.OapHttpClient;
import oap.util.Result;
import oap.util.Stream;
import oap.util.function.Try;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.Dispatcher;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import org.jetbrains.annotations.NotNull;
import org.eclipse.jetty.client.BytesRequestContent;
import org.eclipse.jetty.client.InputStreamResponseListener;
import org.eclipse.jetty.client.Request;
import org.eclipse.jetty.client.Response;
import org.eclipse.jetty.http.HttpMethod;

import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.lang.reflect.Proxy;
import java.net.URI;
import java.net.http.HttpTimeoutException;
import java.time.Duration;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static java.net.HttpURLConnection.HTTP_OK;

@Slf4j
public final class RemoteInvocationHandler implements InvocationHandler {
public static final ExecutorService NEW_SINGLE_THREAD_EXECUTOR = Executors.newSingleThreadExecutor();
private static final OkHttpClient globalClient;
private static final SimpleTimeLimiter SIMPLE_TIME_LIMITER = SimpleTimeLimiter.create( NEW_SINGLE_THREAD_EXECUTOR );

static {
OkHttpClient.Builder builder = new OkHttpClient.Builder();
Dispatcher dispatcher = new Dispatcher();
dispatcher.setMaxRequests( 1024 );
dispatcher.setMaxRequestsPerHost( 1024 );
builder.dispatcher( dispatcher );
globalClient = builder.build();
}

private final Counter timeoutMetrics;
private final Counter errorMetrics;
private final Counter successMetrics;
Expand Down Expand Up @@ -116,17 +96,7 @@ public static Object proxy( String source, RemoteLocation remote, Class<?> clazz
}

private static Object proxy( String source, URI uri, String service, Class<?> clazz, long timeout ) {
return Proxy.newProxyInstance( clazz.getClassLoader(), new Class[] { clazz },
new RemoteInvocationHandler( source, uri, service, timeout ) );
}

@NotNull
private static CompletionStage<Result<Object, Throwable>> retException( Throwable e, boolean async ) {
if( async ) {
return CompletableFuture.failedStage( e );
} else {
return CompletableFuture.completedStage( Result.failure( e ) );
}
return Proxy.newProxyInstance( clazz.getClassLoader(), new Class[] { clazz }, new RemoteInvocationHandler( source, uri, service, timeout ) );
}

@Override
Expand All @@ -149,7 +119,11 @@ public Object invoke( Object proxy, Method method, Object[] args ) throws Throwa
if( result.isSuccess() ) {
return result.successValue;
} else {
throw result.failureValue;
if( result.failureValue instanceof RuntimeException ) {
throw result.failureValue;
} else {
throw new RuntimeException( result.failureValue );
}
}
}

Expand All @@ -164,128 +138,88 @@ private Result<?, Throwable> invoke( Method method, Object[] args ) {

byte[] invocationB = getInvocation( method, arguments );

boolean async = CompletableFuture.class.isAssignableFrom( method.getReturnType() );

try {
OkHttpClient client = globalClient.newBuilder().callTimeout( Duration.ofMillis( timeout ) ).build();
Request request = new Request.Builder()
.url( uri.toURL() )
.post( RequestBody.create( invocationB ) )
.build();
Call call = client.newCall( request );

CompletableFuture<Response> responseAsync = new CompletableFuture<>();

if( async ) {
call.enqueue( new Callback() {
@Override
public void onFailure( @NotNull Call call, @NotNull IOException e ) {
responseAsync.completeExceptionally( e );
}
InputStreamResponseListener inputStreamResponseListener = new InputStreamResponseListener();

@Override
public void onResponse( @NotNull Call call, @NotNull Response response ) {
responseAsync.complete( response );
}
} );
} else {
try {
responseAsync.complete( call.execute() );
} catch( IOException e ) {
responseAsync.completeExceptionally( e );
}
}
Request request = OapHttpClient.DEFAULT_HTTP_CLIENT
.newRequest( uri )
.method( HttpMethod.POST )
.body( new BytesRequestContent( invocationB ) )
.timeout( timeout, TimeUnit.MILLISECONDS );

CompletableFuture<Result<Object, Throwable>> ret = responseAsync.thenCompose( response -> {
try {
if( response.code() == HTTP_OK ) {
InputStream inputStream = response.body().byteStream();
BufferedInputStream bis = new BufferedInputStream( inputStream );
DataInputStream dis = new DataInputStream( bis );
boolean success = dis.readBoolean();

try {
if( !success ) {
try {
Throwable throwable = FstConsts.readObjectWithSize( dis );
request.send( inputStreamResponseListener );

if( throwable instanceof RemoteInvocationException riex ) {
errorMetrics.increment();
return retException( riex, async );
}
try {
Response response = inputStreamResponseListener.get( timeout, TimeUnit.MILLISECONDS );

if( response.getStatus() == HTTP_OK ) {
InputStream inputStream = inputStreamResponseListener.getInputStream();
BufferedInputStream bis = new BufferedInputStream( inputStream );
DataInputStream dis = new DataInputStream( bis );
boolean success = dis.readBoolean();

try {
if( !success ) {
try {
Throwable throwable = FstConsts.readObjectWithSize( dis );

if( throwable instanceof RemoteInvocationException riex ) {
errorMetrics.increment();
return async ? CompletableFuture.<Result<Object, Throwable>>failedStage( throwable ) : CompletableFuture.completedStage( Result.failure( throwable ) );
} finally {
dis.close();
return Result.failure( riex );
}

errorMetrics.increment();
return Result.failure( throwable );
} finally {
dis.close();
}
} else {
boolean stream = dis.readBoolean();
if( stream ) {
ChainIterator it = new ChainIterator( dis );

return Result.success( Stream.of( it ).onClose( Try.run( () -> {
dis.close();
successMetrics.increment();
} ) ) );
} else {
boolean stream = dis.readBoolean();
if( stream ) {
ChainIterator it = new ChainIterator( dis );

return CompletableFuture.completedStage( Result.success( Stream.of( it ).onClose( Try.run( () -> {
dis.close();
successMetrics.increment();
} ) ) ) );
} else {
try {
Result<Object, Throwable> r = Result.success( FstConsts.readObjectWithSize( dis ) );
successMetrics.increment();
return CompletableFuture.completedStage( r );
} finally {
dis.close();
}
try {
Result<Object, Throwable> r = Result.success( FstConsts.readObjectWithSize( dis ) );
successMetrics.increment();
return r;
} finally {
dis.close();
}
}
} catch( Exception e ) {
dis.close();
return retException( e, async );
}
} else {
RemoteInvocationException ex = new RemoteInvocationException( "invocation failed " + this + "#" + service + "@" + method.getName()
+ " code " + response.code()
+ " body '" + response.body().string() + "'"
+ " message '" + response.message() + "'" );

return retException( ex, async );
} catch( Exception e ) {
dis.close();
return Result.failure( e );
}
} catch( Throwable e ) {
return retException( e, async );
}
} );
} else {
RemoteInvocationException ex = new RemoteInvocationException( "invocation failed " + this + "#" + service + "@" + method.getName()
+ " code " + response.getStatus()
+ " body '" + new String( inputStreamResponseListener.getInputStream().readAllBytes(), StandardCharsets.UTF_8 ) + "'"
+ " message '" + response.getReason() + "'" );

ret.whenComplete( ( _, ex ) -> {
if( ex != null ) {
checkException( ex );
return Result.failure( ex );
}
} );

if( async ) {
return Result.success( ret.thenApply( r -> r.successValue ) );
} else {
return ret.get();
}
} catch( Exception e ) {
if( async ) {
return Result.success( CompletableFuture.failedFuture( e ) );
} catch( InterruptedException | TimeoutException e ) {
timeoutMetrics.increment();

return Result.failure( e );
} catch( ExecutionException e ) {
return Result.failure( e.getCause() );
} catch( Throwable e ) {
return Result.failure( e );
}

} catch( Exception e ) {
return Result.failure( e );
}
}

private void checkException( Throwable ex ) {
if( ex instanceof RemoteInvocationException riex ) {
checkException( riex.getCause() );
return;
}

if( ex instanceof HttpTimeoutException || ex instanceof TimeoutException ) {
timeoutMetrics.increment();
}
}

@SneakyThrows
private byte[] getInvocation( Method method, List<RemoteInvocation.Argument> arguments ) {

Expand Down
6 changes: 6 additions & 0 deletions oap-http/oap-http-prometheus/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>oap</groupId>
<artifactId>oap-http-test</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
Expand Down
Loading