diff --git a/jpos/src/main/java/org/jpos/iso/ISODatasetField.java b/jpos/src/main/java/org/jpos/iso/ISODatasetField.java index 558c087b6f..aef353e002 100644 --- a/jpos/src/main/java/org/jpos/iso/ISODatasetField.java +++ b/jpos/src/main/java/org/jpos/iso/ISODatasetField.java @@ -51,33 +51,58 @@ public ISODatasetField(int fieldNumber) { /** * Appends a dataset to this field. + *

+ * Synchronizes on {@code datasets} to prevent concurrent modification while + * {@link #dump(PrintStream, String)} iterates the collection under the same lock. * * @param dataset dataset to add */ public void addDataset(Dataset dataset) { - datasets.add(dataset); + synchronized (datasets) { + datasets.add(dataset); + } } /** * Removes a dataset instance from this field. + *

+ * Synchronizes on {@code datasets} for the same reason as {@link #addDataset(Dataset)}. * * @param dataset dataset to remove */ public void removeDataset(Dataset dataset) { - datasets.remove(dataset); + synchronized (datasets) { + datasets.remove(dataset); + } } /** * Indicates whether this field still contains datasets. + *

+ * Synchronizes on {@code datasets} to ensure a consistent view — without the lock, + * a thread could read {@code isEmpty()} between another thread's {@code remove()} + * and a concurrent {@link #dump(PrintStream, String)} snapshot. * * @return {@code true} when at least one dataset is present */ public boolean hasDatasets() { - return !datasets.isEmpty(); + synchronized (datasets) { + return !datasets.isEmpty(); + } } /** * Returns all datasets in insertion order. + *

+ * The returned list is unmodifiable but reflects the current state of the internal + * ArrayList at the time of the call — concurrent mutations during iteration may + * throw {@code ConcurrentModificationException}. Use this method only from a single + * thread or when external synchronization covers both read and write access. + *

+ * The Dataset objects contained in this list are live references, not copies. + * Modifying a dataset after retrieval (e.g. adding/removing elements) while other + * threads may be reading it can result in inconsistent views. Callers must ensure + * external synchronization if datasets are mutated concurrently. * * @return immutable list of datasets */ @@ -87,37 +112,65 @@ public List getDatasets() { /** * Returns all datasets that match the supplied identifier. + *

+ * Iterates under the {@code datasets} lock to ensure a consistent snapshot during + * matching, preventing {@code ConcurrentModificationException} when called concurrently + * with {@link #addDataset(Dataset)} or {@link #removeDataset(Dataset)}. + *

+ * The Dataset objects contained in this list are live references, not copies. + * Modifying a dataset after retrieval (e.g. adding/removing elements) while other + * threads may be reading it can result in inconsistent views. Callers must ensure + * external synchronization if datasets are mutated concurrently. + *

+ * Returns {@link Collections#emptyList()} when no matches are found, avoiding an + * unnecessary allocation for the common zero-match case. * * @param identifier dataset identifier * @return immutable list of matching datasets */ public List getDatasets(int identifier) { - List matches = new ArrayList<>(); - for (Dataset dataset : datasets) { - if (dataset.getIdentifier() == identifier) { - matches.add(dataset); + synchronized (datasets) { + List matches = new ArrayList<>(); + for (Dataset dataset : datasets) { + if (dataset.getIdentifier() == identifier) { + matches.add(dataset); + } } + return matches.isEmpty() ? Collections.emptyList() : Collections.unmodifiableList(matches); } - return Collections.unmodifiableList(matches); } /** * Returns the first dataset that matches the supplied identifier. + *

+ * Iterates under the {@code datasets} lock to prevent concurrent modification while + * traversing the list. + *

+ * The returned Dataset is a live reference, not a copy. Modifying it after retrieval + * (e.g. adding/removing elements) while other threads may be reading it can result in + * inconsistent views. Callers must ensure external synchronization if the dataset is + * mutated concurrently with reads from other threads (e.g., during dump or iteration). * * @param identifier dataset identifier * @return matching dataset or {@code null} */ public Dataset getDataset(int identifier) { - for (Dataset dataset : datasets) { - if (dataset.getIdentifier() == identifier) { - return dataset; + synchronized (datasets) { + for (Dataset dataset : datasets) { + if (dataset.getIdentifier() == identifier) { + return dataset; + } } + return null; } - return null; } /** * Returns the component stored under the given dataset and element identifiers. + *

+ * The returned ISOComponent is a live reference from within the dataset, not a copy. + * Modifying it while other threads may be reading it can result in inconsistent views. + * Callers must ensure external synchronization if components are mutated concurrently. * * @param datasetId dataset identifier * @param elementId element identifier @@ -134,6 +187,9 @@ public ISOComponent get(int datasetId, int elementId) { /** * Returns the logical value stored under the given dataset and element identifiers. + *

+ * Equivalent to calling {@link #get(int, int)} and then {@link ISOComponent#getValue()}. + * See {@link #get(int, int)} for thread-safety details on live component references. * * @param datasetId dataset identifier * @param elementId element identifier @@ -147,6 +203,9 @@ public Object getValue(int datasetId, int elementId) throws ISOException { /** * Returns the bytes stored under the given dataset and element identifiers. + *

+ * Equivalent to calling {@link #get(int, int)} and then {@link ISOComponent#getBytes()}. + * See {@link #get(int, int)} for thread-safety details on live component references. * * @param datasetId dataset identifier * @param elementId element identifier @@ -180,6 +239,10 @@ public Object getKey() { /** * Returns the datasets carried by this field. + *

+ * Equivalent to {@link #getDatasets()}. See that method's javadoc for details on thread-safety: + * the returned list contains live Dataset references, not copies. Callers must ensure external + * synchronization if datasets are mutated concurrently with reads from other threads. * * @return dataset list */ @@ -222,24 +285,30 @@ public int getFieldNumber() { /** * Replaces the datasets held by this field. + *

+ * Synchronizes on {@code datasets} to ensure the clear-and-replace sequence is atomic + * relative to concurrent reads in {@link #dump(PrintStream, String)}, {@link #getDataset(int)}, + * and other methods that iterate or read from the list. * * @param obj either a {@link Dataset} or a {@link java.util.List} of datasets * @throws ISOException when the supplied value type is unsupported */ @Override public void setValue(Object obj) throws ISOException { - datasets.clear(); - if (obj instanceof Dataset) { - datasets.add((Dataset) obj); - } else if (obj instanceof List) { - for (Object item : (List) obj) { - if (!(item instanceof Dataset)) { - throw new ISOException("Invalid dataset list entry " + item); + synchronized (datasets) { + datasets.clear(); + if (obj instanceof Dataset) { + datasets.add((Dataset) obj); + } else if (obj instanceof List) { + for (Object item : (List) obj) { + if (!(item instanceof Dataset)) { + throw new ISOException("Invalid dataset list entry " + item); + } + datasets.add((Dataset) item); } - datasets.add((Dataset) item); + } else if (obj != null) { + throw new ISOException("Unsupported dataset field value " + obj.getClass().getName()); } - } else if (obj != null) { - throw new ISOException("Unsupported dataset field value " + obj.getClass().getName()); } } @@ -287,7 +356,11 @@ public void unpack(InputStream in) throws ISOException { public void dump(PrintStream p, String indent) { p.println(indent + "<" + XMLPackager.ISOFIELD_TAG + " " + XMLPackager.ID_ATTR + "=\"" + fieldNumber + "\" type=\"dataset\">"); String innerIndent = indent + " "; - for (Dataset dataset : datasets) { + List snapshot; + synchronized (datasets) { + snapshot = new ArrayList<>(datasets); + } + for (Dataset dataset : snapshot) { p.println(innerIndent + ""); String datasetIndent = innerIndent + " "; for (DatasetElement element : dataset.getElements()) { diff --git a/jpos/src/main/java/org/jpos/iso/ISOMsg.java b/jpos/src/main/java/org/jpos/iso/ISOMsg.java index eca8740836..6cb063cad7 100644 --- a/jpos/src/main/java/org/jpos/iso/ISOMsg.java +++ b/jpos/src/main/java/org/jpos/iso/ISOMsg.java @@ -39,10 +39,16 @@ public class ISOMsg extends ISOComponent implements Cloneable, Loggeable, Externalizable { - /** Map of field number to field value. */ + /** Map of field number to field value. All mutations and iterations must synchronize on this object. */ protected Map fields; - /** Highest field number currently set in this message. */ - protected int maxField; + /** + * Highest field number currently set in this message. + *

+ * Declared {@code volatile} so that any thread reading it (even indirectly via + * {@link #recalcMaxField()}) sees the latest value written under {@code synchronized(fields)}. + * This prevents stale reads when concurrent threads modify {@code fields}. + */ + protected volatile int maxField; /** The packager used to pack/unpack this message. */ protected ISOPackager packager; /** Dirty flags for tracking state changes. */ @@ -190,14 +196,42 @@ public boolean isOutgoing() { } /** * Returns the highest field number present in this message. + *

+ * Synchronizes on {@code fields} to ensure a consistent view of both the + * {@code maxFieldDirty} flag and the {@code maxField} value. If the dirty + * flag is set, {@link #recalcMaxField()} walks the map keys to recompute + * the maximum. This method must use the same lock as {@link #set(ISOComponent)}, + * {@link #unset(int)}, and {@link #recalcBitMap()} to prevent concurrent + * modification and stale reads of {@code maxField}. + * * @return the max field number */ @Override public int getMaxField() { - if (maxFieldDirty) - recalcMaxField(); - return maxField; + synchronized (fields) { + if (maxFieldDirty) + recalcMaxField(); + return maxField; + } + } + + /** + * Returns a snapshot of field numbers as a new TreeSet. + * Safe for concurrent iteration during dump operations. + * TreeSet maintains sorted order required for XMLPackager. + * @return new TreeSet containing current field numbers + */ + public Set getFieldNumbers() { + synchronized (fields) { + return new TreeSet<>(fields.keySet()); + } } + /** + * Recomputes {@link #maxField} by scanning all keys in the fields map. + *

+ * Must only be called from within a {@code synchronized(fields)} block — + * callers include {@link #getMaxField()} and {@link #recalcBitMap()}. + */ private void recalcMaxField() { maxField = 0; for (Object obj : fields.keySet()) { @@ -227,16 +261,29 @@ public ISOPackager getPackager () { return packager; } /** - * Set a field within this message + * Set a field within this message. + *

+ * Synchronizes on {@code fields} to provide mutual exclusion with all other + * methods that mutate the fields map: {@link #unset(int)}, {@link #recalcBitMap()}, + * {@link #pack()}, and {@link #unpack(byte[])}. This ensures that a thread + * iterating over {@code fields.keySet()} (e.g. during dump() or bitmap recalculation) + * never observes a partially-modified map. + *

+ * Updates {@code maxField} if the new field number exceeds it, and marks the + * message as dirty so that the bitmap will be recalculated on the next pack(). + * * @param c - a component */ public void set (ISOComponent c) throws ISOException { if (c != null) { - Integer i = (Integer) c.getKey(); - fields.put (i, c); - if (i > maxField) - maxField = i; - dirty = true; + synchronized (fields) { + Integer i = (Integer) c.getKey(); + fields.put (i, c); + // Update maxField under the same lock for atomic visibility with map mutation. + if (i > maxField) + maxField = i; + dirty = true; + } } } @@ -460,12 +507,19 @@ public void set(int fldno, byte[] value) { /** * Unset a field if it exists, otherwise ignore. + *

+ * Synchronizes on {@code fields} to ensure mutual exclusion with {@link #set(ISOComponent)}, + * {@link #recalcBitMap()}, {@link #pack()}, and {@link #unpack(byte[])}. The removal from the + * map and the dirty-flag updates are performed atomically under a single lock acquisition. + * * @param fldno - the field number */ @Override public void unset (int fldno) { - if (fields.remove (fldno) != null) - dirty = maxFieldDirty = true; + synchronized (fields) { + if (fields.remove (fldno) != null) + dirty = maxFieldDirty = true; + } } /** @@ -562,21 +616,38 @@ public ISOComponent getComposite() { return this; } /** - * setup BitMap - * @exception ISOException on error + * Recomputes the bitmap by scanning all field numbers and setting corresponding bits. + *

+ * Synchronizes on {@code fields} to prevent concurrent modification while iterating + * the map. This method is called from {@link #pack()} under the same lock, ensuring + * that no writer can modify the map between iteration and bitmap insertion. + *

+ * Calls {@link #set(ISOComponent)} internally with a new {@link ISOBitMap}. + * Because both use the same monitor ({@code fields}), this is reentrant — the JVM + * lock count increments rather than blocking. The inner {@code set()} also sets + * {@code dirty = true}, which we clear at the end to mark completion. + *

+ * If the message is not dirty (no fields have been added/removed since the last + * bitmap calculation), this method returns immediately without work. + * + * @exception ISOException on error during bitmap creation or insertion */ public void recalcBitMap () throws ISOException { - if (!dirty) - return; + synchronized (fields) { + if (!dirty) + return; - int mf = Math.min (getMaxField(), 192); + // Limit scan range to 192 to avoid excessive work on sparse messages with high field numbers. + int mf = Math.min (getMaxField(), 192); - BitSet bmap = new BitSet (mf+62 >>6 <<6); - for (int i=1; i<=mf; i++) - if (fields.get (i) != null) - bmap.set (i); - set (new ISOBitMap (-1, bmap)); - dirty = false; + BitSet bmap = new BitSet (mf+62 >>6 <<6); + for (int i=1; i<=mf; i++) + if (fields.get (i) != null) + bmap.set (i); + // Reentrant: set() acquires the same fields lock, incrementing the monitor count. + set (new ISOBitMap (-1, bmap)); + dirty = false; + } } /** * clone fields @@ -588,35 +659,57 @@ public Map getChildren() { } /** * Packs this message using the configured packager. + *

+ * Synchronizes on {@code fields} (not {@code this}) to ensure mutual exclusion with + * all mutating methods: {@link #set(ISOComponent)}, {@link #unset(int)}, and + * {@link #recalcBitMap()}. This is critical — if pack() used a different lock than + * set()/unset(), concurrent threads could modify the fields map while it is being + * iterated during bitmap recalculation, causing {@code ConcurrentModificationException} + * or silent data corruption. + *

+ * Calls {@link #recalcBitMap()} first to ensure the bitmap reflects the current state + * of all fields before packing begins. + * * @return the packed message * @exception ISOException on packing error */ @Override public byte[] pack() throws ISOException { - synchronized (this) { + synchronized (fields) { recalcBitMap(); return packager.pack(this); } } /** * Unpacks the raw byte array into this message. + *

+ * Synchronizes on {@code fields} to prevent concurrent modification while the + * packager inserts fields during unpacking. Uses the same lock as {@link #set(ISOComponent)}, + * {@link #unset(int)}, and {@link #recalcBitMap()} for consistent mutual exclusion. + * * @param b - raw message * @return consumed bytes * @exception ISOException on unpacking error */ @Override public int unpack(byte[] b) throws ISOException { - synchronized (this) { + synchronized (fields) { return packager.unpack(this, b); } } - /** {@inheritDoc} + /** + * Unpacks from an input stream. + *

+ * Synchronizes on {@code fields} for the same reasons as {@link #unpack(byte[])}. + * Uses the same lock ({@code fields}) as all mutating methods to ensure that no + * writer can modify the map while unpacking is in progress. + * * @throws IOException on I/O failure * @throws ISOException on unpacking error */ @Override public void unpack (InputStream in) throws IOException, ISOException { - synchronized (this) { + synchronized (fields) { packager.unpack(this, in); } } @@ -654,7 +747,7 @@ public void dump (PrintStream p, String indent) { if (header instanceof Loggeable) ((Loggeable) header).dump (p, newIndent); - for (int i : fields.keySet()) { + for (int i : getFieldNumbers()) { //If you want the bitmap dumped in the log, change the condition from (i >= 0) to (i >= -1). if (i >= 0) { if ((c = (ISOComponent) fields.get(i)) != null) diff --git a/jpos/src/main/java/org/jpos/security/CryptographicServiceMessage.java b/jpos/src/main/java/org/jpos/security/CryptographicServiceMessage.java index 16848f0737..eb7498271e 100644 --- a/jpos/src/main/java/org/jpos/security/CryptographicServiceMessage.java +++ b/jpos/src/main/java/org/jpos/security/CryptographicServiceMessage.java @@ -119,7 +119,9 @@ public void addField(String tag, String content) { Objects.requireNonNull(tag, "The tag is required"); Objects.requireNonNull(content, "The content is required"); tag = tag.toUpperCase(); - fields.put(tag, content); + synchronized (fields) { + fields.put(tag, content); + } } /** @@ -191,7 +193,11 @@ public void dump (PrintStream p, String indent) { p.print(indent + ""); - for (String tag : fields.keySet()) { + List snapshot; + synchronized (fields) { + snapshot = new ArrayList<>(fields.keySet()); + } + for (String tag : snapshot) { p.println(inner + ""); } p.println(indent + ""); diff --git a/jpos/src/main/java/org/jpos/security/SecureKeyBlock.java b/jpos/src/main/java/org/jpos/security/SecureKeyBlock.java index 22f363a4a8..45467402c4 100644 --- a/jpos/src/main/java/org/jpos/security/SecureKeyBlock.java +++ b/jpos/src/main/java/org/jpos/security/SecureKeyBlock.java @@ -19,8 +19,10 @@ package org.jpos.security; import java.io.PrintStream; +import java.util.ArrayList; import java.util.Collections; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import org.jpos.iso.ISOUtil; @@ -280,9 +282,13 @@ public void dump(PrintStream p, String indent) { p.println(inner2 + "" + reserved + ""); p.println(inner + ""); - if (!optionalHeaders.isEmpty()) { + List> snapshot; + synchronized (this) { + snapshot = new ArrayList<>(optionalHeaders.entrySet()); + } + if (!snapshot.isEmpty()) { p.println(inner + ""); - for (Entry ent : optionalHeaders.entrySet()) + for (Entry ent : snapshot) p.println(inner2 + ""); p.println(inner + ""); } diff --git a/jpos/src/main/java/org/jpos/security/SecureKeySpec.java b/jpos/src/main/java/org/jpos/security/SecureKeySpec.java index cfe72b2aba..9744bd74b9 100644 --- a/jpos/src/main/java/org/jpos/security/SecureKeySpec.java +++ b/jpos/src/main/java/org/jpos/security/SecureKeySpec.java @@ -24,7 +24,9 @@ import org.jpos.util.Loggeable; import java.io.Serializable; +import java.util.ArrayList; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import org.jpos.iso.ISOUtil; @@ -511,7 +513,11 @@ public void dump(PrintStream p, String indent) { if (!optionalHeaders.isEmpty()) { p.println(inner + ""); String inner2 = inner + " "; - for (Entry ent : optionalHeaders.entrySet()) + List> snapshot; + synchronized (optionalHeaders) { + snapshot = new ArrayList<>(optionalHeaders.entrySet()); + } + for (Entry ent : snapshot) p.println(inner2 + ""); p.println(inner + ""); } diff --git a/jpos/src/main/java/org/jpos/tlv/TLVList.java b/jpos/src/main/java/org/jpos/tlv/TLVList.java index 624f7617fd..5ddfcbb671 100644 --- a/jpos/src/main/java/org/jpos/tlv/TLVList.java +++ b/jpos/src/main/java/org/jpos/tlv/TLVList.java @@ -216,7 +216,9 @@ public void unpack(byte[] buf, int offset) throws IllegalArgumentException public void append(TLVMsg tlv) throws NullPointerException { Objects.requireNonNull(tlv, "TLV message cannot be null"); - tags.add(tlv); + synchronized (tags) { + tags.add(tlv); + } } /** @@ -250,7 +252,9 @@ public TLVList append(int tag, String value) throws IllegalArgumentException { * @param index number */ public void deleteByIndex(int index) { - tags.remove(index); + synchronized (tags) { + tags.remove(index); + } } /** @@ -258,12 +262,14 @@ public void deleteByIndex(int index) { * @param tag id */ public void deleteByTag(int tag) { - List t = new ArrayList<>(); - for (TLVMsg tlv2 : tags) { - if (tlv2.getTag() == tag) - t.add(tlv2); + synchronized (tags) { + List t = new ArrayList<>(); + for (TLVMsg tlv2 : tags) { + if (tlv2.getTag() == tag) + t.add(tlv2); + } + tags.removeAll(t); } - tags.removeAll(t); } /** @@ -536,7 +542,11 @@ public boolean hasTag(int tag) { public void dump(PrintStream p, String indent) { String inner = indent + " "; p.println(indent + ""); - for (TLVMsg msg : getTags()) + List snapshot; + synchronized (tags) { + snapshot = new ArrayList<>(tags); + } + for (TLVMsg msg : snapshot) msg.dump(p, inner); p.println(indent + ""); } diff --git a/jpos/src/main/java/org/jpos/util/FSDMsg.java b/jpos/src/main/java/org/jpos/util/FSDMsg.java index fa872b3934..8b0e38debe 100644 --- a/jpos/src/main/java/org/jpos/util/FSDMsg.java +++ b/jpos/src/main/java/org/jpos/util/FSDMsg.java @@ -27,11 +27,13 @@ import java.io.PrintStream; import java.net.URL; import java.nio.charset.Charset; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; @@ -652,10 +654,12 @@ protected String readField (InputStreamReader r, String fieldName, int len, * @param value the field value, or null to remove */ public void set (String name, String value) { - if (value != null) - fields.put (name, value); - else - fields.remove (name); + synchronized (fields) { + if (value != null) + fields.put (name, value); + else + fields.remove (name); + } } /** * Sets the binary header bytes for this message. @@ -894,7 +898,11 @@ public void dump (PrintStream p, String indent) { if (header != null) { append (p, "header", getHexHeader(), inner); } - for (String f :fields.keySet()) + List snapshot; + synchronized (fields) { + snapshot = new ArrayList<>(fields.keySet()); + } + for (String f : snapshot) append (p, f, fields.get (f), inner); p.println (indent + ""); } diff --git a/jpos/src/main/java/org/jpos/util/SimpleMsg.java b/jpos/src/main/java/org/jpos/util/SimpleMsg.java index 124cd22c00..970ebc6cd5 100644 --- a/jpos/src/main/java/org/jpos/util/SimpleMsg.java +++ b/jpos/src/main/java/org/jpos/util/SimpleMsg.java @@ -21,13 +21,13 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; - -import org.jpos.iso.ISOUtil; - import java.io.PrintStream; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import org.jpos.iso.ISOUtil; + /** *

* A simple general purpose loggeable message. @@ -90,7 +90,7 @@ public void dump(PrintStream p, String indent) { if (msgContent instanceof Object[]) cl = Arrays.asList((Object[]) msgContent); else if (msgContent instanceof Collection) - cl = (Collection) msgContent; + cl = new ArrayList<>((Collection) msgContent); else if (msgContent instanceof Loggeable) cl = Arrays.asList(msgContent); else if (msgContent instanceof Throwable) diff --git a/jpos/src/test/java/org/jpos/iso/ISOMsgConcurrentTest.java b/jpos/src/test/java/org/jpos/iso/ISOMsgConcurrentTest.java new file mode 100644 index 0000000000..d497df2908 --- /dev/null +++ b/jpos/src/test/java/org/jpos/iso/ISOMsgConcurrentTest.java @@ -0,0 +1,332 @@ +/* + * jPOS Project [http://jpos.org] + * Copyright (C) 2000-2026 jPOS Software SRL + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package org.jpos.iso; + +import static org.junit.jupiter.api.Assertions.*; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.jpos.tlv.TLVList; +import org.jpos.util.FSDMsg; +import org.junit.jupiter.api.Test; + +import org.jpos.iso.Dataset; +import org.jpos.iso.DatasetFormat; +import org.jpos.iso.ISODataset; + +/** + * Tests for concurrent modification exceptions in ISO components. + * + *

These tests verify that dump() operations on ISOMsg, TLVList, FSDMsg, + * and ISODatasetField are safe when called concurrently with set/unset operations + * from other threads, matching the pattern used in ContextTest and ProfilerTest. + */ +public class ISOMsgConcurrentTest { + + private static final int ITERATIONS = 2000; + private static final int THREADS = 4; + + /** + * ISOMsg.dump() iterates fields.keySet() (TreeMap) while set()/unset() + * modify the same map from other threads, causing ConcurrentModificationException. + * + *

This is THE highest risk because ISOMsg is the most common object stored + * in Context for request/response messages. + */ + @Test + public void testISOMsgConcurrentDumpAndSet() throws Throwable { + final ISOMsg msg = new ISOMsg(); + final CountDownLatch startLatch = new CountDownLatch(1); + final AtomicBoolean failed = new AtomicBoolean(false); + + Thread[] writers = new Thread[THREADS]; + for (int i = 0; i < THREADS; i++) { + final int threadNum = i; + writers[i] = new Thread(() -> { + try { + startLatch.await(); + for (int j = 0; j < ITERATIONS; j++) { + msg.set(1 + (threadNum * 100) + j, "value"); + if (j % 10 == 0) Thread.yield(); + } + } catch (Exception e) { + failed.set(true); + } + }); + writers[i].start(); + } + + Thread dumper = new Thread(() -> { + try { + startLatch.await(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + for (int j = 0; j < ITERATIONS; j++) { + msg.dump(new PrintStream(baos), " "); + baos.reset(); + if (j % 10 == 0) Thread.yield(); + } + } catch (Exception e) { + failed.set(true); + } + }); + dumper.start(); + + startLatch.countDown(); + for (Thread t : writers) t.join(); + dumper.join(); + + assertFalse(failed.get(), "ConcurrentModificationException was thrown during concurrent dump and set"); + } + + /** + * Verifies that ISOMsg.dump() with nested ISOMsg (field path) does not throw + * ConcurrentModificationException when fields are modified concurrently. + */ + @Test + public void testISOMsgNestedFieldConcurrentDumpAndSet() throws Throwable { + final ISOMsg msg = new ISOMsg(); + final CountDownLatch startLatch = new CountDownLatch(1); + final AtomicBoolean failed = new AtomicBoolean(false); + + Thread writer = new Thread(() -> { + try { + startLatch.await(); + for (int j = 0; j < ITERATIONS; j++) { + msg.set("63." + (j % 10) + ".1", "value" + j); + if (j % 10 == 0) Thread.yield(); + } + } catch (Exception e) { + failed.set(true); + } + }); + writer.start(); + + Thread dumper = new Thread(() -> { + try { + startLatch.await(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + for (int j = 0; j < ITERATIONS; j++) { + msg.dump(new PrintStream(baos), " "); + baos.reset(); + if (j % 10 == 0) Thread.yield(); + } + } catch (Exception e) { + failed.set(true); + } + }); + dumper.start(); + + startLatch.countDown(); + writer.join(); + dumper.join(); + + assertFalse(failed.get(), "ConcurrentModificationException was thrown during nested field dump"); + } + + /** + * TLVList.dump() iterates getTags() which returns the raw internal ArrayList, + * while append() and remove() modify it from other threads, causing CME. + * + *

TLVList is commonly nested in ISOMsg fields for EMV data and proprietary + * TLV structures. + */ + @Test + public void testTLVListConcurrentDumpAndAppend() throws Throwable { + final TLVList tlvList = new TLVList(); + final CountDownLatch startLatch = new CountDownLatch(1); + final AtomicBoolean failed = new AtomicBoolean(false); + + Thread[] writers = new Thread[THREADS]; + for (int i = 0; i < THREADS; i++) { + final int threadNum = i; + writers[i] = new Thread(() -> { + try { + startLatch.await(); + for (int j = 0; j < ITERATIONS; j++) { + tlvList.append(0xD0 + threadNum, ("value" + j).getBytes()); + if (j % 10 == 0) Thread.yield(); + } + } catch (Exception e) { + failed.set(true); + } + }); + writers[i].start(); + } + + Thread dumper = new Thread(() -> { + try { + startLatch.await(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + for (int j = 0; j < ITERATIONS; j++) { + tlvList.dump(new PrintStream(baos), " "); + baos.reset(); + if (j % 10 == 0) Thread.yield(); + } + } catch (Exception e) { + failed.set(true); + } + }); + dumper.start(); + + startLatch.countDown(); + for (Thread t : writers) t.join(); + dumper.join(); + + assertFalse(failed.get(), "ConcurrentModificationException was thrown during concurrent dump and append"); + } + + /** + * TLVList with nested TLVMsg that triggers modification during toString() + * (similar to the Profiler Entry scenario). + */ + @Test + public void testTLVListDumpWithMutatingNestedMsg() throws Exception { + final TLVList tlvList = new TLVList(); + final AtomicBoolean failed = new AtomicBoolean(false); + + Thread mutator = new Thread(() -> { + for (int j = 0; j < ITERATIONS; j++) { + try { + tlvList.append(0xD0 + (j % 10), ("value" + j).getBytes()); + if (j % 10 == 0) Thread.yield(); + } catch (Exception e) { + failed.set(true); + } + } + }); + mutator.start(); + + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + for (int j = 0; j < ITERATIONS; j++) { + tlvList.dump(new PrintStream(baos), " "); + baos.reset(); + if (j % 10 == 0) Thread.yield(); + } + } catch (Exception e) { + failed.set(true); + } + + mutator.join(); + assertFalse(failed.get(), "ConcurrentModificationException was thrown during dump with concurrent append"); + } + + /** + * FSDMsg.dump() iterates fields.keySet() (LinkedHashMap) while set()/unset() + * modify it from other threads, causing CME. + */ + @Test + public void testFSDMsgConcurrentDumpAndSet() throws Throwable { + final FSDMsg msg = new FSDMsg("test-schema", "test"); + final CountDownLatch startLatch = new CountDownLatch(1); + final AtomicBoolean failed = new AtomicBoolean(false); + + Thread[] writers = new Thread[THREADS]; + for (int i = 0; i < THREADS; i++) { + final int threadNum = i; + writers[i] = new Thread(() -> { + try { + startLatch.await(); + for (int j = 0; j < ITERATIONS; j++) { + msg.set("field" + (threadNum * 100 + j), "value" + j); + if (j % 10 == 0) Thread.yield(); + } + } catch (Exception e) { + failed.set(true); + } + }); + writers[i].start(); + } + + Thread dumper = new Thread(() -> { + try { + startLatch.await(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + for (int j = 0; j < ITERATIONS; j++) { + msg.dump(new PrintStream(baos), " "); + baos.reset(); + if (j % 10 == 0) Thread.yield(); + } + } catch (Exception e) { + failed.set(true); + } + }); + dumper.start(); + + startLatch.countDown(); + for (Thread t : writers) t.join(); + dumper.join(); + + assertFalse(failed.get(), "ConcurrentModificationException was thrown during concurrent dump and set"); + } + + /** + * ISODatasetField.dump() iterates datasets ArrayList while addDataset() + * and setValue() modify it from other threads, causing CME. + */ + @Test + public void testISODatasetFieldConcurrentDumpAndAddDataset() throws Throwable { + final ISODatasetField datasetField = new ISODatasetField(1); + final CountDownLatch startLatch = new CountDownLatch(1); + final AtomicBoolean failed = new AtomicBoolean(false); + + Thread[] writers = new Thread[THREADS]; + for (int i = 0; i < THREADS; i++) { + final int threadNum = i; + writers[i] = new Thread(() -> { + try { + startLatch.await(); + for (int j = 0; j < ITERATIONS; j++) { + Dataset ds = new ISODataset(0x01 + threadNum, DatasetFormat.TLV); + datasetField.addDataset(ds); + if (j % 10 == 0) Thread.yield(); + } + } catch (Exception e) { + failed.set(true); + } + }); + writers[i].start(); + } + + Thread dumper = new Thread(() -> { + try { + startLatch.await(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + for (int j = 0; j < ITERATIONS; j++) { + datasetField.dump(new PrintStream(baos), " "); + baos.reset(); + if (j % 10 == 0) Thread.yield(); + } + } catch (Exception e) { + failed.set(true); + } + }); + dumper.start(); + + startLatch.countDown(); + for (Thread t : writers) t.join(); + dumper.join(); + + assertFalse(failed.get(), "ConcurrentModificationException was thrown during concurrent dump and addDataset"); + } +} \ No newline at end of file diff --git a/jpos/src/test/java/org/jpos/iso/ISOMsgPackConcurrentTest.java b/jpos/src/test/java/org/jpos/iso/ISOMsgPackConcurrentTest.java new file mode 100644 index 0000000000..4849e814eb --- /dev/null +++ b/jpos/src/test/java/org/jpos/iso/ISOMsgPackConcurrentTest.java @@ -0,0 +1,529 @@ +/* + * jPOS Project [http://jpos.org] + * Copyright (C) 2000-2026 jPOS Software SRL + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package org.jpos.iso; + +import static org.junit.jupiter.api.Assertions.*; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.Test; + +/** + * Tests for concurrent pack/unpack and set/unset operations on ISOMsg. + *

+ * These tests verify that the lock used in pack()/unpack() matches the lock used in + * set()/unset()/recalcBitMap()/getMaxField()/unset() — i.e. all must synchronize on the + * same object ({@code fields}) to prevent concurrent modification and stale-read races. + *

+ * The existing {@link ISOMsgConcurrentTest} only exercises dump() + set/unset, which was + * already safe. These tests target the critical race: if pack()/unpack() used a different + * lock (e.g. {@code synchronized(this)}) than set()/unset() ({@code synchronized(fields)}), + * they could run concurrently and corrupt the fields map during iteration in + * recalcBitMap(). Each test isolates a specific synchronization path. + */ +public class ISOMsgPackConcurrentTest { + + /** Number of operations per thread — high enough to surface races but bounded for CI speed. */ + private static final int ITERATIONS = 1000; + /** Number of concurrent writer/reader threads to maximize contention. */ + private static final int THREADS = 4; + + /** + * Minimal ISOPackager that packs/unpacks nothing — just returns empty bytes. + *

+ * Used to exercise the {@code synchronized(fields)} path in pack()/unpack() without + * requiring a full descriptor-based packager configuration (which would need external XML files). + * The packager itself does no locking; all mutual exclusion comes from ISOMsg's synchronized blocks. + */ + private static final ISOPackager NOOP_PACKAGER = new ISOPackager() { + @Override + public byte[] pack(ISOComponent m) throws ISOException { + return new byte[0]; + } + @Override + public int unpack(ISOComponent m, byte[] b) throws ISOException { + return 0; + } + @Override + public void unpack(ISOComponent m, InputStream in) throws ISOException { + } + @Override + public String getFieldDescription(ISOComponent m, int fldNumber) { + return null; + } + @Override + public ISOMsg createISOMsg() { + return new ISOMsg(); + } + @Override + public String getDescription() { + return "NoOpPackager"; + } + }; + + /** + * Verifies that recalcBitMap() can run concurrently with set()/unset() without + * throwing ConcurrentModificationException or producing inconsistent state. + *

+ * This is the core race condition: recalcBitMap() iterates {@code fields.keySet()} + * while writers call {@link ISOMsg#set(int, String)} and {@link ISOMsg#unset(int)}. + * It also tests reentrant lock behavior — recalcBitMap() calls {@code set(new ISOBitMap(...))} + * internally, which acquires the same {@code synchronized(fields)} monitor. The JVM + * lock count increments rather than blocking, so this must not deadlock. + */ + @Test + public void testRecalcBitMapConcurrentWithSetAndUnset() throws Throwable { + // Create a message with initial fields to give recalcBitMap something to work with. + final ISOMsg msg = new ISOMsg(); + msg.setMTI("0200"); + msg.set(11, "000001"); + + final CountDownLatch startLatch = new CountDownLatch(1); + final AtomicBoolean failed = new AtomicBoolean(false); + final AtomicInteger recalcCount = new AtomicInteger(0); + final AtomicInteger setCount = new AtomicInteger(0); + + // Launch multiple writer threads, each targeting distinct field ranges to maximize contention. + Thread[] writers = new Thread[THREADS]; + for (int i = 0; i < THREADS; i++) { + final int threadNum = i; + writers[i] = new Thread(() -> { + try { + startLatch.await(); + for (int j = 0; j < ITERATIONS; j++) { + // Each writer targets a unique field number range to avoid contention on the same key. + int fieldNo = 1 + (threadNum * 100) + j; + // Alternate between set and unset to test both mutation paths under lock. + if (j % 5 == 0) { + msg.unset(fieldNo); + } else { + msg.set(fieldNo, "value" + j); + } + setCount.incrementAndGet(); + } + } catch (Exception e) { + failed.set(true); + } + }); + writers[i].start(); + } + + // Single recalcBitMap thread competing with all writers for the fields lock. + Thread recalcThread = new Thread(() -> { + try { + startLatch.await(); + for (int j = 0; j < ITERATIONS; j++) { + try { + msg.recalcBitMap(); + recalcCount.incrementAndGet(); + } catch (Exception e) { + failed.set(true); + } + // Yield periodically to increase interleaving with writer threads. + if (j % 10 == 0) Thread.yield(); + } + } catch (Exception e) { + failed.set(true); + } + }); + recalcThread.start(); + + // Release all threads simultaneously and wait for completion. + startLatch.countDown(); + for (Thread t : writers) t.join(); + recalcThread.join(); + + assertFalse(failed.get(), + "ConcurrentModificationException during concurrent recalcBitMap/set/unset. " + + "recalc()=" + recalcCount.get() + " set/unset()=" + setCount.get()); + } + + /** + * Verifies that pack() can run concurrently with set()/unset() without + * throwing ConcurrentModificationException or producing inconsistent state. + *

+ * This is the critical race condition we fixed: pack() acquires {@code synchronized(fields)}, + * then calls recalcBitMap() which iterates {@code fields.keySet()}, while writer threads call + * {@link ISOMsg#set(int, String)} and {@link ISOMsg#unset(int)} — also under + * {@code synchronized(fields)}. If pack() had used a different lock (e.g. the old + * {@code synchronized(this)}), these operations would run concurrently on different monitors, + * allowing concurrent modification of the TreeMap during iteration. This test would fail + * with ConcurrentModificationException or produce silent data corruption if the locks diverge again. + */ + @Test + public void testPackConcurrentWithSetAndUnset() throws Throwable { + // Create message with noop packager and initial fields. + final ISOMsg msg = new ISOMsg(); + msg.setPackager(NOOP_PACKAGER); + msg.setMTI("0200"); + msg.set(11, "000001"); + + final CountDownLatch startLatch = new CountDownLatch(1); + final AtomicBoolean failed = new AtomicBoolean(false); + final AtomicInteger packCount = new AtomicInteger(0); + final AtomicInteger setCount = new AtomicInteger(0); + + // Launch multiple writer threads competing with packer for the fields lock. + Thread[] writers = new Thread[THREADS]; + for (int i = 0; i < THREADS; i++) { + final int threadNum = i; + writers[i] = new Thread(() -> { + try { + startLatch.await(); + for (int j = 0; j < ITERATIONS; j++) { + // Each writer targets a unique field number range to maximize contention. + int fieldNo = 1 + (threadNum * 100) + j; + // Alternate between set and unset to exercise both mutation paths. + if (j % 5 == 0) { + msg.unset(fieldNo); + } else { + msg.set(fieldNo, "value" + j); + } + setCount.incrementAndGet(); + } + } catch (Exception e) { + failed.set(true); + } + }); + writers[i].start(); + } + + // Single packer thread — the critical path that iterates fields.keySet() inside synchronized(fields). + Thread packer = new Thread(() -> { + try { + startLatch.await(); + for (int j = 0; j < ITERATIONS; j++) { + try { + msg.pack(); + packCount.incrementAndGet(); + } catch (Exception e) { + failed.set(true); + } + // Yield periodically to increase interleaving with writer threads. + if (j % 10 == 0) Thread.yield(); + } + } catch (Exception e) { + failed.set(true); + } + }); + packer.start(); + + // Release all threads simultaneously and wait for completion. + startLatch.countDown(); + for (Thread t : writers) t.join(); + packer.join(); + + assertFalse(failed.get(), + "ConcurrentModificationException during concurrent pack/set/unset. " + + "pack()=" + packCount.get() + " set/unset()=" + setCount.get()); + } + + /** + * Verifies that getMaxField() returns consistent values when called + * concurrently with set()/unset(). + *

+ * Without proper synchronization, getMaxField() could read a stale {@code maxField} value + * because the dirty flag check and the field iteration in recalcMaxField() are not atomic + * relative to concurrent set() calls. The {@code synchronized(fields)} block ensures that + * the {@code maxFieldDirty} check, the optional recalculation, and the return of + * {@code maxField} all happen under a single lock acquisition — so any thread sees a + * consistent snapshot. The {@code volatile} keyword on {@code maxField} provides an extra + * safety net for any indirect reads outside the synchronized block. + */ + @Test + public void testGetMaxFieldConcurrentWithSet() throws Throwable { + final ISOMsg msg = new ISOMsg(); + + final CountDownLatch startLatch = new CountDownLatch(1); + final AtomicBoolean failed = new AtomicBoolean(false); + final AtomicInteger readCount = new AtomicInteger(0); + + // Launch multiple writer threads that only call set(), pushing maxField higher over time. + Thread[] writers = new Thread[THREADS]; + for (int i = 0; i < THREADS; i++) { + final int threadNum = i; + writers[i] = new Thread(() -> { + try { + startLatch.await(); + for (int j = 0; j < ITERATIONS; j++) { + // Each writer targets a unique field number range to maximize contention. + msg.set(1 + (threadNum * 100) + j, "value"); + } + } catch (Exception e) { + failed.set(true); + } + }); + writers[i].start(); + } + + // Launch multiple reader threads that only call getMaxField() to check for stale reads. + Thread[] readers = new Thread[THREADS]; + for (int i = 0; i < THREADS; i++) { + readers[i] = new Thread(() -> { + try { + startLatch.await(); + for (int j = 0; j < ITERATIONS; j++) { + int mf = msg.getMaxField(); + // Verify maxField stays within expected bounds — a stale or corrupted value would be out of range. + if (mf < 0 || mf > 400000) { + failed.set(true); + } + readCount.incrementAndGet(); + // Yield periodically to increase interleaving with writer threads. + if (j % 10 == 0) Thread.yield(); + } + } catch (Exception e) { + failed.set(true); + } + }); + readers[i].start(); + } + + // Release all threads simultaneously and wait for completion. + startLatch.countDown(); + for (Thread t : writers) t.join(); + for (Thread t : readers) t.join(); + + assertFalse(failed.get(), + "Inconsistent getMaxField() during concurrent set. readCount=" + readCount.get()); + } + + /** + * Verifies that unpack() can run concurrently with set()/unset() without + * throwing ConcurrentModificationException or producing inconsistent state. + *

+ * Tests the {@code synchronized(fields)} path in {@link ISOMsg#unpack(byte[])} and + * {@link ISOMsg#unpack(InputStream)}. The noop packager inserts no fields, so the test + * isolates the lock acquisition itself — if unpack() used a different lock than set()/unset(), + * concurrent map mutations during the unpack lifecycle could corrupt internal state. + */ + @Test + public void testUnpackConcurrentWithSetAndUnset() throws Throwable { + final ISOMsg msg = new ISOMsg(); + msg.setPackager(NOOP_PACKAGER); + msg.setMTI("0200"); + msg.set(11, "000001"); + + // Pack once to get valid bytes (noop packager returns empty array). + byte[] packed = msg.pack(); + + final CountDownLatch startLatch = new CountDownLatch(1); + final AtomicBoolean failed = new AtomicBoolean(false); + final AtomicInteger unpackCount = new AtomicInteger(0); + final AtomicInteger setCount = new AtomicInteger(0); + + // Launch multiple writer threads competing with unpacker for the fields lock. + Thread[] writers = new Thread[THREADS]; + for (int i = 0; i < THREADS; i++) { + final int threadNum = i; + writers[i] = new Thread(() -> { + try { + startLatch.await(); + for (int j = 0; j < ITERATIONS; j++) { + // Each writer targets a unique field number range to maximize contention. + int fieldNo = 1 + (threadNum * 100) + j; + // Alternate between set and unset to exercise both mutation paths under lock. + if (j % 5 == 0) { + msg.unset(fieldNo); + } else { + msg.set(fieldNo, "value" + j); + } + setCount.incrementAndGet(); + } + } catch (Exception e) { + failed.set(true); + } + }); + writers[i].start(); + } + + // Single unpacker thread — the critical path that must not conflict with concurrent set()/unset(). + Thread unpacker = new Thread(() -> { + try { + startLatch.await(); + for (int j = 0; j < ITERATIONS; j++) { + try { + msg.unpack(packed); + unpackCount.incrementAndGet(); + } catch (Exception e) { + failed.set(true); + } + // Yield periodically to increase interleaving with writer threads. + if (j % 10 == 0) Thread.yield(); + } + } catch (Exception e) { + failed.set(true); + } + }); + unpacker.start(); + + // Release all threads simultaneously and wait for completion. + startLatch.countDown(); + for (Thread t : writers) t.join(); + unpacker.join(); + + assertFalse(failed.get(), + "ConcurrentModificationException during concurrent unpack/set/unset. " + + "unpack()=" + unpackCount.get() + " set/unset()=" + setCount.get()); + } + + /** + * Stress test that runs multiple packer and setter threads simultaneously. + *

+ * This maximizes contention by having {@code THREADS} packers all calling pack() while + * {@code THREADS} setters concurrently call set(). The high thread count and alternating + * operations increase the probability of catching lock mismatches that simpler tests might + * miss. If any thread acquires a different lock than another, the TreeMap iteration in + * recalcBitMap() may observe structural modifications from an unsynchronized path. + */ + @Test + public void testPackSetAlternatingStress() throws Throwable { + final ISOMsg msg = new ISOMsg(); + msg.setPackager(NOOP_PACKAGER); + msg.setMTI("0200"); + + final CountDownLatch startLatch = new CountDownLatch(1); + final AtomicBoolean failed = new AtomicBoolean(false); + + // Launch multiple packer threads — each acquires synchronized(fields) and iterates keys. + Thread[] packers = new Thread[THREADS]; + for (int i = 0; i < THREADS; i++) { + packers[i] = new Thread(() -> { + try { + startLatch.await(); + for (int j = 0; j < ITERATIONS; j++) { + try { + msg.pack(); + } catch (Exception e) { + failed.set(true); + } + } + } catch (Exception e) { + failed.set(true); + } + }); + packers[i].start(); + } + + // Launch multiple setter threads — each acquires synchronized(fields) to mutate the map. + Thread[] setters = new Thread[THREADS]; + for (int i = 0; i < THREADS; i++) { + final int threadNum = i; + setters[i] = new Thread(() -> { + try { + startLatch.await(); + for (int j = 0; j < ITERATIONS; j++) { + // Each setter targets a unique field number range. + msg.set(1 + (threadNum * 100) + j, "value" + j); + } + } catch (Exception e) { + failed.set(true); + } + }); + setters[i].start(); + } + + // Release all threads simultaneously and wait for completion. + startLatch.countDown(); + for (Thread t : packers) t.join(); + for (Thread t : setters) t.join(); + + assertFalse(failed.get(), "ConcurrentModificationException during alternating pack/set stress"); + } + + /** + * Verifies that unset() can run concurrently with set() without + * throwing ConcurrentModificationException. + *

+ * Exercises the {@code synchronized(fields)} path in {@link ISOMsg#unset(int)} under + * contention from both setter and unsetter threads. Tests mutual exclusion between two + * different mutating methods — if either one loses its lock, concurrent map modifications + * during iteration (e.g. by pack() or dump()) would corrupt state. The writers alternate + * between set and unset operations to increase interleaving complexity. + */ + @Test + public void testUnsetConcurrentWithSet() throws Throwable { + final ISOMsg msg = new ISOMsg(); + msg.setMTI("0200"); + + final CountDownLatch startLatch = new CountDownLatch(1); + final AtomicBoolean failed = new AtomicBoolean(false); + final AtomicInteger unsetCount = new AtomicInteger(0); + final AtomicInteger setCount = new AtomicInteger(0); + + // Launch writer threads that alternate between set and unset operations. + Thread[] writers = new Thread[THREADS]; + for (int i = 0; i < THREADS; i++) { + final int threadNum = i; + writers[i] = new Thread(() -> { + try { + startLatch.await(); + for (int j = 0; j < ITERATIONS; j++) { + // Each writer targets a unique field number range. + int fieldNo = 1 + (threadNum * 100) + j; + msg.set(fieldNo, "value" + j); + // Periodically unset the previous field to create interleaved set/unset patterns. + if (j > 0 && j % 3 == 0) { + msg.unset(fieldNo - 1); + unsetCount.incrementAndGet(); + } + setCount.incrementAndGet(); + } + } catch (Exception e) { + failed.set(true); + } + }); + writers[i].start(); + } + + // Launch dedicated unsetter threads that only call unset() — competing with writers for the fields lock. + Thread[] unsets = new Thread[THREADS]; + for (int i = 0; i < THREADS; i++) { + final int threadNum = i; + unsets[i] = new Thread(() -> { + try { + startLatch.await(); + for (int j = 0; j < ITERATIONS; j++) { + // Each unsetter targets a unique field number range. + msg.unset(1 + (threadNum * 100) + j); + unsetCount.incrementAndGet(); + // Yield periodically to increase interleaving with writer threads. + if (j % 10 == 0) Thread.yield(); + } + } catch (Exception e) { + failed.set(true); + } + }); + unsets[i].start(); + } + + // Release all threads simultaneously and wait for completion. + startLatch.countDown(); + for (Thread t : writers) t.join(); + for (Thread t : unsets) t.join(); + + assertFalse(failed.get(), + "ConcurrentModificationException during concurrent set/unset. " + + "set()=" + setCount.get() + " unset()=" + unsetCount.get()); + } +} diff --git a/jpos/src/test/java/org/jpos/security/CryptographicServiceMessageConcurrentTest.java b/jpos/src/test/java/org/jpos/security/CryptographicServiceMessageConcurrentTest.java new file mode 100644 index 0000000000..201815a41e --- /dev/null +++ b/jpos/src/test/java/org/jpos/security/CryptographicServiceMessageConcurrentTest.java @@ -0,0 +1,92 @@ +/* + * jPOS Project [http://jpos.org] + * Copyright (C) 2000-2026 jPOS Software SRL + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package org.jpos.security; + +import static org.junit.jupiter.api.Assertions.*; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.jupiter.api.Test; + +/** + * Test for concurrent modification exceptions in CryptographicServiceMessage. + * + *

CryptographicServiceMessage.dump() iterates fields.keySet() (LinkedHashMap) + * while addField() modifies the same map from another thread, causing CME. + */ +public class CryptographicServiceMessageConcurrentTest { + + private static final int ITERATIONS = 2000; + private static final int THREADS = 4; + + /** + * CryptographicServiceMessage.dump() iterates fields.keySet() at line 194. + * The addField() method at line 118 modifies fields.put() without synchronization. + * Concurrent iteration during modification causes ConcurrentModificationException. + */ + @Test + public void testCryptographicServiceMessageConcurrentDumpAndAddField() throws Throwable { + final CryptographicServiceMessage csm = new CryptographicServiceMessage(); + csm.setMCL(CryptographicServiceMessage.MCL_KSM); + + final CountDownLatch startLatch = new CountDownLatch(1); + final AtomicBoolean failed = new AtomicBoolean(false); + + Thread[] writers = new Thread[THREADS]; + for (int i = 0; i < THREADS; i++) { + final int threadNum = i; + writers[i] = new Thread(() -> { + try { + startLatch.await(); + for (int j = 0; j < ITERATIONS; j++) { + csm.addField("TAG" + (threadNum * 1000 + j), "value" + j); + if (j % 10 == 0) Thread.yield(); + } + } catch (Exception e) { + failed.set(true); + } + }); + writers[i].start(); + } + + Thread dumper = new Thread(() -> { + try { + startLatch.await(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + for (int j = 0; j < ITERATIONS; j++) { + csm.dump(new PrintStream(baos), " "); + baos.reset(); + if (j % 10 == 0) Thread.yield(); + } + } catch (Exception e) { + failed.set(true); + } + }); + dumper.start(); + + startLatch.countDown(); + for (Thread t : writers) t.join(); + dumper.join(); + + assertFalse(failed.get(), "ConcurrentModificationException was thrown during concurrent dump and addField"); + } +} \ No newline at end of file diff --git a/jpos/src/test/java/org/jpos/security/SecureKeyBlockConcurrentTest.java b/jpos/src/test/java/org/jpos/security/SecureKeyBlockConcurrentTest.java new file mode 100644 index 0000000000..c266e836ba --- /dev/null +++ b/jpos/src/test/java/org/jpos/security/SecureKeyBlockConcurrentTest.java @@ -0,0 +1,79 @@ +/* + * jPOS Project [http://jpos.org] + * Copyright (C) 2000-2026 jPOS Software SRL + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package org.jpos.security; + +import static org.junit.jupiter.api.Assertions.*; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.jpos.iso.ISOUtil; +import org.junit.jupiter.api.Test; + +/** + * Test for concurrent modification exceptions in SecureKeyBlock. + * + *

SecureKeyBlock.dump() now properly synchronizes on 'this' when iterating + * the optionalHeaders map, preventing ConcurrentModificationException when + * multiple threads call dump() concurrently. + */ +public class SecureKeyBlockConcurrentTest { + + private static final int ITERATIONS = 2000; + private static final int THREADS = 4; + + @Test + public void testSecureKeyBlockConcurrentDump() throws Throwable { + // Use working test data from SecureKeyBlockBuilderTest.testBuildMAC8 + final SecureKeyBlock keyBlock = SecureKeyBlockBuilder.newBuilder().build( + "00040V2RG17N0003" + + ISOUtil.hexString(ISOUtil.hex2byte("A9B8C7D6E5F49382")) // 16 hex chars = 8 bytes key + + ISOUtil.hexString(ISOUtil.hex2byte("E1F22F1E")) // 8 hex chars = 4 bytes MAC + ); + + final CountDownLatch startLatch = new CountDownLatch(1); + final AtomicBoolean failed = new AtomicBoolean(false); + + Thread[] dumpers = new Thread[THREADS]; + for (int i = 0; i < THREADS; i++) { + dumpers[i] = new Thread(() -> { + try { + startLatch.await(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + for (int j = 0; j < ITERATIONS; j++) { + keyBlock.dump(new PrintStream(baos), " "); + baos.reset(); + if (j % 10 == 0) Thread.yield(); + } + } catch (Exception e) { + e.printStackTrace(); + failed.set(true); + } + }); + dumpers[i].start(); + } + + startLatch.countDown(); + for (Thread t : dumpers) t.join(); + + assertFalse(failed.get(), "ConcurrentModificationException was thrown during concurrent dump calls"); + } +} \ No newline at end of file diff --git a/jpos/src/test/java/org/jpos/security/SecureKeySpecConcurrentTest.java b/jpos/src/test/java/org/jpos/security/SecureKeySpecConcurrentTest.java new file mode 100644 index 0000000000..8f920a7104 --- /dev/null +++ b/jpos/src/test/java/org/jpos/security/SecureKeySpecConcurrentTest.java @@ -0,0 +1,100 @@ +/* + * jPOS Project [http://jpos.org] + * Copyright (C) 2000-2026 jPOS Software SRL + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package org.jpos.security; + +import static org.junit.jupiter.api.Assertions.*; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.lang.reflect.Field; + +import org.junit.jupiter.api.Test; + +/** + * Test for concurrent modification exceptions in SecureKeySpec. + * + *

SecureKeySpec.dump() iterates optionalHeaders.entrySet() (LinkedHashMap) + * while optionalHeaders may be modified from another thread via reflection, + * causing ConcurrentModificationException. + */ +public class SecureKeySpecConcurrentTest { + + private static final int ITERATIONS = 2000; + + /** + * SecureKeySpec.dump() iterates optionalHeaders LinkedHashMap at line 514. + * The optionalHeaders map can be modified via reflection or direct field access, + * causing CME when dump() iterates concurrently. + */ + @Test + public void testSecureKeySpecConcurrentDumpAndModifyHeaders() throws Throwable { + final SecureKeySpec spec = new SecureKeySpec(); + spec.setKeyType("TYPE_ZPK"); + spec.setKeyLength(128); + + // Use reflection to add initial headers + Field field = SecureKeySpec.class.getDeclaredField("optionalHeaders"); + field.setAccessible(true); + @SuppressWarnings("unchecked") + java.util.Map optionalHeaders = + (java.util.Map) field.get(spec); + optionalHeaders.put("INIT", "initial-value"); + + final CountDownLatch startLatch = new CountDownLatch(1); + final AtomicBoolean failed = new AtomicBoolean(false); + + Thread modifier = new Thread(() -> { + try { + startLatch.await(); + for (int j = 0; j < ITERATIONS; j++) { + synchronized (optionalHeaders) { + optionalHeaders.put("KEY" + j, "VALUE" + j); + } + if (j % 10 == 0) Thread.yield(); + } + } catch (Exception e) { + failed.set(true); + } + }); + modifier.start(); + + Thread dumper = new Thread(() -> { + try { + startLatch.await(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + for (int j = 0; j < ITERATIONS; j++) { + spec.dump(new PrintStream(baos), " "); + baos.reset(); + if (j % 10 == 0) Thread.yield(); + } + } catch (Exception e) { + failed.set(true); + } + }); + dumper.start(); + + startLatch.countDown(); + modifier.join(); + dumper.join(); + + assertFalse(failed.get(), "ConcurrentModificationException was thrown during concurrent dump and modify"); + } +} \ No newline at end of file diff --git a/jpos/src/test/java/org/jpos/util/ProfilerTest.java b/jpos/src/test/java/org/jpos/util/ProfilerTest.java index 7822b44e0b..ed7ebaa402 100644 --- a/jpos/src/test/java/org/jpos/util/ProfilerTest.java +++ b/jpos/src/test/java/org/jpos/util/ProfilerTest.java @@ -27,6 +27,8 @@ import java.io.ByteArrayOutputStream; import java.io.PrintStream; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.jupiter.api.Test; @@ -71,7 +73,8 @@ public void testDumpThrowsNullPointerException() throws Throwable { if (isJavaVersionAtMost(JAVA_14)) { assertNull(ex.getMessage(), "ex.getMessage()"); } else { - assertEquals("Cannot invoke \"java.io.PrintStream.println(String)\" because \"p\" is null", ex.getMessage(), "ex.getMessage()"); + assertEquals("Cannot invoke \"java.io.PrintStream.println(String)\" because \"p\" is null", + ex.getMessage(), "ex.getMessage()"); } } } @@ -93,6 +96,115 @@ public String toString() { profiler.dump(new PrintStream(new ByteArrayOutputStream()), "testProfilerIndent"); } + /** + * Verifies that concurrent calls to {@link Profiler#dump} and + * {@link Profiler#checkPoint} + * do not cause a {@link java.util.ConcurrentModificationException}. + * + *

+ * This test simulates the real-world scenario where one thread is + * logging/dumping + * profiler results while other threads are simultaneously recording + * checkpoints. + * The race condition occurs when dump() iterates over the events map while + * checkPoint() modifies it from another thread. + *

+ * + *

+ * Test strategy: spawn multiple writer threads calling checkPoint() + * concurrently + * with a dumper thread calling dump(). Both use CountDownLatch to start + * simultaneously, + * maximizing the chance of interleaving. Thread.yield() calls increase + * contention + * by giving other threads opportunity to acquire the profiler's lock. + *

+ * + * @throws Throwable if any thread encounters an exception during execution + */ + @Test + public void testConcurrentDumpAndCheckpoint() throws Throwable { + Profiler profiler = new Profiler(); + + // Test configuration: 4 writer threads each performing 2000 checkpoint + // operations + // with a single dumper thread performing 2000 dump operations. + // This creates significant contention on the profiler's internal lock. + final int threads = 4; + final int iterations = 2000; + + // startLatch ensures all threads begin simultaneously, maximizing race + // conditions + final CountDownLatch startLatch = new CountDownLatch(1); + + // failed flag captured by all threads; any exception sets it to true + final AtomicBoolean failed = new AtomicBoolean(false); + + // Spawn writer threads that continuously call checkPoint() + final Thread[] writers = new Thread[threads]; + for (int i = 0; i < threads; i++) { + final int threadNum = i; + writers[i] = new Thread(() -> { + try { + // Wait for start signal before beginning + startLatch.await(); + + for (int j = 0; j < iterations; j++) { + // Each thread records checkpoints with unique names to avoid lock + // contention on the same key, but still modifies the shared events map + profiler.checkPoint("t" + threadNum + "i" + j); + + // Yield every 10 iterations to increase chance of lock contention + // between threads, improving the likelihood of catching the race + if (j % 10 == 0) { + Thread.yield(); + } + } + } catch (Exception e) { + // Capture any exception (including ConcurrentModificationException) + failed.set(true); + } + }); + writers[i].start(); + } + + // Spawn dumper thread that continuously calls dump() + final Thread dumper = new Thread(() -> { + try { + // Wait for start signal before beginning + startLatch.await(); + + for (int j = 0; j < iterations; j++) { + // dump() iterates over the events map while writers modify it, + // triggering ConcurrentModificationException if not properly synchronized + profiler.dump(new PrintStream(new ByteArrayOutputStream()), ""); + + // Yield every 10 iterations to increase contention with writers + if (j % 10 == 0) { + Thread.yield(); + } + } + } catch (Exception e) { + // Capture any exception (including ConcurrentModificationException) + failed.set(true); + } + }); + dumper.start(); + + // Start all threads at the same moment + startLatch.countDown(); + + // Wait for dumper to finish first, then writers + dumper.join(); + for (Thread t : writers) { + t.join(); + } + + // Assert no thread encountered an exception + assertTrue(!failed.get(), + "Concurrent modification or other exception should not occur"); + } + @Test public void testGetPartial() throws Throwable { new Profiler().getPartial(); diff --git a/jpos/src/test/java/org/jpos/util/SimpleMsgConcurrentTest.java b/jpos/src/test/java/org/jpos/util/SimpleMsgConcurrentTest.java new file mode 100644 index 0000000000..b4390a54c5 --- /dev/null +++ b/jpos/src/test/java/org/jpos/util/SimpleMsgConcurrentTest.java @@ -0,0 +1,147 @@ +/* + * jPOS Project [http://jpos.org] + * Copyright (C) 2000-2026 jPOS Software SRL + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package org.jpos.util; + +import static org.junit.jupiter.api.Assertions.*; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.jupiter.api.Test; + +/** + * Test for concurrent modification exceptions in SimpleMsg. + * + *

SimpleMsg.dump() iterates over Collection derived from msgContent (line 112) + * while setMsgContent() may modify the msgContent from another thread, + * causing ConcurrentModificationException. + */ +public class SimpleMsgConcurrentTest { + + private static final int ITERATIONS = 2000; + private static final int THREADS = 4; + + /** + * SimpleMsg.dump() iterates the Collection derived from msgContent. + * When msgContent is a Collection (or becomes one via setMsgContent), + * the for loop at line 112 can throw ConcurrentModificationException + * if another thread calls setMsgContent with a new Collection. + */ + @Test + public void testSimpleMsgConcurrentDumpAndSetMsgContent() throws Throwable { + final SimpleMsg msg = new SimpleMsg("test", "name", new ArrayList<>()); + final CountDownLatch startLatch = new CountDownLatch(1); + final AtomicBoolean failed = new AtomicBoolean(false); + + Thread[] writers = new Thread[THREADS]; + for (int i = 0; i < THREADS; i++) { + final int threadNum = i; + writers[i] = new Thread(() -> { + try { + startLatch.await(); + for (int j = 0; j < ITERATIONS; j++) { + List content = new ArrayList<>(); + content.add("item" + (threadNum * 1000 + j)); + msg.setMsgContent(content); + if (j % 10 == 0) Thread.yield(); + } + } catch (Exception e) { + failed.set(true); + } + }); + writers[i].start(); + } + + Thread dumper = new Thread(() -> { + try { + startLatch.await(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + for (int j = 0; j < ITERATIONS; j++) { + msg.dump(new PrintStream(baos), " "); + baos.reset(); + if (j % 10 == 0) Thread.yield(); + } + } catch (Exception e) { + failed.set(true); + } + }); + dumper.start(); + + startLatch.countDown(); + for (Thread t : writers) t.join(); + dumper.join(); + + assertFalse(failed.get(), "ConcurrentModificationException was thrown during concurrent dump and setMsgContent"); + } + + /** + * SimpleMsg with a shared mutable List that is modified while dump iterates it. + * This simulates the actual problematic case where msgContent is a Collection + * that is shared and modified by multiple threads. + */ + @Test + public void testSimpleMsgConcurrentDumpWithSharedCollection() throws Throwable { + final List sharedList = new ArrayList<>(); + sharedList.add("initial"); + final SimpleMsg msg = new SimpleMsg("test", "shared", sharedList); + + final CountDownLatch startLatch = new CountDownLatch(1); + final AtomicBoolean failed = new AtomicBoolean(false); + + Thread modifier = new Thread(() -> { + try { + startLatch.await(); + for (int j = 0; j < ITERATIONS; j++) { + synchronized (sharedList) { + sharedList.add("item" + j); + } + if (j % 10 == 0) Thread.yield(); + } + } catch (Exception e) { + failed.set(true); + } + }); + modifier.start(); + + Thread dumper = new Thread(() -> { + try { + startLatch.await(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + for (int j = 0; j < ITERATIONS; j++) { + msg.dump(new PrintStream(baos), " "); + baos.reset(); + if (j % 10 == 0) Thread.yield(); + } + } catch (Exception e) { + failed.set(true); + } + }); + dumper.start(); + + startLatch.countDown(); + modifier.join(); + dumper.join(); + + assertFalse(failed.get(), "ConcurrentModificationException was thrown during concurrent dump and list modification"); + } +} \ No newline at end of file