Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ print(model.available_voices)
# ['Bella', 'Jasper', 'Luna', 'Bruno', 'Rosie', 'Hugo', 'Kiki', 'Leo']
```

Kitten TTS sends anonymous generation analytics; see [`docs/analytics.md`](docs/analytics.md) for details and opt-out.

### Using with GPU

```
Expand Down
22 changes: 22 additions & 0 deletions docs/analytics.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Analytics

Kitten TTS sends anonymous generation analytics to the KittenTTS ingest API at
`https://kittenmlanalytics.com/v1/track`. The SDK does not include PostHog or
any analytics-provider SDK, and it does not send input text or generated audio.

Events include a random anonymous installation ID (a UUID stored locally, by
default in `~/.kittentts/analytics_id`), a per-event ID, a UTC timestamp, SDK
version, SDK type, platform, runtime version, selected model, model version,
selected/default voice, generation type (`wav`, `speak`, or `stream`), asset
source, and SDK error code for failed calls. IP address and location are added
server-side by Cloudflare.

Streaming calls send one `stream` analytics event per stream invocation, not one
event per generated chunk.

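Disable analytics by setting the `KITTENTTS_ANALYTICS` environment variable to
`0`, `false`, `off`, or `no`, or by passing `analytics=False` at model creation: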

```python
model = KittenTTS("KittenML/kitten-tts-mini-0.8", analytics=False)
```

Analytics runs in the background with a short timeout. Network failures are
swallowed and do not block or fail TTS generation.
194 changes: 194 additions & 0 deletions kittentts/analytics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
"""Small, dependency-free analytics client for KittenTTS SDK events."""

import json
import os
import platform as platform_module
import re
import sys
import threading
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable, Dict, Optional
from urllib import request

# URL used as the default ``endpoint`` of AnalyticsClient; events are POSTed here.
ANALYTICS_ENDPOINT = "https://kittenmlanalytics.com/v1/track"
# Value reported in the "sdk_type" field of every event payload.
SDK_TYPE = "python"
# Default per-request timeout, in seconds, for delivering one event.
DEFAULT_TIMEOUT_SECONDS = 3.0

# Splits a repository name such as "kitten-tts-nano-0.8" into a lazily matched
# model part and a trailing version: dotted digits followed by optional
# "-<alphanumeric>" segments.
_MODEL_VERSION_RE = re.compile(r"^(?P<model>.+?)-(?P<version>\d+(?:\.\d+)*(?:-[A-Za-z0-9]+)*)$")


def analytics_enabled(value=True) -> bool:
    """Decide whether analytics events should be sent.

    Analytics is off when ``value`` is exactly ``False``, or when the
    ``KITTENTTS_ANALYTICS`` environment variable holds ``0``, ``false``,
    ``off`` or ``no`` (case-insensitive, surrounding whitespace ignored).
    In every other case analytics is on.

    Args:
        value: The caller's explicit setting; only the ``False`` singleton
            disables analytics, other falsy values do not.

    Returns:
        ``True`` when events should be sent, ``False`` otherwise.
    """
    if value is False:
        return False

    raw_setting = os.environ.get("KITTENTTS_ANALYTICS")
    if not raw_setting:
        # Unset or empty variable: no opt-out was requested.
        return True

    opted_out = raw_setting.strip().lower() in {"0", "false", "off", "no"}
    return not opted_out


def current_platform() -> str:
    """Return the host operating system as a short label.

    Returns:
        ``"macos"``, ``"windows"`` or ``"linux"`` when ``platform.system()``
        reports Darwin, Windows or Linux (compared case-insensitively), and
        ``"unknown"`` for anything else.
    """
    labels = {"darwin": "macos", "windows": "windows", "linux": "linux"}
    return labels.get(platform_module.system().lower(), "unknown")


def runtime_version() -> str:
    """Return the running interpreter version as ``python <major>.<minor>``."""
    major, minor = sys.version_info[:2]
    return f"python {major}.{minor}"


def parse_model_name(model_name: str) -> Dict[str, str]:
    """Split a model identifier into a model name and a version.

    Only the last ``/``-separated component is considered (trailing slashes
    are ignored), so ``"KittenML/kitten-tts-nano-0.8"`` is parsed as
    ``"kitten-tts-nano-0.8"``. If that component is empty, the whole
    identifier is used instead.

    Args:
        model_name: Repository ID or bare model name; converted with ``str``.

    Returns:
        A dict with keys ``"selected_model"`` and ``"model_version"``. When
        the name has no trailing version, ``"selected_model"`` is the whole
        component and ``"model_version"`` is ``"unknown"``.
    """
    full_name = str(model_name)
    repo_name = full_name.rstrip("/").split("/")[-1] or full_name

    parsed = _MODEL_VERSION_RE.match(repo_name)
    if parsed is not None:
        return {
            "selected_model": parsed.group("model"),
            "model_version": parsed.group("version"),
        }
    return {"selected_model": repo_name, "model_version": "unknown"}


def error_code(error: BaseException) -> str:
    """Convert an exception's class name into an UPPER_SNAKE_CASE error code.

    Word boundaries are placed between a lowercase letter or digit and a
    following capital, and between the end of an acronym and the next
    capitalized word. Runs of capitals therefore stay together:
    ``ValueError`` -> ``VALUE_ERROR``, ``OSError`` -> ``OS_ERROR``,
    ``JSONDecodeError`` -> ``JSON_DECODE_ERROR``.

    Args:
        error: The exception instance to describe.

    Returns:
        The derived code, or ``"UNKNOWN_ERROR"`` if the class name is empty.
    """
    name = error.__class__.__name__
    # First alternative: "eE" in "ValueError". Second: "SE" + lowercase in
    # "OSError", which ends an acronym without splitting it letter by letter.
    words = re.sub(r"(?<=[a-z0-9])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])", "_", name).upper()
    return words or "UNKNOWN_ERROR"


class AnalyticsClient:
    """Best-effort sender of anonymous generation events for one loaded model.

    The client never lets a delivery problem reach the caller: every public
    entry point swallows ``Exception``. Events are sent on a daemon thread
    unless ``async_delivery`` is turned off.
    """

    def __init__(
        self,
        sdk_version: str,
        selected_model: str,
        model_version: str,
        asset_source: str,
        enabled: bool = True,
        endpoint: str = ANALYTICS_ENDPOINT,
        anonymous_id_path: Optional[Path] = None,
        timeout_seconds: float = DEFAULT_TIMEOUT_SECONDS,
        post_json: Optional[Callable[[str, Dict[str, str], float], None]] = None,
        async_delivery: bool = True,
    ):
        """Store the fields that are attached to every event.

        Args:
            sdk_version: Version string reported as ``sdk_version``.
            selected_model: Model name reported as ``selected_model``.
            model_version: Model version reported as ``model_version``.
            asset_source: Value reported as ``asset_source``.
            enabled: Caller's opt-in flag; combined with the environment
                opt-out by ``analytics_enabled``.
            endpoint: URL that events are posted to.
            anonymous_id_path: File holding the anonymous ID; defaults to
                ``default_anonymous_id_path()``.
            timeout_seconds: Timeout handed to the transport callable.
            post_json: Transport callable ``(endpoint, payload, timeout)``;
                defaults to ``post_json_request``.
            async_delivery: Send on a background daemon thread when true,
                inline when false.
        """
        # Static event fields.
        self.sdk_version = sdk_version
        self.selected_model = selected_model
        self.model_version = model_version
        self.asset_source = asset_source

        # Delivery configuration.
        self.enabled = analytics_enabled(enabled)
        self.endpoint = endpoint
        self.timeout_seconds = timeout_seconds
        self._post_json = post_json if post_json else post_json_request
        self._async_delivery = async_delivery

        # The ID itself is resolved lazily, on first access to `anonymous_id`.
        self._anonymous_id_path = anonymous_id_path if anonymous_id_path else default_anonymous_id_path()
        self._anonymous_id = None

    @property
    def anonymous_id(self) -> str:
        """Anonymous ID, loaded from (or created at) the ID path on first use."""
        if self._anonymous_id:
            return self._anonymous_id
        self._anonymous_id = load_or_create_anonymous_id(self._anonymous_id_path)
        return self._anonymous_id

    def track_generation(
        self,
        selected_voice: str,
        generation: str,
        sdk_error_code: Optional[str] = None,
    ) -> None:
        """Record one generation event; never raises ``Exception``.

        Args:
            selected_voice: Voice used for the call.
            generation: Generation type label for the event.
            sdk_error_code: Error code for a failed call, if any.
        """
        try:
            self._track_generation(selected_voice, generation, sdk_error_code=sdk_error_code)
        except Exception:
            # Analytics must not interfere with the caller.
            pass

    def _track_generation(
        self,
        selected_voice: str,
        generation: str,
        sdk_error_code: Optional[str] = None,
    ) -> None:
        """Build the event payload and hand it to the delivery path."""
        if not self.enabled:
            return

        event = dict(
            anonymous_id=self.anonymous_id,
            client_event_id=str(uuid.uuid4()),
            timestamp=datetime.now(timezone.utc).isoformat(),
            sdk_version=self.sdk_version,
            sdk_type=SDK_TYPE,
            platform=current_platform(),
            runtime_version=runtime_version(),
            selected_model=self.selected_model,
            model_version=self.model_version,
            selected_voice=str(selected_voice),
            generation=generation,
            asset_source=self.asset_source,
        )
        # The error field is only present on events that carry a code.
        if sdk_error_code:
            event["sdk_error_code"] = sdk_error_code

        if not self._async_delivery:
            self._send(event)
            return
        worker = threading.Thread(target=self._send, args=(event,), daemon=True)
        worker.start()

    def _send(self, payload: Dict[str, str]) -> None:
        """Post ``payload`` through the transport, ignoring any ``Exception``."""
        try:
            self._post_json(self.endpoint, payload, self.timeout_seconds)
        except Exception:
            pass


def post_json_request(endpoint: str, payload: Dict[str, str], timeout_seconds: float) -> None:
    """POST ``payload`` as UTF-8 JSON to ``endpoint`` and read the response.

    Args:
        endpoint: URL to send the request to.
        payload: Event fields; serialized with ``json.dumps``. Its
            ``"sdk_version"`` entry (or ``"unknown"``) is used in the
            ``User-Agent`` header.
        timeout_seconds: Timeout passed to ``urlopen``.

    Raises:
        TypeError: If ``payload`` is not JSON-serializable.
        Exception: Anything ``urlopen`` raises is propagated unchanged.
    """
    encoded_body = json.dumps(payload).encode("utf-8")

    # CR and LF are turned into spaces before the value goes into a header.
    raw_version = str(payload.get("sdk_version") or "unknown")
    header_safe_version = raw_version.replace("\n", " ").replace("\r", " ")

    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
        "User-Agent": f"KittenTTS-Python/{header_safe_version}",
    }
    req = request.Request(endpoint, data=encoded_body, headers=headers, method="POST")
    with request.urlopen(req, timeout=timeout_seconds) as response:
        response.read()


def default_anonymous_id_path() -> Path:
    """Return the file path where the anonymous analytics ID is stored.

    Resolution order:

    1. ``$KITTENTTS_ANALYTICS_HOME/anonymous_id`` when that variable is set
       (``~`` is expanded).
    2. ``~/.kittentts/analytics_id``.
    3. ``<system temp dir>/kittentts/analytics_id`` when the home directory
       cannot be determined.

    The function only computes a path and never raises ``RuntimeError``, so
    it is safe to call from a constructor.

    Returns:
        The resolved path; the file is not created or checked here.
    """
    configured_home = os.environ.get("KITTENTTS_ANALYTICS_HOME")
    try:
        if configured_home:
            # NOTE(review): this branch uses the file name "anonymous_id" while
            # the others use "analytics_id" — kept as-is; confirm it is intended.
            return Path(configured_home).expanduser() / "anonymous_id"
        return Path.home() / ".kittentts" / "analytics_id"
    except RuntimeError:
        # Both expanduser() and home() raise RuntimeError when no home
        # directory can be resolved. Use the platform's temp directory rather
        # than a hard-coded "/tmp", which does not exist on Windows.
        import tempfile

        return Path(tempfile.gettempdir()) / "kittentts" / "analytics_id"


def load_or_create_anonymous_id(path: Path) -> str:
    """Return the anonymous ID stored at ``path``, creating one if needed.

    A stored value is reused only when it parses as a UUID. A missing,
    unreadable, undecodable or malformed file is treated the same way: a
    fresh UUID4 is generated and written back on a best-effort basis.

    Args:
        path: Location of the ID file; parent directories are created.

    Returns:
        The existing ID, or a newly generated one. The new ID is returned
        even if it could not be persisted.
    """
    try:
        existing = path.read_text(encoding="utf-8").strip()
    except (OSError, UnicodeDecodeError):
        # UnicodeDecodeError is a ValueError, not an OSError: a corrupt file
        # must lead to regeneration instead of escaping to the caller.
        existing = ""
    if is_uuid(existing):
        return existing

    anonymous_id = str(uuid.uuid4())
    try:
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(anonymous_id, encoding="utf-8")
    except OSError:
        # Read-only or missing storage: continue with an in-memory ID.
        pass
    return anonymous_id


def is_uuid(value: str) -> bool:
    """Return ``True`` when ``uuid.UUID(value)`` accepts ``value``.

    ``TypeError`` and ``ValueError`` from the constructor are reported as
    ``False``; any other exception is propagated.
    """
    try:
        uuid.UUID(value)
        return True
    except (TypeError, ValueError):
        return False
81 changes: 73 additions & 8 deletions kittentts/get_model.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
import json
import os
from huggingface_hub import hf_hub_download
from .onnx_model import KittenTTS_1_Onnx
from importlib import metadata

from .analytics import AnalyticsClient, error_code, parse_model_name
from .preprocess import normalize_text


class KittenTTS:
"""Main KittenTTS class for text-to-speech synthesis."""

def __init__(self, model_name="KittenML/kitten-tts-nano-0.8", cache_dir=None, backend=None):
def __init__(self, model_name="KittenML/kitten-tts-nano-0.8", cache_dir=None, backend=None, analytics=True):
"""Initialize KittenTTS with a model from Hugging Face.

Args:
model_name: Hugging Face repository ID or model name
cache_dir: Directory to cache downloaded files
analytics: Set to False to disable anonymous generation analytics
"""
# Handle different model name formats
if "/" not in model_name:
Expand All @@ -23,6 +25,14 @@ def __init__(self, model_name="KittenML/kitten-tts-nano-0.8", cache_dir=None, ba
repo_id = model_name

self.model = download_from_huggingface(repo_id=repo_id, cache_dir=cache_dir, backend=backend)
model_info = parse_model_name(repo_id)
self.analytics = AnalyticsClient(
sdk_version=_sdk_version(),
selected_model=model_info["selected_model"],
model_version=model_info["model_version"],
asset_source=getattr(self.model, "analytics_asset_source", "runtime-download"),
enabled=analytics,
)

def normalize_text(self, text, locale="en-US", return_spans=False):
"""Normalize text for TTS without generating audio."""
Expand All @@ -40,15 +50,26 @@ def generate(self, text, voice="expr-voice-5-m", speed=1.0, clean_text=False):
Audio data as numpy array
"""
print(f"Generating audio for text: {text}")
return self.model.generate(text, voice=voice, speed=speed, clean_text=clean_text)
try:
audio = self.model.generate(text, voice=voice, speed=speed, clean_text=clean_text)
except Exception as exc:
self._track_generation(voice, generation="wav", sdk_error_code=error_code(exc))
raise
self._track_generation(voice, generation="wav")
return audio

def generate_stream(self, text, voice="expr-voice-5-m", speed=1.0, clean_text=False):
    """Generate audio as a stream of chunks.

    Exactly one ``stream`` analytics event is recorded per started stream:
    when it is exhausted, when it fails, or when the consumer stops early
    and the generator is closed.

    Args:
        text: Input text, forwarded to the model.
        voice: Voice to use, forwarded to the model and reported in the event.
        speed: Speech speed, forwarded to the model.
        clean_text: Forwarded to the model unchanged.

    Yields:
        numpy.ndarray: Audio data for each text chunk.

    Raises:
        Exception: Whatever the underlying model stream raises, re-raised
            after its error code has been recorded.
    """
    sdk_error_code = None
    try:
        yield from self.model.generate_stream(text, voice=voice, speed=speed, clean_text=clean_text)
    except Exception as exc:
        sdk_error_code = error_code(exc)
        raise
    finally:
        # `finally` also runs on GeneratorExit (early close), which
        # `except Exception` does not see, so abandoned streams are counted.
        self._track_generation(voice, generation="stream", sdk_error_code=sdk_error_code)

def generate_to_file(self, text, output_path, voice="expr-voice-5-m", speed=1.0, sample_rate=24000):
"""Generate audio from text and save to file.
Expand All @@ -60,13 +81,29 @@ def generate_to_file(self, text, output_path, voice="expr-voice-5-m", speed=1.0,
speed: Speech speed (1.0 = normal)
sample_rate: Audio sample rate
"""
return self.model.generate_to_file(text, output_path, voice=voice, speed=speed, sample_rate=sample_rate)
try:
result = self.model.generate_to_file(text, output_path, voice=voice, speed=speed, sample_rate=sample_rate)
except Exception as exc:
self._track_generation(voice, generation="wav", sdk_error_code=error_code(exc))
raise
self._track_generation(voice, generation="wav")
return result

@property
def available_voices(self):
    """Return the loaded model's ``all_voice_names``."""
    voice_names = self.model.all_voice_names
    return voice_names

def _track_generation(self, voice, generation, sdk_error_code=None):
    """Forward one generation event to ``self.analytics``, ignoring failures.

    Args:
        voice: Voice used for the call; sent as ``selected_voice``.
        generation: Generation type label for the event.
        sdk_error_code: Error code for a failed call, or ``None``.
    """
    event_fields = {
        "selected_voice": voice,
        "generation": generation,
        "sdk_error_code": sdk_error_code,
    }
    try:
        self.analytics.track_generation(**event_fields)
    except Exception:
        # Analytics is best-effort; nothing raised here may reach the caller.
        pass


def download_from_huggingface(repo_id="KittenML/kitten-tts-nano-0.1", cache_dir=None, backend=None):
"""Download model files from Hugging Face repository.
Expand All @@ -78,7 +115,10 @@ def download_from_huggingface(repo_id="KittenML/kitten-tts-nano-0.1", cache_dir=
Returns:
KittenTTS_1_Onnx: Instantiated model ready for use
"""
from huggingface_hub import hf_hub_download

# Download config file first
config_was_cached = _is_cached(repo_id, "config.json", cache_dir)
config_path = hf_hub_download(
repo_id=repo_id,
filename="config.json",
Expand All @@ -92,6 +132,9 @@ def download_from_huggingface(repo_id="KittenML/kitten-tts-nano-0.1", cache_dir=
if config.get("type") not in ["ONNX1", "ONNX2"]:
raise ValueError("Unsupported model type.")

model_was_cached = _is_cached(repo_id, config["model_file"], cache_dir)
voices_were_cached = _is_cached(repo_id, config["voices"], cache_dir)

# Download model and voices files based on config
model_path = hf_hub_download(
repo_id=repo_id,
Expand All @@ -105,12 +148,34 @@ def download_from_huggingface(repo_id="KittenML/kitten-tts-nano-0.1", cache_dir=
cache_dir=cache_dir
)

from .onnx_model import KittenTTS_1_Onnx

# Instantiate and return model
model = KittenTTS_1_Onnx(model_path=model_path, voices_path=voices_path, speed_priors=config.get("speed_priors", {}) , voice_aliases=config.get("voice_aliases", {}), backend=backend)
model.analytics_asset_source = "cache" if config_was_cached and model_was_cached and voices_were_cached else "runtime-download"

return model


def get_model(repo_id="KittenML/kitten-tts-nano-0.1", cache_dir=None, backend=None, analytics=True):
    """Get a KittenTTS model (legacy function for backward compatibility).

    Args:
        repo_id: Hugging Face repository ID or model name.
        cache_dir: Directory to cache downloaded files.
        backend: Forwarded to ``KittenTTS`` unchanged.
        analytics: Set to False to disable anonymous generation analytics.

    Returns:
        A ``KittenTTS`` instance built from the given arguments.
    """
    return KittenTTS(repo_id, cache_dir=cache_dir, backend=backend, analytics=analytics)


def _is_cached(repo_id, filename, cache_dir):
    """Report whether ``filename`` of ``repo_id`` is already in the local cache.

    Args:
        repo_id: Hugging Face repository ID.
        filename: File within the repository.
        cache_dir: Cache directory passed through to ``huggingface_hub``.

    Returns:
        ``True`` only when ``try_to_load_from_cache`` returns a string path
        that exists on disk. ``False`` when the helper cannot be imported,
        when the lookup raises, or when it returns anything else.
    """
    try:
        from huggingface_hub import try_to_load_from_cache as cache_lookup
    except ImportError:
        return False

    try:
        hit = cache_lookup(repo_id=repo_id, filename=filename, cache_dir=cache_dir)
    except Exception:
        return False

    if not isinstance(hit, str):
        # Non-string results are treated as "not cached".
        return False
    return os.path.exists(hit)


def _sdk_version():
    """Return the installed ``kittentts`` version, or ``"unknown"`` if absent."""
    try:
        installed = metadata.version("kittentts")
    except metadata.PackageNotFoundError:
        installed = "unknown"
    return installed
Loading