diff --git a/.opencode/skills/group-metadata-members/SKILL.md b/.opencode/skills/group-metadata-members/SKILL.md new file mode 100644 index 00000000..43069e0d --- /dev/null +++ b/.opencode/skills/group-metadata-members/SKILL.md @@ -0,0 +1,166 @@ +--- +name: group-metadata-members +description: Retrieve and inspect members from an io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata object in a MAT heap dump using OQL queries and the inspect-object command. +license: Apache-2.0 +compatibility: opencode +metadata: + mat-cli: required +--- + +## Overview + +This skill helps you extract consumer group membership information from a heap dump containing `io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata` objects. + +## Finding GroupMetadata Objects + +Use the `instances` command to list all `GroupMetadata` objects and identify the one you need: + +```bash +mat-cli instances --class "io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata" +``` + +The output shows object addresses — note the address of the group you want to inspect. + +## Inspecting GroupMetadata Fields + +Once you have the object address, inspect its core fields: + +```bash +mat-cli inspect-object --address <address>
\ + --field-paths groupId,generationId,protocolType,protocol,members +``` + +- `groupId` — the consumer group ID (String) +- `generationId` — generation counter (int) +- `protocolType` — protocol type (String, e.g. "consumer") +- `protocol` — the current protocol (String, e.g. "range") +- `members` — a `Map<String, MemberMetadata>` keyed by member ID + +## Extracting Member Details + +`members` is a `HashMap<String, MemberMetadata>`. To inspect an individual member, find its entry in the map by member ID, note the address of the entry's `MemberMetadata` value, and inspect that object: + +```bash +mat-cli inspect-object --address <member-metadata-address> \ + --field-paths memberId,clientId,clientHost,sessionTimeout,rebalanceTimeout,groupInstanceId,subscription,assignment +``` + +Key `MemberMetadata` fields: +- `memberId` — unique member identifier (String) +- `clientId` — client ID (String) +- `clientHost` — host the client is running on (String) +- `sessionTimeout` — session timeout in ms (int) +- `rebalanceTimeout` — rebalance timeout in ms (int) +- `groupInstanceId` — static group instance ID, may be null (String) +- `subscription` — subscribed topics (usually a `SubscriptionData` or similar) +- `assignment` — the encoded partition assignment (`byte[]`) + +## Dumping All Members via OQL + +To extract all members with their IDs and assignment sizes in one shot: + +```sql +SELECT m.value.memberId.toString(), m.value.clientId.toString(), m.value.clientHost.toString(), m.value.sessionTimeout, m.value.rebalanceTimeout, m.value.assignment.@length +FROM io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata g +OBJECTS g.members.table m +``` + +> **Note**: `g.members` is a `HashMap<String, MemberMetadata>`. The `table` reference gives access to the underlying `HashMap$Node[]`. Each node has fields `key` (String = memberId) and `value` (MemberMetadata). The `m` alias iterates over the array entries. Unused slots in `table` are null, and entries that share a bucket are chained through each node's `next` field rather than stored as separate array elements, so cross-check the number of rows against the map's `size`. 
+ +## Expected Output Format + +When dumping group members programmatically, the JSON output follows the structure below (a real dump such as `group_metadata_members_ds.json` has the same shape, but only sanitized reference data like this sample should be shared). The `assignment` arrays in the sample are truncated for brevity, so they are shorter than `assignmentLength` and cannot be decoded meaningfully as-is: + +```json +{ + "groupId": "example-group-id", + "memberCount": 3, + "members": [ + { + "memberId": "consumer-1-example-uuid", + "clientId": "consumer-1", + "clientHost": "/10.0.0.1", + "sessionTimeout": 45000, + "rebalanceTimeout": 300000, + "groupInstanceId": null, + "subscription": { + "topics": ["topic-a", "topic-b"], + "userData": null + }, + "assignmentLength": 42, + "assignment": [ + "0x00", "0x01", "0x02", "0x03", "0x04" + ] + }, + { + "memberId": "consumer-2-example-uuid", + "clientId": "consumer-2", + "clientHost": "/10.0.0.2", + "sessionTimeout": 45000, + "rebalanceTimeout": 300000, + "groupInstanceId": "my-instance-id", + "subscription": { + "topics": ["topic-a"], + "userData": null + }, + "assignmentLength": 24, + "assignment": [ + "0x00", "0x01", "0x02" + ] + }, + { + "memberId": "consumer-3-example-uuid", + "clientId": "consumer-3", + "clientHost": "/10.0.0.3", + "sessionTimeout": 30000, + "rebalanceTimeout": 120000, + "groupInstanceId": null, + "subscription": { + "topics": ["topic-c", "topic-d", "topic-e"], + "userData": null + }, + "assignmentLength": 60, + "assignment": [ + "0x00", "0x01", "0x02", "0x03", "0x04", + "0x05", "0x06", "0x07" + ] + } + ] +} +``` + +## Decoding Partition Assignments + +The `assignment` field in each member is a Kafka protocol-encoded `ConsumerMemberAssignment` byte array. 
A helper script is bundled with this skill as `decode_assignments.py`: + +```bash +python3 .opencode/skills/group-metadata-members/decode_assignments.py +``` + +It reads the JSON dump and produces a partitioned view like the following (abridged): + +``` +Group: my-group (3 members) + + rdkafka-a1b2c3d4 (rdkafka-...): + topic-a: [0, 1, 2] + topic-b: [0, 1, 5] + +--- Partition → Member map --- + +topic-a: + partition- 0 => rdkafka-a1b2c3d4 + partition- 1 => rdkafka-a1b2c3d4 + partition- 2 => rdkafka-a1b2c3d4 + +--- Coverage check --- + topic-a: partitions 0-2 fully covered + topic-b: missing partitions [2, 3, 4] ← ORPHANED +``` + +The default input path is `group_metadata_members_ds.json`, resolved relative to the current working directory (run the script from the project root). Pass a different file as the first argument to override. + +## Tips + +- Use `mat-cli oql --query "SELECT * FROM io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata"` to verify the class is present in the heap. +- If `members` appears empty (`{}`) but you expect entries, the group may be in a `Dead` or `Empty` state — check `generationId` and protocol state. diff --git a/.opencode/skills/group-metadata-members/decode_assignments.py b/.opencode/skills/group-metadata-members/decode_assignments.py new file mode 100644 index 00000000..d7999862 --- /dev/null +++ b/.opencode/skills/group-metadata-members/decode_assignments.py @@ -0,0 +1,107 @@ +#!/usr/bin/env python3 +"""Decode Kafka ConsumerMemberAssignment bytes from group_metadata_members_ds.json +and produce a partition→member map (e.g. "topic-partition-3 => member1"). 
import json
import sys
from collections import defaultdict


def decode_assignment(hex_bytes: list[str]) -> dict[str, list[int]]:
    """Decode a Kafka ConsumerMemberAssignment into ``{topic: [partition, ...]}``.

    Wire format (all integers big-endian):
    version(2) + num_topics(4)
    + [topicLen(2) + topic(N) + num_parts(4) + [part(4)...]]
    + userDataLen(4)

    Args:
        hex_bytes: One string per byte, each parseable by ``int(h, 16)``
            (for example ``"0x1f"``).

    Returns:
        A dict mapping each topic name to its partitions in wire order.
        An empty input is treated as "no assignment" and yields ``{}``.
        If a topic occurs more than once, the last occurrence wins.

    Raises:
        ValueError: If an element is not a valid byte value, a count or
            length is negative, the topic name is not valid UTF-8, or the
            data ends before the declared topics/partitions were read.
    """
    raw = bytes(int(h, 16) for h in hex_bytes)
    if not raw:
        return {}
    pos = 0

    def take(n: int) -> bytes:
        # Slicing past the end silently shortens the result, which would make
        # int.from_bytes return a wrong number; fail loudly instead.
        nonlocal pos
        if n < 0:
            raise ValueError(f"negative length {n} at offset {pos}")
        end = pos + n
        if end > len(raw):
            raise ValueError(
                f"truncated assignment: need {n} byte(s) at offset {pos}, "
                f"only {len(raw) - pos} available"
            )
        chunk = raw[pos:end]
        pos = end
        return chunk

    def read_int(n: int) -> int:
        return int.from_bytes(take(n), "big", signed=True)

    read_int(2)  # version: consumed to advance the cursor, not interpreted
    n_topics = read_int(4)
    if n_topics < 0:
        raise ValueError(f"negative topic count {n_topics}")

    result: dict[str, list[int]] = {}
    for _ in range(n_topics):
        topic = take(read_int(2)).decode("utf-8")
        n_parts = read_int(4)
        if n_parts < 0:
            raise ValueError(f"negative partition count {n_parts} for topic {topic!r}")
        result[topic] = [read_int(4) for _ in range(n_parts)]
    # The trailing userDataLen/userData section is intentionally not read.
    return result


def short_member(member_id: str) -> str:
    """Return a shortened member id for display.

    Ids starting with ``rdkafka-`` keep that prefix plus the next 8
    characters; any other id is cut to its first 16 characters.
    """
    prefix = "rdkafka-"
    if member_id.startswith(prefix):
        return member_id[:len(prefix) + 8]
    return member_id[:16]


def main(path: str) -> None:
    """Print per-member assignments, a partition→member map and a gap report.

    Args:
        path: JSON file holding an object with a ``members`` list (required)
            and an optional ``groupId``. Each member needs ``memberId`` and
            may carry ``assignment`` as a list of hex byte strings.

    Members whose assignment is missing/empty or cannot be decoded are
    reported on their own line and skipped; they do not abort the run.
    """
    with open(path, encoding="utf-8") as f:
        data = json.load(f)

    group_id = data.get("groupId", "unknown")
    members = data["members"]

    # topic -> partition -> short ids of every member that claims it
    by_topic: dict[str, dict[int, list[str]]] = defaultdict(
        lambda: defaultdict(list)
    )

    print(f"Group: {group_id} ({len(members)} members)\n")

    for m in members:
        member_id = m["memberId"]
        short = short_member(member_id)
        assignment_hex = m.get("assignment", [])
        if not assignment_hex:
            print(f" {short}: (no assignment)")
            continue
        try:
            topics = decode_assignment(assignment_hex)
        except (ValueError, TypeError) as e:
            # ValueError: malformed/truncated bytes; TypeError: non-string items.
            print(f" {short}: decode error — {e}")
            continue

        print(f" {short} ({member_id}):")
        for topic, partitions in sorted(topics.items()):
            print(f" {topic}: {sorted(partitions)}")
            for p in partitions:
                by_topic[topic][p].append(short)

    print("\n--- Partition → Member map ---")
    for topic in sorted(by_topic):
        print(f"\n{topic}:")
        for part in sorted(by_topic[topic]):
            owners = by_topic[topic][part]
            owner_str = ", ".join(owners)
            flag = " *** MULTI-OWNER ***" if len(owners) > 1 else ""
            print(f" partition-{part:3d} => {owner_str}{flag}")

    # Report any gaps. Only partitions below the highest one seen can be
    # detected as missing; the real partition count is not in the dump.
    print("\n--- Coverage check ---")
    for topic in sorted(by_topic):
        seen = by_topic[topic]
        highest = max(seen)
        missing = [p for p in range(highest + 1) if p not in seen]
        if missing:
            print(f" {topic}: missing partitions {missing} ← ORPHANED")
        else:
            print(f" {topic}: partitions 0-{highest} fully covered")


if __name__ == "__main__":
    main(sys.argv[1] if len(sys.argv) > 1 else "group_metadata_members_ds.json")
Prefer small, reviewable changes and update docs when build, packaging, or contributor workflows change. For MAT CLI work, keep the runtime headless: avoid adding `org.eclipse.mat.ui*` dependencies unless they are strictly required. + +## Skills +- `group-metadata-members`: Retrieve and inspect members from an io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata object in a MAT heap dump using OQL queries and the inspect-object command.