From b61afc658d5e5dbde25189d0305f06b4dde7982c Mon Sep 17 00:00:00 2001 From: brunomaury Date: Thu, 22 Jan 2026 17:18:37 +0100 Subject: [PATCH] feat(query): add streaming query with a timeunit --- src/main/java/org/influxdb/InfluxDB.java | 25 +- .../java/org/influxdb/impl/InfluxDBImpl.java | 286 ++-- .../org/influxdb/impl/InfluxDBService.java | 48 +- src/test/java/org/influxdb/InfluxDBTest.java | 1335 +++++++++-------- 4 files changed, 939 insertions(+), 755 deletions(-) diff --git a/src/main/java/org/influxdb/InfluxDB.java b/src/main/java/org/influxdb/InfluxDB.java index 56f842f35..9ae2aa413 100644 --- a/src/main/java/org/influxdb/InfluxDB.java +++ b/src/main/java/org/influxdb/InfluxDB.java @@ -388,7 +388,7 @@ public void write(final String database, final String retentionPolicy, * the points in the correct lineprotocol. */ public void write(final String database, final String retentionPolicy, - final ConsistencyLevel consistency, final TimeUnit precision, final String records); + final ConsistencyLevel consistency, final TimeUnit precision, final String records); /** * Write a set of Points to the influxdb database with the list of string records. @@ -424,13 +424,13 @@ public void write(final String database, final String retentionPolicy, * the List of points in the correct lineprotocol. */ public void write(final String database, final String retentionPolicy, - final ConsistencyLevel consistency, final TimeUnit precision, final List records); + final ConsistencyLevel consistency, final TimeUnit precision, final List records); /** * Write a set of Points to the influxdb database with the string records through UDP. * * @param udpPort - * the udpPort where influxdb is listening + * the udpPort where influxdb is listening * @param records * the content will be encoded by UTF-8 before sent. 
*/ @@ -538,6 +538,25 @@ public void write(final String database, final String retentionPolicy, public void query(Query query, int chunkSize, BiConsumer onNext, Runnable onComplete, Consumer onFailure); + /** + * Execute a streaming query against a database. + * + * @param query + * the query to execute. + * @param timeUnit + * the time unit of the results. + * @param chunkSize + * the number of QueryResults to process in one chunk. + * @param onNext + * the consumer to invoke for each received QueryResult; with capability to discontinue a streaming query + * @param onComplete + * the onComplete to invoke for successfully end of stream + * @param onFailure + * the consumer for error handling + */ + public void query(Query query, TimeUnit timeUnit, int chunkSize, BiConsumer onNext, Runnable onComplete, + Consumer onFailure); + /** * Execute a query against a database. * diff --git a/src/main/java/org/influxdb/impl/InfluxDBImpl.java b/src/main/java/org/influxdb/impl/InfluxDBImpl.java index 23427a23d..42a49f9d8 100644 --- a/src/main/java/org/influxdb/impl/InfluxDBImpl.java +++ b/src/main/java/org/influxdb/impl/InfluxDBImpl.java @@ -147,48 +147,48 @@ public InfluxDBImpl(final String url, final String username, final String passwo this.gzipRequestInterceptor = new GzipRequestInterceptor(); OkHttpClient.Builder clonedOkHttpBuilder = okHttpBuilder.build().newBuilder() - .addInterceptor(loggingInterceptor) - .addInterceptor(gzipRequestInterceptor); + .addInterceptor(loggingInterceptor) + .addInterceptor(gzipRequestInterceptor); if (username != null && password != null) { clonedOkHttpBuilder.addInterceptor(new BasicAuthInterceptor(username, password)); } Factory converterFactory = null; switch (responseFormat) { - case MSGPACK: - clonedOkHttpBuilder.addInterceptor(chain -> { - Request request = chain.request().newBuilder().addHeader("Accept", APPLICATION_MSGPACK).build(); - return chain.proceed(request); - }); - - converterFactory = MessagePackConverterFactory.create(); - 
chunkProccesor = new MessagePackChunkProccesor(); - break; - case JSON: - default: - converterFactory = MoshiConverterFactory.create(); - - Moshi moshi = new Moshi.Builder().build(); - JsonAdapter adapter = moshi.adapter(QueryResult.class); - chunkProccesor = new JSONChunkProccesor(adapter); - break; + case MSGPACK: + clonedOkHttpBuilder.addInterceptor(chain -> { + Request request = chain.request().newBuilder().addHeader("Accept", APPLICATION_MSGPACK).build(); + return chain.proceed(request); + }); + + converterFactory = MessagePackConverterFactory.create(); + chunkProccesor = new MessagePackChunkProccesor(); + break; + case JSON: + default: + converterFactory = MoshiConverterFactory.create(); + + Moshi moshi = new Moshi.Builder().build(); + JsonAdapter adapter = moshi.adapter(QueryResult.class); + chunkProccesor = new JSONChunkProccesor(adapter); + break; } this.client = clonedOkHttpBuilder.build(); Retrofit.Builder clonedRetrofitBuilder = retrofitBuilder.baseUrl(url).build().newBuilder(); this.retrofit = clonedRetrofitBuilder.client(this.client) - .addConverterFactory(converterFactory).build(); + .addConverterFactory(converterFactory).build(); this.influxDBService = this.retrofit.create(InfluxDBService.class); } public InfluxDBImpl(final String url, final String username, final String password, - final OkHttpClient.Builder client) { + final OkHttpClient.Builder client) { this(url, username, password, client, ResponseFormat.JSON); } InfluxDBImpl(final String url, final String username, final String password, final OkHttpClient.Builder client, - final InfluxDBService influxDBService, final JsonAdapter adapter) { + final InfluxDBService influxDBService, final JsonAdapter adapter) { super(); this.messagePack = false; this.hostName = parseHost(url); @@ -198,9 +198,9 @@ public InfluxDBImpl(final String url, final String username, final String passwo this.gzipRequestInterceptor = new GzipRequestInterceptor(); OkHttpClient.Builder clonedBuilder = 
client.build().newBuilder() - .addInterceptor(loggingInterceptor) - .addInterceptor(gzipRequestInterceptor) - .addInterceptor(new BasicAuthInterceptor(username, password)); + .addInterceptor(loggingInterceptor) + .addInterceptor(gzipRequestInterceptor) + .addInterceptor(new BasicAuthInterceptor(username, password)); this.client = clonedBuilder.build(); this.retrofit = new Retrofit.Builder().baseUrl(url) .client(this.client) @@ -211,7 +211,7 @@ public InfluxDBImpl(final String url, final String username, final String passwo } public InfluxDBImpl(final String url, final String username, final String password, final OkHttpClient.Builder client, - final String database, final String retentionPolicy, final ConsistencyLevel consistency) { + final String database, final String retentionPolicy, final ConsistencyLevel consistency) { this(url, username, password, client); setConsistency(consistency); @@ -243,20 +243,20 @@ private String parseHost(final String url) { @Override public InfluxDB setLogLevel(final LogLevel logLevel) { switch (logLevel) { - case NONE: - this.loggingInterceptor.setLevel(Level.NONE); - break; - case BASIC: - this.loggingInterceptor.setLevel(Level.BASIC); - break; - case HEADERS: - this.loggingInterceptor.setLevel(Level.HEADERS); - break; - case FULL: - this.loggingInterceptor.setLevel(Level.BODY); - break; - default: - break; + case NONE: + this.loggingInterceptor.setLevel(Level.NONE); + break; + case BASIC: + this.loggingInterceptor.setLevel(Level.BASIC); + break; + case HEADERS: + this.loggingInterceptor.setLevel(Level.HEADERS); + break; + case FULL: + this.loggingInterceptor.setLevel(Level.BODY); + break; + default: + break; } this.logLevel = logLevel; return this; @@ -301,17 +301,17 @@ public InfluxDB enableBatch(final BatchOptions batchOptions) { throw new IllegalStateException("BatchProcessing is already enabled."); } this.batchProcessor = BatchProcessor - .builder(this) - .actions(batchOptions.getActions()) - 
.exceptionHandler(batchOptions.getExceptionHandler()) - .interval(batchOptions.getFlushDuration(), batchOptions.getJitterDuration(), TimeUnit.MILLISECONDS) - .threadFactory(batchOptions.getThreadFactory()) - .bufferLimit(batchOptions.getBufferLimit()) - .consistencyLevel(batchOptions.getConsistency()) - .precision(batchOptions.getPrecision()) - .dropActionsOnQueueExhaustion(batchOptions.isDropActionsOnQueueExhaustion()) - .droppedActionHandler(batchOptions.getDroppedActionHandler()) - .build(); + .builder(this) + .actions(batchOptions.getActions()) + .exceptionHandler(batchOptions.getExceptionHandler()) + .interval(batchOptions.getFlushDuration(), batchOptions.getJitterDuration(), TimeUnit.MILLISECONDS) + .threadFactory(batchOptions.getThreadFactory()) + .bufferLimit(batchOptions.getBufferLimit()) + .consistencyLevel(batchOptions.getConsistency()) + .precision(batchOptions.getPrecision()) + .dropActionsOnQueueExhaustion(batchOptions.isDropActionsOnQueueExhaustion()) + .droppedActionHandler(batchOptions.getDroppedActionHandler()) + .build(); this.batchEnabled.set(true); return this; } @@ -356,15 +356,15 @@ private InfluxDB enableBatch(final int actions, final int flushDuration, final i throw new IllegalStateException("BatchProcessing is already enabled."); } this.batchProcessor = BatchProcessor - .builder(this) - .actions(actions) - .exceptionHandler(exceptionHandler) - .interval(flushDuration, jitterDuration, durationTimeUnit) - .threadFactory(threadFactory) - .consistencyLevel(consistency) - .dropActionsOnQueueExhaustion(dropActionsOnQueueExhaustion) - .droppedActionHandler(droppedActionHandler) - .build(); + .builder(this) + .actions(actions) + .exceptionHandler(exceptionHandler) + .interval(flushDuration, jitterDuration, durationTimeUnit) + .threadFactory(threadFactory) + .consistencyLevel(consistency) + .dropActionsOnQueueExhaustion(dropActionsOnQueueExhaustion) + .droppedActionHandler(droppedActionHandler) + .build(); this.batchEnabled.set(true); return this; 
} @@ -435,7 +435,7 @@ public void write(final String database, final String retentionPolicy, final Poi this.batchProcessor.put(batchEntry); } else { BatchPoints batchPoints = BatchPoints.database(database) - .retentionPolicy(retentionPolicy).build(); + .retentionPolicy(retentionPolicy).build(); batchPoints.point(point); this.write(batchPoints); this.unBatchedCount.increment(); @@ -464,7 +464,7 @@ public void write(final BatchPoints batchPoints) { RequestBody lineProtocol = RequestBody.create(MEDIA_TYPE_STRING, batchPoints.lineProtocol()); String db = batchPoints.getDatabase(); if (db == null) { - db = this.database; + db = this.database; } execute(this.influxDBService.writePoints( db, @@ -485,7 +485,7 @@ public void writeWithRetry(final BatchPoints batchPoints) { @Override public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistency, - final TimeUnit precision, final String records) { + final TimeUnit precision, final String records) { execute(this.influxDBService.writePoints( database, retentionPolicy, @@ -496,20 +496,20 @@ public void write(final String database, final String retentionPolicy, final Con @Override public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistency, - final String records) { + final String records) { write(database, retentionPolicy, consistency, TimeUnit.NANOSECONDS, records); } @Override public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistency, - final List records) { + final List records) { write(database, retentionPolicy, consistency, TimeUnit.NANOSECONDS, records); } @Override public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistency, - final TimeUnit precision, final List records) { + final TimeUnit precision, final List records) { write(database, retentionPolicy, consistency, precision, String.join("\n", records)); } @@ -522,25 +522,25 @@ public void 
write(final int udpPort, final String records) { initialDatagramSocket(); byte[] bytes = records.getBytes(StandardCharsets.UTF_8); try { - datagramSocket.send(new DatagramPacket(bytes, bytes.length, new InetSocketAddress(hostName, udpPort))); + datagramSocket.send(new DatagramPacket(bytes, bytes.length, new InetSocketAddress(hostName, udpPort))); } catch (IOException e) { - throw new InfluxDBIOException(e); + throw new InfluxDBIOException(e); } } private void initialDatagramSocket() { if (datagramSocket == null) { - synchronized (InfluxDBImpl.class) { - if (datagramSocket == null) { - try { - datagramSocket = new DatagramSocket(); - } catch (SocketException e) { - throw new InfluxDBIOException(e); - } - } + synchronized (InfluxDBImpl.class) { + if (datagramSocket == null) { + try { + datagramSocket = new DatagramSocket(); + } catch (SocketException e) { + throw new InfluxDBIOException(e); + } } + } } -} + } /** * {@inheritDoc} @@ -639,10 +639,10 @@ public void query(final Query query, final int chunkSize, final BiConsumer call, final Throwable t) { }); } + /** + * {@inheritDoc} + */ + @Override + public void query(final Query query, final TimeUnit timeUnit, final int chunkSize, final BiConsumer onNext, + final Runnable onComplete, final Consumer onFailure) { + Call call; + if (query.hasBoundParameters()) { + if (query.requiresPost()) { + call = this.influxDBService.postQuery(getDatabase(query), query.getCommandWithUrlEncoded(), TimeUtil.toTimePrecision(timeUnit), chunkSize, + query.getParameterJsonWithUrlEncoded()); + } else { + call = this.influxDBService.query(getDatabase(query), query.getCommandWithUrlEncoded(), TimeUtil.toTimePrecision(timeUnit), chunkSize, + query.getParameterJsonWithUrlEncoded()); + } + } else { + if (query.requiresPost()) { + call = this.influxDBService.postQuery(getDatabase(query), query.getCommandWithUrlEncoded(), TimeUtil.toTimePrecision(timeUnit), chunkSize); + } else { + call = this.influxDBService.query(getDatabase(query), 
query.getCommandWithUrlEncoded(), TimeUtil.toTimePrecision(timeUnit), chunkSize); + } + } + + call.enqueue(new Callback() { + @Override + public void onResponse(final Call call, final Response response) { + + Cancellable cancellable = new Cancellable() { + @Override + public void cancel() { + call.cancel(); + } + + @Override + public boolean isCanceled() { + return call.isCanceled(); + } + }; + + try { + if (response.isSuccessful()) { + ResponseBody chunkedBody = response.body(); + chunkProccesor.process(chunkedBody, cancellable, onNext, onComplete); + } else { + // REVIEW: must be handled consistently with IOException. + ResponseBody errorBody = response.errorBody(); + if (errorBody != null) { + InfluxDBException influxDBException = new InfluxDBException(errorBody.string()); + if (onFailure == null) { + throw influxDBException; + } else { + onFailure.accept(influxDBException); + } + } + } + } catch (IOException e) { + QueryResult queryResult = new QueryResult(); + queryResult.setError(e.toString()); + onNext.accept(cancellable, queryResult); + //passing null onFailure consumer is here for backward compatibility + //where the empty queryResult containing error is propagating into onNext consumer + if (onFailure != null) { + onFailure.accept(e); + } + } catch (Exception e) { + call.cancel(); + if (onFailure != null) { + onFailure.accept(e); + } + } + + } + + @Override + public void onFailure(final Call call, final Throwable t) { + if (onFailure == null) { + throw new InfluxDBException(t); + } else { + onFailure.accept(t); + } + } + }); + } + /** * {@inheritDoc} */ @@ -722,19 +806,19 @@ public QueryResult query(final Query query, final TimeUnit timeUnit) { if (query.hasBoundParameters()) { if (query.requiresPost()) { call = this.influxDBService.postQuery(getDatabase(query), TimeUtil.toTimePrecision(timeUnit), - query.getCommandWithUrlEncoded(), query.getParameterJsonWithUrlEncoded()); + query.getCommandWithUrlEncoded(), query.getParameterJsonWithUrlEncoded()); } else 
{ call = this.influxDBService.query(getDatabase(query), TimeUtil.toTimePrecision(timeUnit), - query.getCommandWithUrlEncoded(), query.getParameterJsonWithUrlEncoded()); + query.getCommandWithUrlEncoded(), query.getParameterJsonWithUrlEncoded()); } } else { - if (query.requiresPost()) { - call = this.influxDBService.postQuery(getDatabase(query), - TimeUtil.toTimePrecision(timeUnit), query.getCommandWithUrlEncoded()); - } else { - call = this.influxDBService.query(getDatabase(query), - TimeUtil.toTimePrecision(timeUnit), query.getCommandWithUrlEncoded(), null); - } + if (query.requiresPost()) { + call = this.influxDBService.postQuery(getDatabase(query), + TimeUtil.toTimePrecision(timeUnit), query.getCommandWithUrlEncoded()); + } else { + call = this.influxDBService.query(getDatabase(query), + TimeUtil.toTimePrecision(timeUnit), query.getCommandWithUrlEncoded(), null); + } } return executeQuery(call); } @@ -797,17 +881,17 @@ private Call callQuery(final Query query) { if (query.hasBoundParameters()) { if (query.requiresPost()) { call = this.influxDBService.postQuery(getDatabase(query), query.getCommandWithUrlEncoded(), - query.getParameterJsonWithUrlEncoded()); + query.getParameterJsonWithUrlEncoded()); } else { call = this.influxDBService.query(getDatabase(query), null, query.getCommandWithUrlEncoded(), - query.getParameterJsonWithUrlEncoded()); + query.getParameterJsonWithUrlEncoded()); } } else { - if (query.requiresPost()) { - call = this.influxDBService.postQuery(getDatabase(query), query.getCommandWithUrlEncoded()); - } else { - call = this.influxDBService.query(getDatabase(query), query.getCommandWithUrlEncoded()); - } + if (query.requiresPost()) { + call = this.influxDBService.postQuery(getDatabase(query), query.getCommandWithUrlEncoded()); + } else { + call = this.influxDBService.query(getDatabase(query), query.getCommandWithUrlEncoded()); + } } return call; } @@ -878,11 +962,11 @@ public void flush() { @Override public void close() { try { - 
this.disableBatch(); + this.disableBatch(); } finally { - if (datagramSocket != null && !datagramSocket.isClosed()) { - datagramSocket.close(); - } + if (datagramSocket != null && !datagramSocket.isClosed()) { + datagramSocket.close(); + } } this.client.dispatcher().executorService().shutdown(); this.client.connectionPool().evictAll(); @@ -991,7 +1075,7 @@ private class MessagePackChunkProccesor implements ChunkProccesor { @Override public void process(final ResponseBody chunkedBody, final Cancellable cancellable, final BiConsumer consumer, final Runnable onComplete) - throws IOException { + throws IOException { MessagePackTraverser traverser = new MessagePackTraverser(); try (InputStream is = chunkedBody.byteStream()) { for (Iterator it = traverser.traverse(is).iterator(); it.hasNext() && !cancellable.isCanceled();) { @@ -1015,7 +1099,7 @@ public JSONChunkProccesor(final JsonAdapter adapter) { @Override public void process(final ResponseBody chunkedBody, final Cancellable cancellable, final BiConsumer consumer, final Runnable onComplete) - throws IOException { + throws IOException { try { BufferedSource source = chunkedBody.source(); while (!cancellable.isCanceled()) { diff --git a/src/main/java/org/influxdb/impl/InfluxDBService.java b/src/main/java/org/influxdb/impl/InfluxDBService.java index 061a76615..4501a6bc7 100644 --- a/src/main/java/org/influxdb/impl/InfluxDBService.java +++ b/src/main/java/org/influxdb/impl/InfluxDBService.java @@ -29,7 +29,7 @@ interface InfluxDBService { @GET("ping") public Call ping(); - /** + /** * @param username u: optional The username for authentication * @param password p: optional The password for authentication * @param database db: required The database to write points @@ -42,44 +42,56 @@ interface InfluxDBService { */ @POST("write") public Call writePoints(@Query(DB) String database, - @Query(RP) String retentionPolicy, @Query(PRECISION) String precision, - @Query(CONSISTENCY) String consistency, @Body RequestBody 
batchPoints); + @Query(RP) String retentionPolicy, @Query(PRECISION) String precision, + @Query(CONSISTENCY) String consistency, @Body RequestBody batchPoints); @GET("query") public Call query(@Query(DB) String db, - @Query(EPOCH) String epoch, @Query(value = Q, encoded = true) String query, - @Query(value = PARAMS, encoded = true) String params); + @Query(EPOCH) String epoch, @Query(value = Q, encoded = true) String query, + @Query(value = PARAMS, encoded = true) String params); @GET("query") public Call query(@Query(DB) String db, - @Query(value = Q, encoded = true) String query); + @Query(value = Q, encoded = true) String query); @POST("query") @FormUrlEncoded public Call postQuery(@Query(DB) String db, - @Field(value = Q, encoded = true) String query); + @Field(value = Q, encoded = true) String query); @POST("query") @FormUrlEncoded public Call postQuery(@Query(DB) String db, @Query(EPOCH) String epoch, - @Field(value = Q, encoded = true) String query); + @Field(value = Q, encoded = true) String query); @POST("query") @FormUrlEncoded public Call postQuery(@Query(DB) String db, @Query(EPOCH) String epoch, - @Field(value = Q, encoded = true) String query, @Query(value = PARAMS, encoded = true) String params); + @Field(value = Q, encoded = true) String query, @Query(value = PARAMS, encoded = true) String params); @Streaming @POST("query?chunked=true") @FormUrlEncoded public Call postQuery(@Query(DB) String db, @Field(value = Q, encoded = true) String query, - @Query(CHUNK_SIZE) int chunkSize); + @Query(CHUNK_SIZE) int chunkSize); + + @Streaming + @POST("query?chunked=true") + @FormUrlEncoded + public Call postQuery(@Query(DB) String db, @Field(value = Q, encoded = true) String query, @Query(EPOCH) String epoch, + @Query(CHUNK_SIZE) int chunkSize); @Streaming @POST("query?chunked=true") @FormUrlEncoded public Call postQuery(@Query(DB) String db, @Field(value = Q, encoded = true) String query, - @Query(CHUNK_SIZE) int chunkSize, @Query(value = PARAMS, encoded = 
true) String params); + @Query(CHUNK_SIZE) int chunkSize, @Query(value = PARAMS, encoded = true) String params); + + @Streaming + @POST("query?chunked=true") + @FormUrlEncoded + public Call postQuery(@Query(DB) String db, @Field(value = Q, encoded = true) String query, @Query(EPOCH) String epoch, + @Query(CHUNK_SIZE) int chunkSize, @Query(value = PARAMS, encoded = true) String params); @POST("query") @FormUrlEncoded @@ -88,10 +100,20 @@ public Call postQuery(@Query(DB) String db, @Field(value = Q, enco @Streaming @GET("query?chunked=true") public Call query(@Query(DB) String db, @Query(value = Q, encoded = true) String query, - @Query(CHUNK_SIZE) int chunkSize); + @Query(CHUNK_SIZE) int chunkSize); + + @Streaming + @GET("query?chunked=true") + public Call query(@Query(DB) String db, @Query(value = Q, encoded = true) String query, @Query(EPOCH) String epoch, + @Query(CHUNK_SIZE) int chunkSize); @Streaming @GET("query?chunked=true") public Call query(@Query(DB) String db, @Query(value = Q, encoded = true) String query, - @Query(CHUNK_SIZE) int chunkSize, @Query(value = PARAMS, encoded = true) String params); + @Query(CHUNK_SIZE) int chunkSize, @Query(value = PARAMS, encoded = true) String params); + + @Streaming + @GET("query?chunked=true") + public Call query(@Query(DB) String db, @Query(value = Q, encoded = true) String query, @Query(EPOCH) String epoch, + @Query(CHUNK_SIZE) int chunkSize, @Query(value = PARAMS, encoded = true) String params); } diff --git a/src/test/java/org/influxdb/InfluxDBTest.java b/src/test/java/org/influxdb/InfluxDBTest.java index 2598b9f23..987e863a1 100644 --- a/src/test/java/org/influxdb/InfluxDBTest.java +++ b/src/test/java/org/influxdb/InfluxDBTest.java @@ -24,11 +24,7 @@ import java.time.Instant; import java.time.ZoneId; import java.time.format.DateTimeFormatter; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Set; +import java.util.*; import 
java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -37,6 +33,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; import java.util.function.Consumer; import java.util.regex.Pattern; @@ -105,119 +102,119 @@ public void testQuery() { this.influxDB.query(new Query("DROP DATABASE mydb2", "mydb")); } - /** - * Simple Test for a query. - */ - @Test - public void testQueryWithoutDatabase() { - influxDB.setDatabase(UDP_DATABASE); - influxDB.query(new Query("CREATE DATABASE mydb2")); - Point point = Point - .measurement("cpu") - .tag("atag", "test") - .addField("idle", 90L) - .addField("usertime", 9L) - .addField("system", 1L) - .build(); - influxDB.write(point); - - Query query = QueryBuilder.newQuery("SELECT * FROM cpu WHERE atag = $atag") - .bind("atag", "test") - .create(); - QueryResult result = influxDB.query(query); - Assertions.assertTrue(result.getResults().get(0).getSeries().size() == 1); - Series series = result.getResults().get(0).getSeries().get(0); - Assertions.assertTrue(series.getValues().size() == 1); - - influxDB.query(new Query("DROP DATABASE mydb2")); - } - - @Test - public void testBoundParameterQuery() throws InterruptedException { - // set up - Point point = Point - .measurement("cpu") - .tag("atag", "test") - .addField("idle", 90L) - .addField("usertime", 9L) - .addField("system", 1L) - .build(); - this.influxDB.setDatabase(UDP_DATABASE); - this.influxDB.write(point); - - // test - Query query = QueryBuilder.newQuery("SELECT * FROM cpu WHERE atag = $atag") - .forDatabase(UDP_DATABASE) - .bind("atag", "test") - .create(); - QueryResult result = this.influxDB.query(query); - Assertions.assertTrue(result.getResults().get(0).getSeries().size() == 1); - Series series = 
result.getResults().get(0).getSeries().get(0); - Assertions.assertTrue(series.getValues().size() == 1); - - result = this.influxDB.query(query, TimeUnit.SECONDS); - Assertions.assertTrue(result.getResults().get(0).getSeries().size() == 1); - series = result.getResults().get(0).getSeries().get(0); - Assertions.assertTrue(series.getValues().size() == 1); - - Object waitForTestresults = new Object(); - Consumer check = (queryResult) -> { - if (!"DONE".equals(queryResult.getError())) { - Assertions.assertTrue(queryResult.getResults().get(0).getSeries().size() == 1); - Series s = queryResult.getResults().get(0).getSeries().get(0); - Assertions.assertTrue(s.getValues().size() == 1); - synchronized (waitForTestresults) { - waitForTestresults.notifyAll(); - } - } - }; - this.influxDB.query(query, 10, check); - synchronized (waitForTestresults) { - waitForTestresults.wait(2000); - } - } - - /** - * Tests for callback query. - */ - @Test - public void testCallbackQuery() throws Throwable { - final AsyncResult result = new AsyncResult<>(); - final Consumer firstQueryConsumer = new Consumer() { - @Override - public void accept(QueryResult queryResult) { - influxDB.query(new Query("DROP DATABASE mydb2", "mydb"), result.resultConsumer, result.errorConsumer); - } - }; - - this.influxDB.query(new Query("CREATE DATABASE mydb2", "mydb"), firstQueryConsumer, result.errorConsumer); - - // Will throw exception in case of error. - result.result(); - } - - /** - * Tests for callback query with a failure. 
- * see Issue #602 - */ - @Test - public void testCallbackQueryFailureHandling() throws Throwable { - final AsyncResult res = new AsyncResult<>(); - - this.influxDB.query(new Query("SHOW SERRIES"), res.resultConsumer, res.errorConsumer); - - try{ - res.result(); - Assertions.fail("Malformed query should throw InfluxDBException"); - } - catch (InfluxDBException e){ - Pattern errorPattern = Pattern.compile("Bad Request.*error parsing query: found SERRIES, expected.*", - Pattern.DOTALL); - - Assertions.assertTrue(errorPattern.matcher(e.getMessage()).matches(), - "Error string \"" + e.getMessage() + "\" does not match error pattern"); - } - } + /** + * Simple Test for a query. + */ + @Test + public void testQueryWithoutDatabase() { + influxDB.setDatabase(UDP_DATABASE); + influxDB.query(new Query("CREATE DATABASE mydb2")); + Point point = Point + .measurement("cpu") + .tag("atag", "test") + .addField("idle", 90L) + .addField("usertime", 9L) + .addField("system", 1L) + .build(); + influxDB.write(point); + + Query query = QueryBuilder.newQuery("SELECT * FROM cpu WHERE atag = $atag") + .bind("atag", "test") + .create(); + QueryResult result = influxDB.query(query); + Assertions.assertTrue(result.getResults().get(0).getSeries().size() == 1); + Series series = result.getResults().get(0).getSeries().get(0); + Assertions.assertTrue(series.getValues().size() == 1); + + influxDB.query(new Query("DROP DATABASE mydb2")); + } + + @Test + public void testBoundParameterQuery() throws InterruptedException { + // set up + Point point = Point + .measurement("cpu") + .tag("atag", "test") + .addField("idle", 90L) + .addField("usertime", 9L) + .addField("system", 1L) + .build(); + this.influxDB.setDatabase(UDP_DATABASE); + this.influxDB.write(point); + + // test + Query query = QueryBuilder.newQuery("SELECT * FROM cpu WHERE atag = $atag") + .forDatabase(UDP_DATABASE) + .bind("atag", "test") + .create(); + QueryResult result = this.influxDB.query(query); + 
Assertions.assertTrue(result.getResults().get(0).getSeries().size() == 1); + Series series = result.getResults().get(0).getSeries().get(0); + Assertions.assertTrue(series.getValues().size() == 1); + + result = this.influxDB.query(query, TimeUnit.SECONDS); + Assertions.assertTrue(result.getResults().get(0).getSeries().size() == 1); + series = result.getResults().get(0).getSeries().get(0); + Assertions.assertTrue(series.getValues().size() == 1); + + Object waitForTestresults = new Object(); + Consumer check = (queryResult) -> { + if (!"DONE".equals(queryResult.getError())) { + Assertions.assertTrue(queryResult.getResults().get(0).getSeries().size() == 1); + Series s = queryResult.getResults().get(0).getSeries().get(0); + Assertions.assertTrue(s.getValues().size() == 1); + synchronized (waitForTestresults) { + waitForTestresults.notifyAll(); + } + } + }; + this.influxDB.query(query, 10, check); + synchronized (waitForTestresults) { + waitForTestresults.wait(2000); + } + } + + /** + * Tests for callback query. + */ + @Test + public void testCallbackQuery() throws Throwable { + final AsyncResult result = new AsyncResult<>(); + final Consumer firstQueryConsumer = new Consumer() { + @Override + public void accept(QueryResult queryResult) { + influxDB.query(new Query("DROP DATABASE mydb2", "mydb"), result.resultConsumer, result.errorConsumer); + } + }; + + this.influxDB.query(new Query("CREATE DATABASE mydb2", "mydb"), firstQueryConsumer, result.errorConsumer); + + // Will throw exception in case of error. + result.result(); + } + + /** + * Tests for callback query with a failure. 
+ * see Issue #602 + */ + @Test + public void testCallbackQueryFailureHandling() throws Throwable { + final AsyncResult res = new AsyncResult<>(); + + this.influxDB.query(new Query("SHOW SERRIES"), res.resultConsumer, res.errorConsumer); + + try{ + res.result(); + Assertions.fail("Malformed query should throw InfluxDBException"); + } + catch (InfluxDBException e){ + Pattern errorPattern = Pattern.compile("Bad Request.*error parsing query: found SERRIES, expected.*", + Pattern.DOTALL); + + Assertions.assertTrue(errorPattern.matcher(e.getMessage()).matches(), + "Error string \"" + e.getMessage() + "\" does not match error pattern"); + } + } /** * Test that describe Databases works. @@ -267,12 +264,12 @@ public void testWrite() { String rp = TestUtils.defaultRetentionPolicy(this.influxDB.version()); BatchPoints batchPoints = BatchPoints.database(dbName).tag("async", "true").retentionPolicy(rp).build(); Point point1 = Point - .measurement("cpu") - .tag("atag", "test") - .addField("idle", 90L) - .addField("usertime", 9L) - .addField("system", 1L) - .build(); + .measurement("cpu") + .tag("atag", "test") + .addField("idle", 90L) + .addField("usertime", 9L) + .addField("system", 1L) + .build(); Point point2 = Point.measurement("disk").tag("atag", "test").addField("used", 80L).addField("free", 1L).build(); batchPoints.point(point1); batchPoints.point(point2); @@ -283,40 +280,40 @@ public void testWrite() { this.influxDB.query(new Query("DROP DATABASE " + dbName)); } - /** - * Test that writing to the new lineprotocol, using {@link InfluxDB#setDatabase(String)} and not - * {@link BatchPoints#database(String)}. 
- */ - @Test - public void testWriteNoDatabase() { - String dbName = "write_unittest_" + System.currentTimeMillis(); - this.influxDB.query(new Query("CREATE DATABASE " + dbName)); - this.influxDB.setDatabase(dbName); - String rp = TestUtils.defaultRetentionPolicy(this.influxDB.version()); - BatchPoints batchPoints = BatchPoints.builder().tag("async", "true").retentionPolicy(rp).build(); - Point point1 = Point - .measurement("cpu") - .tag("atag", "test") - .addField("idle", 90L) - .addField("usertime", 9L) - .addField("system", 1L) - .build(); - Point point2 = Point.measurement("disk").tag("atag", "test").addField("used", 80L).addField("free", 1L).build(); - batchPoints.point(point1); - batchPoints.point(point2); - this.influxDB.write(batchPoints); - Query query = new Query("SELECT * FROM cpu GROUP BY *", dbName); - QueryResult result = this.influxDB.query(query); - Assertions.assertFalse(result.getResults().get(0).getSeries().get(0).getTags().isEmpty()); - this.influxDB.query(new Query("DROP DATABASE " + dbName)); - } + /** + * Test that writing to the new lineprotocol, using {@link InfluxDB#setDatabase(String)} and not + * {@link BatchPoints#database(String)}. 
+ */ + @Test + public void testWriteNoDatabase() { + String dbName = "write_unittest_" + System.currentTimeMillis(); + this.influxDB.query(new Query("CREATE DATABASE " + dbName)); + this.influxDB.setDatabase(dbName); + String rp = TestUtils.defaultRetentionPolicy(this.influxDB.version()); + BatchPoints batchPoints = BatchPoints.builder().tag("async", "true").retentionPolicy(rp).build(); + Point point1 = Point + .measurement("cpu") + .tag("atag", "test") + .addField("idle", 90L) + .addField("usertime", 9L) + .addField("system", 1L) + .build(); + Point point2 = Point.measurement("disk").tag("atag", "test").addField("used", 80L).addField("free", 1L).build(); + batchPoints.point(point1); + batchPoints.point(point2); + this.influxDB.write(batchPoints); + Query query = new Query("SELECT * FROM cpu GROUP BY *", dbName); + QueryResult result = this.influxDB.query(query); + Assertions.assertFalse(result.getResults().get(0).getSeries().get(0).getTags().isEmpty()); + this.influxDB.query(new Query("DROP DATABASE " + dbName)); + } /** * Tests that database information is used from {@link InfluxDB} when database information * is not present in query. */ - @Test - public void testQueryWithNoDatabase() { + @Test + public void testQueryWithNoDatabase() { String dbName = "write_unittest_" + System.currentTimeMillis(); this.influxDB.query(new Query("CREATE DATABASE " + dbName)); this.influxDB.setDatabase(dbName); // Set db here, then after write query should pass. @@ -467,39 +464,39 @@ public void testQueryNoDatabaseWithTimeFormat() { this.influxDB.query(new Query("DROP DATABASE " + dbName)); } - /** - * Test the implementation of {@link InfluxDB#write(int, Point)}'s async support. 
- */ - @Test - public void testAsyncWritePointThroughUDPFail() { - this.influxDB.enableBatch(1, 1, TimeUnit.SECONDS); - try{ - Assertions.assertTrue(this.influxDB.isBatchEnabled()); - String measurement = TestUtils.getRandomMeasurement(); - Point point = Point.measurement(measurement).tag("atag", "test").addField("used", 80L).addField("free", 1L).build(); - Thread.currentThread().interrupt(); - Assertions.assertThrows(RuntimeException.class, () -> { + /** + * Test the implementation of {@link InfluxDB#write(int, Point)}'s async support. + */ + @Test + public void testAsyncWritePointThroughUDPFail() { + this.influxDB.enableBatch(1, 1, TimeUnit.SECONDS); + try{ + Assertions.assertTrue(this.influxDB.isBatchEnabled()); + String measurement = TestUtils.getRandomMeasurement(); + Point point = Point.measurement(measurement).tag("atag", "test").addField("used", 80L).addField("free", 1L).build(); + Thread.currentThread().interrupt(); + Assertions.assertThrows(RuntimeException.class, () -> { this.influxDB.write(UDP_PORT, point); }); - }finally{ - this.influxDB.disableBatch(); - } - } - - /** - * Test writing to the database using string protocol. - */ - @Test - public void testWriteStringData() { - String dbName = "write_unittest_" + System.currentTimeMillis(); - this.influxDB.query(new Query("CREATE DATABASE " + dbName)); - String rp = TestUtils.defaultRetentionPolicy(this.influxDB.version()); - this.influxDB.write(dbName, rp, InfluxDB.ConsistencyLevel.ONE, "cpu,atag=test idle=90,usertime=9,system=1"); - Query query = new Query("SELECT * FROM cpu GROUP BY *", dbName); - QueryResult result = this.influxDB.query(query); - Assertions.assertFalse(result.getResults().get(0).getSeries().get(0).getTags().isEmpty()); - this.influxDB.query(new Query("DROP DATABASE " + dbName)); - } + }finally{ + this.influxDB.disableBatch(); + } + } + + /** + * Test writing to the database using string protocol. 
+ */ + @Test + public void testWriteStringData() { + String dbName = "write_unittest_" + System.currentTimeMillis(); + this.influxDB.query(new Query("CREATE DATABASE " + dbName)); + String rp = TestUtils.defaultRetentionPolicy(this.influxDB.version()); + this.influxDB.write(dbName, rp, InfluxDB.ConsistencyLevel.ONE, "cpu,atag=test idle=90,usertime=9,system=1"); + Query query = new Query("SELECT * FROM cpu GROUP BY *", dbName); + QueryResult result = this.influxDB.query(query); + Assertions.assertFalse(result.getResults().get(0).getSeries().get(0).getTags().isEmpty()); + this.influxDB.query(new Query("DROP DATABASE " + dbName)); + } /** * Test writing to the database using string protocol with simpler interface. @@ -518,52 +515,52 @@ public void testWriteStringDataSimple() { this.influxDB.query(new Query("DROP DATABASE " + dbName)); } - /** - * When batch of points' size is over UDP limit, the expected exception - * is java.lang.RuntimeException: java.net.SocketException: - * The message is larger than the maximum supported by the underlying transport: Datagram send failed - * @throws Exception - */ - @Test - public void testWriteMultipleStringDataLinesOverUDPLimit() throws Exception { - //prepare data - List lineProtocols = new ArrayList(); - int i = 0; - int length = 0; - while ( true ) { - Point point = Point.measurement("udp_single_poit").addField("v", i).build(); - String lineProtocol = point.lineProtocol(); - length += (lineProtocol.getBytes("utf-8")).length; - lineProtocols.add(lineProtocol); - if( length > 65535 ){ - break; - } - } - //write batch of string which size is over 64K - Assertions.assertThrows(RuntimeException.class, () -> { + /** + * When batch of points' size is over UDP limit, the expected exception + * is java.lang.RuntimeException: java.net.SocketException: + * The message is larger than the maximum supported by the underlying transport: Datagram send failed + * @throws Exception + */ + @Test + public void 
testWriteMultipleStringDataLinesOverUDPLimit() throws Exception { + //prepare data + List lineProtocols = new ArrayList(); + int i = 0; + int length = 0; + while ( true ) { + Point point = Point.measurement("udp_single_poit").addField("v", i).build(); + String lineProtocol = point.lineProtocol(); + length += (lineProtocol.getBytes("utf-8")).length; + lineProtocols.add(lineProtocol); + if( length > 65535 ){ + break; + } + } + //write batch of string which size is over 64K + Assertions.assertThrows(RuntimeException.class, () -> { this.influxDB.write(UDP_PORT, lineProtocols); }); - } - - /** - * Test writing multiple records to the database using string protocol. - */ - @Test - public void testWriteMultipleStringData() { - String dbName = "write_unittest_" + System.currentTimeMillis(); - this.influxDB.query(new Query("CREATE DATABASE " + dbName)); - String rp = TestUtils.defaultRetentionPolicy(this.influxDB.version()); - - this.influxDB.write(dbName, rp, InfluxDB.ConsistencyLevel.ONE, "cpu,atag=test1 idle=100,usertime=10,system=1\ncpu,atag=test2 idle=200,usertime=20,system=2\ncpu,atag=test3 idle=300,usertime=30,system=3"); - Query query = new Query("SELECT * FROM cpu GROUP BY *", dbName); - QueryResult result = this.influxDB.query(query); - - Assertions.assertEquals(result.getResults().get(0).getSeries().size(), 3); - Assertions.assertEquals("test1", result.getResults().get(0).getSeries().get(0).getTags().get("atag")); - Assertions.assertEquals("test2", result.getResults().get(0).getSeries().get(1).getTags().get("atag")); - Assertions.assertEquals("test3", result.getResults().get(0).getSeries().get(2).getTags().get("atag")); - this.influxDB.query(new Query("DROP DATABASE " + dbName)); - } + } + + /** + * Test writing multiple records to the database using string protocol. 
+ */ + @Test + public void testWriteMultipleStringData() { + String dbName = "write_unittest_" + System.currentTimeMillis(); + this.influxDB.query(new Query("CREATE DATABASE " + dbName)); + String rp = TestUtils.defaultRetentionPolicy(this.influxDB.version()); + + this.influxDB.write(dbName, rp, InfluxDB.ConsistencyLevel.ONE, "cpu,atag=test1 idle=100,usertime=10,system=1\ncpu,atag=test2 idle=200,usertime=20,system=2\ncpu,atag=test3 idle=300,usertime=30,system=3"); + Query query = new Query("SELECT * FROM cpu GROUP BY *", dbName); + QueryResult result = this.influxDB.query(query); + + Assertions.assertEquals(result.getResults().get(0).getSeries().size(), 3); + Assertions.assertEquals("test1", result.getResults().get(0).getSeries().get(0).getTags().get("atag")); + Assertions.assertEquals("test2", result.getResults().get(0).getSeries().get(1).getTags().get("atag")); + Assertions.assertEquals("test3", result.getResults().get(0).getSeries().get(2).getTags().get("atag")); + this.influxDB.query(new Query("DROP DATABASE " + dbName)); + } /** * Test writing multiple records to the database using string protocol with simpler interface. @@ -587,29 +584,29 @@ public void testWriteMultipleStringDataSimple() { this.influxDB.query(new Query("DROP DATABASE " + dbName)); } - /** - * Test writing multiple separate records to the database using string protocol. 
- */ - @Test - public void testWriteMultipleStringDataLines() { - String dbName = "write_unittest_" + System.currentTimeMillis(); - this.influxDB.query(new Query("CREATE DATABASE " + dbName)); - String rp = TestUtils.defaultRetentionPolicy(this.influxDB.version()); - - this.influxDB.write(dbName, rp, InfluxDB.ConsistencyLevel.ONE, Arrays.asList( - "cpu,atag=test1 idle=100,usertime=10,system=1", - "cpu,atag=test2 idle=200,usertime=20,system=2", - "cpu,atag=test3 idle=300,usertime=30,system=3" - )); - Query query = new Query("SELECT * FROM cpu GROUP BY *", dbName); - QueryResult result = this.influxDB.query(query); - - Assertions.assertEquals(result.getResults().get(0).getSeries().size(), 3); - Assertions.assertEquals("test1", result.getResults().get(0).getSeries().get(0).getTags().get("atag")); - Assertions.assertEquals("test2", result.getResults().get(0).getSeries().get(1).getTags().get("atag")); - Assertions.assertEquals("test3", result.getResults().get(0).getSeries().get(2).getTags().get("atag")); - this.influxDB.query(new Query("DROP DATABASE " + dbName)); - } + /** + * Test writing multiple separate records to the database using string protocol. 
+ */ + @Test + public void testWriteMultipleStringDataLines() { + String dbName = "write_unittest_" + System.currentTimeMillis(); + this.influxDB.query(new Query("CREATE DATABASE " + dbName)); + String rp = TestUtils.defaultRetentionPolicy(this.influxDB.version()); + + this.influxDB.write(dbName, rp, InfluxDB.ConsistencyLevel.ONE, Arrays.asList( + "cpu,atag=test1 idle=100,usertime=10,system=1", + "cpu,atag=test2 idle=200,usertime=20,system=2", + "cpu,atag=test3 idle=300,usertime=30,system=3" + )); + Query query = new Query("SELECT * FROM cpu GROUP BY *", dbName); + QueryResult result = this.influxDB.query(query); + + Assertions.assertEquals(result.getResults().get(0).getSeries().size(), 3); + Assertions.assertEquals("test1", result.getResults().get(0).getSeries().get(0).getTags().get("atag")); + Assertions.assertEquals("test2", result.getResults().get(0).getSeries().get(1).getTags().get("atag")); + Assertions.assertEquals("test3", result.getResults().get(0).getSeries().get(2).getTags().get("atag")); + this.influxDB.query(new Query("DROP DATABASE " + dbName)); + } /** * Tests writing points using the time precision feature @@ -627,38 +624,38 @@ public void testWriteBatchWithPrecision() throws Exception { // GIVEN a batch of points using second precision DateTimeFormatter formatter = DateTimeFormatter - .ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'") - .withZone(ZoneId.of("UTC")); + .ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'") + .withZone(ZoneId.of("UTC")); int t1 = 1485273600; Point p1 = Point - .measurement(measurement) - .addField("foo", 1d) - .tag("device", "one") - .time(t1, TimeUnit.SECONDS).build(); // 2017-01-27T16:00:00 + .measurement(measurement) + .addField("foo", 1d) + .tag("device", "one") + .time(t1, TimeUnit.SECONDS).build(); // 2017-01-27T16:00:00 String timeP1 = formatter.format(Instant.ofEpochSecond(t1)); int t2 = 1485277200; Point p2 = Point - .measurement(measurement) - .addField("foo", 2d) - .tag("device", "two") - .time(t2, TimeUnit.SECONDS).build(); // 
2017-01-27T17:00:00 + .measurement(measurement) + .addField("foo", 2d) + .tag("device", "two") + .time(t2, TimeUnit.SECONDS).build(); // 2017-01-27T17:00:00 String timeP2 = formatter.format(Instant.ofEpochSecond(t2)); int t3 = 1485280800; Point p3 = Point - .measurement(measurement) - .addField("foo", 3d) - .tag("device", "three") - .time(t3, TimeUnit.SECONDS).build(); // 2017-01-27T18:00:00 + .measurement(measurement) + .addField("foo", 3d) + .tag("device", "three") + .time(t3, TimeUnit.SECONDS).build(); // 2017-01-27T18:00:00 String timeP3 = formatter.format(Instant.ofEpochSecond(t3)); BatchPoints batchPoints = BatchPoints - .database(dbName) - .retentionPolicy(rp) - .precision(TimeUnit.SECONDS) - .points(p1, p2, p3) - .build(); + .database(dbName) + .retentionPolicy(rp) + .precision(TimeUnit.SECONDS) + .points(p1, p2, p3) + .build(); // WHEN I write the batch this.influxDB.write(batchPoints); @@ -686,33 +683,33 @@ public void testWriteBatchWithoutPrecision() throws Exception { // GIVEN a batch of points that has no specific precision long t1 = 1485273600000000100L; Point p1 = Point - .measurement(measurement) - .addField("foo", 1d) - .tag("device", "one") - .time(t1, TimeUnit.NANOSECONDS).build(); // 2017-01-27T16:00:00.000000100Z + .measurement(measurement) + .addField("foo", 1d) + .tag("device", "one") + .time(t1, TimeUnit.NANOSECONDS).build(); // 2017-01-27T16:00:00.000000100Z Double timeP1 = Double.valueOf(t1); long t2 = 1485277200000000200L; Point p2 = Point - .measurement(measurement) - .addField("foo", 2d) - .tag("device", "two") - .time(t2, TimeUnit.NANOSECONDS).build(); // 2017-01-27T17:00:00.000000200Z + .measurement(measurement) + .addField("foo", 2d) + .tag("device", "two") + .time(t2, TimeUnit.NANOSECONDS).build(); // 2017-01-27T17:00:00.000000200Z Double timeP2 = Double.valueOf(t2); long t3 = 1485280800000000300L; Point p3 = Point - .measurement(measurement) - .addField("foo", 3d) - .tag("device", "three") - .time(t3, TimeUnit.NANOSECONDS).build(); 
// 2017-01-27T18:00:00.000000300Z + .measurement(measurement) + .addField("foo", 3d) + .tag("device", "three") + .time(t3, TimeUnit.NANOSECONDS).build(); // 2017-01-27T18:00:00.000000300Z Double timeP3 = Double.valueOf(t3); BatchPoints batchPoints = BatchPoints - .database(dbName) - .retentionPolicy(rp) - .points(p1, p2, p3) - .build(); + .database(dbName) + .retentionPolicy(rp) + .points(p1, p2, p3) + .build(); // WHEN I write the batch this.influxDB.write(batchPoints); @@ -733,7 +730,7 @@ public void testWriteBatchWithoutPrecision() throws Exception { Assertions.assertEquals(queryResult.getResults().get(0).getSeries().get(0).getValues().get(0).get(0), timeP1); Assertions.assertEquals(queryResult.getResults().get(0).getSeries().get(0).getValues().get(1).get(0), timeP2); Assertions.assertEquals(queryResult.getResults().get(0).getSeries().get(0).getValues().get(2).get(0), timeP3); - + this.influxDB.query(new Query("DROP DATABASE " + dbName)); } @@ -749,8 +746,8 @@ public void testWriteRecordsWithPrecision() throws Exception { // GIVEN a set of records using second precision DateTimeFormatter formatter = DateTimeFormatter - .ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'") - .withZone(ZoneId.of("UTC")); + .ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'") + .withZone(ZoneId.of("UTC")); List records = new ArrayList<>(); records.add(measurement + ",atag=test1 idle=100,usertime=10,system=1 1485273600"); String timeP1 = formatter.format(Instant.ofEpochSecond(1485273600)); @@ -785,9 +782,9 @@ public void testWriteMultipleStringDataLinesSimple() { this.influxDB.setRetentionPolicy(rp); this.influxDB.write(Arrays.asList( - "cpu,atag=test1 idle=100,usertime=10,system=1", - "cpu,atag=test2 idle=200,usertime=20,system=2", - "cpu,atag=test3 idle=300,usertime=30,system=3" + "cpu,atag=test1 idle=100,usertime=10,system=1", + "cpu,atag=test2 idle=200,usertime=20,system=2", + "cpu,atag=test3 idle=300,usertime=30,system=3" )); Query query = new Query("SELECT * FROM cpu GROUP BY *", dbName); QueryResult 
result = this.influxDB.query(query); @@ -811,29 +808,29 @@ public void testCreateNumericNamedDatabase() { this.influxDB.query(new Query("DROP DATABASE \"123\"")); } - /** - * Test that creating database which name is empty will throw expected exception - */ - @Test - public void testCreateEmptyNamedDatabase() { - Assertions.assertThrows(org.influxdb.InfluxDBException.class, () -> { + /** + * Test that creating database which name is empty will throw expected exception + */ + @Test + public void testCreateEmptyNamedDatabase() { + Assertions.assertThrows(org.influxdb.InfluxDBException.class, () -> { this.influxDB.query(new Query(Query.encode("CREATE DATABASE \"\""))); }); - } - - /** - * Test that creating database which name contains - - */ - @Test() - public void testCreateDatabaseWithNameContainHyphen() { - this.influxDB.query(new Query("CREATE DATABASE \"123-456\"")); - try { - List result = this.influxDB.describeDatabases(); - Assertions.assertTrue(result.contains("123-456")); - } finally { - this.influxDB.query(new Query("DROP DATABASE \"123-456\"")); - } - } + } + + /** + * Test that creating database which name contains - + */ + @Test() + public void testCreateDatabaseWithNameContainHyphen() { + this.influxDB.query(new Query("CREATE DATABASE \"123-456\"")); + try { + List result = this.influxDB.describeDatabases(); + Assertions.assertTrue(result.contains("123-456")); + } finally { + this.influxDB.query(new Query("DROP DATABASE \"123-456\"")); + } + } /** * Test the implementation of {@link InfluxDB#isBatchEnabled()}. 
@@ -896,20 +893,20 @@ public void testWrongHostForInfluxdb(){ String unresolvableHost = "a.b.c"; Assertions.assertThrows(InfluxDBIOException.class, () -> { - InfluxDBFactory.connect("http://" + unresolvableHost + ":" + TestUtils.getInfluxPORT(true)); + InfluxDBFactory.connect("http://" + unresolvableHost + ":" + TestUtils.getInfluxPORT(true)); }); } @Test - public void testInvalidUrlHandling(){ - Assertions.assertThrows(IllegalArgumentException.class, () -> { - InfluxDBFactory.connect("@@@http://@@@"); - }); + public void testInvalidUrlHandling(){ + Assertions.assertThrows(IllegalArgumentException.class, () -> { + InfluxDBFactory.connect("@@@http://@@@"); + }); - Assertions.assertThrows(IllegalArgumentException.class, () -> { - InfluxDBFactory.connect("http://@@@abc"); - }); - } + Assertions.assertThrows(IllegalArgumentException.class, () -> { + InfluxDBFactory.connect("http://@@@abc"); + }); + } @Test public void testBatchEnabledTwice() { @@ -935,105 +932,105 @@ public void testCloseInfluxDBClient() { Assertions.assertFalse(influxDB.isBatchEnabled()); } - /** - * Test writing multiple separate records to the database by Gzip compress - */ - @Test - public void testWriteEnableGzip() { - InfluxDB influxDBForTestGzip = InfluxDBFactory.connect("http://" + TestUtils.getInfluxIP() + ":" + TestUtils.getInfluxPORT(true), "admin", "admin"); - String dbName = "write_unittest_" + System.currentTimeMillis(); - try { - influxDBForTestGzip.setLogLevel(LogLevel.NONE); - influxDBForTestGzip.enableGzip(); - influxDBForTestGzip.query(new Query("CREATE DATABASE " + dbName)); - String rp = TestUtils.defaultRetentionPolicy(this.influxDB.version()); - - influxDBForTestGzip.write(dbName, rp, InfluxDB.ConsistencyLevel.ONE, Arrays.asList( - "cpu,atag=test1 idle=100,usertime=10,system=1", - "cpu,atag=test2 idle=200,usertime=20,system=2", - "cpu,atag=test3 idle=300,usertime=30,system=3" - )); - Query query = new Query("SELECT * FROM cpu GROUP BY *", dbName); - QueryResult result = 
influxDBForTestGzip.query(query); - - Assertions.assertEquals(result.getResults().get(0).getSeries().size(), 3); - Assertions.assertEquals("test1", result.getResults().get(0).getSeries().get(0).getTags().get("atag")); - Assertions.assertEquals("test2", result.getResults().get(0).getSeries().get(1).getTags().get("atag")); - Assertions.assertEquals("test3", result.getResults().get(0).getSeries().get(2).getTags().get("atag")); - } finally { - influxDBForTestGzip.query(new Query("DROP DATABASE " + dbName)); - influxDBForTestGzip.close(); - } - } - - /** - * Test the implementation of flag control for gzip such as: - * {@link InfluxDB#disableGzip()}} and {@link InfluxDB#isBatchEnabled()}},etc - */ - @Test - public void testWriteEnableGzipAndDisableGzip() { - InfluxDB influxDBForTestGzip = InfluxDBFactory.connect("http://" + TestUtils.getInfluxIP() + ":" + TestUtils.getInfluxPORT(true), "admin", "admin"); - try { - //test default: gzip is disable - Assertions.assertFalse(influxDBForTestGzip.isGzipEnabled()); - influxDBForTestGzip.enableGzip(); - Assertions.assertTrue(influxDBForTestGzip.isGzipEnabled()); - influxDBForTestGzip.disableGzip(); - Assertions.assertFalse(influxDBForTestGzip.isGzipEnabled()); - } finally { - influxDBForTestGzip.close(); - } - } - - /** - * Test chunking. 
- * @throws InterruptedException - */ - @Test - public void testChunking() throws InterruptedException { - if (this.influxDB.version().startsWith("0.") || this.influxDB.version().startsWith("1.0")) { - // do not test version 0.13 and 1.0 - return; - } - String dbName = "write_unittest_" + System.currentTimeMillis(); - this.influxDB.query(new Query("CREATE DATABASE " + dbName)); - String rp = TestUtils.defaultRetentionPolicy(this.influxDB.version()); - BatchPoints batchPoints = BatchPoints.database(dbName).retentionPolicy(rp).build(); - Point point1 = Point.measurement("disk").tag("atag", "a").addField("used", 60L).addField("free", 1L).build(); - Point point2 = Point.measurement("disk").tag("atag", "b").addField("used", 70L).addField("free", 2L).build(); - Point point3 = Point.measurement("disk").tag("atag", "c").addField("used", 80L).addField("free", 3L).build(); - batchPoints.point(point1); - batchPoints.point(point2); - batchPoints.point(point3); - this.influxDB.write(batchPoints); - - Thread.sleep(2000); - final BlockingQueue queue = new LinkedBlockingQueue<>(); - Query query = new Query("SELECT * FROM disk", dbName); - this.influxDB.query(query, 2, new Consumer() { - @Override - public void accept(QueryResult result) { - queue.add(result); - }}); - - Thread.sleep(2000); - this.influxDB.query(new Query("DROP DATABASE " + dbName)); - - QueryResult result = queue.poll(20, TimeUnit.SECONDS); - Assertions.assertNotNull(result); - System.out.println(result); - Assertions.assertEquals(2, result.getResults().get(0).getSeries().get(0).getValues().size()); - - result = queue.poll(20, TimeUnit.SECONDS); - Assertions.assertNotNull(result); - System.out.println(result); - Assertions.assertEquals(1, result.getResults().get(0).getSeries().get(0).getValues().size()); - - result = queue.poll(20, TimeUnit.SECONDS); - Assertions.assertNotNull(result); - System.out.println(result); - Assertions.assertEquals("DONE", result.getError()); - } + /** + * Test writing multiple separate 
records to the database by Gzip compress + */ + @Test + public void testWriteEnableGzip() { + InfluxDB influxDBForTestGzip = InfluxDBFactory.connect("http://" + TestUtils.getInfluxIP() + ":" + TestUtils.getInfluxPORT(true), "admin", "admin"); + String dbName = "write_unittest_" + System.currentTimeMillis(); + try { + influxDBForTestGzip.setLogLevel(LogLevel.NONE); + influxDBForTestGzip.enableGzip(); + influxDBForTestGzip.query(new Query("CREATE DATABASE " + dbName)); + String rp = TestUtils.defaultRetentionPolicy(this.influxDB.version()); + + influxDBForTestGzip.write(dbName, rp, InfluxDB.ConsistencyLevel.ONE, Arrays.asList( + "cpu,atag=test1 idle=100,usertime=10,system=1", + "cpu,atag=test2 idle=200,usertime=20,system=2", + "cpu,atag=test3 idle=300,usertime=30,system=3" + )); + Query query = new Query("SELECT * FROM cpu GROUP BY *", dbName); + QueryResult result = influxDBForTestGzip.query(query); + + Assertions.assertEquals(result.getResults().get(0).getSeries().size(), 3); + Assertions.assertEquals("test1", result.getResults().get(0).getSeries().get(0).getTags().get("atag")); + Assertions.assertEquals("test2", result.getResults().get(0).getSeries().get(1).getTags().get("atag")); + Assertions.assertEquals("test3", result.getResults().get(0).getSeries().get(2).getTags().get("atag")); + } finally { + influxDBForTestGzip.query(new Query("DROP DATABASE " + dbName)); + influxDBForTestGzip.close(); + } + } + + /** + * Test the implementation of flag control for gzip such as: + * {@link InfluxDB#disableGzip()}} and {@link InfluxDB#isBatchEnabled()}},etc + */ + @Test + public void testWriteEnableGzipAndDisableGzip() { + InfluxDB influxDBForTestGzip = InfluxDBFactory.connect("http://" + TestUtils.getInfluxIP() + ":" + TestUtils.getInfluxPORT(true), "admin", "admin"); + try { + //test default: gzip is disable + Assertions.assertFalse(influxDBForTestGzip.isGzipEnabled()); + influxDBForTestGzip.enableGzip(); + Assertions.assertTrue(influxDBForTestGzip.isGzipEnabled()); + 
influxDBForTestGzip.disableGzip(); + Assertions.assertFalse(influxDBForTestGzip.isGzipEnabled()); + } finally { + influxDBForTestGzip.close(); + } + } + + /** + * Test chunking. + * @throws InterruptedException + */ + @Test + public void testChunking() throws InterruptedException { + if (this.influxDB.version().startsWith("0.") || this.influxDB.version().startsWith("1.0")) { + // do not test version 0.13 and 1.0 + return; + } + String dbName = "write_unittest_" + System.currentTimeMillis(); + this.influxDB.query(new Query("CREATE DATABASE " + dbName)); + String rp = TestUtils.defaultRetentionPolicy(this.influxDB.version()); + BatchPoints batchPoints = BatchPoints.database(dbName).retentionPolicy(rp).build(); + Point point1 = Point.measurement("disk").tag("atag", "a").addField("used", 60L).addField("free", 1L).build(); + Point point2 = Point.measurement("disk").tag("atag", "b").addField("used", 70L).addField("free", 2L).build(); + Point point3 = Point.measurement("disk").tag("atag", "c").addField("used", 80L).addField("free", 3L).build(); + batchPoints.point(point1); + batchPoints.point(point2); + batchPoints.point(point3); + this.influxDB.write(batchPoints); + + Thread.sleep(2000); + final BlockingQueue queue = new LinkedBlockingQueue<>(); + Query query = new Query("SELECT * FROM disk", dbName); + this.influxDB.query(query, 2, new Consumer() { + @Override + public void accept(QueryResult result) { + queue.add(result); + }}); + + Thread.sleep(2000); + this.influxDB.query(new Query("DROP DATABASE " + dbName)); + + QueryResult result = queue.poll(20, TimeUnit.SECONDS); + Assertions.assertNotNull(result); + System.out.println(result); + Assertions.assertEquals(2, result.getResults().get(0).getSeries().get(0).getValues().size()); + + result = queue.poll(20, TimeUnit.SECONDS); + Assertions.assertNotNull(result); + System.out.println(result); + Assertions.assertEquals(1, result.getResults().get(0).getSeries().get(0).getValues().size()); + + result = queue.poll(20, 
TimeUnit.SECONDS); + Assertions.assertNotNull(result); + System.out.println(result); + Assertions.assertEquals("DONE", result.getError()); + } /** * Test chunking edge case. @@ -1052,13 +1049,13 @@ public void testChunkingFail() throws InterruptedException { final CountDownLatch countDownLatchFailure = new CountDownLatch(1); Query query = new Query("UNKNOWN_QUERY", dbName); this.influxDB.query(query, 10, - (cancellable, queryResult) -> { - countDownLatch.countDown(); - }, () -> { - }, - throwable -> { - countDownLatchFailure.countDown(); - }); + (cancellable, queryResult) -> { + countDownLatch.countDown(); + }, () -> { + }, + throwable -> { + countDownLatchFailure.countDown(); + }); this.influxDB.query(new Query("DROP DATABASE " + dbName)); Assertions.assertTrue(countDownLatchFailure.await(10, TimeUnit.SECONDS)); Assertions.assertFalse(countDownLatch.await(10, TimeUnit.SECONDS)); @@ -1088,17 +1085,17 @@ public void testChunkingFailInConsumer() throws InterruptedException { final CountDownLatch countDownLatchComplete = new CountDownLatch(1); Query query = new Query("SELECT * FROM disk", dbName); this.influxDB.query(query, 2, - (cancellable, queryResult) -> { - countDownLatch.countDown(); - throw new RuntimeException("my error"); - }, () -> { - countDownLatchComplete.countDown(); - System.out.println("onComplete()"); - }, - throwable -> { - Assertions.assertEquals(throwable.getMessage(), "my error"); - countDownLatchFailure.countDown(); - }); + (cancellable, queryResult) -> { + countDownLatch.countDown(); + throw new RuntimeException("my error"); + }, () -> { + countDownLatchComplete.countDown(); + System.out.println("onComplete()"); + }, + throwable -> { + Assertions.assertEquals(throwable.getMessage(), "my error"); + countDownLatchFailure.countDown(); + }); this.influxDB.query(new Query("DROP DATABASE " + dbName)); Assertions.assertTrue(countDownLatchFailure.await(10, TimeUnit.SECONDS)); Assertions.assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); @@ -1106,25 
+1103,25 @@ public void testChunkingFailInConsumer() throws InterruptedException { } /** - * Test chunking on 0.13 and 1.0. - * @throws InterruptedException - */ - @Test() - public void testChunkingOldVersion() throws InterruptedException { - - if (this.influxDB.version().startsWith("0.") || this.influxDB.version().startsWith("1.0")) { - - Assertions.assertThrows(RuntimeException.class, () -> { - String dbName = "write_unittest_" + System.currentTimeMillis(); - Query query = new Query("SELECT * FROM cpu GROUP BY *", dbName); - this.influxDB.query(query, 10, new Consumer() { - @Override - public void accept(QueryResult result) { - } + * Test chunking on 0.13 and 1.0. + * @throws InterruptedException + */ + @Test() + public void testChunkingOldVersion() throws InterruptedException { + + if (this.influxDB.version().startsWith("0.") || this.influxDB.version().startsWith("1.0")) { + + Assertions.assertThrows(RuntimeException.class, () -> { + String dbName = "write_unittest_" + System.currentTimeMillis(); + Query query = new Query("SELECT * FROM cpu GROUP BY *", dbName); + this.influxDB.query(query, 10, new Consumer() { + @Override + public void accept(QueryResult result) { + } + }); }); - }); - } - } + } + } @Test public void testChunkingOnComplete() throws InterruptedException { @@ -1158,6 +1155,68 @@ public void testChunkingOnComplete() throws InterruptedException { Assertions.assertTrue(await, "The onComplete action did not arrive!"); } + /** + * Test chunking with TimeUnit + * @throws InterruptedException + */ + @Test + public void testChunkingWithImeUnit() throws InterruptedException { + if (this.influxDB.version().startsWith("0.") || this.influxDB.version().startsWith("1.0")) { + // do not test version 0.13 and 1.0 + return; + } + + String dbName = "write_unittest_" + System.currentTimeMillis(); + this.influxDB.query(new Query("CREATE DATABASE " + dbName)); + String rp = TestUtils.defaultRetentionPolicy(this.influxDB.version()); + BatchPoints batchPoints = 
BatchPoints.database(dbName).retentionPolicy(rp).build(); + Point point1 = Point.measurement("disk").tag("atag", "a").addField("used", 60L).addField("free", 1L).build(); + Point point2 = Point.measurement("disk").tag("atag", "b").addField("used", 70L).addField("free", 2L).build(); + Point point3 = Point.measurement("disk").tag("atag", "c").addField("used", 80L).addField("free", 3L).build(); + batchPoints.point(point1); + batchPoints.point(point2); + batchPoints.point(point3); + this.influxDB.write(batchPoints); + + CountDownLatch countDownLatch = new CountDownLatch(1); + + Thread.sleep(2000); + Query query = new Query("SELECT * FROM disk", dbName); + this.influxDB.query(query, 2, result -> {}, countDownLatch::countDown); + List results = new ArrayList<>(); + AtomicReference errorFound = new AtomicReference<>(); + + // Run and map to points + this.influxDB.query( + query, + TimeUnit.MILLISECONDS, + 5000, + (cancellable, queryResult) -> results.add(queryResult), + countDownLatch::countDown, + throwable -> { + countDownLatch.countDown(); + errorFound.set(throwable); + } + ); + + Thread.sleep(2000); + this.influxDB.query(new Query("DROP DATABASE " + dbName)); + + boolean await = countDownLatch.await(10, TimeUnit.SECONDS); + Assertions.assertTrue(await, "The onComplete action did not arrive!"); + Assertions.assertNull(errorFound.get(), "An error occurred : " + errorFound.get()); + + long totalPoints = results.stream() + .filter(qr -> qr.getResults() != null) + .flatMap(qr -> qr.getResults().stream()) + .filter(r -> r.getSeries() != null) + .flatMap(r -> r.getSeries().stream()) + .filter(s -> s.getValues() != null) + .mapToLong(s -> s.getValues().size()) + .sum(); + Assertions.assertEquals(3, totalPoints); + } + @Test public void testChunkingFailOnComplete() throws InterruptedException { if (this.influxDB.version().startsWith("0.") || this.influxDB.version().startsWith("1.0")) { @@ -1189,11 +1248,11 @@ public void testChunkingCancelQuery() throws InterruptedException { 
for (int i = 0; i < 10; i++) { Point point = Point.measurement("disk") - .tag("atag", "a") - .addField("used", 60L + (i * 10)) - .addField("free", 1L + i) - .time(i, TimeUnit.SECONDS) - .build(); + .tag("atag", "a") + .addField("used", 60L + (i * 10)) + .addField("free", 1L + i) + .time(i, TimeUnit.SECONDS) + .build(); batchPoints.point(point); } @@ -1266,114 +1325,114 @@ public void testChunkingOnFailure() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); Query query = new Query("XXXSELECT * FROM disk", "not-existing-db"); this.influxDB.query(query, 2, - //onNext - process result - (cancellable, queryResult) -> { - //assert that this is not executed in this test case - Assertions.fail("onNext() is executed!"); - }, - //onComplete - () -> Assertions.fail("onComplete() is executed !"), - //onFailure - throwable -> { - Assertions.assertTrue(throwable.getLocalizedMessage().contains("error parsing query: found XXXSELECT")); - countDownLatch.countDown(); - }); + //onNext - process result + (cancellable, queryResult) -> { + //assert that this is not executed in this test case + Assertions.fail("onNext() is executed!"); + }, + //onComplete + () -> Assertions.fail("onComplete() is executed !"), + //onFailure + throwable -> { + Assertions.assertTrue(throwable.getLocalizedMessage().contains("error parsing query: found XXXSELECT")); + countDownLatch.countDown(); + }); Assertions.assertTrue(countDownLatch.await(2, TimeUnit.SECONDS)); } - @Test - public void testChunkingOnFailureConnectionError() throws InterruptedException { - - if (this.influxDB.version().startsWith("0.") || this.influxDB.version().startsWith("1.0")) { - // do not test version 0.13 and 1.0 - return; - } - //connect to non existing port - InfluxDB influxDB = InfluxDBFactory.connect("http://"+TestUtils.getInfluxIP()+":12345"); - - CountDownLatch countDownLatch = new CountDownLatch(1); - Query query = new Query("SELECT * FROM disk", "not-existing-db"); - influxDB.query(query, 
2, - //onNext - process result - (cancellable, queryResult) -> { - //assert that this is not executed in this test case - Assertions.fail("onNext() is executed!"); - }, - //onComplete - () -> Assertions.fail("onComplete() is executed !"), - //onFailure - throwable -> { - Assertions.assertTrue(throwable instanceof ConnectException); - countDownLatch.countDown(); - }); - - Assertions.assertTrue(countDownLatch.await(2, TimeUnit.SECONDS)); - - } + @Test + public void testChunkingOnFailureConnectionError() throws InterruptedException { + + if (this.influxDB.version().startsWith("0.") || this.influxDB.version().startsWith("1.0")) { + // do not test version 0.13 and 1.0 + return; + } + //connect to non existing port + InfluxDB influxDB = InfluxDBFactory.connect("http://"+TestUtils.getInfluxIP()+":12345"); + + CountDownLatch countDownLatch = new CountDownLatch(1); + Query query = new Query("SELECT * FROM disk", "not-existing-db"); + influxDB.query(query, 2, + //onNext - process result + (cancellable, queryResult) -> { + //assert that this is not executed in this test case + Assertions.fail("onNext() is executed!"); + }, + //onComplete + () -> Assertions.fail("onComplete() is executed !"), + //onFailure + throwable -> { + Assertions.assertTrue(throwable instanceof ConnectException); + countDownLatch.countDown(); + }); + + Assertions.assertTrue(countDownLatch.await(2, TimeUnit.SECONDS)); + + } + + @Test + public void testChunkingQueryPost() throws InterruptedException { + if (this.influxDB.version().startsWith("0.") || this.influxDB.version().startsWith("1.0")) { + // do not test version 0.13 and 1.0 + return; + } + + String dbName = "write_unittest_" + System.currentTimeMillis(); + this.influxDB.query(new Query("CREATE DATABASE " + dbName)); + String rp = TestUtils.defaultRetentionPolicy(this.influxDB.version()); + BatchPoints batchPoints = BatchPoints.database(dbName).retentionPolicy(rp).build(); + Point point1 = Point.measurement("disk").tag("atag", "a").addField("used", 
60L).addField("free", 1L).build(); + Point point2 = Point.measurement("disk").tag("atag", "b").addField("used", 70L).addField("free", 2L).build(); + Point point3 = Point.measurement("disk").tag("atag", "c").addField("used", 80L).addField("free", 3L).build(); + batchPoints.point(point1); + batchPoints.point(point2); + batchPoints.point(point3); + this.influxDB.write(batchPoints); + + CountDownLatch countDownLatch = new CountDownLatch(2); + + Thread.sleep(2000); + Query query = new Query("SELECT * FROM disk", dbName, true); + this.influxDB.query(query, 2, result -> countDownLatch.countDown()); + + boolean await = countDownLatch.await(10, TimeUnit.SECONDS); + Assertions.assertTrue(await, "The QueryResults did not arrive!"); + + this.influxDB.query(new Query("DROP DATABASE " + dbName)); + } + + @Test + public void testFlushPendingWritesWhenBatchingEnabled() { + String dbName = "flush_tests_" + System.currentTimeMillis(); + try { + this.influxDB.query(new Query("CREATE DATABASE " + dbName)); + + // Enable batching with a very large buffer and flush interval so writes will be triggered by our call to flush(). 
+ this.influxDB.enableBatch(Integer.MAX_VALUE, Integer.MAX_VALUE, TimeUnit.HOURS); + + String measurement = TestUtils.getRandomMeasurement(); + Point point = Point.measurement(measurement).tag("atag", "test").addField("used", 80L).addField("free", 1L).build(); + this.influxDB.write(dbName, TestUtils.defaultRetentionPolicy(this.influxDB.version()), point); + this.influxDB.flush(); + + Query query = new Query("SELECT * FROM " + measurement + " GROUP BY *", dbName); + QueryResult result = this.influxDB.query(query); + Assertions.assertFalse(result.getResults().get(0).getSeries().get(0).getTags().isEmpty()); + } finally { + this.influxDB.query(new Query("DROP DATABASE " + dbName)); + this.influxDB.disableBatch(); + } + } @Test - public void testChunkingQueryPost() throws InterruptedException { - if (this.influxDB.version().startsWith("0.") || this.influxDB.version().startsWith("1.0")) { - // do not test version 0.13 and 1.0 - return; - } - - String dbName = "write_unittest_" + System.currentTimeMillis(); - this.influxDB.query(new Query("CREATE DATABASE " + dbName)); - String rp = TestUtils.defaultRetentionPolicy(this.influxDB.version()); - BatchPoints batchPoints = BatchPoints.database(dbName).retentionPolicy(rp).build(); - Point point1 = Point.measurement("disk").tag("atag", "a").addField("used", 60L).addField("free", 1L).build(); - Point point2 = Point.measurement("disk").tag("atag", "b").addField("used", 70L).addField("free", 2L).build(); - Point point3 = Point.measurement("disk").tag("atag", "c").addField("used", 80L).addField("free", 3L).build(); - batchPoints.point(point1); - batchPoints.point(point2); - batchPoints.point(point3); - this.influxDB.write(batchPoints); - - CountDownLatch countDownLatch = new CountDownLatch(2); - - Thread.sleep(2000); - Query query = new Query("SELECT * FROM disk", dbName, true); - this.influxDB.query(query, 2, result -> countDownLatch.countDown()); - - boolean await = countDownLatch.await(10, TimeUnit.SECONDS); - 
Assertions.assertTrue(await, "The QueryResults did not arrive!"); - - this.influxDB.query(new Query("DROP DATABASE " + dbName)); - } - - @Test - public void testFlushPendingWritesWhenBatchingEnabled() { - String dbName = "flush_tests_" + System.currentTimeMillis(); - try { - this.influxDB.query(new Query("CREATE DATABASE " + dbName)); - - // Enable batching with a very large buffer and flush interval so writes will be triggered by our call to flush(). - this.influxDB.enableBatch(Integer.MAX_VALUE, Integer.MAX_VALUE, TimeUnit.HOURS); - - String measurement = TestUtils.getRandomMeasurement(); - Point point = Point.measurement(measurement).tag("atag", "test").addField("used", 80L).addField("free", 1L).build(); - this.influxDB.write(dbName, TestUtils.defaultRetentionPolicy(this.influxDB.version()), point); - this.influxDB.flush(); - - Query query = new Query("SELECT * FROM " + measurement + " GROUP BY *", dbName); - QueryResult result = this.influxDB.query(query); - Assertions.assertFalse(result.getResults().get(0).getSeries().get(0).getTags().isEmpty()); - } finally { - this.influxDB.query(new Query("DROP DATABASE " + dbName)); - this.influxDB.disableBatch(); - } - } - - @Test - public void testFlushThrowsIfBatchingIsNotEnabled() { - Assertions.assertFalse(this.influxDB.isBatchEnabled()); - Assertions.assertThrows(IllegalStateException.class, () -> { + public void testFlushThrowsIfBatchingIsNotEnabled() { + Assertions.assertFalse(this.influxDB.isBatchEnabled()); + Assertions.assertThrows(IllegalStateException.class, () -> { this.influxDB.flush(); }); - } + } /** * Test creation and deletion of retention policies @@ -1412,90 +1471,90 @@ public void testCreateDropRetentionPolicies() { public void testIsBatchEnabledWithConsistency() { Assertions.assertFalse(this.influxDB.isBatchEnabled()); this.influxDB.enableBatch(1, 1, TimeUnit.SECONDS, Executors.defaultThreadFactory(), - (a, b) -> { - }, InfluxDB.ConsistencyLevel.ALL); + (a, b) -> { + }, 
InfluxDB.ConsistencyLevel.ALL); Assertions.assertTrue(this.influxDB.isBatchEnabled()); } /** - * Test initialize InfluxDBImpl with MessagePack format for InfluxDB versions before 1.4 will throw exception - */ @Test @EnabledIfEnvironmentVariable(named = "INFLUXDB_VERSION", matches = "1\\.3|1\\.2|1\\.1") public void testMessagePackOnOldDbVersion() { - Assertions.assertThrows(UnsupportedOperationException.class, () -> { - InfluxDB influxDB = TestUtils.connectToInfluxDB(ResponseFormat.MSGPACK); - influxDB.describeDatabases(); - }); + Assertions.assertThrows(UnsupportedOperationException.class, () -> { + InfluxDB influxDB = TestUtils.connectToInfluxDB(ResponseFormat.MSGPACK); + influxDB.describeDatabases(); + }); + } + + /** + * test for issue #445 + * make sure reusing of OkHttpClient.Builder causes no error + * @throws InterruptedException + */ + @Test + public void testIssue445() throws InterruptedException { + ExecutorService executor = Executors.newFixedThreadPool(100); + + final int maxCallables = 10_000; + List<Callable<String>> callableList = new ArrayList<>(maxCallables); + for (int i = 0; i < maxCallables; i++) { + callableList.add(new Callable<String>() { + @Override + public String call() throws Exception { + MyInfluxDBBean myBean = new MyInfluxDBBean(); + return myBean.connectAndDoNothing1(); + } + }); + } + executor.invokeAll(callableList); + executor.shutdown(); + if (!executor.awaitTermination(20, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + Assertions.assertTrue(MyInfluxDBBean.OK); + //assert that MyInfluxDBBean.OKHTTP_BUILDER stays untouched (no interceptor added) + Assertions.assertTrue(MyInfluxDBBean.OKHTTP_BUILDER.interceptors().isEmpty()); } - /** - * test for issue #445 - * make sure reusing of OkHttpClient.Builder causes no error - * @throws InterruptedException - */ - @Test - public void testIssue445() throws InterruptedException { - 
ExecutorService executor = Executors.newFixedThreadPool(100); - - final int maxCallables = 10_000; - List<Callable<String>> callableList = new ArrayList<>(maxCallables); - for (int i = 0; i < maxCallables; i++) { - callableList.add(new Callable<String>() { - @Override - public String call() throws Exception { - MyInfluxDBBean myBean = new MyInfluxDBBean(); - return myBean.connectAndDoNothing1(); - } - }); - } - executor.invokeAll(callableList); - executor.shutdown(); - if (!executor.awaitTermination(20, TimeUnit.SECONDS)) { - executor.shutdownNow(); - } - Assertions.assertTrue(MyInfluxDBBean.OK); - //assert that MyInfluxDBBean.OKHTTP_BUILDER stays untouched (no interceptor added) - Assertions.assertTrue(MyInfluxDBBean.OKHTTP_BUILDER.interceptors().isEmpty()); - } - - @Test - public void testQueryPostWithGZIPCompression() { - this.influxDB.enableGzip(); - String database = "db_gzip_" + System.currentTimeMillis(); - this.influxDB.query(new Query(String.format("CREATE DATABASE %s", database), null, true)); - QueryResult query = this.influxDB.query(new Query("SHOW DATABASES", null, true)); - assertThat(query.getResults()).hasSize(1); - assertThat(query.getResults().get(0).getSeries()).hasSize(1); - assertThat(query.getResults().get(0).getSeries().get(0).getValues()).contains(Collections.singletonList(database)); - this.influxDB.query(new Query(String.format("DROP DATABASE %s", database), null, true)); - } - - private static final class MyInfluxDBBean { - - static final OkHttpClient.Builder OKHTTP_BUILDER = new OkHttpClient.Builder(); - static Boolean OK = true; - static final String URL = "http://" + TestUtils.getInfluxIP() + ":" + TestUtils.getInfluxPORT(true); - - InfluxDB influxClient; - - String connectAndDoNothing1() { - synchronized (OK) { - if (!OK) { - return null; - } - } - try { - influxClient = InfluxDBFactory.connect(URL, "admin", "admin", OKHTTP_BUILDER); - influxClient.close(); - } catch (Exception e) { - synchronized (OK) { - if (OK) { - OK = false; - } - } - } - return null; - 
} - } + @Test + public void testQueryPostWithGZIPCompression() { + this.influxDB.enableGzip(); + String database = "db_gzip_" + System.currentTimeMillis(); + this.influxDB.query(new Query(String.format("CREATE DATABASE %s", database), null, true)); + QueryResult query = this.influxDB.query(new Query("SHOW DATABASES", null, true)); + assertThat(query.getResults()).hasSize(1); + assertThat(query.getResults().get(0).getSeries()).hasSize(1); + assertThat(query.getResults().get(0).getSeries().get(0).getValues()).contains(Collections.singletonList(database)); + this.influxDB.query(new Query(String.format("DROP DATABASE %s", database), null, true)); + } + + private static final class MyInfluxDBBean { + + static final OkHttpClient.Builder OKHTTP_BUILDER = new OkHttpClient.Builder(); + static Boolean OK = true; + static final String URL = "http://" + TestUtils.getInfluxIP() + ":" + TestUtils.getInfluxPORT(true); + + InfluxDB influxClient; + + String connectAndDoNothing1() { + synchronized (OK) { + if (!OK) { + return null; + } + } + try { + influxClient = InfluxDBFactory.connect(URL, "admin", "admin", OKHTTP_BUILDER); + influxClient.close(); + } catch (Exception e) { + synchronized (OK) { + if (OK) { + OK = false; + } + } + } + return null; + } + } }