Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@

import static org.assertj.core.api.Assertions.assertThat;

public class ForyTest {
public class KryoTest {
@Test
public void optional() {
assertThat( ForyConsts.fory.deserialize( ForyConsts.fory.serialize( Optional.empty() ) ) ).isEqualTo( Optional.empty() );
assertThat( ForyConsts.fory.deserialize( ForyConsts.fory.serialize( Optional.of( "1" ) ) ) ).isEqualTo( Optional.of( "1" ) );
assertThat( KryoConsts.readClassAndObject( KryoConsts.writeClassAndObject( Optional.empty() ) ) ).isEqualTo( Optional.empty() );
assertThat( KryoConsts.readClassAndObject( KryoConsts.writeClassAndObject( Optional.of( "1" ) ) ) ).isEqualTo( Optional.of( "1" ) );
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package oap.application.remote;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy;
import org.objenesis.strategy.SerializingInstantiatorStrategy;

import java.io.ByteArrayOutputStream;

@SuppressWarnings( "checkstyle:AbstractClassName" )
public abstract class KryoConsts {
public static Kryo kryo;

static {
kryo = new Kryo();
kryo.setRegistrationRequired( false );
kryo.setReferences( true );
kryo.setInstantiatorStrategy( new DefaultInstantiatorStrategy( new SerializingInstantiatorStrategy() ) );

}

public static byte[] writeClassAndObject( Object obj ) {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
try( Output output = new Output( byteArrayOutputStream ) ) {
kryo.writeClassAndObject( output, obj );
}

return byteArrayOutputStream.toByteArray();
}

public static Object readClassAndObject( byte[] bytes ) {
Input input = new Input( bytes );

return kryo.readClassAndObject( input );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,18 @@
*/
package oap.application.remote;

import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tags;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import oap.http.server.nio.HttpHandler;
import oap.http.server.nio.HttpServerExchange;
import oap.http.server.nio.NioHttpServer;
import oap.util.function.Try;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.fory.io.ForyInputStream;

import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
Expand Down Expand Up @@ -81,7 +78,7 @@ public Remote( String context, RemoteServices services, NioHttpServer server, St
}

public void start() {
log.info( "context {} MIN_POOL_SIZE {} MAX_POOL_SIZE {}", context, ForyConsts.MIN_POOL_SIZE, ForyConsts.MAX_POOL_SIZE );
log.info( "context {}", context );
}

@Override
Expand Down Expand Up @@ -146,23 +143,22 @@ public void handleRequest( HttpServerExchange exchange ) {
exchange.setResponseHeader( CONTENT_TYPE, APPLICATION_OCTET_STREAM );

try( OutputStream outputStream = exchange.getOutputStream();
BufferedOutputStream bos = new BufferedOutputStream( outputStream );
DataOutputStream dos = new DataOutputStream( bos ) ) {
dos.writeBoolean( ex == null );
Output out = new Output( outputStream ) ) {
out.writeBoolean( ex == null );

if( ex != null ) {
ForyConsts.fory.serialize( dos, ex );
KryoConsts.kryo.writeClassAndObject( out, ex );
} else if( v instanceof Stream<?> ) {
dos.writeBoolean( true );
out.writeBoolean( true );

( ( Stream<?> ) v ).forEach( Try.consume( obj -> {
dos.write( 1 );
ForyConsts.fory.serialize( dos, obj );
out.writeBoolean( true );
KryoConsts.kryo.writeClassAndObject( out, obj );
} ) );
dos.write( 0 );
out.writeBoolean( false );
} else {
dos.writeBoolean( false );
ForyConsts.fory.serialize( dos, v );
out.writeBoolean( false );
KryoConsts.kryo.writeClassAndObject( out, v );
}
} catch( Throwable e ) {
log.error( "invocation {}", finalInvocation, e );
Expand All @@ -179,12 +175,11 @@ public void handleRequest( HttpServerExchange exchange ) {
}
}

@SneakyThrows
public RemoteInvocation getRemoteInvocation( InputStream body ) {
DataInputStream dis = new DataInputStream( body );
int version = dis.readInt();
Input in = new Input( body );
int version = in.readInt();

RemoteInvocation invocation = ( RemoteInvocation ) ForyConsts.fory.deserialize( new ForyInputStream( dis ) );
RemoteInvocation invocation = ( RemoteInvocation ) KryoConsts.kryo.readClassAndObject( in );
log.trace( "invoke v{} - {}", version, invocation );
return invocation;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
*/
package oap.application.remote;

import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.SimpleTimeLimiter;
import io.micrometer.core.instrument.Counter;
Expand All @@ -42,13 +44,10 @@
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import org.apache.fory.io.ForyInputStream;
import org.jetbrains.annotations.NotNull;

import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
Expand Down Expand Up @@ -202,14 +201,13 @@ public void onResponse( @NotNull Call call, @NotNull Response response ) {
if( response.code() == HTTP_OK ) {
InputStream inputStream = response.body().byteStream();
BufferedInputStream bis = new BufferedInputStream( inputStream );
ForyInputStream foryInputStream = new ForyInputStream( bis );
DataInputStream dis = new DataInputStream( foryInputStream );
boolean success = dis.readBoolean();
Input in = new Input( bis );
boolean success = in.readBoolean();

try {
if( !success ) {
try {
Throwable throwable = ( Throwable ) ForyConsts.fory.deserialize( foryInputStream );
Throwable throwable = ( Throwable ) KryoConsts.kryo.readClassAndObject( in );

if( throwable instanceof RemoteInvocationException riex ) {
errorMetrics.increment();
Expand All @@ -219,29 +217,29 @@ public void onResponse( @NotNull Call call, @NotNull Response response ) {
errorMetrics.increment();
return async ? CompletableFuture.<Result<Object, Throwable>>failedStage( throwable ) : CompletableFuture.completedStage( Result.failure( throwable ) );
} finally {
dis.close();
in.close();
}
} else {
boolean stream = dis.readBoolean();
boolean stream = in.readBoolean();
if( stream ) {
ChainIterator it = new ChainIterator( foryInputStream );
ChainIterator it = new ChainIterator( in );

return CompletableFuture.completedStage( Result.success( Stream.of( it ).onClose( Try.run( () -> {
dis.close();
in.close();
successMetrics.increment();
} ) ) ) );
} else {
try {
Result<Object, Throwable> r = Result.success( ForyConsts.fory.deserialize( foryInputStream ) );
Result<Object, Throwable> r = Result.success( KryoConsts.kryo.readClassAndObject( in ) );
successMetrics.increment();
return CompletableFuture.completedStage( r );
} finally {
dis.close();
in.close();
}
}
}
} catch( Exception e ) {
dis.close();
in.close();
return retException( e, async );
}
} else {
Expand Down Expand Up @@ -294,10 +292,10 @@ private byte[] getInvocation( Method method, List<RemoteInvocation.Argument> arg
Reference reference = ServiceKernelCommand.INSTANCE.reference( service, null );

ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( baos );
dos.writeInt( RemoteInvocation.VERSION );
ForyConsts.fory.serialize( dos, new RemoteInvocation( reference.toString(), method.getName(), arguments ) );
baos.close();
try( Output out = new Output( baos ) ) {
out.writeInt( RemoteInvocation.VERSION );
KryoConsts.kryo.writeClassAndObject( out, new RemoteInvocation( reference.toString(), method.getName(), arguments ) );
}

return baos.toByteArray();
}
Expand All @@ -307,13 +305,13 @@ public String toString() {
return "source:" + source + " -> remote:" + service + "@" + uri;
}

private class ChainIterator implements Iterator<Object> {
private final ForyInputStream dis;
private static class ChainIterator implements Iterator<Object> {
private final Input in;
private Object obj;
private boolean end;

ChainIterator( ForyInputStream dis ) {
this.dis = dis;
ChainIterator( Input dis ) {
this.in = dis;
obj = null;
end = false;
}
Expand All @@ -325,13 +323,13 @@ public boolean hasNext() {

if( obj != null ) return true;

int b = dis.read();
if( b == 1 ) {
obj = ForyConsts.fory.deserialize( dis );
boolean b = in.readBoolean();
if( b ) {
obj = KryoConsts.kryo.readClassAndObject( in );
} else {
end = true;
obj = null;
dis.close();
in.close();
}

return obj != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,16 @@ public Object invoke( Object proxy, Method method, Object[] args ) throws Throwa
parameters[i].getType(), args[i] ) );
}

final byte[] content = ForyConsts.fory.serialize( new RemoteInvocation( "service", method.getName(), arguments ) );
RemoteInvocation ri = ( RemoteInvocation ) ForyConsts.fory.deserialize( content );
byte[] content = KryoConsts.writeClassAndObject( new RemoteInvocation( "service", method.getName(), arguments ) );
RemoteInvocation ri = ( RemoteInvocation ) KryoConsts.readClassAndObject( content );

var result = master.getClass()
Object result = master.getClass()
.getMethod( ri.method, ri.types() )
.invoke( master, ri.values() );


byte[] resultContent = ForyConsts.fory.serialize( result );
content = KryoConsts.writeClassAndObject( result );

return ForyConsts.fory.deserialize( resultContent );
return KryoConsts.readClassAndObject( content );
}
}
7 changes: 3 additions & 4 deletions oap-stdlib/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -271,11 +271,10 @@


<dependency>
<groupId>org.apache.fory</groupId>
<artifactId>fory-core</artifactId>
<version>${oap.deps.apache.fory.version}</version>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>${oap.deps.kryo.version}</version>
</dependency>

<dependency>
<groupId>javax.activation</groupId>
<artifactId>activation</artifactId>
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
</distributionManagement>

<properties>
<oap.project.version>25.0.8</oap.project.version>
<oap.project.version>25.0.10</oap.project.version>

<oap.deps.config.version>22.0.1</oap.deps.config.version>
<oap.deps.oap-teamcity.version>25.0.0</oap.deps.oap-teamcity.version>
Expand All @@ -82,7 +82,7 @@

<oap.deps.okhttp.version>5.1.0</oap.deps.okhttp.version>

<oap.deps.apache.fory.version>0.13.1</oap.deps.apache.fory.version>
<oap.deps.kryo.version>5.6.2</oap.deps.kryo.version>

<oap.deps.apache.httpcomponents.version>4.4.16</oap.deps.apache.httpcomponents.version>
<oap.deps.apache.httpclient.version>4.5.14</oap.deps.apache.httpclient.version>
Expand Down