Skip to content

Commit 7e7f839

Browse files
committed
Add Rust corpus-session replacement lifecycle API
1 parent 89ba415 commit 7e7f839

6 files changed

Lines changed: 151 additions & 18 deletions

File tree

docs/modernization-handoff.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ Delivered:
9797
- unified Rust session orchestration API (`run_em_on_session`) that applies settings + runs EM in one call, now preferred by `Lda::Backends::Rust`
9898
- `Lda::Backends::Rust` now routes all start modes through unified session orchestration when available (`run_em_on_session`); when sessions are unavailable it now prefers direct Rust non-session orchestration (`run_em_with_start_seed`) before legacy Ruby-side beta-input fallback (`run_em`)
9999
- Rust managed-session orchestration API (`run_em_on_session_with_corpus`) added to recreate missing sessions and run EM in one Rust call
100+
- Rust session lifecycle replacement API (`replace_corpus_session`) added so corpus reassignment can update existing Rust sessions in place (config reset + corpus swap) instead of Ruby-side drop/recreate
100101
- `Lda::Backends::Rust` now retries missing-session runs through `run_em_on_session_with_corpus` to preserve session-based orchestration when sessions are dropped externally
101102
- parity/compatibility test coverage and rust runtime CI
102103

docs/porting-strategy.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ Completed in `codex/experiment-ruby3-modernization`:
6464
- Unified Rust session API added (`run_em_on_session`) to apply settings and execute EM in one call; `Lda::Backends::Rust` now prefers this single-call session path before non-session fallbacks.
6565
- `Lda::Backends::Rust` now prefers direct Rust non-session orchestration (`run_em_with_start_seed`) before legacy `run_em(initial_beta, ...)` compatibility fallback when a session path is unavailable.
6666
- Rust managed-session orchestration API added (`run_em_on_session_with_corpus`) to recreate missing sessions and execute EM in one Rust call.
67+
- Rust session lifecycle replacement API added (`replace_corpus_session`) so corpus reassignment can update existing Rust sessions in place (config reset + corpus swap) instead of Ruby-side drop/recreate.
6768
- `Lda::Backends::Rust` now retries missing-session runs through `run_em_on_session_with_corpus`, reducing fallback to non-session orchestration when sessions are externally dropped.
6869
- Dockerized rust runtime workflow added for local parity with CI (`Dockerfile.rust`, `bin/docker-test-rust`).
6970
- Gem packaging now excludes local Rust cargo build artifacts (`target/**`) for clean release builds.

docs/rust-orchestration-guardrails.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ Current parity expectations:
1616
- `Lda::Backends::Rust` non-session fallback should prefer Rust start-aware orchestration (`run_em_with_start_seed`) before legacy beta-input orchestration (`run_em`).
1717
- Rust backend corpus/session lifecycle must not leak session count across corpus replacement.
1818
- Missing-session recovery in managed session orchestration (`run_em_on_session_with_corpus`) must recreate a usable session and keep parity with direct orchestration.
19+
- Corpus reassignment through Rust session replacement lifecycle (`replace_corpus_session`) must preserve stable session count and route subsequent EM runs over updated corpus data.
1920
- Unknown start-mode handling in seed-aware Rust orchestration must match Ruby's non-seeded fallback behavior when given the same explicit seed.
2021

2122
## Benchmark guardrail

ext/lda-ruby-rust/src/lib.rs

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -406,18 +406,26 @@ fn random_topic_term_probabilities(
406406
matrix
407407
}
408408

409+
fn corpus_session_data(
410+
document_words: Vec<Vec<usize>>,
411+
document_counts: Vec<Vec<f64>>,
412+
terms: usize,
413+
) -> Arc<CorpusSessionData> {
414+
Arc::new(CorpusSessionData {
415+
document_words,
416+
document_counts,
417+
terms,
418+
})
419+
}
420+
409421
fn create_corpus_session(
410422
document_words: Vec<Vec<usize>>,
411423
document_counts: Vec<Vec<f64>>,
412424
terms: usize,
413425
) -> i64 {
414426
let session_id = NEXT_CORPUS_SESSION_ID.fetch_add(1, Ordering::Relaxed);
415427
let session = CorpusSession {
416-
data: Arc::new(CorpusSessionData {
417-
document_words,
418-
document_counts,
419-
terms,
420-
}),
428+
data: corpus_session_data(document_words, document_counts, terms),
421429
config: None,
422430
};
423431

@@ -430,6 +438,42 @@ fn create_corpus_session(
430438
}
431439
}
432440

441+
fn replace_corpus_session(
442+
session_id: i64,
443+
document_words: Vec<Vec<usize>>,
444+
document_counts: Vec<Vec<f64>>,
445+
terms: usize,
446+
) -> i64 {
447+
if terms == 0 {
448+
return 0;
449+
}
450+
451+
let replacement_data = corpus_session_data(document_words, document_counts, terms);
452+
match corpus_sessions().lock() {
453+
Ok(mut sessions) => {
454+
if session_id > 0 {
455+
let session_key = session_id as u64;
456+
if let Some(session) = sessions.get_mut(&session_key) {
457+
session.data = replacement_data;
458+
session.config = None;
459+
return session_id;
460+
}
461+
}
462+
463+
let new_session_id = NEXT_CORPUS_SESSION_ID.fetch_add(1, Ordering::Relaxed);
464+
sessions.insert(
465+
new_session_id,
466+
CorpusSession {
467+
data: replacement_data,
468+
config: None,
469+
},
470+
);
471+
new_session_id as i64
472+
}
473+
Err(_) => 0,
474+
}
475+
}
476+
433477
fn ensure_corpus_session(
434478
session_id: i64,
435479
document_words: Vec<Vec<usize>>,
@@ -1146,6 +1190,8 @@ fn init() -> Result<(), Error> {
11461190
)?;
11471191
rust_backend_module
11481192
.define_singleton_method("create_corpus_session", function!(create_corpus_session, 3))?;
1193+
rust_backend_module
1194+
.define_singleton_method("replace_corpus_session", function!(replace_corpus_session, 4))?;
11491195
rust_backend_module
11501196
.define_singleton_method("drop_corpus_session", function!(drop_corpus_session, 1))?;
11511197
rust_backend_module

lib/lda-ruby/backends/rust.rb

Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,10 @@ def name
5656
end
5757

5858
# Assigns +corpus+ to this backend and to the fallback backend, then
# re-registers the Rust corpus session.
#
# The session id held before the assignment is captured first and passed to
# register_rust_corpus_session, rather than being released up front.
#
# Returns true.
def corpus=(corpus)
  stale_session_id = @rust_corpus_session_id

  @corpus = corpus
  @fallback.corpus = corpus
  register_rust_corpus_session(stale_session_id)

  true
end
6565

@@ -291,31 +291,52 @@ def max_term_index
291291
.max || -1
292292
end
293293

294-
def register_rust_corpus_session
294+
def register_rust_corpus_session(previous_session_id = nil)
295295
@rust_corpus_session_id = nil
296296
@rust_corpus_terms = nil
297297
@rust_document_lengths = nil
298298
@rust_document_words = nil
299299
@rust_document_counts = nil
300300

301+
if @corpus.nil?
302+
drop_rust_corpus_session_by_id(previous_session_id)
303+
return
304+
end
305+
301306
return unless defined?(::Lda::RustBackend)
302307

303308
em_input = rust_em_corpus_input
304-
return if em_input.nil?
309+
if em_input.nil?
310+
drop_rust_corpus_session_by_id(previous_session_id)
311+
return
312+
end
305313

306314
@rust_corpus_terms = Integer(em_input.fetch(:terms))
307315
@rust_document_lengths = em_input.fetch(:document_lengths)
308316
@rust_document_words = em_input.fetch(:document_words)
309317
@rust_document_counts = em_input.fetch(:document_counts)
310-
return unless ::Lda::RustBackend.respond_to?(:create_corpus_session)
311318

312-
session_id = ::Lda::RustBackend.create_corpus_session(
313-
@rust_document_words,
314-
@rust_document_counts,
315-
Integer(@rust_corpus_terms)
316-
)
317-
return unless session_id.is_a?(Numeric)
318-
return unless session_id.positive?
319+
session_id =
320+
if ::Lda::RustBackend.respond_to?(:replace_corpus_session)
321+
::Lda::RustBackend.replace_corpus_session(
322+
Integer(previous_session_id || 0),
323+
@rust_document_words,
324+
@rust_document_counts,
325+
Integer(@rust_corpus_terms)
326+
)
327+
elsif ::Lda::RustBackend.respond_to?(:create_corpus_session)
328+
drop_rust_corpus_session_by_id(previous_session_id)
329+
::Lda::RustBackend.create_corpus_session(
330+
@rust_document_words,
331+
@rust_document_counts,
332+
Integer(@rust_corpus_terms)
333+
)
334+
end
335+
336+
unless session_id.is_a?(Numeric) && session_id.positive?
337+
drop_rust_corpus_session_by_id(previous_session_id)
338+
return
339+
end
319340

320341
@rust_corpus_session_id = Integer(session_id)
321342
rescue StandardError
@@ -324,13 +345,14 @@ def register_rust_corpus_session
324345
@rust_document_lengths = nil
325346
@rust_document_words = nil
326347
@rust_document_counts = nil
348+
drop_rust_corpus_session_by_id(previous_session_id)
327349
end
328350

329351
def ensure_rust_corpus_session
330352
has_session_data = @rust_corpus_terms && @rust_document_lengths && @rust_document_words && @rust_document_counts
331353
return true if has_session_data
332354

333-
register_rust_corpus_session
355+
register_rust_corpus_session(@rust_corpus_session_id)
334356
@rust_corpus_terms && @rust_document_lengths && @rust_document_words && @rust_document_counts
335357
rescue StandardError
336358
false
@@ -345,6 +367,12 @@ def release_rust_corpus_session
345367
@rust_document_words = nil
346368
@rust_document_counts = nil
347369

370+
drop_rust_corpus_session_by_id(session_id)
371+
rescue StandardError
372+
nil
373+
end
374+
375+
def drop_rust_corpus_session_by_id(session_id)
348376
return unless session_id
349377
return unless defined?(::Lda::RustBackend)
350378
return unless ::Lda::RustBackend.respond_to?(:drop_corpus_session)

test/rust_orchestration_test.rb

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,55 @@ def test_run_em_on_session_with_corpus_recreates_missing_session
508508
end
509509
end
510510

511+
# Checks that replace_corpus_session keeps the same session id and the same
# live session count when given an existing session, and that a following
# run_em_on_session call produces output sized for the replacement corpus.
def test_replace_corpus_session_updates_existing_session_in_place
  # Skip when the loaded Rust extension lacks any API this test exercises.
  %w[
    create_corpus_session
    replace_corpus_session
    drop_corpus_session
    run_em_on_session
  ].each do |api_name|
    omit("#{api_name} unavailable") unless Lda::RustBackend.respond_to?(api_name)
  end

  session_id = Lda::RustBackend.create_corpus_session(@document_words, @document_counts, @terms)
  assert_operator session_id, :>, 0

  replacement_words = [[0, 1, 2], [2, 3]]
  replacement_counts = [[2.0, 1.0, 1.0], [1.0, 4.0]]
  replacement_terms = 5

  # The session counter is optional; only compare counts when it is exposed.
  count_supported = Lda::RustBackend.respond_to?(:corpus_session_count)
  starting_count = count_supported ? Lda::RustBackend.corpus_session_count : nil

  replaced_session_id = Lda::RustBackend.replace_corpus_session(
    session_id, replacement_words, replacement_counts, replacement_terms
  )

  assert_equal session_id, replaced_session_id
  assert_equal starting_count, Lda::RustBackend.corpus_session_count unless starting_count.nil?

  output = Lda::RustBackend.run_em_on_session(
    replaced_session_id, "seeded",
    @topics, @max_iter, @convergence,
    @em_max_iter, @em_convergence,
    @init_alpha, @min_probability,
    91_919
  )

  assert_equal replacement_terms, output[0].first.size
  assert_equal replacement_words.size, output[2].size
ensure
  # Drop exactly one session: the replacement id if it is usable, otherwise
  # the originally created id.
  leftover_id = [replaced_session_id, session_id].find do |candidate|
    candidate.is_a?(Numeric) && candidate.positive?
  end
  Lda::RustBackend.drop_corpus_session(leftover_id) if leftover_id
end
559+
511560
def test_rust_backend_corpus_session_lifecycle_no_leak
512561
omit("corpus_session_count unavailable") unless Lda::RustBackend.respond_to?(:corpus_session_count)
513562

@@ -516,9 +565,16 @@ def test_rust_backend_corpus_session_lifecycle_no_leak
516565

517566
backend.corpus = Lda::TextCorpus.new(FIXTURE_DOCUMENTS)
518567
assert_equal starting_count + 1, Lda::RustBackend.corpus_session_count
568+
first_session_id = backend.instance_variable_get(:@rust_corpus_session_id)
569+
assert_operator first_session_id, :>, 0
519570

520571
backend.corpus = Lda::TextCorpus.new(FIXTURE_DOCUMENTS.reverse)
521572
assert_equal starting_count + 1, Lda::RustBackend.corpus_session_count
573+
second_session_id = backend.instance_variable_get(:@rust_corpus_session_id)
574+
assert_operator second_session_id, :>, 0
575+
if Lda::RustBackend.respond_to?(:replace_corpus_session)
576+
assert_equal first_session_id, second_session_id
577+
end
522578

523579
backend.corpus = nil
524580
assert_equal starting_count, Lda::RustBackend.corpus_session_count

0 commit comments

Comments
 (0)