From 4c79a2c348356ee8fe4cccb241f834cb9b57be4f Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Thu, 4 Dec 2025 14:10:04 +0500 Subject: [PATCH 1/6] WIP-first --- .../org/apache/ignite/internal/Compress.java | 30 +++ .../internal/MessageSerializerGenerator.java | 44 +++- .../internal/direct/DirectMessageReader.java | 8 +- .../internal/direct/DirectMessageWriter.java | 8 +- .../direct/stream/DirectByteBufferStream.java | 191 +++++++++++++++++- .../communication/GridIoMessageFactory.java | 9 + .../GridCachePartitionExchangeManager.java | 23 +-- .../preloader/GridDhtPartitionFullMap.java | 129 +++++++++++- .../dht/preloader/GridDhtPartitionMap.java | 48 ++++- .../GridDhtPartitionsAbstractMessage.java | 17 -- .../GridDhtPartitionsExchangeFuture.java | 18 +- .../GridDhtPartitionsFullMessage.java | 133 +++--------- .../GridDhtPartitionsSingleMessage.java | 62 ++---- .../topology/GridClientPartitionTopology.java | 2 +- .../GridDhtPartitionTopologyImpl.java | 4 +- .../internal/util/GridPartitionStateMap.java | 37 +++- .../communication/MessageReader.java | 33 ++- .../communication/MessageWriter.java | 31 ++- .../AbstractMessageSerializationTest.java | 8 +- 19 files changed, 587 insertions(+), 248 deletions(-) create mode 100644 modules/codegen2/src/main/java/org/apache/ignite/internal/Compress.java diff --git a/modules/codegen2/src/main/java/org/apache/ignite/internal/Compress.java b/modules/codegen2/src/main/java/org/apache/ignite/internal/Compress.java new file mode 100644 index 0000000000000..65dea1348f846 --- /dev/null +++ b/modules/codegen2/src/main/java/org/apache/ignite/internal/Compress.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** This annotation indicates that this field will be compressed during serialization. */ +@Retention(RetentionPolicy.CLASS) +@Target(ElementType.FIELD) +public @interface Compress { + // No-op. +} diff --git a/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java b/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java index 03420b425ccf7..c0f84275f83da 100644 --- a/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java +++ b/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java @@ -318,6 +318,8 @@ private void returnFalseIfWriteFailed(VariableElement field) throws Exception { String getExpr = (F.isEmpty(methodName) ? field.getSimpleName().toString() : methodName) + "()"; + boolean compress = field.getAnnotation(Compress.class) != null; + TypeMirror type = field.asType(); if (type.getKind().isPrimitive()) { @@ -371,9 +373,15 @@ else if (assignableFrom(erasedType(type), type(Map.class.getName()))) { imports.add("org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType"); - returnFalseIfWriteFailed(write, "writer.writeMap", getExpr, - "MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(0)), - "MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(1))); + List args = new ArrayList<>(); + args.add(getExpr); + args.add("MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(0))); + args.add("MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(1))); + + if (compress) + args.add("true"); + + returnFalseIfWriteFailed(write, "writer.writeMap", args.toArray(String[]::new)); } else if (assignableFrom(type, type("org.apache.ignite.internal.processors.cache.KeyCacheObject"))) @@ -385,8 +393,12 @@ else if (assignableFrom(type, type("org.apache.ignite.internal.processors.cache. else if (assignableFrom(type, type("org.apache.ignite.internal.util.GridLongList"))) returnFalseIfWriteFailed(write, "writer.writeGridLongList", getExpr); - else if (assignableFrom(type, type(MESSAGE_INTERFACE))) - returnFalseIfWriteFailed(write, "writer.writeMessage", getExpr); + else if (assignableFrom(type, type(MESSAGE_INTERFACE))) { + if (compress) + returnFalseIfWriteFailed(write, "writer.writeMessage", getExpr, "true"); + else + returnFalseIfWriteFailed(write, "writer.writeMessage", getExpr); + } else if (assignableFrom(erasedType(type), type(Collection.class.getName()))) { List typeArgs = ((DeclaredType)type).getTypeArguments(); @@ -508,6 +520,8 @@ private void returnFalseIfReadFailed(VariableElement field) throws Exception { String name = F.isEmpty(methodName) ? field.getSimpleName().toString() : methodName; + boolean compress = field.getAnnotation(Compress.class) != null; + if (type.getKind().isPrimitive()) { String typeName = capitalizeOnlyFirst(type.getKind().name()); @@ -580,9 +594,15 @@ else if (assignableFrom(erasedType(type), type(Map.class.getName()))) { assert typeArgs.size() == 2; - returnFalseIfReadFailed(name, "reader.readMap", - "MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(0)), - "MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(1)), "false"); + List args = new ArrayList<>(); + args.add("MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(0))); + args.add("MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(1))); + args.add("false"); + + if (compress) + args.add("true"); + + returnFalseIfReadFailed(name, "reader.readMap", args.toArray(String[]::new)); } else if (assignableFrom(type, type("org.apache.ignite.internal.processors.cache.KeyCacheObject"))) @@ -594,8 +614,12 @@ else if (assignableFrom(type, type("org.apache.ignite.internal.processors.cache. else if (assignableFrom(type, type("org.apache.ignite.internal.util.GridLongList"))) returnFalseIfReadFailed(name, "reader.readGridLongList"); - else if (assignableFrom(type, type(MESSAGE_INTERFACE))) - returnFalseIfReadFailed(name, "reader.readMessage"); + else if (assignableFrom(type, type(MESSAGE_INTERFACE))) { + if (compress) + returnFalseIfReadFailed(name, "reader.readMessage", "true"); + else + returnFalseIfReadFailed(name, "reader.readMessage"); + } else if (assignableFrom(erasedType(type), type(Collection.class.getName()))) { List typeArgs = ((DeclaredType)type).getTypeArguments(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java index 24d97da7b2de5..db8ab965da624 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java @@ -315,10 +315,10 @@ public ByteBuffer getBuffer() { } /** {@inheritDoc} */ - @Nullable @Override public T readMessage() { + @Nullable @Override public T readMessage(boolean compress) { DirectByteBufferStream stream = state.item().stream; - T msg = stream.readMessage(this); + T msg = stream.readMessage(this, compress); lastRead = stream.lastFinished(); @@ -393,10 +393,10 @@ public ByteBuffer getBuffer() { /** {@inheritDoc} */ @Override public > M readMap(MessageCollectionItemType keyType, - MessageCollectionItemType valType, boolean linked) { + MessageCollectionItemType valType, boolean linked, boolean compress) { DirectByteBufferStream stream = state.item().stream; - M map = stream.readMap(keyType, valType, linked, this); + M map = stream.readMap(keyType, valType, linked, this, compress); lastRead = stream.lastFinished(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java index 1da76aa14cfe7..a6cc0233b37d1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java @@ -293,10 +293,10 @@ public ByteBuffer getBuffer() { } /** {@inheritDoc} */ - @Override public boolean writeMessage(@Nullable Message msg) { + @Override public boolean writeMessage(@Nullable Message msg, boolean compress) { DirectByteBufferStream stream = state.item().stream; - stream.writeMessage(msg, this); + stream.writeMessage(msg, this, compress); return stream.lastFinished(); } @@ -353,10 +353,10 @@ public ByteBuffer getBuffer() { /** {@inheritDoc} */ @Override public boolean writeMap(Map map, MessageCollectionItemType keyType, - MessageCollectionItemType valType) { + MessageCollectionItemType valType, boolean compress) { DirectByteBufferStream stream = state.item().stream; - stream.writeMap(map, keyType, valType, this); + stream.writeMap(map, keyType, valType, this, compress); return stream.lastFinished(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java index 5c526bf6fd3b4..e6280f7727117 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java @@ -17,6 +17,9 @@ package org.apache.ignite.internal.direct.stream; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.lang.reflect.Array; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -29,6 +32,10 @@ import java.util.RandomAccess; import java.util.Set; import java.util.UUID; +import java.util.zip.Deflater; +import java.util.zip.DeflaterOutputStream; +import java.util.zip.Inflater; +import java.util.zip.InflaterInputStream; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -339,6 +346,9 @@ public class DirectByteBufferStream { /** */ private byte cacheObjType; + /** */ + private boolean uncompressFinished; + /** * Constructror for stream used for writing messages. * @@ -879,14 +889,26 @@ public void writeGridLongList(@Nullable GridLongList val) { /** * @param msg Message. * @param writer Writer. + * @param compress Whether to compress message. */ - public void writeMessage(Message msg, MessageWriter writer) { + public void writeMessage(Message msg, MessageWriter writer, boolean compress) { + if (compress && buf.position() != 0) { + lastFinished = false; + + return; + } + if (msg != null) { if (buf.hasRemaining()) { try { writer.beforeInnerMessageWrite(); - lastFinished = msgFactory.serializer(msg.directType()).writeTo(msg, writer); + boolean finished = msgFactory.serializer(msg.directType()).writeTo(msg, writer); + + if (compress) + compressData(); + + lastFinished = finished; } finally { writer.afterInnerMessageWrite(lastFinished); @@ -895,8 +917,12 @@ public void writeMessage(Message msg, MessageWriter writer) { else lastFinished = false; } - else - writeShort(Short.MIN_VALUE); + else { + if (compress) + writeInt(-1); + else + writeShort(Short.MIN_VALUE); + } } /** @@ -1012,8 +1038,21 @@ private void writeRandomAccessList(List list, MessageCollectionItemType i * @param keyType Key type. * @param valType Value type. * @param writer Writer. + * @param compress Whether to compress map. */ - public void writeMap(Map map, MessageCollectionItemType keyType, MessageCollectionItemType valType, MessageWriter writer) { + public void writeMap( + Map map, + MessageCollectionItemType keyType, + MessageCollectionItemType valType, + MessageWriter writer, + boolean compress + ) { + if (compress && buf.position() != 0) { + lastFinished = false; + + return; + } + if (map != null) { if (mapIt == null) { writeInt(map.size()); @@ -1035,21 +1074,32 @@ public void writeMap(Map map, MessageCollectionItemType keyType, Me if (!keyDone) { write(keyType, e.getKey(), writer); - if (!lastFinished) + if (!lastFinished) { + if (compress) + compressData(); + return; + } keyDone = true; } write(valType, e.getValue(), writer); - if (!lastFinished) + if (!lastFinished) { + if (compress) + compressData(); + return; + } mapCur = NULL; keyDone = false; } + if (compress) + compressData(); + mapIt = null; } else @@ -1519,7 +1569,26 @@ public GridLongList readGridLongList() { * @param reader Reader. * @return Message. */ - public T readMessage(MessageReader reader) { + public T readMessage(MessageReader reader, boolean compress) { + if (compress && !uncompressFinished) { + int startPos = buf.position(); + + if (readInt() == -1) { + lastFinished = true; + + return null; + } + + buf.position(startPos); + + boolean uncompressed = uncompressData(); + + if (!lastFinished || !uncompressed) + return null; + + uncompressFinished = true; + } + if (!msgTypeDone) { if (buf.remaining() < Message.DIRECT_TYPE_SIZE) { lastFinished = false; @@ -1551,6 +1620,10 @@ public T readMessage(MessageReader reader) { Message msg0 = msg; msgTypeDone = false; + + if (compress) + uncompressFinished = false; + msg = null; return (T)msg0; @@ -1674,10 +1747,30 @@ private > C readCollection(MessageCollectionItemType ite * @param valType Value type. * @param linked Whether linked map should be created. * @param reader Reader. + * @param compress Whether the map is compressed. * @return Map. */ public > M readMap(MessageCollectionItemType keyType, MessageCollectionItemType valType, - boolean linked, MessageReader reader) { + boolean linked, MessageReader reader, boolean compress) { + if (compress && !uncompressFinished) { + int startPos = buf.position(); + + if (readInt() == -1) { + lastFinished = true; + + return null; + } + + buf.position(startPos); + + boolean uncompressed = uncompressData(); + + if (!lastFinished || !uncompressed) + return null; + + uncompressFinished = true; + } + if (readSize == -1) { int size = readInt(); @@ -1723,6 +1816,9 @@ private > C readCollection(MessageCollectionItemType ite map = null; + if (compress) + uncompressFinished = false; + return map0; } @@ -2110,7 +2206,7 @@ protected void write(MessageCollectionItemType type, Object val, MessageWriter w if (val != null) writer.beforeInnerMessageWrite(); - writeMessage((Message)val, writer); + writeMessage((Message)val, writer, false); } finally { if (val != null) @@ -2204,7 +2300,7 @@ protected Object read(MessageCollectionItemType type, MessageReader reader) { return readGridLongList(); case MSG: - return readMessage(reader); + return readMessage(reader, false); default: throw new IllegalArgumentException("Unknown type: " + type); @@ -2260,6 +2356,79 @@ private void readUuidRaw() { return S.toString(DirectByteBufferStream.class, this); } + /** */ + private void compressData() { + if (buf.position() == 0) + return; + + byte[] rawData = new byte[buf.position()]; + + buf.flip(); + buf.get(rawData); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(rawData.length); + Deflater deflater = new Deflater(Deflater.BEST_SPEED, true); + + try (DeflaterOutputStream dos = new DeflaterOutputStream(baos, deflater)) { + dos.write(rawData); + dos.finish(); + } + catch (IOException ex) { + throw new IgniteException(ex); + } + finally { + deflater.end(); + } + + buf.clear(); + + writeByteArray(baos.toByteArray()); + } + + /** */ + private boolean uncompressData() { + byte[] compressedData = readByteArray(); + + if (!lastFinished || compressedData == null) + return false; + + byte[] uncompressedData; + + Inflater inflater = new Inflater(true); + + try (InflaterInputStream iis = new InflaterInputStream(new ByteArrayInputStream(compressedData), inflater)) { + uncompressedData = iis.readAllBytes(); + } + catch (IOException ex) { + throw new IgniteException(ex); + } + finally { + inflater.end(); + } + + byte[] tmpBuf = null; + + if (buf.remaining() > 0) { + tmpBuf = new byte[buf.remaining()]; + buf.get(tmpBuf); + } + + int tmpBufLength = tmpBuf != null ? tmpBuf.length : 0; + + if (uncompressedData.length + tmpBufLength > buf.capacity()) + buf = ByteBuffer.allocateDirect(uncompressedData.length + tmpBufLength); + + buf.clear(); + buf.put(uncompressedData); + + if (tmpBuf != null) + buf.put(tmpBuf); + + buf.flip(); + + return true; + } + /** * Array creator. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 6dc2d02dd0af5..385522ba6c3ed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -76,6 +76,8 @@ import org.apache.ignite.internal.codegen.GridDhtLockResponseSerializer; import org.apache.ignite.internal.codegen.GridDhtPartitionDemandMessageSerializer; import org.apache.ignite.internal.codegen.GridDhtPartitionExchangeIdSerializer; +import org.apache.ignite.internal.codegen.GridDhtPartitionFullMapSerializer; +import org.apache.ignite.internal.codegen.GridDhtPartitionMapSerializer; import org.apache.ignite.internal.codegen.GridDhtPartitionSupplyMessageSerializer; import org.apache.ignite.internal.codegen.GridDhtPartitionsFullMessageSerializer; import org.apache.ignite.internal.codegen.GridDhtPartitionsSingleMessageSerializer; @@ -113,6 +115,7 @@ import org.apache.ignite.internal.codegen.GridNearTxPrepareRequestSerializer; import org.apache.ignite.internal.codegen.GridNearTxPrepareResponseSerializer; import org.apache.ignite.internal.codegen.GridNearUnlockRequestSerializer; +import org.apache.ignite.internal.codegen.GridPartitionStateMapSerializer; import org.apache.ignite.internal.codegen.GridQueryCancelRequestSerializer; import org.apache.ignite.internal.codegen.GridQueryFailResponseSerializer; import org.apache.ignite.internal.codegen.GridQueryKillRequestSerializer; @@ -237,6 +240,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; @@ -309,6 +314,7 @@ import org.apache.ignite.internal.processors.service.ServiceSingleNodeDeploymentResult; import org.apache.ignite.internal.processors.service.ServiceSingleNodeDeploymentResultBatch; import org.apache.ignite.internal.util.GridByteArrayList; +import org.apache.ignite.internal.util.GridPartitionStateMap; import org.apache.ignite.internal.util.UUIDCollectionMessage; import org.apache.ignite.internal.util.distributed.SingleNodeMessage; import org.apache.ignite.plugin.extensions.communication.MessageFactory; @@ -518,6 +524,9 @@ public class GridIoMessageFactory implements MessageFactoryProvider { new IgniteDhtPartitionsToReloadMapSerializer()); factory.register(IntLongMap.TYPE_CODE, IntLongMap::new, new IntLongMapSerializer()); factory.register(IndexKeyTypeMessage.TYPE_CODE, IndexKeyTypeMessage::new, new IndexKeyTypeMessageSerializer()); + factory.register(GridPartitionStateMap.TYPE_CODE, GridPartitionStateMap::new, new GridPartitionStateMapSerializer()); + factory.register(GridDhtPartitionMap.TYPE_CODE, GridDhtPartitionMap::new, new GridDhtPartitionMapSerializer()); + factory.register(GridDhtPartitionFullMap.TYPE_CODE, GridDhtPartitionFullMap::new, new GridDhtPartitionFullMapSerializer()); // [-3..119] [124..129] [-23..-28] [-36..-55] [183..188] - this // [120..123] - DR diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index bbc2e78bbb4c0..c0a1cefecced0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1288,7 +1288,7 @@ private void sendAllPartitions( ) { long time = System.currentTimeMillis(); - GridDhtPartitionsFullMessage m = createPartitionsFullMessage(true, null, null, null, null, grps); + GridDhtPartitionsFullMessage m = createPartitionsFullMessage(null, null, null, null, grps); m.topologyVersion(msgTopVer); @@ -1342,8 +1342,6 @@ private void sendAllPartitions( /** * Creates partitions full message for all cache groups. * - * @param compress {@code True} if possible to compress message (properly work only if prepareMarshall/ - * finishUnmarshall methods are called). * @param exchId Non-null exchange ID if message is created for exchange. * @param lastVer Last version. * @param partHistSuppliers Partition history suppliers map. @@ -1351,7 +1349,6 @@ private void sendAllPartitions( * @return Message. */ public GridDhtPartitionsFullMessage createPartitionsFullMessage( - boolean compress, @Nullable final GridDhtPartitionExchangeId exchId, @Nullable GridCacheVersion lastVer, @Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers, @@ -1359,14 +1356,12 @@ public GridDhtPartitionsFullMessage createPartitionsFullMessage( ) { Collection grps = cctx.cache().cacheGroups(); - return createPartitionsFullMessage(compress, exchId, lastVer, partHistSuppliers, partsToReload, grps); + return createPartitionsFullMessage(exchId, lastVer, partHistSuppliers, partsToReload, grps); } /** * Creates partitions full message for selected cache groups. * - * @param compress {@code True} if possible to compress message (properly work only if prepareMarshall/ - * finishUnmarshall methods are called). * @param exchId Non-null exchange ID if message is created for exchange. * @param lastVer Last version. * @param partHistSuppliers Partition history suppliers map. @@ -1375,7 +1370,6 @@ public GridDhtPartitionsFullMessage createPartitionsFullMessage( * @return Message. */ public GridDhtPartitionsFullMessage createPartitionsFullMessage( - boolean compress, @Nullable final GridDhtPartitionExchangeId exchId, @Nullable GridCacheVersion lastVer, @Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers, @@ -1387,8 +1381,6 @@ public GridDhtPartitionsFullMessage createPartitionsFullMessage( final GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(exchId, lastVer, ver, partHistSuppliers, partsToReload); - m.compressed(compress); - final Map> dupData = new HashMap<>(); Map> partsSizes = new HashMap<>(); @@ -1406,7 +1398,7 @@ public GridDhtPartitionsFullMessage createPartitionsFullMessage( GridDhtPartitionFullMap locMap = grp.topology().partitionMap(true); if (locMap != null) - addFullPartitionsMap(m, dupData, compress, grp.groupId(), locMap, affCache.similarAffinityKey()); + addFullPartitionsMap(m, dupData, grp.groupId(), locMap, affCache.similarAffinityKey()); Map partSizesMap = grp.topology().globalPartSizes(); @@ -1426,7 +1418,7 @@ public GridDhtPartitionsFullMessage createPartitionsFullMessage( GridDhtPartitionFullMap map = top.partitionMap(true); if (map != null) - addFullPartitionsMap(m, dupData, compress, top.groupId(), map, top.similarAffinityKey()); + addFullPartitionsMap(m, dupData, top.groupId(), map, top.similarAffinityKey()); if (exchId != null) { m.addPartitionUpdateCounters(top.groupId(), top.fullUpdateCounters()); @@ -1449,14 +1441,12 @@ public GridDhtPartitionsFullMessage createPartitionsFullMessage( /** * @param m Message. * @param dupData Duplicated data map. - * @param compress {@code True} if need check for duplicated partition state data. * @param grpId Cache group ID. * @param map Map to add. * @param affKey Cache affinity key. */ private void addFullPartitionsMap(GridDhtPartitionsFullMessage m, Map> dupData, - boolean compress, Integer grpId, GridDhtPartitionFullMap map, Object affKey) { @@ -1464,7 +1454,7 @@ private void addFullPartitionsMap(GridDhtPartitionsFullMessage m, Integer dupDataCache = null; - if (compress && affKey != null && !m.containsGroup(grpId)) { + if (affKey != null && !m.containsGroup(grpId)) { T2 state0 = dupData.get(affKey); if (state0 != null && state0.get2().partitionStateEquals(map)) { @@ -1556,8 +1546,7 @@ public GridDhtPartitionsSingleMessage createPartitionsSingleMessage( ) { GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(exchangeId, clientOnlyExchange, - cctx.versions().last(), - true); + cctx.versions().last()); Map> dupData = new HashMap<>(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java index 068d5a3f5bbf0..ae5579e8131b7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java @@ -21,31 +21,46 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; +import java.util.AbstractMap; +import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.Set; import java.util.UUID; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.NotNull; /** * Full partition map from all nodes. */ public class GridDhtPartitionFullMap - extends HashMap implements Comparable, Externalizable { + extends AbstractMap implements Comparable, Externalizable, Message { + /** Type code. */ + public static final short TYPE_CODE = 519; + /** */ private static final long serialVersionUID = 0L; /** Node ID. */ + @Order(0) private UUID nodeId; /** Node order. */ + @Order(1) private long nodeOrder; /** Update sequence number. */ + @Order(value = 2, method = "updateSequence") private long updateSeq; + /** Partition map. */ + @Order(3) + private Map map; + /** * @param nodeId Node ID. * @param nodeOrder Node order. @@ -59,6 +74,7 @@ public GridDhtPartitionFullMap(UUID nodeId, long nodeOrder, long updateSeq) { this.nodeId = nodeId; this.nodeOrder = nodeOrder; this.updateSeq = updateSeq; + map = new HashMap<>(); } /** @@ -77,6 +93,7 @@ public GridDhtPartitionFullMap(UUID nodeId, long nodeOrder, long updateSeq, Map< this.nodeId = nodeId; this.nodeOrder = nodeOrder; this.updateSeq = updateSeq; + map = new HashMap<>(); for (Map.Entry e : m.entrySet()) { GridDhtPartitionMap part = e.getValue(); @@ -96,7 +113,7 @@ public GridDhtPartitionFullMap(UUID nodeId, long nodeOrder, long updateSeq, Map< * @param updateSeq Update sequence. */ public GridDhtPartitionFullMap(GridDhtPartitionFullMap m, long updateSeq) { - super(m); + map = new HashMap<>(m); nodeId = m.nodeId; nodeOrder = m.nodeOrder; @@ -107,7 +124,7 @@ public GridDhtPartitionFullMap(GridDhtPartitionFullMap m, long updateSeq) { * Empty constructor required for {@link Externalizable}. */ public GridDhtPartitionFullMap() { - // No-op. + map = new HashMap<>(); } /** @@ -124,6 +141,13 @@ public UUID nodeId() { return nodeId; } + /** + * @param nodeId Node ID. + */ + public void nodeId(UUID nodeId) { + this.nodeId = nodeId; + } + /** * @return Node order. */ @@ -131,6 +155,13 @@ public long nodeOrder() { return nodeOrder; } + /** + * @param nodeOrder Node order. + */ + public void nodeOrder(long nodeOrder) { + this.nodeOrder = nodeOrder; + } + /** * @return Update sequence. */ @@ -138,6 +169,20 @@ public long updateSequence() { return updateSeq; } + /** + * @return Partition map. + */ + public Map map() { + return map; + } + + /** + * @param map Partition map. + */ + public void map(Map map) { + this.map = map; + } + /** * @param fullMap Map. * @return {@code True} if this map and given map contain the same data. @@ -159,7 +204,7 @@ public boolean partitionStateEquals(GridDhtPartitionFullMap fullMap) { /** * @param updateSeq New update sequence value. */ - public void newUpdateSequence(long updateSeq) { + public void updateSequence(long updateSeq) { this.updateSeq = updateSeq; } @@ -167,7 +212,7 @@ public void newUpdateSequence(long updateSeq) { * @param updateSeq New update sequence value. * @return Old update sequence value. */ - public long updateSequence(long updateSeq) { + public long checkAndUpdateSequence(long updateSeq) { long old = this.updateSeq; assert updateSeq >= old : "Invalid update sequence [cur=" + old + ", new=" + updateSeq + @@ -185,7 +230,7 @@ public long updateSequence(long updateSeq) { out.writeLong(nodeOrder); out.writeLong(updateSeq); - U.writeMap(out, this); + U.writeMap(out, map); } /** {@inheritDoc} */ @@ -195,7 +240,12 @@ public long updateSequence(long updateSeq) { nodeOrder = in.readLong(); updateSeq = in.readLong(); - putAll(U.readMap(in)); + map = new HashMap<>(); + + Map map0 = U.readMap(in); + + if (map0 != null) + map.putAll(map0); } /** {@inheritDoc} */ @@ -260,4 +310,69 @@ public String toFullString() { return Long.compare(updateSeq, o.updateSeq); } + + /** {@inheritDoc} */ + @Override public GridDhtPartitionMap put(UUID key, GridDhtPartitionMap val) { + return map.put(key, val); + } + + /** {@inheritDoc} */ + @Override public void putAll(Map m) { + map.putAll(m); + } + + /** {@inheritDoc} */ + @Override public GridDhtPartitionMap get(Object key) { + return map.get(key); + } + + /** {@inheritDoc} */ + @Override public GridDhtPartitionMap remove(Object key) { + return map.remove(key); + } + + /** {@inheritDoc} */ + @Override public int size() { + return map.size(); + } + + /** {@inheritDoc} */ + @Override public boolean isEmpty() { + return map.isEmpty(); + } + + /** {@inheritDoc} */ + @Override public boolean containsValue(Object val) { + return map.containsValue(val); + } + + /** {@inheritDoc} */ + @Override public boolean containsKey(Object key) { + return map.containsKey(key); + } + + /** {@inheritDoc} */ + @Override public void clear() { + map.clear(); + } + + /** {@inheritDoc} */ + @Override public @NotNull Set keySet() { + return map.keySet(); + } + + /** {@inheritDoc} */ + @Override public @NotNull Collection values() { + return map.values(); + } + + /** {@inheritDoc} */ + @Override public @NotNull Set> entrySet() { + return map.entrySet(); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return TYPE_CODE; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java index 3cbf75767e7fa..4186fdbc92fe8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java @@ -26,31 +26,40 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; import org.apache.ignite.internal.util.GridPartitionStateMap; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.Message; import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING; /** * Partition map from single node. */ -public class GridDhtPartitionMap implements Comparable, Externalizable { +public class GridDhtPartitionMap implements Comparable, Externalizable, Message { + /** Type code. */ + public static final short TYPE_CODE = 518; + /** */ private static final long serialVersionUID = 0L; /** Node ID. */ + @Order(0) protected UUID nodeId; /** Update sequence number. */ + @Order(value = 1, method = "updateSequence") protected long updateSeq; /** Topology version. */ + @Order(value = 2, method = "topologyVersion") protected AffinityTopologyVersion top; /** */ + @Order(3) protected GridPartitionStateMap map; /** */ @@ -197,6 +206,17 @@ public GridPartitionStateMap map() { return map; } + /** + * @param map Map. + */ + public void map(GridPartitionStateMap map) { + this.map = new GridPartitionStateMap(); + + if (map != null) + for (Map.Entry entry : map.entrySet()) + put(entry.getKey(), entry.getValue()); + } + /** * @return Node ID. */ @@ -204,6 +224,13 @@ public UUID nodeId() { return nodeId; } + /** + * @param nodeId Node ID. + */ + public void nodeId(UUID nodeId) { + this.nodeId = nodeId; + } + /** * @return Update sequence. */ @@ -211,6 +238,13 @@ public long updateSequence() { return updateSeq; } + /** + * @param updateSeq Update sequence. + */ + public void updateSequence(long updateSeq) { + this.updateSeq = updateSeq; + } + /** * @param updateSeq New update sequence value. * @param topVer Current topology version. @@ -239,6 +273,13 @@ public AffinityTopologyVersion topologyVersion() { return top; } + /** + * @param top Topology version. + */ + public void topologyVersion(AffinityTopologyVersion top) { + this.top = top; + } + /** {@inheritDoc} */ @Override public int compareTo(GridDhtPartitionMap o) { assert nodeId.equals(o.nodeId); @@ -337,4 +378,9 @@ public String toFullString() { @Override public String toString() { return S.toString(GridDhtPartitionMap.class, this, "top", top, "updateSeq", updateSeq, "size", size()); } + + /** {@inheritDoc} */ + @Override public short directType() { + return TYPE_CODE; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java index df9672c02e580..98390f503ce1e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java @@ -28,9 +28,6 @@ * Request for single partition info. */ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage { - /** */ - private static final byte COMPRESSED_FLAG_MASK = 0x01; - /** */ private static final byte RESTORE_STATE_FLAG_MASK = 0x02; @@ -128,20 +125,6 @@ public void flags(byte flags) { this.flags = flags; } - /** - * @return {@code True} if message data is compressed. - */ - public final boolean compressed() { - return (flags & COMPRESSED_FLAG_MASK) != 0; - } - - /** - * @param compressed {@code True} if message data is compressed. - */ - public final void compressed(boolean compressed) { - flags = compressed ? (byte)(flags | COMPRESSED_FLAG_MASK) : (byte)(flags & ~COMPRESSED_FLAG_MASK); - } - /** * @param restoreState Restore exchange state flag. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 3456d33cbab19..abf0d704c73d0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -2117,8 +2117,7 @@ private void sendLocalPartitions(ClusterNode node) throws IgniteCheckedException if (cctx.kernalContext().clientNode() || (dynamicCacheStartExchange() && exchangeLocE != null)) { msg = new GridDhtPartitionsSingleMessage(exchangeId(), cctx.kernalContext().clientNode(), - cctx.versions().last(), - true); + cctx.versions().last()); } else { msg = cctx.exchange().createPartitionsSingleMessage(exchangeId(), @@ -2165,14 +2164,12 @@ else if (localJoinExchange()) } /** - * @param compress Message compress flag. * @return Message. */ - private GridDhtPartitionsFullMessage createPartitionsMessage(boolean compress) { + private GridDhtPartitionsFullMessage createPartitionsMessage() { GridCacheVersion last = lastVer.get(); GridDhtPartitionsFullMessage m = cctx.exchange().createPartitionsFullMessage( - compress, exchangeId(), last != null ? last : cctx.versions().last(), partHistSuppliers, @@ -2958,7 +2955,7 @@ public void forceClientReconnect(ClusterNode node, GridDhtPartitionsSingleMessag onDone(null, reconnectEx); - GridDhtPartitionsFullMessage fullMsg = createPartitionsMessage(true); + GridDhtPartitionsFullMessage fullMsg = createPartitionsMessage(); fullMsg.setErrorsMap(exchangeGlobalExceptions); @@ -3119,7 +3116,7 @@ public void waitAndReplyToNode(final UUID nodeId, final GridDhtPartitionsSingleM return; } - GridDhtPartitionsFullMessage msg = createPartitionsMessage(true); + GridDhtPartitionsFullMessage msg = createPartitionsMessage(); msg.rebalanced(rebalanced()); @@ -3289,7 +3286,7 @@ private void onAffinityInitialized(IgniteInternalFuture>> assignmentChange = fut.get(); - GridDhtPartitionsFullMessage m = createPartitionsMessage(false); + GridDhtPartitionsFullMessage m = createPartitionsMessage(); CacheAffinityChangeMessage msg = new CacheAffinityChangeMessage(exchId, m, assignmentChange); @@ -3858,7 +3855,7 @@ else if (exchCtx.events().hasServerLeft()) cctx.versions().onExchange(lastVer.get().order()); - GridDhtPartitionsFullMessage msg = createPartitionsMessage(true); + GridDhtPartitionsFullMessage msg = createPartitionsMessage(); if (!cctx.affinity().rebalanceRequired() && !deactivateCluster()) msg.rebalanced(true); @@ -4420,8 +4417,7 @@ private void processSinglePartitionRequest(ClusterNode node, GridDhtPartitionsSi if (dynamicCacheStartExchange() && exchangeLocE != null) { res = new GridDhtPartitionsSingleMessage(msg.restoreExchangeId(), cctx.kernalContext().clientNode(), - cctx.versions().last(), - true); + cctx.versions().last()); res.setError(exchangeLocE); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index ead64e48bcff9..96e329734d259 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -17,11 +17,9 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -29,21 +27,19 @@ import java.util.stream.IntStream; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.Compress; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.communication.ErrorMessage; -import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.util.lang.IgniteThrowableFunction; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -57,33 +53,30 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa private static final byte REBALANCED_FLAG_MASK = 0x01; /** grpId -> FullMap */ + @Order(value = 6, method = "localPartitions") + @Compress @GridToStringInclude private Map parts; - /** - * Serialized local partitions. - *

- * TODO Remove this field after completing task IGNITE-26976. - */ - @Order(value = 6, method = "partitionBytes") - private byte[] partsBytes; - /** */ @Order(value = 7, method = "duplicatedPartitionsData") private Map dupPartsData; /** Partitions update counters. */ @Order(value = 8, method = "partitionCounters") + @Compress @GridToStringInclude private IgniteDhtPartitionCountersMap partCntrs; /** Partitions history suppliers. */ @Order(value = 9, method = "partitionHistorySuppliers") + @Compress @GridToStringInclude private IgniteDhtPartitionHistorySuppliersMap partHistSuppliers; /** Partitions that must be cleared and re-loaded. */ @Order(value = 10, method = "partitionsToReload") + @Compress @GridToStringInclude private IgniteDhtPartitionsToReloadMap partsToReload; @@ -104,6 +97,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa * All logic resides within getter and setter. */ @Order(value = 13, method = "errorMessages") + @Compress @SuppressWarnings("unused") private Map errMsgs; @@ -180,7 +174,6 @@ public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId id, cp.parts = null; cp.dupPartsData = dupPartsData; - cp.partsBytes = partsBytes; cp.partCntrs = partCntrs; cp.partHistSuppliers = partHistSuppliers; cp.partsToReload = partsToReload; @@ -258,17 +251,17 @@ public Map partitions() { } /** - * @return Serialized local partitions. + * @return Local partitions as is for serializer. */ - public byte[] partitionBytes() { - return partsBytes; + public Map localPartitions() { + return parts; } /** - * @param partsBytes Serialized local partitions. + * @param parts Local partitions. */ - public void partitionBytes(byte[] partsBytes) { - this.partsBytes = partsBytes; + public void localPartitions(Map parts) { + this.parts = parts; } /** @@ -294,7 +287,6 @@ public void addFullPartitionsMap(int grpId, GridDhtPartitionFullMap fullMap, @Nu parts.put(grpId, fullMap); if (dupDataCache != null) { - assert compressed(); assert parts.containsKey(dupDataCache); if (dupPartsData == null) @@ -506,43 +498,6 @@ public void lostPartitions(Map lostParts) { this.lostParts = lostParts; } - /** {@inheritDoc} */ - @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { - super.prepareMarshal(ctx); - - boolean marshal = !F.isEmpty(parts) && partsBytes == null; - - if (marshal) { - // Reserve at least 2 threads for system operations. - int parallelismLvl = U.availableThreadCount(ctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2); - - Collection objectsToMarshall = new ArrayList<>(); - - if (!F.isEmpty(parts) && partsBytes == null) - objectsToMarshall.add(parts); - - Collection marshalled = U.doInParallel( - parallelismLvl, - ctx.kernalContext().pools().getSystemExecutorService(), - objectsToMarshall, - new IgniteThrowableFunction() { - @Override public byte[] apply(Object payload) throws IgniteCheckedException { - byte[] marshalled = U.marshal(ctx, payload); - - if (compressed()) - marshalled = U.zip(marshalled, ctx.gridConfig().getNetworkCompressionLevel()); - - return marshalled; - } - }); - - Iterator iter = marshalled.iterator(); - - if (!F.isEmpty(parts) && partsBytes == null) - partsBytes = iter.next(); - } - } - /** * @return Topology version. */ @@ -561,58 +516,29 @@ public void topologyVersion(AffinityTopologyVersion topVer) { @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - ClassLoader clsLdr = U.resolveClassLoader(ldr, ctx.gridConfig()); - - Collection objectsToUnmarshall = new ArrayList<>(); - - // Reserve at least 2 threads for system operations. - int parallelismLvl = U.availableThreadCount(ctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2); - - if (partsBytes != null && parts == null) - objectsToUnmarshall.add(partsBytes); - - Collection unmarshalled = U.doInParallel( - parallelismLvl, - ctx.kernalContext().pools().getSystemExecutorService(), - objectsToUnmarshall, - new IgniteThrowableFunction() { - @Override public Object apply(byte[] binary) throws IgniteCheckedException { - return compressed() - ? U.unmarshalZip(ctx.marshaller(), binary, clsLdr) - : U.unmarshal(ctx, binary, clsLdr); - } - } - ); - - Iterator iter = unmarshalled.iterator(); - - if (partsBytes != null && parts == null) { - parts = (Map)iter.next(); - - if (dupPartsData != null) { - assert parts != null; + if (dupPartsData != null) { + assert parts != null; - for (Map.Entry e : dupPartsData.entrySet()) { - GridDhtPartitionFullMap map1 = parts.get(e.getKey()); - GridDhtPartitionFullMap map2 = parts.get(e.getValue()); + for (Map.Entry e : dupPartsData.entrySet()) { + GridDhtPartitionFullMap map1 = parts.get(e.getKey()); + GridDhtPartitionFullMap map2 = parts.get(e.getValue()); - assert map1 != null : e.getKey(); - assert map2 != null : e.getValue(); - assert map1.size() == map2.size(); + assert map1 != null : e.getKey(); + assert map2 != null : e.getValue(); + assert map1.size() == map2.size(); - for (Map.Entry e0 : map2.entrySet()) { - GridDhtPartitionMap partMap1 = map1.get(e0.getKey()); + for (Map.Entry e0 : map2.entrySet()) { + GridDhtPartitionMap partMap1 = map1.get(e0.getKey()); - assert partMap1 != null && partMap1.map().isEmpty() : partMap1; - assert !partMap1.hasMovingPartitions() : partMap1; + assert partMap1 != null && partMap1.map().isEmpty() : partMap1; + assert !partMap1.hasMovingPartitions() : partMap1; - GridDhtPartitionMap partMap2 = e0.getValue(); + GridDhtPartitionMap partMap2 = e0.getValue(); - assert partMap2 != null; + assert partMap2 != null; - for (Map.Entry stateEntry : partMap2.entrySet()) - partMap1.put(stateEntry.getKey(), stateEntry.getValue()); - } + for (Map.Entry stateEntry : partMap2.entrySet()) + partMap1.put(stateEntry.getKey(), stateEntry.getValue()); } } } @@ -679,7 +605,6 @@ public void merge(GridDhtPartitionsFullMessage other, GridDiscoveryManager disco * Cleans up resources to avoid excessive memory usage. */ public void cleanUp() { - partsBytes = null; partCntrs = null; } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index 9266a486c7c74..b66d6250f3694 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.Compress; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.communication.ErrorMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; @@ -30,7 +31,6 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; /** @@ -39,34 +39,31 @@ * Sent in response to {@link GridDhtPartitionsSingleRequest} and during processing partitions exchange future. */ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMessage { - /** Local partitions. Serialized as {@link #partsBytes}, may be compressed. */ + /** Local partitions. */ + @Order(value = 6, method = "localPartitions") + @Compress @GridToStringInclude private Map parts; - /** - * Serialized local partitions. Unmarshalled to {@link #parts}. - *

- * TODO Remove this field after completing task IGNITE-26976. - */ - @Order(value = 6, method = "partitionBytes") - private byte[] partsBytes; - /** */ @Order(value = 7, method = "duplicatedPartitionsData") private Map dupPartsData; /** Partitions update counters. */ @Order(value = 8, method = "partitionUpdateCounters") + @Compress @GridToStringInclude private Map partCntrs; /** Partitions sizes. */ @Order(value = 9, method = "partitionSizesMap") + @Compress @GridToStringInclude private Map partsSizes; /** Partitions history reservation counters. */ @Order(value = 10, method = "partitionHistoryCountersMap") + @Compress @GridToStringInclude private Map partHistCntrs; @@ -105,17 +102,13 @@ public GridDhtPartitionsSingleMessage() { * @param exchId Exchange ID. * @param client Client message flag. * @param lastVer Last version. - * @param compress {@code True} if it is possible to use compression for message. */ public GridDhtPartitionsSingleMessage(GridDhtPartitionExchangeId exchId, boolean client, - @Nullable GridCacheVersion lastVer, - boolean compress + @Nullable GridCacheVersion lastVer ) { super(exchId, lastVer); - compressed(compress); - this.client = client; } @@ -187,7 +180,6 @@ public void addLocalPartitionMap(int cacheId, GridDhtPartitionMap locMap, @Nulla parts.put(cacheId, locMap); if (dupDataCache != null) { - assert compressed(); assert F.isEmpty(locMap.map()); assert parts.containsKey(dupDataCache); @@ -332,17 +324,17 @@ public Map partitions() { } /** - * @return Serialized local partitions. + * @return Local partitions as is for serializer. */ - public byte[] partitionBytes() { - return partsBytes; + public Map localPartitions() { + return parts; } /** - * @param partsBytes Serialized local partitions. + * @param parts Local partitions. */ - public void partitionBytes(byte[] partsBytes) { - this.partsBytes = partsBytes; + public void localPartitions(Map parts) { + this.parts = parts; } /** @@ -387,36 +379,10 @@ public void exchangeStartTime(long exchangeStartTime) { this.exchangeStartTime = exchangeStartTime; } - /** {@inheritDoc} */ - @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { - super.prepareMarshal(ctx); - - if (parts != null && partsBytes == null) { - byte[] partsBytes0 = U.marshal(ctx, parts); - - if (compressed()) { - try { - partsBytes0 = U.zip(partsBytes0); - } - catch (IgniteCheckedException e) { - U.error(ctx.logger(getClass()), "Failed to compress partitions data: " + e, e); - } - } - - partsBytes = partsBytes0; - } - } - /** {@inheritDoc} */ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - if (partsBytes != null && parts == null) { - parts = compressed() - ? U.unmarshalZip(ctx.marshaller(), partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())) - : U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); - } - if (dupPartsData != null) { assert parts != null; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java index 9ca26f636ae3a..dfcd6ac5d467f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java @@ -1167,7 +1167,7 @@ private void updateLocal(int p, UUID nodeId, GridDhtPartitionState state, long u updateSeq = seq; } - node2part.updateSequence(updateSeq); + node2part.checkAndUpdateSequence(updateSeq); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java index b004053ca6f35..ba62046013f8e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java @@ -1950,7 +1950,7 @@ else if (isStaleUpdate(cur, parts)) { long updateSeq = this.updateSeq.incrementAndGet(); - node2part.newUpdateSequence(updateSeq); + node2part.updateSequence(updateSeq); boolean changed = false; @@ -2650,7 +2650,7 @@ private long updateLocal(int p, GridDhtPartitionState state, long updateSeq, Aff updateSeq = seq; } - node2part.updateSequence(updateSeq); + node2part.checkAndUpdateSequence(updateSeq); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridPartitionStateMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridPartitionStateMap.java index 855d3c79ef4a5..352ae8df0ea41 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridPartitionStateMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridPartitionStateMap.java @@ -24,17 +24,22 @@ import java.util.Iterator; import java.util.NoSuchElementException; import java.util.Set; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; +import org.apache.ignite.plugin.extensions.communication.Message; /** * Grid partition state map. States are encoded using bits. *

* Null values are prohibited. */ -public class GridPartitionStateMap extends AbstractMap implements Serializable { +public class GridPartitionStateMap extends AbstractMap implements Serializable, Message { /** Empty map. */ public static final GridPartitionStateMap EMPTY = new GridPartitionStateMap(0); + /** Type code. */ + public static final short TYPE_CODE = 517; + /** */ private static final long serialVersionUID = 0L; @@ -55,9 +60,11 @@ public class GridPartitionStateMap extends AbstractMap Type of the message. * @return Message. */ - public T readMessage(); + public default T readMessage() { + return readMessage(false); + } + + /** + * Reads nested message. + * + * @param compress Whether the message is compressed. + * @param Type of the message. + * @return Message. + */ + public T readMessage(boolean compress); /** * Reads {@link CacheObject}. @@ -259,9 +270,25 @@ public default void setBuffer(ByteBuffer buf) { * @param Type of the read map. * @return Map. */ - // TODO: IGNITE-26329 — switch to the new readMap method without the flag parameter + // TODO: IGNITE-26329 — switch to the new readMap method without the linked flag parameter + public default > M readMap(MessageCollectionItemType keyType, + MessageCollectionItemType valType, boolean linked) { + return readMap(keyType, valType, linked, false); + } + + /** + * Reads map. + * + * @param keyType Map key type. + * @param valType Map value type. + * @param linked Whether {@link LinkedHashMap} should be created. + * @param compress Whether the map is compressed. + * @param Type of the red map. + * @return Map. + */ + // TODO: IGNITE-26329 — switch to the new readMap method without the linked flag parameter public > M readMap(MessageCollectionItemType keyType, - MessageCollectionItemType valType, boolean linked); + MessageCollectionItemType valType, boolean linked, boolean compress); /** * Tells whether last invocation of any of {@code readXXX(...)} diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java index 43fda9736d9e1..ad746b094f8ec 100644 --- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java @@ -247,7 +247,18 @@ public default void setBuffer(ByteBuffer buf) { * @param val Message. * @return Whether value was fully written. */ - public boolean writeMessage(Message val); + public default boolean writeMessage(Message val) { + return writeMessage(val, false); + } + + /** + * Writes nested message. + * + * @param val Message. + * @param compress Whether to compress message. + * @return Whether value was fully written. + */ + public boolean writeMessage(Message val, boolean compress); /** * Writes {@link CacheObject}. @@ -313,8 +324,24 @@ public default void setBuffer(ByteBuffer buf) { * @param Initial value types of the map to write. * @return Whether value was fully written. */ + public default boolean writeMap(Map map, MessageCollectionItemType keyType, + MessageCollectionItemType valType) { + return writeMap(map, keyType, valType, false); + } + + /** + * Writes map. + * + * @param map Map. + * @param keyType Map key type. + * @param valType Map value type. + * @param compress Whether to compress map. + * @param Initial key types of the map to write. + * @param Initial value types of the map to write. + * @return Whether value was fully written. + */ public boolean writeMap(Map map, MessageCollectionItemType keyType, - MessageCollectionItemType valType); + MessageCollectionItemType valType, boolean compress); /** * @return Whether header of current message is already written. diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java index 4875ec0a465e8..8236a4479ba51 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java @@ -286,7 +286,7 @@ private boolean writeField(Class type) { } /** {@inheritDoc} */ - @Override public boolean writeMessage(Message val) { + @Override public boolean writeMessage(Message val, boolean compress) { return writeField(Message.class); } @@ -307,7 +307,7 @@ private boolean writeField(Class type) { /** {@inheritDoc} */ @Override public boolean writeMap(Map map, MessageCollectionItemType keyType, - MessageCollectionItemType valType) { + MessageCollectionItemType valType, boolean compress) { return writeField(Map.class); } @@ -518,7 +518,7 @@ private void readField(Class type) { } /** {@inheritDoc} */ - @Override public T readMessage() { + @Override public T readMessage(boolean compress) { readField(Message.class); return null; @@ -568,7 +568,7 @@ private void readField(Class type) { /** {@inheritDoc} */ @Override public > M readMap(MessageCollectionItemType keyType, - MessageCollectionItemType valType, boolean linked) { + MessageCollectionItemType valType, boolean linked, boolean compress) { readField(Map.class); return null; From 7e4be6a8887c8433a48e4782097e85bf6dba9308 Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Mon, 29 Dec 2025 14:37:19 +0500 Subject: [PATCH 2/6] WIP-second --- .../communication/CompressedDataMessage.java | 124 ++++++++++++++++++ .../communication/GridIoMessageFactory.java | 1 + .../preloader/GridDhtPartitionFullMap.java | 2 +- .../dht/preloader/GridDhtPartitionMap.java | 2 +- .../internal/util/GridPartitionStateMap.java | 2 +- 5 files changed, 128 insertions(+), 3 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CompressedDataMessage.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CompressedDataMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CompressedDataMessage.java new file mode 100644 index 0000000000000..6c4bdfce9cdd3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CompressedDataMessage.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.communication; + +import java.nio.ByteBuffer; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** */ +public class CompressedDataMessage implements Message { + /** Type code. */ + public static final short TYPE_CODE = 517; + + /** */ + private T data; + + /** */ + private byte[] compressedData; + + /** */ + private byte[] chunk; + + /** */ + private boolean finalChunk; + + /** */ + public CompressedDataMessage() { + // No-op. + } + + /** + * @param data Data to compress. + */ + public CompressedDataMessage(T data) { + this.data = data; + } + + /** {@inheritDoc} */ + @SuppressWarnings("deprecation") + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType())) + return false; + + writer.onHeaderWritten(); + } + + if (chunk == null) { + //chunk = compressedData.nextChunk(); + finalChunk = (chunk == null); + } + + switch (writer.state()) { + case 0: + if (!writer.writeBoolean(finalChunk)) + return false; + + writer.incrementState(); + + if (finalChunk) + return true; + + case 1: + if (!writer.writeByteArray(chunk)) + return false; + + chunk = null; + + writer.incrementState(); + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("deprecation") + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + switch (reader.state()) { + case 0: + finalChunk = reader.readBoolean(); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + chunk = reader.readByteArray(); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public short directType() { + return TYPE_CODE; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 385522ba6c3ed..3dace50fd073e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -524,6 +524,7 @@ public class GridIoMessageFactory implements MessageFactoryProvider { new IgniteDhtPartitionsToReloadMapSerializer()); factory.register(IntLongMap.TYPE_CODE, IntLongMap::new, new IntLongMapSerializer()); factory.register(IndexKeyTypeMessage.TYPE_CODE, IndexKeyTypeMessage::new, new IndexKeyTypeMessageSerializer()); + factory.register(CompressedDataMessage.TYPE_CODE, CompressedDataMessage::new); factory.register(GridPartitionStateMap.TYPE_CODE, GridPartitionStateMap::new, new GridPartitionStateMapSerializer()); factory.register(GridDhtPartitionMap.TYPE_CODE, GridDhtPartitionMap::new, new GridDhtPartitionMapSerializer()); factory.register(GridDhtPartitionFullMap.TYPE_CODE, GridDhtPartitionFullMap::new, new GridDhtPartitionFullMapSerializer()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java index ae5579e8131b7..2127578e99dde 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java @@ -40,7 +40,7 @@ public class GridDhtPartitionFullMap extends AbstractMap implements Comparable, Externalizable, Message { /** Type code. */ - public static final short TYPE_CODE = 519; + public static final short TYPE_CODE = 520; /** */ private static final long serialVersionUID = 0L; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java index 4186fdbc92fe8..accd3f730031a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java @@ -41,7 +41,7 @@ */ public class GridDhtPartitionMap implements Comparable, Externalizable, Message { /** Type code. */ - public static final short TYPE_CODE = 518; + public static final short TYPE_CODE = 519; /** */ private static final long serialVersionUID = 0L; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridPartitionStateMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridPartitionStateMap.java index 352ae8df0ea41..f15c1516dd538 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridPartitionStateMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridPartitionStateMap.java @@ -38,7 +38,7 @@ public class GridPartitionStateMap extends AbstractMap Date: Mon, 12 Jan 2026 18:09:26 +0500 Subject: [PATCH 3/6] add CompressedMessage --- .../internal/MessageSerializerGenerator.java | 4 +- .../internal/direct/DirectMessageReader.java | 111 ++++++- .../internal/direct/DirectMessageWriter.java | 31 +- .../direct/stream/DirectByteBufferStream.java | 189 +----------- .../communication/ChunkedByteReader.java | 52 ++++ .../communication/CompressedDataMessage.java | 124 -------- .../communication/CompressedMessage.java | 284 ++++++++++++++++++ .../communication/GridIoMessageFactory.java | 3 +- .../GridDhtPartitionsSingleMessage.java | 1 + .../communication/MessageReader.java | 23 +- .../communication/MessageWriter.java | 21 +- .../AbstractMessageSerializationTest.java | 14 +- 12 files changed, 520 insertions(+), 337 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/managers/communication/ChunkedByteReader.java delete mode 100644 modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CompressedDataMessage.java create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CompressedMessage.java diff --git a/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java b/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java index c0f84275f83da..19160d5c83152 100644 --- a/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java +++ b/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java @@ -395,7 +395,7 @@ else if (assignableFrom(type, type("org.apache.ignite.internal.util.GridLongList else if (assignableFrom(type, type(MESSAGE_INTERFACE))) { if (compress) - returnFalseIfWriteFailed(write, "writer.writeMessage", getExpr, "true"); + returnFalseIfWriteFailed(write, "writer.writeMessage", getExpr); // "true" else returnFalseIfWriteFailed(write, "writer.writeMessage", getExpr); } @@ -616,7 +616,7 @@ else if (assignableFrom(type, type("org.apache.ignite.internal.util.GridLongList else if (assignableFrom(type, type(MESSAGE_INTERFACE))) { if (compress) - returnFalseIfReadFailed(name, "reader.readMessage", "true"); + returnFalseIfReadFailed(name, "reader.readMessage"); // "true" else returnFalseIfReadFailed(name, "reader.readMessage"); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java index db8ab965da624..f810d5a18191c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.direct; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.BitSet; import java.util.Collection; import java.util.Map; @@ -26,6 +27,7 @@ import org.apache.ignite.internal.direct.state.DirectMessageState; import org.apache.ignite.internal.direct.state.DirectMessageStateItem; import org.apache.ignite.internal.direct.stream.DirectByteBufferStream; +import org.apache.ignite.internal.managers.communication.CompressedMessage; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.KeyCacheObject; @@ -49,17 +51,35 @@ public class DirectMessageReader implements MessageReader { @GridToStringInclude private final DirectMessageState state; + /** */ + private final MessageFactory msgFactory; + + /** */ + private final IgniteCacheObjectProcessor cacheObjProc; + /** Buffer for reading. */ private ByteBuffer buf; /** Whether last field was fully read. */ private boolean lastRead; + /** */ + private DirectMessageReader tmpReader; + + /** */ + private ByteBuffer tmpBuf; + + /** */ + private boolean uncompressFinished; + /** * @param msgFactory Message factory. * @param cacheObjProc Cache object processor. */ public DirectMessageReader(final MessageFactory msgFactory, IgniteCacheObjectProcessor cacheObjProc) { + this.msgFactory = msgFactory; + this.cacheObjProc = cacheObjProc; + state = new DirectMessageState<>(StateItem.class, new IgniteOutClosure() { @Override public StateItem apply() { return new StateItem(msgFactory, cacheObjProc); @@ -315,10 +335,10 @@ public ByteBuffer getBuffer() { } /** {@inheritDoc} */ - @Nullable @Override public T readMessage(boolean compress) { + @Nullable @Override public T readMessage() { DirectByteBufferStream stream = state.item().stream; - T msg = stream.readMessage(this, compress); + T msg = stream.readMessage(this); lastRead = stream.lastFinished(); @@ -396,9 +416,87 @@ public ByteBuffer getBuffer() { MessageCollectionItemType valType, boolean linked, boolean compress) { DirectByteBufferStream stream = state.item().stream; - M map = stream.readMap(keyType, valType, linked, this, compress); + M map; - lastRead = stream.lastFinished(); + if (compress) { + System.out.println(">>> BEFORE READ"); + + Message msg = stream.readMessage(this); + + System.out.println(">>> AFTER READ [msg=" + msg + ']'); + + if (msg == null) { + lastRead = stream.lastFinished(); + + return null; + } + + assert msg instanceof CompressedMessage : msg; + + CompressedMessage msg0 = (CompressedMessage)msg; + + if (msg0.dataSize() == 0) { + lastRead = true; + + return null; + } + + byte[] uncompressedBytes = msg0.uncompressed(); + + byte[] res; + + int pos = buf.position(); + + if (buf.hasRemaining()) { + byte[] remaining = new byte[buf.remaining()]; + + System.out.println(">>> remaining: " + Arrays.toString(remaining)); + + buf.get(remaining); + buf.position(pos); + + res = new byte[uncompressedBytes.length + remaining.length]; + + System.arraycopy(uncompressedBytes, 0, res, 0, uncompressedBytes.length); + System.arraycopy(remaining, 0, res, uncompressedBytes.length, remaining.length); + } + else { + res = new byte[uncompressedBytes.length]; + + System.arraycopy(uncompressedBytes, 0, res, 0, uncompressedBytes.length); + } + + res = new byte[uncompressedBytes.length]; + + System.arraycopy(uncompressedBytes, 0, res, 0, uncompressedBytes.length); + + if (tmpReader == null) + tmpReader = new DirectMessageReader(msgFactory, cacheObjProc); + + if (tmpBuf == null) + tmpBuf = ByteBuffer.allocateDirect(600_000); + + int startPos = tmpBuf.position(); + + tmpBuf.put(res); + tmpBuf.position(startPos); + + tmpReader.setBuffer(tmpBuf); + + map = tmpReader.state.item().stream.readMap(keyType, valType, linked, tmpReader); + + lastRead = tmpReader.state.item().stream.lastFinished(); + + if (lastRead) { + tmpReader = null; + tmpBuf = null; + } + } + else { + map = stream.readMap(keyType, valType, linked, this); + + lastRead = stream.lastFinished(); + } return map; } @@ -418,6 +516,11 @@ public ByteBuffer getBuffer() { state.item().state++; } + /** {@inheritDoc} */ + @Override public void decrementState() { + state.item().state--; + } + /** {@inheritDoc} */ @Override public void beforeInnerMessageRead() { state.forward(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java index a6cc0233b37d1..f18cdc2ce60af 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.direct.state.DirectMessageState; import org.apache.ignite.internal.direct.state.DirectMessageStateItem; import org.apache.ignite.internal.direct.stream.DirectByteBufferStream; +import org.apache.ignite.internal.managers.communication.CompressedMessage; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.KeyCacheObject; @@ -48,11 +49,16 @@ public class DirectMessageWriter implements MessageWriter { @GridToStringInclude private final DirectMessageState state; + /** */ + private final MessageFactory msgFactory; + /** Buffer for writing. */ private ByteBuffer buf; /** */ public DirectMessageWriter(final MessageFactory msgFactory) { + this.msgFactory = msgFactory; + state = new DirectMessageState<>(StateItem.class, new IgniteOutClosure() { @Override public StateItem apply() { return new StateItem(msgFactory); @@ -293,10 +299,10 @@ public ByteBuffer getBuffer() { } /** {@inheritDoc} */ - @Override public boolean writeMessage(@Nullable Message msg, boolean compress) { + @Override public boolean writeMessage(@Nullable Message msg) { DirectByteBufferStream stream = state.item().stream; - stream.writeMessage(msg, this, compress); + stream.writeMessage(msg, this); return stream.lastFinished(); } @@ -356,7 +362,21 @@ public ByteBuffer getBuffer() { MessageCollectionItemType valType, boolean compress) { DirectByteBufferStream stream = state.item().stream; - stream.writeMap(map, keyType, valType, this, compress); + if (compress) { + DirectMessageWriter tmpWriter = new DirectMessageWriter(msgFactory); + + tmpWriter.setBuffer(ByteBuffer.allocateDirect(600_000)); + + DirectByteBufferStream tmpStream = tmpWriter.state.item().stream; + + tmpStream.writeMap(map, keyType, valType, tmpWriter); + + CompressedMessage compressedMsg = map != null ? new CompressedMessage(tmpWriter.getBuffer()) : CompressedMessage.empty(); + + stream.writeMessage(compressedMsg, this); + } + else + stream.writeMap(map, keyType, valType, this); return stream.lastFinished(); } @@ -381,6 +401,11 @@ public ByteBuffer getBuffer() { state.item().state++; } + /** {@inheritDoc} */ + @Override public void decrementState() { + state.item().state--; + } + /** {@inheritDoc} */ @Override public void beforeInnerMessageWrite() { state.forward(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java index e6280f7727117..ff66fec11d6e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java @@ -17,9 +17,6 @@ package org.apache.ignite.internal.direct.stream; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; import java.lang.reflect.Array; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -32,12 +29,9 @@ import java.util.RandomAccess; import java.util.Set; import java.util.UUID; -import java.util.zip.Deflater; -import java.util.zip.DeflaterOutputStream; -import java.util.zip.Inflater; -import java.util.zip.InflaterInputStream; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.managers.communication.CompressedMessage; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.KeyCacheObject; @@ -346,9 +340,6 @@ public class DirectByteBufferStream { /** */ private byte cacheObjType; - /** */ - private boolean uncompressFinished; - /** * Constructror for stream used for writing messages. * @@ -889,26 +880,14 @@ public void writeGridLongList(@Nullable GridLongList val) { /** * @param msg Message. * @param writer Writer. - * @param compress Whether to compress message. */ - public void writeMessage(Message msg, MessageWriter writer, boolean compress) { - if (compress && buf.position() != 0) { - lastFinished = false; - - return; - } - + public void writeMessage(Message msg, MessageWriter writer) { if (msg != null) { if (buf.hasRemaining()) { try { writer.beforeInnerMessageWrite(); - boolean finished = msgFactory.serializer(msg.directType()).writeTo(msg, writer); - - if (compress) - compressData(); - - lastFinished = finished; + lastFinished = msgFactory.serializer(msg.directType()).writeTo(msg, writer); } finally { writer.afterInnerMessageWrite(lastFinished); @@ -917,12 +896,8 @@ public void writeMessage(Message msg, MessageWriter writer, boolean compress) { else lastFinished = false; } - else { - if (compress) - writeInt(-1); - else - writeShort(Short.MIN_VALUE); - } + else + writeShort(Short.MIN_VALUE); } /** @@ -1038,21 +1013,13 @@ private void writeRandomAccessList(List list, MessageCollectionItemType i * @param keyType Key type. * @param valType Value type. * @param writer Writer. - * @param compress Whether to compress map. */ public void writeMap( Map map, MessageCollectionItemType keyType, MessageCollectionItemType valType, - MessageWriter writer, - boolean compress + MessageWriter writer ) { - if (compress && buf.position() != 0) { - lastFinished = false; - - return; - } - if (map != null) { if (mapIt == null) { writeInt(map.size()); @@ -1074,32 +1041,21 @@ public void writeMap( if (!keyDone) { write(keyType, e.getKey(), writer); - if (!lastFinished) { - if (compress) - compressData(); - + if (!lastFinished) return; - } keyDone = true; } write(valType, e.getValue(), writer); - if (!lastFinished) { - if (compress) - compressData(); - + if (!lastFinished) return; - } mapCur = NULL; keyDone = false; } - if (compress) - compressData(); - mapIt = null; } else @@ -1569,26 +1525,7 @@ public GridLongList readGridLongList() { * @param reader Reader. * @return Message. */ - public T readMessage(MessageReader reader, boolean compress) { - if (compress && !uncompressFinished) { - int startPos = buf.position(); - - if (readInt() == -1) { - lastFinished = true; - - return null; - } - - buf.position(startPos); - - boolean uncompressed = uncompressData(); - - if (!lastFinished || !uncompressed) - return null; - - uncompressFinished = true; - } - + public T readMessage(MessageReader reader) { if (!msgTypeDone) { if (buf.remaining() < Message.DIRECT_TYPE_SIZE) { lastFinished = false; @@ -1621,9 +1558,6 @@ public T readMessage(MessageReader reader, boolean compress) msgTypeDone = false; - if (compress) - uncompressFinished = false; - msg = null; return (T)msg0; @@ -1747,30 +1681,10 @@ private > C readCollection(MessageCollectionItemType ite * @param valType Value type. * @param linked Whether linked map should be created. * @param reader Reader. - * @param compress Whether the map is compressed. * @return Map. */ public > M readMap(MessageCollectionItemType keyType, MessageCollectionItemType valType, - boolean linked, MessageReader reader, boolean compress) { - if (compress && !uncompressFinished) { - int startPos = buf.position(); - - if (readInt() == -1) { - lastFinished = true; - - return null; - } - - buf.position(startPos); - - boolean uncompressed = uncompressData(); - - if (!lastFinished || !uncompressed) - return null; - - uncompressFinished = true; - } - + boolean linked, MessageReader reader) { if (readSize == -1) { int size = readInt(); @@ -1816,9 +1730,6 @@ private > C readCollection(MessageCollectionItemType ite map = null; - if (compress) - uncompressFinished = false; - return map0; } @@ -2202,11 +2113,14 @@ protected void write(MessageCollectionItemType type, Object val, MessageWriter w break; case MSG: + if (val instanceof CompressedMessage) + throw new IllegalArgumentException("CompressedMessage is not supported in collections."); + try { if (val != null) writer.beforeInnerMessageWrite(); - writeMessage((Message)val, writer, false); + writeMessage((Message)val, writer); } finally { if (val != null) @@ -2300,7 +2214,7 @@ protected Object read(MessageCollectionItemType type, MessageReader reader) { return readGridLongList(); case MSG: - return readMessage(reader, false); + return readMessage(reader); default: throw new IllegalArgumentException("Unknown type: " + type); @@ -2356,79 +2270,6 @@ private void readUuidRaw() { return S.toString(DirectByteBufferStream.class, this); } - /** */ - private void compressData() { - if (buf.position() == 0) - return; - - byte[] rawData = new byte[buf.position()]; - - buf.flip(); - buf.get(rawData); - - ByteArrayOutputStream baos = new ByteArrayOutputStream(rawData.length); - Deflater deflater = new Deflater(Deflater.BEST_SPEED, true); - - try (DeflaterOutputStream dos = new DeflaterOutputStream(baos, deflater)) { - dos.write(rawData); - dos.finish(); - } - catch (IOException ex) { - throw new IgniteException(ex); - } - finally { - deflater.end(); - } - - buf.clear(); - - writeByteArray(baos.toByteArray()); - } - - /** */ - private boolean uncompressData() { - byte[] compressedData = readByteArray(); - - if (!lastFinished || compressedData == null) - return false; - - byte[] uncompressedData; - - Inflater inflater = new Inflater(true); - - try (InflaterInputStream iis = new InflaterInputStream(new ByteArrayInputStream(compressedData), inflater)) { - uncompressedData = iis.readAllBytes(); - } - catch (IOException ex) { - throw new IgniteException(ex); - } - finally { - inflater.end(); - } - - byte[] tmpBuf = null; - - if (buf.remaining() > 0) { - tmpBuf = new byte[buf.remaining()]; - buf.get(tmpBuf); - } - - int tmpBufLength = tmpBuf != null ? tmpBuf.length : 0; - - if (uncompressedData.length + tmpBufLength > buf.capacity()) - buf = ByteBuffer.allocateDirect(uncompressedData.length + tmpBufLength); - - buf.clear(); - buf.put(uncompressedData); - - if (tmpBuf != null) - buf.put(tmpBuf); - - buf.flip(); - - return true; - } - /** * Array creator. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/ChunkedByteReader.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/ChunkedByteReader.java new file mode 100644 index 0000000000000..a039fefd5a516 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/ChunkedByteReader.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.communication; + +/** TODO */ +public class ChunkedByteReader { + /** */ + private final byte[] inputData; + + /** */ + private final int chunkSize; + + /** */ + private int position; + + /** */ + public ChunkedByteReader(byte[] inputData, int chunkSize) { + this.inputData = inputData; + this.chunkSize = chunkSize; + } + + /** TODO */ + public byte[] nextChunk() { + if (position >= inputData.length) + return null; + + int curChunkSize = Math.min(inputData.length - position, chunkSize); + + byte[] chunk = new byte[curChunkSize]; + + System.arraycopy(inputData, position, chunk, 0, curChunkSize); + + position += curChunkSize; + + return chunk; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CompressedDataMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CompressedDataMessage.java deleted file mode 100644 index 6c4bdfce9cdd3..0000000000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CompressedDataMessage.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.managers.communication; - -import java.nio.ByteBuffer; -import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; - -/** */ -public class CompressedDataMessage implements Message { - /** Type code. */ - public static final short TYPE_CODE = 517; - - /** */ - private T data; - - /** */ - private byte[] compressedData; - - /** */ - private byte[] chunk; - - /** */ - private boolean finalChunk; - - /** */ - public CompressedDataMessage() { - // No-op. - } - - /** - * @param data Data to compress. - */ - public CompressedDataMessage(T data) { - this.data = data; - } - - /** {@inheritDoc} */ - @SuppressWarnings("deprecation") - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType())) - return false; - - writer.onHeaderWritten(); - } - - if (chunk == null) { - //chunk = compressedData.nextChunk(); - finalChunk = (chunk == null); - } - - switch (writer.state()) { - case 0: - if (!writer.writeBoolean(finalChunk)) - return false; - - writer.incrementState(); - - if (finalChunk) - return true; - - case 1: - if (!writer.writeByteArray(chunk)) - return false; - - chunk = null; - - writer.incrementState(); - } - - return true; - } - - /** {@inheritDoc} */ - @SuppressWarnings("deprecation") - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - switch (reader.state()) { - case 0: - finalChunk = reader.readBoolean(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 1: - chunk = reader.readByteArray(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public short directType() { - return TYPE_CODE; - } -} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CompressedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CompressedMessage.java new file mode 100644 index 0000000000000..0ae56ca0410d6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CompressedMessage.java @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.communication; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.zip.Deflater; +import java.util.zip.DeflaterOutputStream; +import java.util.zip.Inflater; +import java.util.zip.InflaterInputStream; +import org.apache.ignite.IgniteException; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** TODO */ +public class CompressedMessage implements Message { + /** Type code. */ + public static final short TYPE_CODE = 517; + + /** */ + private static final int CHUNK_SIZE = 10 * 1024 * 1024; + + /** */ + private static final int BUFFER_CAPACITY = 100 * 1024; + + /** */ + private ByteBuffer tmpBuf; + + /** */ + private int dataSize = -1; + + /** */ + private ChunkedByteReader chunkedReader; + + /** */ + private byte[] chunk; + + /** */ + private boolean finalChunk; + + /** */ + public CompressedMessage() { + // No-op. + } + + /** + * @param buf Source buffer with seralized data. + */ + public CompressedMessage(ByteBuffer buf) { + dataSize = buf.position(); + chunkedReader = new ChunkedByteReader(compress(buf), CHUNK_SIZE); + } + + /** TODO */ + public static CompressedMessage empty() { + CompressedMessage msg = new CompressedMessage(); + + msg.dataSize = 0; + msg.finalChunk = true; + msg.chunk = null; + + return msg; + } + + /** */ + public int dataSize() { + return dataSize; + } + + /** TODO */ + public byte[] uncompressed() { + assert finalChunk; + + byte[] uncompress = uncompress(); + + assert uncompress != null; + assert uncompress.length == dataSize : "Expected=" + dataSize + ", actual=" + uncompress.length; + + return uncompress; + } + + /** {@inheritDoc} */ + @SuppressWarnings("deprecation") + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType())) + return false; + + writer.onHeaderWritten(); + } + + while (true) { + if (chunk == null && chunkedReader != null) { + chunk = chunkedReader.nextChunk(); + + System.out.println(">>> After next chunk [" + Arrays.toString(chunk) + ']'); + + finalChunk = (chunk == null); + } + + switch (writer.state()) { + case 0: + if (!writer.writeInt(dataSize)) + return false; + + System.out.println(">>> WRITE dataSize=" + dataSize); + + writer.incrementState(); + + if (dataSize == 0) + return true; + + case 1: + if (!writer.writeBoolean(finalChunk)) + return false; + + writer.incrementState(); + + if (finalChunk) + return true; + + case 2: + if (!writer.writeByteArray(chunk)) + return false; + + System.out.println(">>> WRITED chunk [length=" + chunk.length + ", chunk=" + Arrays.toString(chunk) + ']'); + + chunk = null; + + writer.decrementState(); + } + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("deprecation") + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (tmpBuf == null) + tmpBuf = ByteBuffer.allocate(BUFFER_CAPACITY); + + assert chunk == null : chunk; + + while (true) { + switch (reader.state()) { + case 0: + dataSize = reader.readInt(); + + System.out.println(">>> READ dataSize=" + dataSize); + + if (!reader.isLastRead()) + return false; + + if (dataSize == 0) + return true; + + reader.incrementState(); + + case 1: + finalChunk = reader.readBoolean(); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + if (finalChunk) + return true; + + case 2: + chunk = reader.readByteArray(); + + if (!reader.isLastRead()) + return false; + + if (chunk != null) { + tmpBuf.put(chunk); + + System.out.println(">>> ADD chunk to tmpBuf [length=" + chunk.length + " , chunk=" + Arrays.toString(chunk) + ']'); + + reader.decrementState(); + + chunk = null; + } + } + } + } + + /** {@inheritDoc} */ + @Override public short directType() { + return TYPE_CODE; + } + + /** + * @param buf Buffer. + */ + private byte[] compress(ByteBuffer buf) { + byte[] data = new byte[buf.position()]; + + buf.flip(); + buf.get(data); + + System.out.println(">>> RAW DATA: length=" + data.length + ", data=" + Arrays.toString(data)); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(data.length); + Deflater deflater = new Deflater(Deflater.BEST_SPEED, true); + + try (DeflaterOutputStream dos = new DeflaterOutputStream(baos, deflater)) { + dos.write(data); + dos.finish(); + } + catch (IOException ex) { + throw new IgniteException(ex); + } + finally { + deflater.end(); + } + + byte[] res = baos.toByteArray(); + + System.out.println(">>> SUCCESS compress [length=" + res.length + ", res=" + Arrays.toString(res) + ']'); + + return res; + } + + /** */ + private byte[] uncompress() { + if (tmpBuf == null) + return null; + + byte[] uncompressedData; + + Inflater inflater = new Inflater(true); + + try (InflaterInputStream iis = new InflaterInputStream(new ByteArrayInputStream(tmpBuf.array()), inflater)) { + uncompressedData = iis.readAllBytes(); + } + catch (IOException ex) { + throw new IgniteException(ex); + } + finally { + inflater.end(); + } + + System.out.println(">>> SUCCESS uncompress [length=" + uncompressedData.length + ", data=" + Arrays.toString(uncompressedData) + ']'); + + tmpBuf.clear(); + + return uncompressedData; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "CompressedMessage{" + + "chunk=" + Arrays.toString(chunk) + + ", tmpBuf=" + tmpBuf + + ", dataSize=" + dataSize + + ", chunkedReader=" + chunkedReader + + ", finalChunk=" + finalChunk + + '}'; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 3dace50fd073e..0f31fcfd0f915 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -48,6 +48,7 @@ import org.apache.ignite.internal.codegen.CacheVersionedValueSerializer; import org.apache.ignite.internal.codegen.CacheWriteSynchronizationModeMessageSerializer; import org.apache.ignite.internal.codegen.ClusterMetricsUpdateMessageSerializer; +//import org.apache.ignite.internal.codegen.CompressedMessageSerializer; import org.apache.ignite.internal.codegen.ErrorMessageSerializer; import org.apache.ignite.internal.codegen.ExchangeInfoSerializer; import org.apache.ignite.internal.codegen.GenerateEncryptionKeyRequestSerializer; @@ -524,7 +525,7 @@ public class GridIoMessageFactory implements MessageFactoryProvider { new IgniteDhtPartitionsToReloadMapSerializer()); factory.register(IntLongMap.TYPE_CODE, IntLongMap::new, new IntLongMapSerializer()); factory.register(IndexKeyTypeMessage.TYPE_CODE, IndexKeyTypeMessage::new, new IndexKeyTypeMessageSerializer()); - factory.register(CompressedDataMessage.TYPE_CODE, CompressedDataMessage::new); + factory.register(CompressedMessage.TYPE_CODE, CompressedMessage::new); // new CompressedMessageSerializer() factory.register(GridPartitionStateMap.TYPE_CODE, GridPartitionStateMap::new, new GridPartitionStateMapSerializer()); factory.register(GridDhtPartitionMap.TYPE_CODE, GridDhtPartitionMap::new, new GridDhtPartitionMapSerializer()); factory.register(GridDhtPartitionFullMap.TYPE_CODE, GridDhtPartitionFullMap::new, new GridDhtPartitionFullMapSerializer()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index b66d6250f3694..b187026aca5c0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -272,6 +272,7 @@ public void partitionSizesMap(Map partsSizes) { * @return Partitions history reservation counters. */ public Map partitionHistoryCountersMap() { + System.out.println(">>> partitionHistoryCountersMap size=" + (partHistCntrs != null ? partHistCntrs.size() : null)); return partHistCntrs; } diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java index 5bc3e93fab18f..a59449eac8237 100644 --- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; + import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.KeyCacheObject; @@ -199,18 +200,7 @@ public default void setBuffer(ByteBuffer buf) { * @param Type of the message. * @return Message. */ - public default T readMessage() { - return readMessage(false); - } - - /** - * Reads nested message. - * - * @param compress Whether the message is compressed. - * @param Type of the message. - * @return Message. - */ - public T readMessage(boolean compress); + public T readMessage(); /** * Reads {@link CacheObject}. @@ -282,8 +272,8 @@ public default T readMessage() { * @param keyType Map key type. * @param valType Map value type. * @param linked Whether {@link LinkedHashMap} should be created. - * @param compress Whether the map is compressed. - * @param Type of the red map. + * @param compress Whether map should be compressed. + * @param Type of the read map. * @return Map. */ // TODO: IGNITE-26329 — switch to the new readMap method without the linked flag parameter @@ -311,6 +301,11 @@ public default T readMessage() { */ public void incrementState(); + /** + * Deccrements read state. + */ + public void decrementState(); + /** * Callback called before inner message is read. */ diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java index ad746b094f8ec..f2f34ec2ec571 100644 --- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; + import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.KeyCacheObject; @@ -247,18 +248,7 @@ public default void setBuffer(ByteBuffer buf) { * @param val Message. * @return Whether value was fully written. */ - public default boolean writeMessage(Message val) { - return writeMessage(val, false); - } - - /** - * Writes nested message. - * - * @param val Message. - * @param compress Whether to compress message. - * @return Whether value was fully written. - */ - public boolean writeMessage(Message val, boolean compress); + public boolean writeMessage(Message val); /** * Writes {@link CacheObject}. @@ -335,7 +325,7 @@ public default boolean writeMap(Map map, MessageCollectionItemType * @param map Map. * @param keyType Map key type. * @param valType Map value type. - * @param compress Whether to compress map. + * @param compress Whether map need to compress. * @param Initial key types of the map to write. * @param Initial value types of the map to write. * @return Whether value was fully written. @@ -365,6 +355,11 @@ public boolean writeMap(Map map, MessageCollectionItemType keyType, */ public void incrementState(); + /** + * Decrements state. + */ + public void decrementState(); + /** * Callback called before inner message is written. */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java index 8236a4479ba51..56d56afe01ebc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java @@ -286,7 +286,7 @@ private boolean writeField(Class type) { } /** {@inheritDoc} */ - @Override public boolean writeMessage(Message val, boolean compress) { + @Override public boolean writeMessage(Message val) { return writeField(Message.class); } @@ -329,6 +329,11 @@ private boolean writeField(Class type) { ++state; } + /** {@inheritDoc} */ + @Override public void decrementState() { + --state; + } + /** {@inheritDoc} */ @Override public void beforeInnerMessageWrite() {} @@ -518,7 +523,7 @@ private void readField(Class type) { } /** {@inheritDoc} */ - @Override public T readMessage(boolean compress) { + @Override public T readMessage() { readField(Message.class); return null; @@ -594,6 +599,11 @@ private void readField(Class type) { ++state; } + /** {@inheritDoc} */ + @Override public void decrementState() { + --state; + } + /** {@inheritDoc} */ @Override public void beforeInnerMessageRead() {} From c2ad3fd6be63ba2c877fdf4d56776a4c9921c463 Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Wed, 14 Jan 2026 23:55:24 +0500 Subject: [PATCH 4/6] ITS WORK --- .../internal/MessageSerializerGenerator.java | 4 +-- .../internal/direct/DirectMessageReader.java | 3 -- .../internal/direct/DirectMessageWriter.java | 28 +++++++++++++++---- 3 files changed, 24 insertions(+), 11 deletions(-) diff --git a/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java b/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java index 19160d5c83152..c0f84275f83da 100644 --- a/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java +++ b/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java @@ -395,7 +395,7 @@ else if (assignableFrom(type, type("org.apache.ignite.internal.util.GridLongList else if (assignableFrom(type, type(MESSAGE_INTERFACE))) { if (compress) - returnFalseIfWriteFailed(write, "writer.writeMessage", getExpr); // "true" + returnFalseIfWriteFailed(write, "writer.writeMessage", getExpr, "true"); else returnFalseIfWriteFailed(write, "writer.writeMessage", getExpr); } @@ -616,7 +616,7 @@ else if (assignableFrom(type, type("org.apache.ignite.internal.util.GridLongList else if (assignableFrom(type, type(MESSAGE_INTERFACE))) { if (compress) - returnFalseIfReadFailed(name, "reader.readMessage"); // "true" + returnFalseIfReadFailed(name, "reader.readMessage", "true"); else returnFalseIfReadFailed(name, "reader.readMessage"); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java index f810d5a18191c..1ff93f8a01337 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java @@ -69,9 +69,6 @@ public class DirectMessageReader implements MessageReader { /** */ private ByteBuffer tmpBuf; - /** */ - private boolean uncompressFinished; - /** * @param msgFactory Message factory. * @param cacheObjProc Cache object processor. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java index f18cdc2ce60af..2b3057458cb7f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java @@ -45,6 +45,9 @@ * Message writer implementation. */ public class DirectMessageWriter implements MessageWriter { + /** */ + private static final int TMP_BUF_CAPACITY = 1024 * 100; + /** State. */ @GridToStringInclude private final DirectMessageState state; @@ -55,6 +58,12 @@ public class DirectMessageWriter implements MessageWriter { /** Buffer for writing. */ private ByteBuffer buf; + /** */ + private DirectMessageWriter tmpWriter; + + /** */ + private ByteBuffer tmpBuf; + /** */ public DirectMessageWriter(final MessageFactory msgFactory) { this.msgFactory = msgFactory; @@ -363,17 +372,24 @@ public ByteBuffer getBuffer() { DirectByteBufferStream stream = state.item().stream; if (compress) { - DirectMessageWriter tmpWriter = new DirectMessageWriter(msgFactory); + if (tmpWriter == null) + tmpWriter = new DirectMessageWriter(msgFactory); - tmpWriter.setBuffer(ByteBuffer.allocateDirect(600_000)); + if (tmpBuf == null) + tmpBuf = ByteBuffer.allocateDirect(TMP_BUF_CAPACITY); - DirectByteBufferStream tmpStream = tmpWriter.state.item().stream; + tmpWriter.setBuffer(tmpBuf); - tmpStream.writeMap(map, keyType, valType, tmpWriter); + tmpWriter.state.item().stream.writeMap(map, keyType, valType, tmpWriter); - CompressedMessage compressedMsg = map != null ? new CompressedMessage(tmpWriter.getBuffer()) : CompressedMessage.empty(); + CompressedMessage msg = map != null ? new CompressedMessage(tmpWriter.getBuffer()) : CompressedMessage.empty(); - stream.writeMessage(compressedMsg, this); + stream.writeMessage(msg, this); + + if (stream.lastFinished()) { + tmpWriter = null; + tmpBuf = null; + } } else stream.writeMap(map, keyType, valType, this); From e07f4ec7c0b48484dd62e1dd67363136bf3d1699 Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Thu, 15 Jan 2026 00:11:30 +0500 Subject: [PATCH 5/6] add support for messages --- .../internal/direct/DirectMessageReader.java | 152 +++++++++--------- .../internal/direct/DirectMessageWriter.java | 59 ++++--- .../communication/ChunkedByteReader.java | 52 ------ .../communication/CompressedMessage.java | 72 +++++---- .../GridDhtPartitionsSingleMessage.java | 1 - .../communication/MessageReader.java | 14 +- .../communication/MessageWriter.java | 16 +- .../AbstractMessageSerializationTest.java | 4 +- 8 files changed, 182 insertions(+), 188 deletions(-) delete mode 100644 modules/core/src/main/java/org/apache/ignite/internal/managers/communication/ChunkedByteReader.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java index 1ff93f8a01337..559c199d1faec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java @@ -18,12 +18,13 @@ package org.apache.ignite.internal.direct; import java.nio.ByteBuffer; -import java.util.Arrays; import java.util.BitSet; import java.util.Collection; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.Callable; +import org.apache.ignite.IgniteException; import org.apache.ignite.internal.direct.state.DirectMessageState; import org.apache.ignite.internal.direct.state.DirectMessageStateItem; import org.apache.ignite.internal.direct.stream.DirectByteBufferStream; @@ -47,6 +48,9 @@ * Message reader implementation. */ public class DirectMessageReader implements MessageReader { + /** */ + private static final int TMP_BUF_CAPACITY = 1024 * 100; + /** State. */ @GridToStringInclude private final DirectMessageState state; @@ -332,12 +336,21 @@ public ByteBuffer getBuffer() { } /** {@inheritDoc} */ - @Nullable @Override public T readMessage() { + @Nullable @Override public T readMessage(boolean compress) { DirectByteBufferStream stream = state.item().stream; - T msg = stream.readMessage(this); + T msg; - lastRead = stream.lastFinished(); + if (compress) + msg = readCompressedMessageAndDeserialize( + stream, + () -> tmpReader.state.item().stream.readMessage(tmpReader) + ); + else { + msg = stream.readMessage(this); + + lastRead = stream.lastFinished(); + } return msg; } @@ -415,80 +428,11 @@ public ByteBuffer getBuffer() { M map; - if (compress) { - System.out.println(">>> BEFORE READ"); - - Message msg = stream.readMessage(this); - - System.out.println(">>> AFTER READ [msg=" + msg + ']'); - - if (msg == null) { - lastRead = stream.lastFinished(); - - return null; - } - - assert msg instanceof CompressedMessage : msg; - - CompressedMessage msg0 = (CompressedMessage)msg; - - if (msg0.dataSize() == 0) { - lastRead = true; - - return null; - } - - byte[] uncompressedBytes = msg0.uncompressed(); - - byte[] res; - - int pos = buf.position(); - - if (buf.hasRemaining()) { - byte[] remaining = new byte[buf.remaining()]; - - System.out.println(">>> remaining: " + Arrays.toString(remaining)); - - buf.get(remaining); - buf.position(pos); - - res = new byte[uncompressedBytes.length + remaining.length]; - - System.arraycopy(uncompressedBytes, 0, res, 0, uncompressedBytes.length); - System.arraycopy(remaining, 0, res, uncompressedBytes.length, remaining.length); - } - else { - res = new byte[uncompressedBytes.length]; - - System.arraycopy(uncompressedBytes, 0, res, 0, uncompressedBytes.length); - } - - res = new byte[uncompressedBytes.length]; - - System.arraycopy(uncompressedBytes, 0, res, 0, uncompressedBytes.length); - - if (tmpReader == null) - tmpReader = new DirectMessageReader(msgFactory, cacheObjProc); - - if (tmpBuf == null) - tmpBuf = ByteBuffer.allocateDirect(600_000); - - int startPos = tmpBuf.position(); - - tmpBuf.put(res); - tmpBuf.position(startPos); - - tmpReader.setBuffer(tmpBuf); - - map = tmpReader.state.item().stream.readMap(keyType, valType, linked, tmpReader); - - lastRead = tmpReader.state.item().stream.lastFinished(); - - if (lastRead) { - tmpReader = null; - tmpBuf = null; - } - } + if (compress) + map = readCompressedMessageAndDeserialize( + stream, + () -> tmpReader.state.item().stream.readMap(keyType, valType, linked, tmpReader) + ); else { map = stream.readMap(keyType, valType, linked, this); @@ -540,6 +484,58 @@ public ByteBuffer getBuffer() { return S.toString(DirectMessageReader.class, this); } + /** */ + private T readCompressedMessageAndDeserialize(DirectByteBufferStream stream, Callable callable) { + Message msg = stream.readMessage(this); + + if (msg == null) { + lastRead = stream.lastFinished(); + + return null; + } + + assert msg instanceof CompressedMessage : msg; + + CompressedMessage msg0 = (CompressedMessage)msg; + + if (msg0.dataSize() == 0) { + lastRead = true; + + return null; + } + + if (tmpReader == null) + tmpReader = new DirectMessageReader(msgFactory, cacheObjProc); + + if (tmpBuf == null) + tmpBuf = ByteBuffer.allocateDirect(TMP_BUF_CAPACITY); + + int startPos = tmpBuf.position(); + + tmpBuf.put(msg0.uncompressed()); + tmpBuf.position(startPos); + + tmpReader.setBuffer(tmpBuf); + + T res; + + try { + res = callable.call(); + } + catch (Exception e) { + throw new IgniteException(e); + } + + lastRead = tmpReader.state.item().stream.lastFinished(); + + if (lastRead) { + tmpReader = null; + tmpBuf = null; + } + + return res; + } + /** */ private static class StateItem implements DirectMessageStateItem { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java index 2b3057458cb7f..28f1ca0419707 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java @@ -308,10 +308,17 @@ public ByteBuffer getBuffer() { } /** {@inheritDoc} */ - @Override public boolean writeMessage(@Nullable Message msg) { + @Override public boolean writeMessage(@Nullable Message msg, boolean compress) { DirectByteBufferStream stream = state.item().stream; - stream.writeMessage(msg, this); + if (compress) + writeCompressedMessage( + () -> tmpWriter.state.item().stream.writeMessage(msg, tmpWriter), + msg == null, + stream + ); + else + stream.writeMessage(msg, this); return stream.lastFinished(); } @@ -371,26 +378,12 @@ public ByteBuffer getBuffer() { MessageCollectionItemType valType, boolean compress) { DirectByteBufferStream stream = state.item().stream; - if (compress) { - if (tmpWriter == null) - tmpWriter = new DirectMessageWriter(msgFactory); - - if (tmpBuf == null) - tmpBuf = ByteBuffer.allocateDirect(TMP_BUF_CAPACITY); - - tmpWriter.setBuffer(tmpBuf); - - tmpWriter.state.item().stream.writeMap(map, keyType, valType, tmpWriter); - - CompressedMessage msg = map != null ? new CompressedMessage(tmpWriter.getBuffer()) : CompressedMessage.empty(); - - stream.writeMessage(msg, this); - - if (stream.lastFinished()) { - tmpWriter = null; - tmpBuf = null; - } - } + if (compress) + writeCompressedMessage( + () -> tmpWriter.state.item().stream.writeMap(map, keyType, valType, tmpWriter), + map == null, + stream + ); else stream.writeMap(map, keyType, valType, this); @@ -444,6 +437,28 @@ public ByteBuffer getBuffer() { return S.toString(DirectMessageWriter.class, this); } + /** */ + private void writeCompressedMessage(Runnable task, boolean empty, DirectByteBufferStream stream) { + if (tmpWriter == null) + tmpWriter = new DirectMessageWriter(msgFactory); + + if (tmpBuf == null) + tmpBuf = ByteBuffer.allocateDirect(TMP_BUF_CAPACITY); + + tmpWriter.setBuffer(tmpBuf); + + task.run(); + + CompressedMessage msg = empty ? CompressedMessage.empty() : new CompressedMessage(tmpWriter.getBuffer()); + + stream.writeMessage(msg, this); + + if (stream.lastFinished()) { + tmpWriter = null; + tmpBuf = null; + } + } + /** */ private static class StateItem implements DirectMessageStateItem { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/ChunkedByteReader.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/ChunkedByteReader.java deleted file mode 100644 index a039fefd5a516..0000000000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/ChunkedByteReader.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.managers.communication; - -/** TODO */ -public class ChunkedByteReader { - /** */ - private final byte[] inputData; - - /** */ - private final int chunkSize; - - /** */ - private int position; - - /** */ - public ChunkedByteReader(byte[] inputData, int chunkSize) { - this.inputData = inputData; - this.chunkSize = chunkSize; - } - - /** TODO */ - public byte[] nextChunk() { - if (position >= inputData.length) - return null; - - int curChunkSize = Math.min(inputData.length - position, chunkSize); - - byte[] chunk = new byte[curChunkSize]; - - System.arraycopy(inputData, position, chunk, 0, curChunkSize); - - position += curChunkSize; - - return chunk; - } -} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CompressedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CompressedMessage.java index 0ae56ca0410d6..110437ccb69bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CompressedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CompressedMessage.java @@ -31,7 +31,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; -/** TODO */ +/** */ public class CompressedMessage implements Message { /** Type code. */ public static final short TYPE_CODE = 517; @@ -70,7 +70,7 @@ public CompressedMessage(ByteBuffer buf) { chunkedReader = new ChunkedByteReader(compress(buf), CHUNK_SIZE); } - /** TODO */ + /** */ public static CompressedMessage empty() { CompressedMessage msg = new CompressedMessage(); @@ -86,7 +86,7 @@ public int dataSize() { return dataSize; } - /** TODO */ + /** */ public byte[] uncompressed() { assert finalChunk; @@ -114,8 +114,6 @@ public byte[] uncompressed() { if (chunk == null && chunkedReader != null) { chunk = chunkedReader.nextChunk(); - System.out.println(">>> After next chunk [" + Arrays.toString(chunk) + ']'); - finalChunk = (chunk == null); } @@ -124,8 +122,6 @@ public byte[] uncompressed() { if (!writer.writeInt(dataSize)) return false; - System.out.println(">>> WRITE dataSize=" + dataSize); - writer.incrementState(); if (dataSize == 0) @@ -144,8 +140,6 @@ public byte[] uncompressed() { if (!writer.writeByteArray(chunk)) return false; - System.out.println(">>> WRITED chunk [length=" + chunk.length + ", chunk=" + Arrays.toString(chunk) + ']'); - chunk = null; writer.decrementState(); @@ -168,8 +162,6 @@ public byte[] uncompressed() { case 0: dataSize = reader.readInt(); - System.out.println(">>> READ dataSize=" + dataSize); - if (!reader.isLastRead()) return false; @@ -198,8 +190,6 @@ public byte[] uncompressed() { if (chunk != null) { tmpBuf.put(chunk); - System.out.println(">>> ADD chunk to tmpBuf [length=" + chunk.length + " , chunk=" + Arrays.toString(chunk) + ']'); - reader.decrementState(); chunk = null; @@ -222,8 +212,6 @@ private byte[] compress(ByteBuffer buf) { buf.flip(); buf.get(data); - System.out.println(">>> RAW DATA: length=" + data.length + ", data=" + Arrays.toString(data)); - ByteArrayOutputStream baos = new ByteArrayOutputStream(data.length); Deflater deflater = new Deflater(Deflater.BEST_SPEED, true); @@ -238,11 +226,7 @@ private byte[] compress(ByteBuffer buf) { deflater.end(); } - byte[] res = baos.toByteArray(); - - System.out.println(">>> SUCCESS compress [length=" + res.length + ", res=" + Arrays.toString(res) + ']'); - - return res; + return baos.toByteArray(); } /** */ @@ -264,8 +248,6 @@ private byte[] uncompress() { inflater.end(); } - System.out.println(">>> SUCCESS uncompress [length=" + uncompressedData.length + ", data=" + Arrays.toString(uncompressedData) + ']'); - tmpBuf.clear(); return uncompressedData; @@ -274,11 +256,45 @@ private byte[] uncompress() { /** {@inheritDoc} */ @Override public String toString() { return "CompressedMessage{" + - "chunk=" + Arrays.toString(chunk) + - ", tmpBuf=" + tmpBuf + - ", dataSize=" + dataSize + - ", chunkedReader=" + chunkedReader + - ", finalChunk=" + finalChunk + - '}'; + "chunk=" + Arrays.toString(chunk) + + ", tmpBuf=" + tmpBuf + + ", dataSize=" + dataSize + + ", chunkedReader=" + chunkedReader + + ", finalChunk=" + finalChunk + + '}'; + } + + /** */ + private static class ChunkedByteReader { + /** */ + private final byte[] inputData; + + /** */ + private final int chunkSize; + + /** */ + private int position; + + /** */ + ChunkedByteReader(byte[] inputData, int chunkSize) { + this.inputData = inputData; + this.chunkSize = chunkSize; + } + + /** */ + byte[] nextChunk() { + if (position >= inputData.length) + return null; + + int curChunkSize = Math.min(inputData.length - position, chunkSize); + + byte[] chunk = new byte[curChunkSize]; + + System.arraycopy(inputData, position, chunk, 0, curChunkSize); + + position += curChunkSize; + + return chunk; + } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index b187026aca5c0..b66d6250f3694 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -272,7 +272,6 @@ public void partitionSizesMap(Map partsSizes) { * @return Partitions history reservation counters. */ public Map partitionHistoryCountersMap() { - System.out.println(">>> partitionHistoryCountersMap size=" + (partHistCntrs != null ? partHistCntrs.size() : null)); return partHistCntrs; } diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java index a59449eac8237..7fcb6545a3754 100644 --- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java @@ -24,7 +24,6 @@ import java.util.Map; import java.util.Set; import java.util.UUID; - import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.KeyCacheObject; @@ -200,7 +199,18 @@ public default void setBuffer(ByteBuffer buf) { * @param Type of the message. * @return Message. */ - public T readMessage(); + public default T readMessage() { + return readMessage(false); + } + + /** + * Reads nested message. + * + * @param compress Whether message should be decompressed. + * @param Type of the message. + * @return Message. + */ + public T readMessage(boolean compress); /** * Reads {@link CacheObject}. diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java index f2f34ec2ec571..e073ca8313423 100644 --- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java @@ -23,7 +23,6 @@ import java.util.Map; import java.util.Set; import java.util.UUID; - import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.KeyCacheObject; @@ -248,7 +247,18 @@ public default void setBuffer(ByteBuffer buf) { * @param val Message. * @return Whether value was fully written. */ - public boolean writeMessage(Message val); + public default boolean writeMessage(Message val) { + return writeMessage(val, false); + } + + /** + * Writes nested message. + * + * @param val Message. + * @param compress Whether message should be compressed. + * @return Whether value was fully written. + */ + public boolean writeMessage(Message val, boolean compress); /** * Writes {@link CacheObject}. @@ -325,7 +335,7 @@ public default boolean writeMap(Map map, MessageCollectionItemType * @param map Map. * @param keyType Map key type. * @param valType Map value type. - * @param compress Whether map need to compress. + * @param compress Whether map should be compressed. * @param Initial key types of the map to write. * @param Initial value types of the map to write. * @return Whether value was fully written. diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java index 56d56afe01ebc..317ff20a961ec 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java @@ -286,7 +286,7 @@ private boolean writeField(Class type) { } /** {@inheritDoc} */ - @Override public boolean writeMessage(Message val) { + @Override public boolean writeMessage(Message val, boolean compress) { return writeField(Message.class); } @@ -523,7 +523,7 @@ private void readField(Class type) { } /** {@inheritDoc} */ - @Override public T readMessage() { + @Override public T readMessage(boolean compress) { readField(Message.class); return null; From 6e7b189aaec41d9203c120e407b637c1258bc754 Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Thu, 15 Jan 2026 13:11:23 +0500 Subject: [PATCH 6/6] fix tests --- .../internal/direct/DirectMessageReader.java | 54 +++++-------------- .../internal/direct/DirectMessageWriter.java | 47 +++++++++------- .../communication/CompressedMessage.java | 14 +++-- .../communication/GridIoMessageFactory.java | 3 +- ...CommunicationMessageSerializationTest.java | 3 ++ 5 files changed, 55 insertions(+), 66 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java index 559c199d1faec..3aca6db961537 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java @@ -23,8 +23,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.Callable; -import org.apache.ignite.IgniteException; +import java.util.function.Function; import org.apache.ignite.internal.direct.state.DirectMessageState; import org.apache.ignite.internal.direct.state.DirectMessageStateItem; import org.apache.ignite.internal.direct.stream.DirectByteBufferStream; @@ -48,9 +47,6 @@ * Message reader implementation. */ public class DirectMessageReader implements MessageReader { - /** */ - private static final int TMP_BUF_CAPACITY = 1024 * 100; - /** State. */ @GridToStringInclude private final DirectMessageState state; @@ -67,12 +63,6 @@ public class DirectMessageReader implements MessageReader { /** Whether last field was fully read. */ private boolean lastRead; - /** */ - private DirectMessageReader tmpReader; - - /** */ - private ByteBuffer tmpBuf; - /** * @param msgFactory Message factory. * @param cacheObjProc Cache object processor. @@ -344,7 +334,7 @@ public ByteBuffer getBuffer() { if (compress) msg = readCompressedMessageAndDeserialize( stream, - () -> tmpReader.state.item().stream.readMessage(tmpReader) + tmpReader -> tmpReader.state.item().stream.readMessage(tmpReader) ); else { msg = stream.readMessage(this); @@ -431,7 +421,7 @@ public ByteBuffer getBuffer() { if (compress) map = readCompressedMessageAndDeserialize( stream, - () -> tmpReader.state.item().stream.readMap(keyType, valType, linked, tmpReader) + tmpReader -> tmpReader.state.item().stream.readMap(keyType, valType, linked, tmpReader) ); else { map = stream.readMap(keyType, valType, linked, this); @@ -485,54 +475,36 @@ public ByteBuffer getBuffer() { } /** */ - private T readCompressedMessageAndDeserialize(DirectByteBufferStream stream, Callable callable) { + private T readCompressedMessageAndDeserialize(DirectByteBufferStream stream, Function fun) { Message msg = stream.readMessage(this); - if (msg == null) { - lastRead = stream.lastFinished(); + lastRead = stream.lastFinished(); + if (!lastRead || msg == null) return null; - } assert msg instanceof CompressedMessage : msg; CompressedMessage msg0 = (CompressedMessage)msg; - if (msg0.dataSize() == 0) { - lastRead = true; - + if (msg0.dataSize() == 0) return null; - } - if (tmpReader == null) - tmpReader = new DirectMessageReader(msgFactory, cacheObjProc); + byte[] uncompressed = msg0.uncompressed(); - if (tmpBuf == null) - tmpBuf = ByteBuffer.allocateDirect(TMP_BUF_CAPACITY); + ByteBuffer tmpBuf = ByteBuffer.allocateDirect(uncompressed.length); - int startPos = tmpBuf.position(); + tmpBuf.put(uncompressed); + tmpBuf.flip(); - tmpBuf.put(msg0.uncompressed()); - tmpBuf.position(startPos); + DirectMessageReader tmpReader = new DirectMessageReader(msgFactory, cacheObjProc); tmpReader.setBuffer(tmpBuf); - T res; - - try { - res = callable.call(); - } - catch (Exception e) { - throw new IgniteException(e); - } + T res = fun.apply(tmpReader); lastRead = tmpReader.state.item().stream.lastFinished(); - if (lastRead) { - tmpReader = null; - tmpBuf = null; - } - return res; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java index 28f1ca0419707..4c30538a76cf0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.function.Consumer; import org.apache.ignite.internal.direct.state.DirectMessageState; import org.apache.ignite.internal.direct.state.DirectMessageStateItem; import org.apache.ignite.internal.direct.stream.DirectByteBufferStream; @@ -46,7 +47,7 @@ */ public class DirectMessageWriter implements MessageWriter { /** */ - private static final int TMP_BUF_CAPACITY = 1024 * 100; + private static final int TMP_BUF_CAPACITY = 1024 * 1024; /** State. */ @GridToStringInclude @@ -58,12 +59,6 @@ public class DirectMessageWriter implements MessageWriter { /** Buffer for writing. */ private ByteBuffer buf; - /** */ - private DirectMessageWriter tmpWriter; - - /** */ - private ByteBuffer tmpBuf; - /** */ public DirectMessageWriter(final MessageFactory msgFactory) { this.msgFactory = msgFactory; @@ -313,7 +308,7 @@ public ByteBuffer getBuffer() { if (compress) writeCompressedMessage( - () -> tmpWriter.state.item().stream.writeMessage(msg, tmpWriter), + tmpWriter -> tmpWriter.state.item().stream.writeMessage(msg, tmpWriter), msg == null, stream ); @@ -380,7 +375,7 @@ public ByteBuffer getBuffer() { if (compress) writeCompressedMessage( - () -> tmpWriter.state.item().stream.writeMap(map, keyType, valType, tmpWriter), + tmpWriter -> tmpWriter.state.item().stream.writeMap(map, keyType, valType, tmpWriter), map == null, stream ); @@ -438,25 +433,37 @@ public ByteBuffer getBuffer() { } /** */ - private void writeCompressedMessage(Runnable task, boolean empty, DirectByteBufferStream stream) { - if (tmpWriter == null) - tmpWriter = new DirectMessageWriter(msgFactory); + private void writeCompressedMessage(Consumer consumer, boolean empty, DirectByteBufferStream stream) { + if (empty) { + stream.writeMessage(CompressedMessage.empty(), this); + + return; + } - if (tmpBuf == null) - tmpBuf = ByteBuffer.allocateDirect(TMP_BUF_CAPACITY); + ByteBuffer tmpBuf = ByteBuffer.allocateDirect(TMP_BUF_CAPACITY); + + DirectMessageWriter tmpWriter = new DirectMessageWriter(msgFactory); tmpWriter.setBuffer(tmpBuf); - task.run(); + boolean finished; + + do { + if (!tmpBuf.hasRemaining()) { + tmpBuf = ByteBuffer.allocateDirect(tmpBuf.capacity() * 2); - CompressedMessage msg = empty ? CompressedMessage.empty() : new CompressedMessage(tmpWriter.getBuffer()); + tmpWriter.setBuffer(tmpBuf); - stream.writeMessage(msg, this); + tmpWriter.reset(); + } - if (stream.lastFinished()) { - tmpWriter = null; - tmpBuf = null; + consumer.accept(tmpWriter); + + finished = tmpWriter.state.item().stream.lastFinished(); } + while (!finished); + + stream.writeMessage(new CompressedMessage(tmpBuf), this); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CompressedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CompressedMessage.java index 110437ccb69bb..f8b70b8f0cbb4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CompressedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CompressedMessage.java @@ -37,10 +37,10 @@ public class CompressedMessage implements Message { public static final short TYPE_CODE = 517; /** */ - private static final int CHUNK_SIZE = 10 * 1024 * 1024; + private static final int CHUNK_SIZE = 1024 * 1024; /** */ - private static final int BUFFER_CAPACITY = 100 * 1024; + private static final int BUFFER_CAPACITY = 10 * 1024 * 1024; /** */ private ByteBuffer tmpBuf; @@ -188,6 +188,14 @@ public byte[] uncompressed() { return false; if (chunk != null) { + if (tmpBuf.remaining() <= CHUNK_SIZE) { + ByteBuffer newTmpBuf = ByteBuffer.allocate(tmpBuf.capacity() * 2); + + newTmpBuf.put(tmpBuf); + + tmpBuf = newTmpBuf; + } + tmpBuf.put(chunk); reader.decrementState(); @@ -248,7 +256,7 @@ private byte[] uncompress() { inflater.end(); } - tmpBuf.clear(); + tmpBuf = null; return uncompressedData; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 0f31fcfd0f915..0dfd249e541cb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -48,7 +48,6 @@ import org.apache.ignite.internal.codegen.CacheVersionedValueSerializer; import org.apache.ignite.internal.codegen.CacheWriteSynchronizationModeMessageSerializer; import org.apache.ignite.internal.codegen.ClusterMetricsUpdateMessageSerializer; -//import org.apache.ignite.internal.codegen.CompressedMessageSerializer; import org.apache.ignite.internal.codegen.ErrorMessageSerializer; import org.apache.ignite.internal.codegen.ExchangeInfoSerializer; import org.apache.ignite.internal.codegen.GenerateEncryptionKeyRequestSerializer; @@ -525,7 +524,7 @@ public class GridIoMessageFactory implements MessageFactoryProvider { new IgniteDhtPartitionsToReloadMapSerializer()); factory.register(IntLongMap.TYPE_CODE, IntLongMap::new, new IntLongMapSerializer()); factory.register(IndexKeyTypeMessage.TYPE_CODE, IndexKeyTypeMessage::new, new IndexKeyTypeMessageSerializer()); - factory.register(CompressedMessage.TYPE_CODE, CompressedMessage::new); // new CompressedMessageSerializer() + factory.register(CompressedMessage.TYPE_CODE, CompressedMessage::new); factory.register(GridPartitionStateMap.TYPE_CODE, GridPartitionStateMap::new, new GridPartitionStateMapSerializer()); factory.register(GridDhtPartitionMap.TYPE_CODE, GridDhtPartitionMap::new, new GridDhtPartitionMapSerializer()); factory.register(GridDhtPartitionFullMap.TYPE_CODE, GridDhtPartitionFullMap::new, new GridDhtPartitionFullMapSerializer()); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoCommunicationMessageSerializationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoCommunicationMessageSerializationTest.java index c5d6afb5bc63c..782e156f35ce4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoCommunicationMessageSerializationTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoCommunicationMessageSerializationTest.java @@ -38,6 +38,9 @@ public class IgniteIoCommunicationMessageSerializationTest extends AbstractMessa if (msg instanceof NodeIdMessage) FieldUtils.writeField(msg, "nodeId", UUID.randomUUID(), true); + if (msg instanceof CompressedMessage) + FieldUtils.writeField(msg, "dataSize", 0, true); + return msg; }