Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions src/chappe/analytics.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,72 @@ def share_velocity_analysis(
}


def compare_channels(
channel_posts: dict[str, list[dict[str, Any]]],
*,
by: str = "forwards",
limit: int = 5,
) -> dict[str, Any]:
"""Cross-channel leaderboard.

For each channel, returns the top `limit` posts ranked by `by`. Also
produces a single combined leaderboard (top posts across all channels) and
a summary that names the forward-rate leader and the raw-metric leader.
"""
key = "engagement_score" if by == "engagement" else by
per_channel: dict[str, dict[str, Any]] = {}
combined: list[dict[str, Any]] = []
ratio_by_channel: dict[str, float] = {}

for channel, posts in channel_posts.items():
ranked = rank_posts(posts, by=by, limit=limit)
per_channel[channel] = {
"posts_available": len(posts),
"top_posts": ranked,
}
for post in ranked:
combined.append({"channel": channel, **post})
rates = [
float(enrich_post(post).get("forward_rate") or 0)
for post in posts
if int(post.get("views") or 0) > 0
]
if rates:
ratio_by_channel[channel] = round(sum(rates) / len(rates), 6)

combined.sort(key=lambda row: row.get(key) or 0, reverse=True)

fwd_ratio_winner = None
if ratio_by_channel:
channel, rate = max(ratio_by_channel.items(), key=lambda kv: kv[1])
fwd_ratio_winner = {"channel": channel, "mean_forward_rate": rate}

raw_winner = None
if combined:
top = combined[0]
raw_winner = {
"channel": top.get("channel"),
"metric": key,
"value": top.get(key) or 0,
"post_id": top.get("id"),
"link": top.get("link"),
}

return {
"by": key,
"limit": limit,
"per_channel": per_channel,
"combined_leaderboard": combined[: limit * max(len(channel_posts), 1)],
"summary": {
"channels_compared": len(channel_posts),
"posts_compared": sum(len(p) for p in channel_posts.values()),
"by_forward_ratio_winner": fwd_ratio_winner,
"by_raw_metric_winner": raw_winner,
"mean_forward_rate_by_channel": ratio_by_channel,
},
}


def find_outliers(posts: list[dict[str, Any]], *, limit: int = 20) -> list[dict[str, Any]]:
enriched = [enrich_post(post) for post in posts]
scores = [post["engagement_score"] for post in enriched]
Expand Down
56 changes: 56 additions & 0 deletions src/chappe/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from . import __version__
from .agents import agent_installation_status, available_hosts, install_agent_assets
from .analytics import (
compare_channels,
filter_posts_by_period,
find_outliers,
generate_ideas,
Expand Down Expand Up @@ -1360,6 +1361,61 @@ def agent_context(
briefing(ctx, channel=channel, period=period, budget=budget)


@app.command("compare")
def compare(
ctx: typer.Context,
channels: list[str] = typer.Argument(..., help="Two or more channel handles to compare."),
by: str = typer.Option("forwards", "--by", help="Metric: forwards, views, replies, reactions, engagement."),
limit: int = typer.Option(5, "--limit", help="Top posts per channel."),
period: str = typer.Option("365d", "--period", help="Date window applied to each channel."),
) -> None:
def run():
allowed = {"forwards", "views", "replies", "reactions", "engagement", "engagement_score"}
if by not in allowed:
raise ChappeError(
f"Unsupported metric: {by}",
ExitCode.USAGE_ERROR,
details={"allowed": sorted(allowed)},
)
if len(channels) < 2:
raise ChappeError(
"compare requires at least two channels.",
ExitCode.USAGE_ERROR,
details={"channels_given": channels},
)
store = _store(ctx)
channel_posts: dict[str, list[dict[str, Any]]] = {}
metric_quality: dict[str, dict[str, Any]] = {}
unsynced: list[str] = []
for channel in channels:
posts = filter_posts_by_period(store.list_posts(channel, limit=5000), period)
channel_posts[channel] = posts
metric_quality[channel] = _ranking_metric_quality(posts, by)
if not posts:
unsynced.append(channel)
result = compare_channels(channel_posts, by=by, limit=limit)
for channel, quality in metric_quality.items():
if channel in result["per_channel"]:
result["per_channel"][channel]["metric_quality"] = quality
if unsynced:
next_commands = [f"chappe sync {channel} --limit 100 --comments" for channel in unsynced]
else:
next_commands = [f"chappe briefing {channel}" for channel in channels[:3]]
_emit(
ctx,
{
"ok": True,
"channels": channels,
"period": period,
**result,
"unsynced_channels": unsynced,
"next_commands": next_commands,
},
)

_handle(ctx, run)


@draft_app.command("create")
def draft_create(ctx: typer.Context, channel: str, file: Path = typer.Option(..., "--file")) -> None:
def run():
Expand Down
74 changes: 74 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,80 @@ def test_briefing_includes_data_quality_and_contract(tmp_path):
assert '"commented_posts_available": 1' in result.stdout


def test_compare_requires_at_least_two_channels(tmp_path):
result = runner.invoke(
app,
["compare", "@nn_for_science"],
env={"CHAPPE_HOME": str(tmp_path)},
)
assert result.exit_code != 0
assert "at least two channels" in result.stdout.lower() or "at least two channels" in result.stderr.lower()


def test_compare_returns_per_channel_top_and_combined(tmp_path):
store = Store(tmp_path / ".local" / "share" / "chappe" / "chappe.db")
store.upsert_posts(
"@a",
[
{
"id": "1",
"date": "2026-01-01T00:00:00+00:00",
"views": 1000,
"forwards": 200,
"link": "https://t.me/a/1",
}
],
)
store.upsert_posts(
"@b",
[
{
"id": "1",
"date": "2026-01-01T00:00:00+00:00",
"views": 5000,
"forwards": 500,
"link": "https://t.me/b/1",
}
],
)
result = runner.invoke(
app,
["compare", "@a", "@b", "--by", "forwards", "--limit", "1"],
env={"CHAPPE_HOME": str(tmp_path)},
)
assert result.exit_code == 0
assert '"@a"' in result.stdout
assert '"@b"' in result.stdout
assert '"combined_leaderboard":' in result.stdout
assert '"by_forward_ratio_winner":' in result.stdout
assert '"by_raw_metric_winner":' in result.stdout
assert '"@a"' in result.stdout # higher forward rate
assert '"value": 500' in result.stdout # raw forwards winner


def test_compare_emits_sync_next_command_when_channel_unsynced(tmp_path):
Store(tmp_path / ".local" / "share" / "chappe" / "chappe.db").upsert_posts(
"@a",
[
{
"id": "1",
"date": "2026-01-01T00:00:00+00:00",
"views": 100,
"forwards": 10,
}
],
)
result = runner.invoke(
app,
["compare", "@a", "@neverseen"],
env={"CHAPPE_HOME": str(tmp_path)},
)
assert result.exit_code == 0
assert '"unsynced_channels":' in result.stdout
assert "@neverseen" in result.stdout
assert "chappe sync @neverseen" in result.stdout


def test_posts_top_warns_when_metric_is_all_zero(tmp_path):
Store(tmp_path / ".local" / "share" / "chappe" / "chappe.db").upsert_posts(
"@nn_for_science",
Expand Down