from kfp.dsl import Input, Model, component, Artifact, Output
from ml_pipelines_kfp.iris_xgboost.constants import IMAGE_NAME, FASTAPI_IMAGE_NAME, BUCKET, PROJECT_ID


@component(
    base_image=IMAGE_NAME,
    packages_to_install=[
        "google-cloud-aiplatform>=1.59.0",
        "google-cloud-run>=0.10.0",
        "google-cloud-storage>=2.10.0",
        "requests>=2.31.0",
        "joblib>=1.4.2"
    ]
)
def deploy_blessed_model_to_fastapi(
    project_id: str,
    location: str,
    model_name: str,
    service_name: str,
    service_endpoint: Output[Artifact]
):
    """Deploy the 'blessed' version of a Vertex AI model as a FastAPI Cloud Run service.

    Steps:
      1. Find the model version carrying the ``blessed`` alias in the Vertex AI
         Model Registry.
      2. Download its ``model.joblib`` artifact from GCS and validate that it loads.
      3. Re-upload the artifact to a stable per-service GCS path.
      4. Create or update a Cloud Run service running the pre-built generic FastAPI
         serving image (``FASTAPI_IMAGE_NAME``), pointed at that GCS path via env vars.
      5. Smoke-test the service (best effort) and record its URL on the output artifact.

    Args:
        project_id: GCP project to deploy into.
        location: GCP region used for both Vertex AI and Cloud Run.
        model_name: Display name of the model in the Vertex AI Model Registry.
        service_name: Cloud Run service id to create or update.
        service_endpoint: Output artifact; ``uri`` receives the service URL and
            ``metadata`` describes the deployment.

    Raises:
        ValueError: if no blessed version exists, the model artifact is missing,
            or the downloaded model cannot be loaded.
    """
    # KFP components must import inside the function body.
    from google.api_core.exceptions import NotFound
    from google.cloud import aiplatform, aiplatform_v1, run_v2, storage
    import joblib
    import os
    import requests
    import tempfile
    import time

    print(f"Starting FastAPI deployment for blessed model: {model_name}")
    print(f"Service name: {service_name}")

    # 1. Initialize Vertex AI and find the blessed model version.
    aiplatform.init(project=project_id, location=location)

    client = aiplatform_v1.ModelServiceClient(
        client_options={"api_endpoint": f"{location}-aiplatform.googleapis.com"}
    )

    # NOTE(review): the opening lines of this request dict were unseen diff
    # context; parent reconstructed as the standard ListModels parent resource
    # — confirm against the original file.
    request = {
        "parent": f"projects/{project_id}/locations/{location}",
        "filter": f"display_name={model_name}"
    }

    models = list(client.list_models(request=request))
    print(f"Found {len(models)} models with name {model_name}")

    blessed_model = None
    for candidate in models:
        print(f"Model: {candidate.name}, Aliases: {list(candidate.version_aliases)}")
        if "blessed" in candidate.version_aliases:
            blessed_model = candidate
            break

    if not blessed_model:
        raise ValueError(
            f"No blessed version found for model {model_name}. "
            f"Available models: {[(m.name, list(m.version_aliases)) for m in models]}"
        )

    print(f"Found blessed model: {blessed_model.name}")
    print(f"Model URI: {blessed_model.artifact_uri}")

    # 2. Locate the joblib artifact of the blessed version in GCS.
    # rstrip('/') guards against a trailing slash in artifact_uri, which would
    # otherwise produce a '//model.joblib' blob path that never exists.
    gcs_uri = blessed_model.artifact_uri.rstrip('/')
    if not gcs_uri.startswith('gs://'):
        raise ValueError(f"Expected GCS URI, got: {gcs_uri}")

    bucket_name, _, model_path = gcs_uri[len('gs://'):].partition('/')

    print(f"Downloading model from gs://{bucket_name}/{model_path}")

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)

    model_blob_path = f"{model_path}/model.joblib"
    blob = bucket.blob(model_blob_path)

    if not blob.exists():
        raise ValueError(f"Model file not found at gs://{bucket_name}/{model_blob_path}")

    # The temp file must outlive the `with` (downloaded now, uploaded later),
    # hence delete=False. The try/finally below guarantees cleanup even when
    # the download itself fails (previously only the deploy step was covered,
    # leaking the file on a failed download).
    with tempfile.NamedTemporaryFile(suffix='.joblib', delete=False) as temp_file:
        local_model_path = temp_file.name

    try:
        blob.download_to_filename(local_model_path)
        print(f"Downloaded model to: {local_model_path}")

        # 3. Validate the model can be loaded before shipping it.
        try:
            model_obj = joblib.load(local_model_path)
            print(f"Model type: {type(model_obj)}")
            print(f"Model validation successful")
        except Exception as e:
            raise ValueError(f"Model validation failed: {e}") from e

        # 4. Copy the model to the standard per-service deployment location.
        deployment_model_path = f"deployed-models/{service_name}/model.joblib"
        deployment_blob = bucket.blob(deployment_model_path)

        print(f"Copying model to deployment location: gs://{bucket_name}/{deployment_model_path}")
        deployment_blob.upload_from_filename(local_model_path)

        model_gcs_path = f"gs://{bucket_name}/{deployment_model_path}"
        print(f"Model available at: {model_gcs_path}")

        # 5. Deploy to Cloud Run using the pre-built generic FastAPI image.
        print(f"Deploying to Cloud Run service: {service_name}")

        run_client = run_v2.ServicesClient()

        # Pre-built generic FastAPI image from CI/CD; it discovers the model
        # at startup via the env vars set below.
        generic_image = FASTAPI_IMAGE_NAME

        service_config = {
            "parent": f"projects/{project_id}/locations/{location}",
            "service_id": service_name,
            "service": {
                "template": {
                    "containers": [{
                        "image": generic_image,
                        "ports": [{"container_port": 8080}],
                        "resources": {
                            "limits": {
                                "memory": "2Gi",
                                "cpu": "2"
                            }
                        },
                        "env": [
                            {"name": "PORT", "value": "8080"},
                            {"name": "MODEL_GCS_PATH", "value": model_gcs_path},
                            {"name": "MODEL_NAME", "value": model_name},
                            {"name": "GOOGLE_CLOUD_PROJECT", "value": project_id}
                        ]
                    }],
                    "scaling": {
                        "min_instance_count": 0,
                        "max_instance_count": 10
                    },
                    "service_account": f"kfp-mlops@{project_id}.iam.gserviceaccount.com"
                },
                "traffic": [{"percent": 100, "type": "TRAFFIC_TARGET_ALLOCATION_TYPE_LATEST"}]
            }
        }

        try:
            # Update when the service exists; only NotFound means "create new".
            # (Catching every exception here, as before, would turn e.g. a
            # permission error into a confusing create attempt.)
            try:
                existing_service = run_client.get_service(
                    name=f"projects/{project_id}/locations/{location}/services/{service_name}"
                )
                print(f"Service {service_name} already exists, updating...")

                update_service = service_config["service"]
                update_service["name"] = existing_service.name

                operation = run_client.update_service(service=update_service)
                result = operation.result(timeout=600)

            except NotFound as get_error:
                print(f"Service doesn't exist, creating new one: {get_error}")
                operation = run_client.create_service(request=service_config)
                result = operation.result(timeout=600)

            service_url = result.uri
            print(f"Service deployed successfully to: {service_url}")

            # 6. Smoke-test the deployment. Best effort: failures are logged,
            # not raised, so a slow cold start does not fail the pipeline.
            print("Testing deployment...")
            time.sleep(30)  # crude wait for the service to become ready

            test_payload = {
                "instances": [
                    {"sepal_length": 5.1, "sepal_width": 3.5, "petal_length": 1.4, "petal_width": 0.2}
                ]
            }

            try:
                # Health endpoint first, then a real prediction.
                health_response = requests.get(f"{service_url}/health", timeout=30)
                print(f"Health check status: {health_response.status_code}")
                if health_response.status_code == 200:
                    print(f"Health check response: {health_response.json()}")

                response = requests.post(
                    f"{service_url}/predict",
                    json=test_payload,
                    timeout=30
                )
                if response.status_code == 200:
                    print("Deployment test successful!")
                    print(f"Prediction: {response.json()}")
                else:
                    print(f"Prediction test failed: {response.status_code} - {response.text}")

            except Exception as test_e:
                print(f"Test request failed: {test_e}")

            # 7. Record the endpoint on the output artifact.
            service_endpoint.uri = service_url
            service_endpoint.metadata = {
                "service_name": service_name,
                "model_version": blessed_model.version_id,
                "model_name": model_name,
                "deployment_type": "cloud_run_fastapi",
                "model_gcs_path": model_gcs_path,
                "image": generic_image
            }

            print(f"Deployment completed successfully!")
            print(f"Service URL: {service_url}")
            print(f"Health check: {service_url}/health")
            print(f"Prediction endpoint: {service_url}/predict")
            print(f"Vertex AI compatible endpoint: {service_url}/v1/models/model:predict")

        except Exception as deploy_e:
            print(f"Cloud Run deployment failed: {deploy_e}")
            raise

    finally:
        # 8. Clean up the temporary model file regardless of outcome.
        try:
            os.unlink(local_model_path)
            print("Temporary model file cleaned up")
        except Exception as cleanup_e:
            print(f"Cleanup warning: {cleanup_e}")
0 commit comments