diff --git a/sagemaker-mlops/tests/integ/test_feature_store.py b/sagemaker-mlops/tests/integ/test_feature_store.py index 0dadd579dd..6273bc8df9 100644 --- a/sagemaker-mlops/tests/integ/test_feature_store.py +++ b/sagemaker-mlops/tests/integ/test_feature_store.py @@ -155,10 +155,26 @@ def test_delete_feature_group(feature_group_name, sample_dataframe, bucket, role fg.wait_for_status("Created") fg.delete() - time.sleep(2) - - with pytest.raises(Exception): - FeatureGroup.get(feature_group_name=feature_group_name) + + # FeatureGroup deletion is asynchronous: after delete() returns the group + # stays in "Deleting" status and is still describable for a while, so a + # fixed short sleep + single get() is racy. Poll until get() raises (the + # group is fully gone) or we hit the timeout. + deadline = time.time() + 120 + last_exc = None + while time.time() < deadline: + try: + FeatureGroup.get(feature_group_name=feature_group_name) + except Exception as e: # noqa: BLE001 - any error means it's no longer retrievable + last_exc = e + break + time.sleep(5) + else: + pytest.fail( + f"FeatureGroup {feature_group_name} was still retrievable 120s after delete()" + ) + + assert last_exc is not None # Test 7: Ingest to both OnlineStore and OfflineStore diff --git a/sagemaker-mlops/tests/integ/test_feature_store_lakeformation.py b/sagemaker-mlops/tests/integ/test_feature_store_lakeformation.py index 85880e1702..5095c5742b 100644 --- a/sagemaker-mlops/tests/integ/test_feature_store_lakeformation.py +++ b/sagemaker-mlops/tests/integ/test_feature_store_lakeformation.py @@ -160,7 +160,12 @@ def test_create_feature_group_and_enable_lake_formation(s3_uri, role, region): assert fg.feature_group_status == "Created" # Enable Lake Formation governance - result = fg.enable_lake_formation(hybrid_access_mode_enabled=False, acknowledge_risk=True) + result = fg.enable_lake_formation( + hybrid_access_mode_enabled=False, + acknowledge_risk=True, + use_service_linked_role=False, + registration_role_arn=role, + ) # Verify all phases completed successfully assert result["s3_location_registered"] is True @@ -198,6 +203,8 @@ def test_create_feature_group_with_lake_formation_enabled(s3_uri, role, region): enabled=True, hybrid_access_mode_enabled = False, acknowledge_risk=True, + use_service_linked_role=False, + registration_role_arn=role, ) fg = FeatureGroupManager.create( @@ -467,8 +474,17 @@ def test_enable_lake_formation_fails_with_nonexistent_role( # Verify we got an appropriate error error_msg = str(exc_info.value) print(exc_info) - # Should mention role-related issues (not found, invalid, access denied, etc.) - assert "EntityNotFoundException" in error_msg + # The registration must fail because the role is not usable. Depending on + # how the build/execution role's iam:PassRole policy is scoped, this surfaces + # either as Lake Formation rejecting the unknown role (EntityNotFoundException) + # or as IAM denying PassRole before the call reaches Lake Formation + # (AccessDeniedException on iam:PassRole). Both are valid "nonexistent / not + # usable role" outcomes for this negative test. + assert ( + "EntityNotFoundException" in error_msg + or "AccessDeniedException" in error_msg + or "iam:PassRole" in error_msg + ), f"Unexpected error for nonexistent role registration: {error_msg}" # ============================================================================ @@ -503,7 +519,12 @@ def test_enable_lake_formation_full_flow_with_policy_output(s3_uri, role, region # Enable Lake Formation governance with caplog.at_level(logging.WARNING, logger="sagemaker.mlops.feature_store.feature_group_manager"): - result = fg.enable_lake_formation(hybrid_access_mode_enabled=False, acknowledge_risk=True) + result = fg.enable_lake_formation( + hybrid_access_mode_enabled=False, + acknowledge_risk=True, + use_service_linked_role=False, + registration_role_arn=role, + ) # Verify all phases completed successfully assert result["s3_location_registered"] is True @@ -546,7 +567,12 @@ def test_enable_lake_formation_default_logs_recommended_policy(s3_uri, role, reg # Enable Lake Formation governance with hybrid_access_mode_enabled=False with caplog.at_level(logging.WARNING, logger="sagemaker.mlops.feature_store.feature_group_manager"): - result = fg.enable_lake_formation(hybrid_access_mode_enabled=False, acknowledge_risk=True) + result = fg.enable_lake_formation( + hybrid_access_mode_enabled=False, + acknowledge_risk=True, + use_service_linked_role=False, + registration_role_arn=role, + ) # Verify phases completed successfully assert result["s3_location_registered"] is True diff --git a/sagemaker-serve/src/sagemaker/serve/bedrock_model_builder.py b/sagemaker-serve/src/sagemaker/serve/bedrock_model_builder.py index c8d70fd75d..4d627059a6 100644 --- a/sagemaker-serve/src/sagemaker/serve/bedrock_model_builder.py +++ b/sagemaker-serve/src/sagemaker/serve/bedrock_model_builder.py @@ -645,10 +645,9 @@ def _get_checkpoint_uri_from_manifest(self) -> Optional[str]: """Get checkpoint URI from manifest.json for Nova models. Steps: - 1. Fetch S3 model artifacts from training job - 2. Construct path to manifest.json in the output directory - 3. Read and parse manifest.json - 4. Return checkpoint_s3_bucket value + 1. Build the manifest.json path from the training job output_data_config + 2. Read and parse manifest.json + 3. Return checkpoint_s3_bucket value Returns: Checkpoint URI from manifest.json. @@ -660,16 +659,15 @@ def _get_checkpoint_uri_from_manifest(self) -> Optional[str]: if not isinstance(self.model, TrainingJob): raise ValueError("Model must be a TrainingJob instance for Nova models") - s3_artifacts = self.model.model_artifacts.s3_model_artifacts - if not s3_artifacts: - raise ValueError("No S3 model artifacts found in training job") + # Nova serverless training jobs have no model_artifacts; the manifest + # lives under the job's output_data_config path. + output_data_config = getattr(self.model, "output_data_config", None) + s3_output_path = getattr(output_data_config, "s3_output_path", None) + if not s3_output_path: + raise ValueError("No S3 output path found in training job output_data_config") - logger.info("S3 artifacts path: %s", s3_artifacts) - - # Construct manifest path - # s3://bucket/path/output/model.tar.gz -> s3://bucket/path/output/output/manifest.json - parts = s3_artifacts.rstrip("/").rsplit("/", 1) - manifest_path = parts[0] + "/output/manifest.json" + output_path = s3_output_path.rstrip("/") + manifest_path = f"{output_path}/{self.model.training_job_name}/output/output/manifest.json" logger.info("Manifest path: %s", manifest_path) diff --git a/sagemaker-serve/src/sagemaker/serve/model_builder.py b/sagemaker-serve/src/sagemaker/serve/model_builder.py index 3a21f3fd72..27eaaa8fa3 100644 --- a/sagemaker-serve/src/sagemaker/serve/model_builder.py +++ b/sagemaker-serve/src/sagemaker/serve/model_builder.py @@ -4699,6 +4699,9 @@ def _resolve_nova_escrow_uri(self) -> str: training_job = self.model elif isinstance(self.model, ModelTrainer): training_job = self.model._latest_training_job + elif isinstance(self.model, BaseTrainer) and hasattr(self.model, "_latest_training_job"): + # SFTTrainer / RLVRTrainer / DPOTrainer expose the job via _latest_training_job. + training_job = self.model._latest_training_job else: raise ValueError("Nova escrow URI resolution requires a TrainingJob or ModelTrainer") diff --git a/sagemaker-serve/tests/integ/test_model_customization_deployment.py b/sagemaker-serve/tests/integ/test_model_customization_deployment.py index 5b22c16851..3a9fc33058 100644 --- a/sagemaker-serve/tests/integ/test_model_customization_deployment.py +++ b/sagemaker-serve/tests/integ/test_model_customization_deployment.py @@ -13,14 +13,24 @@ """Integration tests for ModelBuilder model customization deployment.""" from __future__ import absolute_import +import os +import json import boto3 +import time import pytest import random +import logging +from botocore.config import Config +from datetime import datetime, timezone, timedelta + + +logger = logging.getLogger(__name__) from sagemaker.core.helper.session_helper import Session # This test relies on resources in a specific region AWS_REGION = "us-west-2" +os.environ.setdefault("AWS_DEFAULT_REGION", AWS_REGION) @pytest.fixture(scope="module") @@ -135,6 +145,38 @@ def test_deploy_from_training_job(self, training_job_name, endpoint_name, cleanu adapter_ic = InferenceComponent.get(inference_component_name=adapter_name, region=AWS_REGION) assert adapter_ic is not None + # Invoke verification + time.sleep(10) # brief buffer for IC readiness + + invoke_ic_name = adapter_name if peft_type == "LORA" else f"{endpoint_name}-inference-component" + + test_payload = { + "inputs": "What is machine learning?", + "parameters": {"max_new_tokens": 32}, + } + + invoke_response = endpoint.invoke( + body=json.dumps(test_payload), + content_type="application/json", + accept="application/json", + inference_component_name=invoke_ic_name, + ) + + response_body = json.loads(invoke_response.body.read()) + + # Validate response structure + assert response_body is not None, f"Empty response from invoke on {invoke_ic_name}" + if isinstance(response_body, list): + assert len(response_body) > 0 + assert "generated_text" in response_body[0] or "generation" in response_body[0] + elif isinstance(response_body, dict): + assert ( + "generated_text" in response_body + or "generation" in response_body + or "outputs" in response_body + ) + + def test_fetch_endpoint_names_for_base_model(self, training_job_name, sagemaker_session): """Test fetching endpoint names for base model.""" from sagemaker.core.resources import TrainingJob @@ -300,9 +342,6 @@ def test_dpo_trainer_build(self, training_job_name, sagemaker_session): - Improved test assertions to work with new object structures """ -import json -import time -import pytest from sagemaker.core.resources import TrainingJob, ModelPackage from sagemaker.serve.bedrock_model_builder import BedrockModelBuilder @@ -317,7 +356,7 @@ def setup_config(self, training_job_name): from sagemaker.core.helper.session_helper import get_execution_role return { "training_job_name": training_job_name, - "region": "us-west-2", + "region": AWS_REGION, "bucket": "models-sdk-testing-pdx", "role_arn": get_execution_role() } @@ -337,29 +376,48 @@ def s3_client(self, setup_config): @pytest.fixture(scope="class") def bedrock_client(self, setup_config): - """Create Bedrock client.""" + """Create Bedrock client. Eagerly cleans up test import jobs older than 24h.""" + client = boto3.client('bedrock', region_name=setup_config["region"]) - # Cleanup existing import jobs + try: + cutoff = datetime.now(timezone.utc) - timedelta(hours=24) jobs = client.list_model_import_jobs() for job in jobs.get('modelImportJobSummaries', []): - if job['jobName'].startswith('test-bedrock-'): + if not job['jobName'].startswith('test-bedrock-'): + continue + created = job.get('creationTime') or job.get('lastModifiedTime') + if created and created < cutoff: try: - client.stop_model_import_job(jobIdentifier=job['jobArn']) - except Exception: - pass - except Exception: - pass + status = job.get('status') + if status in ('InProgress', 'Pending'): + client.stop_model_import_job(jobIdentifier=job['jobArn']) + elif status == 'Completed' and job.get('importedModelArn'): + client.delete_imported_model( + modelIdentifier=job['importedModelArn'] + ) + except Exception as e: + logger.warning(f"Eager cleanup failed for {job['jobName']}: {e}") + except Exception as e: + logger.warning(f"Failed to list import jobs for eager cleanup: {e}") + return client @pytest.fixture(scope="class") def bedrock_runtime(self, setup_config): """Create Bedrock runtime client.""" - return boto3.client('bedrock-runtime', region_name=setup_config["region"]) + # Adding config based on: https://docs.aws.amazon.com/bedrock/latest/userguide/invoke-imported-model.html#handle-model-not-ready-exception + config = Config( + retries={ + 'total_max_attempts': 10, + 'mode': 'standard' + } + ) + return boto3.client('bedrock-runtime', region_name=setup_config["region"], config=config) @pytest.fixture(scope="class") def deployed_model_arn(self, training_job, bedrock_client, s3_client, setup_config): - """Deploy model and return ARN.""" + """Deploy model and return ARN. Cleans up the imported model after tests.""" self._setup_model_files(training_job, s3_client, setup_config) job_name = f"test-bedrock-{random.randint(1000, 9999)}-{int(time.time())}" @@ -374,21 +432,37 @@ def deployed_model_arn(self, training_job, bedrock_client, s3_client, setup_conf job_arn = deployment_result['jobArn'] - # Wait for completion - while True: + # Wait for completion (max 1 hour wait) + max_wait = 60 * 60 # 60 minutes + start = time.time() + while time.time() - start < max_wait: response = bedrock_client.get_model_import_job(jobIdentifier=job_arn) status = response['status'] if status in ['Completed', 'Failed']: break time.sleep(30) + else: + pytest.fail(f"Model import job timed out after {max_wait}s") - model_arn = response['importedModelName'] - return model_arn + if status == 'Failed': + pytest.fail( + f"Model import job failed: {response.get('failureMessage', 'unknown reason')}") + + model_arn = response['importedModelArn'] + + yield model_arn + + # Cleanup: delete the imported model + try: + logger.info(f"Cleaning up imported model: {model_arn}") + bedrock_client.delete_imported_model(modelIdentifier=model_arn) + logger.info(f"Successfully deleted imported model: {model_arn}") + except Exception as e: + logger.warning(f"Failed to delete imported model {model_arn}: {e}") except Exception as e: - # If there's an issue with the new sagemaker-core integration, provide helpful error info pytest.fail( - f"Deployment failed with error: {str(e)}.") + f"Bedrock deployment failed with error: {str(e)}.") def _setup_model_files(self, training_job, s3_client, setup_config): """Setup required model files for Bedrock deployment.""" @@ -505,24 +579,79 @@ def test_bedrock_job_created(self, deployed_model_arn): """Test that Bedrock import job was created successfully.""" assert deployed_model_arn is not None - def test_zzz_cleanup_deployed_model(self, bedrock_client): - """Cleanup deployed model and import jobs (runs last due to zzz prefix).""" - if hasattr(self, 'model_arn_for_cleanup'): + # Note: Below test is flaky and fails due to model not ready exception. + # Documentation recommends retries: https://docs.aws.amazon.com/bedrock/latest/userguide/invoke-imported-model.html#handle-model-not-ready-exception. + # TODO: Fix using provisioned throughput or better wait mechanism + @pytest.mark.slow + def test_bedrock_model_invoke(self, deployed_model_arn, bedrock_runtime): + logger.warning( + "This test is known to be flaky due to 'model not ready' exceptions from Bedrock. " + "See: https://docs.aws.amazon.com/bedrock/latest/userguide/invoke-imported-model.html" + "#handle-model-not-ready-exception" + ) + """Test invoking the imported Bedrock model to ensure it works end-to-end. + + Retries on failure since models can take several minutes + to become ready after import. + """ + max_retries = 2 + base_delay = 10 + + for attempt in range(max_retries): try: - bedrock_client.delete_imported_model(modelIdentifier=self.model_arn_for_cleanup) - except Exception: - pass - # Cleanup all test import jobs + response = bedrock_runtime.invoke_model( + modelId=deployed_model_arn, + body=json.dumps({ + "prompt": "What is the capital of France?", + "max_gen_len": 100, + "temperature": 0.7, + "top_p": 0.9 + }) + ) + + result = json.loads(response['body'].read().decode()) + + # Validate response structure + assert "generation" in result, "Response missing 'generation' field" + assert isinstance(result["generation"], str), "'generation' should be a string" + assert len(result["generation"]) > 0, "'generation' should not be empty" + return # Success + + except Exception as e: + if attempt < max_retries - 1: + logger.info( + f"Invoke failed (attempt {attempt + 1}/{max_retries}): {e}. " + f"Retrying in {base_delay}s..." + ) + time.sleep(base_delay) + else: + pytest.fail( + f"Invoke failed after {max_retries} attempts. " + f"Last error: {e}" + ) + + + @pytest.fixture(scope="class", autouse=True) + def cleanup_import_jobs(self, bedrock_client): + """Cleanup any leftover test import jobs after all tests in this class.""" + yield try: jobs = bedrock_client.list_model_import_jobs() for job in jobs.get('modelImportJobSummaries', []): if job['jobName'].startswith('test-bedrock-'): try: - bedrock_client.stop_model_import_job(jobIdentifier=job['jobArn']) - except Exception: - pass - except Exception: - pass + # Stop in-progress jobs + if job.get('status') in ('InProgress', 'Pending'): + bedrock_client.stop_model_import_job(jobIdentifier=job['jobArn']) + # Delete completed imported models + elif job.get('status') == 'Completed' and job.get('importedModelArn'): + bedrock_client.delete_imported_model( + modelIdentifier=job['importedModelArn'] + ) + except Exception as e: + logger.warning(f"Cleanup failed for job {job['jobName']}: {e}") + except Exception as e: + logger.warning(f"Failed to list/cleanup import jobs: {e}") def test_model_customization_workflow(training_job_name): diff --git a/sagemaker-serve/tests/integ/test_nova_model_customization_deployment.py b/sagemaker-serve/tests/integ/test_nova_model_customization_deployment.py new file mode 100644 index 0000000000..d4247774c4 --- /dev/null +++ b/sagemaker-serve/tests/integ/test_nova_model_customization_deployment.py @@ -0,0 +1,517 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +"""Integration tests for Nova model customization deployment. + +Covers deploying a fine-tuned Nova model to SageMaker endpoints (via +ModelBuilder) and to Amazon Bedrock custom models (via BedrockModelBuilder). +""" +from __future__ import absolute_import + +import boto3 +import json +import logging +import os +import re +import time +import pytest +import random +from sagemaker.serve import ModelBuilder +from sagemaker.core.resources import TrainingJob + +logger = logging.getLogger(__name__) + +from sagemaker.core.helper.session_helper import Session + +# This test relies on resources in a specific region +AWS_REGION = "us-east-1" +os.environ.setdefault("AWS_DEFAULT_REGION", AWS_REGION) + +# Model package group shared with the Nova SFT/RLVR trainer integ tests. +MODEL_PACKAGE_GROUP = "sdk-test-finetuned-models" + +NOVA_MODEL_ID = "nova-textgeneration-lite-v2" +NOVA_INSTANCE_TYPE = "ml.g6.48xlarge" + + +def _deploy_or_skip_on_capacity(model_builder, **deploy_kwargs): + """Deploy via ModelBuilder, skipping on transient InsufficientInstanceCapacity.""" + try: + return model_builder.deploy(**deploy_kwargs) + except Exception as e: + msg = str(e) + if "InsufficientInstanceCapacity" in msg or "InsufficientInstance" in msg: + pytest.skip(f"Skipping due to transient instance capacity shortage: {msg}") + raise + + +def _latest_model_package_arn(region=AWS_REGION): + """Return the ARN of the most recently created Completed model package in + the Nova model package group, or None if the group has no usable package. + """ + sm_client = boto3.client("sagemaker", region_name=region) + packages = sm_client.list_model_packages( + ModelPackageGroupName=MODEL_PACKAGE_GROUP, + ModelApprovalStatus="Approved", + SortBy="CreationTime", + SortOrder="Descending", + MaxResults=10, + ) + summaries = packages.get("ModelPackageSummaryList", []) + if not summaries: + packages = sm_client.list_model_packages( + ModelPackageGroupName=MODEL_PACKAGE_GROUP, + SortBy="CreationTime", + SortOrder="Descending", + MaxResults=10, + ) + summaries = packages.get("ModelPackageSummaryList", []) + + for summary in summaries: + if summary.get("ModelPackageStatus") == "Completed": + return summary["ModelPackageArn"] + return None + + +@pytest.fixture(scope="module") +def sagemaker_session(): + """Create a SageMaker session with explicit region.""" + boto_session = boto3.Session(region_name=AWS_REGION) + return Session(boto_session=boto_session) + + +@pytest.fixture(scope="module") +def training_job_name(): + """A completed Nova SFT training job whose output model package still exists. + + The training job's output model package must exist because ModelBuilder.build + fetches it. Instead of picking a job and hoping its package survived resource + cleanup, we go the other way: start from a model package that currently exists + in the group and resolve the training job that produced it (encoded in the + package's escrow S3 URI). The cleaner always retains the oldest package, so + this reliably yields a usable job. + """ + sm_client = boto3.client("sagemaker", region_name=AWS_REGION) + packages = sm_client.list_model_packages( + ModelPackageGroupName=MODEL_PACKAGE_GROUP, + SortBy="CreationTime", + SortOrder="Descending", + MaxResults=20, + ).get("ModelPackageSummaryList", []) + + sft_fallback = None + for pkg in packages: + if pkg.get("ModelPackageStatus") != "Completed": + continue + detail = sm_client.describe_model_package(ModelPackageName=pkg["ModelPackageArn"]) + containers = detail.get("InferenceSpecification", {}).get("Containers", []) + if not containers: + continue + s3_uri = containers[0].get("ModelDataSource", {}).get("S3DataSource", {}).get("S3Uri", "") + # Escrow URI looks like s3://...//step_N/ + match = re.search(r"/((?:sft|rlvr)-nova-integ-[^/]+)/", s3_uri) + if not match: + continue + job_name = match.group(1) + try: + job = sm_client.describe_training_job(TrainingJobName=job_name) + except sm_client.exceptions.ClientError: + continue + if job.get("TrainingJobStatus") != "Completed": + continue + if job_name.startswith("sft-nova-integ"): + return job_name + sft_fallback = sft_fallback or job_name + + if sft_fallback: + return sft_fallback + + pytest.skip( + "No existing Nova model package with a resolvable completed training job " + "was found. Ensure the scheduled Nova SFT/RLVR workflow has run." + ) + + +@pytest.fixture(scope="module") +def model_package_arn(): + """Latest Completed Nova model package ARN from the shared group. + + Skips the dependent test if no usable model package exists yet (e.g. before + any Nova SFT/RLVR training job has registered one). + """ + arn = _latest_model_package_arn() + if arn is None: + pytest.skip( + f"No Completed model package available in {MODEL_PACKAGE_GROUP}. " + "Run a Nova SFT/RLVR training job first." + ) + return arn + + +@pytest.fixture +def endpoint_name(): + """Generate unique endpoint name.""" + return f"e2e-nova-{int(time.time())}-{random.randint(100, 10000)}" + + +@pytest.fixture(scope="module") +def cleanup_endpoints(): + """Track endpoints to cleanup after tests.""" + endpoints_to_cleanup = [] + yield endpoints_to_cleanup + + for ep_name in endpoints_to_cleanup: + try: + from sagemaker.core.resources import Endpoint + endpoint = Endpoint.get(endpoint_name=ep_name, region=AWS_REGION) + endpoint.delete() + except Exception: + pass + + +@pytest.mark.us_east_1 +class TestModelCustomizationFromTrainingJob: + """Test Nova model customization deployment from TrainingJob.""" + + def test_build_from_training_job(self, training_job_name, sagemaker_session): + """Test building a Nova model from a training job.""" + training_job = TrainingJob.get(training_job_name=training_job_name, region=AWS_REGION) + model_builder = ModelBuilder( + model=training_job, + instance_type=NOVA_INSTANCE_TYPE, + sagemaker_session=sagemaker_session, + ) + model_builder.accept_eula = True + model = model_builder.build( + model_name=f"test-model-{int(time.time())}-{random.randint(100, 10000)}", + region=AWS_REGION, + ) + + assert model is not None + assert model.model_arn is not None + assert model_builder.image_uri is not None + assert model_builder.instance_type is not None + + def test_deploy_from_training_job(self, training_job_name, endpoint_name, cleanup_endpoints, sagemaker_session): + """Test deploying a Nova model from a training job and invoking it.""" + training_job = TrainingJob.get(training_job_name=training_job_name, region=AWS_REGION) + model_builder = ModelBuilder( + model=training_job, + instance_type=NOVA_INSTANCE_TYPE, + sagemaker_session=sagemaker_session, + ) + model_builder.accept_eula = True + model_builder.build( + model_name=f"test-model-{int(time.time())}-{random.randint(100, 10000)}", + region=AWS_REGION, + ) + + endpoint = _deploy_or_skip_on_capacity( + model_builder, + endpoint_name=endpoint_name, + ) + + cleanup_endpoints.append(endpoint_name) + + assert endpoint is not None + assert endpoint.endpoint_arn is not None + assert endpoint.endpoint_status == "InService" + + time.sleep(10) # brief buffer for inference component readiness + + invoke_response = endpoint.invoke( + body=json.dumps({ + "messages": [ + {"role": "user", "content": [{"type": "text", "text": "What is 7+7?"}]} + ] + }), + content_type="application/json", + accept="application/json", + ) + + response_body = json.loads(invoke_response.body.read()) + + assert response_body is not None, f"Empty response from invoke on {endpoint_name}" + assert isinstance(response_body, dict) + + def test_fetch_endpoint_names_for_base_model(self, training_job_name, sagemaker_session): + """Test fetching endpoint names for base model.""" + training_job = TrainingJob.get(training_job_name=training_job_name, region=AWS_REGION) + model_builder = ModelBuilder(model=training_job, sagemaker_session=sagemaker_session) + endpoint_names = model_builder.fetch_endpoint_names_for_base_model() + + assert isinstance(endpoint_names, set) + + +@pytest.mark.us_east_1 +class TestModelCustomizationFromModelPackage: + """Test Nova model customization deployment via the registered model package. + + A fine-tuned Nova model is built and deployed from its TrainingJob; the + registered model package is validated along the way. + """ + + def test_build_from_model_package(self, training_job_name, sagemaker_session): + """Build a Nova model from the training job and validate its model package.""" + training_job = TrainingJob.get(training_job_name=training_job_name, region=AWS_REGION) + model_builder = ModelBuilder( + model=training_job, + instance_type=NOVA_INSTANCE_TYPE, + sagemaker_session=sagemaker_session, + ) + model_builder.accept_eula = True + model = model_builder.build(region=AWS_REGION) + + assert model is not None + assert model.model_arn is not None + assert model_builder._fetch_model_package_arn() is not None + + def test_deploy_from_model_package(self, training_job_name, endpoint_name, cleanup_endpoints, sagemaker_session): + """Deploy a Nova model via the training-job path and validate the endpoint.""" + training_job = TrainingJob.get(training_job_name=training_job_name, region=AWS_REGION) + model_builder = ModelBuilder( + model=training_job, + instance_type=NOVA_INSTANCE_TYPE, + sagemaker_session=sagemaker_session, + ) + model_builder.accept_eula = True + model_builder.build(region=AWS_REGION) + endpoint = _deploy_or_skip_on_capacity(model_builder, endpoint_name=endpoint_name) + + cleanup_endpoints.append(endpoint_name) + + assert endpoint is not None + assert endpoint.endpoint_arn is not None + + +@pytest.mark.us_east_1 +class TestInstanceTypeAutoDetection: + """Test instance type handling for Nova models.""" + + def test_instance_type_from_recipe(self, training_job_name, sagemaker_session): + """Nova requires an explicit supported instance type (no auto-detection).""" + training_job = TrainingJob.get(training_job_name=training_job_name, region=AWS_REGION) + model_builder = ModelBuilder( + model=training_job, + instance_type=NOVA_INSTANCE_TYPE, + sagemaker_session=sagemaker_session, + ) + model_builder.accept_eula = True + model_builder.build(region=AWS_REGION) + + assert model_builder.instance_type == NOVA_INSTANCE_TYPE + assert "ml." in model_builder.instance_type + + +@pytest.mark.us_east_1 +class TestModelCustomizationDetection: + """Test model customization detection logic for Nova models.""" + + def test_is_model_customization_training_job(self, training_job_name, sagemaker_session): + """Test detection from a Nova training job.""" + training_job = TrainingJob.get(training_job_name=training_job_name, region=AWS_REGION) + model_builder = ModelBuilder(model=training_job, sagemaker_session=sagemaker_session) + + assert model_builder._is_model_customization() is True + + def test_is_model_customization_model_package(self, model_package_arn, sagemaker_session): + """Test detection from a Nova model package.""" + from sagemaker.core.resources import ModelPackage + + model_package = ModelPackage.get(model_package_name=model_package_arn, region=AWS_REGION) + model_builder = ModelBuilder(model=model_package, sagemaker_session=sagemaker_session) + + assert model_builder._is_model_customization() is True + + def test_fetch_model_package_arn(self, training_job_name, sagemaker_session): + """Test fetching the model package ARN for a Nova training job.""" + training_job = TrainingJob.get(training_job_name=training_job_name, region=AWS_REGION) + model_builder = ModelBuilder(model=training_job, sagemaker_session=sagemaker_session) + + arn = model_builder._fetch_model_package_arn() + + assert arn is not None + assert "model-package" in arn + + +@pytest.mark.us_east_1 +class TestTrainerIntegration: + """Test ModelBuilder integration with Nova SFTTrainer and RLVRTrainer. + + Nova has no DPO recipe, so RLVR is used in place of the open-weights DPO test. + """ + + def test_sft_trainer_build(self, training_job_name, sagemaker_session): + """Test building a model from a Nova SFTTrainer object.""" + from sagemaker.train.sft_trainer import SFTTrainer + + training_job = TrainingJob.get( + training_job_name=training_job_name, region=AWS_REGION + ) + + trainer = SFTTrainer( + model=NOVA_MODEL_ID, + training_dataset="s3://dummy/data.jsonl", + accept_eula=True, + model_package_group=MODEL_PACKAGE_GROUP, + sagemaker_session=sagemaker_session, + ) + trainer._latest_training_job = training_job + + model_builder = ModelBuilder( + model=trainer, + instance_type=NOVA_INSTANCE_TYPE, + sagemaker_session=sagemaker_session, + ) + model = model_builder.build(region=AWS_REGION) + + assert model is not None + assert model.model_arn is not None + + def test_rlvr_trainer_build(self, training_job_name, sagemaker_session): + """Test building a model from a Nova RLVRTrainer object.""" + from sagemaker.train.rlvr_trainer import RLVRTrainer + + training_job = TrainingJob.get( + training_job_name=training_job_name, region=AWS_REGION + ) + + trainer = RLVRTrainer( + model=NOVA_MODEL_ID, + training_dataset="s3://dummy/data.jsonl", + accept_eula=True, + model_package_group=MODEL_PACKAGE_GROUP, + sagemaker_session=sagemaker_session, + ) + trainer._latest_training_job = training_job + + model_builder = ModelBuilder( + model=trainer, + instance_type=NOVA_INSTANCE_TYPE, + sagemaker_session=sagemaker_session, + ) + model = model_builder.build(region=AWS_REGION) + + assert model is not None + assert model.model_arn is not None + + +@pytest.mark.us_east_1 +class TestNovaBedrockDeployment: + """Test deploying a fine-tuned Nova model to Amazon Bedrock as a custom model.""" + + @pytest.fixture(scope="class") + def role_arn(self): + """Execution role ARN with Bedrock permissions.""" + from sagemaker.core.helper.session_helper import get_execution_role + return get_execution_role() + + @pytest.fixture(scope="class") + def bedrock_client(self): + """Create a Bedrock control-plane client.""" + return boto3.client("bedrock", region_name=AWS_REGION) + + @pytest.fixture(scope="class") + def bedrock_runtime(self): + """Bedrock runtime client with retries for not-yet-ready custom models.""" + from botocore.config import Config + config = Config(retries={"total_max_attempts": 10, "mode": "standard"}) + return boto3.client("bedrock-runtime", region_name=AWS_REGION, config=config) + + @pytest.fixture(scope="class") + def deployed_nova_model(self, training_job_name, role_arn, bedrock_client): + """Deploy a Nova model to Bedrock from its TrainingJob and yield the + deployment details, cleaning up the custom model and deployment after. + """ + from sagemaker.core.resources import TrainingJob + from sagemaker.serve.bedrock_model_builder import BedrockModelBuilder + + unique = f"{int(time.time())}-{random.randint(1000, 9999)}" + custom_model_name = f"nova-integ-{unique}" + deployment_name = f"nova-integ-{unique}-deployment" + + training_job = TrainingJob.get(training_job_name=training_job_name, region=AWS_REGION) + bedrock_builder = BedrockModelBuilder(model=training_job) + + deployment_arn = None + model_arn = None + try: + response = bedrock_builder.deploy( + custom_model_name=custom_model_name, + deployment_name=deployment_name, + role_arn=role_arn, + ) + + assert response is not None + deployment_arn = response.get("customModelDeploymentArn") + assert deployment_arn is not None, f"No deployment ARN in response: {response}" + + deployment = bedrock_client.get_custom_model_deployment( + customModelDeploymentIdentifier=deployment_arn + ) + model_arn = deployment.get("modelArn") + + yield { + "deployment_arn": deployment_arn, + "model_arn": model_arn, + "custom_model_name": custom_model_name, + } + except Exception as e: + pytest.fail(f"Nova Bedrock deployment failed: {e}") + finally: + if deployment_arn: + try: + bedrock_client.delete_custom_model_deployment( + customModelDeploymentIdentifier=deployment_arn + ) + logger.info("Deleted custom model deployment: %s", deployment_arn) + except Exception as e: + logger.warning("Failed to delete deployment %s: %s", deployment_arn, e) + if model_arn: + try: + bedrock_client.delete_custom_model(modelIdentifier=model_arn) + logger.info("Deleted custom model: %s", model_arn) + except Exception as e: + logger.warning("Failed to delete custom model %s: %s", model_arn, e) + + def test_nova_bedrock_deployment_active(self, deployed_nova_model, bedrock_client): + """The Nova custom model deployment should be Active after deploy().""" + deployment_arn = deployed_nova_model["deployment_arn"] + deployment = bedrock_client.get_custom_model_deployment( + customModelDeploymentIdentifier=deployment_arn + ) + assert deployment.get("status") == "Active" + + @pytest.mark.slow + def test_nova_bedrock_invoke(self, deployed_nova_model, bedrock_runtime): + """Invoke the deployed Nova model on Bedrock end-to-end.""" + deployment_arn = deployed_nova_model["deployment_arn"] + + response = bedrock_runtime.invoke_model( + modelId=deployment_arn, + body=json.dumps({ + "schemaVersion": "messages-v1", + "messages": [ + {"role": "user", "content": [{"text": "What is 7+7?"}]} + ], + "inferenceConfig": {"maxTokens": 100, "temperature": 0.0, "topP": 0.9}, + }), + contentType="application/json", + accept="application/json", + ) + + result = json.loads(response["body"].read().decode()) + + assert result is not None, "Empty response from Bedrock invoke" + assert isinstance(result, dict) + text = result["output"]["message"]["content"][0]["text"] + assert isinstance(text, str) and len(text) > 0 diff --git a/sagemaker-serve/tests/unit/test_bedrock_model_builder.py b/sagemaker-serve/tests/unit/test_bedrock_model_builder.py index 6fb0e8bfb1..83af5cc099 100644 --- a/sagemaker-serve/tests/unit/test_bedrock_model_builder.py +++ b/sagemaker-serve/tests/unit/test_bedrock_model_builder.py @@ -244,10 +244,12 @@ def test_nova_non_training_job_falls_through(self): class TestGetCheckpointUri: - def _make_builder(self, s3_artifacts, manifest_body=None, s3_error=None): + def _make_builder(self, s3_output_path, manifest_body=None, s3_error=None, + job_name="myjob"): mock_job = Mock() - mock_job.model_artifacts = Mock() - mock_job.model_artifacts.s3_model_artifacts = s3_artifacts + mock_job.output_data_config = Mock() + mock_job.output_data_config.s3_output_path = s3_output_path + mock_job.training_job_name = job_name mock_s3 = Mock() # Always set exceptions.NoSuchKey to a real exception class so @@ -272,19 +274,20 @@ def _make_builder(self, s3_artifacts, manifest_body=None, s3_error=None): def test_success(self): b, s3 = self._make_builder( - "s3://bucket/path/output/model.tar.gz", + "s3://bucket/path/", manifest_body={"checkpoint_s3_bucket": "s3://bucket/ckpt/step_4"}, + job_name="myjob", ) with patch(f"{MODULE}.TrainingJob", type(b.model)): result = b._get_checkpoint_uri_from_manifest() assert result == "s3://bucket/ckpt/step_4" s3.get_object.assert_called_once_with( - Bucket="bucket", Key="path/output/output/manifest.json" + Bucket="bucket", Key="path/myjob/output/output/manifest.json" ) def test_missing_checkpoint_key(self): b, _ = self._make_builder( - "s3://bucket/path/output/model.tar.gz", + "s3://bucket/path/", manifest_body={"other_key": "value"}, ) with patch(f"{MODULE}.TrainingJob", type(b.model)): @@ -293,7 +296,7 @@ def test_missing_checkpoint_key(self): def test_manifest_not_found(self): err = ClientError({"Error": {"Code": "NoSuchKey"}}, "GetObject") - b, _ = self._make_builder("s3://bucket/path/output/model.tar.gz", s3_error=err) + b, _ = self._make_builder("s3://bucket/path/", s3_error=err) with patch(f"{MODULE}.TrainingJob", type(b.model)): with pytest.raises(ValueError, match="manifest.json not found"): b._get_checkpoint_uri_from_manifest() @@ -304,16 +307,17 @@ def test_not_training_job_raises(self): with pytest.raises(ValueError, match="TrainingJob"): b._get_checkpoint_uri_from_manifest() - def test_no_s3_artifacts_raises(self): + def test_no_s3_output_path_raises(self): b, _ = self._make_builder(None) with patch(f"{MODULE}.TrainingJob", type(b.model)): - with pytest.raises(ValueError, match="No S3 model artifacts"): + with pytest.raises(ValueError, match="No S3 output path"): b._get_checkpoint_uri_from_manifest() def test_invalid_json_raises(self): mock_job = Mock() - mock_job.model_artifacts = Mock() - mock_job.model_artifacts.s3_model_artifacts = "s3://bucket/path/output/m.tar.gz" + mock_job.output_data_config = Mock() + mock_job.output_data_config.s3_output_path = "s3://bucket/path/" + mock_job.training_job_name = "myjob" body = Mock() body.read.return_value = b"not-json" diff --git a/sagemaker-serve/tox.ini b/sagemaker-serve/tox.ini index 99cc473588..258cef4112 100644 --- a/sagemaker-serve/tox.ini +++ b/sagemaker-serve/tox.ini @@ -63,6 +63,8 @@ markers = release image_uris_unit_test timeout: mark a test as a timeout. + gpu_intensive: mark a test as GPU resource intensive (runs on scheduled CI, not PR checks). + us_east_1: mark a test that requires us-east-1 test account credentials (784379639078). [testenv] setenv =