2828import com .google .common .base .Splitter ;
2929import com .google .common .base .VerifyException ;
3030import com .google .common .collect .ImmutableList ;
31+ import com .google .common .collect .Streams ;
3132import com .google .genai .types .FileData ;
3233import com .google .genai .types .Part ;
3334import io .reactivex .rxjava3 .core .Completable ;
3435import io .reactivex .rxjava3 .core .Maybe ;
3536import io .reactivex .rxjava3 .core .Single ;
36- import java .util .ArrayList ;
3737import java .util .HashSet ;
3838import java .util .List ;
3939import java .util .Optional ;
@@ -135,22 +135,25 @@ public Maybe<Part> loadArtifact(
135135 .flatMapMaybe (
136136 versions -> versions .isEmpty () ? Maybe .empty () : Maybe .just (max (versions ))))
137137 .flatMap (
138- versionToLoad -> {
139- String blobName = getBlobName (appName , userId , sessionId , filename , versionToLoad );
140- BlobId blobId = BlobId .of (bucketName , blobName );
138+ versionToLoad ->
139+ Maybe .fromCallable (
140+ () -> {
141+ String blobName =
142+ getBlobName (appName , userId , sessionId , filename , versionToLoad );
143+ BlobId blobId = BlobId .of (bucketName , blobName );
141144
142- try {
143- Blob blob = storageClient .get (blobId );
144- if (blob == null || !blob .exists ()) {
145- return Maybe . empty () ;
146- }
147- byte [] data = blob .getContent ();
148- String mimeType = blob .getContentType ();
149- return Maybe . just ( Part .fromBytes (data , mimeType ) );
150- } catch (StorageException e ) {
151- return Maybe . empty () ;
152- }
153- } );
145+ try {
146+ Blob blob = storageClient .get (blobId );
147+ if (blob == null || !blob .exists ()) {
148+ return null ;
149+ }
150+ byte [] data = blob .getContent ();
151+ String mimeType = blob .getContentType ();
152+ return Part .fromBytes (data , mimeType );
153+ } catch (StorageException e ) {
154+ return null ;
155+ }
156+ }) );
154157 }
155158
156159 /**
@@ -164,34 +167,38 @@ public Maybe<Part> loadArtifact(
164167 @ Override
165168 public Single <ListArtifactsResponse > listArtifactKeys (
166169 String appName , String userId , String sessionId ) {
167- Set <String > filenames = new HashSet <>();
170+ return Single .fromCallable (
171+ () -> {
172+ Set <String > filenames = new HashSet <>();
168173
169- // List session-specific files
170- String sessionPrefix = String .format ("%s/%s/%s/" , appName , userId , sessionId );
171- try {
172- for (Blob blob :
173- storageClient .list (bucketName , BlobListOption .prefix (sessionPrefix )).iterateAll ()) {
174- List <String > parts = Splitter .on ('/' ).splitToList (blob .getName ());
175- filenames .add (parts .get (3 )); // appName/userId/sessionId/filename/version
176- }
177- } catch (StorageException e ) {
178- throw new VerifyException ("Failed to list session artifacts from GCS" , e );
179- }
174+ // List session-specific files
175+ String sessionPrefix = String .format ("%s/%s/%s/" , appName , userId , sessionId );
176+ try {
177+ for (Blob blob :
178+ storageClient .list (bucketName , BlobListOption .prefix (sessionPrefix )).iterateAll ()) {
179+ List <String > parts = Splitter .on ('/' ).splitToList (blob .getName ());
180+ filenames .add (parts .get (3 )); // appName/userId/sessionId/filename/version
181+ }
182+ } catch (StorageException e ) {
183+ throw new VerifyException ("Failed to list session artifacts from GCS" , e );
184+ }
180185
181- // List user-namespace files
182- String userPrefix = String .format ("%s/%s/user/" , appName , userId );
183- try {
184- for (Blob blob :
185- storageClient .list (bucketName , BlobListOption .prefix (userPrefix )).iterateAll ()) {
186- List <String > parts = Splitter .on ('/' ).splitToList (blob .getName ());
187- filenames .add (parts .get (3 )); // appName/userId/user/filename/version
188- }
189- } catch (StorageException e ) {
190- throw new VerifyException ("Failed to list user artifacts from GCS" , e );
191- }
186+ // List user-namespace files
187+ String userPrefix = String .format ("%s/%s/user/" , appName , userId );
188+ try {
189+ for (Blob blob :
190+ storageClient .list (bucketName , BlobListOption .prefix (userPrefix )).iterateAll ()) {
191+ List <String > parts = Splitter .on ('/' ).splitToList (blob .getName ());
192+ filenames .add (parts .get (3 )); // appName/userId/user/filename/version
193+ }
194+ } catch (StorageException e ) {
195+ throw new VerifyException ("Failed to list user artifacts from GCS" , e );
196+ }
192197
193- return Single .just (
194- ListArtifactsResponse .builder ().filenames (ImmutableList .sortedCopyOf (filenames )).build ());
198+ return ListArtifactsResponse .builder ()
199+ .filenames (ImmutableList .sortedCopyOf (filenames ))
200+ .build ();
201+ });
195202 }
196203
197204 /**
@@ -206,22 +213,30 @@ public Single<ListArtifactsResponse> listArtifactKeys(
206213 @ Override
207214 public Completable deleteArtifact (
208215 String appName , String userId , String sessionId , String filename ) {
209- ImmutableList <Integer > versions =
210- listVersions (appName , userId , sessionId , filename ).blockingGet ();
211- List <BlobId > blobIdsToDelete = new ArrayList <>();
212- for (int version : versions ) {
213- String blobName = getBlobName (appName , userId , sessionId , filename , version );
214- blobIdsToDelete .add (BlobId .of (bucketName , blobName ));
215- }
216+ return listVersions (appName , userId , sessionId , filename )
217+ .flatMapCompletable (
218+ versions -> {
219+ if (versions .isEmpty ()) {
220+ return Completable .complete ();
221+ }
222+ ImmutableList <BlobId > blobIdsToDelete =
223+ versions .stream ()
224+ .map (
225+ version ->
226+ BlobId .of (
227+ bucketName ,
228+ getBlobName (appName , userId , sessionId , filename , version )))
229+ .collect (ImmutableList .toImmutableList ());
216230
217- if (!blobIdsToDelete .isEmpty ()) {
218- try {
219- var unused = storageClient .delete (blobIdsToDelete );
220- } catch (StorageException e ) {
221- throw new VerifyException ("Failed to delete artifact versions from GCS" , e );
222- }
223- }
224- return Completable .complete ();
231+ return Completable .fromAction (
232+ () -> {
233+ try {
234+ var unused = storageClient .delete (blobIdsToDelete );
235+ } catch (StorageException e ) {
236+ throw new VerifyException ("Failed to delete artifact versions from GCS" , e );
237+ }
238+ });
239+ });
225240 }
226241
227242 /**
@@ -236,20 +251,29 @@ public Completable deleteArtifact(
236251 @ Override
237252 public Single <ImmutableList <Integer >> listVersions (
238253 String appName , String userId , String sessionId , String filename ) {
239- String prefix = getBlobPrefix (appName , userId , sessionId , filename );
240- List <Integer > versions = new ArrayList <>();
241- try {
242- for (Blob blob : storageClient .list (bucketName , BlobListOption .prefix (prefix )).iterateAll ()) {
243- String name = blob .getName ();
244- int versionDelimiterIndex = name .lastIndexOf ('/' ); // immediately before the version number
245- if (versionDelimiterIndex != -1 && versionDelimiterIndex < name .length () - 1 ) {
246- versions .add (Integer .parseInt (name .substring (versionDelimiterIndex + 1 )));
247- }
248- }
249- return Single .just (ImmutableList .sortedCopyOf (versions ));
250- } catch (StorageException e ) {
251- return Single .just (ImmutableList .of ());
252- }
254+ return Single .fromCallable (
255+ () -> {
256+ String prefix = getBlobPrefix (appName , userId , sessionId , filename );
257+ try {
258+ return Streams .stream (
259+ storageClient .list (bucketName , BlobListOption .prefix (prefix )).iterateAll ())
260+ .map (Blob ::getName )
261+ .map (
262+ name -> {
263+ int versionDelimiterIndex = name .lastIndexOf ('/' );
264+ return versionDelimiterIndex != -1
265+ && versionDelimiterIndex < name .length () - 1
266+ ? Optional .of (name .substring (versionDelimiterIndex + 1 ))
267+ : Optional .<String >empty ();
268+ })
269+ .flatMap (Optional ::stream )
270+ .map (Integer ::parseInt )
271+ .sorted ()
272+ .collect (ImmutableList .toImmutableList ());
273+ } catch (StorageException e ) {
274+ return ImmutableList .of ();
275+ }
276+ });
253277 }
254278
255279 @ Override
@@ -291,35 +315,39 @@ private Single<SaveResult> saveArtifactAndReturnBlob(
291315 String appName , String userId , String sessionId , String filename , Part artifact ) {
292316 return listVersions (appName , userId , sessionId , filename )
293317 .map (versions -> versions .isEmpty () ? 0 : max (versions ) + 1 )
294- .map (
295- nextVersion -> {
296- if (artifact .inlineData ().isEmpty ()) {
297- throw new IllegalArgumentException ("Saveable artifact must have inline data." );
298- }
318+ .flatMap (
319+ nextVersion ->
320+ Single .fromCallable (
321+ () -> {
322+ if (artifact .inlineData ().isEmpty ()) {
323+ throw new IllegalArgumentException (
324+ "Saveable artifact must have inline data." );
325+ }
299326
300- String blobName = getBlobName (appName , userId , sessionId , filename , nextVersion );
301- BlobId blobId = BlobId .of (bucketName , blobName );
327+ String blobName =
328+ getBlobName (appName , userId , sessionId , filename , nextVersion );
329+ BlobId blobId = BlobId .of (bucketName , blobName );
302330
303- BlobInfo blobInfo =
304- BlobInfo .newBuilder (blobId )
305- .setContentType (artifact .inlineData ().get ().mimeType ().orElse (null ))
306- .build ();
331+ BlobInfo blobInfo =
332+ BlobInfo .newBuilder (blobId )
333+ .setContentType (artifact .inlineData ().get ().mimeType ().orElse (null ))
334+ .build ();
307335
308- try {
309- byte [] dataToSave =
310- artifact
311- .inlineData ()
312- .get ()
313- .data ()
314- .orElseThrow (
315- () ->
316- new IllegalArgumentException (
317- "Saveable artifact data must be non-empty." ));
318- Blob blob = storageClient .create (blobInfo , dataToSave );
319- return SaveResult .create (blob , nextVersion );
320- } catch (StorageException e ) {
321- throw new VerifyException ("Failed to save artifact to GCS" , e );
322- }
323- } );
336+ try {
337+ byte [] dataToSave =
338+ artifact
339+ .inlineData ()
340+ .get ()
341+ .data ()
342+ .orElseThrow (
343+ () ->
344+ new IllegalArgumentException (
345+ "Saveable artifact data must be non-empty." ));
346+ Blob blob = storageClient .create (blobInfo , dataToSave );
347+ return SaveResult .create (blob , nextVersion );
348+ } catch (StorageException e ) {
349+ throw new VerifyException ("Failed to save artifact to GCS" , e );
350+ }
351+ }) );
324352 }
325353}
0 commit comments