From 96f42009c6bb09e64117897e4efe9f63e8573b5e Mon Sep 17 00:00:00 2001 From: qxz3ea6 Date: Wed, 15 Apr 2026 16:29:47 +0800 Subject: [PATCH 1/2] Fix PySparkProcessor V3 ProcessingInput construction --- .../src/sagemaker/core/spark/processing.py | 38 ++++++--- .../tests/unit/spark/test_processing.py | 79 +++++++++++++++++++ 2 files changed, 107 insertions(+), 10 deletions(-) create mode 100644 sagemaker-core/tests/unit/spark/test_processing.py diff --git a/sagemaker-core/src/sagemaker/core/spark/processing.py b/sagemaker-core/src/sagemaker/core/spark/processing.py index 82cdef954c..e49a9764c4 100644 --- a/sagemaker-core/src/sagemaker/core/spark/processing.py +++ b/sagemaker-core/src/sagemaker/core/spark/processing.py @@ -37,7 +37,12 @@ from sagemaker.core import image_uris from sagemaker.core import s3 from sagemaker.core.local.image import _ecr_login_if_needed, _pull_image -from sagemaker.core.processing import ProcessingInput, ProcessingOutput, ScriptProcessor +from sagemaker.core.processing import ( + ProcessingInput, + ProcessingOutput, + ProcessingS3Input, + ScriptProcessor, +) from sagemaker.core.s3 import S3Uploader from sagemaker.core.helper.session_helper import Session from sagemaker.core.network import NetworkConfig @@ -52,6 +57,18 @@ logger = logging.getLogger(__name__) +def _make_s3_processing_input(input_name: str, s3_uri: str, local_path: str) -> ProcessingInput: + """Build a V3-compatible ProcessingInput backed by an S3 channel.""" + return ProcessingInput( + input_name=input_name, + s3_input=ProcessingS3Input( + s3_uri=s3_uri, + s3_data_type="S3Prefix", + local_path=local_path, + ), + ) + + class _SparkProcessorBase(ScriptProcessor): """Handles Amazon SageMaker processing tasks for jobs using Spark. @@ -404,10 +421,10 @@ def _stage_configuration(self, configuration): sagemaker_session=self.sagemaker_session, ) - conf_input = ProcessingInput( - source=s3_uri, - destination=f"{self._conf_container_base_path}{self._conf_container_input_name}", - input_name=_SparkProcessorBase._conf_container_input_name, + conf_input = _make_s3_processing_input( + input_name=self._conf_container_input_name, + s3_uri=s3_uri, + local_path=f"{self._conf_container_base_path}{self._conf_container_input_name}", ) return conf_input @@ -505,15 +522,16 @@ def _stage_submit_deps(self, submit_deps, input_channel_name): # them to the Spark container and form the spark-submit option from a # combination of S3 URIs and container's local input path if use_input_channel: - input_channel = ProcessingInput( - source=input_channel_s3_uri, - destination=f"{self._conf_container_base_path}{input_channel_name}", + local_path = f"{self._conf_container_base_path}{input_channel_name}" + input_channel = _make_s3_processing_input( input_name=input_channel_name, + s3_uri=input_channel_s3_uri, + local_path=local_path, ) spark_opt = ( - Join(on=",", values=spark_opt_s3_uris + [input_channel.destination]) + Join(on=",", values=spark_opt_s3_uris + [local_path]) if spark_opt_s3_uris_has_pipeline_var - else ",".join(spark_opt_s3_uris + [input_channel.destination]) + else ",".join(spark_opt_s3_uris + [local_path]) ) # If no local files were uploaded, form the spark-submit option from a list of S3 URIs else: diff --git a/sagemaker-core/tests/unit/spark/test_processing.py b/sagemaker-core/tests/unit/spark/test_processing.py new file mode 100644 index 0000000000..dd9f692acf --- /dev/null +++ b/sagemaker-core/tests/unit/spark/test_processing.py @@ -0,0 +1,79 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. + +from unittest.mock import Mock, patch + +import pytest + +from sagemaker.core.spark.processing import PySparkProcessor + + +@pytest.fixture +def mock_session(): + session = Mock() + session.boto_session = Mock() + session.boto_session.region_name = "us-west-2" + session.boto_region_name = "us-west-2" + session.sagemaker_client = Mock() + session.default_bucket = Mock(return_value="test-bucket") + session.default_bucket_prefix = "sagemaker" + session.expand_role = Mock(side_effect=lambda x: x) + session.sagemaker_config = {} + return session + + +def _make_processor(mock_session): + processor = PySparkProcessor( + role="arn:aws:iam::123456789012:role/SageMakerRole", + image_uri="test-image:latest", + instance_count=1, + instance_type="ml.m5.xlarge", + sagemaker_session=mock_session, + ) + processor._current_job_name = "test-job" + return processor + + +class TestPySparkProcessorV3ProcessingInputs: + @patch("sagemaker.core.spark.processing.S3Uploader.upload_string_as_file_body") + def test_stage_configuration_builds_v3_processing_input(self, mock_upload, mock_session): + processor = _make_processor(mock_session) + + config_input = processor._stage_configuration( + [{"Classification": "spark-defaults", "Properties": {"spark.app.name": "test"}}] + ) + + mock_upload.assert_called_once() + assert config_input.input_name == processor._conf_container_input_name + assert config_input.s3_input.s3_uri == ( + "s3://test-bucket/sagemaker/test-job/input/conf/configuration.json" + ) + assert config_input.s3_input.local_path == "/opt/ml/processing/input/conf" + + @patch("sagemaker.core.spark.processing.S3Uploader.upload") + def test_stage_submit_deps_builds_v3_processing_input_for_local_dependencies( + self, mock_upload, mock_session, tmp_path + ): + processor = _make_processor(mock_session) + dep_file = tmp_path / "dep.py" + dep_file.write_text("print('dep')", encoding="utf-8") + + input_channel, spark_opt = processor._stage_submit_deps( + [str(dep_file)], processor._submit_py_files_input_channel_name + ) + + mock_upload.assert_called_once() + assert input_channel.input_name == processor._submit_py_files_input_channel_name + assert input_channel.s3_input.s3_uri == "s3://test-bucket/sagemaker/test-job/input/py-files" + assert input_channel.s3_input.local_path == "/opt/ml/processing/input/py-files" + assert spark_opt == "/opt/ml/processing/input/py-files" From fc9aff4cb198020637705d632446d3dee31fd9de Mon Sep 17 00:00:00 2001 From: qxz3ea6 Date: Mon, 8 Jun 2026 10:21:25 +0800 Subject: [PATCH 2/2] Fix V3 processing shapes in spark and model monitor --- .../core/model_monitor/model_monitoring.py | 7 +++++- .../src/sagemaker/core/spark/processing.py | 11 +++++--- .../tests/unit/spark/test_processing.py | 20 +++++++++++++++ .../tests/unit/test_model_monitoring.py | 25 +++++++++++++++++++ 4 files changed, 59 insertions(+), 4 deletions(-) diff --git a/sagemaker-core/src/sagemaker/core/model_monitor/model_monitoring.py b/sagemaker-core/src/sagemaker/core/model_monitor/model_monitoring.py index be9e5619c7..f402c29c83 100644 --- a/sagemaker-core/src/sagemaker/core/model_monitor/model_monitoring.py +++ b/sagemaker-core/src/sagemaker/core/model_monitor/model_monitoring.py @@ -1453,7 +1453,12 @@ def _normalize_processing_output(self, output=None): "output", ) output = ProcessingOutput( - source=output, destination=s3_uri, output_name=_DEFAULT_OUTPUT_NAME + output_name=_DEFAULT_OUTPUT_NAME, + s3_output=ProcessingS3Output( + s3_uri=s3_uri, + local_path=output, + s3_upload_mode="EndOfJob", + ), ) return output diff --git a/sagemaker-core/src/sagemaker/core/spark/processing.py b/sagemaker-core/src/sagemaker/core/spark/processing.py index e49a9764c4..56e0d63f6f 100644 --- a/sagemaker-core/src/sagemaker/core/spark/processing.py +++ b/sagemaker-core/src/sagemaker/core/spark/processing.py @@ -43,6 +43,7 @@ ProcessingS3Input, ScriptProcessor, ) +from sagemaker.core.shapes import ProcessingS3Output from sagemaker.core.s3 import S3Uploader from sagemaker.core.helper.session_helper import Session from sagemaker.core.network import NetworkConfig @@ -92,6 +93,7 @@ class _SparkProcessorBase(ScriptProcessor): _history_server_port = "15050" _history_server_url_suffix = f"/proxy/{_history_server_port}" _spark_event_log_default_local_path = "/opt/ml/processing/spark-events/" + _spark_event_logs_output_name = "spark-event-logs" def __init__( self, @@ -308,9 +310,12 @@ def _extend_processing_args(self, inputs, outputs, **kwargs): ) output = ProcessingOutput( - source=_SparkProcessorBase._spark_event_log_default_local_path, - destination=spark_event_logs_s3_uri, - s3_upload_mode="Continuous", + output_name=self._spark_event_logs_output_name, + s3_output=ProcessingS3Output( + local_path=_SparkProcessorBase._spark_event_log_default_local_path, + s3_uri=spark_event_logs_s3_uri, + s3_upload_mode="Continuous", + ), ) extended_outputs.append(output) diff --git a/sagemaker-core/tests/unit/spark/test_processing.py b/sagemaker-core/tests/unit/spark/test_processing.py index dd9f692acf..3c22b2e051 100644 --- a/sagemaker-core/tests/unit/spark/test_processing.py +++ b/sagemaker-core/tests/unit/spark/test_processing.py @@ -14,6 +14,7 @@ from unittest.mock import Mock, patch import pytest +import tempfile from sagemaker.core.spark.processing import PySparkProcessor @@ -45,6 +46,24 @@ def _make_processor(mock_session): class TestPySparkProcessorV3ProcessingInputs: + def test_extend_processing_args_builds_v3_processing_output_for_spark_event_logs( + self, mock_session + ): + processor = _make_processor(mock_session) + + _, extended_outputs = processor._extend_processing_args( + inputs=None, + outputs=None, + spark_event_logs_s3_uri="s3://bucket/spark-logs", + ) + + assert len(extended_outputs) == 1 + output = extended_outputs[0] + assert output.output_name == "spark-event-logs" + assert output.s3_output.local_path == "/opt/ml/processing/spark-events/" + assert output.s3_output.s3_uri == "s3://bucket/spark-logs" + assert output.s3_output.s3_upload_mode == "Continuous" + @patch("sagemaker.core.spark.processing.S3Uploader.upload_string_as_file_body") def test_stage_configuration_builds_v3_processing_input(self, mock_upload, mock_session): processor = _make_processor(mock_session) @@ -59,6 +78,7 @@ def test_stage_configuration_builds_v3_processing_input(self, mock_upload, mock_ "s3://test-bucket/sagemaker/test-job/input/conf/configuration.json" ) assert config_input.s3_input.local_path == "/opt/ml/processing/input/conf" + assert config_input.s3_input.s3_data_type == "S3Prefix" @patch("sagemaker.core.spark.processing.S3Uploader.upload") def test_stage_submit_deps_builds_v3_processing_input_for_local_dependencies( diff --git a/sagemaker-core/tests/unit/test_model_monitoring.py b/sagemaker-core/tests/unit/test_model_monitoring.py index bc4725f950..30fb682eb7 100644 --- a/sagemaker-core/tests/unit/test_model_monitoring.py +++ b/sagemaker-core/tests/unit/test_model_monitoring.py @@ -306,6 +306,31 @@ def test_generate_baselining_job_name_with_custom_name(self, mock_session, test_ job_name = monitor._generate_baselining_job_name(job_name="custom-job") assert job_name == "custom-job" + def test_normalize_processing_output_string_builds_v3_processing_output( + self, mock_session, test_role + ): + with ( + patch( + "sagemaker.core.model_monitor.model_monitoring.resolve_value_from_config", + side_effect=lambda x, *args, **kwargs: x, + ), + patch( + "sagemaker.core.model_monitor.model_monitoring.resolve_class_attribute_from_config", + return_value=None, + ), + ): + monitor = ModelMonitor( + role=test_role, image_uri="test-image", sagemaker_session=mock_session + ) + monitor.latest_baselining_job_name = "baseline-job" + + output = monitor._normalize_processing_output("/opt/ml/processing/output") + + assert output.output_name == "monitoring_output" + assert output.s3_output.local_path == "/opt/ml/processing/output" + assert output.s3_output.s3_uri == "s3://test-bucket/test-prefix/baseline-job/output" + assert output.s3_output.s3_upload_mode == "EndOfJob" + def test_start_monitoring_schedule(self, mock_session, test_role): with ( patch(