1- using Microsoft . Extensions . Logging ;
1+ using Google . Apis . Auth . OAuth2 ;
2+ using Google . Cloud . PubSub . V1 ;
3+ using Google . Cloud . Storage . V1 ;
4+ using Grpc . Auth ;
5+ using Microsoft . Extensions . Logging ;
26using System ;
37using System . Collections . Generic ;
8+ using System . IO ;
49using System . Linq ;
10+ using System . Text . Json ;
11+ using System . Threading ;
512using System . Threading . Tasks ;
613using TCC . Lib ;
714using TCC . Lib . Benchmark ;
815using TCC . Lib . Database ;
916using TCC . Lib . Helpers ;
1017using TCC . Lib . Notification ;
1118using TCC . Lib . Options ;
19+ using TCC . Parser ;
1220
1321namespace TCC ;
1422
@@ -17,6 +25,7 @@ public interface ITccController
1725 Task CompressAsync ( CompressOption option ) ;
1826 Task DecompressAsync ( DecompressOption option ) ;
1927 Task BenchmarkAsync ( BenchmarkOption option ) ;
28+ Task AutoDecompressAsync ( AutoDecompressOptionBinding option ) ;
2029}
2130
2231public class TccController : ITccController
@@ -61,6 +70,70 @@ public async Task BenchmarkAsync(BenchmarkOption option)
6170 await LogResultAsync ( operationResult , Mode . Benchmark , null ) ;
6271 }
6372
73+ public async Task AutoDecompressAsync ( AutoDecompressOptionBinding option )
74+ {
75+ var gcpCredential = await GoogleAuthHelper . GetGoogleClientAsync ( option . GoogleStorageCredential , new CancellationToken ( ) ) ;
76+ var subscriber = await GetGoogleClientAsync ( gcpCredential , option ) ;
77+ var storage = await StorageClient . CreateAsync ( gcpCredential ) ;
78+
79+ await _databaseSetup . EnsureDatabaseExistsAsync ( Mode . Decompress ) ;
80+
81+ // Use the client as you'd normally do, to listen for messages in this example.
82+ await subscriber . StartAsync ( async ( msg , cancellationToken ) =>
83+ {
84+ if ( ! msg . Attributes . Any ( kvp => kvp . Key == "eventType" && kvp . Value == "OBJECT_FINALIZE" ) )
85+ {
86+ _logger . LogDebug ( "EventType not found : {attributes}" , msg . Attributes ) ;
87+ return SubscriberClient . Reply . Ack ;
88+ }
89+ var msgData = JsonSerializer . Deserialize < ObjectStorageEvent > ( msg . Data . ToStringUtf8 ( ) , new JsonSerializerOptions { PropertyNameCaseInsensitive = true } ) ;
90+ if ( string . IsNullOrEmpty ( msgData . Name ) )
91+ {
92+ _logger . LogDebug ( "Object name not found" ) ;
93+ return SubscriberClient . Reply . Ack ;
94+ }
95+ if ( string . IsNullOrEmpty ( msgData . Bucket ) )
96+ {
97+ _logger . LogDebug ( "Bucket name not found" ) ;
98+ return SubscriberClient . Reply . Ack ;
99+ }
100+
101+ var fileName = Path . GetFileName ( msgData . Name ) ;
102+ string tempPath ;
103+ if ( string . IsNullOrEmpty ( option . TemporaryDirectory ) )
104+ {
105+ tempPath = Path . Join ( option . TemporaryDirectory , fileName ) ;
106+ }
107+ else
108+ {
109+ tempPath = Path . GetRandomFileName ( ) ;
110+ }
111+ using ( var outputFile = File . OpenWrite ( tempPath ) )
112+ {
113+ await storage . DownloadObjectAsync ( msgData . Bucket , msgData . Name , outputFile ) ;
114+ }
115+ _logger . LogDebug ( "{fileName} downloaded in {path}" , fileName , tempPath ) ;
116+
117+ option . SourceDirOrFile = tempPath ;
118+ var operationResult = await _tarCompressCrypt . DecompressAsync ( option ) ;
119+ await _databaseSetup . CleanupDatabaseAsync ( Mode . Decompress ) ;
120+ await LogResultAsync ( operationResult , Mode . Decompress , option ) ;
121+ File . Delete ( tempPath ) ;
122+
123+ return SubscriberClient . Reply . Ack ;
124+ } ) ;
125+ }
126+ private record ObjectStorageEvent ( string Bucket , string Name ) ;
127+ private static async Task < SubscriberClient > GetGoogleClientAsync ( GoogleCredential credential , AutoDecompressOptionBinding option )
128+ {
129+ var subscriptionName = new SubscriptionName ( option . GoogleProjectId , option . GoogleSubscriptionId ) ;
130+ // Create a google cloud pub/sub client that reads messages one by one
131+ return await SubscriberClient . CreateAsync (
132+ subscriptionName ,
133+ new SubscriberClient . ClientCreationSettings ( clientCount : 1 , credentials : credential . ToChannelCredentials ( ) ) ,
134+ new SubscriberClient . Settings { FlowControlSettings = new Google . Api . Gax . FlowControlSettings ( 1 , null ) }
135+ ) ;
136+ }
64137
65138 private async Task LogResultAsync ( OperationSummary operationResult , Mode mode , ISlackOption slackOption )
66139 {
@@ -142,5 +215,4 @@ private void WriteAuditFile(Mode mode, OperationSummary op)
142215 }
143216 }
144217 }
145-
146218}
0 commit comments