Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import oap.logstream.net.Buffers;
import oap.message.client.MessageAvailabilityReport;
import oap.message.client.MessageSender;
import oap.util.FastByteArrayOutputStream;

import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -96,7 +97,8 @@ private boolean sendAsync( boolean shutdown ) {
if( shutdown || !closed ) {
buffers.forEachReadyData( b -> {
log.trace( "Sending {}", b );
sender.send( MESSAGE_TYPE, ( short ) b.protocolVersion.version, b.data(), 0, b.length() );
FastByteArrayOutputStream data = b.compress();
sender.send( MESSAGE_TYPE, ( short ) b.protocolVersion.version, data.array, 0, data.length );
} );
log.trace( "Data sent to server" );
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,23 @@
*/
package oap.logstream.net.server;

import it.unimi.dsi.fastutil.io.FastByteArrayInputStream;
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream;
import lombok.extern.slf4j.Slf4j;
import oap.io.content.ContentReader;
import oap.logstream.AbstractLoggerBackend;
import oap.logstream.LogStreamProtocol;
import oap.logstream.LogStreamProtocol.ProtocolVersion;
import oap.logstream.LoggerException;
import oap.logstream.formats.rowbinary.RowBinaryUtils;
import oap.message.server.MessageListener;
import oap.template.BinaryUtils;
import oap.tsv.Tsv;
import org.apache.commons.io.IOUtils;

import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.zip.GZIPInputStream;

import static oap.logstream.LogStreamProtocol.MESSAGE_TYPE;

Expand Down Expand Up @@ -119,26 +116,15 @@ private void readBinaryV3( ProtocolVersion version, String hostName, DataInputSt
properties.put( in.readUTF(), in.readUTF() );
}

byte[] buffer = new byte[length];
in.readFully( buffer, 0, length );
byte[] compressedBuffer = new byte[length];
in.readFully( compressedBuffer, 0, length );

if( log.isTraceEnabled() ) {
List<List<Object>> lines = new ArrayList<>();
switch( version ) {
case TSV_V1 -> ContentReader.read( buffer, Tsv.tsv.ofSeparatedValues() ).toList()
.forEach( line -> lines.add( Collections.singletonList( line ) ) );
case BINARY_V2 -> lines.addAll( BinaryUtils.read( buffer ) );
case ROW_BINARY_V3 -> lines.addAll( RowBinaryUtils.read( buffer, headers, types ) );
}

lines.forEach( line ->
log.trace( "[{}] logging (properties {} filePreffix {} logType {} headers {} types {}, length {}, line {})",
hostName, properties, filePreffix, logType, headers, types, length, line
)
);
FastByteArrayOutputStream buffer = new FastByteArrayOutputStream( length );
try( GZIPInputStream gzip = new GZIPInputStream( new FastByteArrayInputStream( compressedBuffer ) ) ) {
IOUtils.copy( gzip, buffer );
}

backend.log( version, clientHostname, filePreffix, properties, logType, headers, types, buffer, 0, length );
backend.log( version, clientHostname, filePreffix, properties, logType, headers, types, buffer.array, 0, buffer.length );
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,17 +98,17 @@ public void foreach() {
buffers.put( new LogId( "x/z", "", "", Map.of(), HEADERS, TYPES ), BINARY_V2, new byte[] { 14, 15 } );
buffers.put( new LogId( "x/z", "", "", Map.of(), HEADERS, TYPES ), BINARY_V2, new byte[] { 16 } );

var expected = List.of(
List<Buffer> expected = List.of(
buffer( BINARY_V2, header + 4, 1, new LogId( "x/y", "", "", Map.of(),
HEADERS, TYPES ), new byte[] { 1, 2, 3 } ),
buffer( BINARY_V2, header + 4, 2, new LogId( "x/y", "", "", Map.of(),
HEADERS, TYPES ), new byte[] { 4, 5, 6 } ),
buffer( BINARY_V2, header + 4, 3, new LogId( "x/z", "", "", Map.of(),
HEADERS, TYPES ), new byte[] { 11, 12, 13 } ),
buffer( BINARY_V2, header + 4, 4, new LogId( "x/y", "", "", Map.of(),
HEADERS, TYPES ), new byte[] { 7, 8, 9 } ),
buffer( BINARY_V2, header + 4, 5, new LogId( "x/z", "", "", Map.of(),
HEADERS, TYPES ), new byte[] { 14, 15, 16 } )
buffer( BINARY_V2, header + 4, 4, new LogId( "x/z", "", "", Map.of(),
HEADERS, TYPES ), new byte[] { 14, 15, 16 } ),
buffer( BINARY_V2, header + 4, 5, new LogId( "x/y", "", "", Map.of(),
HEADERS, TYPES ), new byte[] { 7, 8, 9 } )
);
assertReadyData( buffers, expected );
assertReadyData( buffers, Lists.empty() );
Expand All @@ -127,7 +127,7 @@ public void foreachPattern() {
buffers.put( new LogId( "", "x/y", "", Map.of(), HEADERS, TYPES ), BINARY_V2, new byte[] { 3 } );
buffers.put( new LogId( "", "x/z", "", Map.of(), HEADERS, TYPES ), BINARY_V2, new byte[] { 14, 15 } );

var expected = List.of(
List<Buffer> expected = List.of(
buffer( BINARY_V2, header + 2, 1, new LogId( "", "x/y", "", Map.of(), HEADERS, TYPES ), new byte[] { 1, 2 } ),
buffer( BINARY_V2, header + 4, 2, new LogId( "", "x/z", "", Map.of(), HEADERS, TYPES ), new byte[] { 11, 12, 13 } ),
buffer( BINARY_V2, header + 4, 3, new LogId( "", "x/y", "", Map.of(), HEADERS, TYPES ), new byte[] { 3 } ),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.io.Serializable;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import static java.nio.charset.StandardCharsets.UTF_8;
Expand Down Expand Up @@ -72,19 +71,9 @@ public LogId( String filePrefixPattern, String logType, String clientHostname,
public int getHash() {
Hasher hasher = Hashing.murmur3_32_fixed().newHasher();

for( var header : headers ) hasher.putString( header, UTF_8 );
for( var type : types ) hasher.putBytes( type );
for( String header : headers ) hasher.putString( header, UTF_8 );
for( byte[] type : types ) hasher.putBytes( type );

return hasher.hash().asInt();
}

public final String lock() {
return ( String.join( "-", properties.values() )
+ String.join( "-", List.of(
filePrefixPattern,
logType,
Arrays.deepToString( headers ),
Arrays.deepToString( types )
) ) ).intern();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,21 @@
*/
package oap.logstream.net;

import lombok.SneakyThrows;
import oap.logstream.LogId;
import oap.logstream.LogStreamProtocol.ProtocolVersion;
import oap.util.FastByteArrayOutputStream;

import java.io.Serializable;
import java.util.zip.GZIPOutputStream;

public class Buffer implements Serializable {
public static final int DIGESTION_POSITION = 0;
public static final int DATE_LENGTH_POSITION = 8;
public final LogId id;
public final ProtocolVersion protocolVersion;
private final byte[] data;
private int position = 0;
private int position = DIGESTION_POSITION;
private volatile boolean closed = false;
private int dataStart;

Expand All @@ -44,17 +49,17 @@ public class Buffer implements Serializable {
}

private void initMetadata( LogId id ) {
if( position != 0 ) throw new IllegalStateException( "metadata could be set for empty buffer only!" );
var result = putLong( 0 ); //reserved for digestion control
result &= putInt( 0 ); //reserved for data length
if( position != DIGESTION_POSITION ) throw new IllegalStateException( "metadata could be set for empty buffer only!" );
boolean result = putLong( DIGESTION_POSITION ); //reserved for digestion control
result &= putInt( DIGESTION_POSITION ); //reserved for data length
result &= putUTF( id.filePrefixPattern );
result &= putUTF( id.logType );
result &= putUTF( id.clientHostname );
result &= putInt( id.headers.length );
for( var header : id.headers )
result &= putUTF( header );

for( var type : id.types ) {
for( byte[] type : id.types ) {
result &= putByte( ( byte ) type.length );
for( var t : type ) {
result &= putByte( t );
Expand All @@ -71,7 +76,7 @@ private void initMetadata( LogId id ) {
}

public final boolean put( byte[] buf ) {
return put( buf, 0, buf.length );
return put( buf, DIGESTION_POSITION, buf.length );
}

public final boolean put( byte[] buf, int offset, int length ) {
Expand All @@ -94,7 +99,7 @@ private byte[] encodeInt( int i ) {
return new byte[] {
( byte ) ( ( i >>> 24 ) & 0xFF ),
( byte ) ( ( i >>> 16 ) & 0xFF ),
( byte ) ( ( i >>> 8 ) & 0xFF ),
( byte ) ( ( i >>> DATE_LENGTH_POSITION ) & 0xFF ),
( byte ) ( i & 0xFF )
};
}
Expand All @@ -111,19 +116,19 @@ private byte[] encodeLong( long v ) {
( byte ) ( v >>> 32 ),
( byte ) ( v >>> 24 ),
( byte ) ( v >>> 16 ),
( byte ) ( v >>> 8 ),
( byte ) ( v >>> DATE_LENGTH_POSITION ),
( byte ) v
};
}

@SuppressWarnings( "checkstyle:UnnecessaryParentheses" )
public final boolean putUTF( String str ) {
int strlen = str.length();
int utflen = 0;
int c, count = 0;
int utflen = DIGESTION_POSITION;
int c, count = DIGESTION_POSITION;

/* use charAt instead of copying String to char array */
for( int i = 0; i < strlen; i++ ) {
for( int i = DIGESTION_POSITION; i < strlen; i++ ) {
c = str.charAt( i );
if( ( c >= 0x0001 ) && ( c <= 0x007F ) ) utflen++;
else if( c > 0x07FF ) utflen += 3;
Expand All @@ -134,11 +139,11 @@ public final boolean putUTF( String str ) {

byte[] buffer = new byte[utflen + 2];

buffer[count++] = ( byte ) ( ( utflen >>> 8 ) & 0xFF );
buffer[count++] = ( byte ) ( ( utflen >>> DATE_LENGTH_POSITION ) & 0xFF );
buffer[count++] = ( byte ) ( utflen & 0xFF );

int i;
for( i = 0; i < strlen; i++ ) {
for( i = DIGESTION_POSITION; i < strlen; i++ ) {
c = str.charAt( i );
if( !( ( c >= 0x0001 ) && ( c <= 0x007F ) ) ) break;
buffer[count++] = ( byte ) c;
Expand All @@ -157,7 +162,7 @@ public final boolean putUTF( String str ) {
buffer[count++] = ( byte ) ( 0x80 | ( c & 0x3F ) );
}
}
return put( buffer, 0, utflen + 2 );
return put( buffer, DIGESTION_POSITION, utflen + 2 );
}

public final boolean available( int length ) {
Expand All @@ -170,12 +175,12 @@ public final byte[] data() {

public final void reset( LogId id ) {
this.closed = false;
this.position = 0;
this.position = DIGESTION_POSITION;
initMetadata( id );
}

public final boolean isEmpty() {
return dataLength() == 0;
return dataLength() == DIGESTION_POSITION;
}

public final int length() {
Expand All @@ -185,9 +190,10 @@ public final int length() {
public final void close( long digestionId ) {
this.closed = true;
byte[] digestion = encodeLong( digestionId );

byte[] length = encodeInt( dataLength() );
System.arraycopy( digestion, 0, this.data, 0, digestion.length );
System.arraycopy( length, 0, this.data, 8, length.length );
System.arraycopy( digestion, DIGESTION_POSITION, this.data, DIGESTION_POSITION, digestion.length );
System.arraycopy( length, DIGESTION_POSITION, this.data, DATE_LENGTH_POSITION, length.length );
}

public final int dataLength() {
Expand All @@ -202,4 +208,23 @@ public final int headerLength() {
public final String toString() {
return id + "," + position;
}

@SneakyThrows
public FastByteArrayOutputStream compress() {
FastByteArrayOutputStream fastByteArrayOutputStream = new FastByteArrayOutputStream( length() );

fastByteArrayOutputStream.write( data, DIGESTION_POSITION, dataStart );

try( GZIPOutputStream gzipOutputStream = new GZIPOutputStream( fastByteArrayOutputStream ) ) {
gzipOutputStream.write( data, dataStart, dataLength() );
}

int newDataLength = fastByteArrayOutputStream.length - dataStart;

byte[] bytes = encodeInt( newDataLength );

System.arraycopy( bytes, DIGESTION_POSITION, fastByteArrayOutputStream.array, DATE_LENGTH_POSITION, bytes.length );

return fastByteArrayOutputStream;
}
}
Loading