166 changes: 166 additions & 0 deletions .opencode/skills/group-metadata-members/SKILL.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
---
name: group-metadata-members
description: Retrieve and inspect members from an io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata object in a MAT heap dump using OQL queries and the inspect-object command.
license: Apache-2.0
compatibility: opencode
metadata:
mat-cli: required
---

## Overview

This skill helps you extract consumer group membership information from a heap dump containing `io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata` objects.

## Finding GroupMetadata Objects

Use the `instances` command to list all `GroupMetadata` objects and identify the one you need:

```bash
mat-cli instances <heap-file> --class "io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata"
```

The output shows object addresses — note the address of the group you want to inspect.

## Inspecting GroupMetadata Fields

Once you have the object address, inspect its core fields:

```bash
mat-cli inspect-object <heap-file> --address <address> \
--field-paths groupId,generationId,protocolType,protocol,members
```

- `groupId` — the consumer group ID (String)
- `generationId` — generation counter (int)
- `protocolType` — protocol type (String, e.g. "consumer")
- `protocol` — the current protocol (String, e.g. "range")
- `members` — a `Map<String, MemberMetadata>` keyed by member ID

## Extracting Member Details

`members` is a `HashMap`. To inspect an individual member, find the map entry whose key is the member ID, take the address of its `MemberMetadata` value, and inspect that object:

```bash
mat-cli inspect-object <heap-file> --address <member-metadata-address> \
--field-paths memberId,clientId,clientHost,sessionTimeout,rebalanceTimeout,groupInstanceId,subscription,assignment
```

Key `MemberMetadata` fields:
- `memberId` — unique member identifier (String)
- `clientId` — client ID (String)
- `clientHost` — host the client is running on (String)
- `sessionTimeout` — session timeout in ms (int)
- `rebalanceTimeout` — rebalance timeout in ms (int)
- `groupInstanceId` — static group instance ID, may be null (String)
- `subscription` — subscribed topics (usually a `SubscriptionData` or similar)
- `assignment`: the partition assignment as a Kafka protocol-encoded byte array (`byte[]`)

## Dumping All Members via OQL

To extract all members with their IDs and assignment sizes in one shot:

```sql
SELECT m.value.memberId.toString(), m.value.clientId.toString(), m.value.clientHost.toString(), m.value.sessionTimeout, m.value.rebalanceTimeout, m.value.assignment.@length
FROM io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata g
OBJECTS g.members.table m
```

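> **Note**: `g.members` is a `HashMap`. The `table` reference gives access to the underlying `HashMap$Node[]`. Each node has fields `key` (String = memberId) and `value` (MemberMetadata). The `m` alias iterates over the array entries. Slots for empty buckets are null, and nodes that collide in the same bucket are chained through the node's `next` field, so an entry reachable only via `next` is not a direct element of `table`.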
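
To illustrate the bucket layout described in the note, here is a minimal pure-Python model of walking a `HashMap` table. It is only a sketch: the `Node` class and `iter_entries` function are hypothetical stand-ins for `java.util.HashMap$Node` and do not call MAT or read a heap dump.

```python
from dataclasses import dataclass
from typing import Iterator, Optional


@dataclass
class Node:
    """Hypothetical stand-in for java.util.HashMap$Node (key, value, next)."""
    key: str
    value: dict
    next: Optional["Node"] = None


def iter_entries(table: list) -> Iterator[tuple]:
    """Yield (key, value) for every node, following collision chains."""
    for bucket in table:
        node = bucket
        while node is not None:  # empty buckets are None and are skipped
            yield node.key, node.value
            node = node.next


# Buckets 0 and 2 are empty; two members collide in bucket 1.
chained = Node("member-b", {"clientId": "consumer-2"})
table = [None, Node("member-a", {"clientId": "consumer-1"}, chained), None]

members = dict(iter_entries(table))
print(sorted(members))
```

Reading only the array slots would report `member-a` and miss `member-b`, which is why a member count taken from `table` alone can come out lower than the map's `size`.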

## Expected Output Format

When dumping group members programmatically, the JSON output follows the structure below. `group_metadata_members_ds.json` holds a real example, but use only sanitized reference data:

```json
{
  "groupId": "example-group-id",
  "memberCount": 3,
  "members": [
    {
      "memberId": "consumer-1-example-uuid",
      "clientId": "consumer-1",
      "clientHost": "/10.0.0.1",
      "sessionTimeout": 45000,
      "rebalanceTimeout": 300000,
      "groupInstanceId": null,
      "subscription": {
        "topics": ["topic-a", "topic-b"],
        "userData": null
      },
      "assignmentLength": 42,
      "assignment": [
        "0x00", "0x01", "0x02", "0x03", "0x04"
      ]
    },
    {
      "memberId": "consumer-2-example-uuid",
      "clientId": "consumer-2",
      "clientHost": "/10.0.0.2",
      "sessionTimeout": 45000,
      "rebalanceTimeout": 300000,
      "groupInstanceId": "my-instance-id",
      "subscription": {
        "topics": ["topic-a"],
        "userData": null
      },
      "assignmentLength": 24,
      "assignment": [
        "0x00", "0x01", "0x02"
      ]
    },
    {
      "memberId": "consumer-3-example-uuid",
      "clientId": "consumer-3",
      "clientHost": "/10.0.0.3",
      "sessionTimeout": 30000,
      "rebalanceTimeout": 120000,
      "groupInstanceId": null,
      "subscription": {
        "topics": ["topic-c", "topic-d", "topic-e"],
        "userData": null
      },
      "assignmentLength": 60,
      "assignment": [
        "0x00", "0x01", "0x02", "0x03", "0x04",
        "0x05", "0x06", "0x07"
      ]
    }
  ]
}
```
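
In the sample above the `assignment` arrays are shorter than `assignmentLength`, presumably because they are abbreviated for display. A real dump could be sanity-checked before decoding along the following lines. This is a sketch only: `check_dump` is a hypothetical helper, and the rule that `assignmentLength` should equal the number of `assignment` entries is an assumption about how the dump was produced.

```python
import json

# Trimmed-down sample using the field names from the structure above.
SAMPLE = """
{
  "groupId": "example-group-id",
  "memberCount": 2,
  "members": [
    {"memberId": "consumer-1-example-uuid", "assignmentLength": 5,
     "assignment": ["0x00", "0x01", "0x02", "0x03", "0x04"]},
    {"memberId": "consumer-2-example-uuid", "assignmentLength": 24,
     "assignment": ["0x00", "0x01", "0x02"]}
  ]
}
"""


def check_dump(dump: dict) -> list[str]:
    """Return human-readable problems found in a member dump (hypothetical helper)."""
    problems = []
    members = dump.get("members", [])
    if dump.get("memberCount") != len(members):
        problems.append(
            f"memberCount={dump.get('memberCount')} but {len(members)} members listed"
        )
    for m in members:
        declared = m.get("assignmentLength")
        actual = len(m.get("assignment", []))
        if declared != actual:  # assumed rule: one array entry per byte
            problems.append(
                f"{m['memberId']}: assignmentLength={declared} but {actual} bytes present"
            )
    return problems


problems = check_dump(json.loads(SAMPLE))
for line in problems:
    print(line)
```

A truncated `assignment` array is worth catching early, because decoding it would otherwise yield a partial or misleading partition list.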

## Decoding Partition Assignments

The `assignment` field in each member is a Kafka protocol-encoded `ConsumerMemberAssignment` byte array. A helper script is bundled at `decode_assignments.py`:

```bash
python3 .opencode/skills/group-metadata-members/decode_assignments.py <path-to-json>
```

It reads the JSON dump and prints each member's decoded assignment, a partition-to-member map, and a coverage check, for example:

```
Group: my-group (1 members)

rdkafka-a1b2c3d4 (rdkafka-...):
topic-a: [0, 1, 2]
topic-b: [0, 1, 5]

--- Partition → Member map ---

topic-a:
partition-  0 => rdkafka-a1b2c3d4
partition-  1 => rdkafka-a1b2c3d4
partition-  2 => rdkafka-a1b2c3d4

topic-b:
partition-  0 => rdkafka-a1b2c3d4
partition-  1 => rdkafka-a1b2c3d4
partition-  5 => rdkafka-a1b2c3d4

--- Coverage check ---
topic-a: partitions 0-2 fully covered
topic-b: missing partitions [2, 3, 4] ← ORPHANED
```
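
The coverage check infers the expected partition range from the highest partition number it sees, so a topic whose last partitions are unassigned still looks fully covered. The sketch below shows one way this could be tightened if real partition counts are available. `find_gaps` and its `partition_counts` argument are hypothetical and not part of the bundled script.

```python
from typing import Optional


def find_gaps(
    owned: dict[str, set[int]],
    partition_counts: Optional[dict[str, int]] = None,
) -> dict[str, list[int]]:
    """Return {topic: [unowned partitions]} for topics with gaps.

    Without partition_counts the expected range is 0..max(owned), which
    mirrors the heuristic used by the coverage check above.
    """
    gaps = {}
    for topic, parts in owned.items():
        if partition_counts and topic in partition_counts:
            expected = range(partition_counts[topic])
        else:
            expected = range(max(parts) + 1) if parts else range(0)
        missing = [p for p in expected if p not in parts]
        if missing:
            gaps[topic] = missing
    return gaps


owned = {"topic-a": {0, 1, 2}, "topic-b": {0, 1, 5}}

inferred = find_gaps(owned)
exact = find_gaps(owned, {"topic-a": 4, "topic-b": 6})
print(inferred)  # {'topic-b': [2, 3, 4]}
print(exact)     # {'topic-a': [3], 'topic-b': [2, 3, 4]}
```

With the inferred range, `topic-a` looks complete. Once its real size of four partitions is supplied, the unowned partition 3 shows up.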

The default input path is `group_metadata_members_ds.json`, resolved relative to the current working directory (normally the project root). Pass a different file as the first argument to override.
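
To make the byte layout concrete, the following sketch builds a small assignment by hand and decodes it again with `struct`. It follows the layout stated in the script's docstring (version, topic count, then per topic a length-prefixed name and a partition list, then a user-data length). `encode_assignment` is a hypothetical helper written for this example, and encoding null user data as length `-1` is an assumption based on Kafka's nullable-bytes convention.

```python
import struct


def encode_assignment(topics: dict[str, list[int]], version: int = 0) -> bytes:
    """Build assignment bytes in the layout described above (hypothetical helper)."""
    out = struct.pack(">hi", version, len(topics))  # int16 version, int32 topic count
    for topic, parts in topics.items():
        name = topic.encode("utf-8")
        out += struct.pack(">h", len(name)) + name  # int16 length + topic name
        out += struct.pack(">i", len(parts))        # int32 partition count
        out += struct.pack(f">{len(parts)}i", *parts)
    out += struct.pack(">i", -1)  # assumed: -1 marks null userData
    return out


def decode_assignment(raw: bytes) -> dict[str, list[int]]:
    """Decode {topic: [partition, ...]}; trailing userData is ignored."""
    _version, n_topics = struct.unpack_from(">hi", raw, 0)
    offset = 6
    result = {}
    for _ in range(n_topics):
        (t_len,) = struct.unpack_from(">h", raw, offset)
        offset += 2
        topic = raw[offset:offset + t_len].decode("utf-8")
        offset += t_len
        (n_parts,) = struct.unpack_from(">i", raw, offset)
        offset += 4
        result[topic] = list(struct.unpack_from(f">{n_parts}i", raw, offset))
        offset += 4 * n_parts
    return result


raw = encode_assignment({"topic-a": [0, 1, 2]})
hex_list = [f"0x{b:02x}" for b in raw]  # same '0xNN' form as the JSON dump
decoded = decode_assignment(raw)
print(len(raw), hex_list[:6])
print(decoded)
```

The resulting `hex_list` has the same shape as the `assignment` array in the JSON dump, so it can serve as a known-good fixture when testing a decoder.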

## Tips

- Use `mat-cli oql <heap-file> --query "SELECT * FROM io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata"` to verify the class is present in the heap.
- If `members` appears empty (`{}`) but you expect entries, the group may be in a `Dead` or `Empty` state — check `generationId` and protocol state.
107 changes: 107 additions & 0 deletions .opencode/skills/group-metadata-members/decode_assignments.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
#!/usr/bin/env python3
"""Decode Kafka ConsumerMemberAssignment bytes from group_metadata_members_ds.json
and produce a partition→member map (e.g. "topic-partition-3 => member1").

Format: version(2) + num_topics(4) + [topicLen(2) + topic(N) + num_parts(4) + [part(4)...]] + userDataLen(4)
"""

import json
import sys
from collections import defaultdict


def decode_assignment(hex_bytes: list[str]) -> dict[str, list[int]]:
    """Return {topic: [partition, ...]} decoded from a list of '0xNN' strings."""
    raw = bytes(int(h, 16) for h in hex_bytes)
    i = 0

    def read_int(n, signed=True):
        nonlocal i
        v = int.from_bytes(raw[i:i + n], "big", signed=signed)
        i += n
        return v

    version = read_int(2)
    n_topics = read_int(4)
    result = {}
    for _ in range(n_topics):
        t_len = read_int(2)
        topic = raw[i:i + t_len].decode("utf-8")
        i += t_len
        n_parts = read_int(4)
        partitions = [read_int(4) for _ in range(n_parts)]
        result[topic] = partitions
    return result


def short_member(member_id: str) -> str:
    """Return the first 8 chars after 'rdkafka-' for readability."""
    prefix = "rdkafka-"
    if member_id.startswith(prefix):
        return "rdkafka-" + member_id[len(prefix):len(prefix) + 8]
    return member_id[:16]


def main(path: str) -> None:
    with open(path) as f:
        data = json.load(f)

    group_id = data.get("groupId", "unknown")
    members = data["members"]

    # tp_map: "topic::partition" -> [memberId, ...]
    tp_map: dict[str, list[str]] = defaultdict(list)

    print(f"Group: {group_id} ({len(members)} members)\n")

    for m in members:
        member_id = m["memberId"]
        short = short_member(member_id)
        assignment_hex = m.get("assignment", [])
        if not assignment_hex:
            print(f" {short}: <empty assignment>")
            continue
        try:
            topics = decode_assignment(assignment_hex)
        except Exception as e:
            print(f" {short}: decode error: {e}")
            continue

        print(f" {short} ({member_id}):")
        for topic, partitions in sorted(topics.items()):
            print(f" {topic}: {sorted(partitions)}")
            for p in partitions:
                tp_map[f"{topic}::{p}"].append(short)

    print("\n--- Partition → Member map ---")
    # Group by topic for readability
    by_topic: dict[str, dict[int, list[str]]] = defaultdict(dict)
    for key, owners in tp_map.items():
        topic, part = key.rsplit("::", 1)
        by_topic[topic][int(part)] = owners

    for topic in sorted(by_topic):
        print(f"\n{topic}:")
        for part in sorted(by_topic[topic]):
            owners = by_topic[topic][part]
            owner_str = ", ".join(owners) if owners else "<unassigned>"
            flag = " *** MULTI-OWNER ***" if len(owners) > 1 else ""
            print(f" partition-{part:3d} => {owner_str}{flag}")

    # Report any gaps
    print("\n--- Coverage check ---")
    for topic in sorted(by_topic):
        parts = sorted(by_topic[topic].keys())
        if parts:
            expected = list(range(parts[-1] + 1))
            missing = [p for p in expected if p not in by_topic[topic]]
            if missing:
                print(f" {topic}: missing partitions {missing} ← ORPHANED")
            else:
                print(f" {topic}: partitions 0-{parts[-1]} fully covered")


if __name__ == "__main__":
    path = sys.argv[1] if len(sys.argv) > 1 else \
        "group_metadata_members_ds.json"
    main(path)
3 changes: 3 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,6 @@ Keep commit subjects short, imperative, and descriptive. Existing history uses p

## Contributor Notes
Do not commit `target/` outputs or other generated artifacts. Prefer small, reviewable changes and update docs when build, packaging, or contributor workflows change. For MAT CLI work, keep the runtime headless: avoid adding `org.eclipse.mat.ui*` dependencies unless they are strictly required.

## Skills
- `group-metadata-members`: Retrieve and inspect members from an io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata object in a MAT heap dump using OQL queries and the inspect-object command.