@@ -1,11 +1,11 @@
 package org.openremote.manager.datapoint;
 
+import org.hibernate.Session;
 import org.openremote.agent.protocol.ProtocolDatapointService;
 import org.openremote.container.timer.TimerService;
 import org.openremote.manager.asset.OutdatedAttributeEvent;
 import org.openremote.model.datapoint.DatapointExportFormat;
 import org.openremote.model.datapoint.DatapointQueryTooLargeException;
-import org.openremote.model.util.UniqueIdentifierGenerator;
 import org.openremote.manager.asset.AssetProcessingException;
 import org.openremote.manager.asset.AssetStorageService;
 import org.openremote.manager.event.ClientEventService;
@@ -24,16 +24,17 @@
 import org.openremote.model.util.Pair;
 import org.openremote.model.value.MetaHolder;
 import org.openremote.model.value.MetaItemType;
+import org.postgresql.PGConnection;
+import org.postgresql.copy.CopyManager;
 
-import java.io.File;
+import java.io.*;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.sql.Date;
 import java.time.*;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -253,25 +254,25 @@ protected String buildWhereClause(List<Pair<String, Attribute<?>>> attributes, b
      * container so it can be accessed by this process.
      * Backwards-compatible overload with default format.
      */
-    public ScheduledFuture<File> exportDatapoints(AttributeRef[] attributeRefs,
+    public PipedInputStream exportDatapoints(AttributeRef[] attributeRefs,
                                     long fromTimestamp,
-                                    long toTimestamp) {
+                                    long toTimestamp) throws IOException {
         return exportDatapoints(attributeRefs, fromTimestamp, toTimestamp, DatapointExportFormat.CSV);
     }
 
     /**
-     * Exports datapoints as CSV using SQL; the export path used in the SQL query must also be mapped into the manager
-     * container so it can be accessed by this process.
+     * Exports datapoints as CSV using SQL; the result is streamed back over the JDBC connection
+     * (COPY ... TO STDOUT), so no export directory shared with the postgresql container is needed.
      */
-    public ScheduledFuture<File> exportDatapoints(AttributeRef[] attributeRefs,
+    public PipedInputStream exportDatapoints(AttributeRef[] attributeRefs,
                                     long fromTimestamp,
                                     long toTimestamp,
-                                    DatapointExportFormat format) {
+                                    DatapointExportFormat format) throws IOException {
         try {
             String query = getSelectExportQuery(attributeRefs, fromTimestamp, toTimestamp);
 
             // Verify the query is 'legal' and can be executed
-            if (canQueryDatapoints(query, null, datapointExportLimit)) {
+            if (canQueryDatapoints(query, null, datapointExportLimit)) {
                 return doExportDatapoints(attributeRefs, fromTimestamp, toTimestamp, format);
             }
             throw new RuntimeException("Could not export datapoints.");
@@ -283,76 +284,89 @@ public ScheduledFuture<File> exportDatapoints(AttributeRef[] attributeRefs,
         }
     }
 
-    protected ScheduledFuture<File> doExportDatapoints(AttributeRef[] attributeRefs,
+    protected PipedInputStream doExportDatapoints(AttributeRef[] attributeRefs,
                                      long fromTimestamp,
                                      long toTimestamp,
-                                     DatapointExportFormat format) {
-
-        return scheduledExecutorService.schedule(() -> {
-            String fileName = UniqueIdentifierGenerator.generateId() + ".csv";
-            if (format == DatapointExportFormat.CSV_CROSSTAB) {
-                String attributeFilter = getAttributeFilter(attributeRefs);
-                StringBuilder sb = new StringBuilder(String.format(
-                    "copy (select * from crosstab( " +
-                        "'select ad.timestamp, a.name || '' \\: '' || ad.attribute_name as header, ad.value " +
-                        "from asset_datapoint ad " +
-                        "join asset a on ad.entity_id = a.id " +
-                        "where ad.timestamp >= to_timestamp(%d) and ad.timestamp <= to_timestamp(%d) and (%s) " +
-                        "order by ad.timestamp, header', " +
-                        "'select distinct a.name || '' \\: '' || ad.attribute_name as header " +
-                        "from asset_datapoint ad " +
-                        "join asset a on ad.entity_id = a.id " +
-                        "where %s " +
-                        "order by header') " +
-                        "as ct(timestamp timestamp, %s) " +
-                        ") to '/storage/" + EXPORT_STORAGE_DIR_NAME + "/" + fileName + "' delimiter ',' CSV HEADER;",
-                    fromTimestamp / 1000, toTimestamp / 1000, attributeFilter, attributeFilter, getAttributeColumns(attributeRefs)
-                ));
-                persistenceService.doTransaction(em -> em.createNativeQuery(sb.toString()).executeUpdate());
-            } else if (format == DatapointExportFormat.CSV_CROSSTAB_MINUTE) {
-                String attributeFilter = getAttributeFilter(attributeRefs);
-                StringBuilder sb = new StringBuilder(String.format(
-                    "copy (select * from crosstab( " +
-                        "'select public.time_bucket(''%s'', ad.timestamp) as bucket_timestamp, " +
-                        "a.name || '' \\: '' || ad.attribute_name as header, " +
-                        "CASE " +
-                        "  WHEN jsonb_typeof((array_agg(ad.value))[1]) = ''number'' THEN " +
-                        "    round(avg((ad.value#>>''{}'')::numeric) FILTER (WHERE jsonb_typeof(ad.value) = ''number''), 3)::text " +
-                        "  ELSE (array_agg(ad.value ORDER BY ad.timestamp DESC) FILTER (WHERE jsonb_typeof(ad.value) != ''number''))[1]#>>''{}''" +
-                        "END as value " +
-                        "from asset_datapoint ad " +
-                        "join asset a on ad.entity_id = a.id " +
-                        "where ad.timestamp >= to_timestamp(%d) and ad.timestamp <= to_timestamp(%d) and (%s) " +
-                        "group by bucket_timestamp, header " +
-                        "order by bucket_timestamp, header', " +
-                        "'select distinct a.name || '' \\: '' || ad.attribute_name as header " +
-                        "from asset_datapoint ad " +
-                        "join asset a on ad.entity_id = a.id " +
-                        "where %s " +
-                        "order by header') " +
-                        "as ct(timestamp timestamp, %s) " +
-                        ") to '/storage/" + EXPORT_STORAGE_DIR_NAME + "/" + fileName + "' delimiter ',' CSV HEADER;",
-                    "1 minute", fromTimestamp / 1000, toTimestamp / 1000, attributeFilter, attributeFilter, getAttributeColumns(attributeRefs)
-                ));
-
-                persistenceService.doTransaction(em -> em.createNativeQuery(sb.toString()).executeUpdate());
-            }
-            else {
-                StringBuilder sb = new StringBuilder("copy (")
-                    .append(getSelectExportQuery(attributeRefs, fromTimestamp, toTimestamp))
-                    .append(") to '/storage/")
-                    .append(EXPORT_STORAGE_DIR_NAME)
-                    .append("/")
-                    .append(fileName)
-                    .append("' delimiter ',' CSV HEADER;");
-                persistenceService.doTransaction(em -> em.createNativeQuery(sb.toString()).executeUpdate());
-            }
-
+                                     DatapointExportFormat format) throws IOException {
+        // Increase the pipe buffer size (the PipedInputStream default is only 1 KB)
+        PipedInputStream in = new PipedInputStream(1024 * 1024 * 4); // 4 MB
+        PipedOutputStream out = new PipedOutputStream(in);
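+        // The caller reads from 'in' while a DB worker thread writes CSV into 'out' (scheduled below)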
+
+        StringBuilder sb;
+
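+        // COPY ... TO STDOUT sends the CSV over the JDBC connection instead of writing a file on the database server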
+        final String TO_STDOUT_WITH_CSV_FORMAT = ") TO STDOUT WITH (FORMAT CSV, HEADER, DELIMITER ',');";
+
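+        // crosstab() comes from PostgreSQL's tablefunc extension; it pivots datapoint rows into one column per attribute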
+        if (format == DatapointExportFormat.CSV_CROSSTAB) {
+            String attributeFilter = getAttributeFilter(attributeRefs);
+            sb = new StringBuilder(String.format(
+                "copy (select * from crosstab( " +
+                    "'select ad.timestamp, a.name || '' \\: '' || ad.attribute_name as header, ad.value " +
+                    "from asset_datapoint ad " +
+                    "join asset a on ad.entity_id = a.id " +
+                    "where ad.timestamp >= to_timestamp(%d) and ad.timestamp <= to_timestamp(%d) and (%s) " +
+                    "order by ad.timestamp, header', " +
+                    "'select distinct a.name || '' \\: '' || ad.attribute_name as header " +
+                    "from asset_datapoint ad " +
+                    "join asset a on ad.entity_id = a.id " +
+                    "where %s " +
+                    "order by header') " +
+                    "as ct(timestamp timestamp, %s) " +
+                    TO_STDOUT_WITH_CSV_FORMAT,
+                fromTimestamp / 1000, toTimestamp / 1000, attributeFilter, attributeFilter, getAttributeColumns(attributeRefs)
+            ));
+        } else if (format == DatapointExportFormat.CSV_CROSSTAB_MINUTE) {
+            String attributeFilter = getAttributeFilter(attributeRefs);
+            sb = new StringBuilder(String.format(
+                "copy (select * from crosstab( " +
+                    "'select public.time_bucket(''%s'', ad.timestamp) as bucket_timestamp, " +
+                    "a.name || '' \\: '' || ad.attribute_name as header, " +
+                    "CASE " +
+                    "  WHEN jsonb_typeof((array_agg(ad.value))[1]) = ''number'' THEN " +
+                    "    round(avg((ad.value#>>''{}'')::numeric) FILTER (WHERE jsonb_typeof(ad.value) = ''number''), 3)::text " +
+                    "  ELSE (array_agg(ad.value ORDER BY ad.timestamp DESC) FILTER (WHERE jsonb_typeof(ad.value) != ''number''))[1]#>>''{}''" +
+                    "END as value " +
+                    "from asset_datapoint ad " +
+                    "join asset a on ad.entity_id = a.id " +
+                    "where ad.timestamp >= to_timestamp(%d) and ad.timestamp <= to_timestamp(%d) and (%s) " +
+                    "group by bucket_timestamp, header " +
+                    "order by bucket_timestamp, header', " +
+                    "'select distinct a.name || '' \\: '' || ad.attribute_name as header " +
+                    "from asset_datapoint ad " +
+                    "join asset a on ad.entity_id = a.id " +
+                    "where %s " +
+                    "order by header') " +
+                    "as ct(timestamp timestamp, %s) " +
+                    TO_STDOUT_WITH_CSV_FORMAT,
+                "1 minute", fromTimestamp / 1000, toTimestamp / 1000, attributeFilter, attributeFilter, getAttributeColumns(attributeRefs)
+            ));
+        } else {
+            sb = new StringBuilder("copy (")
+                .append(getSelectExportQuery(attributeRefs, fromTimestamp, toTimestamp))
+                .append(TO_STDOUT_WITH_CSV_FORMAT);
+        }
 
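+        // Run the blocking COPY on the executor so this method can return the read end of the pipe immediately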
+        scheduledExecutorService.schedule(() -> persistenceService.doTransaction(em -> {
+            Session session = em.unwrap(Session.class);
+            session.doWork(connection -> {
+                PGConnection pgConnection = connection.unwrap(PGConnection.class);
+                CopyManager copyManager = pgConnection.getCopyAPI();
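+                // CopyManager executes the COPY and writes the resulting CSV bytes straight into the pipe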
+                try {
+                    copyManager.copyOut(sb.toString(), out);
+                    out.flush();
+                    out.close();
+                } catch (IOException e) {
+                    // Either database connection or output stream failure
+                    getLogger().log(Level.WARNING, "Datapoint export failed", e);
+                    try {
+                        in.close();
+                    } catch (IOException closeException) {
+                        getLogger().log(Level.SEVERE, "Failed to close piped input stream", closeException);
+                    }
+                }
+            });
+        }), 0, TimeUnit.MILLISECONDS);
 
-            // The same path must resolve in both the postgresql container and the manager container
-            return exportPath.resolve(fileName).toFile();
-        }, 0, TimeUnit.MILLISECONDS);
+        return in;
     }
 
     /**
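
For reference, a minimal sketch of how a caller could consume the returned stream, e.g. from a JAX-RS resource. The resource class, method parameters, injection point, and jakarta namespace below are illustrative assumptions, not part of this diff; only exportDatapoints() comes from the code above:

    import jakarta.ws.rs.GET;
    import jakarta.ws.rs.Produces;
    import jakarta.ws.rs.core.Response;
    import jakarta.ws.rs.core.StreamingOutput;

    import java.io.IOException;
    import java.io.InputStream;

    public class DatapointExportResource {

        protected AssetDatapointService assetDatapointService; // hypothetical injection point

        @GET
        @Produces("text/csv")
        public Response export(AttributeRef[] refs, long from, long to) throws IOException {
            // Kicks off the COPY on a worker thread and immediately returns the read end of the pipe
            InputStream csv = assetDatapointService.exportDatapoints(refs, from, to);

            // Stream bytes to the HTTP response as the database produces them
            StreamingOutput body = output -> {
                try (InputStream in = csv) {
                    in.transferTo(output);
                }
            };
            return Response.ok(body)
                .header("Content-Disposition", "attachment; filename=\"export.csv\"")
                .build();
        }
    }

Compared with the old ScheduledFuture<File> approach, nothing accumulates beyond the 4 MB pipe buffer, and no /storage export volume has to be mounted into both the postgresql and manager containers.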