Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ classifiers = [
]

dependencies = [
"textual>=0.40.0",
"textual>=3.0",
"psutil>=5.9.0",
]

Expand Down
188 changes: 118 additions & 70 deletions src/netshow/app.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import os
import time
from collections import deque
from typing import Any, Optional, Union, cast

import psutil
from rich.text import Text
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.containers import Container, Horizontal, Vertical
from textual.reactive import reactive
from textual.timer import Timer
from textual.widgets import DataTable, Footer, Header, Input, Static
from textual.widgets import DataTable, Footer, Header, Input, Sparkline, Static

from .detail_screen import ConnectionDetailScreen
from .helpers import get_lsof_conns, get_psutil_conns
Expand All @@ -24,9 +26,7 @@ class NetshowApp(App):
"""Network connection monitoring application using Textual TUI."""

CSS = CSS
BINDINGS = cast(
list[Union[Binding, tuple[str, str], tuple[str, str, str]]], BASIC_KEYBINDINGS
)
BINDINGS = cast(list[Union[Binding, tuple[str, str], tuple[str, str, str]]], BASIC_KEYBINDINGS)

total_connections = reactive(0)
active_connections = reactive(0)
Expand All @@ -36,6 +36,7 @@ class NetshowApp(App):
sort_mode = reactive("default")
selected_interface = reactive("all")
show_emojis = reactive(True)
expand_ipv6 = reactive(False)

def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
Expand All @@ -46,6 +47,7 @@ def __init__(self, **kwargs: Any) -> None:
self.title = "Netshow" # Will be updated with data source
self.debounce_timer: Optional[Timer] = None
self.available_interfaces = self._get_available_interfaces()
self.bandwidth_history: deque[float] = deque(maxlen=20) # Last 20 samples

def _get_available_interfaces(self) -> list:
"""Get list of available network interfaces."""
Expand All @@ -61,18 +63,11 @@ def compose(self) -> ComposeResult:
with Vertical():
with Container(id="stats_container"):
with Horizontal(id="metrics_row"):
yield Static(
"📊 Connections: 0", id="conn_metric", classes="metric"
)
yield Static("⚡ Active: 0", id="active_metric", classes="metric")
yield Static(
"👂 Listening: 0", id="listen_metric", classes="metric"
)
yield Static(
"🔥 Bandwidth: 0 B/s (all)",
id="bandwidth_metric",
classes="metric",
)
yield Static("📊 0", id="conn_metric", classes="metric")
yield Static("⚡ 0", id="active_metric", classes="metric")
yield Static("👂 0", id="listen_metric", classes="metric")
yield Static("🔥 0 B/s", id="bandwidth_metric", classes="metric bandwidth")
yield Sparkline([], id="bandwidth_spark")

with Container(id="filter_container"):
yield Input(
Expand All @@ -98,14 +93,18 @@ def on_mount(self) -> None:
filter_container.display = False

# Refresh at regular intervals
self.timer: Timer = self.set_interval(
REFRESH_INTERVAL, self.refresh_connections
)
self.timer: Timer = self.set_interval(REFRESH_INTERVAL, self.refresh_connections)
self.refresh_connections()

# Focus the table for keyboard navigation
table.focus()

def on_resize(self) -> None:
"""Handle terminal resize - update metrics display."""
self._update_metrics_display(
self.total_connections, self.active_connections, self.listening_connections
)

def refresh_connections(self, sort_only: bool = False) -> None:
"""Refresh connection data and update the display."""
if not sort_only:
Expand Down Expand Up @@ -163,6 +162,25 @@ def _fetch_connection_data(self) -> None:
source_name = "psutil" if using_root else "lsof"
self.title = f"Netshow ({source_name})"

def _truncate_addr(self, addr: str) -> str:
"""Truncate IPv6 addresses for compact display."""
if not addr or self.expand_ipv6:
return addr
# Check for IPv6 (contains multiple colons or brackets)
if addr.startswith("[") or addr.count(":") > 1:
# Extract port if present
if "]:" in addr:
ip, port = addr.rsplit(":", 1)
return f"[…]:{port}"
elif addr.startswith("["):
return "[…]"
# Bare IPv6 with port (ip:port where ip has multiple colons)
parts = addr.rsplit(":", 1)
if len(parts) == 2 and parts[0].count(":") >= 1:
return f"…:{parts[1]}"
return "…"
return addr

def _update_table_display(self) -> None:
"""Update the table display with current connection data."""
table = self.query_one("#connections_table", DataTable)
Expand All @@ -184,18 +202,15 @@ def _update_table_display(self) -> None:
if i >= table.row_count:
break

status_icon = self._get_status_icon(c["status"])
status_styled = self._get_status_styled(c["status"])
speed_indicator = self._get_speed_indicator(c)
status_text = (
f"{status_icon} {c['status']}" if status_icon else c["status"]
)
new_row = [
c["pid"],
c["friendly"],
c["proc"],
c["laddr"],
c["raddr"],
status_text,
self._truncate_addr(c["laddr"]),
self._truncate_addr(c["raddr"]),
status_styled,
speed_indicator,
]

Expand Down Expand Up @@ -225,18 +240,15 @@ def _update_table_display(self) -> None:
# Fall back to clear and rebuild for smaller datasets or size changes
table.clear()
for c in conns:
status_icon = self._get_status_icon(c["status"])
status_styled = self._get_status_styled(c["status"])
speed_indicator = self._get_speed_indicator(c)
status_text = (
f"{status_icon} {c['status']}" if status_icon else c["status"]
)
table.add_row(
c["pid"],
c["friendly"],
c["proc"],
c["laddr"],
c["raddr"],
status_text,
self._truncate_addr(c["laddr"]),
self._truncate_addr(c["raddr"]),
status_styled,
speed_indicator,
)

Expand Down Expand Up @@ -265,6 +277,25 @@ def _update_table_display(self) -> None:
if cursor_row < table.row_count and hasattr(table, "cursor_coordinate"):
table.cursor_coordinate = (cursor_row, 0) # type: ignore

def _get_status_styled(self, status: str) -> Text:
"""Get a Rich Text styled status with color coding (Solarized)."""
status_styles = {
"ESTABLISHED": ("bold #859900", "🚀"), # green
"LISTEN": ("bold #268bd2", "👂"), # blue
"TIME_WAIT": ("#b58900", "⏳"), # yellow
"CLOSE_WAIT": ("bold #dc322f", "⏸️"), # red
"SYN_SENT": ("#d33682", "📤"), # magenta
"SYN_RECV": ("#d33682", "📥"), # magenta
"FIN_WAIT1": ("#b58900", "🔄"), # yellow
"FIN_WAIT2": ("#b58900", "🔁"), # yellow
"CLOSING": ("#dc322f", "🔚"), # red
"LAST_ACK": ("#dc322f", "🏁"), # red
}
style, icon = status_styles.get(status, ("#586e75", "❓")) # base01
if self.show_emojis:
return Text(f"{icon} {status}", style=style)
return Text(status, style=style)

def _get_status_icon(self, status: str) -> str:
"""Get an appropriate icon for connection status."""
if not self.show_emojis:
Expand All @@ -283,9 +314,7 @@ def _get_status_icon(self, status: str) -> str:
}
return status_icons.get(status, "❓")

def _get_speed_indicator(
self, connection: Union[dict[str, str], ConnectionData]
) -> str:
def _get_speed_indicator(self, connection: Union[dict[str, str], ConnectionData]) -> str:
"""Generate a speed indicator based on connection characteristics."""
if not self.show_emojis:
status = connection.get("status", "")
Expand All @@ -311,58 +340,67 @@ def _update_metrics_display(self, total: int, active: int, listening: int) -> No
active_metric = self.query_one("#active_metric", Static)
listen_metric = self.query_one("#listen_metric", Static)
bandwidth_metric = self.query_one("#bandwidth_metric", Static)
bandwidth_spark = self.query_one("#bandwidth_spark", Sparkline)

# Check terminal width for adaptive display
width = self.size.width
compact = width < 100
show_spark = width >= 80

# Emoji prefixes based on toggle state
conn_prefix = "📊 " if self.show_emojis else ""
active_prefix = "⚡ " if self.show_emojis else ""
listen_prefix = "👂 " if self.show_emojis else ""
bandwidth_prefix = "🔥 " if self.show_emojis else ""
conn_icon = "📊 " if self.show_emojis else ""
active_icon = "⚡ " if self.show_emojis else ""
listen_icon = "👂 " if self.show_emojis else ""
bw_icon = "🔥 " if self.show_emojis else ""

# Get network I/O stats for bandwidth
total_bandwidth = 0.0
interface_label = self.selected_interface
try:
if self.selected_interface == "all":
net_io = psutil.net_io_counters()
interface_label = "all"
else:
net_io_per_nic = psutil.net_io_counters(pernic=True)
net_io_temp = net_io_per_nic.get(self.selected_interface)
interface_label = self.selected_interface
if not net_io_temp:
# Interface not found, fallback to all
interface_label = "all"
self.selected_interface = "all"
net_io = psutil.net_io_counters()
else:
net_io = net_io_temp

current_time = time.time()
if (
self.last_network_stats is not None
and self.last_stats_time is not None
):
time_diff = max(
0.1, current_time - self.last_stats_time
) # Avoid division by zero
bytes_sent_diff = max(
0, net_io.bytes_sent - self.last_network_stats.bytes_sent
)
bytes_recv_diff = max(
0, net_io.bytes_recv - self.last_network_stats.bytes_recv
)
if self.last_network_stats is not None and self.last_stats_time is not None:
time_diff = max(0.1, current_time - self.last_stats_time)
bytes_sent_diff = max(0, net_io.bytes_sent - self.last_network_stats.bytes_sent)
bytes_recv_diff = max(0, net_io.bytes_recv - self.last_network_stats.bytes_recv)
total_bandwidth = (bytes_sent_diff + bytes_recv_diff) / time_diff
bandwidth_text = f"{self._format_bytes(int(total_bandwidth))}/s ({interface_label})"
else:
bandwidth_text = f"0 B/s ({interface_label})"
self.last_network_stats = net_io
self.last_stats_time = current_time
except (AttributeError, OSError):
bandwidth_text = "N/A"
pass

conn_metric.update(f"{conn_prefix}Connections: {total}")
active_metric.update(f"{active_prefix}Active: {active}")
listen_metric.update(f"{listen_prefix}Listening: {listening}")
bandwidth_metric.update(f"{bandwidth_prefix}Bandwidth: {bandwidth_text}")
# Update sparkline
self.bandwidth_history.append(total_bandwidth)
bandwidth_spark.data = list(self.bandwidth_history)
bandwidth_spark.display = show_spark

# Adaptive labels based on width
if compact:
conn_metric.update(f"{conn_icon}{total}")
active_metric.update(f"{active_icon}{active}")
listen_metric.update(f"{listen_icon}{listening}")
bandwidth_metric.update(f"{bw_icon}{self._format_bytes(int(total_bandwidth))}/s")
else:
conn_metric.update(f"{conn_icon}Conn: {total}")
active_metric.update(f"{active_icon}Active: {active}")
listen_metric.update(f"{listen_icon}Listen: {listening}")
iface = f" ({interface_label})" if width >= 120 else ""
bandwidth_metric.update(
f"{bw_icon}{self._format_bytes(int(total_bandwidth))}/s{iface}"
)
except Exception:
pass # Gracefully handle missing widgets
pass

def _format_bytes(self, bytes_val: int) -> str:
"""Format bytes into human readable format."""
Expand All @@ -380,11 +418,16 @@ def _format_bytes(self, bytes_val: int) -> str:

def _get_selected_connection_data(self, row_data: tuple) -> ConnectionData:
"""Convert row data tuple to ConnectionData dict."""
# Extract status without icon (remove first 2 characters: icon + space)
status_with_icon = row_data[5]
clean_status = (
status_with_icon[2:] if len(status_with_icon) > 2 else status_with_icon
)
# Extract status - handle both Rich Text objects and plain strings
status_cell = row_data[5]
if isinstance(status_cell, Text):
# Rich Text object - extract plain text and remove emoji prefix
status_text = status_cell.plain
# Remove emoji prefix if present (emoji + space = ~3 chars)
Comment on lines +424 to +426
Copy link

Copilot AI Dec 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment states 'emoji + space = ~3 chars' but the code uses split(' ', 1)[-1] which only splits on a single space character. Emojis can be 1-4 bytes in UTF-8, but the code doesn't account for character length. The comment should be updated to accurately describe the string splitting operation being performed.

Suggested change
# Rich Text object - extract plain text and remove emoji prefix
status_text = status_cell.plain
# Remove emoji prefix if present (emoji + space = ~3 chars)
# Rich Text object - extract plain text and remove emoji/icon prefix
status_text = status_cell.plain
# Remove leading emoji/icon prefix and following space by taking text after the first space

Copilot uses AI. Check for mistakes.
clean_status = status_text.split(" ", 1)[-1] if " " in status_text else status_text
else:
# Plain string - remove first 2 characters (icon + space)
clean_status = status_cell[2:] if len(status_cell) > 2 else status_cell

return ConnectionData(
pid=row_data[0],
Expand Down Expand Up @@ -512,6 +555,11 @@ def action_toggle_emojis(self) -> None:
self.total_connections, self.active_connections, self.listening_connections
)

def action_toggle_ipv6(self) -> None:
"""Toggle IPv6 address expansion."""
self.expand_ipv6 = not self.expand_ipv6
self._update_table_display()

def _update_table_columns(self) -> None:
"""Update table column headers based on emoji setting."""
table = self.query_one("#connections_table", DataTable)
Expand Down
5 changes: 5 additions & 0 deletions src/netshow/cli.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
"""CLI entry point for NetShow."""

import os
import sys

from .app import NetshowApp


def main() -> None:
"""Main CLI entry point."""
# Ensure truecolor support for Solarized theme
if "COLORTERM" not in os.environ:
os.environ["COLORTERM"] = "truecolor"

try:
NetshowApp().run()
except KeyboardInterrupt:
Expand Down
Loading
Loading