Skip to content

Commit 99e9e3c

Browse files
nficanoclaude
andcommitted
refactor: prefer streams for collection transforms; tighten conventions
Convert imperative collection-transform loops to Stream pipelines across core/runtime/client where the body is pure map/filter/reduce (no checked exceptions, no primitives, no I/O). Examples: every enum fromWire, the Lease capability builder, AgentRegistry's resolve + describe, the SessionLoop job-filter chain, and the ResumeBuffer ring snapshots. Tighten Java conventions while in the area: - annotate IdempotencyStore.claim nullable return - make ArcpClient's sealed Message switch exhaustive (no default branch) - narrow catch (Exception) to RuntimeException where Agent.run is not the boundary; narrow WebSocketTransport close to ExecutionException | TimeoutException - move ReplayingPublisher and StdioTransport off synchronized blocks to ReentrantLock, with comments explaining why the lock spans blocking I/O - promote volatile long lastInboundMillis to AtomicLong for consistency with neighbouring counters - TckProvider extends AutoCloseable so ConformanceSuite can drop its hand-rolled finally close Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 1c7d4fa commit 99e9e3c

18 files changed

Lines changed: 168 additions & 181 deletions

File tree

arcp-client/src/main/java/dev/arcp/client/ArcpClient.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,9 @@ public final class ArcpClient implements AutoCloseable, Flow.Subscriber<Envelope
8484
private final Duration ackInterval;
8585
private final AtomicLong lastSeenSeq = new AtomicLong(-1);
8686
private final AtomicLong lastAckedSeq = new AtomicLong(-1);
87+
private final AtomicLong lastInboundMillis = new AtomicLong(System.currentTimeMillis());
8788
private @Nullable ScheduledFuture<?> ackTick;
8889
private @Nullable ScheduledFuture<?> heartbeatWatchdog;
89-
private volatile long lastInboundMillis = System.currentTimeMillis();
9090
private final ConcurrentHashMap<JobId, SubmissionPublisher<EventBody>> liveSubscribers =
9191
new ConcurrentHashMap<>();
9292
@SuppressWarnings("unused")
@@ -212,14 +212,14 @@ public void onSubscribe(Flow.Subscription s) {
212212

213213
@Override
214214
public void onNext(Envelope envelope) {
215-
lastInboundMillis = System.currentTimeMillis();
215+
lastInboundMillis.set(System.currentTimeMillis());
216216
Long seq = envelope.eventSeq();
217217
if (seq != null) {
218218
lastSeenSeq.updateAndGet(prev -> Math.max(prev, seq));
219219
}
220220
try {
221221
dispatch(envelope);
222-
} catch (Exception e) {
222+
} catch (RuntimeException e) {
223223
log.warn("client dispatch error for {}: {}", envelope.type(), e.toString());
224224
}
225225
}
@@ -256,7 +256,15 @@ private void dispatch(Envelope envelope) {
256256
case JobSubscribed ignored -> { /* signal */ }
257257
case SessionJobs jobs -> handleListResponse(jobs);
258258
case SessionPing ping -> handlePing(ping);
259-
default -> log.debug("client ignored: {}", envelope.type());
259+
case SessionPong ignored -> log.debug("client ignored: {}", envelope.type());
260+
case SessionAck ignored -> log.debug("client ignored: {}", envelope.type());
261+
case SessionHello ignored -> log.debug("client ignored: {}", envelope.type());
262+
case SessionBye ignored -> log.debug("client ignored: {}", envelope.type());
263+
case SessionListJobs ignored -> log.debug("client ignored: {}", envelope.type());
264+
case JobSubmit ignored -> log.debug("client ignored: {}", envelope.type());
265+
case JobCancel ignored -> log.debug("client ignored: {}", envelope.type());
266+
case JobSubscribe ignored -> log.debug("client ignored: {}", envelope.type());
267+
case JobUnsubscribe ignored -> log.debug("client ignored: {}", envelope.type());
260268
}
261269
}
262270

@@ -305,7 +313,7 @@ private void maybeAck() {
305313
}
306314

307315
private void watchHeartbeat(long intervalMs) {
308-
long elapsed = System.currentTimeMillis() - lastInboundMillis;
316+
long elapsed = System.currentTimeMillis() - lastInboundMillis.get();
309317
if (elapsed > intervalMs * 2) {
310318
log.info("client observed heartbeat loss; closing session");
311319
close();
@@ -319,11 +327,7 @@ private void handleAccepted(Envelope envelope, JobAccepted accepted) {
319327
// We associate by traversing pending submits in insertion order; the
320328
// runtime guarantees ordering per-session, so the oldest pending submit
321329
// is the one being acknowledged.
322-
MessageId match = null;
323-
for (var entry : pendingSubmits.entrySet()) {
324-
match = entry.getKey();
325-
break;
326-
}
330+
MessageId match = pendingSubmits.keySet().stream().findFirst().orElse(null);
327331
if (match == null) {
328332
return;
329333
}
@@ -370,11 +374,7 @@ private void handleError(Envelope envelope, JobError err) {
370374
Outstanding o = jid != null ? outstanding.remove(jid) : null;
371375
if (o == null) {
372376
// Top-level (unassigned) error: drop the oldest pending submit.
373-
MessageId first = null;
374-
for (var k : pendingSubmits.keySet()) {
375-
first = k;
376-
break;
377-
}
377+
MessageId first = pendingSubmits.keySet().stream().findFirst().orElse(null);
378378
if (first != null) {
379379
Outstanding pending = pendingSubmits.remove(first);
380380
if (pending != null) {

arcp-client/src/main/java/dev/arcp/client/ReplayingPublisher.java

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.util.concurrent.Flow;
88
import java.util.concurrent.SubmissionPublisher;
99
import java.util.concurrent.atomic.AtomicBoolean;
10+
import java.util.concurrent.locks.ReentrantLock;
1011

1112
/**
1213
* Multicast {@link Flow.Publisher} that buffers every emission and replays the
@@ -18,24 +19,37 @@ final class ReplayingPublisher<T> implements Flow.Publisher<T> {
1819

1920
private final List<T> buffer = new CopyOnWriteArrayList<>();
2021
private final SubmissionPublisher<T> live;
22+
private final ReentrantLock lock = new ReentrantLock();
2123
private volatile boolean closed;
2224

2325
ReplayingPublisher() {
2426
this.live = new SubmissionPublisher<>(
2527
Executors.newVirtualThreadPerTaskExecutor(), 1024);
2628
}
2729

28-
synchronized void submit(T item) {
29-
buffer.add(item);
30-
live.submit(item);
30+
// Lock spans live.submit so concurrent producers preserve buffer/live order;
31+
// back-pressure blocking on a full submission queue therefore stalls peers.
32+
void submit(T item) {
33+
lock.lock();
34+
try {
35+
buffer.add(item);
36+
live.submit(item);
37+
} finally {
38+
lock.unlock();
39+
}
3140
}
3241

33-
synchronized void close() {
34-
if (closed) {
35-
return;
42+
void close() {
43+
lock.lock();
44+
try {
45+
if (closed) {
46+
return;
47+
}
48+
closed = true;
49+
live.close();
50+
} finally {
51+
lock.unlock();
3652
}
37-
closed = true;
38-
live.close();
3953
}
4054

4155
@Override
@@ -46,7 +60,8 @@ public void subscribe(Flow.Subscriber<? super T> downstream) {
4660

4761
// Hold the publisher lock while snapshotting AND attaching to live so
4862
// no submit() can interleave between the two and produce a gap.
49-
synchronized (this) {
63+
lock.lock();
64+
try {
5065
snapshot = new ArrayList<>(buffer);
5166
wasClosed = closed;
5267
if (!wasClosed) {
@@ -78,6 +93,8 @@ public void onComplete() {
7893
}
7994
});
8095
}
96+
} finally {
97+
lock.unlock();
8198
}
8299

83100
downstream.onSubscribe(new Flow.Subscription() {

arcp-client/src/main/java/dev/arcp/client/WebSocketTransport.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.util.concurrent.Flow;
1919
import java.util.concurrent.SubmissionPublisher;
2020
import java.util.concurrent.TimeUnit;
21+
import org.jspecify.annotations.Nullable;
2122
import org.slf4j.Logger;
2223
import org.slf4j.LoggerFactory;
2324

@@ -63,7 +64,7 @@ public void onOpen(WebSocket webSocket) {
6364
}
6465

6566
@Override
66-
public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
67+
public @Nullable CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
6768
WebSocketTransport t = futureSocket.get();
6869
if (t != null) {
6970
t.handleText(data, last);
@@ -73,7 +74,7 @@ public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean
7374
}
7475

7576
@Override
76-
public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
77+
public @Nullable CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
7778
WebSocketTransport t = futureSocket.get();
7879
if (t != null) {
7980
t.inbound.close();
@@ -146,7 +147,8 @@ public void close() {
146147
socket.sendClose(WebSocket.NORMAL_CLOSURE, "bye").get(2, TimeUnit.SECONDS);
147148
} catch (InterruptedException e) {
148149
Thread.currentThread().interrupt();
149-
} catch (Exception ignored) {
150+
} catch (java.util.concurrent.ExecutionException
151+
| java.util.concurrent.TimeoutException ignored) {
150152
// best-effort close
151153
}
152154
inbound.close();

arcp-core/src/main/java/dev/arcp/core/capabilities/Capabilities.java

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,11 @@
33
import com.fasterxml.jackson.annotation.JsonCreator;
44
import com.fasterxml.jackson.annotation.JsonInclude;
55
import com.fasterxml.jackson.annotation.JsonProperty;
6-
import java.util.ArrayList;
7-
import java.util.Collections;
86
import java.util.EnumSet;
97
import java.util.HashSet;
108
import java.util.List;
119
import java.util.Set;
10+
import java.util.stream.Collectors;
1211
import org.jspecify.annotations.Nullable;
1312

1413
/**
@@ -35,25 +34,19 @@ static Capabilities fromJson(
3534
@JsonProperty("encodings") @Nullable List<String> encodings,
3635
@JsonProperty("features") @Nullable List<String> features,
3736
@JsonProperty("agents") @Nullable List<AgentDescriptor> agents) {
38-
Set<Feature> parsed = EnumSet.noneOf(Feature.class);
39-
if (features != null) {
40-
for (String w : features) {
41-
Feature.fromWire(w).ifPresent(parsed::add);
42-
}
43-
}
37+
Set<Feature> parsed = features == null
38+
? EnumSet.noneOf(Feature.class)
39+
: features.stream()
40+
.flatMap(w -> Feature.fromWire(w).stream())
41+
.collect(Collectors.toCollection(() -> EnumSet.noneOf(Feature.class)));
4442
return new Capabilities(
4543
encodings == null ? List.of("json") : encodings, parsed, agents);
4644
}
4745

4846
@JsonProperty("features")
4947
@JsonInclude(JsonInclude.Include.NON_EMPTY)
5048
public List<String> featuresWire() {
51-
List<String> out = new ArrayList<>(features.size());
52-
for (Feature f : features) {
53-
out.add(f.wire());
54-
}
55-
Collections.sort(out);
56-
return out;
49+
return features.stream().map(Feature::wire).sorted().toList();
5750
}
5851

5952
public static Set<Feature> intersect(Set<Feature> a, Set<Feature> b) {

arcp-core/src/main/java/dev/arcp/core/capabilities/Feature.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.fasterxml.jackson.annotation.JsonCreator;
44
import com.fasterxml.jackson.annotation.JsonValue;
5+
import java.util.Arrays;
56
import java.util.Optional;
67

78
public enum Feature {
@@ -28,11 +29,8 @@ public String wire() {
2829

2930
@JsonCreator
3031
public static Optional<Feature> fromWire(String wire) {
31-
for (Feature f : values()) {
32-
if (f.wire.equals(wire)) {
33-
return Optional.of(f);
34-
}
35-
}
36-
return Optional.empty();
32+
return Arrays.stream(values())
33+
.filter(f -> f.wire.equals(wire))
34+
.findFirst();
3735
}
3836
}

arcp-core/src/main/java/dev/arcp/core/error/ErrorCode.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.fasterxml.jackson.annotation.JsonCreator;
44
import com.fasterxml.jackson.annotation.JsonValue;
5+
import java.util.Arrays;
56

67
public enum ErrorCode {
78
PERMISSION_DENIED(false),
@@ -37,11 +38,9 @@ public String wire() {
3738

3839
@JsonCreator
3940
public static ErrorCode fromWire(String wire) {
40-
for (ErrorCode c : values()) {
41-
if (c.name().equals(wire)) {
42-
return c;
43-
}
44-
}
45-
return INTERNAL_ERROR;
41+
return Arrays.stream(values())
42+
.filter(c -> c.name().equals(wire))
43+
.findFirst()
44+
.orElse(INTERNAL_ERROR);
4645
}
4746
}

arcp-core/src/main/java/dev/arcp/core/events/EventBody.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,10 @@ public String wire() {
4242
}
4343

4444
public static Kind fromWire(String wire) {
45-
for (Kind k : values()) {
46-
if (k.wire.equals(wire)) {
47-
return k;
48-
}
49-
}
50-
throw new IllegalArgumentException("unknown event kind: " + wire);
45+
return java.util.Arrays.stream(values())
46+
.filter(k -> k.wire.equals(wire))
47+
.findFirst()
48+
.orElseThrow(() -> new IllegalArgumentException("unknown event kind: " + wire));
5149
}
5250
}
5351
}

arcp-core/src/main/java/dev/arcp/core/lease/Lease.java

Lines changed: 25 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import java.util.List;
1212
import java.util.Map;
1313
import java.util.Objects;
14+
import java.util.stream.Collectors;
1415

1516
/**
1617
* Capability bag: namespace → list of pattern strings.
@@ -24,11 +25,12 @@ public final class Lease {
2425

2526
public Lease(Map<String, List<String>> capabilities) {
2627
Objects.requireNonNull(capabilities, "capabilities");
27-
Map<String, List<String>> copy = new LinkedHashMap<>();
28-
for (var e : capabilities.entrySet()) {
29-
copy.put(e.getKey(), List.copyOf(e.getValue()));
30-
}
31-
this.capabilities = Collections.unmodifiableMap(copy);
28+
this.capabilities = Collections.unmodifiableMap(capabilities.entrySet().stream()
29+
.collect(Collectors.toMap(
30+
Map.Entry::getKey,
31+
e -> List.copyOf(e.getValue()),
32+
(a, b) -> a,
33+
LinkedHashMap::new)));
3234
}
3335

3436
public static Lease empty() {
@@ -46,17 +48,21 @@ public List<String> patterns(String namespace) {
4648

4749
/** Parsed cost.budget initial amounts per currency. */
4850
public Map<String, BigDecimal> budget() {
49-
Map<String, BigDecimal> out = new LinkedHashMap<>();
50-
for (String entry : patterns("cost.budget")) {
51-
int colon = entry.indexOf(':');
52-
if (colon <= 0 || colon == entry.length() - 1) {
53-
throw new IllegalArgumentException("invalid cost.budget pattern: " + entry);
54-
}
55-
String currency = entry.substring(0, colon);
56-
BigDecimal amount = new BigDecimal(entry.substring(colon + 1));
57-
out.merge(currency, amount, BigDecimal::add);
58-
}
59-
return out;
51+
return patterns("cost.budget").stream()
52+
.map(entry -> {
53+
int colon = entry.indexOf(':');
54+
if (colon <= 0 || colon == entry.length() - 1) {
55+
throw new IllegalArgumentException("invalid cost.budget pattern: " + entry);
56+
}
57+
return Map.entry(
58+
entry.substring(0, colon),
59+
new BigDecimal(entry.substring(colon + 1)));
60+
})
61+
.collect(Collectors.toMap(
62+
Map.Entry::getKey,
63+
Map.Entry::getValue,
64+
BigDecimal::add,
65+
LinkedHashMap::new));
6066
}
6167

6268
@JsonCreator
@@ -66,18 +72,10 @@ static Lease fromJson(Map<String, List<String>> wire) {
6672

6773
/** §9.4 subset check: every key/pattern in {@code child} appears in {@code this}. */
6874
public boolean contains(Lease child) {
69-
for (var e : child.capabilities.entrySet()) {
75+
return child.capabilities.entrySet().stream().allMatch(e -> {
7076
List<String> parent = capabilities.get(e.getKey());
71-
if (parent == null) {
72-
return false;
73-
}
74-
for (String pat : e.getValue()) {
75-
if (!parent.contains(pat)) {
76-
return false;
77-
}
78-
}
79-
}
80-
return true;
77+
return parent != null && parent.containsAll(e.getValue());
78+
});
8179
}
8280

8381
public static final class Builder {

arcp-core/src/main/java/dev/arcp/core/messages/Message.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,10 @@ public String wire() {
5656
}
5757

5858
public static Type fromWire(String wire) {
59-
for (Type t : values()) {
60-
if (t.wire.equals(wire)) {
61-
return t;
62-
}
63-
}
64-
throw new IllegalArgumentException("unknown message type: " + wire);
59+
return java.util.Arrays.stream(values())
60+
.filter(t -> t.wire.equals(wire))
61+
.findFirst()
62+
.orElseThrow(() -> new IllegalArgumentException("unknown message type: " + wire));
6563
}
6664
}
6765
}

0 commit comments

Comments
 (0)