diff --git a/oap-application/oap-application-test/src/test/java/oap/application/remote/ForyTest.java b/oap-application/oap-application-test/src/test/java/oap/application/remote/KryoTest.java similarity index 81% rename from oap-application/oap-application-test/src/test/java/oap/application/remote/ForyTest.java rename to oap-application/oap-application-test/src/test/java/oap/application/remote/KryoTest.java index 88cf11d32..2b838ae84 100644 --- a/oap-application/oap-application-test/src/test/java/oap/application/remote/ForyTest.java +++ b/oap-application/oap-application-test/src/test/java/oap/application/remote/KryoTest.java @@ -30,10 +30,10 @@ import static org.assertj.core.api.Assertions.assertThat; -public class ForyTest { +public class KryoTest { @Test public void optional() { - assertThat( ForyConsts.fory.deserialize( ForyConsts.fory.serialize( Optional.empty() ) ) ).isEqualTo( Optional.empty() ); - assertThat( ForyConsts.fory.deserialize( ForyConsts.fory.serialize( Optional.of( "1" ) ) ) ).isEqualTo( Optional.of( "1" ) ); + assertThat( KryoConsts.readClassAndObject( KryoConsts.writeClassAndObject( Optional.empty() ) ) ).isEqualTo( Optional.empty() ); + assertThat( KryoConsts.readClassAndObject( KryoConsts.writeClassAndObject( Optional.of( "1" ) ) ) ).isEqualTo( Optional.of( "1" ) ); } } diff --git a/oap-application/oap-application/src/main/java/oap/application/remote/ForyConsts.java b/oap-application/oap-application/src/main/java/oap/application/remote/ForyConsts.java deleted file mode 100644 index 156a62d52..000000000 --- a/oap-application/oap-application/src/main/java/oap/application/remote/ForyConsts.java +++ /dev/null @@ -1,29 +0,0 @@ -package oap.application.remote; - -import oap.system.Env; -import org.apache.fory.Fory; -import org.apache.fory.ThreadSafeFory; -import org.apache.fory.config.CompatibleMode; - -@SuppressWarnings( "checkstyle:AbstractClassName" ) -public abstract class ForyConsts { - public static final int MIN_POOL_SIZE = Integer.parseInt( Env.get( "FORY_MIN_POOL_SIZE", "32" ) ); - public static final int MAX_POOL_SIZE = Integer.parseInt( Env.get( "FORY_MAX_POOL_SIZE", "128" ) ); - - public static ThreadSafeFory fory; - - static { - fory = Fory - .builder() - .withCompatibleMode( CompatibleMode.COMPATIBLE ) - .requireClassRegistration( false ) - .withRefTracking( true ) - .serializeEnumByName( true ) - .registerGuavaTypes( true ) - - // buggy - .withCodegen( false ) - - .buildThreadSafeForyPool( ForyConsts.MIN_POOL_SIZE, ForyConsts.MAX_POOL_SIZE ); - } -} diff --git a/oap-application/oap-application/src/main/java/oap/application/remote/KryoConsts.java b/oap-application/oap-application/src/main/java/oap/application/remote/KryoConsts.java new file mode 100644 index 000000000..549cdc92b --- /dev/null +++ b/oap-application/oap-application/src/main/java/oap/application/remote/KryoConsts.java @@ -0,0 +1,37 @@ +package oap.application.remote; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy; +import org.objenesis.strategy.SerializingInstantiatorStrategy; + +import java.io.ByteArrayOutputStream; + +@SuppressWarnings( "checkstyle:AbstractClassName" ) +public abstract class KryoConsts { + public static Kryo kryo; + + static { + kryo = new Kryo(); + kryo.setRegistrationRequired( false ); + kryo.setReferences( true ); + kryo.setInstantiatorStrategy( new DefaultInstantiatorStrategy( new SerializingInstantiatorStrategy() ) ); + + } + + public static byte[] writeClassAndObject( Object obj ) { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + try( Output output = new Output( byteArrayOutputStream ) ) { + kryo.writeClassAndObject( output, obj ); + } + + return byteArrayOutputStream.toByteArray(); + } + + public static Object readClassAndObject( byte[] bytes ) { + Input input = new Input( bytes ); + + return kryo.readClassAndObject( input ); + } +} diff --git a/oap-application/oap-application/src/main/java/oap/application/remote/Remote.java b/oap-application/oap-application/src/main/java/oap/application/remote/Remote.java index d6be44e34..79281960e 100644 --- a/oap-application/oap-application/src/main/java/oap/application/remote/Remote.java +++ b/oap-application/oap-application/src/main/java/oap/application/remote/Remote.java @@ -23,21 +23,18 @@ */ package oap.application.remote; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Tags; -import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import oap.http.server.nio.HttpHandler; import oap.http.server.nio.HttpServerExchange; import oap.http.server.nio.NioHttpServer; import oap.util.function.Try; import org.apache.commons.lang3.mutable.MutableInt; -import org.apache.fory.io.ForyInputStream; -import java.io.BufferedOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; import java.io.InputStream; import java.io.OutputStream; import java.lang.reflect.InvocationTargetException; @@ -81,7 +78,7 @@ public Remote( String context, RemoteServices services, NioHttpServer server, St } public void start() { - log.info( "context {} MIN_POOL_SIZE {} MAX_POOL_SIZE {}", context, ForyConsts.MIN_POOL_SIZE, ForyConsts.MAX_POOL_SIZE ); + log.info( "context {}", context ); } @Override @@ -146,23 +143,22 @@ public void handleRequest( HttpServerExchange exchange ) { exchange.setResponseHeader( CONTENT_TYPE, APPLICATION_OCTET_STREAM ); try( OutputStream outputStream = exchange.getOutputStream(); - BufferedOutputStream bos = new BufferedOutputStream( outputStream ); - DataOutputStream dos = new DataOutputStream( bos ) ) { - dos.writeBoolean( ex == null ); + Output out = new Output( outputStream ) ) { + out.writeBoolean( ex == null ); if( ex != null ) { - ForyConsts.fory.serialize( dos, ex ); + KryoConsts.kryo.writeClassAndObject( out, ex ); } else if( v instanceof Stream ) { - dos.writeBoolean( true ); + out.writeBoolean( true ); ( ( Stream ) v ).forEach( Try.consume( obj -> { - dos.write( 1 ); - ForyConsts.fory.serialize( dos, obj ); + out.writeBoolean( true ); + KryoConsts.kryo.writeClassAndObject( out, obj ); } ) ); - dos.write( 0 ); + out.writeBoolean( false ); } else { - dos.writeBoolean( false ); - ForyConsts.fory.serialize( dos, v ); + out.writeBoolean( false ); + KryoConsts.kryo.writeClassAndObject( out, v ); } } catch( Throwable e ) { log.error( "invocation {}", finalInvocation, e ); @@ -179,12 +175,11 @@ public void handleRequest( HttpServerExchange exchange ) { } } - @SneakyThrows public RemoteInvocation getRemoteInvocation( InputStream body ) { - DataInputStream dis = new DataInputStream( body ); - int version = dis.readInt(); + Input in = new Input( body ); + int version = in.readInt(); - RemoteInvocation invocation = ( RemoteInvocation ) ForyConsts.fory.deserialize( new ForyInputStream( dis ) ); + RemoteInvocation invocation = ( RemoteInvocation ) KryoConsts.kryo.readClassAndObject( in ); log.trace( "invoke v{} - {}", version, invocation ); return invocation; } diff --git a/oap-application/oap-application/src/main/java/oap/application/remote/RemoteInvocationHandler.java b/oap-application/oap-application/src/main/java/oap/application/remote/RemoteInvocationHandler.java index 5c03f94c7..04f3d46cd 100644 --- a/oap-application/oap-application/src/main/java/oap/application/remote/RemoteInvocationHandler.java +++ b/oap-application/oap-application/src/main/java/oap/application/remote/RemoteInvocationHandler.java @@ -23,6 +23,8 @@ */ package oap.application.remote; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.SimpleTimeLimiter; import io.micrometer.core.instrument.Counter; @@ -42,13 +44,10 @@ import okhttp3.Request; import okhttp3.RequestBody; import okhttp3.Response; -import org.apache.fory.io.ForyInputStream; import org.jetbrains.annotations.NotNull; import java.io.BufferedInputStream; import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.lang.invoke.MethodHandles; @@ -202,14 +201,13 @@ public void onResponse( @NotNull Call call, @NotNull Response response ) { if( response.code() == HTTP_OK ) { InputStream inputStream = response.body().byteStream(); BufferedInputStream bis = new BufferedInputStream( inputStream ); - ForyInputStream foryInputStream = new ForyInputStream( bis ); - DataInputStream dis = new DataInputStream( foryInputStream ); - boolean success = dis.readBoolean(); + Input in = new Input( bis ); + boolean success = in.readBoolean(); try { if( !success ) { try { - Throwable throwable = ( Throwable ) ForyConsts.fory.deserialize( foryInputStream ); + Throwable throwable = ( Throwable ) KryoConsts.kryo.readClassAndObject( in ); if( throwable instanceof RemoteInvocationException riex ) { errorMetrics.increment(); @@ -219,29 +217,29 @@ public void onResponse( @NotNull Call call, @NotNull Response response ) { errorMetrics.increment(); return async ? CompletableFuture.>failedStage( throwable ) : CompletableFuture.completedStage( Result.failure( throwable ) ); } finally { - dis.close(); + in.close(); } } else { - boolean stream = dis.readBoolean(); + boolean stream = in.readBoolean(); if( stream ) { - ChainIterator it = new ChainIterator( foryInputStream ); + ChainIterator it = new ChainIterator( in ); return CompletableFuture.completedStage( Result.success( Stream.of( it ).onClose( Try.run( () -> { - dis.close(); + in.close(); successMetrics.increment(); } ) ) ) ); } else { try { - Result r = Result.success( ForyConsts.fory.deserialize( foryInputStream ) ); + Result r = Result.success( KryoConsts.kryo.readClassAndObject( in ) ); successMetrics.increment(); return CompletableFuture.completedStage( r ); } finally { - dis.close(); + in.close(); } } } } catch( Exception e ) { - dis.close(); + in.close(); return retException( e, async ); } } else { @@ -294,10 +292,10 @@ private byte[] getInvocation( Method method, List arg Reference reference = ServiceKernelCommand.INSTANCE.reference( service, null ); ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream( baos ); - dos.writeInt( RemoteInvocation.VERSION ); - ForyConsts.fory.serialize( dos, new RemoteInvocation( reference.toString(), method.getName(), arguments ) ); - baos.close(); + try( Output out = new Output( baos ) ) { + out.writeInt( RemoteInvocation.VERSION ); + KryoConsts.kryo.writeClassAndObject( out, new RemoteInvocation( reference.toString(), method.getName(), arguments ) ); + } return baos.toByteArray(); } @@ -307,13 +305,13 @@ public String toString() { return "source:" + source + " -> remote:" + service + "@" + uri; } - private class ChainIterator implements Iterator { - private final ForyInputStream dis; + private static class ChainIterator implements Iterator { + private final Input in; private Object obj; private boolean end; - ChainIterator( ForyInputStream dis ) { - this.dis = dis; + ChainIterator( Input dis ) { + this.in = dis; obj = null; end = false; } @@ -325,13 +323,13 @@ public boolean hasNext() { if( obj != null ) return true; - int b = dis.read(); - if( b == 1 ) { - obj = ForyConsts.fory.deserialize( dis ); + boolean b = in.readBoolean(); + if( b ) { + obj = KryoConsts.kryo.readClassAndObject( in ); } else { end = true; obj = null; - dis.close(); + in.close(); } return obj != null; diff --git a/oap-application/oap-application/src/main/java/oap/application/remote/RemoteSerialization.java b/oap-application/oap-application/src/main/java/oap/application/remote/RemoteSerialization.java index 35f211eb9..c7b3890da 100644 --- a/oap-application/oap-application/src/main/java/oap/application/remote/RemoteSerialization.java +++ b/oap-application/oap-application/src/main/java/oap/application/remote/RemoteSerialization.java @@ -62,16 +62,16 @@ public Object invoke( Object proxy, Method method, Object[] args ) throws Throwa parameters[i].getType(), args[i] ) ); } - final byte[] content = ForyConsts.fory.serialize( new RemoteInvocation( "service", method.getName(), arguments ) ); - RemoteInvocation ri = ( RemoteInvocation ) ForyConsts.fory.deserialize( content ); + byte[] content = KryoConsts.writeClassAndObject( new RemoteInvocation( "service", method.getName(), arguments ) ); + RemoteInvocation ri = ( RemoteInvocation ) KryoConsts.readClassAndObject( content ); - var result = master.getClass() + Object result = master.getClass() .getMethod( ri.method, ri.types() ) .invoke( master, ri.values() ); - byte[] resultContent = ForyConsts.fory.serialize( result ); + content = KryoConsts.writeClassAndObject( result ); - return ForyConsts.fory.deserialize( resultContent ); + return KryoConsts.readClassAndObject( content ); } } diff --git a/oap-stdlib/pom.xml b/oap-stdlib/pom.xml index 15eee6805..fd0abb7f2 100644 --- a/oap-stdlib/pom.xml +++ b/oap-stdlib/pom.xml @@ -271,11 +271,10 @@ - org.apache.fory - fory-core - ${oap.deps.apache.fory.version} + com.esotericsoftware + kryo + ${oap.deps.kryo.version} - javax.activation activation diff --git a/pom.xml b/pom.xml index 88659460c..302cbdb39 100644 --- a/pom.xml +++ b/pom.xml @@ -71,7 +71,7 @@ - 25.0.8 + 25.0.10 22.0.1 25.0.0 @@ -82,7 +82,7 @@ 5.1.0 - 0.13.1 + 5.6.2 4.4.16 4.5.14