Skip to content

Commit fcdacb5

Browse files
authored
Merge pull request #42 from softwaremill/fix/duplicate-token-accumulation
Fix/duplicate token accumulation
2 parents 0297661 + 4dddf81 commit fcdacb5

3 files changed

Lines changed: 21 additions & 9 deletions

File tree

crates/tracevault-server/src/repo/events.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,12 @@ impl EventRepo {
9292
}
9393

9494
/// INSERT INTO transcript_chunks ... ON CONFLICT DO NOTHING.
95+
/// Returns `true` if the row was actually inserted, or `false` if it was skipped as a duplicate.
9596
pub async fn insert_transcript_chunk(
9697
pool: &PgPool,
9798
req: &InsertTranscriptChunk,
98-
) -> Result<(), AppError> {
99-
sqlx::query(
99+
) -> Result<bool, AppError> {
100+
let result = sqlx::query(
100101
"INSERT INTO transcript_chunks (session_id, chunk_index, data)
101102
VALUES ($1, $2, $3)
102103
ON CONFLICT (session_id, chunk_index) DO NOTHING",
@@ -106,7 +107,7 @@ impl EventRepo {
106107
.bind(&req.data)
107108
.execute(pool)
108109
.await?;
109-
Ok(())
110+
Ok(result.rows_affected() > 0)
110111
}
111112

112113
/// INSERT INTO user_software_usage ... ON CONFLICT DO UPDATE.

crates/tracevault-server/src/service/stream.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ impl StreamService {
6767

6868
for (i, line) in lines.iter().enumerate() {
6969
let chunk_index = offset as i32 + i as i32;
70-
EventRepo::insert_transcript_chunk(
70+
let was_inserted = EventRepo::insert_transcript_chunk(
7171
&state.pool,
7272
&InsertTranscriptChunk {
7373
session_id: session_db_id,
@@ -77,6 +77,12 @@ impl StreamService {
7777
)
7878
.await?;
7979

80+
// Only count tokens from newly inserted chunks to avoid
81+
// double-counting when overlapping batches are sent
82+
if !was_inserted {
83+
continue;
84+
}
85+
8086
// Extract token usage from assistant messages
8187
if let Some(msg) = line.get("message") {
8288
if let Some(usage) = msg.get("usage") {
@@ -114,8 +120,11 @@ impl StreamService {
114120
// input_tokens from the API includes cache_read and cache_write,
115121
// so subtract both to get fresh (non-cached) input only
116122
let fresh_input = (batch_input - batch_cache_read - batch_cache_write).max(0);
117-
let batch_cost = crate::pricing::estimate_cost(
118-
model_name,
123+
let pricing =
124+
crate::pricing::fetch_pricing_for_model(&state.pool, model_name, None)
125+
.await;
126+
let batch_cost = crate::pricing::estimate_cost_with_pricing(
127+
&pricing,
119128
fresh_input,
120129
batch_output,
121130
batch_cache_read,

crates/tracevault-server/tests/repo_events_test.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,14 +127,16 @@ async fn insert_transcript_chunk_dedup(pool: sqlx::PgPool) {
127127
data: serde_json::json!({"role": "user", "content": "hello"}),
128128
};
129129

130-
EventRepo::insert_transcript_chunk(&pool, &chunk)
130+
let inserted = EventRepo::insert_transcript_chunk(&pool, &chunk)
131131
.await
132132
.unwrap();
133+
assert!(inserted, "first insert should return true");
133134

134-
// Duplicate should not error (ON CONFLICT DO NOTHING)
135-
EventRepo::insert_transcript_chunk(&pool, &chunk)
135+
// Duplicate should not error (ON CONFLICT DO NOTHING) and should return false
136+
let inserted_again = EventRepo::insert_transcript_chunk(&pool, &chunk)
136137
.await
137138
.unwrap();
139+
assert!(!inserted_again, "duplicate insert should return false");
138140

139141
let (count,): (i64,) =
140142
sqlx::query_as("SELECT COUNT(*) FROM transcript_chunks WHERE session_id = $1")

0 commit comments

Comments
 (0)