From bb565894ac54c36d8302a7051ca1bd81432e1952 Mon Sep 17 00:00:00 2001 From: "igor.petrenko" Date: Thu, 19 Feb 2026 19:48:08 +0200 Subject: [PATCH 1/2] CE-136 logstream: compress --- .../net/client/SocketLoggerBackend.java | 4 +- .../net/server/SocketLoggerServer.java | 34 ++--- .../src/main/java/oap/logstream/LogId.java | 15 +- .../main/java/oap/logstream/net/Buffer.java | 63 +++++--- .../main/java/oap/logstream/net/Buffers.java | 138 ++++++++++++------ pom.xml | 2 +- 6 files changed, 153 insertions(+), 103 deletions(-) diff --git a/oap-formats/oap-logstream/oap-logstream-net-client/src/main/java/oap/logstream/net/client/SocketLoggerBackend.java b/oap-formats/oap-logstream/oap-logstream-net-client/src/main/java/oap/logstream/net/client/SocketLoggerBackend.java index d0008a343a..0336b93eec 100644 --- a/oap-formats/oap-logstream/oap-logstream-net-client/src/main/java/oap/logstream/net/client/SocketLoggerBackend.java +++ b/oap-formats/oap-logstream/oap-logstream-net-client/src/main/java/oap/logstream/net/client/SocketLoggerBackend.java @@ -38,6 +38,7 @@ import oap.logstream.net.Buffers; import oap.message.client.MessageAvailabilityReport; import oap.message.client.MessageSender; +import oap.util.FastByteArrayOutputStream; import java.util.HashMap; import java.util.Map; @@ -96,7 +97,8 @@ private boolean sendAsync( boolean shutdown ) { if( shutdown || !closed ) { buffers.forEachReadyData( b -> { log.trace( "Sending {}", b ); - sender.send( MESSAGE_TYPE, ( short ) b.protocolVersion.version, b.data(), 0, b.length() ); + FastByteArrayOutputStream data = b.compress(); + sender.send( MESSAGE_TYPE, ( short ) b.protocolVersion.version, data.array, 0, data.length ); } ); log.trace( "Data sent to server" ); return true; diff --git a/oap-formats/oap-logstream/oap-logstream-net-server/src/main/java/oap/logstream/net/server/SocketLoggerServer.java b/oap-formats/oap-logstream/oap-logstream-net-server/src/main/java/oap/logstream/net/server/SocketLoggerServer.java index 338a1e7025..bab91c1057 100644 --- a/oap-formats/oap-logstream/oap-logstream-net-server/src/main/java/oap/logstream/net/server/SocketLoggerServer.java +++ b/oap-formats/oap-logstream/oap-logstream-net-server/src/main/java/oap/logstream/net/server/SocketLoggerServer.java @@ -23,26 +23,23 @@ */ package oap.logstream.net.server; +import it.unimi.dsi.fastutil.io.FastByteArrayInputStream; +import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream; import lombok.extern.slf4j.Slf4j; -import oap.io.content.ContentReader; import oap.logstream.AbstractLoggerBackend; import oap.logstream.LogStreamProtocol; import oap.logstream.LogStreamProtocol.ProtocolVersion; import oap.logstream.LoggerException; -import oap.logstream.formats.rowbinary.RowBinaryUtils; import oap.message.server.MessageListener; -import oap.template.BinaryUtils; -import oap.tsv.Tsv; +import org.apache.commons.io.IOUtils; import java.io.ByteArrayInputStream; import java.io.Closeable; import java.io.DataInputStream; import java.io.EOFException; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; import java.util.LinkedHashMap; -import java.util.List; +import java.util.zip.GZIPInputStream; import static oap.logstream.LogStreamProtocol.MESSAGE_TYPE; @@ -119,26 +116,15 @@ private void readBinaryV3( ProtocolVersion version, String hostName, DataInputSt properties.put( in.readUTF(), in.readUTF() ); } - byte[] buffer = new byte[length]; - in.readFully( buffer, 0, length ); + byte[] compressedBuffer = new byte[length]; + in.readFully( compressedBuffer, 0, length ); - if( log.isTraceEnabled() ) { - List> lines = new ArrayList<>(); - switch( version ) { - case TSV_V1 -> ContentReader.read( buffer, Tsv.tsv.ofSeparatedValues() ).toList() - .forEach( line -> lines.add( Collections.singletonList( line ) ) ); - case BINARY_V2 -> lines.addAll( BinaryUtils.read( buffer ) ); - case ROW_BINARY_V3 -> lines.addAll( RowBinaryUtils.read( buffer, headers, types ) ); - } - - lines.forEach( line -> - log.trace( "[{}] logging (properties {} filePreffix {} logType {} headers {} types {}, length {}, line {})", - hostName, properties, filePreffix, logType, headers, types, length, line - ) - ); + FastByteArrayOutputStream buffer = new FastByteArrayOutputStream( length ); + try( GZIPInputStream gzip = new GZIPInputStream( new FastByteArrayInputStream( compressedBuffer ) ) ) { + IOUtils.copy( gzip, buffer ); } - backend.log( version, clientHostname, filePreffix, properties, logType, headers, types, buffer, 0, length ); + backend.log( version, clientHostname, filePreffix, properties, logType, headers, types, buffer.array, 0, buffer.length ); } @Override diff --git a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/LogId.java b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/LogId.java index fc77cb6e77..b2cbbe199c 100644 --- a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/LogId.java +++ b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/LogId.java @@ -34,7 +34,6 @@ import java.io.Serializable; import java.util.Arrays; import java.util.LinkedHashMap; -import java.util.List; import java.util.Map; import static java.nio.charset.StandardCharsets.UTF_8; @@ -72,19 +71,9 @@ public LogId( String filePrefixPattern, String logType, String clientHostname, public int getHash() { Hasher hasher = Hashing.murmur3_32_fixed().newHasher(); - for( var header : headers ) hasher.putString( header, UTF_8 ); - for( var type : types ) hasher.putBytes( type ); + for( String header : headers ) hasher.putString( header, UTF_8 ); + for( byte[] type : types ) hasher.putBytes( type ); return hasher.hash().asInt(); } - - public final String lock() { - return ( String.join( "-", properties.values() ) - + String.join( "-", List.of( - filePrefixPattern, - logType, - Arrays.deepToString( headers ), - Arrays.deepToString( types ) - ) ) ).intern(); - } } diff --git a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/net/Buffer.java b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/net/Buffer.java index be2b31f1b3..e04813cd85 100644 --- a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/net/Buffer.java +++ b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/net/Buffer.java @@ -23,16 +23,23 @@ */ package oap.logstream.net; +import lombok.SneakyThrows; import oap.logstream.LogId; import oap.logstream.LogStreamProtocol.ProtocolVersion; +import oap.util.FastByteArrayOutputStream; import java.io.Serializable; +import java.util.concurrent.locks.ReentrantLock; +import java.util.zip.GZIPOutputStream; public class Buffer implements Serializable { + public static final int DIGESTION_POSITION = 0; + public static final int DATE_LENGTH_POSITION = 8; public final LogId id; public final ProtocolVersion protocolVersion; + public final ReentrantLock lock = new ReentrantLock(); private final byte[] data; - private int position = 0; + private int position = DIGESTION_POSITION; private volatile boolean closed = false; private int dataStart; @@ -44,9 +51,9 @@ public class Buffer implements Serializable { } private void initMetadata( LogId id ) { - if( position != 0 ) throw new IllegalStateException( "metadata could be set for empty buffer only!" ); - var result = putLong( 0 ); //reserved for digestion control - result &= putInt( 0 ); //reserved for data length + if( position != DIGESTION_POSITION ) throw new IllegalStateException( "metadata could be set for empty buffer only!" ); + boolean result = putLong( DIGESTION_POSITION ); //reserved for digestion control + result &= putInt( DIGESTION_POSITION ); //reserved for data length result &= putUTF( id.filePrefixPattern ); result &= putUTF( id.logType ); result &= putUTF( id.clientHostname ); @@ -54,7 +61,7 @@ private void initMetadata( LogId id ) { for( var header : id.headers ) result &= putUTF( header ); - for( var type : id.types ) { + for( byte[] type : id.types ) { result &= putByte( ( byte ) type.length ); for( var t : type ) { result &= putByte( t ); @@ -71,7 +78,7 @@ private void initMetadata( LogId id ) { } public final boolean put( byte[] buf ) { - return put( buf, 0, buf.length ); + return put( buf, DIGESTION_POSITION, buf.length ); } public final boolean put( byte[] buf, int offset, int length ) { @@ -94,7 +101,7 @@ private byte[] encodeInt( int i ) { return new byte[] { ( byte ) ( ( i >>> 24 ) & 0xFF ), ( byte ) ( ( i >>> 16 ) & 0xFF ), - ( byte ) ( ( i >>> 8 ) & 0xFF ), + ( byte ) ( ( i >>> DATE_LENGTH_POSITION ) & 0xFF ), ( byte ) ( i & 0xFF ) }; } @@ -111,7 +118,7 @@ private byte[] encodeLong( long v ) { ( byte ) ( v >>> 32 ), ( byte ) ( v >>> 24 ), ( byte ) ( v >>> 16 ), - ( byte ) ( v >>> 8 ), + ( byte ) ( v >>> DATE_LENGTH_POSITION ), ( byte ) v }; } @@ -119,11 +126,11 @@ private byte[] encodeLong( long v ) { @SuppressWarnings( "checkstyle:UnnecessaryParentheses" ) public final boolean putUTF( String str ) { int strlen = str.length(); - int utflen = 0; - int c, count = 0; + int utflen = DIGESTION_POSITION; + int c, count = DIGESTION_POSITION; /* use charAt instead of copying String to char array */ - for( int i = 0; i < strlen; i++ ) { + for( int i = DIGESTION_POSITION; i < strlen; i++ ) { c = str.charAt( i ); if( ( c >= 0x0001 ) && ( c <= 0x007F ) ) utflen++; else if( c > 0x07FF ) utflen += 3; @@ -134,11 +141,11 @@ public final boolean putUTF( String str ) { byte[] buffer = new byte[utflen + 2]; - buffer[count++] = ( byte ) ( ( utflen >>> 8 ) & 0xFF ); + buffer[count++] = ( byte ) ( ( utflen >>> DATE_LENGTH_POSITION ) & 0xFF ); buffer[count++] = ( byte ) ( utflen & 0xFF ); int i; - for( i = 0; i < strlen; i++ ) { + for( i = DIGESTION_POSITION; i < strlen; i++ ) { c = str.charAt( i ); if( !( ( c >= 0x0001 ) && ( c <= 0x007F ) ) ) break; buffer[count++] = ( byte ) c; @@ -157,7 +164,7 @@ public final boolean putUTF( String str ) { buffer[count++] = ( byte ) ( 0x80 | ( c & 0x3F ) ); } } - return put( buffer, 0, utflen + 2 ); + return put( buffer, DIGESTION_POSITION, utflen + 2 ); } public final boolean available( int length ) { @@ -170,12 +177,12 @@ public final byte[] data() { public final void reset( LogId id ) { this.closed = false; - this.position = 0; + this.position = DIGESTION_POSITION; initMetadata( id ); } public final boolean isEmpty() { - return dataLength() == 0; + return dataLength() == DIGESTION_POSITION; } public final int length() { @@ -185,9 +192,10 @@ public final int length() { public final void close( long digestionId ) { this.closed = true; byte[] digestion = encodeLong( digestionId ); + byte[] length = encodeInt( dataLength() ); - System.arraycopy( digestion, 0, this.data, 0, digestion.length ); - System.arraycopy( length, 0, this.data, 8, length.length ); + System.arraycopy( digestion, DIGESTION_POSITION, this.data, DIGESTION_POSITION, digestion.length ); + System.arraycopy( length, DIGESTION_POSITION, this.data, DATE_LENGTH_POSITION, length.length ); } public final int dataLength() { @@ -202,4 +210,23 @@ public final int headerLength() { public final String toString() { return id + "," + position; } + + @SneakyThrows + public FastByteArrayOutputStream compress() { + FastByteArrayOutputStream fastByteArrayOutputStream = new FastByteArrayOutputStream( length() ); + + fastByteArrayOutputStream.write( data, DIGESTION_POSITION, dataStart ); + + try( GZIPOutputStream gzipOutputStream = new GZIPOutputStream( fastByteArrayOutputStream ) ) { + gzipOutputStream.write( data, dataStart, dataLength() ); + } + + int newDataLength = fastByteArrayOutputStream.length - dataStart; + + byte[] bytes = encodeInt( newDataLength ); + + System.arraycopy( bytes, DIGESTION_POSITION, fastByteArrayOutputStream.array, DATE_LENGTH_POSITION, bytes.length ); + + return fastByteArrayOutputStream; + } } diff --git a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/net/Buffers.java b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/net/Buffers.java index 78e74a113f..a281e5ab21 100644 --- a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/net/Buffers.java +++ b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/net/Buffers.java @@ -44,6 +44,7 @@ import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; @EqualsAndHashCode( exclude = "closed" ) @@ -51,8 +52,9 @@ @Slf4j public class Buffers implements Closeable { + public final ReentrantLock lock = new ReentrantLock(); // private final int bufferSize; - private final ConcurrentHashMap currentBuffers = new ConcurrentHashMap<>(); + private final ConcurrentHashMap currentBuffers = new ConcurrentHashMap<>(); private final ConcurrentHashMap configurationForSelector = new ConcurrentHashMap<>(); private final BufferConfigurationMap configurations; public BufferCache cache; @@ -71,20 +73,30 @@ public final void put( LogId key, ProtocolVersion protocolVersion, byte[] buffer public final void put( LogId id, ProtocolVersion protocolVersion, byte[] buffer, int offset, int length ) { if( closed ) throw new IllegalStateException( "current buffer is already closed" ); - var conf = configurationForSelector.computeIfAbsent( id, this::findConfiguration ); + BufferConfiguration conf = configurationForSelector.computeIfAbsent( id, this::findConfiguration ); + int bufferSize = conf.bufferSize; - var bufferSize = conf.bufferSize; - var intern = id.lock(); - //noinspection SynchronizationOnLocalVariableOrMethodParameter - synchronized( intern ) { - var b = currentBuffers.computeIfAbsent( intern, k -> cache.get( id, protocolVersion, bufferSize ) ); + Buffer b = currentBuffers.computeIfAbsent( id, k -> cache.get( id, protocolVersion, bufferSize ) ); + b.lock.lock(); + try { if( bufferSize - b.headerLength() < length ) throw new IllegalArgumentException( "buffer size is too big: " + length + " for buffer of " + bufferSize + "; headers = " + b.headerLength() ); if( !b.available( length ) ) { readyBuffers.ready( b ); - currentBuffers.put( intern, b = cache.get( id, protocolVersion, bufferSize ) ); + + Buffer bb; + currentBuffers.put( id, bb = cache.get( id, protocolVersion, bufferSize ) ); + bb.lock.lock(); + try { + bb.put( buffer, offset, length ); + return; + } finally { + bb.lock.unlock(); + } } b.put( buffer, offset, length ); + } finally { + b.lock.unlock(); } } @@ -96,11 +108,15 @@ private BufferConfiguration findConfiguration( LogId id ) { } public void flush() { - for( var internSelector : currentBuffers.keySet() ) { - //noinspection SynchronizationOnLocalVariableOrMethodParameter - synchronized( internSelector ) { - var buffer = currentBuffers.remove( internSelector ); - if( buffer != null && !buffer.isEmpty() ) readyBuffers.ready( buffer ); + for( LogId internSelector : currentBuffers.keySet() ) { + Buffer buffer = currentBuffers.remove( internSelector ); + if( buffer != null && !buffer.isEmpty() ) { + buffer.lock.lock(); + try { + readyBuffers.ready( buffer ); + } finally { + buffer.lock.unlock(); + } } } @@ -111,22 +127,32 @@ public final boolean isEmpty() { } @Override - public final synchronized void close() { - if( closed ) throw new IllegalStateException( "already closed" ); - flush(); - closed = true; + public final void close() { + lock.lock(); + try { + if( closed ) throw new IllegalStateException( "already closed" ); + flush(); + closed = true; + } finally { + lock.unlock(); + } } - public final synchronized void forEachReadyData( Consumer consumer ) { - flush(); - report(); - log.trace( "buffers to go {}", readyBuffers.size() ); - var iterator = readyBuffers.iterator(); - while( iterator.hasNext() ) { - var buffer = iterator.next(); - consumer.accept( buffer ); - iterator.remove(); - cache.release( buffer ); + public final void forEachReadyData( Consumer consumer ) { + lock.lock(); + try { + flush(); + report(); + log.trace( "buffers to go {}", readyBuffers.size() ); + Iterator iterator = readyBuffers.iterator(); + while( iterator.hasNext() ) { + Buffer buffer = iterator.next(); + consumer.accept( buffer ); + iterator.remove(); + cache.release( buffer ); + } + } finally { + lock.unlock(); } } @@ -136,11 +162,11 @@ public void report() { } private void report( Collection in, String ready ) { - var buffers = new ArrayList<>( in ); + ArrayList buffers = new ArrayList<>( in ); - var map = new HashMap(); - for( var buffer : buffers ) { - var logType = buffer.id.logType; + HashMap map = new HashMap(); + for( Buffer buffer : buffers ) { + String logType = buffer.id.logType; map.computeIfAbsent( logType, lt -> new MutableLong() ).increment(); } @@ -152,37 +178,57 @@ public final int readyBuffers() { } public static class BufferCache { - private final Map> cache = new HashMap<>(); + private final ReentrantLock lock = new ReentrantLock(); - private synchronized Buffer get( LogId id, ProtocolVersion protocolVersion, int bufferSize ) { - var list = cache.computeIfAbsent( bufferSize, bs -> new LinkedList<>() ); + private final Map> cache = new HashMap<>(); - if( list.isEmpty() ) return new Buffer( bufferSize, id, protocolVersion ); - else { - var buffer = list.poll(); - buffer.reset( id ); - return buffer; + private Buffer get( LogId id, ProtocolVersion protocolVersion, int bufferSize ) { + lock.lock(); + try { + Queue list = cache.computeIfAbsent( bufferSize, bs -> new LinkedList<>() ); + + if( list.isEmpty() ) { + return new Buffer( bufferSize, id, protocolVersion ); + } else { + Buffer buffer = list.poll(); + buffer.reset( id ); + return buffer; + } + } finally { + lock.unlock(); } } - private synchronized void release( Buffer buffer ) { - var list = cache.get( buffer.length() ); - if( list != null ) list.offer( buffer ); + private void release( Buffer buffer ) { + lock.lock(); + try { + Queue list = cache.get( buffer.length() ); + if( list != null ) list.offer( buffer ); + } finally { + lock.unlock(); + } } public final int size( int bufferSize ) { - var list = cache.get( bufferSize ); + Queue list = cache.get( bufferSize ); return list != null ? list.size() : 0; } } static class ReadyQueue implements Serializable { static Cuid digestionIds = Cuid.UNIQUE; + + private final ReentrantLock lock = new ReentrantLock(); private final ConcurrentLinkedQueue buffers = new ConcurrentLinkedQueue<>(); - public final synchronized void ready( Buffer buffer ) { - buffer.close( digestionIds.nextLong() ); - buffers.offer( buffer ); + public final void ready( Buffer buffer ) { + lock.lock(); + try { + buffer.close( digestionIds.nextLong() ); + buffers.offer( buffer ); + } finally { + lock.unlock(); + } } public final Iterator iterator() { diff --git a/pom.xml b/pom.xml index a7de2aedca..516605132a 100644 --- a/pom.xml +++ b/pom.xml @@ -57,7 +57,7 @@ - 25.4.1 + 25.4.2 25.0.1 25.0.0 From 5e8262f75b027d6f26973cdb1bc300ab188c029c Mon Sep 17 00:00:00 2001 From: "igor.petrenko" Date: Thu, 19 Feb 2026 21:47:30 +0200 Subject: [PATCH 2/2] CE-136 logstream: compress --- .../java/oap/logstream/net/BuffersTest.java | 12 +++--- .../main/java/oap/logstream/net/Buffer.java | 2 - .../main/java/oap/logstream/net/Buffers.java | 38 +++++++------------ .../main/java/oap/template/BinaryUtils.java | 10 ++--- 4 files changed, 25 insertions(+), 37 deletions(-) diff --git a/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/net/BuffersTest.java b/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/net/BuffersTest.java index d2b4ec0aef..55ed316a94 100644 --- a/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/net/BuffersTest.java +++ b/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/net/BuffersTest.java @@ -98,17 +98,17 @@ public void foreach() { buffers.put( new LogId( "x/z", "", "", Map.of(), HEADERS, TYPES ), BINARY_V2, new byte[] { 14, 15 } ); buffers.put( new LogId( "x/z", "", "", Map.of(), HEADERS, TYPES ), BINARY_V2, new byte[] { 16 } ); - var expected = List.of( + List expected = List.of( buffer( BINARY_V2, header + 4, 1, new LogId( "x/y", "", "", Map.of(), HEADERS, TYPES ), new byte[] { 1, 2, 3 } ), buffer( BINARY_V2, header + 4, 2, new LogId( "x/y", "", "", Map.of(), HEADERS, TYPES ), new byte[] { 4, 5, 6 } ), buffer( BINARY_V2, header + 4, 3, new LogId( "x/z", "", "", Map.of(), HEADERS, TYPES ), new byte[] { 11, 12, 13 } ), - buffer( BINARY_V2, header + 4, 4, new LogId( "x/y", "", "", Map.of(), - HEADERS, TYPES ), new byte[] { 7, 8, 9 } ), - buffer( BINARY_V2, header + 4, 5, new LogId( "x/z", "", "", Map.of(), - HEADERS, TYPES ), new byte[] { 14, 15, 16 } ) + buffer( BINARY_V2, header + 4, 4, new LogId( "x/z", "", "", Map.of(), + HEADERS, TYPES ), new byte[] { 14, 15, 16 } ), + buffer( BINARY_V2, header + 4, 5, new LogId( "x/y", "", "", Map.of(), + HEADERS, TYPES ), new byte[] { 7, 8, 9 } ) ); assertReadyData( buffers, expected ); assertReadyData( buffers, Lists.empty() ); @@ -127,7 +127,7 @@ public void foreachPattern() { buffers.put( new LogId( "", "x/y", "", Map.of(), HEADERS, TYPES ), BINARY_V2, new byte[] { 3 } ); buffers.put( new LogId( "", "x/z", "", Map.of(), HEADERS, TYPES ), BINARY_V2, new byte[] { 14, 15 } ); - var expected = List.of( + List expected = List.of( buffer( BINARY_V2, header + 2, 1, new LogId( "", "x/y", "", Map.of(), HEADERS, TYPES ), new byte[] { 1, 2 } ), buffer( BINARY_V2, header + 4, 2, new LogId( "", "x/z", "", Map.of(), HEADERS, TYPES ), new byte[] { 11, 12, 13 } ), buffer( BINARY_V2, header + 4, 3, new LogId( "", "x/y", "", Map.of(), HEADERS, TYPES ), new byte[] { 3 } ), diff --git a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/net/Buffer.java b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/net/Buffer.java index e04813cd85..91dd50dd8d 100644 --- a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/net/Buffer.java +++ b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/net/Buffer.java @@ -29,7 +29,6 @@ import oap.util.FastByteArrayOutputStream; import java.io.Serializable; -import java.util.concurrent.locks.ReentrantLock; import java.util.zip.GZIPOutputStream; public class Buffer implements Serializable { @@ -37,7 +36,6 @@ public class Buffer implements Serializable { public static final int DATE_LENGTH_POSITION = 8; public final LogId id; public final ProtocolVersion protocolVersion; - public final ReentrantLock lock = new ReentrantLock(); private final byte[] data; private int position = DIGESTION_POSITION; private volatile boolean closed = false; diff --git a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/net/Buffers.java b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/net/Buffers.java index a281e5ab21..fd61fa9cff 100644 --- a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/net/Buffers.java +++ b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/net/Buffers.java @@ -40,7 +40,6 @@ import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; -import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; @@ -70,34 +69,30 @@ public final void put( LogId key, ProtocolVersion protocolVersion, byte[] buffer put( key, protocolVersion, buffer, 0, buffer.length ); } + @SuppressWarnings( "checkstyle:ParameterAssignment" ) public final void put( LogId id, ProtocolVersion protocolVersion, byte[] buffer, int offset, int length ) { if( closed ) throw new IllegalStateException( "current buffer is already closed" ); BufferConfiguration conf = configurationForSelector.computeIfAbsent( id, this::findConfiguration ); int bufferSize = conf.bufferSize; - Buffer b = currentBuffers.computeIfAbsent( id, k -> cache.get( id, protocolVersion, bufferSize ) ); - b.lock.lock(); - try { + currentBuffers.compute( id, ( _, b ) -> { + if( b == null ) { + b = cache.get( id, protocolVersion, bufferSize ); + } + if( bufferSize - b.headerLength() < length ) throw new IllegalArgumentException( "buffer size is too big: " + length + " for buffer of " + bufferSize + "; headers = " + b.headerLength() ); + if( !b.available( length ) ) { readyBuffers.ready( b ); - - Buffer bb; - currentBuffers.put( id, bb = cache.get( id, protocolVersion, bufferSize ) ); - bb.lock.lock(); - try { - bb.put( buffer, offset, length ); - return; - } finally { - bb.lock.unlock(); - } + b = cache.get( id, protocolVersion, bufferSize ); } + b.put( buffer, offset, length ); - } finally { - b.lock.unlock(); - } + + return b; + } ); } private BufferConfiguration findConfiguration( LogId id ) { @@ -111,12 +106,7 @@ public void flush() { for( LogId internSelector : currentBuffers.keySet() ) { Buffer buffer = currentBuffers.remove( internSelector ); if( buffer != null && !buffer.isEmpty() ) { - buffer.lock.lock(); - try { - readyBuffers.ready( buffer ); - } finally { - buffer.lock.unlock(); - } + readyBuffers.ready( buffer ); } } @@ -180,7 +170,7 @@ public final int readyBuffers() { public static class BufferCache { private final ReentrantLock lock = new ReentrantLock(); - private final Map> cache = new HashMap<>(); + private final HashMap> cache = new HashMap<>(); private Buffer get( LogId id, ProtocolVersion protocolVersion, int bufferSize ) { lock.lock(); diff --git a/oap-formats/oap-template/src/main/java/oap/template/BinaryUtils.java b/oap-formats/oap-template/src/main/java/oap/template/BinaryUtils.java index 921bc812aa..a4f78a7bcc 100644 --- a/oap-formats/oap-template/src/main/java/oap/template/BinaryUtils.java +++ b/oap-formats/oap-template/src/main/java/oap/template/BinaryUtils.java @@ -39,7 +39,7 @@ public static byte[] line( List cols ) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputStream bos = new BinaryOutputStream( baos ); - for( var col : cols ) bos.writeObject( col ); + for( Object col : cols ) bos.writeObject( col ); baos.write( Types.EOL.id ); @@ -47,8 +47,8 @@ public static byte[] line( List cols ) throws IOException { } public static byte[] lines( List> rows ) throws IOException { - var baos = new ByteArrayOutputStream(); - for( var row : rows ) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + for( List row : rows ) { baos.write( line( row ) ); } @@ -62,8 +62,8 @@ public static List> read( byte[] bytes ) throws IOException { public static List> read( byte[] bytes, int offset, int length ) throws IOException { BinaryInputStream binaryInputStream = new BinaryInputStream( new ByteArrayInputStream( bytes, offset, length ) ); Object obj = binaryInputStream.readObject(); - var line = new ArrayList(); - var res = new ArrayList>(); + ArrayList line = new ArrayList<>(); + ArrayList> res = new ArrayList<>(); while( obj != null ) { if( obj != BinaryInputStream.EOL ) line.add( obj ); else {