diff --git a/modules/codegen2/src/main/java/org/apache/ignite/internal/Compress.java b/modules/codegen2/src/main/java/org/apache/ignite/internal/Compress.java new file mode 100644 index 00000000000000..65dea1348f846d --- /dev/null +++ b/modules/codegen2/src/main/java/org/apache/ignite/internal/Compress.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** This annotation indicates that this field will be compressed during serialization. */ +@Retention(RetentionPolicy.CLASS) +@Target(ElementType.FIELD) +public @interface Compress { + // No-op. +} diff --git a/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java b/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java index 03420b425ccf79..c0f84275f83dab 100644 --- a/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java +++ b/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java @@ -318,6 +318,8 @@ private void returnFalseIfWriteFailed(VariableElement field) throws Exception { String getExpr = (F.isEmpty(methodName) ? field.getSimpleName().toString() : methodName) + "()"; + boolean compress = field.getAnnotation(Compress.class) != null; + TypeMirror type = field.asType(); if (type.getKind().isPrimitive()) { @@ -371,9 +373,15 @@ else if (assignableFrom(erasedType(type), type(Map.class.getName()))) { imports.add("org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType"); - returnFalseIfWriteFailed(write, "writer.writeMap", getExpr, - "MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(0)), - "MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(1))); + List args = new ArrayList<>(); + args.add(getExpr); + args.add("MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(0))); + args.add("MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(1))); + + if (compress) + args.add("true"); + + returnFalseIfWriteFailed(write, "writer.writeMap", args.toArray(String[]::new)); } else if (assignableFrom(type, type("org.apache.ignite.internal.processors.cache.KeyCacheObject"))) @@ -385,8 +393,12 @@ else if (assignableFrom(type, type("org.apache.ignite.internal.processors.cache. else if (assignableFrom(type, type("org.apache.ignite.internal.util.GridLongList"))) returnFalseIfWriteFailed(write, "writer.writeGridLongList", getExpr); - else if (assignableFrom(type, type(MESSAGE_INTERFACE))) - returnFalseIfWriteFailed(write, "writer.writeMessage", getExpr); + else if (assignableFrom(type, type(MESSAGE_INTERFACE))) { + if (compress) + returnFalseIfWriteFailed(write, "writer.writeMessage", getExpr, "true"); + else + returnFalseIfWriteFailed(write, "writer.writeMessage", getExpr); + } else if (assignableFrom(erasedType(type), type(Collection.class.getName()))) { List typeArgs = ((DeclaredType)type).getTypeArguments(); @@ -508,6 +520,8 @@ private void returnFalseIfReadFailed(VariableElement field) throws Exception { String name = F.isEmpty(methodName) ? field.getSimpleName().toString() : methodName; + boolean compress = field.getAnnotation(Compress.class) != null; + if (type.getKind().isPrimitive()) { String typeName = capitalizeOnlyFirst(type.getKind().name()); @@ -580,9 +594,15 @@ else if (assignableFrom(erasedType(type), type(Map.class.getName()))) { assert typeArgs.size() == 2; - returnFalseIfReadFailed(name, "reader.readMap", - "MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(0)), - "MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(1)), "false"); + List args = new ArrayList<>(); + args.add("MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(0))); + args.add("MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(1))); + args.add("false"); + + if (compress) + args.add("true"); + + returnFalseIfReadFailed(name, "reader.readMap", args.toArray(String[]::new)); } else if (assignableFrom(type, type("org.apache.ignite.internal.processors.cache.KeyCacheObject"))) @@ -594,8 +614,12 @@ else if (assignableFrom(type, type("org.apache.ignite.internal.processors.cache. else if (assignableFrom(type, type("org.apache.ignite.internal.util.GridLongList"))) returnFalseIfReadFailed(name, "reader.readGridLongList"); - else if (assignableFrom(type, type(MESSAGE_INTERFACE))) - returnFalseIfReadFailed(name, "reader.readMessage"); + else if (assignableFrom(type, type(MESSAGE_INTERFACE))) { + if (compress) + returnFalseIfReadFailed(name, "reader.readMessage", "true"); + else + returnFalseIfReadFailed(name, "reader.readMessage"); + } else if (assignableFrom(erasedType(type), type(Collection.class.getName()))) { List typeArgs = ((DeclaredType)type).getTypeArguments(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java index 24d97da7b2de5a..3aca6db961537f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java @@ -23,9 +23,11 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.function.Function; import org.apache.ignite.internal.direct.state.DirectMessageState; import org.apache.ignite.internal.direct.state.DirectMessageStateItem; import org.apache.ignite.internal.direct.stream.DirectByteBufferStream; +import org.apache.ignite.internal.managers.communication.CompressedMessage; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.KeyCacheObject; @@ -49,6 +51,12 @@ public class DirectMessageReader implements MessageReader { @GridToStringInclude private final DirectMessageState state; + /** */ + private final MessageFactory msgFactory; + + /** */ + private final IgniteCacheObjectProcessor cacheObjProc; + /** Buffer for reading. */ private ByteBuffer buf; @@ -60,6 +68,9 @@ public class DirectMessageReader implements MessageReader { * @param cacheObjProc Cache object processor. */ public DirectMessageReader(final MessageFactory msgFactory, IgniteCacheObjectProcessor cacheObjProc) { + this.msgFactory = msgFactory; + this.cacheObjProc = cacheObjProc; + state = new DirectMessageState<>(StateItem.class, new IgniteOutClosure() { @Override public StateItem apply() { return new StateItem(msgFactory, cacheObjProc); @@ -315,12 +326,21 @@ public ByteBuffer getBuffer() { } /** {@inheritDoc} */ - @Nullable @Override public T readMessage() { + @Nullable @Override public T readMessage(boolean compress) { DirectByteBufferStream stream = state.item().stream; - T msg = stream.readMessage(this); + T msg; - lastRead = stream.lastFinished(); + if (compress) + msg = readCompressedMessageAndDeserialize( + stream, + tmpReader -> tmpReader.state.item().stream.readMessage(tmpReader) + ); + else { + msg = stream.readMessage(this); + + lastRead = stream.lastFinished(); + } return msg; } @@ -393,12 +413,21 @@ public ByteBuffer getBuffer() { /** {@inheritDoc} */ @Override public > M readMap(MessageCollectionItemType keyType, - MessageCollectionItemType valType, boolean linked) { + MessageCollectionItemType valType, boolean linked, boolean compress) { DirectByteBufferStream stream = state.item().stream; - M map = stream.readMap(keyType, valType, linked, this); + M map; - lastRead = stream.lastFinished(); + if (compress) + map = readCompressedMessageAndDeserialize( + stream, + tmpReader -> tmpReader.state.item().stream.readMap(keyType, valType, linked, tmpReader) + ); + else { + map = stream.readMap(keyType, valType, linked, this); + + lastRead = stream.lastFinished(); + } return map; } @@ -418,6 +447,11 @@ public ByteBuffer getBuffer() { state.item().state++; } + /** {@inheritDoc} */ + @Override public void decrementState() { + state.item().state--; + } + /** {@inheritDoc} */ @Override public void beforeInnerMessageRead() { state.forward(); @@ -440,6 +474,40 @@ public ByteBuffer getBuffer() { return S.toString(DirectMessageReader.class, this); } + /** */ + private T readCompressedMessageAndDeserialize(DirectByteBufferStream stream, Function fun) { + Message msg = stream.readMessage(this); + + lastRead = stream.lastFinished(); + + if (!lastRead || msg == null) + return null; + + assert msg instanceof CompressedMessage : msg; + + CompressedMessage msg0 = (CompressedMessage)msg; + + if (msg0.dataSize() == 0) + return null; + + byte[] uncompressed = msg0.uncompressed(); + + ByteBuffer tmpBuf = ByteBuffer.allocateDirect(uncompressed.length); + + tmpBuf.put(uncompressed); + tmpBuf.flip(); + + DirectMessageReader tmpReader = new DirectMessageReader(msgFactory, cacheObjProc); + + tmpReader.setBuffer(tmpBuf); + + T res = fun.apply(tmpReader); + + lastRead = tmpReader.state.item().stream.lastFinished(); + + return res; + } + /** */ private static class StateItem implements DirectMessageStateItem { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java index 1da76aa14cfe74..4c30538a76cf01 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java @@ -23,9 +23,11 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.function.Consumer; import org.apache.ignite.internal.direct.state.DirectMessageState; import org.apache.ignite.internal.direct.state.DirectMessageStateItem; import org.apache.ignite.internal.direct.stream.DirectByteBufferStream; +import org.apache.ignite.internal.managers.communication.CompressedMessage; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.KeyCacheObject; @@ -44,15 +46,23 @@ * Message writer implementation. */ public class DirectMessageWriter implements MessageWriter { + /** */ + private static final int TMP_BUF_CAPACITY = 1024 * 1024; + /** State. */ @GridToStringInclude private final DirectMessageState state; + /** */ + private final MessageFactory msgFactory; + /** Buffer for writing. */ private ByteBuffer buf; /** */ public DirectMessageWriter(final MessageFactory msgFactory) { + this.msgFactory = msgFactory; + state = new DirectMessageState<>(StateItem.class, new IgniteOutClosure() { @Override public StateItem apply() { return new StateItem(msgFactory); @@ -293,10 +303,17 @@ public ByteBuffer getBuffer() { } /** {@inheritDoc} */ - @Override public boolean writeMessage(@Nullable Message msg) { + @Override public boolean writeMessage(@Nullable Message msg, boolean compress) { DirectByteBufferStream stream = state.item().stream; - stream.writeMessage(msg, this); + if (compress) + writeCompressedMessage( + tmpWriter -> tmpWriter.state.item().stream.writeMessage(msg, tmpWriter), + msg == null, + stream + ); + else + stream.writeMessage(msg, this); return stream.lastFinished(); } @@ -353,10 +370,17 @@ public ByteBuffer getBuffer() { /** {@inheritDoc} */ @Override public boolean writeMap(Map map, MessageCollectionItemType keyType, - MessageCollectionItemType valType) { + MessageCollectionItemType valType, boolean compress) { DirectByteBufferStream stream = state.item().stream; - stream.writeMap(map, keyType, valType, this); + if (compress) + writeCompressedMessage( + tmpWriter -> tmpWriter.state.item().stream.writeMap(map, keyType, valType, tmpWriter), + map == null, + stream + ); + else + stream.writeMap(map, keyType, valType, this); return stream.lastFinished(); } @@ -381,6 +405,11 @@ public ByteBuffer getBuffer() { state.item().state++; } + /** {@inheritDoc} */ + @Override public void decrementState() { + state.item().state--; + } + /** {@inheritDoc} */ @Override public void beforeInnerMessageWrite() { state.forward(); @@ -403,6 +432,40 @@ public ByteBuffer getBuffer() { return S.toString(DirectMessageWriter.class, this); } + /** */ + private void writeCompressedMessage(Consumer consumer, boolean empty, DirectByteBufferStream stream) { + if (empty) { + stream.writeMessage(CompressedMessage.empty(), this); + + return; + } + + ByteBuffer tmpBuf = ByteBuffer.allocateDirect(TMP_BUF_CAPACITY); + + DirectMessageWriter tmpWriter = new DirectMessageWriter(msgFactory); + + tmpWriter.setBuffer(tmpBuf); + + boolean finished; + + do { + if (!tmpBuf.hasRemaining()) { + tmpBuf = ByteBuffer.allocateDirect(tmpBuf.capacity() * 2); + + tmpWriter.setBuffer(tmpBuf); + + tmpWriter.reset(); + } + + consumer.accept(tmpWriter); + + finished = tmpWriter.state.item().stream.lastFinished(); + } + while (!finished); + + stream.writeMessage(new CompressedMessage(tmpBuf), this); + } + /** */ private static class StateItem implements DirectMessageStateItem { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java index 5c526bf6fd3b4a..ff66fec11d6e45 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java @@ -31,6 +31,7 @@ import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.managers.communication.CompressedMessage; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.KeyCacheObject; @@ -1013,7 +1014,12 @@ private void writeRandomAccessList(List list, MessageCollectionItemType i * @param valType Value type. * @param writer Writer. */ - public void writeMap(Map map, MessageCollectionItemType keyType, MessageCollectionItemType valType, MessageWriter writer) { + public void writeMap( + Map map, + MessageCollectionItemType keyType, + MessageCollectionItemType valType, + MessageWriter writer + ) { if (map != null) { if (mapIt == null) { writeInt(map.size()); @@ -1551,6 +1557,7 @@ public T readMessage(MessageReader reader) { Message msg0 = msg; msgTypeDone = false; + msg = null; return (T)msg0; @@ -2106,6 +2113,9 @@ protected void write(MessageCollectionItemType type, Object val, MessageWriter w break; case MSG: + if (val instanceof CompressedMessage) + throw new IllegalArgumentException("CompressedMessage is not supported in collections."); + try { if (val != null) writer.beforeInnerMessageWrite(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CompressedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CompressedMessage.java new file mode 100644 index 00000000000000..f8b70b8f0cbb41 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CompressedMessage.java @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.communication; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.zip.Deflater; +import java.util.zip.DeflaterOutputStream; +import java.util.zip.Inflater; +import java.util.zip.InflaterInputStream; +import org.apache.ignite.IgniteException; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** */ +public class CompressedMessage implements Message { + /** Type code. */ + public static final short TYPE_CODE = 517; + + /** */ + private static final int CHUNK_SIZE = 1024 * 1024; + + /** */ + private static final int BUFFER_CAPACITY = 10 * 1024 * 1024; + + /** */ + private ByteBuffer tmpBuf; + + /** */ + private int dataSize = -1; + + /** */ + private ChunkedByteReader chunkedReader; + + /** */ + private byte[] chunk; + + /** */ + private boolean finalChunk; + + /** */ + public CompressedMessage() { + // No-op. + } + + /** + * @param buf Source buffer with seralized data. + */ + public CompressedMessage(ByteBuffer buf) { + dataSize = buf.position(); + chunkedReader = new ChunkedByteReader(compress(buf), CHUNK_SIZE); + } + + /** */ + public static CompressedMessage empty() { + CompressedMessage msg = new CompressedMessage(); + + msg.dataSize = 0; + msg.finalChunk = true; + msg.chunk = null; + + return msg; + } + + /** */ + public int dataSize() { + return dataSize; + } + + /** */ + public byte[] uncompressed() { + assert finalChunk; + + byte[] uncompress = uncompress(); + + assert uncompress != null; + assert uncompress.length == dataSize : "Expected=" + dataSize + ", actual=" + uncompress.length; + + return uncompress; + } + + /** {@inheritDoc} */ + @SuppressWarnings("deprecation") + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType())) + return false; + + writer.onHeaderWritten(); + } + + while (true) { + if (chunk == null && chunkedReader != null) { + chunk = chunkedReader.nextChunk(); + + finalChunk = (chunk == null); + } + + switch (writer.state()) { + case 0: + if (!writer.writeInt(dataSize)) + return false; + + writer.incrementState(); + + if (dataSize == 0) + return true; + + case 1: + if (!writer.writeBoolean(finalChunk)) + return false; + + writer.incrementState(); + + if (finalChunk) + return true; + + case 2: + if (!writer.writeByteArray(chunk)) + return false; + + chunk = null; + + writer.decrementState(); + } + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("deprecation") + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (tmpBuf == null) + tmpBuf = ByteBuffer.allocate(BUFFER_CAPACITY); + + assert chunk == null : chunk; + + while (true) { + switch (reader.state()) { + case 0: + dataSize = reader.readInt(); + + if (!reader.isLastRead()) + return false; + + if (dataSize == 0) + return true; + + reader.incrementState(); + + case 1: + finalChunk = reader.readBoolean(); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + if (finalChunk) + return true; + + case 2: + chunk = reader.readByteArray(); + + if (!reader.isLastRead()) + return false; + + if (chunk != null) { + if (tmpBuf.remaining() <= CHUNK_SIZE) { + ByteBuffer newTmpBuf = ByteBuffer.allocate(tmpBuf.capacity() * 2); + + newTmpBuf.put(tmpBuf); + + tmpBuf = newTmpBuf; + } + + tmpBuf.put(chunk); + + reader.decrementState(); + + chunk = null; + } + } + } + } + + /** {@inheritDoc} */ + @Override public short directType() { + return TYPE_CODE; + } + + /** + * @param buf Buffer. + */ + private byte[] compress(ByteBuffer buf) { + byte[] data = new byte[buf.position()]; + + buf.flip(); + buf.get(data); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(data.length); + Deflater deflater = new Deflater(Deflater.BEST_SPEED, true); + + try (DeflaterOutputStream dos = new DeflaterOutputStream(baos, deflater)) { + dos.write(data); + dos.finish(); + } + catch (IOException ex) { + throw new IgniteException(ex); + } + finally { + deflater.end(); + } + + return baos.toByteArray(); + } + + /** */ + private byte[] uncompress() { + if (tmpBuf == null) + return null; + + byte[] uncompressedData; + + Inflater inflater = new Inflater(true); + + try (InflaterInputStream iis = new InflaterInputStream(new ByteArrayInputStream(tmpBuf.array()), inflater)) { + uncompressedData = iis.readAllBytes(); + } + catch (IOException ex) { + throw new IgniteException(ex); + } + finally { + inflater.end(); + } + + tmpBuf = null; + + return uncompressedData; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "CompressedMessage{" + + "chunk=" + Arrays.toString(chunk) + + ", tmpBuf=" + tmpBuf + + ", dataSize=" + dataSize + + ", chunkedReader=" + chunkedReader + + ", finalChunk=" + finalChunk + + '}'; + } + + /** */ + private static class ChunkedByteReader { + /** */ + private final byte[] inputData; + + /** */ + private final int chunkSize; + + /** */ + private int position; + + /** */ + ChunkedByteReader(byte[] inputData, int chunkSize) { + this.inputData = inputData; + this.chunkSize = chunkSize; + } + + /** */ + byte[] nextChunk() { + if (position >= inputData.length) + return null; + + int curChunkSize = Math.min(inputData.length - position, chunkSize); + + byte[] chunk = new byte[curChunkSize]; + + System.arraycopy(inputData, position, chunk, 0, curChunkSize); + + position += curChunkSize; + + return chunk; + } + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 6dc2d02dd0af5d..0dfd249e541cbf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -76,6 +76,8 @@ import org.apache.ignite.internal.codegen.GridDhtLockResponseSerializer; import org.apache.ignite.internal.codegen.GridDhtPartitionDemandMessageSerializer; import org.apache.ignite.internal.codegen.GridDhtPartitionExchangeIdSerializer; +import org.apache.ignite.internal.codegen.GridDhtPartitionFullMapSerializer; +import org.apache.ignite.internal.codegen.GridDhtPartitionMapSerializer; import org.apache.ignite.internal.codegen.GridDhtPartitionSupplyMessageSerializer; import org.apache.ignite.internal.codegen.GridDhtPartitionsFullMessageSerializer; import org.apache.ignite.internal.codegen.GridDhtPartitionsSingleMessageSerializer; @@ -113,6 +115,7 @@ import org.apache.ignite.internal.codegen.GridNearTxPrepareRequestSerializer; import org.apache.ignite.internal.codegen.GridNearTxPrepareResponseSerializer; import org.apache.ignite.internal.codegen.GridNearUnlockRequestSerializer; +import org.apache.ignite.internal.codegen.GridPartitionStateMapSerializer; import org.apache.ignite.internal.codegen.GridQueryCancelRequestSerializer; import org.apache.ignite.internal.codegen.GridQueryFailResponseSerializer; import org.apache.ignite.internal.codegen.GridQueryKillRequestSerializer; @@ -237,6 +240,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; @@ -309,6 +314,7 @@ import org.apache.ignite.internal.processors.service.ServiceSingleNodeDeploymentResult; import org.apache.ignite.internal.processors.service.ServiceSingleNodeDeploymentResultBatch; import org.apache.ignite.internal.util.GridByteArrayList; +import org.apache.ignite.internal.util.GridPartitionStateMap; import org.apache.ignite.internal.util.UUIDCollectionMessage; import org.apache.ignite.internal.util.distributed.SingleNodeMessage; import org.apache.ignite.plugin.extensions.communication.MessageFactory; @@ -518,6 +524,10 @@ public class GridIoMessageFactory implements MessageFactoryProvider { new IgniteDhtPartitionsToReloadMapSerializer()); factory.register(IntLongMap.TYPE_CODE, IntLongMap::new, new IntLongMapSerializer()); factory.register(IndexKeyTypeMessage.TYPE_CODE, IndexKeyTypeMessage::new, new IndexKeyTypeMessageSerializer()); + factory.register(CompressedMessage.TYPE_CODE, CompressedMessage::new); + factory.register(GridPartitionStateMap.TYPE_CODE, GridPartitionStateMap::new, new GridPartitionStateMapSerializer()); + factory.register(GridDhtPartitionMap.TYPE_CODE, GridDhtPartitionMap::new, new GridDhtPartitionMapSerializer()); + factory.register(GridDhtPartitionFullMap.TYPE_CODE, GridDhtPartitionFullMap::new, new GridDhtPartitionFullMapSerializer()); // [-3..119] [124..129] [-23..-28] [-36..-55] [183..188] - this // [120..123] - DR diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index bbc2e78bbb4c0c..c0a1cefecced0c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1288,7 +1288,7 @@ private void sendAllPartitions( ) { long time = System.currentTimeMillis(); - GridDhtPartitionsFullMessage m = createPartitionsFullMessage(true, null, null, null, null, grps); + GridDhtPartitionsFullMessage m = createPartitionsFullMessage(null, null, null, null, grps); m.topologyVersion(msgTopVer); @@ -1342,8 +1342,6 @@ private void sendAllPartitions( /** * Creates partitions full message for all cache groups. * - * @param compress {@code True} if possible to compress message (properly work only if prepareMarshall/ - * finishUnmarshall methods are called). * @param exchId Non-null exchange ID if message is created for exchange. * @param lastVer Last version. * @param partHistSuppliers Partition history suppliers map. @@ -1351,7 +1349,6 @@ private void sendAllPartitions( * @return Message. */ public GridDhtPartitionsFullMessage createPartitionsFullMessage( - boolean compress, @Nullable final GridDhtPartitionExchangeId exchId, @Nullable GridCacheVersion lastVer, @Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers, @@ -1359,14 +1356,12 @@ public GridDhtPartitionsFullMessage createPartitionsFullMessage( ) { Collection grps = cctx.cache().cacheGroups(); - return createPartitionsFullMessage(compress, exchId, lastVer, partHistSuppliers, partsToReload, grps); + return createPartitionsFullMessage(exchId, lastVer, partHistSuppliers, partsToReload, grps); } /** * Creates partitions full message for selected cache groups. * - * @param compress {@code True} if possible to compress message (properly work only if prepareMarshall/ - * finishUnmarshall methods are called). * @param exchId Non-null exchange ID if message is created for exchange. * @param lastVer Last version. * @param partHistSuppliers Partition history suppliers map. @@ -1375,7 +1370,6 @@ public GridDhtPartitionsFullMessage createPartitionsFullMessage( * @return Message. */ public GridDhtPartitionsFullMessage createPartitionsFullMessage( - boolean compress, @Nullable final GridDhtPartitionExchangeId exchId, @Nullable GridCacheVersion lastVer, @Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers, @@ -1387,8 +1381,6 @@ public GridDhtPartitionsFullMessage createPartitionsFullMessage( final GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(exchId, lastVer, ver, partHistSuppliers, partsToReload); - m.compressed(compress); - final Map> dupData = new HashMap<>(); Map> partsSizes = new HashMap<>(); @@ -1406,7 +1398,7 @@ public GridDhtPartitionsFullMessage createPartitionsFullMessage( GridDhtPartitionFullMap locMap = grp.topology().partitionMap(true); if (locMap != null) - addFullPartitionsMap(m, dupData, compress, grp.groupId(), locMap, affCache.similarAffinityKey()); + addFullPartitionsMap(m, dupData, grp.groupId(), locMap, affCache.similarAffinityKey()); Map partSizesMap = grp.topology().globalPartSizes(); @@ -1426,7 +1418,7 @@ public GridDhtPartitionsFullMessage createPartitionsFullMessage( GridDhtPartitionFullMap map = top.partitionMap(true); if (map != null) - addFullPartitionsMap(m, dupData, compress, top.groupId(), map, top.similarAffinityKey()); + addFullPartitionsMap(m, dupData, top.groupId(), map, top.similarAffinityKey()); if (exchId != null) { m.addPartitionUpdateCounters(top.groupId(), top.fullUpdateCounters()); @@ -1449,14 +1441,12 @@ public GridDhtPartitionsFullMessage createPartitionsFullMessage( /** * @param m Message. * @param dupData Duplicated data map. - * @param compress {@code True} if need check for duplicated partition state data. * @param grpId Cache group ID. * @param map Map to add. * @param affKey Cache affinity key. */ private void addFullPartitionsMap(GridDhtPartitionsFullMessage m, Map> dupData, - boolean compress, Integer grpId, GridDhtPartitionFullMap map, Object affKey) { @@ -1464,7 +1454,7 @@ private void addFullPartitionsMap(GridDhtPartitionsFullMessage m, Integer dupDataCache = null; - if (compress && affKey != null && !m.containsGroup(grpId)) { + if (affKey != null && !m.containsGroup(grpId)) { T2 state0 = dupData.get(affKey); if (state0 != null && state0.get2().partitionStateEquals(map)) { @@ -1556,8 +1546,7 @@ public GridDhtPartitionsSingleMessage createPartitionsSingleMessage( ) { GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(exchangeId, clientOnlyExchange, - cctx.versions().last(), - true); + cctx.versions().last()); Map> dupData = new HashMap<>(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java index 068d5a3f5bbf0c..2127578e99dded 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java @@ -21,31 +21,46 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; +import java.util.AbstractMap; +import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.Set; import java.util.UUID; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.NotNull; /** * Full partition map from all nodes. */ public class GridDhtPartitionFullMap - extends HashMap implements Comparable, Externalizable { + extends AbstractMap implements Comparable, Externalizable, Message { + /** Type code. */ + public static final short TYPE_CODE = 520; + /** */ private static final long serialVersionUID = 0L; /** Node ID. */ + @Order(0) private UUID nodeId; /** Node order. */ + @Order(1) private long nodeOrder; /** Update sequence number. */ + @Order(value = 2, method = "updateSequence") private long updateSeq; + /** Partition map. */ + @Order(3) + private Map map; + /** * @param nodeId Node ID. * @param nodeOrder Node order. @@ -59,6 +74,7 @@ public GridDhtPartitionFullMap(UUID nodeId, long nodeOrder, long updateSeq) { this.nodeId = nodeId; this.nodeOrder = nodeOrder; this.updateSeq = updateSeq; + map = new HashMap<>(); } /** @@ -77,6 +93,7 @@ public GridDhtPartitionFullMap(UUID nodeId, long nodeOrder, long updateSeq, Map< this.nodeId = nodeId; this.nodeOrder = nodeOrder; this.updateSeq = updateSeq; + map = new HashMap<>(); for (Map.Entry e : m.entrySet()) { GridDhtPartitionMap part = e.getValue(); @@ -96,7 +113,7 @@ public GridDhtPartitionFullMap(UUID nodeId, long nodeOrder, long updateSeq, Map< * @param updateSeq Update sequence. */ public GridDhtPartitionFullMap(GridDhtPartitionFullMap m, long updateSeq) { - super(m); + map = new HashMap<>(m); nodeId = m.nodeId; nodeOrder = m.nodeOrder; @@ -107,7 +124,7 @@ public GridDhtPartitionFullMap(GridDhtPartitionFullMap m, long updateSeq) { * Empty constructor required for {@link Externalizable}. */ public GridDhtPartitionFullMap() { - // No-op. + map = new HashMap<>(); } /** @@ -124,6 +141,13 @@ public UUID nodeId() { return nodeId; } + /** + * @param nodeId Node ID. + */ + public void nodeId(UUID nodeId) { + this.nodeId = nodeId; + } + /** * @return Node order. */ @@ -131,6 +155,13 @@ public long nodeOrder() { return nodeOrder; } + /** + * @param nodeOrder Node order. + */ + public void nodeOrder(long nodeOrder) { + this.nodeOrder = nodeOrder; + } + /** * @return Update sequence. */ @@ -138,6 +169,20 @@ public long updateSequence() { return updateSeq; } + /** + * @return Partition map. + */ + public Map map() { + return map; + } + + /** + * @param map Partition map. + */ + public void map(Map map) { + this.map = map; + } + /** * @param fullMap Map. * @return {@code True} if this map and given map contain the same data. @@ -159,7 +204,7 @@ public boolean partitionStateEquals(GridDhtPartitionFullMap fullMap) { /** * @param updateSeq New update sequence value. */ - public void newUpdateSequence(long updateSeq) { + public void updateSequence(long updateSeq) { this.updateSeq = updateSeq; } @@ -167,7 +212,7 @@ public void newUpdateSequence(long updateSeq) { * @param updateSeq New update sequence value. * @return Old update sequence value. */ - public long updateSequence(long updateSeq) { + public long checkAndUpdateSequence(long updateSeq) { long old = this.updateSeq; assert updateSeq >= old : "Invalid update sequence [cur=" + old + ", new=" + updateSeq + @@ -185,7 +230,7 @@ public long updateSequence(long updateSeq) { out.writeLong(nodeOrder); out.writeLong(updateSeq); - U.writeMap(out, this); + U.writeMap(out, map); } /** {@inheritDoc} */ @@ -195,7 +240,12 @@ public long updateSequence(long updateSeq) { nodeOrder = in.readLong(); updateSeq = in.readLong(); - putAll(U.readMap(in)); + map = new HashMap<>(); + + Map map0 = U.readMap(in); + + if (map0 != null) + map.putAll(map0); } /** {@inheritDoc} */ @@ -260,4 +310,69 @@ public String toFullString() { return Long.compare(updateSeq, o.updateSeq); } + + /** {@inheritDoc} */ + @Override public GridDhtPartitionMap put(UUID key, GridDhtPartitionMap val) { + return map.put(key, val); + } + + /** {@inheritDoc} */ + @Override public void putAll(Map m) { + map.putAll(m); + } + + /** {@inheritDoc} */ + @Override public GridDhtPartitionMap get(Object key) { + return map.get(key); + } + + /** {@inheritDoc} */ + @Override public GridDhtPartitionMap remove(Object key) { + return map.remove(key); + } + + /** {@inheritDoc} */ + @Override public int size() { + return map.size(); + } + + /** {@inheritDoc} */ + @Override public boolean isEmpty() { + return map.isEmpty(); + } + + /** {@inheritDoc} */ + @Override public boolean containsValue(Object val) { + return map.containsValue(val); + } + + /** {@inheritDoc} */ + @Override public boolean containsKey(Object key) { + return map.containsKey(key); + } + + /** {@inheritDoc} */ + @Override public void clear() { + map.clear(); + } + + /** {@inheritDoc} */ + @Override public @NotNull Set keySet() { + return map.keySet(); + } + + /** {@inheritDoc} */ + @Override public @NotNull Collection values() { + return map.values(); + } + + /** {@inheritDoc} */ + @Override public @NotNull Set> entrySet() { + return map.entrySet(); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return TYPE_CODE; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java index 3cbf75767e7fa0..accd3f730031a6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java @@ -26,31 +26,40 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; import org.apache.ignite.internal.util.GridPartitionStateMap; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.Message; import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING; /** * Partition map from single node. */ -public class GridDhtPartitionMap implements Comparable, Externalizable { +public class GridDhtPartitionMap implements Comparable, Externalizable, Message { + /** Type code. */ + public static final short TYPE_CODE = 519; + /** */ private static final long serialVersionUID = 0L; /** Node ID. */ + @Order(0) protected UUID nodeId; /** Update sequence number. */ + @Order(value = 1, method = "updateSequence") protected long updateSeq; /** Topology version. */ + @Order(value = 2, method = "topologyVersion") protected AffinityTopologyVersion top; /** */ + @Order(3) protected GridPartitionStateMap map; /** */ @@ -197,6 +206,17 @@ public GridPartitionStateMap map() { return map; } + /** + * @param map Map. + */ + public void map(GridPartitionStateMap map) { + this.map = new GridPartitionStateMap(); + + if (map != null) + for (Map.Entry entry : map.entrySet()) + put(entry.getKey(), entry.getValue()); + } + /** * @return Node ID. */ @@ -204,6 +224,13 @@ public UUID nodeId() { return nodeId; } + /** + * @param nodeId Node ID. + */ + public void nodeId(UUID nodeId) { + this.nodeId = nodeId; + } + /** * @return Update sequence. */ @@ -211,6 +238,13 @@ public long updateSequence() { return updateSeq; } + /** + * @param updateSeq Update sequence. + */ + public void updateSequence(long updateSeq) { + this.updateSeq = updateSeq; + } + /** * @param updateSeq New update sequence value. * @param topVer Current topology version. @@ -239,6 +273,13 @@ public AffinityTopologyVersion topologyVersion() { return top; } + /** + * @param top Topology version. + */ + public void topologyVersion(AffinityTopologyVersion top) { + this.top = top; + } + /** {@inheritDoc} */ @Override public int compareTo(GridDhtPartitionMap o) { assert nodeId.equals(o.nodeId); @@ -337,4 +378,9 @@ public String toFullString() { @Override public String toString() { return S.toString(GridDhtPartitionMap.class, this, "top", top, "updateSeq", updateSeq, "size", size()); } + + /** {@inheritDoc} */ + @Override public short directType() { + return TYPE_CODE; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java index df9672c02e5808..98390f503ce1ee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java @@ -28,9 +28,6 @@ * Request for single partition info. */ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage { - /** */ - private static final byte COMPRESSED_FLAG_MASK = 0x01; - /** */ private static final byte RESTORE_STATE_FLAG_MASK = 0x02; @@ -128,20 +125,6 @@ public void flags(byte flags) { this.flags = flags; } - /** - * @return {@code True} if message data is compressed. - */ - public final boolean compressed() { - return (flags & COMPRESSED_FLAG_MASK) != 0; - } - - /** - * @param compressed {@code True} if message data is compressed. - */ - public final void compressed(boolean compressed) { - flags = compressed ? (byte)(flags | COMPRESSED_FLAG_MASK) : (byte)(flags & ~COMPRESSED_FLAG_MASK); - } - /** * @param restoreState Restore exchange state flag. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 3456d33cbab19b..abf0d704c73d01 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -2117,8 +2117,7 @@ private void sendLocalPartitions(ClusterNode node) throws IgniteCheckedException if (cctx.kernalContext().clientNode() || (dynamicCacheStartExchange() && exchangeLocE != null)) { msg = new GridDhtPartitionsSingleMessage(exchangeId(), cctx.kernalContext().clientNode(), - cctx.versions().last(), - true); + cctx.versions().last()); } else { msg = cctx.exchange().createPartitionsSingleMessage(exchangeId(), @@ -2165,14 +2164,12 @@ else if (localJoinExchange()) } /** - * @param compress Message compress flag. * @return Message. */ - private GridDhtPartitionsFullMessage createPartitionsMessage(boolean compress) { + private GridDhtPartitionsFullMessage createPartitionsMessage() { GridCacheVersion last = lastVer.get(); GridDhtPartitionsFullMessage m = cctx.exchange().createPartitionsFullMessage( - compress, exchangeId(), last != null ? last : cctx.versions().last(), partHistSuppliers, @@ -2958,7 +2955,7 @@ public void forceClientReconnect(ClusterNode node, GridDhtPartitionsSingleMessag onDone(null, reconnectEx); - GridDhtPartitionsFullMessage fullMsg = createPartitionsMessage(true); + GridDhtPartitionsFullMessage fullMsg = createPartitionsMessage(); fullMsg.setErrorsMap(exchangeGlobalExceptions); @@ -3119,7 +3116,7 @@ public void waitAndReplyToNode(final UUID nodeId, final GridDhtPartitionsSingleM return; } - GridDhtPartitionsFullMessage msg = createPartitionsMessage(true); + GridDhtPartitionsFullMessage msg = createPartitionsMessage(); msg.rebalanced(rebalanced()); @@ -3289,7 +3286,7 @@ private void onAffinityInitialized(IgniteInternalFuture>> assignmentChange = fut.get(); - GridDhtPartitionsFullMessage m = createPartitionsMessage(false); + GridDhtPartitionsFullMessage m = createPartitionsMessage(); CacheAffinityChangeMessage msg = new CacheAffinityChangeMessage(exchId, m, assignmentChange); @@ -3858,7 +3855,7 @@ else if (exchCtx.events().hasServerLeft()) cctx.versions().onExchange(lastVer.get().order()); - GridDhtPartitionsFullMessage msg = createPartitionsMessage(true); + GridDhtPartitionsFullMessage msg = createPartitionsMessage(); if (!cctx.affinity().rebalanceRequired() && !deactivateCluster()) msg.rebalanced(true); @@ -4420,8 +4417,7 @@ private void processSinglePartitionRequest(ClusterNode node, GridDhtPartitionsSi if (dynamicCacheStartExchange() && exchangeLocE != null) { res = new GridDhtPartitionsSingleMessage(msg.restoreExchangeId(), cctx.kernalContext().clientNode(), - cctx.versions().last(), - true); + cctx.versions().last()); res.setError(exchangeLocE); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index ead64e48bcff91..96e329734d259e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -17,11 +17,9 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -29,21 +27,19 @@ import java.util.stream.IntStream; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.Compress; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.communication.ErrorMessage; -import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.util.lang.IgniteThrowableFunction; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -57,33 +53,30 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa private static final byte REBALANCED_FLAG_MASK = 0x01; /** grpId -> FullMap */ + @Order(value = 6, method = "localPartitions") + @Compress @GridToStringInclude private Map parts; - /** - * Serialized local partitions. - *

- * TODO Remove this field after completing task IGNITE-26976. - */ - @Order(value = 6, method = "partitionBytes") - private byte[] partsBytes; - /** */ @Order(value = 7, method = "duplicatedPartitionsData") private Map dupPartsData; /** Partitions update counters. */ @Order(value = 8, method = "partitionCounters") + @Compress @GridToStringInclude private IgniteDhtPartitionCountersMap partCntrs; /** Partitions history suppliers. */ @Order(value = 9, method = "partitionHistorySuppliers") + @Compress @GridToStringInclude private IgniteDhtPartitionHistorySuppliersMap partHistSuppliers; /** Partitions that must be cleared and re-loaded. */ @Order(value = 10, method = "partitionsToReload") + @Compress @GridToStringInclude private IgniteDhtPartitionsToReloadMap partsToReload; @@ -104,6 +97,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa * All logic resides within getter and setter. */ @Order(value = 13, method = "errorMessages") + @Compress @SuppressWarnings("unused") private Map errMsgs; @@ -180,7 +174,6 @@ public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId id, cp.parts = null; cp.dupPartsData = dupPartsData; - cp.partsBytes = partsBytes; cp.partCntrs = partCntrs; cp.partHistSuppliers = partHistSuppliers; cp.partsToReload = partsToReload; @@ -258,17 +251,17 @@ public Map partitions() { } /** - * @return Serialized local partitions. + * @return Local partitions as is for serializer. */ - public byte[] partitionBytes() { - return partsBytes; + public Map localPartitions() { + return parts; } /** - * @param partsBytes Serialized local partitions. + * @param parts Local partitions. */ - public void partitionBytes(byte[] partsBytes) { - this.partsBytes = partsBytes; + public void localPartitions(Map parts) { + this.parts = parts; } /** @@ -294,7 +287,6 @@ public void addFullPartitionsMap(int grpId, GridDhtPartitionFullMap fullMap, @Nu parts.put(grpId, fullMap); if (dupDataCache != null) { - assert compressed(); assert parts.containsKey(dupDataCache); if (dupPartsData == null) @@ -506,43 +498,6 @@ public void lostPartitions(Map lostParts) { this.lostParts = lostParts; } - /** {@inheritDoc} */ - @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { - super.prepareMarshal(ctx); - - boolean marshal = !F.isEmpty(parts) && partsBytes == null; - - if (marshal) { - // Reserve at least 2 threads for system operations. - int parallelismLvl = U.availableThreadCount(ctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2); - - Collection objectsToMarshall = new ArrayList<>(); - - if (!F.isEmpty(parts) && partsBytes == null) - objectsToMarshall.add(parts); - - Collection marshalled = U.doInParallel( - parallelismLvl, - ctx.kernalContext().pools().getSystemExecutorService(), - objectsToMarshall, - new IgniteThrowableFunction() { - @Override public byte[] apply(Object payload) throws IgniteCheckedException { - byte[] marshalled = U.marshal(ctx, payload); - - if (compressed()) - marshalled = U.zip(marshalled, ctx.gridConfig().getNetworkCompressionLevel()); - - return marshalled; - } - }); - - Iterator iter = marshalled.iterator(); - - if (!F.isEmpty(parts) && partsBytes == null) - partsBytes = iter.next(); - } - } - /** * @return Topology version. */ @@ -561,58 +516,29 @@ public void topologyVersion(AffinityTopologyVersion topVer) { @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - ClassLoader clsLdr = U.resolveClassLoader(ldr, ctx.gridConfig()); - - Collection objectsToUnmarshall = new ArrayList<>(); - - // Reserve at least 2 threads for system operations. - int parallelismLvl = U.availableThreadCount(ctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2); - - if (partsBytes != null && parts == null) - objectsToUnmarshall.add(partsBytes); - - Collection unmarshalled = U.doInParallel( - parallelismLvl, - ctx.kernalContext().pools().getSystemExecutorService(), - objectsToUnmarshall, - new IgniteThrowableFunction() { - @Override public Object apply(byte[] binary) throws IgniteCheckedException { - return compressed() - ? U.unmarshalZip(ctx.marshaller(), binary, clsLdr) - : U.unmarshal(ctx, binary, clsLdr); - } - } - ); - - Iterator iter = unmarshalled.iterator(); - - if (partsBytes != null && parts == null) { - parts = (Map)iter.next(); - - if (dupPartsData != null) { - assert parts != null; + if (dupPartsData != null) { + assert parts != null; - for (Map.Entry e : dupPartsData.entrySet()) { - GridDhtPartitionFullMap map1 = parts.get(e.getKey()); - GridDhtPartitionFullMap map2 = parts.get(e.getValue()); + for (Map.Entry e : dupPartsData.entrySet()) { + GridDhtPartitionFullMap map1 = parts.get(e.getKey()); + GridDhtPartitionFullMap map2 = parts.get(e.getValue()); - assert map1 != null : e.getKey(); - assert map2 != null : e.getValue(); - assert map1.size() == map2.size(); + assert map1 != null : e.getKey(); + assert map2 != null : e.getValue(); + assert map1.size() == map2.size(); - for (Map.Entry e0 : map2.entrySet()) { - GridDhtPartitionMap partMap1 = map1.get(e0.getKey()); + for (Map.Entry e0 : map2.entrySet()) { + GridDhtPartitionMap partMap1 = map1.get(e0.getKey()); - assert partMap1 != null && partMap1.map().isEmpty() : partMap1; - assert !partMap1.hasMovingPartitions() : partMap1; + assert partMap1 != null && partMap1.map().isEmpty() : partMap1; + assert !partMap1.hasMovingPartitions() : partMap1; - GridDhtPartitionMap partMap2 = e0.getValue(); + GridDhtPartitionMap partMap2 = e0.getValue(); - assert partMap2 != null; + assert partMap2 != null; - for (Map.Entry stateEntry : partMap2.entrySet()) - partMap1.put(stateEntry.getKey(), stateEntry.getValue()); - } + for (Map.Entry stateEntry : partMap2.entrySet()) + partMap1.put(stateEntry.getKey(), stateEntry.getValue()); } } } @@ -679,7 +605,6 @@ public void merge(GridDhtPartitionsFullMessage other, GridDiscoveryManager disco * Cleans up resources to avoid excessive memory usage. */ public void cleanUp() { - partsBytes = null; partCntrs = null; } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index 9266a486c7c74f..b66d6250f3694b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.Compress; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.communication.ErrorMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; @@ -30,7 +31,6 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; /** @@ -39,34 +39,31 @@ * Sent in response to {@link GridDhtPartitionsSingleRequest} and during processing partitions exchange future. */ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMessage { - /** Local partitions. Serialized as {@link #partsBytes}, may be compressed. */ + /** Local partitions. */ + @Order(value = 6, method = "localPartitions") + @Compress @GridToStringInclude private Map parts; - /** - * Serialized local partitions. Unmarshalled to {@link #parts}. - *

- * TODO Remove this field after completing task IGNITE-26976. - */ - @Order(value = 6, method = "partitionBytes") - private byte[] partsBytes; - /** */ @Order(value = 7, method = "duplicatedPartitionsData") private Map dupPartsData; /** Partitions update counters. */ @Order(value = 8, method = "partitionUpdateCounters") + @Compress @GridToStringInclude private Map partCntrs; /** Partitions sizes. */ @Order(value = 9, method = "partitionSizesMap") + @Compress @GridToStringInclude private Map partsSizes; /** Partitions history reservation counters. */ @Order(value = 10, method = "partitionHistoryCountersMap") + @Compress @GridToStringInclude private Map partHistCntrs; @@ -105,17 +102,13 @@ public GridDhtPartitionsSingleMessage() { * @param exchId Exchange ID. * @param client Client message flag. * @param lastVer Last version. - * @param compress {@code True} if it is possible to use compression for message. */ public GridDhtPartitionsSingleMessage(GridDhtPartitionExchangeId exchId, boolean client, - @Nullable GridCacheVersion lastVer, - boolean compress + @Nullable GridCacheVersion lastVer ) { super(exchId, lastVer); - compressed(compress); - this.client = client; } @@ -187,7 +180,6 @@ public void addLocalPartitionMap(int cacheId, GridDhtPartitionMap locMap, @Nulla parts.put(cacheId, locMap); if (dupDataCache != null) { - assert compressed(); assert F.isEmpty(locMap.map()); assert parts.containsKey(dupDataCache); @@ -332,17 +324,17 @@ public Map partitions() { } /** - * @return Serialized local partitions. + * @return Local partitions as is for serializer. */ - public byte[] partitionBytes() { - return partsBytes; + public Map localPartitions() { + return parts; } /** - * @param partsBytes Serialized local partitions. + * @param parts Local partitions. */ - public void partitionBytes(byte[] partsBytes) { - this.partsBytes = partsBytes; + public void localPartitions(Map parts) { + this.parts = parts; } /** @@ -387,36 +379,10 @@ public void exchangeStartTime(long exchangeStartTime) { this.exchangeStartTime = exchangeStartTime; } - /** {@inheritDoc} */ - @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { - super.prepareMarshal(ctx); - - if (parts != null && partsBytes == null) { - byte[] partsBytes0 = U.marshal(ctx, parts); - - if (compressed()) { - try { - partsBytes0 = U.zip(partsBytes0); - } - catch (IgniteCheckedException e) { - U.error(ctx.logger(getClass()), "Failed to compress partitions data: " + e, e); - } - } - - partsBytes = partsBytes0; - } - } - /** {@inheritDoc} */ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - if (partsBytes != null && parts == null) { - parts = compressed() - ? U.unmarshalZip(ctx.marshaller(), partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())) - : U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); - } - if (dupPartsData != null) { assert parts != null; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java index 9ca26f636ae3a2..dfcd6ac5d467f8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java @@ -1167,7 +1167,7 @@ private void updateLocal(int p, UUID nodeId, GridDhtPartitionState state, long u updateSeq = seq; } - node2part.updateSequence(updateSeq); + node2part.checkAndUpdateSequence(updateSeq); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java index b004053ca6f350..ba62046013f8ee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java @@ -1950,7 +1950,7 @@ else if (isStaleUpdate(cur, parts)) { long updateSeq = this.updateSeq.incrementAndGet(); - node2part.newUpdateSequence(updateSeq); + node2part.updateSequence(updateSeq); boolean changed = false; @@ -2650,7 +2650,7 @@ private long updateLocal(int p, GridDhtPartitionState state, long updateSeq, Aff updateSeq = seq; } - node2part.updateSequence(updateSeq); + node2part.checkAndUpdateSequence(updateSeq); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridPartitionStateMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridPartitionStateMap.java index 855d3c79ef4a50..f15c1516dd538b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridPartitionStateMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridPartitionStateMap.java @@ -24,17 +24,22 @@ import java.util.Iterator; import java.util.NoSuchElementException; import java.util.Set; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; +import org.apache.ignite.plugin.extensions.communication.Message; /** * Grid partition state map. States are encoded using bits. *

* Null values are prohibited. */ -public class GridPartitionStateMap extends AbstractMap implements Serializable { +public class GridPartitionStateMap extends AbstractMap implements Serializable, Message { /** Empty map. */ public static final GridPartitionStateMap EMPTY = new GridPartitionStateMap(0); + /** Type code. */ + public static final short TYPE_CODE = 518; + /** */ private static final long serialVersionUID = 0L; @@ -55,9 +60,11 @@ public class GridPartitionStateMap extends AbstractMap Type of the message. * @return Message. */ - public T readMessage(); + public default T readMessage() { + return readMessage(false); + } + + /** + * Reads nested message. + * + * @param compress Whether message should be decompressed. + * @param Type of the message. + * @return Message. + */ + public T readMessage(boolean compress); /** * Reads {@link CacheObject}. @@ -259,9 +270,25 @@ public default void setBuffer(ByteBuffer buf) { * @param Type of the read map. * @return Map. */ - // TODO: IGNITE-26329 — switch to the new readMap method without the flag parameter + // TODO: IGNITE-26329 — switch to the new readMap method without the linked flag parameter + public default > M readMap(MessageCollectionItemType keyType, + MessageCollectionItemType valType, boolean linked) { + return readMap(keyType, valType, linked, false); + } + + /** + * Reads map. + * + * @param keyType Map key type. + * @param valType Map value type. + * @param linked Whether {@link LinkedHashMap} should be created. + * @param compress Whether map should be compressed. + * @param Type of the read map. + * @return Map. + */ + // TODO: IGNITE-26329 — switch to the new readMap method without the linked flag parameter public > M readMap(MessageCollectionItemType keyType, - MessageCollectionItemType valType, boolean linked); + MessageCollectionItemType valType, boolean linked, boolean compress); /** * Tells whether last invocation of any of {@code readXXX(...)} @@ -284,6 +311,11 @@ public default void setBuffer(ByteBuffer buf) { */ public void incrementState(); + /** + * Deccrements read state. + */ + public void decrementState(); + /** * Callback called before inner message is read. */ diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java index 43fda9736d9e1f..e073ca8313423d 100644 --- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java @@ -247,7 +247,18 @@ public default void setBuffer(ByteBuffer buf) { * @param val Message. * @return Whether value was fully written. */ - public boolean writeMessage(Message val); + public default boolean writeMessage(Message val) { + return writeMessage(val, false); + } + + /** + * Writes nested message. + * + * @param val Message. + * @param compress Whether message should be compressed. + * @return Whether value was fully written. + */ + public boolean writeMessage(Message val, boolean compress); /** * Writes {@link CacheObject}. @@ -313,8 +324,24 @@ public default void setBuffer(ByteBuffer buf) { * @param Initial value types of the map to write. * @return Whether value was fully written. */ + public default boolean writeMap(Map map, MessageCollectionItemType keyType, + MessageCollectionItemType valType) { + return writeMap(map, keyType, valType, false); + } + + /** + * Writes map. + * + * @param map Map. + * @param keyType Map key type. + * @param valType Map value type. + * @param compress Whether map should be compressed. + * @param Initial key types of the map to write. + * @param Initial value types of the map to write. + * @return Whether value was fully written. + */ public boolean writeMap(Map map, MessageCollectionItemType keyType, - MessageCollectionItemType valType); + MessageCollectionItemType valType, boolean compress); /** * @return Whether header of current message is already written. @@ -338,6 +365,11 @@ public boolean writeMap(Map map, MessageCollectionItemType keyType, */ public void incrementState(); + /** + * Decrements state. + */ + public void decrementState(); + /** * Callback called before inner message is written. */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java index 4875ec0a465e83..317ff20a961eca 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java @@ -286,7 +286,7 @@ private boolean writeField(Class type) { } /** {@inheritDoc} */ - @Override public boolean writeMessage(Message val) { + @Override public boolean writeMessage(Message val, boolean compress) { return writeField(Message.class); } @@ -307,7 +307,7 @@ private boolean writeField(Class type) { /** {@inheritDoc} */ @Override public boolean writeMap(Map map, MessageCollectionItemType keyType, - MessageCollectionItemType valType) { + MessageCollectionItemType valType, boolean compress) { return writeField(Map.class); } @@ -329,6 +329,11 @@ private boolean writeField(Class type) { ++state; } + /** {@inheritDoc} */ + @Override public void decrementState() { + --state; + } + /** {@inheritDoc} */ @Override public void beforeInnerMessageWrite() {} @@ -518,7 +523,7 @@ private void readField(Class type) { } /** {@inheritDoc} */ - @Override public T readMessage() { + @Override public T readMessage(boolean compress) { readField(Message.class); return null; @@ -568,7 +573,7 @@ private void readField(Class type) { /** {@inheritDoc} */ @Override public > M readMap(MessageCollectionItemType keyType, - MessageCollectionItemType valType, boolean linked) { + MessageCollectionItemType valType, boolean linked, boolean compress) { readField(Map.class); return null; @@ -594,6 +599,11 @@ private void readField(Class type) { ++state; } + /** {@inheritDoc} */ + @Override public void decrementState() { + --state; + } + /** {@inheritDoc} */ @Override public void beforeInnerMessageRead() {} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoCommunicationMessageSerializationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoCommunicationMessageSerializationTest.java index c5d6afb5bc63c8..782e156f35ce4b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoCommunicationMessageSerializationTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoCommunicationMessageSerializationTest.java @@ -38,6 +38,9 @@ public class IgniteIoCommunicationMessageSerializationTest extends AbstractMessa if (msg instanceof NodeIdMessage) FieldUtils.writeField(msg, "nodeId", UUID.randomUUID(), true); + if (msg instanceof CompressedMessage) + FieldUtils.writeField(msg, "dataSize", 0, true); + return msg; }