diff --git a/README.md b/README.md index 0a9aa10e..4b07f152 100644 --- a/README.md +++ b/README.md @@ -153,6 +153,45 @@ public class DisableTLSExample { } ``` +#### Capturing response metadata for observability + +The SDK supports capturing response metadata for all data plane operations (upsert, query, fetch, update, delete). This enables you to track latency metrics and integrate with observability tools like OpenTelemetry, Prometheus, or Datadog. + +```java +import io.pinecone.clients.Pinecone; +import io.pinecone.clients.Index; + +Pinecone pinecone = new Pinecone.Builder("PINECONE_API_KEY") + .withResponseMetadataListener(metadata -> { + System.out.printf("Operation: %s | Client: %dms | Server: %dms | Network: %dms%n", + metadata.getOperationName(), + metadata.getClientDurationMs(), + metadata.getServerDurationMs(), + metadata.getNetworkOverheadMs()); + }) + .build(); + +Index index = pinecone.getIndexConnection("example-index"); +index.query(5, Arrays.asList(1.0f, 2.0f, 3.0f)); +// Output: Operation: query | Client: 45ms | Server: 32ms | Network: 13ms +``` + +The `ResponseMetadata` object provides: + +| Method | Description | +|--------|-------------| +| `getOperationName()` | Operation type: upsert, query, fetch, update, delete | +| `getClientDurationMs()` | Total round-trip time measured by the client | +| `getServerDurationMs()` | Server processing time from `x-pinecone-response-duration-ms` header | +| `getNetworkOverheadMs()` | Computed: client duration - server duration | +| `getIndexName()` | Name of the index | +| `getNamespace()` | Namespace used | +| `isSuccess()` | Whether the operation succeeded | +| `getGrpcStatusCode()` | gRPC status code | +| `getErrorType()` | Error category when failed | + +For a complete OpenTelemetry integration example with Prometheus and Grafana, see the [java-otel-metrics example](examples/java-otel-metrics/). 
+ # Indexes Operations related to the building and managing of Pinecone indexes are called [control plane](https://docs.pinecone.io/reference/api/introduction#control-plane) operations. diff --git a/examples/build.gradle b/examples/build.gradle index 82b10a16..58b39d36 100644 --- a/examples/build.gradle +++ b/examples/build.gradle @@ -10,20 +10,28 @@ repositories { jcenter() } +def opentelemetryVersion = '1.35.0' + dependencies { implementation project(':pinecone-client') implementation "org.slf4j:slf4j-simple:1.7.30" implementation 'org.codehaus.groovy:groovy-all:2.4.15' + + // OpenTelemetry dependencies for java-otel-metrics example + implementation "io.opentelemetry:opentelemetry-sdk:${opentelemetryVersion}" + implementation "io.opentelemetry:opentelemetry-sdk-metrics:${opentelemetryVersion}" + implementation "io.opentelemetry:opentelemetry-exporter-otlp:${opentelemetryVersion}" + implementation "io.opentelemetry:opentelemetry-exporter-logging:${opentelemetryVersion}" } sourceSets { main { java { - srcDirs = ['java-basic-mvn/src'] + srcDirs = ['java-basic-mvn/src', 'java-otel-metrics/src/main/java'] } groovy { srcDirs = ['groovy-basic'] } } -} \ No newline at end of file +} diff --git a/examples/java-otel-metrics/README.md b/examples/java-otel-metrics/README.md new file mode 100644 index 00000000..0d02689b --- /dev/null +++ b/examples/java-otel-metrics/README.md @@ -0,0 +1,151 @@ +# Pinecone Java SDK - OpenTelemetry Metrics Example + +This example demonstrates how to integrate OpenTelemetry metrics with the Pinecone Java SDK using the `ResponseMetadataListener` feature. It captures latency metrics for all data plane operations and exports them to Prometheus/Grafana for visualization. 
+ +## What This Example Does + +- Captures **client-side latency** (total round-trip time) for Pinecone operations +- Captures **server-side latency** from the `x-pinecone-response-duration-ms` header +- Calculates **network overhead** (client - server duration) +- Exports metrics to OpenTelemetry-compatible backends (Prometheus, Grafana, Datadog, etc.) + +## Metrics Recorded + +| Metric | Type | Description | +|--------|------|-------------| +| `db.client.operation.duration` | Histogram | Client-measured round-trip time (ms) | +| `pinecone.server.processing.duration` | Histogram | Server processing time from header (ms) | +| `db.client.operation.count` | Counter | Total number of operations | + +### Attributes + +| Attribute | Description | +|-----------|-------------| +| `db.system` | Always "pinecone" | +| `db.operation.name` | Operation type (upsert, query, fetch, update, delete) | +| `db.namespace` | Pinecone namespace | +| `pinecone.index_name` | Index name | +| `server.address` | Pinecone host | +| `status` | "success" or "error" | + +## Prerequisites + +- Java 8+ +- Maven 3.6+ +- Docker and Docker Compose +- A Pinecone account with an API key and index + +## Project Structure + +``` +java-otel-metrics/ +├── pom.xml # Maven dependencies +├── README.md # This file +├── observability/ # Local observability stack +│ ├── docker-compose.yml # Prometheus + Grafana + OTel Collector +│ ├── otel-collector-config.yaml # OTel Collector configuration +│ └── prometheus.yml # Prometheus scrape config +└── src/main/java/pineconeexamples/ + ├── PineconeOtelMetricsExample.java # Main example + └── PineconeMetricsRecorder.java # Reusable metrics recorder +``` + +## Quick Start + +### 1. 
Start the Observability Stack + +```bash +cd examples/java-otel-metrics/observability +docker-compose up -d +``` + +This starts: +- **OpenTelemetry Collector** (port 4317) - receives metrics via OTLP +- **Prometheus** (port 9090) - stores metrics +- **Grafana** (port 3000) - visualizes metrics + +### 2. Run the Example + +```bash +cd examples/java-otel-metrics + +export PINECONE_API_KEY=your-api-key +export PINECONE_INDEX=your-index-name +export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 + +mvn package exec:java -Dexec.mainClass="pineconeexamples.PineconeOtelMetricsExample" +``` + +### 3. View Metrics in Grafana + +1. Open http://localhost:3000 +2. Login with `admin` / `admin` +3. Go to **Connections** → **Data sources** → **Add data source** +4. Select **Prometheus**, set URL to `http://prometheus:9090`, click **Save & test** +5. Go to **Dashboards** → **New** → **New Dashboard** → **Add visualization** + +### 4. Sample Grafana Queries + +**P50 Client vs Server Latency:** +```promql +histogram_quantile(0.5, sum(rate(db_client_operation_duration_milliseconds_bucket[5m])) by (le)) +histogram_quantile(0.5, sum(rate(pinecone_server_processing_duration_milliseconds_bucket[5m])) by (le)) +``` + +**P95 Latency by Operation:** +```promql +histogram_quantile(0.95, sum(rate(db_client_operation_duration_milliseconds_bucket[5m])) by (le, db_operation_name)) +``` + +**Operation Count by Type:** +```promql +sum by (db_operation_name) (db_client_operation_count_total) +``` + +## Understanding the Metrics + +### Percentiles Explained + +| Percentile | Meaning | +|------------|---------| +| P50 | Median - typical latency | +| P90 | 90% of requests are faster | +| P95 | Tail latency - good for SLAs | +| P99 | Worst-case for most users | + +### Network Overhead + +The difference between client and server duration shows network overhead: + +``` +Network Overhead = Client Duration - Server Duration +``` + +This helps identify whether latency issues are: +- **Server-side** (high 
server duration) +- **Network-side** (high network overhead) + +## Cleanup + +```bash +cd examples/java-otel-metrics/observability +docker-compose down +``` + +## Using in Your Project + +Copy `PineconeMetricsRecorder.java` into your project: + +```java +Meter meter = meterProvider.get("pinecone.client"); +PineconeMetricsRecorder recorder = new PineconeMetricsRecorder(meter); + +Pinecone client = new Pinecone.Builder(apiKey) + .withResponseMetadataListener(recorder) + .build(); + +// All operations now emit metrics automatically +Index index = client.getIndexConnection(indexName); +index.upsert(...); // Metrics recorded! +index.query(...); // Metrics recorded! +``` diff --git a/examples/java-otel-metrics/observability/docker-compose.yml b/examples/java-otel-metrics/observability/docker-compose.yml new file mode 100644 index 00000000..9773a7dd --- /dev/null +++ b/examples/java-otel-metrics/observability/docker-compose.yml @@ -0,0 +1,30 @@ +version: '3.8' + +services: + otel-collector: + image: otel/opentelemetry-collector-contrib:0.96.0 + command: ["--config=/etc/otel-collector-config.yaml"] + volumes: + - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml + ports: + - "4317:4317" # OTLP gRPC receiver + + prometheus: + image: prom/prometheus:v2.49.1 + volumes: + - ./prometheus.yml:/etc/prometheus/prometheus.yml + ports: + - "9090:9090" # Prometheus UI + depends_on: + - otel-collector + + grafana: + image: grafana/grafana:10.3.1 + ports: + - "3000:3000" # Grafana UI + environment: + - GF_SECURITY_ADMIN_PASSWORD=admin + - GF_AUTH_ANONYMOUS_ENABLED=true + depends_on: + - prometheus + diff --git a/examples/java-otel-metrics/observability/otel-collector-config.yaml b/examples/java-otel-metrics/observability/otel-collector-config.yaml new file mode 100644 index 00000000..5d0c9623 --- /dev/null +++ b/examples/java-otel-metrics/observability/otel-collector-config.yaml @@ -0,0 +1,18 @@ +receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + +exporters: 
+ prometheus: + endpoint: "0.0.0.0:8889" + debug: + verbosity: detailed + +service: + pipelines: + metrics: + receivers: [otlp] + exporters: [prometheus, debug] + diff --git a/examples/java-otel-metrics/observability/prometheus.yml b/examples/java-otel-metrics/observability/prometheus.yml new file mode 100644 index 00000000..d3c34083 --- /dev/null +++ b/examples/java-otel-metrics/observability/prometheus.yml @@ -0,0 +1,9 @@ +global: + scrape_interval: 5s + evaluation_interval: 5s + +scrape_configs: + - job_name: 'otel-collector' + static_configs: + - targets: ['otel-collector:8889'] + diff --git a/examples/java-otel-metrics/pom.xml b/examples/java-otel-metrics/pom.xml new file mode 100644 index 00000000..4e7fb7d5 --- /dev/null +++ b/examples/java-otel-metrics/pom.xml @@ -0,0 +1,98 @@ + + + + 4.0.0 + + pineconeexamples + java-otel-metrics + 1.0-SNAPSHOT + + java-otel-metrics + Example showing how to integrate OpenTelemetry metrics with Pinecone Java SDK + https://github.com/pinecone-io/pinecone-java-client + + + UTF-8 + 1.8 + 1.8 + 1.35.0 + + + + + + io.pinecone + pinecone-client + 6.1.0 + + + + + io.opentelemetry + opentelemetry-sdk + ${opentelemetry.version} + + + + + io.opentelemetry + opentelemetry-sdk-metrics + ${opentelemetry.version} + + + + + io.opentelemetry + opentelemetry-exporter-otlp + ${opentelemetry.version} + + + + + io.opentelemetry + opentelemetry-exporter-logging + ${opentelemetry.version} + + + + + org.slf4j + slf4j-simple + 1.7.36 + + + + + + + + maven-clean-plugin + 3.1.0 + + + org.codehaus.mojo + exec-maven-plugin + 3.1.0 + + + maven-resources-plugin + 3.0.2 + + + maven-compiler-plugin + 3.8.0 + + + maven-jar-plugin + 3.0.2 + + + maven-install-plugin + 2.5.2 + + + + + + diff --git a/examples/java-otel-metrics/src/main/java/pineconeexamples/PineconeMetricsRecorder.java b/examples/java-otel-metrics/src/main/java/pineconeexamples/PineconeMetricsRecorder.java new file mode 100644 index 00000000..3ed9358c --- /dev/null +++ 
b/examples/java-otel-metrics/src/main/java/pineconeexamples/PineconeMetricsRecorder.java @@ -0,0 +1,126 @@ +package pineconeexamples; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.LongHistogram; +import io.opentelemetry.api.metrics.Meter; +import io.pinecone.configs.ResponseMetadata; +import io.pinecone.configs.ResponseMetadataListener; + +/** + * A reusable OpenTelemetry metrics recorder for Pinecone operations. + * + *

This class implements {@link ResponseMetadataListener} to capture response metadata + * from Pinecone data plane operations and record them as OpenTelemetry metrics. + * + *

Metrics recorded: + *

+ * + *

Attributes follow OpenTelemetry semantic conventions for database clients: + *

+ * + *

Example usage: + *

{@code
+ * Meter meter = meterProvider.get("pinecone.client");
+ * PineconeMetricsRecorder recorder = new PineconeMetricsRecorder(meter);
+ * 
+ * Pinecone client = new Pinecone.Builder(apiKey)
+ *     .withResponseMetadataListener(recorder)
+ *     .build();
+ * }
+ * + *

You can copy this class into your project and customize it as needed. + */ +public class PineconeMetricsRecorder implements ResponseMetadataListener { + + // Attribute keys following OTel semantic conventions + private static final AttributeKey DB_SYSTEM = AttributeKey.stringKey("db.system"); + private static final AttributeKey DB_OPERATION_NAME = AttributeKey.stringKey("db.operation.name"); + private static final AttributeKey DB_NAMESPACE = AttributeKey.stringKey("db.namespace"); + private static final AttributeKey PINECONE_INDEX_NAME = AttributeKey.stringKey("pinecone.index_name"); + private static final AttributeKey SERVER_ADDRESS = AttributeKey.stringKey("server.address"); + private static final AttributeKey STATUS = AttributeKey.stringKey("status"); + private static final AttributeKey ERROR_TYPE = AttributeKey.stringKey("error.type"); + + private final LongHistogram clientDurationHistogram; + private final LongHistogram serverDurationHistogram; + private final LongCounter operationCounter; + + /** + * Creates a new PineconeMetricsRecorder with the given OpenTelemetry Meter. 
+ * + * @param meter the OpenTelemetry Meter to use for creating instruments + */ + public PineconeMetricsRecorder(Meter meter) { + // Client-side operation duration histogram + this.clientDurationHistogram = meter.histogramBuilder("db.client.operation.duration") + .setDescription("Duration of Pinecone operations from client perspective") + .setUnit("ms") + .ofLongs() + .build(); + + // Server-side processing duration histogram + this.serverDurationHistogram = meter.histogramBuilder("pinecone.server.processing.duration") + .setDescription("Server processing time from x-pinecone-response-duration-ms header") + .setUnit("ms") + .ofLongs() + .build(); + + // Operation counter + this.operationCounter = meter.counterBuilder("db.client.operation.count") + .setDescription("Total number of Pinecone operations") + .setUnit("{operation}") + .build(); + } + + @Override + public void onResponse(ResponseMetadata metadata) { + // Build common attributes + AttributesBuilder attributesBuilder = Attributes.builder() + .put(DB_SYSTEM, "pinecone") + .put(DB_OPERATION_NAME, metadata.getOperationName()) + .put(PINECONE_INDEX_NAME, metadata.getIndexName()) + .put(SERVER_ADDRESS, metadata.getServerAddress()) + .put(STATUS, metadata.getStatus()); + + // Add namespace if present + String namespace = metadata.getNamespace(); + if (namespace != null && !namespace.isEmpty()) { + attributesBuilder.put(DB_NAMESPACE, namespace); + } + + // Add error type if this was an error + if (!metadata.isSuccess() && metadata.getErrorType() != null) { + attributesBuilder.put(ERROR_TYPE, metadata.getErrorType()); + } + + Attributes attributes = attributesBuilder.build(); + + // Record client duration (always available) + clientDurationHistogram.record(metadata.getClientDurationMs(), attributes); + + // Record server duration (if available from header) + Long serverDuration = metadata.getServerDurationMs(); + if (serverDuration != null) { + serverDurationHistogram.record(serverDuration, attributes); + } + + 
// Increment operation counter + operationCounter.add(1, attributes); + } +} + diff --git a/examples/java-otel-metrics/src/main/java/pineconeexamples/PineconeOtelMetricsExample.java b/examples/java-otel-metrics/src/main/java/pineconeexamples/PineconeOtelMetricsExample.java new file mode 100644 index 00000000..89313b83 --- /dev/null +++ b/examples/java-otel-metrics/src/main/java/pineconeexamples/PineconeOtelMetricsExample.java @@ -0,0 +1,311 @@ +package pineconeexamples; + +import com.google.common.primitives.Floats; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.exporter.logging.LoggingMetricExporter; +import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.Aggregation; +import io.opentelemetry.sdk.metrics.InstrumentSelector; +import io.opentelemetry.sdk.metrics.InstrumentType; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder; +import io.opentelemetry.sdk.metrics.View; +import io.opentelemetry.sdk.metrics.export.MetricReader; +import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributeKey; +import io.pinecone.clients.Index; +import io.pinecone.clients.Pinecone; +import io.pinecone.proto.UpsertResponse; +import io.pinecone.unsigned_indices_model.QueryResponseWithUnsignedIndices; + +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.Random; + +/** + * Example demonstrating how to integrate OpenTelemetry metrics with the Pinecone Java SDK. + * + *

This example shows: + *

+ * + *

Environment variables: + *

+ * + *

Run with: + *

+ * mvn package exec:java -Dexec.mainClass="pineconeexamples.PineconeOtelMetricsExample"
+ * 
+ */ +public class PineconeOtelMetricsExample { + + private static final String SERVICE_NAME = "pinecone-otel-example"; + + public static void main(String[] args) { + // Read configuration from environment + String apiKey = getRequiredEnv("PINECONE_API_KEY"); + String indexName = getRequiredEnv("PINECONE_INDEX"); + String otlpEndpoint = System.getenv("OTEL_EXPORTER_OTLP_ENDPOINT"); + + System.out.println("============================================================"); + System.out.println("Pinecone OpenTelemetry Metrics Example"); + System.out.println("============================================================"); + System.out.println("Index: " + indexName); + System.out.println("OTLP Endpoint: " + (otlpEndpoint != null ? otlpEndpoint : "(not configured - console only)")); + System.out.println(); + + // Initialize OpenTelemetry + OpenTelemetrySdk openTelemetry = initializeOpenTelemetry(otlpEndpoint); + + try { + // Get a Meter for creating instruments + Meter meter = openTelemetry.getMeter("pinecone.client"); + + // Create the metrics recorder + PineconeMetricsRecorder metricsRecorder = new PineconeMetricsRecorder(meter); + + // Build Pinecone client with the metrics recorder as listener + Pinecone pinecone = new Pinecone.Builder(apiKey) + .withResponseMetadataListener(metricsRecorder) + .build(); + + // Get index connection + Index index = pinecone.getIndexConnection(indexName); + + System.out.println("Performing Pinecone operations..."); + System.out.println(); + + // Perform sample operations + performSampleOperations(index); + + // Close the index connection + index.close(); + + System.out.println(); + System.out.println("Operations complete. 
Flushing metrics..."); + System.out.println(); + + // Give metrics time to export (periodic reader exports every 10 seconds by default) + // Force a flush by waiting a bit + Thread.sleep(2000); + + } catch (Exception e) { + System.err.println("Error: " + e.getMessage()); + e.printStackTrace(); + } finally { + // Shutdown OpenTelemetry SDK + openTelemetry.shutdown(); + System.out.println("OpenTelemetry SDK shutdown complete."); + } + } + + /** + * Initialize OpenTelemetry SDK with console and optional OTLP exporters. + */ + private static OpenTelemetrySdk initializeOpenTelemetry(String otlpEndpoint) { + // Create resource with service name + Resource resource = Resource.getDefault() + .merge(Resource.create(Attributes.of( + AttributeKey.stringKey("service.name"), SERVICE_NAME + ))); + + // Create console exporter (logs metrics to stdout) + MetricReader consoleReader = PeriodicMetricReader.builder(LoggingMetricExporter.create()) + .setInterval(Duration.ofSeconds(10)) + .build(); + + // Define custom histogram buckets optimized for Pinecone latencies (in milliseconds) + // Fine granularity across the typical latency range (5ms - 500ms) + List latencyBuckets = Arrays.asList( + 5.0, 10.0, 15.0, 20.0, 25.0, 30.0, 40.0, 50.0, 60.0, 75.0, + 100.0, 125.0, 150.0, 175.0, 200.0, 250.0, 300.0, 400.0, 500.0, + 750.0, 1000.0, 2000.0, 5000.0 + ); + + // Create a View to apply custom buckets to all histograms from pinecone.client + View latencyHistogramView = View.builder() + .setAggregation(Aggregation.explicitBucketHistogram(latencyBuckets)) + .build(); + + SdkMeterProviderBuilder meterProviderBuilder = SdkMeterProvider.builder() + .setResource(resource) + .registerMetricReader(consoleReader) + // Apply custom histogram buckets to duration metrics + .registerView( + InstrumentSelector.builder() + .setType(InstrumentType.HISTOGRAM) + .setMeterName("pinecone.client") + .build(), + latencyHistogramView + ); + + // Add OTLP exporter if endpoint is configured + if (otlpEndpoint != 
null && !otlpEndpoint.isEmpty()) { + OtlpGrpcMetricExporter otlpExporter = OtlpGrpcMetricExporter.builder() + .setEndpoint(otlpEndpoint) + .build(); + + MetricReader otlpReader = PeriodicMetricReader.builder(otlpExporter) + .setInterval(Duration.ofSeconds(10)) + .build(); + + meterProviderBuilder.registerMetricReader(otlpReader); + System.out.println("OTLP exporter configured for: " + otlpEndpoint); + } + + SdkMeterProvider meterProvider = meterProviderBuilder.build(); + + return OpenTelemetrySdk.builder() + .setMeterProvider(meterProvider) + .build(); + } + + /** + * Perform sample Pinecone operations to generate metrics. + * Runs a good number of operations to produce meaningful histogram data. + */ + private static void performSampleOperations(Index index) { + String namespace = "otel-example-ns"; + Random random = new Random(); + + // Generate sample vectors (assuming 3-dimensional index for simplicity) + // Adjust dimension based on your index configuration + int dimension = 3; + + // Number of operations to run (increase for richer metrics data) + int numWarmup = 5; + int numUpserts = 25; + int numQueries = 20; + int numFetches = 15; + int numUpdates = 10; + + try { + // 0. Warmup phase - establish connections (metrics not representative) + System.out.println("0. Warming up connections (first few requests are slower)..."); + for (int i = 0; i < numWarmup; i++) { + String vectorId = "warmup-vec-" + i; + List values = generateRandomVector(dimension, random); + index.upsert(vectorId, values, namespace); + } + // Query to warm up read path + index.query(1, generateRandomVector(dimension, random), null, null, null, namespace, null, false, false); + // Clean up warmup vectors + List warmupIds = new java.util.ArrayList<>(); + for (int i = 0; i < numWarmup; i++) { + warmupIds.add("warmup-vec-" + i); + } + index.deleteByIds(warmupIds, namespace); + System.out.println(" Warmup complete (connection established)"); + System.out.println(); + + // 1. 
Upsert operations + System.out.println("1. Upserting " + numUpserts + " vectors..."); + for (int i = 0; i < numUpserts; i++) { + String vectorId = "otel-vec-" + i; + List values = generateRandomVector(dimension, random); + UpsertResponse response = index.upsert(vectorId, values, namespace); + if (i % 5 == 0) { + System.out.println(" Upserted " + (i + 1) + "/" + numUpserts); + } + } + System.out.println(" Completed " + numUpserts + " upserts"); + + // Wait for eventual consistency + Thread.sleep(2000); + + // 2. Query operations + System.out.println("2. Running " + numQueries + " queries..."); + for (int i = 0; i < numQueries; i++) { + List queryVector = generateRandomVector(dimension, random); + QueryResponseWithUnsignedIndices response = index.query( + 5, queryVector, null, null, null, namespace, null, false, false); + if (i % 5 == 0) { + System.out.println(" Query " + (i + 1) + "/" + numQueries + " returned " + + response.getMatchesList().size() + " matches"); + } + } + System.out.println(" Completed " + numQueries + " queries"); + + // 3. Fetch operations + System.out.println("3. Running " + numFetches + " fetches..."); + for (int i = 0; i < numFetches; i++) { + String vectorId = "otel-vec-" + (i % numUpserts); + index.fetch(Arrays.asList(vectorId), namespace); + } + System.out.println(" Completed " + numFetches + " fetches"); + + // 4. Update operations + System.out.println("4. Running " + numUpdates + " updates..."); + for (int i = 0; i < numUpdates; i++) { + String vectorId = "otel-vec-" + i; + List newValues = generateRandomVector(dimension, random); + index.update(vectorId, newValues, namespace); + } + System.out.println(" Completed " + numUpdates + " updates"); + + // 5. Delete operations (cleanup) + System.out.println("5. 
Deleting vectors..."); + for (int i = 0; i < numUpserts; i += 5) { + List idsToDelete = Arrays.asList( + "otel-vec-" + i, + "otel-vec-" + (i + 1), + "otel-vec-" + (i + 2), + "otel-vec-" + (i + 3), + "otel-vec-" + (i + 4) + ); + index.deleteByIds(idsToDelete, namespace); + } + System.out.println(" Completed " + (numUpserts / 5) + " delete batches"); + + System.out.println(); + System.out.println("Total operations: " + (numUpserts + numQueries + numFetches + numUpdates + numUpserts/5)); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + System.err.println("Operation interrupted"); + } + } + + /** + * Generate a random vector of the specified dimension. + */ + private static List generateRandomVector(int dimension, Random random) { + Float[] values = new Float[dimension]; + for (int i = 0; i < dimension; i++) { + values[i] = random.nextFloat(); + } + return Arrays.asList(values); + } + + /** + * Get a required environment variable or exit with an error. + */ + private static String getRequiredEnv(String name) { + String value = System.getenv(name); + if (value == null || value.isEmpty()) { + System.err.println("Error: Required environment variable " + name + " is not set."); + System.err.println(); + System.err.println("Usage:"); + System.err.println(" export PINECONE_API_KEY=your-api-key"); + System.err.println(" export PINECONE_INDEX=your-index-name"); + System.err.println(" export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 # optional"); + System.err.println(); + System.err.println(" mvn package exec:java -Dexec.mainClass=\"pineconeexamples.PineconeOtelMetricsExample\""); + System.exit(1); + } + return value; + } +} + diff --git a/src/integration/java/io/pinecone/integration/dataPlane/ResponseMetadataAsyncListenerIntegrationTest.java b/src/integration/java/io/pinecone/integration/dataPlane/ResponseMetadataAsyncListenerIntegrationTest.java new file mode 100644 index 00000000..2d0979ea --- /dev/null +++ 
b/src/integration/java/io/pinecone/integration/dataPlane/ResponseMetadataAsyncListenerIntegrationTest.java @@ -0,0 +1,259 @@ +package io.pinecone.integration.dataPlane; + +import com.google.common.util.concurrent.ListenableFuture; +import io.pinecone.clients.AsyncIndex; +import io.pinecone.clients.Pinecone; +import io.pinecone.configs.ResponseMetadata; +import io.pinecone.helpers.RandomStringBuilder; +import io.pinecone.helpers.TestResourcesManager; +import io.pinecone.unsigned_indices_model.VectorWithUnsignedIndices; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static io.pinecone.commons.IndexInterface.buildUpsertVectorWithUnsignedIndices; +import static io.pinecone.helpers.BuildUpsertRequest.*; +import static org.junit.jupiter.api.Assertions.*; + +/** + * Integration tests for the ResponseMetadataListener feature with AsyncIndex. + * Verifies that response metadata is correctly captured for async data plane operations. 
+ */ +public class ResponseMetadataAsyncListenerIntegrationTest { + + private static final Logger logger = LoggerFactory.getLogger(ResponseMetadataAsyncListenerIntegrationTest.class); + private static final TestResourcesManager resourceManager = TestResourcesManager.getInstance(); + private static final String namespace = RandomStringBuilder.build("async-resp-meta-ns", 8); + private static final CopyOnWriteArrayList capturedMetadata = new CopyOnWriteArrayList<>(); + + private static String indexName; + private static AsyncIndex asyncIndex; + private static int dimension; + + @BeforeAll + public static void setUp() throws InterruptedException { + dimension = resourceManager.getDimension(); + indexName = resourceManager.getOrCreateServerlessIndex(); + + Pinecone pineconeClient = new Pinecone.Builder(System.getenv("PINECONE_API_KEY")) + .withSourceTag("pinecone_test") + .withResponseMetadataListener(metadata -> { + logger.debug("Captured async metadata: {}", metadata); + capturedMetadata.add(metadata); + }) + .build(); + + asyncIndex = pineconeClient.getAsyncIndexConnection(indexName); + } + + @AfterAll + public static void cleanUp() { + if (asyncIndex != null) { + asyncIndex.close(); + } + } + + @Test + public void testAsyncUpsertCapturesMetadata() throws ExecutionException, InterruptedException, TimeoutException { + capturedMetadata.clear(); + + String vectorId = RandomStringBuilder.build("async-upsert", 8); + List values = generateVectorValuesByDimension(dimension); + VectorWithUnsignedIndices vector = buildUpsertVectorWithUnsignedIndices( + vectorId, values, null, null, null); + + asyncIndex.upsert(Collections.singletonList(vector), namespace).get(30, TimeUnit.SECONDS); + Thread.sleep(100); + + ResponseMetadata metadata = findMetadataForOperation("upsert"); + assertNotNull(metadata, "Should have captured metadata for async upsert operation"); + + assertEquals("upsert", metadata.getOperationName()); + assertEquals(indexName, metadata.getIndexName()); + 
assertEquals(namespace, metadata.getNamespace()); + assertEquals("success", metadata.getStatus()); + assertTrue(metadata.getClientDurationMs() >= 0); + + logMetadata("Async Upsert", metadata); + } + + @Test + public void testAsyncQueryCapturesMetadata() throws ExecutionException, InterruptedException, TimeoutException { + capturedMetadata.clear(); + + String vectorId = RandomStringBuilder.build("async-query", 8); + List values = generateVectorValuesByDimension(dimension); + VectorWithUnsignedIndices vector = buildUpsertVectorWithUnsignedIndices( + vectorId, values, null, null, null); + asyncIndex.upsert(Collections.singletonList(vector), namespace).get(30, TimeUnit.SECONDS); + + Thread.sleep(1000); + capturedMetadata.clear(); + + asyncIndex.query(5, values, null, null, null, namespace, null, false, false).get(30, TimeUnit.SECONDS); + Thread.sleep(100); + + ResponseMetadata metadata = findMetadataForOperation("query"); + assertNotNull(metadata, "Should have captured metadata for async query operation"); + + assertEquals("query", metadata.getOperationName()); + assertEquals(indexName, metadata.getIndexName()); + assertEquals("success", metadata.getStatus()); + + logMetadata("Async Query", metadata); + } + + @Test + public void testAsyncFetchCapturesMetadata() throws ExecutionException, InterruptedException, TimeoutException { + capturedMetadata.clear(); + + String vectorId = RandomStringBuilder.build("async-fetch", 8); + List values = generateVectorValuesByDimension(dimension); + VectorWithUnsignedIndices vector = buildUpsertVectorWithUnsignedIndices( + vectorId, values, null, null, null); + asyncIndex.upsert(Collections.singletonList(vector), namespace).get(30, TimeUnit.SECONDS); + + Thread.sleep(1000); + capturedMetadata.clear(); + + asyncIndex.fetch(Collections.singletonList(vectorId), namespace).get(30, TimeUnit.SECONDS); + Thread.sleep(100); + + ResponseMetadata metadata = findMetadataForOperation("fetch"); + assertNotNull(metadata, "Should have captured 
metadata for async fetch operation"); + + assertEquals("fetch", metadata.getOperationName()); + assertEquals(indexName, metadata.getIndexName()); + assertEquals("success", metadata.getStatus()); + + logMetadata("Async Fetch", metadata); + } + + @Test + public void testAsyncUpdateCapturesMetadata() throws ExecutionException, InterruptedException, TimeoutException { + capturedMetadata.clear(); + + String vectorId = RandomStringBuilder.build("async-update", 8); + List values = generateVectorValuesByDimension(dimension); + VectorWithUnsignedIndices vector = buildUpsertVectorWithUnsignedIndices( + vectorId, values, null, null, null); + asyncIndex.upsert(Collections.singletonList(vector), namespace).get(30, TimeUnit.SECONDS); + + Thread.sleep(1000); + capturedMetadata.clear(); + + List updatedValues = generateVectorValuesByDimension(dimension); + asyncIndex.update(vectorId, updatedValues, namespace).get(30, TimeUnit.SECONDS); + Thread.sleep(100); + + ResponseMetadata metadata = findMetadataForOperation("update"); + assertNotNull(metadata, "Should have captured metadata for async update operation"); + + assertEquals("update", metadata.getOperationName()); + assertEquals(indexName, metadata.getIndexName()); + assertEquals("success", metadata.getStatus()); + + logMetadata("Async Update", metadata); + } + + @Test + public void testAsyncDeleteCapturesMetadata() throws ExecutionException, InterruptedException, TimeoutException { + capturedMetadata.clear(); + + String vectorId = RandomStringBuilder.build("async-delete", 8); + List values = generateVectorValuesByDimension(dimension); + VectorWithUnsignedIndices vector = buildUpsertVectorWithUnsignedIndices( + vectorId, values, null, null, null); + asyncIndex.upsert(Collections.singletonList(vector), namespace).get(30, TimeUnit.SECONDS); + + Thread.sleep(1000); + capturedMetadata.clear(); + + asyncIndex.deleteByIds(Collections.singletonList(vectorId), namespace).get(30, TimeUnit.SECONDS); + Thread.sleep(100); + + ResponseMetadata 
metadata = findMetadataForOperation("delete"); + assertNotNull(metadata, "Should have captured metadata for async delete operation"); + + assertEquals("delete", metadata.getOperationName()); + assertEquals(indexName, metadata.getIndexName()); + assertEquals("success", metadata.getStatus()); + + logMetadata("Async Delete", metadata); + } + + @Test + public void testAsyncOperationsThreadSafety() throws ExecutionException, InterruptedException, TimeoutException { + capturedMetadata.clear(); + + String vectorId1 = RandomStringBuilder.build("async-ts-1", 8); + String vectorId2 = RandomStringBuilder.build("async-ts-2", 8); + String vectorId3 = RandomStringBuilder.build("async-ts-3", 8); + List values = generateVectorValuesByDimension(dimension); + + ListenableFuture future1 = asyncIndex.upsert( + Collections.singletonList(buildUpsertVectorWithUnsignedIndices(vectorId1, values, null, null, null)), + namespace); + ListenableFuture future2 = asyncIndex.upsert( + Collections.singletonList(buildUpsertVectorWithUnsignedIndices(vectorId2, values, null, null, null)), + namespace); + ListenableFuture future3 = asyncIndex.upsert( + Collections.singletonList(buildUpsertVectorWithUnsignedIndices(vectorId3, values, null, null, null)), + namespace); + + future1.get(30, TimeUnit.SECONDS); + future2.get(30, TimeUnit.SECONDS); + future3.get(30, TimeUnit.SECONDS); + Thread.sleep(100); + + int upsertCount = 0; + for (ResponseMetadata m : capturedMetadata) { + if ("upsert".equals(m.getOperationName())) { + upsertCount++; + } + } + + assertTrue(upsertCount >= 3, "Should have captured metadata for all 3 concurrent upserts, got: " + upsertCount); + + for (ResponseMetadata metadata : capturedMetadata) { + assertNotNull(metadata.getOperationName()); + assertNotNull(metadata.getIndexName()); + assertNotNull(metadata.getStatus()); + assertTrue(metadata.getClientDurationMs() >= 0); + } + + logger.info("Captured {} metadata entries for concurrent async operations", capturedMetadata.size()); + } + + 
private void logMetadata(String operation, ResponseMetadata metadata) { + if (metadata.getServerDurationMs() != null) { + logger.info("{} - Client: {}ms, Server: {}ms, Network overhead: {}ms", + operation, + metadata.getClientDurationMs(), + metadata.getServerDurationMs(), + metadata.getNetworkOverheadMs()); + } else { + logger.info("{} - Client: {}ms (server duration header not present)", + operation, metadata.getClientDurationMs()); + } + } + + private ResponseMetadata findMetadataForOperation(String operationName) { + for (int i = capturedMetadata.size() - 1; i >= 0; i--) { + ResponseMetadata metadata = capturedMetadata.get(i); + if (operationName.equals(metadata.getOperationName())) { + return metadata; + } + } + return null; + } +} diff --git a/src/integration/java/io/pinecone/integration/dataPlane/ResponseMetadataListenerIntegrationTest.java b/src/integration/java/io/pinecone/integration/dataPlane/ResponseMetadataListenerIntegrationTest.java new file mode 100644 index 00000000..2df011ca --- /dev/null +++ b/src/integration/java/io/pinecone/integration/dataPlane/ResponseMetadataListenerIntegrationTest.java @@ -0,0 +1,285 @@ +package io.pinecone.integration.dataPlane; + +import io.pinecone.clients.Index; +import io.pinecone.clients.Pinecone; +import io.pinecone.configs.ResponseMetadata; +import io.pinecone.helpers.RandomStringBuilder; +import io.pinecone.helpers.TestResourcesManager; +import io.pinecone.unsigned_indices_model.VectorWithUnsignedIndices; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +import static io.pinecone.commons.IndexInterface.buildUpsertVectorWithUnsignedIndices; +import static io.pinecone.helpers.BuildUpsertRequest.*; +import static org.junit.jupiter.api.Assertions.*; + +/** + * Integration tests for the 
ResponseMetadataListener feature. + * Verifies that response metadata is correctly captured for all data plane operations. + */ +public class ResponseMetadataListenerIntegrationTest { + + private static final Logger logger = LoggerFactory.getLogger(ResponseMetadataListenerIntegrationTest.class); + private static final TestResourcesManager resourceManager = TestResourcesManager.getInstance(); + private static final String namespace = RandomStringBuilder.build("resp-meta-ns", 8); + private static final CopyOnWriteArrayList capturedMetadata = new CopyOnWriteArrayList<>(); + + private static String indexName; + private static Index index; + private static int dimension; + + @BeforeAll + public static void setUp() throws InterruptedException { + dimension = resourceManager.getDimension(); + indexName = resourceManager.getOrCreateServerlessIndex(); + + Pinecone pineconeClient = new Pinecone.Builder(System.getenv("PINECONE_API_KEY")) + .withSourceTag("pinecone_test") + .withResponseMetadataListener(metadata -> { + logger.debug("Captured metadata: {}", metadata); + capturedMetadata.add(metadata); + }) + .build(); + + index = pineconeClient.getIndexConnection(indexName); + } + + @AfterAll + public static void cleanUp() { + if (index != null) { + index.close(); + } + } + + @Test + public void testUpsertCapturesMetadata() { + capturedMetadata.clear(); + + List ids = getIdsList(1); + List values = generateVectorValuesByDimension(dimension); + VectorWithUnsignedIndices vector = buildUpsertVectorWithUnsignedIndices( + ids.get(0), values, null, null, null); + index.upsert(Collections.singletonList(vector), namespace); + + assertFalse(capturedMetadata.isEmpty(), "Should have captured at least one metadata entry"); + + ResponseMetadata metadata = findMetadataForOperation("upsert"); + assertNotNull(metadata, "Should have captured metadata for upsert operation"); + + assertEquals("upsert", metadata.getOperationName()); + assertEquals(indexName, metadata.getIndexName()); + 
assertEquals(namespace, metadata.getNamespace()); + assertNotNull(metadata.getServerAddress()); + assertTrue(metadata.getServerAddress().contains("pinecone.io")); + assertTrue(metadata.getClientDurationMs() >= 0, "Client duration should be non-negative"); + assertEquals("success", metadata.getStatus()); + assertEquals("OK", metadata.getGrpcStatusCode()); + assertNull(metadata.getErrorType()); + assertTrue(metadata.isSuccess()); + + logMetadata("Upsert", metadata); + } + + @Test + public void testQueryCapturesMetadata() throws InterruptedException { + capturedMetadata.clear(); + + List ids = getIdsList(1); + List values = generateVectorValuesByDimension(dimension); + VectorWithUnsignedIndices vector = buildUpsertVectorWithUnsignedIndices( + ids.get(0), values, null, null, null); + index.upsert(Collections.singletonList(vector), namespace); + + Thread.sleep(1000); + capturedMetadata.clear(); + + index.query(5, values, null, null, null, namespace, null, false, false); + + ResponseMetadata metadata = findMetadataForOperation("query"); + assertNotNull(metadata, "Should have captured metadata for query operation"); + + assertEquals("query", metadata.getOperationName()); + assertEquals(indexName, metadata.getIndexName()); + assertEquals(namespace, metadata.getNamespace()); + assertEquals("success", metadata.getStatus()); + assertTrue(metadata.getClientDurationMs() >= 0); + + logMetadata("Query", metadata); + } + + @Test + public void testFetchCapturesMetadata() throws InterruptedException { + capturedMetadata.clear(); + + String vectorId = RandomStringBuilder.build("fetch-test", 8); + List values = generateVectorValuesByDimension(dimension); + VectorWithUnsignedIndices vector = buildUpsertVectorWithUnsignedIndices( + vectorId, values, null, null, null); + index.upsert(Collections.singletonList(vector), namespace); + + Thread.sleep(1000); + capturedMetadata.clear(); + + index.fetch(Collections.singletonList(vectorId), namespace); + + ResponseMetadata metadata = 
findMetadataForOperation("fetch"); + assertNotNull(metadata, "Should have captured metadata for fetch operation"); + + assertEquals("fetch", metadata.getOperationName()); + assertEquals(indexName, metadata.getIndexName()); + assertEquals(namespace, metadata.getNamespace()); + assertEquals("success", metadata.getStatus()); + assertTrue(metadata.getClientDurationMs() >= 0); + + logMetadata("Fetch", metadata); + } + + @Test + public void testUpdateCapturesMetadata() throws InterruptedException { + capturedMetadata.clear(); + + String vectorId = RandomStringBuilder.build("update-test", 8); + List values = generateVectorValuesByDimension(dimension); + VectorWithUnsignedIndices vector = buildUpsertVectorWithUnsignedIndices( + vectorId, values, null, null, null); + index.upsert(Collections.singletonList(vector), namespace); + + Thread.sleep(1000); + capturedMetadata.clear(); + + List updatedValues = generateVectorValuesByDimension(dimension); + index.update(vectorId, updatedValues, namespace); + + ResponseMetadata metadata = findMetadataForOperation("update"); + assertNotNull(metadata, "Should have captured metadata for update operation"); + + assertEquals("update", metadata.getOperationName()); + assertEquals(indexName, metadata.getIndexName()); + assertEquals(namespace, metadata.getNamespace()); + assertEquals("success", metadata.getStatus()); + assertTrue(metadata.getClientDurationMs() >= 0); + + logMetadata("Update", metadata); + } + + @Test + public void testDeleteCapturesMetadata() throws InterruptedException { + capturedMetadata.clear(); + + String vectorId = RandomStringBuilder.build("delete-test", 8); + List values = generateVectorValuesByDimension(dimension); + VectorWithUnsignedIndices vector = buildUpsertVectorWithUnsignedIndices( + vectorId, values, null, null, null); + index.upsert(Collections.singletonList(vector), namespace); + + Thread.sleep(1000); + capturedMetadata.clear(); + + index.deleteByIds(Collections.singletonList(vectorId), namespace); + + 
ResponseMetadata metadata = findMetadataForOperation("delete"); + assertNotNull(metadata, "Should have captured metadata for delete operation"); + + assertEquals("delete", metadata.getOperationName()); + assertEquals(indexName, metadata.getIndexName()); + assertEquals(namespace, metadata.getNamespace()); + assertEquals("success", metadata.getStatus()); + assertTrue(metadata.getClientDurationMs() >= 0); + + logMetadata("Delete", metadata); + } + + @Test + public void testMultipleOperationsCaptureAllMetadata() throws InterruptedException { + capturedMetadata.clear(); + + String vectorId = RandomStringBuilder.build("multi-test", 8); + List values = generateVectorValuesByDimension(dimension); + + VectorWithUnsignedIndices vector = buildUpsertVectorWithUnsignedIndices( + vectorId, values, null, null, null); + index.upsert(Collections.singletonList(vector), namespace); + + Thread.sleep(500); + + index.query(5, values, null, null, null, namespace, null, false, false); + index.fetch(Collections.singletonList(vectorId), namespace); + + List updatedValues = generateVectorValuesByDimension(dimension); + index.update(vectorId, updatedValues, namespace); + index.deleteByIds(Collections.singletonList(vectorId), namespace); + + assertTrue(capturedMetadata.size() >= 5, "Should have captured metadata for at least 5 operations"); + + assertNotNull(findMetadataForOperation("upsert"), "Should have upsert metadata"); + assertNotNull(findMetadataForOperation("query"), "Should have query metadata"); + assertNotNull(findMetadataForOperation("fetch"), "Should have fetch metadata"); + assertNotNull(findMetadataForOperation("update"), "Should have update metadata"); + assertNotNull(findMetadataForOperation("delete"), "Should have delete metadata"); + + logger.info("Captured {} metadata entries for multiple operations", capturedMetadata.size()); + } + + @Test + public void testDefaultNamespaceCapturedCorrectly() { + capturedMetadata.clear(); + + List values = 
generateVectorValuesByDimension(dimension); + String vectorId = RandomStringBuilder.build("default-ns-test", 8); + VectorWithUnsignedIndices vector = buildUpsertVectorWithUnsignedIndices( + vectorId, values, null, null, null); + + index.upsert(Collections.singletonList(vector), ""); + + ResponseMetadata metadata = findMetadataForOperation("upsert"); + assertNotNull(metadata, "Should have captured metadata"); + assertEquals("", metadata.getNamespace(), "Default namespace should be empty string"); + } + + @Test + public void testServerAddressMatchesIndexHost() { + capturedMetadata.clear(); + + List values = generateVectorValuesByDimension(dimension); + String vectorId = RandomStringBuilder.build("host-test", 8); + VectorWithUnsignedIndices vector = buildUpsertVectorWithUnsignedIndices( + vectorId, values, null, null, null); + index.upsert(Collections.singletonList(vector), namespace); + + ResponseMetadata metadata = findMetadataForOperation("upsert"); + assertNotNull(metadata, "Should have captured metadata"); + assertNotNull(metadata.getServerAddress()); + assertTrue(metadata.getServerAddress().contains("pinecone.io"), + "Server address should be a Pinecone host"); + } + + private void logMetadata(String operation, ResponseMetadata metadata) { + if (metadata.getServerDurationMs() != null) { + logger.info("{} - Client: {}ms, Server: {}ms, Network overhead: {}ms", + operation, + metadata.getClientDurationMs(), + metadata.getServerDurationMs(), + metadata.getNetworkOverheadMs()); + } else { + logger.info("{} - Client: {}ms (server duration header not present)", + operation, metadata.getClientDurationMs()); + } + } + + private ResponseMetadata findMetadataForOperation(String operationName) { + for (int i = capturedMetadata.size() - 1; i >= 0; i--) { + ResponseMetadata metadata = capturedMetadata.get(i); + if (operationName.equals(metadata.getOperationName())) { + return metadata; + } + } + return null; + } +} diff --git a/src/main/java/io/pinecone/clients/Pinecone.java 
b/src/main/java/io/pinecone/clients/Pinecone.java index ef3d953f..9d7cc082 100644 --- a/src/main/java/io/pinecone/clients/Pinecone.java +++ b/src/main/java/io/pinecone/clients/Pinecone.java @@ -3,6 +3,7 @@ import io.pinecone.configs.PineconeConfig; import io.pinecone.configs.PineconeConnection; import io.pinecone.configs.ProxyConfig; +import io.pinecone.configs.ResponseMetadataListener; import io.pinecone.exceptions.*; import okhttp3.OkHttpClient; import org.openapitools.db_control.client.ApiClient; @@ -1477,6 +1478,10 @@ public Index getIndexConnection(String indexName) throws PineconeValidationExcep PineconeConfig perConnectionConfig = new PineconeConfig(config.getApiKey(), config.getSourceTag()); perConnectionConfig.setHost(getIndexHost(indexName)); + perConnectionConfig.setTLSEnabled(config.isTLSEnabled()); + if (config.getResponseMetadataListener() != null) { + perConnectionConfig.setResponseMetadataListener(config.getResponseMetadataListener()); + } PineconeConnection connection = getConnection(indexName, perConnectionConfig); return new Index(perConnectionConfig, connection, indexName); @@ -1509,9 +1514,13 @@ public AsyncIndex getAsyncIndexConnection(String indexName) throws PineconeValid PineconeConfig perConnectionConfig = new PineconeConfig(config.getApiKey(), config.getSourceTag()); perConnectionConfig.setHost(getIndexHost(indexName)); + perConnectionConfig.setTLSEnabled(config.isTLSEnabled()); + if (config.getResponseMetadataListener() != null) { + perConnectionConfig.setResponseMetadataListener(config.getResponseMetadataListener()); + } PineconeConnection connection = getConnection(indexName, perConnectionConfig); - return new AsyncIndex(config, connection, indexName); + return new AsyncIndex(perConnectionConfig, connection, indexName); } /** @@ -1527,7 +1536,7 @@ public Inference getInferenceClient() { } PineconeConnection getConnection(String indexName, PineconeConfig perConnectionConfig) { - return connectionsMap.computeIfAbsent(indexName, key -> 
new PineconeConnection(perConnectionConfig)); + return connectionsMap.computeIfAbsent(indexName, key -> new PineconeConnection(perConnectionConfig, indexName)); } ConcurrentHashMap getConnectionsMap() { @@ -1563,6 +1572,7 @@ public static class Builder { private ProxyConfig proxyConfig; private OkHttpClient customOkHttpClient; private boolean enableTls = true; + private ResponseMetadataListener responseMetadataListener; /** * Constructs a new {@link Builder} with the mandatory API key. @@ -1714,6 +1724,34 @@ public Builder withTlsEnabled(boolean enableTls) { return this; } + /** + * Sets a listener to receive response metadata from data plane operations. + *

+ * The listener is invoked after each upsert, query, fetch, update, or delete operation completes. + * Use this for custom metrics, logging, or OpenTelemetry integration. + *

+ * Example usage: + *

+         * <pre>{@code
+         * Pinecone client = new Pinecone.Builder("PINECONE_API_KEY")
+         *     .withResponseMetadataListener(metadata -> {
+         *         System.out.println("Operation: " + metadata.getOperationName());
+         *         System.out.println("Server time: " + metadata.getServerDurationMs() + "ms");
+         *         System.out.println("Total time: " + metadata.getClientDurationMs() + "ms");
+         *     })
+         *     .build();
+         *
+         * Index index = client.getIndexConnection("my-index");
+         * index.query(...);  // Listener is automatically invoked
+         * }</pre>
+ * + * @param listener The listener to receive response metadata. + * @return This {@link Builder} instance for chaining method calls. + */ + public Builder withResponseMetadataListener(ResponseMetadataListener listener) { + this.responseMetadataListener = listener; + return this; + } + /** * Builds and returns a {@link Pinecone} instance configured with the provided API key, optional source tag, * and OkHttpClient. @@ -1726,6 +1764,9 @@ public Builder withTlsEnabled(boolean enableTls) { public Pinecone build() { PineconeConfig config = new PineconeConfig(apiKey, sourceTag, proxyConfig, customOkHttpClient); config.setTLSEnabled(enableTls); + if (responseMetadataListener != null) { + config.setResponseMetadataListener(responseMetadataListener); + } config.validate(); if (proxyConfig != null && customOkHttpClient != null) { diff --git a/src/main/java/io/pinecone/configs/PineconeConfig.java b/src/main/java/io/pinecone/configs/PineconeConfig.java index f435e0d0..a33313e0 100644 --- a/src/main/java/io/pinecone/configs/PineconeConfig.java +++ b/src/main/java/io/pinecone/configs/PineconeConfig.java @@ -55,6 +55,7 @@ public class PineconeConfig { private OkHttpClient customOkHttpClient; private ManagedChannel customManagedChannel; private boolean enableTLS = true; + private ResponseMetadataListener responseMetadataListener; /** * Constructs a {@link PineconeConfig} instance with the specified API key. @@ -248,6 +249,25 @@ public void setTLSEnabled(boolean enableTLS) { this.enableTLS = enableTLS; } + /** + * Returns the response metadata listener, or null if not configured. + * + * @return The response metadata listener for capturing timing metrics from data plane operations. + */ + public ResponseMetadataListener getResponseMetadataListener() { + return responseMetadataListener; + } + + /** + * Sets the response metadata listener for capturing timing metrics from data plane operations. 
+ * The listener is invoked after each upsert, query, fetch, update, or delete operation completes. + * + * @param responseMetadataListener The listener to receive response metadata. + */ + public void setResponseMetadataListener(ResponseMetadataListener responseMetadataListener) { + this.responseMetadataListener = responseMetadataListener; + } + private String buildUserAgent() { String userAgent = String.format("lang=java; %s=%s", "pineconeClientVersion", pineconeClientVersion); if (this.getSourceTag() != null && !this.getSourceTag().isEmpty()) { diff --git a/src/main/java/io/pinecone/configs/PineconeConnection.java b/src/main/java/io/pinecone/configs/PineconeConnection.java index 212bb27b..21685b7e 100644 --- a/src/main/java/io/pinecone/configs/PineconeConnection.java +++ b/src/main/java/io/pinecone/configs/PineconeConnection.java @@ -48,6 +48,7 @@ public class PineconeConnection implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(PineconeConnection.class); private final PineconeConfig config; + private final String indexName; final ManagedChannel channel; /** @@ -70,7 +71,22 @@ public class PineconeConnection implements AutoCloseable { * @throws PineconeValidationException If index name or host is not provided for data plane operations. */ public PineconeConnection(PineconeConfig config) { + this(config, null); + } + + /** + * Constructs a {@link PineconeConnection} instance with the specified {@link PineconeConfig} and index name. + * If a custom gRPC ManagedChannel is provided in the {@link PineconeConfig}, it will be used. + * Otherwise, a new gRPC ManagedChannel will be built using the host specified in the {@link PineconeConfig}. + *

+ * + * @param config The {@link PineconeConfig} containing configuration settings for the PineconeConnection. + * @param indexName The name of the index, used for response metadata tracking. + * @throws PineconeValidationException If index name or host is not provided for data plane operations. + */ + public PineconeConnection(PineconeConfig config, String indexName) { this.config = config; + this.indexName = indexName; if (config.getCustomManagedChannel() != null) { channel = config.getCustomManagedChannel(); } else { @@ -91,19 +107,41 @@ private void initialize() { } private VectorServiceGrpc.VectorServiceBlockingStub generateBlockingStub(Metadata metadata) { - return VectorServiceGrpc + VectorServiceGrpc.VectorServiceBlockingStub stub = VectorServiceGrpc .newBlockingStub(channel) .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)) .withMaxInboundMessageSize(DEFAULT_MAX_MESSAGE_SIZE) .withMaxOutboundMessageSize(DEFAULT_MAX_MESSAGE_SIZE); + + // Add response metadata interceptor if listener is configured + if (config.getResponseMetadataListener() != null) { + stub = stub.withInterceptors( + new ResponseMetadataInterceptor( + config.getResponseMetadataListener(), + indexName != null ? indexName : "", + config.getHost() != null ? config.getHost() : "")); + } + + return stub; } private VectorServiceGrpc.VectorServiceFutureStub generateAsyncStub(Metadata metadata) { - return VectorServiceGrpc + VectorServiceGrpc.VectorServiceFutureStub stub = VectorServiceGrpc .newFutureStub(channel) .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)) .withMaxInboundMessageSize(DEFAULT_MAX_MESSAGE_SIZE) .withMaxOutboundMessageSize(DEFAULT_MAX_MESSAGE_SIZE); + + // Add response metadata interceptor if listener is configured + if (config.getResponseMetadataListener() != null) { + stub = stub.withInterceptors( + new ResponseMetadataInterceptor( + config.getResponseMetadataListener(), + indexName != null ? indexName : "", + config.getHost() != null ? 
config.getHost() : "")); + } + + return stub; } /** diff --git a/src/main/java/io/pinecone/configs/ResponseMetadata.java b/src/main/java/io/pinecone/configs/ResponseMetadata.java new file mode 100644 index 00000000..3b4f14dd --- /dev/null +++ b/src/main/java/io/pinecone/configs/ResponseMetadata.java @@ -0,0 +1,230 @@ +package io.pinecone.configs; + +/** + * Captures response metadata from Pinecone data plane operations. + * Contains timing information for observability and monitoring. + * + *

+ * <p>This class provides:
+ * <ul>
+ *   <li>{@link #getClientDurationMs()} - Total round-trip time measured by the client</li>
+ *   <li>{@link #getServerDurationMs()} - Server processing time from x-pinecone-response-duration-ms header</li>
+ *   <li>{@link #getNetworkOverheadMs()} - Computed network + serialization overhead</li>
+ * </ul>

+ * <p>Example usage with a listener:

+ * <pre>{@code
+ * Pinecone client = new Pinecone.Builder(apiKey)
+ *     .withResponseMetadataListener(metadata -> {
+ *         System.out.println("Operation: " + metadata.getOperationName());
+ *         System.out.println("Server duration: " + metadata.getServerDurationMs() + "ms");
+ *         System.out.println("Client duration: " + metadata.getClientDurationMs() + "ms");
+ *         System.out.println("Network overhead: " + metadata.getNetworkOverheadMs() + "ms");
+ *     })
+ *     .build();
+ * }</pre>
+ */ +public class ResponseMetadata { + + private final String operationName; + private final String indexName; + private final String namespace; + private final String serverAddress; + private final long clientDurationMs; + private final Long serverDurationMs; + private final String status; + private final String grpcStatusCode; + private final String errorType; + + private ResponseMetadata(Builder builder) { + this.operationName = builder.operationName; + this.indexName = builder.indexName; + this.namespace = builder.namespace; + this.serverAddress = builder.serverAddress; + this.clientDurationMs = builder.clientDurationMs; + this.serverDurationMs = builder.serverDurationMs; + this.status = builder.status; + this.grpcStatusCode = builder.grpcStatusCode; + this.errorType = builder.errorType; + } + + /** + * Returns the operation name (e.g., "upsert", "query", "fetch", "update", "delete"). + * Corresponds to OTel attribute: db.operation.name + */ + public String getOperationName() { + return operationName; + } + + /** + * Returns the Pinecone index name. + * Corresponds to OTel attribute: pinecone.index_name + */ + public String getIndexName() { + return indexName; + } + + /** + * Returns the Pinecone namespace (empty string if default namespace). + * Corresponds to OTel attribute: db.namespace + */ + public String getNamespace() { + return namespace; + } + + /** + * Returns the server address/host. + * Corresponds to OTel attribute: server.address + */ + public String getServerAddress() { + return serverAddress; + } + + /** + * Returns the total client-side duration in milliseconds. + * Measured from request initiation to response completion. + * Corresponds to metric: db.client.operation.duration + */ + public long getClientDurationMs() { + return clientDurationMs; + } + + /** + * Returns the server processing duration in milliseconds, or null if the + * x-pinecone-response-duration-ms header was not present. 
+ * Corresponds to metric: pinecone.server.processing.duration
+ */
+    public Long getServerDurationMs() {
+        return serverDurationMs;
+    }
+
+    /**
+     * Returns the computed network overhead in milliseconds (client duration minus server duration),
+     * or null if server duration is not available.
+     * This includes network latency, serialization, and deserialization time.
+     */
+    public Long getNetworkOverheadMs() {
+        if (serverDurationMs == null) {
+            return null;
+        }
+        // NOTE(review): can be negative if the server header and client clock disagree — TODO confirm acceptable.
+        return clientDurationMs - serverDurationMs;
+    }
+
+    /**
+     * Returns the operation status: "success" or "error".
+     * Corresponds to OTel attribute: status
+     */
+    public String getStatus() {
+        return status;
+    }
+
+    /**
+     * Returns the raw gRPC status code (e.g., "OK", "UNAVAILABLE", "DEADLINE_EXCEEDED").
+     * Corresponds to OTel attribute: db.response.status_code
+     */
+    public String getGrpcStatusCode() {
+        return grpcStatusCode;
+    }
+
+    /**
+     * Returns the error type category, or null if status is "success".
+     * Possible values: "validation", "connection", "server", "rate_limit", "timeout", "auth", "not_found"
+     * Corresponds to OTel attribute: error.type
+     */
+    public String getErrorType() {
+        return errorType;
+    }
+
+    /**
+     * Returns true if the operation was successful.
+     */
+    public boolean isSuccess() {
+        return "success".equals(status);
+    }
+
+    /** Returns a new builder for assembling a ResponseMetadata instance. */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Human-readable summary for logging; namespace, server duration, and error
+     * type are appended only when present so log lines stay compact.
+     */
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder("ResponseMetadata{");
+        sb.append("operation=").append(operationName);
+        sb.append(", index=").append(indexName);
+        if (namespace != null && !namespace.isEmpty()) {
+            sb.append(", namespace=").append(namespace);
+        }
+        sb.append(", clientDurationMs=").append(clientDurationMs);
+        if (serverDurationMs != null) {
+            sb.append(", serverDurationMs=").append(serverDurationMs);
+            sb.append(", networkOverheadMs=").append(getNetworkOverheadMs());
+        }
+        sb.append(", status=").append(status);
+        if (errorType != null) {
+            sb.append(", errorType=").append(errorType);
+        }
+        sb.append("}");
+        return sb.toString();
+    }
+
+    /** Fluent builder; defaults: namespace "", status "success", grpcStatusCode "OK". */
+    public static class Builder {
+        private String operationName;
+        private String indexName;
+        private String namespace = "";
+        private String serverAddress;
+        private long clientDurationMs;
+        private Long serverDurationMs;
+        private String status = "success";
+        private String grpcStatusCode = "OK";
+        private String errorType;
+
+        public Builder operationName(String operationName) {
+            this.operationName = operationName;
+            return this;
+        }
+
+        public Builder indexName(String indexName) {
+            this.indexName = indexName;
+            return this;
+        }
+
+        public Builder namespace(String namespace) {
+            // Normalizes null to "" so metric attributes never carry null.
+            this.namespace = namespace != null ? namespace : "";
+            return this;
+        }
+
+        public Builder serverAddress(String serverAddress) {
+            this.serverAddress = serverAddress;
+            return this;
+        }
+
+        public Builder clientDurationMs(long clientDurationMs) {
+            this.clientDurationMs = clientDurationMs;
+            return this;
+        }
+
+        public Builder serverDurationMs(Long serverDurationMs) {
+            this.serverDurationMs = serverDurationMs;
+            return this;
+        }
+
+        public Builder status(String status) {
+            this.status = status;
+            return this;
+        }
+
+        public Builder grpcStatusCode(String grpcStatusCode) {
+            this.grpcStatusCode = grpcStatusCode;
+            return this;
+        }
+
+        public Builder errorType(String errorType) {
+            this.errorType = errorType;
+            return this;
+        }
+
+        public ResponseMetadata build() {
+            return new ResponseMetadata(this);
+        }
+    }
+}
+
diff --git a/src/main/java/io/pinecone/configs/ResponseMetadataInterceptor.java b/src/main/java/io/pinecone/configs/ResponseMetadataInterceptor.java
new file mode 100644
index 00000000..12774d14
--- /dev/null
+++ b/src/main/java/io/pinecone/configs/ResponseMetadataInterceptor.java
@@ -0,0 +1,207 @@
+package io.pinecone.configs;
+
+import io.grpc.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * gRPC ClientInterceptor that captures response metadata from Pinecone data plane operations.
+ * Extracts timing information from the x-pinecone-response-duration-ms trailing header.
+ */ +public class ResponseMetadataInterceptor implements ClientInterceptor { + + private static final Logger logger = LoggerFactory.getLogger(ResponseMetadataInterceptor.class); + + private static final Metadata.Key RESPONSE_DURATION_KEY = + Metadata.Key.of("x-pinecone-response-duration-ms", Metadata.ASCII_STRING_MARSHALLER); + + // Operations to track (matches VectorService RPC method names) + private static final Set TRACKED_OPERATIONS = new HashSet<>(Arrays.asList( + "Upsert", "Query", "Fetch", "Update", "Delete" + )); + + private final ResponseMetadataListener listener; + private final String indexName; + private final String serverAddress; + + /** + * Creates a new interceptor. + * + * @param listener The listener to receive response metadata + * @param indexName The name of the Pinecone index + * @param serverAddress The server address/host + */ + public ResponseMetadataInterceptor(ResponseMetadataListener listener, String indexName, String serverAddress) { + this.listener = listener; + this.indexName = indexName; + this.serverAddress = serverAddress; + } + + @Override + public ClientCall interceptCall( + MethodDescriptor method, + CallOptions callOptions, + Channel next) { + + String methodName = method.getBareMethodName(); + + // Only intercept tracked operations + if (!TRACKED_OPERATIONS.contains(methodName)) { + return next.newCall(method, callOptions); + } + + final long startTimeNanos = System.nanoTime(); + final String operationName = methodName.toLowerCase(); + + return new ForwardingClientCall.SimpleForwardingClientCall( + next.newCall(method, callOptions)) { + + private volatile String namespace = ""; + + @Override + public void sendMessage(ReqT message) { + // Extract namespace from request for metrics context + namespace = extractNamespace(message); + super.sendMessage(message); + } + + @Override + public void start(Listener responseListener, Metadata headers) { + super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener( + 
responseListener) { + + private Metadata initialHeaders; + + @Override + public void onHeaders(Metadata headers) { + // Capture initial headers (some servers send custom headers here) + this.initialHeaders = headers; + if (logger.isDebugEnabled()) { + logger.debug("Initial headers for {}: {}", operationName, headers.keys()); + } + super.onHeaders(headers); + } + + @Override + public void onClose(Status status, Metadata trailers) { + try { + long clientDurationMs = (System.nanoTime() - startTimeNanos) / 1_000_000; + + if (logger.isDebugEnabled()) { + logger.debug("Trailing metadata for {}: {}", operationName, trailers.keys()); + } + + // Try to extract server duration from trailing metadata first, then initial headers + Long serverDurationMs = extractServerDuration(trailers); + if (serverDurationMs == null && initialHeaders != null) { + serverDurationMs = extractServerDuration(initialHeaders); + } + + // Determine status and error type + String statusStr = status.isOk() ? "success" : "error"; + String errorType = status.isOk() ? 
null : mapGrpcStatusToErrorType(status.getCode()); + + // Build and emit metadata + ResponseMetadata metadata = ResponseMetadata.builder() + .operationName(operationName) + .indexName(indexName) + .namespace(namespace) + .serverAddress(serverAddress) + .clientDurationMs(clientDurationMs) + .serverDurationMs(serverDurationMs) + .status(statusStr) + .grpcStatusCode(status.getCode().name()) + .errorType(errorType) + .build(); + + invokeListener(metadata); + + } catch (Exception e) { + logger.warn("Error capturing response metadata", e); + } + + super.onClose(status, trailers); + } + }, headers); + } + }; + } + + private Long extractServerDuration(Metadata trailers) { + String durationHeader = trailers.get(RESPONSE_DURATION_KEY); + if (durationHeader != null) { + try { + return Long.parseLong(durationHeader.trim()); + } catch (NumberFormatException e) { + logger.warn("Invalid x-pinecone-response-duration-ms header value: {}", durationHeader); + } + } + return null; + } + + private String extractNamespace(T message) { + // Use reflection to extract namespace from request objects + // All tracked request types have a getNamespace() method + try { + java.lang.reflect.Method getNamespace = message.getClass().getMethod("getNamespace"); + Object result = getNamespace.invoke(message); + return result != null ? 
result.toString() : ""; + } catch (NoSuchMethodException e) { + // Some requests may not have namespace + return ""; + } catch (Exception e) { + logger.debug("Could not extract namespace from request", e); + return ""; + } + } + + private String mapGrpcStatusToErrorType(Status.Code code) { + switch (code) { + case INVALID_ARGUMENT: + case FAILED_PRECONDITION: + case OUT_OF_RANGE: + return "validation"; + + case UNAVAILABLE: + case UNKNOWN: + case ABORTED: + return "connection"; + + case INTERNAL: + case DATA_LOSS: + case UNIMPLEMENTED: + return "server"; + + case RESOURCE_EXHAUSTED: + return "rate_limit"; + + case DEADLINE_EXCEEDED: + case CANCELLED: + return "timeout"; + + case UNAUTHENTICATED: + case PERMISSION_DENIED: + return "auth"; + + case NOT_FOUND: + case ALREADY_EXISTS: + return "not_found"; + + default: + return "unknown"; + } + } + + private void invokeListener(ResponseMetadata metadata) { + try { + listener.onResponse(metadata); + } catch (Exception e) { + logger.error("Exception in ResponseMetadataListener.onResponse()", e); + } + } +} + diff --git a/src/main/java/io/pinecone/configs/ResponseMetadataListener.java b/src/main/java/io/pinecone/configs/ResponseMetadataListener.java new file mode 100644 index 00000000..eba171b3 --- /dev/null +++ b/src/main/java/io/pinecone/configs/ResponseMetadataListener.java @@ -0,0 +1,90 @@ +package io.pinecone.configs; + +/** + * Listener interface for receiving response metadata from Pinecone data plane operations. + * + *

+ * <p>Implement this interface to capture timing metrics for observability purposes.
+ * The listener is invoked after each data plane operation completes (success or failure).
+ *
+ * <p>Supported operations:
+ * <ul>
+ *   <li>upsert - Insert or update vectors</li>
+ *   <li>query - Search for similar vectors</li>
+ *   <li>fetch - Retrieve vectors by ID</li>
+ *   <li>update - Update vector metadata</li>
+ *   <li>delete - Delete vectors</li>
+ * </ul>
+ *
+ * <p>Example - Simple logging:
+ * <pre>{@code
+ * Pinecone client = new Pinecone.Builder(apiKey)
+ *     .withResponseMetadataListener(metadata -> {
+ *         logger.info("Pinecone {} completed in {}ms (server: {}ms)",
+ *             metadata.getOperationName(),
+ *             metadata.getClientDurationMs(),
+ *             metadata.getServerDurationMs());
+ *     })
+ *     .build();
+ * }</pre>
+ *
+ * <p>Example - OpenTelemetry integration:
+ * <pre>{@code
+ * Meter meter = openTelemetry.getMeter("io.pinecone");
+ * DoubleHistogram clientDuration = meter.histogramBuilder("db.client.operation.duration")
+ *     .setUnit("ms").build();
+ * DoubleHistogram serverDuration = meter.histogramBuilder("pinecone.server.processing.duration")
+ *     .setUnit("ms").build();
+ * LongCounter operationCount = meter.counterBuilder("db.client.operation.count").build();
+ *
+ * Pinecone client = new Pinecone.Builder(apiKey)
+ *     .withResponseMetadataListener(metadata -> {
+ *         Attributes attrs = Attributes.builder()
+ *             .put("db.system", "pinecone")
+ *             .put("db.operation.name", metadata.getOperationName())
+ *             .put("db.namespace", metadata.getNamespace())
+ *             .put("status", metadata.getStatus())
+ *             .build();
+ *
+ *         clientDuration.record(metadata.getClientDurationMs(), attrs);
+ *         if (metadata.getServerDurationMs() != null) {
+ *             serverDuration.record(metadata.getServerDurationMs(), attrs);
+ *         }
+ *         operationCount.add(1, attrs);
+ *     })
+ *     .build();
+ * }</pre>
+ *
+ * <p>Example - Micrometer/Prometheus:
+ * <pre>{@code
+ * Pinecone client = new Pinecone.Builder(apiKey)
+ *     .withResponseMetadataListener(metadata -> {
+ *         Timer.builder("pinecone.client.duration")
+ *             .tag("operation", metadata.getOperationName())
+ *             .tag("status", metadata.getStatus())
+ *             .register(meterRegistry)
+ *             .record(metadata.getClientDurationMs(), TimeUnit.MILLISECONDS);
+ *     })
+ *     .build();
+ * }</pre>
+ *
+ * @see ResponseMetadata
+ */
+@FunctionalInterface
+public interface ResponseMetadataListener {
+
+    /**
+     * Called after each data plane operation completes.
+     *
+     * <p>This method is called synchronously after the gRPC response is received.
+     * Implementations should be lightweight and non-blocking to avoid impacting
+     * request latency. For heavy processing, consider queuing the metadata for
+     * async handling.
+     *
+     * <p>Exceptions thrown by this method are logged but do not affect the
+     * operation result.
+     *
+     * @param metadata The response metadata containing timing and operation details
+     */
+    void onResponse(ResponseMetadata metadata);
+}
+
diff --git a/src/test/java/io/pinecone/configs/ResponseMetadataTest.java b/src/test/java/io/pinecone/configs/ResponseMetadataTest.java
new file mode 100644
index 00000000..1695dd4e
--- /dev/null
+++ b/src/test/java/io/pinecone/configs/ResponseMetadataTest.java
@@ -0,0 +1,184 @@
+package io.pinecone.configs;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * Unit tests for {@link ResponseMetadata}: builder population and defaults,
+ * the derived network-overhead value, and toString formatting.
+ */
+class ResponseMetadataTest {
+
+    @Test
+    void testBuilderWithAllFields() {
+        ResponseMetadata metadata = ResponseMetadata.builder()
+                .operationName("query")
+                .indexName("test-index")
+                .namespace("test-namespace")
+                .serverAddress("test-index-abc.svc.pinecone.io")
+                .clientDurationMs(150)
+                .serverDurationMs(100L)
+                .status("success")
+                .grpcStatusCode("OK")
+                .build();
+
+        assertEquals("query", metadata.getOperationName());
+        assertEquals("test-index", metadata.getIndexName());
+        assertEquals("test-namespace", metadata.getNamespace());
+        assertEquals("test-index-abc.svc.pinecone.io", metadata.getServerAddress());
+        assertEquals(150, metadata.getClientDurationMs());
+        assertEquals(100L, metadata.getServerDurationMs());
+        assertEquals("success", metadata.getStatus());
+        assertEquals("OK", metadata.getGrpcStatusCode());
+        assertTrue(metadata.isSuccess());
+        assertNull(metadata.getErrorType());
+    }
+
+    @Test
+    void testNetworkOverheadCalculation() {
+        // Overhead is derived: client duration minus server duration.
+        ResponseMetadata metadata = ResponseMetadata.builder()
+                .operationName("upsert")
+                .clientDurationMs(200)
+                .serverDurationMs(150L)
+                .build();
+
+        assertEquals(50L, metadata.getNetworkOverheadMs());
+    }
+
+    @Test
+    void testNetworkOverheadNullWhenServerDurationMissing() {
+        ResponseMetadata metadata = ResponseMetadata.builder()
+                .operationName("upsert")
+                .clientDurationMs(200)
+                .serverDurationMs(null)
+                .build();
+
+        assertNull(metadata.getNetworkOverheadMs());
+    }
+
+    @Test
+    void testErrorMetadata() {
+        ResponseMetadata metadata = ResponseMetadata.builder()
+                .operationName("query")
+                .indexName("test-index")
+                .clientDurationMs(50)
+                .status("error")
+                .grpcStatusCode("UNAVAILABLE")
+                .errorType("connection")
+                .build();
+
+        assertFalse(metadata.isSuccess());
+        assertEquals("error", metadata.getStatus());
+        assertEquals("UNAVAILABLE", metadata.getGrpcStatusCode());
+        assertEquals("connection", metadata.getErrorType());
+    }
+
+    @Test
+    void testDefaultNamespaceIsEmptyString() {
+        // Builder normalizes a null namespace to "".
+        ResponseMetadata metadata = ResponseMetadata.builder()
+                .operationName("fetch")
+                .namespace(null)
+                .build();
+
+        assertEquals("", metadata.getNamespace());
+    }
+
+    @Test
+    void testDefaultStatusIsSuccess() {
+        ResponseMetadata metadata = ResponseMetadata.builder()
+                .operationName("delete")
+                .build();
+
+        assertEquals("success", metadata.getStatus());
+        assertEquals("OK", metadata.getGrpcStatusCode());
+        assertTrue(metadata.isSuccess());
+    }
+
+    @Test
+    void testToStringContainsKeyFields() {
+        ResponseMetadata metadata = ResponseMetadata.builder()
+                .operationName("delete")
+                .indexName("my-index")
+                .namespace("ns1")
+                .clientDurationMs(100)
+                .serverDurationMs(80L)
+                .status("success")
+                .build();
+
+        String str = metadata.toString();
+        assertTrue(str.contains("operation=delete"));
+        assertTrue(str.contains("index=my-index"));
+        assertTrue(str.contains("namespace=ns1"));
+        assertTrue(str.contains("clientDurationMs=100"));
+        assertTrue(str.contains("serverDurationMs=80"));
+        assertTrue(str.contains("networkOverheadMs=20"));
+    }
+
+    @Test
+    void testToStringWithoutServerDuration() {
+        // Server-derived fields are omitted when the header was absent.
+        ResponseMetadata metadata = ResponseMetadata.builder()
+                .operationName("query")
+                .indexName("my-index")
+                .clientDurationMs(100)
+                .serverDurationMs(null)
+                .status("success")
+                .build();
+
+        String str = metadata.toString();
+        assertTrue(str.contains("operation=query"));
+        assertTrue(str.contains("clientDurationMs=100"));
+        assertFalse(str.contains("serverDurationMs"));
+        assertFalse(str.contains("networkOverheadMs"));
+    }
+
+    @Test
+    void testToStringWithError() {
+        ResponseMetadata metadata = ResponseMetadata.builder()
+                .operationName("upsert")
+                .indexName("my-index")
+                .clientDurationMs(50)
+                .status("error")
+                .errorType("rate_limit")
+                .build();
+
+        String str = metadata.toString();
+        assertTrue(str.contains("status=error"));
+        assertTrue(str.contains("errorType=rate_limit"));
+    }
+
+    @Test
+    void testToStringWithoutNamespace() {
+        ResponseMetadata metadata = ResponseMetadata.builder()
+                .operationName("fetch")
+                .indexName("my-index")
+                .namespace("")
+                .clientDurationMs(100)
+                .build();
+
+        String str = metadata.toString();
+        assertFalse(str.contains("namespace="));
+    }
+
+    @Test
+    void testAllOperationTypes() {
+        String[] operations = {"upsert", "query", "fetch", "update", "delete"};
+        for (String op : operations) {
+            ResponseMetadata metadata = ResponseMetadata.builder()
+                    .operationName(op)
+                    .build();
+            assertEquals(op, metadata.getOperationName());
+        }
+    }
+
+    @Test
+    void testAllErrorTypes() {
+        String[] errorTypes = {"validation", "connection", "server", "rate_limit", "timeout", "auth", "not_found", "unknown"};
+        for (String errorType : errorTypes) {
+            ResponseMetadata metadata = ResponseMetadata.builder()
+                    .operationName("query")
+                    .status("error")
+                    .errorType(errorType)
+                    .build();
+            assertEquals(errorType, metadata.getErrorType());
+            assertFalse(metadata.isSuccess());
+        }
+    }
+}