Skip to content

Commit e98c22d

Browse files
author
psingh3
committed
added initial implementation
1 parent 717a437 commit e98c22d

8 files changed

Lines changed: 801 additions & 0 deletions

File tree

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.client.transport.sniffer;
10+
11+
import org.apache.hc.core5.http.HttpHost;
12+
import java.util.Objects;
13+
14+
/**
15+
* Represents a node in the OpenSearch cluster discovered by sniffing.
16+
*/
17+
public class Node {
18+
19+
private final HttpHost host;
20+
private final String roles;
21+
private final String version;
22+
private final String name;
23+
24+
public Node(HttpHost host) {
25+
this(host, null, null, null);
26+
}
27+
28+
public Node(HttpHost host, String roles, String version, String name) {
29+
this.host = Objects.requireNonNull(host, "host cannot be null");
30+
this.roles = roles;
31+
this.version = version;
32+
this.name = name;
33+
}
34+
35+
/**
36+
* Returns the host information for this node.
37+
*/
38+
public HttpHost getHost() {
39+
return host;
40+
}
41+
42+
/**
43+
* Returns the roles of this node (e.g., "master", "data", "ingest").
44+
*/
45+
public String getRoles() {
46+
return roles;
47+
}
48+
49+
/**
50+
* Returns the OpenSearch version of this node.
51+
*/
52+
public String getVersion() {
53+
return version;
54+
}
55+
56+
/**
57+
* Returns the name of this node.
58+
*/
59+
public String getName() {
60+
return name;
61+
}
62+
63+
@Override
64+
public boolean equals(Object obj) {
65+
if (this == obj) return true;
66+
if (obj == null || getClass() != obj.getClass()) return false;
67+
Node node = (Node) obj;
68+
return Objects.equals(host, node.host);
69+
}
70+
71+
@Override
72+
public int hashCode() {
73+
return Objects.hash(host);
74+
}
75+
76+
@Override
77+
public String toString() {
78+
return "Node{" +
79+
"host=" + host +
80+
", roles='" + roles + '\'' +
81+
", version='" + version + '\'' +
82+
", name='" + name + '\'' +
83+
'}';
84+
}
85+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.client.transport.sniffer;
10+
11+
import java.util.List;
12+
13+
/**
14+
* Callback interface used to notify when the list of nodes is updated after sniffing.
15+
*/
16+
@FunctionalInterface
17+
public interface NodeListCallback {
18+
19+
/**
20+
* Called when the node list is updated after sniffing.
21+
*
22+
* @param nodes the updated list of nodes
23+
*/
24+
void onNodeListUpdate(List<Node> nodes);
25+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.client.transport.sniffer;
10+
11+
import java.io.IOException;
12+
import java.util.List;
13+
14+
/**
15+
* Interface for sniffing OpenSearch cluster nodes.
16+
* Implementations are responsible for discovering available nodes in the cluster.
17+
*/
18+
public interface NodesSniffer {
19+
20+
/**
21+
* Sniffs the cluster nodes and returns the list of discovered nodes.
22+
*
23+
* @return list of discovered nodes
24+
* @throws IOException if sniffing fails
25+
*/
26+
List<Node> sniff() throws IOException;
27+
}
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.client.transport.sniffer;
10+
11+
import com.fasterxml.jackson.databind.JsonNode;
12+
import com.fasterxml.jackson.databind.ObjectMapper;
13+
import org.apache.hc.client5.http.classic.methods.HttpGet;
14+
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
15+
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
16+
import org.apache.hc.core5.http.HttpHost;
17+
import org.apache.hc.core5.http.io.entity.EntityUtils;
18+
19+
import java.io.IOException;
20+
import java.util.ArrayList;
21+
import java.util.Iterator;
22+
import java.util.List;
23+
import java.util.Map;
24+
25+
/**
26+
* Implementation of {@link NodesSniffer} that discovers nodes by calling the OpenSearch cluster's nodes API.
27+
*/
28+
public class OpenSearchNodesSniffer implements NodesSniffer {
29+
30+
public static final String NODES_INFO_PATH = "_nodes/http";
31+
public static final String DEFAULT_SCHEME = "http";
32+
33+
private final CloseableHttpClient httpClient;
34+
private final List<HttpHost> hosts;
35+
private final String scheme;
36+
private final ObjectMapper objectMapper;
37+
38+
public OpenSearchNodesSniffer(CloseableHttpClient httpClient, List<HttpHost> hosts, String scheme) {
39+
this.httpClient = httpClient;
40+
this.hosts = hosts;
41+
this.scheme = scheme != null ? scheme : DEFAULT_SCHEME;
42+
this.objectMapper = new ObjectMapper();
43+
}
44+
45+
@Override
46+
public List<Node> sniff() throws IOException {
47+
List<Node> sniffedNodes = new ArrayList<>();
48+
49+
for (HttpHost host : hosts) {
50+
try {
51+
List<Node> nodesFromHost = sniffFromHost(host);
52+
if (!nodesFromHost.isEmpty()) {
53+
return nodesFromHost; // Return nodes from first successful host
54+
}
55+
} catch (IOException e) {
56+
// Try next host if current one fails
57+
continue;
58+
}
59+
}
60+
61+
throw new IOException("Unable to sniff nodes from any of the provided hosts");
62+
}
63+
64+
private List<Node> sniffFromHost(HttpHost host) throws IOException {
65+
String nodesInfoUrl = host.toURI() + "/" + NODES_INFO_PATH;
66+
HttpGet request = new HttpGet(nodesInfoUrl);
67+
68+
try (CloseableHttpResponse response = httpClient.execute(request)) {
69+
if (response.getCode() != 200) {
70+
throw new IOException("Nodes info request failed with status: " + response.getCode());
71+
}
72+
73+
try {
74+
String responseBody = EntityUtils.toString(response.getEntity());
75+
return parseNodesResponse(responseBody);
76+
} catch (Exception e) {
77+
throw new IOException("Failed to parse response body", e);
78+
}
79+
}
80+
}
81+
82+
private List<Node> parseNodesResponse(String responseBody) throws IOException {
83+
List<Node> nodes = new ArrayList<>();
84+
JsonNode root = objectMapper.readTree(responseBody);
85+
JsonNode nodesNode = root.get("nodes");
86+
87+
if (nodesNode != null && nodesNode.isObject()) {
88+
Iterator<Map.Entry<String, JsonNode>> nodeIterator = nodesNode.fields();
89+
90+
while (nodeIterator.hasNext()) {
91+
Map.Entry<String, JsonNode> nodeEntry = nodeIterator.next();
92+
JsonNode nodeInfo = nodeEntry.getValue();
93+
94+
Node node = parseNodeInfo(nodeInfo);
95+
if (node != null) {
96+
nodes.add(node);
97+
}
98+
}
99+
}
100+
101+
return nodes;
102+
}
103+
104+
private Node parseNodeInfo(JsonNode nodeInfo) {
105+
try {
106+
JsonNode httpNode = nodeInfo.get("http");
107+
if (httpNode == null) {
108+
return null;
109+
}
110+
111+
JsonNode publishAddressNode = httpNode.get("publish_address");
112+
if (publishAddressNode == null) {
113+
return null;
114+
}
115+
116+
String publishAddress = publishAddressNode.asText();
117+
HttpHost host = parseHttpHost(publishAddress);
118+
119+
String roles = extractRoles(nodeInfo);
120+
String version = extractVersion(nodeInfo);
121+
String name = extractName(nodeInfo);
122+
123+
return new Node(host, roles, version, name);
124+
} catch (Exception e) {
125+
// Skip malformed node info
126+
return null;
127+
}
128+
}
129+
130+
private HttpHost parseHttpHost(String publishAddress) {
131+
// Parse format like "127.0.0.1:9200" or "[::1]:9200"
132+
String host;
133+
int port;
134+
135+
if (publishAddress.startsWith("[")) {
136+
// IPv6 format
137+
int closeBracket = publishAddress.indexOf(']');
138+
host = publishAddress.substring(1, closeBracket);
139+
port = Integer.parseInt(publishAddress.substring(closeBracket + 2));
140+
} else {
141+
// IPv4 format
142+
String[] parts = publishAddress.split(":");
143+
host = parts[0];
144+
port = Integer.parseInt(parts[1]);
145+
}
146+
147+
return new HttpHost(scheme, host, port);
148+
}
149+
150+
private String extractRoles(JsonNode nodeInfo) {
151+
JsonNode rolesNode = nodeInfo.get("roles");
152+
if (rolesNode != null && rolesNode.isArray()) {
153+
List<String> roles = new ArrayList<>();
154+
for (JsonNode role : rolesNode) {
155+
roles.add(role.asText());
156+
}
157+
return String.join(",", roles);
158+
}
159+
return null;
160+
}
161+
162+
private String extractVersion(JsonNode nodeInfo) {
163+
JsonNode versionNode = nodeInfo.get("version");
164+
return versionNode != null ? versionNode.asText() : null;
165+
}
166+
167+
private String extractName(JsonNode nodeInfo) {
168+
JsonNode nameNode = nodeInfo.get("name");
169+
return nameNode != null ? nameNode.asText() : null;
170+
}
171+
}

0 commit comments

Comments
 (0)