Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/features/checkpoint-forking.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import art
from art.local import LocalBackend

async def train():
with LocalBackend() as backend:
async with LocalBackend() as backend:
# Create a new model that will fork from an existing checkpoint
model = art.TrainableModel(
name="my-model-v2",
Expand Down Expand Up @@ -115,7 +115,7 @@ low_lr_model = art.TrainableModel(
)

async def experiment():
with LocalBackend() as backend:
async with LocalBackend() as backend:
# Fork the model from the base model
await backend._experimental_fork_checkpoint(
low_lr_model,
Expand Down
24 changes: 24 additions & 0 deletions docs/fundamentals/art-backend.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,30 @@ backend = LocalBackend(
)
```

If you're using `PipelineTrainer`, `LocalBackend` is currently supported only in dedicated mode, where training and inference run on separate GPUs.

```python
from art import TrainableModel
from art.dev import InternalModelConfig
from art.local import LocalBackend

backend = LocalBackend(path="./.art")
model = TrainableModel(
name="pipeline-localbackend",
project="my-project",
base_model="Qwen/Qwen3-0.6B",
_internal_config=InternalModelConfig(
trainer_gpu_ids=[0],
inference_gpu_ids=[1],
),
)
```

Shared `LocalBackend` still pauses inference during training, so ART rejects that configuration for `PipelineTrainer`.

In dedicated mode, a new checkpoint becomes the default inference target only after its LoRA has been reloaded into vLLM. That checkpoint publication flow is backend-specific, so `save_checkpoint` does not have identical semantics across every ART backend.
Requests that are already in flight keep using the adapter they started with; the reload only affects subsequent routing to the latest served step.

## Using a backend

Once initialized, a backend can be used in the same way regardless of whether it runs locally or remotely.
Expand Down
2 changes: 2 additions & 0 deletions docs/fundamentals/training-loop.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ ART's functionality is divided into a [**client**](/fundamentals/art-client) and

This training loop runs until a specified number of inference and training iterations have completed.

This describes the default shared-resource loop. `PipelineTrainer` can also run with `LocalBackend` in dedicated mode, where training and inference stay on separate GPUs and the latest served step advances only after vLLM reloads the new LoRA.

Training and inference use both the ART **client** and **backend**. Learn more by following the links below!

<div className="cards-container">
Expand Down
88 changes: 63 additions & 25 deletions src/art/local/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ def _allocated_gpu_count(self, model: Model) -> int:
    def __enter__(self) -> Self:
        """Enter the synchronous context manager; returns the backend unchanged."""
        return self

    async def __aenter__(self) -> Self:
        """Enter the async context manager; returns the backend unchanged.

        Mirrors __enter__ so the backend works with both `with` and
        `async with`.
        """
        return self

def __exit__(
self,
exc_type: type[BaseException] | None,
Expand All @@ -167,14 +170,30 @@ def __exit__(
) -> None:
self._close()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: TracebackType | None,
    ) -> None:
        """Exit the async context manager by awaiting the async close path.

        Awaits close() (rather than the synchronous _close() used by
        __exit__) so services exposing an async shutdown hook can be
        awaited cleanly.
        """
        await self.close()

async def close(self) -> None:
"""
If running vLLM in a separate process, this will kill that process and close the communication threads.
"""
self._close()
for service in self._services.values():
aclose = getattr(service, "aclose", None)
if aclose is None:
close = getattr(service, "close", None)
if close is not None:
close()
else:
await aclose()
close_proxy(service)

def _close(self) -> None:
for _, service in self._services.items():
for service in self._services.values():
close = getattr(service, "close", None)
if close is not None:
close()
Expand Down Expand Up @@ -219,25 +238,27 @@ def _model_inference_name(self, model: Model, step: int | None = None) -> str:
If None, returns name for latest checkpoint (step 0 initially).
"""

# For LocalBackend, vLLM always serves LoRA adapters with @step suffix
# Default to step 0 when not specified (the initial checkpoint created at registration)
if step is not None:
actual_step = step
elif model.name in self._services and self._in_process:
# In dedicated mode the service tracks which adapter vLLM has
# actually loaded. Reading the filesystem would race: the
# checkpoint directory appears before the HTTP reload completes.
svc = self._services[model.name]
loaded_step = getattr(svc, "_latest_step", None)
actual_step = (
loaded_step if loaded_step is not None else self.__get_step(model)
)
else:
actual_step = self.__get_step(model)
name = f"{model.name}@{actual_step}"
requested_step = step

if step is None and isinstance(model, TrainableModel):
from ..dev.validate import is_dedicated_mode

service = self._services.get(model.name)
if service is not None and is_dedicated_mode(
model._internal_config or dev.InternalModelConfig()
):
loaded_step = getattr(service, "_latest_step", None)
if isinstance(loaded_step, int):
step = loaded_step

if step is None:
# The checkpoint directory is written before dedicated-mode
# vLLM finishes reloading the new adapter.
step = self.__get_step(model)
name = f"{model.name}@{step}"
logger.debug(
f"[BACKEND] _model_inference_name: step_arg={step} "
f"actual_step={actual_step} -> {name}"
f"[BACKEND] _model_inference_name: step_arg={requested_step} "
f"actual_step={step} -> {name}"
)
return name

Expand Down Expand Up @@ -502,12 +523,14 @@ async def train( # type: ignore[override]
*,
# Core training parameters
learning_rate: float = 5e-6,
loss_fn: Literal["cispo", "ppo"] = "cispo",
loss_fn_config: dict | None = None,
normalize_advantages: bool = True,
adam_params: object | None = None,
# KL-penalized advantage adjustment
kl_penalty_coef: float = 0.0,
kl_penalty_reference_step: int | None = None,
kl_ref_adapter_path: str | None = None,
# RL algorithm settings
ppo: bool = False,
epsilon: float | None = None,
epsilon_high: float | None = None,
# Advantage computation
Expand Down Expand Up @@ -544,6 +567,14 @@ async def train( # type: ignore[override]
model: The trainable model to train.
trajectory_groups: Batches of trajectories to train on.
learning_rate: Learning rate for training. Defaults to 5e-6.
loss_fn: RL loss function. LocalBackend currently supports
"cispo" and "ppo".
loss_fn_config: Additional loss-function config. Not supported by
LocalBackend.
normalize_advantages: Whether to normalize advantages. LocalBackend
currently requires True.
adam_params: Custom optimizer params. Not supported by
LocalBackend.
kl_penalty_coef: Coefficient for KL-penalized advantage adjustment.
Tokens diverging more from the reference get reduced advantages.
Defaults to 0.0 (disabled).
Expand All @@ -553,8 +584,7 @@ async def train( # type: ignore[override]
kl_ref_adapter_path: Direct filesystem path to a LoRA adapter
checkpoint to use as the KL reference. Alternative to
kl_penalty_reference_step.
ppo: Whether to use PPO clipping. Defaults to False.
epsilon: Clip epsilon for importance sampling. Defaults based on ppo.
epsilon: Clip epsilon for importance sampling. Defaults based on loss_fn.
epsilon_high: Asymmetric upper clip bound. Defaults to epsilon.
advantage_balance: Balance between negative and positive advantages
in range [-1.0, 1.0]. Defaults to 0.0 (balanced).
Expand Down Expand Up @@ -597,6 +627,14 @@ async def train( # type: ignore[override]
# await model.log(metrics=result.metrics, step=result.step)
"""
groups_list = list(trajectory_groups)
if loss_fn not in {"cispo", "ppo"}:
raise ValueError("LocalBackend only supports loss_fn='cispo' or 'ppo'.")
if loss_fn_config is not None:
raise ValueError("LocalBackend requires loss_fn_config=None.")
if not normalize_advantages:
raise ValueError("LocalBackend requires normalize_advantages=True.")
if adam_params is not None:
raise ValueError("LocalBackend requires adam_params=None.")

# Build config objects from explicit kwargs
config = TrainConfig(
Expand All @@ -609,7 +647,7 @@ async def train( # type: ignore[override]
"kl_penalty_coef": kl_penalty_coef,
"mask_prob_ratio": mask_prob_ratio,
"plot_tensors": plot_tensors,
"ppo": ppo,
"ppo": loss_fn == "ppo",
"precalculate_logprobs": precalculate_logprobs,
"scale_learning_rate_by_reward_std_dev": scale_learning_rate_by_reward_std_dev,
"scale_rewards": scale_rewards,
Expand Down
37 changes: 37 additions & 0 deletions src/art/pipeline_trainer/trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ def __init__(
total_scenarios=total_scenarios,
num_workers=num_rollout_workers,
)
self._validate_backend_support()

async def train(self, *, handle_signals: bool = True) -> None:
"""Run the training pipeline over the configured scenario iterator."""
Expand Down Expand Up @@ -277,6 +278,42 @@ async def _notify_policy() -> None:
except asyncio.QueueFull:
loop.create_task(self._output_queue.put(None))

    def _validate_backend_support(self) -> None:
        """Reject unsupported backend/option combinations at construction time.

        Only LocalBackend is constrained: it must run in dedicated mode
        (separate trainer and inference GPUs), and the loss/advantage
        options must match the subset LocalBackend.train() accepts.
        Raises ValueError with an actionable message on any violation.
        """
        # NOTE(review): imported locally, presumably to avoid an import
        # cycle between trainer and backend modules — confirm.
        from art.dev.validate import is_dedicated_mode
        from art.local.backend import LocalBackend

        # Non-local backends impose no extra constraints here.
        if not isinstance(self.backend, LocalBackend):
            return

        model_config = self.model._internal_config or art.dev.InternalModelConfig()
        if not is_dedicated_mode(model_config):
            raise ValueError(
                "PipelineTrainer only supports LocalBackend in dedicated mode. "
                "Shared LocalBackend pauses inference during training and is not "
                "a supported async PipelineTrainer path. Set both "
                "trainer_gpu_ids and inference_gpu_ids on the TrainableModel "
                "_internal_config to use LocalBackend with PipelineTrainer."
            )
        # The remaining checks mirror the explicit rejections inside
        # LocalBackend.train(), failing fast before any rollout work starts.
        if self.loss_fn not in {"cispo", "ppo"}:
            raise ValueError(
                "PipelineTrainer + LocalBackend(dedicated) only supports "
                "loss_fn='cispo' or loss_fn='ppo'."
            )
        if self.loss_fn_config is not None:
            raise ValueError(
                "PipelineTrainer + LocalBackend(dedicated) requires "
                "loss_fn_config=None."
            )
        if not self.normalize_advantages:
            raise ValueError(
                "PipelineTrainer + LocalBackend(dedicated) requires "
                "normalize_advantages=True."
            )
        if self.adam_params is not None:
            raise ValueError(
                "PipelineTrainer + LocalBackend(dedicated) requires adam_params=None."
            )

async def _skip_scenarios(
self, scenarios: AsyncIterator[ScenarioT], count: int
) -> int:
Expand Down
2 changes: 1 addition & 1 deletion src/art/test/test_step_skipping.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ async def test_step_skipping():
# Set up backend with custom art path
art_path = os.path.join(tmpdir, ".art")

with LocalBackend(path=art_path) as backend:
async with LocalBackend(path=art_path) as backend:
# Create a test model
model = TrainableModel(
name=f"test-step-skip-{uuid.uuid4()}",
Expand Down
44 changes: 37 additions & 7 deletions src/art/unsloth/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from ..utils.get_model_step import get_step_from_dir
from ..utils.output_dirs import get_step_checkpoint_dir
from ..vllm import get_llm, get_worker, openai_server_task, run_on_workers
from .train import gc_and_empty_cuda_cache, train
from .train import StopTrainingLoop, gc_and_empty_cuda_cache, train

logger = logging.getLogger(__name__)

Expand All @@ -55,6 +55,15 @@ class SupportsLoadLora(Protocol):
def load_lora(self, lora_path: str, load_tensors: bool = True) -> LoRARequest: ...


class _StopTrainInputs:
    """Dedicated sentinel for stopping the background trainer loop."""


# Singleton sentinel enqueued on the inputs queue to request loop shutdown;
# identity (isinstance) is what the consumer checks, so one instance suffices.
_STOP_TRAIN_INPUT = _StopTrainInputs()
# Seconds to wait for the background train task to finish gracefully
# before cancelling it (see UnslothService.aclose).
_TRAIN_TASK_SHUTDOWN_TIMEOUT_S = 5.0
# Items flowing through the trainer inputs queue: real training inputs
# or the stop sentinel.
_TrainLoopInput = TrainInputs | _StopTrainInputs


def precalculate_new_logprobs(
trainer: "GRPOTrainer",
peft_model: "PeftModelForCausalLM",
Expand Down Expand Up @@ -91,7 +100,7 @@ async def process_train_batch(
packed_tensors: PackedTensors,
config: types.TrainConfig,
_config: dev.TrainConfig,
inputs_queue: asyncio.Queue[TrainInputs],
inputs_queue: asyncio.Queue[_TrainLoopInput],
results_queue: asyncio.Queue[dict[str, float]],
train_task: asyncio.Task[None],
trainer: "GRPOTrainer",
Expand Down Expand Up @@ -215,7 +224,7 @@ class UnslothState:
tokenizer: PreTrainedTokenizerBase
peft_model: peft.peft_model.PeftModelForCausalLM
trainer: GRPOTrainer
inputs_queue: asyncio.Queue[TrainInputs]
inputs_queue: asyncio.Queue[_TrainLoopInput]
results_queue: asyncio.Queue[dict[str, float]]
_is_offloaded: bool = False
_pinned_buffers: dict[str, torch.Tensor] | None = None
Expand Down Expand Up @@ -316,6 +325,7 @@ class UnslothService:
_vllm_log_file: Any = field(default=None, repr=False)
_vllm_host: str = "127.0.0.1"
_vllm_port: int = 0
_train_task: asyncio.Task[None] | None = field(default=None, init=False, repr=False)

@property
def is_dedicated(self) -> bool:
Expand All @@ -326,6 +336,24 @@ def _next_lora_id(self) -> int:
self._lora_id_counter += 1
return self._lora_id_counter

    async def aclose(self) -> None:
        """Gracefully stop the background training task, then close the service.

        If no training task exists (or it already finished), falls through
        to the synchronous close(). Otherwise enqueues the stop sentinel so
        the trainer loop exits at its next queue read, waits up to
        _TRAIN_TASK_SHUTDOWN_TIMEOUT_S seconds, and cancels the task if it
        does not stop in time before closing.
        """
        train_task = self._train_task
        # Clear the handle first so a concurrent/repeated aclose() or a
        # later train call does not see a half-shut-down task.
        self._train_task = None
        if train_task is None or train_task.done():
            self.close()
            return

        # `_state` is a cached_property. Read from __dict__ directly so
        # closing does not instantiate trainer state only to stop a task.
        state = self.__dict__.get("_state")
        # A live train task implies _state was already materialized.
        assert isinstance(state, UnslothState)
        state.inputs_queue.put_nowait(_STOP_TRAIN_INPUT)
        try:
            await asyncio.wait_for(train_task, timeout=_TRAIN_TASK_SHUTDOWN_TIMEOUT_S)
        except asyncio.TimeoutError:
            # Sentinel was not consumed in time; force-cancel the task.
            train_task.cancel()
        self.close()

# =========================================================================
# Dedicated mode: vLLM subprocess lifecycle
# =========================================================================
Expand Down Expand Up @@ -595,7 +623,7 @@ async def _train_dedicated(

await self._state.results_queue.join()

if not hasattr(self, "_train_task") or self._train_task is None:
if self._train_task is None:
self._train_task = asyncio.create_task(
train(
trainer=self._state.trainer,
Expand Down Expand Up @@ -685,7 +713,7 @@ async def _train_shared(
await self._state.results_queue.join()

# If we haven't already, start the training task
if not hasattr(self, "_train_task") or self._train_task is None:
if self._train_task is None:
self._train_task = asyncio.create_task(
train(
trainer=self._state.trainer,
Expand Down Expand Up @@ -981,17 +1009,19 @@ def _state(self) -> UnslothState:
trainer.create_optimizer()

# Initialize queues
inputs_queue: asyncio.Queue[TrainInputs] = asyncio.Queue()
inputs_queue: asyncio.Queue[_TrainLoopInput] = asyncio.Queue()
results_queue: asyncio.Queue[dict[str, float]] = asyncio.Queue()

# Patch trainer _prepare_inputs() to pull from queue
def _async_prepare_inputs(*_: Any, **__: Any) -> dict[str, torch.Tensor]:
async def get_inputs() -> TrainInputs:
async def get_inputs() -> _TrainLoopInput:
return await inputs_queue.get()

# Force otherwise synchronous _prepare_inputs() to yield
# with nested asyncio.run() call
inputs = asyncio.run(get_inputs())
if isinstance(inputs, _StopTrainInputs):
raise StopTrainingLoop()

return cast(dict[str, torch.Tensor], inputs)

Expand Down
Loading
Loading