diff --git a/src/kimi_cli/ui/shell/prompt.py b/src/kimi_cli/ui/shell/prompt.py index ea6a276c..7dac3805 100644 --- a/src/kimi_cli/ui/shell/prompt.py +++ b/src/kimi_cli/ui/shell/prompt.py @@ -41,6 +41,7 @@ from kimi_cli.ui.shell.metacmd import get_meta_commands from kimi_cli.utils.clipboard import is_clipboard_available from kimi_cli.utils.logging import logger +from kimi_cli.utils.message import LARGE_PASTE_LINE_THRESHOLD from kimi_cli.utils.string import random_string PROMPT_SYMBOL = "✨" @@ -393,8 +394,11 @@ def toast(message: str, duration: float = 5.0) -> None: _toast_queue.put_nowait((message, duration)) +# Matches both image and text placeholders: +# [image:id,640x480] or [text:id,60 lines] _ATTACHMENT_PLACEHOLDER_RE = re.compile( - r"\[(?Pimage):(?P[a-zA-Z0-9_\-\.]+)(?:,(?P\d+)x(?P\d+))?\]" + r"\[(?Pimage|text):(?P[a-zA-Z0-9_\-\.]+)" + r"(?:,(?P\d+)x(?P\d+)|,(?P\d+) lines)?\]" ) @@ -462,16 +466,53 @@ def _insert_newline(event: KeyPressEvent) -> None: shortcut_hints.append("ctrl-j: newline") + def _delete_placeholder_at_cursor(buff, is_backspace: bool) -> bool: + """Delete placeholder at cursor position if present, returning True if deleted.""" + doc = buff.document + cursor = doc.cursor_position + + for match in _ATTACHMENT_PLACEHOLDER_RE.finditer(doc.text): + start, end = match.span() + + # Backspace: delete if cursor is inside OR right after placeholder + # Delete: delete if cursor is inside OR right before placeholder + should_delete = start < cursor <= end if is_backspace else start <= cursor < end + + if should_delete: + attachment_id = match.group("id") + buff.text = doc.text[:start] + doc.text[end:] + buff.cursor_position = start + self._attachment_parts.pop(attachment_id, None) + return True + + return False + + @_kb.add("backspace", eager=True) + def _smart_backspace(event: KeyPressEvent) -> None: + """Delete entire placeholder if cursor is within/after one, else backspace normally.""" + if not _delete_placeholder_at_cursor(event.current_buffer, is_backspace=True): + event.current_buffer.delete_before_cursor(1) + self._last_buffer_text = event.current_buffer.text + + @_kb.add("delete", eager=True) + def _smart_delete(event: KeyPressEvent) -> None: + """Delete entire placeholder if cursor is within/before one, else delete normally.""" + if not _delete_placeholder_at_cursor(event.current_buffer, is_backspace=False): + event.current_buffer.delete(1) + self._last_buffer_text = event.current_buffer.text + if is_clipboard_available(): @_kb.add("c-v", eager=True) def _paste(event: KeyPressEvent) -> None: if self._try_paste_image(event): return + # Large text detection happens in on_text_insert handler clipboard_data = event.app.clipboard.get_data() event.current_buffer.paste_clipboard_data(clipboard_data) - shortcut_hints.append("ctrl-v: paste") + shortcut_hints.append("ctrl-v / cmd-v: paste") + clipboard = PyperclipClipboard() else: clipboard = None @@ -498,6 +539,16 @@ def _switch_thinking(event: KeyPressEvent) -> None: bottom_toolbar=self._render_bottom_toolbar, ) + # This works for any paste method (Cmd-V, Ctrl-V, right-click, etc.) + self._pending_large_paste_replacement = False + self._last_buffer_text = "" + + def _on_text_insert_handler(buffer): + self._check_and_replace_large_paste(buffer) + self._last_buffer_text = buffer.document.text + + self._session.default_buffer.on_text_insert += _on_text_insert_handler + self._status_refresh_task: asyncio.Task | None = None self._current_toast: str | None = None self._current_toast_duration: float = 0.0 @@ -599,15 +650,54 @@ def _try_paste_image(self, event: KeyPressEvent) -> bool: event.app.invalidate() return True + def _check_and_replace_large_paste(self, buffer) -> None: + """Check if large text was pasted and replace only the pasted part with placeholder.""" + if self._pending_large_paste_replacement: + return + + current_text = buffer.document.text + previous_text = self._last_buffer_text + + # Only handle appends (includes empty buffer case since "".startswith("") is True) + if not current_text.startswith(previous_text): + return + + inserted_text = current_text[len(previous_text) :] + inserted_line_count = inserted_text.count("\n") + 1 + + if inserted_line_count <= LARGE_PASTE_LINE_THRESHOLD: + return + + try: + self._pending_large_paste_replacement = True + + paste_id = random_string(8) + self._attachment_parts[paste_id] = TextPart(text=inserted_text) + + logger.debug( + "Detected large paste: {paste_id}, {line_count} lines", + paste_id=paste_id, + line_count=inserted_line_count, + ) + + placeholder = f"[text:{paste_id},{inserted_line_count} lines]" + buffer.text = previous_text + placeholder + buffer.cursor_position = len(buffer.text) + + finally: + self._pending_large_paste_replacement = False + async def prompt(self) -> UserInput: with patch_stdout(): command = str(await self._session.prompt_async()).strip() command = command.replace("\x00", "") # just in case null bytes are somehow inserted self._append_history_entry(command) - # Parse rich content parts + self._last_buffer_text = "" + content: list[ContentPart] = [] remaining_command = command + while match := _ATTACHMENT_PLACEHOLDER_RE.search(remaining_command): start, end = match.span() if start > 0: diff --git a/src/kimi_cli/ui/shell/replay.py b/src/kimi_cli/ui/shell/replay.py index 50420dad..d63078de 100644 --- a/src/kimi_cli/ui/shell/replay.py +++ b/src/kimi_cli/ui/shell/replay.py @@ -41,7 +41,10 @@ async def replay_recent_history(history: Sequence[Message]) -> None: for run in runs: wire = Wire() - console.print(f"{getpass.getuser()}{PROMPT_SYMBOL} {message_stringify(run.user_message)}") + console.print( + f"{getpass.getuser()}{PROMPT_SYMBOL} " + f"{message_stringify(run.user_message, context='replay')}" + ) ui_task = asyncio.create_task( visualize(wire.ui_side, initial_status=StatusSnapshot(context_usage=0.0)) ) diff --git a/src/kimi_cli/utils/message.py b/src/kimi_cli/utils/message.py index 83bba974..9933246a 100644 --- a/src/kimi_cli/utils/message.py +++ b/src/kimi_cli/utils/message.py @@ -1,5 +1,8 @@ from kosong.base.message import Message, TextPart +# Collapse text pastes with more than this many lines +LARGE_PASTE_LINE_THRESHOLD = 50 + def message_extract_text(message: Message) -> str: """Extract text from a message.""" @@ -8,15 +11,25 @@ def message_extract_text(message: Message) -> str: return "\n".join(part.text for part in message.content if isinstance(part, TextPart)) -def message_stringify(message: Message) -> str: - """Get a string representation of a message.""" +def message_stringify(message: Message, context: str = "default") -> str: + """Return a string view of a message, collapsing large pastes outside replay context.""" + + def _maybe_collapse(text: str) -> str: + """Collapse text if it exceeds threshold (except in replay context).""" + if context == "replay": + return text + line_count = text.count("\n") + 1 + if line_count > LARGE_PASTE_LINE_THRESHOLD: + return f"[pasted {line_count} lines]" + return text + parts: list[str] = [] if isinstance(message.content, str): - parts.append(message.content) + parts.append(_maybe_collapse(message.content)) else: for part in message.content: if isinstance(part, TextPart): - parts.append(part.text) + parts.append(_maybe_collapse(part.text)) else: parts.append(f"[{part.type}]") return "".join(parts) diff --git a/tests/test_message_utils.py b/tests/test_message_utils.py index f2c8da73..01c5edec 100644 --- a/tests/test_message_utils.py +++ b/tests/test_message_utils.py @@ -101,3 +101,21 @@ def test_extract_text_from_empty_string(): result = message_extract_text(message) assert result == "" + + +def test_stringify_collapses_large_text_by_default(): + """Test that large text is collapsed in default context.""" + large_text = "\n".join(["line"] * 60) + message = Message(role="user", content=large_text) + result = message_stringify(message) + + assert result == "[pasted 60 lines]" + + +def test_stringify_respects_replay_context(): + """Test that large text is shown in full in replay context.""" + large_text = "\n".join(["line"] * 60) + message = Message(role="user", content=large_text) + result = message_stringify(message, context="replay") + + assert result == large_text diff --git a/tests/test_prompt_placeholders.py b/tests/test_prompt_placeholders.py new file mode 100644 index 00000000..0dd04676 --- /dev/null +++ b/tests/test_prompt_placeholders.py @@ -0,0 +1,47 @@ +"""Unit tests for prompt placeholder deletion logic (cursor boundary conditions).""" + +import re + +# The regex pattern from prompt.py +_ATTACHMENT_PLACEHOLDER_RE = re.compile( + r"\[(?Pimage|text):(?P[a-zA-Z0-9_\-\.]+)" + r"(?:,(?P\d+)x(?P\d+)|,(?P\d+) lines)?\]" +) + + +class TestCursorBoundaryLogic: + """Test cursor boundary conditions for placeholder deletion (the main bug we fixed).""" + + def test_backspace_boundary_conditions(self): + """Test backspace deletes when cursor is after placeholder, not before.""" + text = "prefix [text:abc12345,60 lines]" + match = _ATTACHMENT_PLACEHOLDER_RE.search(text) + assert match is not None + start, end = match.span() + + # Backspace with cursor right after placeholder should delete (the bug fix) + cursor = end + should_delete = start < cursor <= end + assert should_delete + + # Backspace with cursor before placeholder should not delete + cursor = start + should_delete = start < cursor <= end + assert not should_delete + + def test_delete_boundary_conditions(self): + """Test delete key deletes when cursor is before placeholder, not after.""" + text = "prefix [text:abc12345,60 lines]" + match = _ATTACHMENT_PLACEHOLDER_RE.search(text) + assert match is not None + start, end = match.span() + + # Delete with cursor before placeholder should delete + cursor = start + should_delete = start <= cursor < end + assert should_delete + + # Delete with cursor after placeholder should not delete + cursor = end + should_delete = start <= cursor < end + assert not should_delete