Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 93 additions & 3 deletions src/kimi_cli/ui/shell/prompt.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from kimi_cli.ui.shell.metacmd import get_meta_commands
from kimi_cli.utils.clipboard import is_clipboard_available
from kimi_cli.utils.logging import logger
from kimi_cli.utils.message import LARGE_PASTE_LINE_THRESHOLD
from kimi_cli.utils.string import random_string

PROMPT_SYMBOL = "✨"
Expand Down Expand Up @@ -393,8 +394,11 @@ def toast(message: str, duration: float = 5.0) -> None:
_toast_queue.put_nowait((message, duration))


# Matches both image and text placeholders:
# [image:id,640x480] or [text:id,60 lines]
_ATTACHMENT_PLACEHOLDER_RE = re.compile(
r"\[(?P<type>image):(?P<id>[a-zA-Z0-9_\-\.]+)(?:,(?P<width>\d+)x(?P<height>\d+))?\]"
r"\[(?P<type>image|text):(?P<id>[a-zA-Z0-9_\-\.]+)"
r"(?:,(?P<width>\d+)x(?P<height>\d+)|,(?P<line_count>\d+) lines)?\]"
)


Expand Down Expand Up @@ -462,16 +466,53 @@ def _insert_newline(event: KeyPressEvent) -> None:

shortcut_hints.append("ctrl-j: newline")

def _delete_placeholder_at_cursor(buff, is_backspace: bool) -> bool:
"""Delete placeholder at cursor position if present, returning True if deleted."""
doc = buff.document
cursor = doc.cursor_position

for match in _ATTACHMENT_PLACEHOLDER_RE.finditer(doc.text):
start, end = match.span()

# Backspace: delete if cursor is inside OR right after placeholder
# Delete: delete if cursor is inside OR right before placeholder
should_delete = start < cursor <= end if is_backspace else start <= cursor < end

if should_delete:
attachment_id = match.group("id")
buff.text = doc.text[:start] + doc.text[end:]
buff.cursor_position = start
self._attachment_parts.pop(attachment_id, None)
return True

return False

@_kb.add("backspace", eager=True)
def _smart_backspace(event: KeyPressEvent) -> None:
"""Delete entire placeholder if cursor is within/after one, else backspace normally."""
if not _delete_placeholder_at_cursor(event.current_buffer, is_backspace=True):
event.current_buffer.delete_before_cursor(1)
self._last_buffer_text = event.current_buffer.text

@_kb.add("delete", eager=True)
def _smart_delete(event: KeyPressEvent) -> None:
"""Delete entire placeholder if cursor is within/before one, else delete normally."""
if not _delete_placeholder_at_cursor(event.current_buffer, is_backspace=False):
event.current_buffer.delete(1)
self._last_buffer_text = event.current_buffer.text

if is_clipboard_available():

@_kb.add("c-v", eager=True)
def _paste(event: KeyPressEvent) -> None:
if self._try_paste_image(event):
return
# Large text detection happens in on_text_insert handler
clipboard_data = event.app.clipboard.get_data()
event.current_buffer.paste_clipboard_data(clipboard_data)

shortcut_hints.append("ctrl-v: paste")
shortcut_hints.append("ctrl-v / cmd-v: paste")

clipboard = PyperclipClipboard()
else:
clipboard = None
Expand All @@ -498,6 +539,16 @@ def _switch_thinking(event: KeyPressEvent) -> None:
bottom_toolbar=self._render_bottom_toolbar,
)

# This works for any paste method (Cmd-V, Ctrl-V, right-click, etc.)
self._pending_large_paste_replacement = False
self._last_buffer_text = ""

def _on_text_insert_handler(buffer):
self._check_and_replace_large_paste(buffer)
self._last_buffer_text = buffer.document.text

self._session.default_buffer.on_text_insert += _on_text_insert_handler

self._status_refresh_task: asyncio.Task | None = None
self._current_toast: str | None = None
self._current_toast_duration: float = 0.0
Expand Down Expand Up @@ -599,15 +650,54 @@ def _try_paste_image(self, event: KeyPressEvent) -> bool:
event.app.invalidate()
return True

def _check_and_replace_large_paste(self, buffer) -> None:
"""Check if large text was pasted and replace only the pasted part with placeholder."""
if self._pending_large_paste_replacement:
return

current_text = buffer.document.text
previous_text = self._last_buffer_text

# Only handle appends (includes empty buffer case since "".startswith("") is True)
if not current_text.startswith(previous_text):
return

inserted_text = current_text[len(previous_text) :]
inserted_line_count = inserted_text.count("\n") + 1

if inserted_line_count <= LARGE_PASTE_LINE_THRESHOLD:
return

try:
self._pending_large_paste_replacement = True

paste_id = random_string(8)
self._attachment_parts[paste_id] = TextPart(text=inserted_text)

logger.debug(
"Detected large paste: {paste_id}, {line_count} lines",
paste_id=paste_id,
line_count=inserted_line_count,
)

placeholder = f"[text:{paste_id},{inserted_line_count} lines]"
buffer.text = previous_text + placeholder
buffer.cursor_position = len(buffer.text)

finally:
self._pending_large_paste_replacement = False

async def prompt(self) -> UserInput:
with patch_stdout():
command = str(await self._session.prompt_async()).strip()
command = command.replace("\x00", "") # just in case null bytes are somehow inserted
self._append_history_entry(command)

# Parse rich content parts
self._last_buffer_text = ""

content: list[ContentPart] = []
remaining_command = command

while match := _ATTACHMENT_PLACEHOLDER_RE.search(remaining_command):
start, end = match.span()
if start > 0:
Expand Down
5 changes: 4 additions & 1 deletion src/kimi_cli/ui/shell/replay.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ async def replay_recent_history(history: Sequence[Message]) -> None:

for run in runs:
wire = Wire()
console.print(f"{getpass.getuser()}{PROMPT_SYMBOL} {message_stringify(run.user_message)}")
console.print(
f"{getpass.getuser()}{PROMPT_SYMBOL} "
f"{message_stringify(run.user_message, context='replay')}"
)
ui_task = asyncio.create_task(
visualize(wire.ui_side, initial_status=StatusSnapshot(context_usage=0.0))
)
Expand Down
21 changes: 17 additions & 4 deletions src/kimi_cli/utils/message.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from kosong.base.message import Message, TextPart

# Collapse text pastes with more than this many lines
LARGE_PASTE_LINE_THRESHOLD = 50


def message_extract_text(message: Message) -> str:
"""Extract text from a message."""
Expand All @@ -8,15 +11,25 @@ def message_extract_text(message: Message) -> str:
return "\n".join(part.text for part in message.content if isinstance(part, TextPart))


def message_stringify(message: Message) -> str:
"""Get a string representation of a message."""
def message_stringify(message: Message, context: str = "default") -> str:
"""Return a string view of a message, collapsing large pastes outside replay context."""

def _maybe_collapse(text: str) -> str:
"""Collapse text if it exceeds threshold (except in replay context)."""
if context == "replay":
return text
line_count = text.count("\n") + 1
if line_count > LARGE_PASTE_LINE_THRESHOLD:
return f"[pasted {line_count} lines]"
return text

parts: list[str] = []
if isinstance(message.content, str):
parts.append(message.content)
parts.append(_maybe_collapse(message.content))
else:
for part in message.content:
if isinstance(part, TextPart):
parts.append(part.text)
parts.append(_maybe_collapse(part.text))
else:
parts.append(f"[{part.type}]")
return "".join(parts)
18 changes: 18 additions & 0 deletions tests/test_message_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,21 @@ def test_extract_text_from_empty_string():
result = message_extract_text(message)

assert result == ""


def test_stringify_collapses_large_text_by_default():
"""Test that large text is collapsed in default context."""
large_text = "\n".join(["line"] * 60)
message = Message(role="user", content=large_text)
result = message_stringify(message)

assert result == "[pasted 60 lines]"


def test_stringify_respects_replay_context():
"""Test that large text is shown in full in replay context."""
large_text = "\n".join(["line"] * 60)
message = Message(role="user", content=large_text)
result = message_stringify(message, context="replay")

assert result == large_text
47 changes: 47 additions & 0 deletions tests/test_prompt_placeholders.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
"""Unit tests for prompt placeholder deletion logic (cursor boundary conditions)."""

import re

# The regex pattern from prompt.py
_ATTACHMENT_PLACEHOLDER_RE = re.compile(
r"\[(?P<type>image|text):(?P<id>[a-zA-Z0-9_\-\.]+)"
r"(?:,(?P<width>\d+)x(?P<height>\d+)|,(?P<line_count>\d+) lines)?\]"
)


class TestCursorBoundaryLogic:
"""Test cursor boundary conditions for placeholder deletion (the main bug we fixed)."""

def test_backspace_boundary_conditions(self):
"""Test backspace deletes when cursor is after placeholder, not before."""
text = "prefix [text:abc12345,60 lines]"
match = _ATTACHMENT_PLACEHOLDER_RE.search(text)
assert match is not None
start, end = match.span()

# Backspace with cursor right after placeholder should delete (the bug fix)
cursor = end
should_delete = start < cursor <= end
assert should_delete

# Backspace with cursor before placeholder should not delete
cursor = start
should_delete = start < cursor <= end
assert not should_delete

def test_delete_boundary_conditions(self):
"""Test delete key deletes when cursor is before placeholder, not after."""
text = "prefix [text:abc12345,60 lines]"
match = _ATTACHMENT_PLACEHOLDER_RE.search(text)
assert match is not None
start, end = match.span()

# Delete with cursor before placeholder should delete
cursor = start
should_delete = start <= cursor < end
assert should_delete

# Delete with cursor after placeholder should not delete
cursor = end
should_delete = start <= cursor < end
assert not should_delete