Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/** This annotation indicates that this field will be compressed during serialization. */
@Retention(RetentionPolicy.CLASS)
@Target(ElementType.FIELD)
public @interface Compress {
// No-op.
}
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,8 @@ private void returnFalseIfWriteFailed(VariableElement field) throws Exception {

String getExpr = (F.isEmpty(methodName) ? field.getSimpleName().toString() : methodName) + "()";

boolean compress = field.getAnnotation(Compress.class) != null;

TypeMirror type = field.asType();

if (type.getKind().isPrimitive()) {
Expand Down Expand Up @@ -371,9 +373,15 @@ else if (assignableFrom(erasedType(type), type(Map.class.getName()))) {

imports.add("org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType");

returnFalseIfWriteFailed(write, "writer.writeMap", getExpr,
"MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(0)),
"MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(1)));
List<String> args = new ArrayList<>();
args.add(getExpr);
args.add("MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(0)));
args.add("MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(1)));

if (compress)
args.add("true");

returnFalseIfWriteFailed(write, "writer.writeMap", args.toArray(String[]::new));
}

else if (assignableFrom(type, type("org.apache.ignite.internal.processors.cache.KeyCacheObject")))
Expand All @@ -385,8 +393,12 @@ else if (assignableFrom(type, type("org.apache.ignite.internal.processors.cache.
else if (assignableFrom(type, type("org.apache.ignite.internal.util.GridLongList")))
returnFalseIfWriteFailed(write, "writer.writeGridLongList", getExpr);

else if (assignableFrom(type, type(MESSAGE_INTERFACE)))
returnFalseIfWriteFailed(write, "writer.writeMessage", getExpr);
else if (assignableFrom(type, type(MESSAGE_INTERFACE))) {
if (compress)
returnFalseIfWriteFailed(write, "writer.writeMessage", getExpr, "true");
else
returnFalseIfWriteFailed(write, "writer.writeMessage", getExpr);
}

else if (assignableFrom(erasedType(type), type(Collection.class.getName()))) {
List<? extends TypeMirror> typeArgs = ((DeclaredType)type).getTypeArguments();
Expand Down Expand Up @@ -508,6 +520,8 @@ private void returnFalseIfReadFailed(VariableElement field) throws Exception {

String name = F.isEmpty(methodName) ? field.getSimpleName().toString() : methodName;

boolean compress = field.getAnnotation(Compress.class) != null;

if (type.getKind().isPrimitive()) {
String typeName = capitalizeOnlyFirst(type.getKind().name());

Expand Down Expand Up @@ -580,9 +594,15 @@ else if (assignableFrom(erasedType(type), type(Map.class.getName()))) {

assert typeArgs.size() == 2;

returnFalseIfReadFailed(name, "reader.readMap",
"MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(0)),
"MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(1)), "false");
List<String> args = new ArrayList<>();
args.add("MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(0)));
args.add("MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(1)));
args.add("false");

if (compress)
args.add("true");

returnFalseIfReadFailed(name, "reader.readMap", args.toArray(String[]::new));
}

else if (assignableFrom(type, type("org.apache.ignite.internal.processors.cache.KeyCacheObject")))
Expand All @@ -594,8 +614,12 @@ else if (assignableFrom(type, type("org.apache.ignite.internal.processors.cache.
else if (assignableFrom(type, type("org.apache.ignite.internal.util.GridLongList")))
returnFalseIfReadFailed(name, "reader.readGridLongList");

else if (assignableFrom(type, type(MESSAGE_INTERFACE)))
returnFalseIfReadFailed(name, "reader.readMessage");
else if (assignableFrom(type, type(MESSAGE_INTERFACE))) {
if (compress)
returnFalseIfReadFailed(name, "reader.readMessage", "true");
else
returnFalseIfReadFailed(name, "reader.readMessage");
}

else if (assignableFrom(erasedType(type), type(Collection.class.getName()))) {
List<? extends TypeMirror> typeArgs = ((DeclaredType)type).getTypeArguments();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import org.apache.ignite.internal.direct.state.DirectMessageState;
import org.apache.ignite.internal.direct.state.DirectMessageStateItem;
import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
import org.apache.ignite.internal.managers.communication.CompressedMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
Expand All @@ -49,6 +51,12 @@ public class DirectMessageReader implements MessageReader {
@GridToStringInclude
private final DirectMessageState<StateItem> state;

/** */
private final MessageFactory msgFactory;

/** */
private final IgniteCacheObjectProcessor cacheObjProc;

/** Buffer for reading. */
private ByteBuffer buf;

Expand All @@ -60,6 +68,9 @@ public class DirectMessageReader implements MessageReader {
* @param cacheObjProc Cache object processor.
*/
public DirectMessageReader(final MessageFactory msgFactory, IgniteCacheObjectProcessor cacheObjProc) {
this.msgFactory = msgFactory;
this.cacheObjProc = cacheObjProc;

state = new DirectMessageState<>(StateItem.class, new IgniteOutClosure<StateItem>() {
@Override public StateItem apply() {
return new StateItem(msgFactory, cacheObjProc);
Expand Down Expand Up @@ -315,12 +326,21 @@ public ByteBuffer getBuffer() {
}

/** {@inheritDoc} */
@Nullable @Override public <T extends Message> T readMessage() {
@Nullable @Override public <T extends Message> T readMessage(boolean compress) {
DirectByteBufferStream stream = state.item().stream;

T msg = stream.readMessage(this);
T msg;

lastRead = stream.lastFinished();
if (compress)
msg = readCompressedMessageAndDeserialize(
stream,
tmpReader -> tmpReader.state.item().stream.readMessage(tmpReader)
);
else {
msg = stream.readMessage(this);

lastRead = stream.lastFinished();
}

return msg;
}
Expand Down Expand Up @@ -393,12 +413,21 @@ public ByteBuffer getBuffer() {

/** {@inheritDoc} */
@Override public <M extends Map<?, ?>> M readMap(MessageCollectionItemType keyType,
MessageCollectionItemType valType, boolean linked) {
MessageCollectionItemType valType, boolean linked, boolean compress) {
DirectByteBufferStream stream = state.item().stream;

M map = stream.readMap(keyType, valType, linked, this);
M map;

lastRead = stream.lastFinished();
if (compress)
map = readCompressedMessageAndDeserialize(
stream,
tmpReader -> tmpReader.state.item().stream.readMap(keyType, valType, linked, tmpReader)
);
else {
map = stream.readMap(keyType, valType, linked, this);

lastRead = stream.lastFinished();
}

return map;
}
Expand All @@ -418,6 +447,11 @@ public ByteBuffer getBuffer() {
state.item().state++;
}

/** {@inheritDoc} */
@Override public void decrementState() {
state.item().state--;
}

/** {@inheritDoc} */
@Override public void beforeInnerMessageRead() {
state.forward();
Expand All @@ -440,6 +474,40 @@ public ByteBuffer getBuffer() {
return S.toString(DirectMessageReader.class, this);
}

/** */
private <T> T readCompressedMessageAndDeserialize(DirectByteBufferStream stream, Function<DirectMessageReader, T> fun) {
Message msg = stream.readMessage(this);

lastRead = stream.lastFinished();

if (!lastRead || msg == null)
return null;

assert msg instanceof CompressedMessage : msg;

CompressedMessage msg0 = (CompressedMessage)msg;

if (msg0.dataSize() == 0)
return null;

byte[] uncompressed = msg0.uncompressed();

ByteBuffer tmpBuf = ByteBuffer.allocateDirect(uncompressed.length);

tmpBuf.put(uncompressed);
tmpBuf.flip();

DirectMessageReader tmpReader = new DirectMessageReader(msgFactory, cacheObjProc);

tmpReader.setBuffer(tmpBuf);

T res = fun.apply(tmpReader);

lastRead = tmpReader.state.item().stream.lastFinished();

return res;
}

/**
*/
private static class StateItem implements DirectMessageStateItem {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import org.apache.ignite.internal.direct.state.DirectMessageState;
import org.apache.ignite.internal.direct.state.DirectMessageStateItem;
import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
import org.apache.ignite.internal.managers.communication.CompressedMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
Expand All @@ -44,15 +46,23 @@
* Message writer implementation.
*/
public class DirectMessageWriter implements MessageWriter {
/** */
private static final int TMP_BUF_CAPACITY = 1024 * 1024;

/** State. */
@GridToStringInclude
private final DirectMessageState<StateItem> state;

/** */
private final MessageFactory msgFactory;

/** Buffer for writing. */
private ByteBuffer buf;

/** */
public DirectMessageWriter(final MessageFactory msgFactory) {
this.msgFactory = msgFactory;

state = new DirectMessageState<>(StateItem.class, new IgniteOutClosure<StateItem>() {
@Override public StateItem apply() {
return new StateItem(msgFactory);
Expand Down Expand Up @@ -293,10 +303,17 @@ public ByteBuffer getBuffer() {
}

/** {@inheritDoc} */
@Override public boolean writeMessage(@Nullable Message msg) {
@Override public boolean writeMessage(@Nullable Message msg, boolean compress) {
DirectByteBufferStream stream = state.item().stream;

stream.writeMessage(msg, this);
if (compress)
writeCompressedMessage(
tmpWriter -> tmpWriter.state.item().stream.writeMessage(msg, tmpWriter),
msg == null,
stream
);
else
stream.writeMessage(msg, this);

return stream.lastFinished();
}
Expand Down Expand Up @@ -353,10 +370,17 @@ public ByteBuffer getBuffer() {

/** {@inheritDoc} */
@Override public <K, V> boolean writeMap(Map<K, V> map, MessageCollectionItemType keyType,
MessageCollectionItemType valType) {
MessageCollectionItemType valType, boolean compress) {
DirectByteBufferStream stream = state.item().stream;

stream.writeMap(map, keyType, valType, this);
if (compress)
writeCompressedMessage(
tmpWriter -> tmpWriter.state.item().stream.writeMap(map, keyType, valType, tmpWriter),
map == null,
stream
);
else
stream.writeMap(map, keyType, valType, this);

return stream.lastFinished();
}
Expand All @@ -381,6 +405,11 @@ public ByteBuffer getBuffer() {
state.item().state++;
}

/** {@inheritDoc} */
@Override public void decrementState() {
state.item().state--;
}

/** {@inheritDoc} */
@Override public void beforeInnerMessageWrite() {
state.forward();
Expand All @@ -403,6 +432,40 @@ public ByteBuffer getBuffer() {
return S.toString(DirectMessageWriter.class, this);
}

/** */
private void writeCompressedMessage(Consumer<DirectMessageWriter> consumer, boolean empty, DirectByteBufferStream stream) {
if (empty) {
stream.writeMessage(CompressedMessage.empty(), this);

return;
}

ByteBuffer tmpBuf = ByteBuffer.allocateDirect(TMP_BUF_CAPACITY);

DirectMessageWriter tmpWriter = new DirectMessageWriter(msgFactory);

tmpWriter.setBuffer(tmpBuf);

boolean finished;

do {
if (!tmpBuf.hasRemaining()) {
tmpBuf = ByteBuffer.allocateDirect(tmpBuf.capacity() * 2);

tmpWriter.setBuffer(tmpBuf);

tmpWriter.reset();
}

consumer.accept(tmpWriter);

finished = tmpWriter.state.item().stream.lastFinished();
}
while (!finished);

stream.writeMessage(new CompressedMessage(tmpBuf), this);
}

/**
*/
private static class StateItem implements DirectMessageStateItem {
Expand Down
Loading