diff --git a/manager/src/main/java/org/openremote/manager/datapoint/AssetDatapointResourceImpl.java b/manager/src/main/java/org/openremote/manager/datapoint/AssetDatapointResourceImpl.java
index a8f619851c..5ae592a65a 100644
--- a/manager/src/main/java/org/openremote/manager/datapoint/AssetDatapointResourceImpl.java
+++ b/manager/src/main/java/org/openremote/manager/datapoint/AssetDatapointResourceImpl.java
@@ -46,11 +46,10 @@
 import org.openremote.model.http.RequestParams;
 import org.openremote.model.security.ClientRole;
 import org.openremote.model.syslog.SyslogCategory;
+import org.openremote.model.util.UniqueIdentifierGenerator;
 import org.openremote.model.value.MetaItemType;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.util.concurrent.ScheduledFuture;
+import java.io.*;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 import java.util.zip.ZipEntry;
@@ -206,47 +205,43 @@ public void getDatapointExport(AsyncResponse asyncResponse, String attributeRefs
             DATA_EXPORT_LOG.info("User '" + getUsername() + "' started data export for " + attributeRefsString + " from " + fromTimestamp + " to " + toTimestamp + " in format " + format);
 
-            ScheduledFuture<File> exportFuture = assetDatapointService.exportDatapoints(attributeRefs, fromTimestamp, toTimestamp, format);
+            PipedInputStream pipedInputStream = assetDatapointService.exportDatapoints(attributeRefs, fromTimestamp, toTimestamp, format);
 
-            asyncResponse.register((ConnectionCallback) disconnected -> exportFuture.cancel(true));
-
-            File exportFile = null;
-
-            try {
-                exportFile = exportFuture.get();
-
-                try (FileInputStream fin = new FileInputStream(exportFile);
-                     ZipOutputStream zipOut = new ZipOutputStream(response.getOutputStream())) {
-
-                    ZipEntry zipEntry = new ZipEntry(exportFile.getName());
-                    zipOut.putNextEntry(zipEntry);
-                    IOUtils.copy(fin, zipOut);
-                    zipOut.closeEntry();
+            asyncResponse.register((ConnectionCallback) disconnected -> {
+                try {
+                    pipedInputStream.close();
+                } catch (IOException e) {
+                    DATA_EXPORT_LOG.log(Level.SEVERE, "Could not close input stream: ", e);
+                    throw new RuntimeException(e);
                 }
+            });
 
-            response.setContentType("application/zip");
-            response.setHeader(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"dataexport.zip\"");
+            try (InputStream fin = pipedInputStream;
+                 ZipOutputStream zipOut = new ZipOutputStream(response.getOutputStream())) {
 
-                asyncResponse.resume(
-                        response
-                );
-            } catch (Exception ex) {
-                exportFuture.cancel(true);
+                String fileName = UniqueIdentifierGenerator.generateId() + ".csv";
+                ZipEntry zipEntry = new ZipEntry(fileName);
+                zipOut.putNextEntry(zipEntry);
+                IOUtils.copy(fin, zipOut);
+                zipOut.closeEntry();
+            } catch (IOException ex) {
                 asyncResponse.resume(new WebApplicationException(Response.Status.INTERNAL_SERVER_ERROR));
-                DATA_EXPORT_LOG.log(Level.SEVERE, "Exception in ScheduledFuture: ", ex);
-            } finally {
-                if (exportFile != null && exportFile.exists()) {
-                    try {
-                        exportFile.delete();
-                    } catch (Exception e) {
-                        DATA_EXPORT_LOG.log(Level.SEVERE, "Failed to delete temporary export file: " + exportFile.getPath(), e);
-                    }
-                }
+                DATA_EXPORT_LOG.log(Level.SEVERE, "Zip exception: ", ex);
             }
+
+            response.setContentType("application/zip");
+            response.setHeader(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"dataexport.zip\"");
+
+            asyncResponse.resume(
+                    response
+            );
         } catch (JsonProcessingException ex) {
             asyncResponse.resume(new BadRequestException(ex));
         } catch (DatapointQueryTooLargeException dqex) {
             asyncResponse.resume(new WebApplicationException(dqex, Response.Status.REQUEST_ENTITY_TOO_LARGE));
+        } catch (IOException ex) {
+            asyncResponse.resume(new WebApplicationException(Response.Status.INTERNAL_SERVER_ERROR));
+            DATA_EXPORT_LOG.log(Level.SEVERE, "Failed to create piped output stream: ", ex);
         }
     }
 }
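The resource change above boils down to zipping a live stream instead of a finished file. A minimal, self-contained sketch of that pattern using only java.io and java.util.zip (the class and method names here are hypothetical; the real code above uses IOUtils.copy and the JAX-RS AsyncResponse):

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

// Hypothetical helper illustrating the zip-while-streaming idea: the CSV bytes
// are consumed from the piped stream as the database produces them, so the
// full export never has to exist on disk or in memory.
public final class ZipStreamingSketch {

    static void zipSingleEntry(InputStream csvIn, OutputStream httpOut, String entryName) throws IOException {
        try (InputStream in = csvIn;
             ZipOutputStream zipOut = new ZipOutputStream(httpOut)) {
            zipOut.putNextEntry(new ZipEntry(entryName)); // one CSV file inside the zip
            in.transferTo(zipOut);                        // streaming copy, no intermediate file
            zipOut.closeEntry();
        }
    }
}
```

Because the CSV bytes are consumed as PostgreSQL produces them, the export is never materialised on disk, which is why the temporary-file handling from the old version disappears and the disconnect callback only has to close the pipe.
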
diff --git a/manager/src/main/java/org/openremote/manager/datapoint/AssetDatapointService.java b/manager/src/main/java/org/openremote/manager/datapoint/AssetDatapointService.java
index c462eed875..4003bf7ad2 100644
--- a/manager/src/main/java/org/openremote/manager/datapoint/AssetDatapointService.java
+++ b/manager/src/main/java/org/openremote/manager/datapoint/AssetDatapointService.java
@@ -1,11 +1,10 @@
 package org.openremote.manager.datapoint;
 
+import org.hibernate.Session;
 import org.openremote.agent.protocol.ProtocolDatapointService;
 import org.openremote.container.timer.TimerService;
 import org.openremote.manager.asset.OutdatedAttributeEvent;
 import org.openremote.model.datapoint.DatapointExportFormat;
-import org.openremote.model.datapoint.DatapointQueryTooLargeException;
-import org.openremote.model.util.UniqueIdentifierGenerator;
 import org.openremote.manager.asset.AssetProcessingException;
 import org.openremote.manager.asset.AssetStorageService;
 import org.openremote.manager.event.ClientEventService;
@@ -21,19 +20,19 @@
 import org.openremote.model.query.AssetQuery;
 import org.openremote.model.query.filter.AttributePredicate;
 import org.openremote.model.query.filter.NameValuePredicate;
+import org.openremote.model.syslog.SyslogCategory;
 import org.openremote.model.util.Pair;
 import org.openremote.model.value.MetaHolder;
 import org.openremote.model.value.MetaItemType;
+import org.postgresql.PGConnection;
+import org.postgresql.copy.CopyManager;
 
-import java.io.File;
-import java.nio.file.Files;
-import java.nio.file.Path;
+import java.io.*;
 import java.sql.Date;
 import java.time.*;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -44,6 +43,7 @@
 import static java.time.temporal.ChronoUnit.DAYS;
 import static java.util.stream.Collectors.groupingBy;
 import static java.util.stream.Collectors.toList;
+import static org.openremote.model.syslog.SyslogCategory.DATA;
 import static org.openremote.model.util.MapAccess.getInteger;
 import static org.openremote.model.value.MetaItemType.STORE_DATA_POINTS;
 
@@ -57,13 +57,10 @@ public class AssetDatapointService extends AbstractDatapointService
-                file.isFile()
-                    && file.getName().endsWith("csv")
-                    && file.lastModified() < timerService.getCurrentTimeMillis() - oneDayMillis
-            );
-
-            if (obsoleteExports != null) {
-                Arrays.stream(obsoleteExports)
-                    .forEach(file -> {
-                        boolean success = false;
-                        try {
-                            success = file.delete();
-                        } catch (SecurityException e) {
-                            LOG.log(Level.WARNING, "Cannot access the export file to delete it", e);
-                        }
-                        if (!success) {
-                            LOG.log(Level.WARNING, "Failed to delete obsolete export '" + file.getName() + "'");
-                        }
-                    });
-            }
-        } catch (Exception e) {
-            LOG.log(Level.WARNING, "Failed to purge old exports", e);
-        }
     }
 
     protected String buildWhereClause(List>> attributes, boolean negate) {
@@ -253,9 +206,9 @@ protected String buildWhereClause(List>> attributes, boolean negate) {
      * container so it can be accessed by this process.
      * Backwards compatible overload with default format.
      */
-    public ScheduledFuture<File> exportDatapoints(AttributeRef[] attributeRefs,
+    public PipedInputStream exportDatapoints(AttributeRef[] attributeRefs,
                                            long fromTimestamp,
-                                           long toTimestamp) {
+                                           long toTimestamp) throws IOException {
         return exportDatapoints(attributeRefs, fromTimestamp, toTimestamp, DatapointExportFormat.CSV);
     }
 
@@ -263,96 +216,103 @@ public ScheduledFuture<File> exportDatapoints(AttributeRef[] attributeRefs,
      * Exports datapoints as CSV using SQL; the export path used in the SQL query must also be mapped into the manager
      * container so it can be accessed by this process.
      */
-    public ScheduledFuture<File> exportDatapoints(AttributeRef[] attributeRefs,
+    public PipedInputStream exportDatapoints(AttributeRef[] attributeRefs,
                                            long fromTimestamp,
                                            long toTimestamp,
-                                           DatapointExportFormat format) {
-        try {
-            String query = getSelectExportQuery(attributeRefs, fromTimestamp, toTimestamp);
-
-            // Verify the query is 'legal' and can be executed
-            if(canQueryDatapoints(query, null, datapointExportLimit)) {
-                return doExportDatapoints(attributeRefs, fromTimestamp, toTimestamp, format);
-            }
-            throw new RuntimeException("Could not export datapoints.");
+                                           DatapointExportFormat format) throws IOException {
+        String query = getSelectExportQuery(attributeRefs, fromTimestamp, toTimestamp);
 
-        } catch (DatapointQueryTooLargeException dqex) {
-            String msg = "Could not export data points. It exceeds the data limit of " + datapointExportLimit + " data points.";
-            getLogger().log(Level.WARNING, msg, dqex);
-            throw dqex;
+        // Verify the query is 'legal' and can be executed
+        if (canQueryDatapoints(query, null, 0)) {
+            return doExportDatapoints(attributeRefs, fromTimestamp, toTimestamp, format);
         }
+        throw new RuntimeException("Could not export datapoints.");
     }
 
-    protected ScheduledFuture<File> doExportDatapoints(AttributeRef[] attributeRefs,
+    protected PipedInputStream doExportDatapoints(AttributeRef[] attributeRefs,
                                               long fromTimestamp,
                                               long toTimestamp,
-                                              DatapointExportFormat format) {
-
-        return scheduledExecutorService.schedule(() -> {
-            String fileName = UniqueIdentifierGenerator.generateId() + ".csv";
-            if (format == DatapointExportFormat.CSV_CROSSTAB) {
-                String attributeFilter = getAttributeFilter(attributeRefs);
-                StringBuilder sb = new StringBuilder(String.format(
-                    "copy (select * from crosstab( " +
-                        "'select ad.timestamp, a.name || '' \: '' || ad.attribute_name as header, ad.value " +
-                        "from asset_datapoint ad " +
-                        "join asset a on ad.entity_id = a.id " +
-                        "where ad.timestamp >= to_timestamp(%d) and ad.timestamp <= to_timestamp(%d) and (%s) " +
-                        "order by ad.timestamp, header', " +
-                        "'select distinct a.name || '' \: '' || ad.attribute_name as header " +
-                        "from asset_datapoint ad " +
-                        "join asset a on ad.entity_id = a.id " +
-                        "where %s " +
-                        "order by header') " +
-                        "as ct(timestamp timestamp, %s) " +
-                        ") to '/storage/" + EXPORT_STORAGE_DIR_NAME + "/" + fileName + "' delimiter ',' CSV HEADER;",
-                    fromTimestamp / 1000, toTimestamp / 1000, attributeFilter, attributeFilter, getAttributeColumns(attributeRefs)
-                ));
-                persistenceService.doTransaction(em -> em.createNativeQuery(sb.toString()).executeUpdate());
-            } else if (format == DatapointExportFormat.CSV_CROSSTAB_MINUTE) {
-                String attributeFilter = getAttributeFilter(attributeRefs);
-                StringBuilder sb = new StringBuilder(String.format(
-                    "copy (select * from crosstab( " +
-                        "'select public.time_bucket(''%s'', ad.timestamp) as bucket_timestamp, " +
-                        "a.name || '' \: '' || ad.attribute_name as header, " +
-                        "CASE " +
-                        " WHEN jsonb_typeof((array_agg(ad.value))[1]) = ''number'' THEN " +
-                        " round(avg((ad.value#>>''{}'')::numeric) FILTER (WHERE jsonb_typeof(ad.value) = ''number''), 3)::text " +
-                        " ELSE (array_agg(ad.value ORDER BY ad.timestamp DESC) FILTER (WHERE jsonb_typeof(ad.value) != ''number''))[1]#>>''{}''" +
-                        "END as value " +
-                        "from asset_datapoint ad " +
-                        "join asset a on ad.entity_id = a.id " +
-                        "where ad.timestamp >= to_timestamp(%d) and ad.timestamp <= to_timestamp(%d) and (%s) " +
-                        "group by bucket_timestamp, header " +
-                        "order by bucket_timestamp, header', " +
-                        "'select distinct a.name || '' \: '' || ad.attribute_name as header " +
-                        "from asset_datapoint ad " +
-                        "join asset a on ad.entity_id = a.id " +
-                        "where %s " +
-                        "order by header') " +
-                        "as ct(timestamp timestamp, %s) " +
-                        ") to '/storage/" + EXPORT_STORAGE_DIR_NAME + "/" + fileName + "' delimiter ',' CSV HEADER;",
-                    "1 minute", fromTimestamp / 1000, toTimestamp / 1000, attributeFilter, attributeFilter, getAttributeColumns(attributeRefs)
-                ));
-
-                persistenceService.doTransaction(em -> em.createNativeQuery(sb.toString()).executeUpdate());
-            }
-            else {
-                StringBuilder sb = new StringBuilder("copy (")
-                    .append(getSelectExportQuery(attributeRefs, fromTimestamp, toTimestamp))
-                    .append(") to '/storage/")
-                    .append(EXPORT_STORAGE_DIR_NAME)
-                    .append("/")
-                    .append(fileName)
-                    .append("' delimiter ',' CSV HEADER;");
-                persistenceService.doTransaction(em -> em.createNativeQuery(sb.toString()).executeUpdate());
-            }
-
+                                              DatapointExportFormat format) throws IOException {
+        // Increase buffer size (default is only 1 KB)
+        PipedInputStream in = new PipedInputStream(1024 * 1024 * 4); // 4 MB
+        PipedOutputStream out = new PipedOutputStream(in);
+
+        StringBuilder sb;
+
+        final String TO_STDOUT_WITH_CSV_FORMAT = ") TO STDOUT WITH (FORMAT CSV, HEADER, DELIMITER ',');";
+
+        if (format == DatapointExportFormat.CSV_CROSSTAB) {
+            String attributeFilter = getAttributeFilter(attributeRefs);
+            sb = new StringBuilder(String.format(
+                "copy (select * from crosstab( " +
+                    "'select ad.timestamp, a.name || '' \: '' || ad.attribute_name as header, ad.value " +
+                    "from asset_datapoint ad " +
+                    "join asset a on ad.entity_id = a.id " +
+                    "where ad.timestamp >= to_timestamp(%d) and ad.timestamp <= to_timestamp(%d) and (%s) " +
+                    "order by ad.timestamp, header', " +
+                    "'select distinct a.name || '' \: '' || ad.attribute_name as header " +
+                    "from asset_datapoint ad " +
+                    "join asset a on ad.entity_id = a.id " +
+                    "where %s " +
+                    "order by header') " +
+                    "as ct(timestamp timestamp, %s) " +
+                    TO_STDOUT_WITH_CSV_FORMAT,
+                fromTimestamp / 1000, toTimestamp / 1000, attributeFilter, attributeFilter, getAttributeColumns(attributeRefs)
+            ));
+        } else if (format == DatapointExportFormat.CSV_CROSSTAB_MINUTE) {
+            String attributeFilter = getAttributeFilter(attributeRefs);
+            sb = new StringBuilder(String.format(
+                "copy (select * from crosstab( " +
+                    "'select public.time_bucket(''%s'', ad.timestamp) as bucket_timestamp, " +
+                    "a.name || '' \: '' || ad.attribute_name as header, " +
+                    "CASE " +
+                    " WHEN jsonb_typeof((array_agg(ad.value))[1]) = ''number'' THEN " +
+                    " round(avg((ad.value#>>''{}'')::numeric) FILTER (WHERE jsonb_typeof(ad.value) = ''number''), 3)::text " +
+                    " ELSE (array_agg(ad.value ORDER BY ad.timestamp DESC) FILTER (WHERE jsonb_typeof(ad.value) != ''number''))[1]#>>''{}''" +
+                    "END as value " +
+                    "from asset_datapoint ad " +
+                    "join asset a on ad.entity_id = a.id " +
+                    "where ad.timestamp >= to_timestamp(%d) and ad.timestamp <= to_timestamp(%d) and (%s) " +
+ + "group by bucket_timestamp, header " + + "order by bucket_timestamp, header', " + + "'select distinct a.name || '' \\: '' || ad.attribute_name as header " + + "from asset_datapoint ad " + + "join asset a on ad.entity_id = a.id " + + "where %s " + + "order by header') " + + "as ct(timestamp timestamp, %s) " + + TO_STDOUT_WITH_CSV_FORMAT, + "1 minute", fromTimestamp / 1000, toTimestamp / 1000, attributeFilter, attributeFilter, getAttributeColumns(attributeRefs) + )); + } else { + sb = new StringBuilder("copy (") + .append(getSelectExportQuery(attributeRefs, fromTimestamp, toTimestamp)) + .append(TO_STDOUT_WITH_CSV_FORMAT); + } + scheduledExecutorService.schedule(() -> persistenceService.doTransaction(em -> { + Session session = em.unwrap(Session.class); + session.doWork(connection -> { + PGConnection pgConnection = connection.unwrap(PGConnection.class); + CopyManager copyManager = pgConnection.getCopyAPI(); + try { + copyManager.copyOut(sb.toString(), out); + out.flush(); + out.close(); + } catch (IOException e) { + // Either a database connection- or output stream failure + DATA_EXPORT_LOG.log(Level.SEVERE, "Datapoint export failed", e); + try { + out.close(); + in.close(); + } catch (IOException ignored) { + DATA_EXPORT_LOG.log(Level.SEVERE, "Failed to close piped input stream", e); + } + } + }); + }), 0, TimeUnit.MILLISECONDS); - // The same path must resolve in both the postgresql container and the manager container - return exportPath.resolve(fileName).toFile(); - }, 0, TimeUnit.MILLISECONDS); + return in; } /** diff --git a/profile/deploy.yml b/profile/deploy.yml index 6385fe8e1f..7f74a5714f 100644 --- a/profile/deploy.yml +++ b/profile/deploy.yml @@ -345,9 +345,6 @@ services: # Defaults to 100.000. When set to 0, it disables the limit. # OR_DATA_POINTS_QUERY_LIMIT: 100000 - # Configure the limit of data points that can be exported to CSV. Defaults to 1 million data points. 
diff --git a/profile/deploy.yml b/profile/deploy.yml
index 6385fe8e1f..7f74a5714f 100644
--- a/profile/deploy.yml
+++ b/profile/deploy.yml
@@ -345,9 +345,6 @@ services:
     # Defaults to 100.000. When set to 0, it disables the limit.
     # OR_DATA_POINTS_QUERY_LIMIT: 100000
 
-    # Configure the limit of data points that can be exported to CSV. Defaults to 1 million data points.
-    # OR_DATA_POINTS_EXPORT_LIMIT: 10000000
-
     # App id for the API of OpenWeather: https://openweathermap.org
     # OR_OPEN_WEATHER_API_APP_ID
 
diff --git a/test/src/test/groovy/org/openremote/test/assets/AssetDatapointExportTest.groovy b/test/src/test/groovy/org/openremote/test/assets/AssetDatapointExportTest.groovy
index d7af45a893..224900a925 100644
--- a/test/src/test/groovy/org/openremote/test/assets/AssetDatapointExportTest.groovy
+++ b/test/src/test/groovy/org/openremote/test/assets/AssetDatapointExportTest.groovy
@@ -30,7 +30,6 @@ class AssetDatapointExportTest extends Specification implements ManagerContainer
         def keycloakTestSetup = container.getService(SetupService.class).getTaskOfType(KeycloakTestSetup.class)
         def assetStorageService = container.getService(AssetStorageService.class)
         def assetDatapointService = container.getService(AssetDatapointService.class)
-        assetDatapointService.datapointExportLimit = 1000
 
         when: "requesting the first light asset in City realm"
         def asset = assetStorageService.find(
@@ -68,17 +67,14 @@ class AssetDatapointExportTest extends Specification implements ManagerContainer
         )
 
         then: "the default CSV export should return a file"
-        def csvExport1Future = assetDatapointService.exportDatapoints(
+        def inputStream1 = assetDatapointService.exportDatapoints(
                 [new AttributeRef(asset.id, attributeName)] as AttributeRef[],
                 dateTime.minusMinutes(30).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(),
                 dateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()
         )
-        assert csvExport1Future != null
-        def csvExport1 = csvExport1Future.get()
-        assert csvExport1 != null
 
         and: "the CSV export should contain all 5 data points"
-        def csvExport1Lines = csvExport1.readLines()
+        def csvExport1Lines = inputStream1.readLines()
         assert csvExport1Lines.size() == 6
         assert csvExport1Lines[1].endsWith("10.0")
         assert csvExport1Lines[2].endsWith("20.0")
@@ -89,18 +85,15 @@ class AssetDatapointExportTest extends Specification implements ManagerContainer
         /* ------------------------- */
 
         when: "exporting with format CSV_CROSSTAB"
-        def csvExport2Future = assetDatapointService.exportDatapoints(
+        def inputStream2 = assetDatapointService.exportDatapoints(
                 [new AttributeRef(asset.id, attributeName)] as AttributeRef[],
                 dateTime.minusMinutes(30).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(),
                 dateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(),
                 DatapointExportFormat.CSV_CROSSTAB
         )
-        assert csvExport2Future != null
-        def csvExport2 = csvExport2Future.get()
-        assert csvExport2 != null
 
         then: "the crosstab CSV export should contain all 5 data points in columnar format"
-        def csvExport2Lines = csvExport2.readLines()
+        def csvExport2Lines = inputStream2.readLines()
         assert csvExport2Lines.size() == 6 // header + 5 data rows
         assert csvExport2Lines[0].contains("timestamp")
         assert csvExport2Lines[0].contains(assetName + ": " + attributeName)
@@ -108,18 +101,15 @@ class AssetDatapointExportTest extends Specification implements ManagerContainer
         /* ------------------------- */
 
         when: "exporting with format CSV_CROSSTAB_MINUTE"
-        def csvExport3Future = assetDatapointService.exportDatapoints(
+        def inputStream3 = assetDatapointService.exportDatapoints(
                 [new AttributeRef(asset.id, attributeName)] as AttributeRef[],
                 dateTime.minusMinutes(30).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(),
                 dateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(),
                 DatapointExportFormat.CSV_CROSSTAB_MINUTE
         )
-        assert csvExport3Future != null
-        def csvExport3 = csvExport3Future.get()
-        assert csvExport3 != null
 
         then: "the time-bucketed CSV export should contain aggregated data with correct values"
-        def csvExport3Lines = csvExport3.readLines()
+        def csvExport3Lines = inputStream3.readLines()
         assert csvExport3Lines.size() >= 2 // header + at least 1 aggregated row
         assert csvExport3Lines[0].contains("timestamp")
         assert csvExport3Lines[0].contains(assetName + ": " + attributeName)
@@ -142,25 +132,5 @@ class AssetDatapointExportTest extends Specification implements ManagerContainer
         assert values.contains("30.000") || values.contains("30")
         assert values.contains("40.000") || values.contains("40")
         assert values.contains("50.000") || values.contains("50")
-
-        /* ------------------------- */
-
-        when: "the limit of data export is lowered to 4"
-        assetDatapointService.datapointExportLimit = 4
-
-        and: "we try to export the same amount of data points"
-        def csvExport4Future = assetDatapointService.exportDatapoints(
-                [new AttributeRef(asset.id, attributeName)] as AttributeRef[],
-                dateTime.minusMinutes(30).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(),
-                dateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()
-        )
-
-        then: "the CSV export should throw an exception"
-        thrown(DatapointQueryTooLargeException)
-
-        /* ------------------------- */
-
-        cleanup: "Remove the limit on datapoint querying"
-        assetDatapointService.datapointExportLimit = assetDatapointService.OR_DATA_POINTS_EXPORT_LIMIT_DEFAULT;
     }
 }
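Finally, consuming the returned stream is plain java.io; the Groovy assertions above rely on Groovy's InputStream#readLines(), and an equivalent Java-only sketch (names hypothetical) looks like this:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

// Sketch: read the streamed CSV line by line, which is roughly what the tests
// do via readLines(); the first line is the CSV header.
public final class CsvStreamReaderSketch {

    static List<String> readCsvLines(InputStream csvStream) throws IOException {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(csvStream, StandardCharsets.UTF_8))) {
            return reader.lines().collect(Collectors.toList());
        }
    }
}
```

Reading to end-of-stream (or closing the stream early) is what releases the producer side of the pipe.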