Skip to content

Commit bec9183

Browse files
authored
chore: Refactor cache usage and code flow (#60)
* wip: refactoring * refer to class in readme * improved listener resolution * added logging * more logging changes * code cleanup * query listener version from crd * delay resolution of listener version * catch exception on listener fetch * use full resource name * more logging * bumped kubernetes client version, removed unused dependency * restored okio lib * revert version * add resolution against node IPs * linting * re-work flow to allow short-circuiting * minor correction * add pod informer * cleaned up docs, lowered logging level for informer * changelog * add null checks and some code consolidation
1 parent 921c42d commit bec9183

File tree

12 files changed

+876
-503
lines changed

12 files changed

+876
-503
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.
44

55
## [Unreleased]
66

7+
### Changed
8+
9+
- Refactored topology-provider to improve cache usage and readability ([#60]).
10+
11+
[#60]: https://github.com/stackabletech/hdfs-utils/pull/60
12+
713
## [0.4.2] - 2025-09-30
814

915
### Added

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -701,7 +701,7 @@ In Kubernetes, the most commonly used mechanism for topology awareness are label
701701
The most prevalent example for this is the node label [topology.kubernetes.io/zone](https://kubernetes.io/docs/reference/labels-annotations-taints/#topologykubernetesiozone) which often refers to availability zones in cloud providers or similar things.
702702

703703
The purpose of this tool is to feed information from Kubernetes into the HDFS rack awareness functionality.
704-
In order to do this, it implements the Hadoop interface `org.apache.hadoop.net.DNSToSwitchMapping` which then allows this tool to be configured on the NameNode via the parameter `net.topology.node.switch.mapping.impl`.
704+
In order to do this, `tech.stackable.hadoop.StackableTopologyProvider` implements the Hadoop interface `org.apache.hadoop.net.DNSToSwitchMapping` which then allows this tool to be configured on the NameNode via the parameter `net.topology.node.switch.mapping.impl`.
705705

706706
The topology provider watches all HDFS pods deployed by Stackable and Kubernetes nodes and keeps an in memory cache of the current state of these objects.
707707
From this state store the tool can then calculate rack IDs for nodes that HDFS asks for without needing to talk to the api-server and incurring an extra network round-trip.

src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java

Lines changed: 526 additions & 472 deletions
Large diffs are not rendered by default.
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package tech.stackable.hadoop;
2+
3+
import com.github.benmanes.caffeine.cache.Cache;
4+
import com.github.benmanes.caffeine.cache.Caffeine;
5+
import io.fabric8.kubernetes.api.model.GenericKubernetesResource;
6+
import io.fabric8.kubernetes.api.model.Node;
7+
import io.fabric8.kubernetes.api.model.Pod;
8+
import java.util.List;
9+
import java.util.concurrent.TimeUnit;
10+
11+
/** Manages all caching layers for the topology provider. */
12+
public class TopologyCache {
13+
private final Cache<String, String> topology;
14+
private final Cache<String, Node> nodes;
15+
private final Cache<String, GenericKubernetesResource> listeners;
16+
private final Cache<String, Pod> pods;
17+
18+
TopologyCache(int expirationSeconds, int defaultExpirationSeconds) {
19+
this.topology =
20+
Caffeine.newBuilder().expireAfterWrite(expirationSeconds, TimeUnit.SECONDS).build();
21+
22+
this.nodes =
23+
Caffeine.newBuilder().expireAfterWrite(defaultExpirationSeconds, TimeUnit.SECONDS).build();
24+
25+
this.listeners =
26+
Caffeine.newBuilder().expireAfterWrite(defaultExpirationSeconds, TimeUnit.SECONDS).build();
27+
28+
this.pods =
29+
Caffeine.newBuilder().expireAfterWrite(defaultExpirationSeconds, TimeUnit.SECONDS).build();
30+
}
31+
32+
String getTopology(String key) {
33+
return topology.getIfPresent(key);
34+
}
35+
36+
void putTopology(String key, String value) {
37+
topology.put(key, value);
38+
}
39+
40+
void invalidateAllTopologyKeys() {
41+
topology.invalidateAll();
42+
}
43+
44+
void invalidateTopologyKeys(List<String> keys) {
45+
keys.forEach(topology::invalidate);
46+
}
47+
48+
Node getNode(String name) {
49+
return nodes.getIfPresent(name);
50+
}
51+
52+
void putNode(String name, Node node) {
53+
nodes.put(name, node);
54+
}
55+
56+
GenericKubernetesResource getListener(String name) {
57+
return listeners.getIfPresent(name);
58+
}
59+
60+
void putListener(String name, GenericKubernetesResource listener) {
61+
listeners.put(name, listener);
62+
}
63+
64+
Pod getPod(String name) {
65+
return pods.getIfPresent(name);
66+
}
67+
68+
void putPod(String name, Pod pod) {
69+
pods.put(name, pod);
70+
}
71+
72+
void deletePod(String name) {
73+
pods.invalidate(name);
74+
}
75+
76+
boolean hasAllPods(List<String> names) {
77+
return names.stream().noneMatch(name -> pods.getIfPresent(name) == null);
78+
}
79+
}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
package tech.stackable.hadoop;
2+
3+
import java.util.Arrays;
4+
import java.util.List;
5+
import java.util.Locale;
6+
import java.util.stream.Collectors;
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
9+
10+
public class TopologyLabel {
11+
private static final Logger LOG = LoggerFactory.getLogger(TopologyLabel.class);
12+
public static final String VARNAME_LABELS = "TOPOLOGY_LABELS";
13+
public static final String VARNAME_MAX_LEVELS = "TOPOLOGY_MAX_LEVELS";
14+
private static final int MAX_LEVELS_DEFAULT = 2;
15+
16+
public enum Type {
17+
NODE,
18+
POD,
19+
UNDEFINED
20+
}
21+
22+
private final Type type;
23+
private final String name;
24+
25+
TopologyLabel(String config) {
26+
if (config == null || config.isEmpty()) {
27+
this.type = Type.UNDEFINED;
28+
this.name = null;
29+
return;
30+
}
31+
32+
String[] parts = config.toLowerCase(Locale.ROOT).split(":", 2);
33+
34+
if (parts.length != 2) {
35+
LOG.warn("Invalid topology label format '{}' - expected '[node|pod]:<label>'", config);
36+
this.type = Type.UNDEFINED;
37+
this.name = null;
38+
return;
39+
}
40+
41+
this.name = parts[1];
42+
43+
switch (parts[0]) {
44+
case "node":
45+
this.type = Type.NODE;
46+
break;
47+
case "pod":
48+
this.type = Type.POD;
49+
break;
50+
default:
51+
LOG.warn("Unsupported label type '{}' - must be 'node' or 'pod'", parts[0]);
52+
this.type = Type.UNDEFINED;
53+
}
54+
}
55+
56+
boolean isNodeLabel() {
57+
return type == Type.NODE;
58+
}
59+
60+
boolean isUndefined() {
61+
return type == Type.UNDEFINED;
62+
}
63+
64+
String getName() {
65+
return name;
66+
}
67+
68+
Type getType() {
69+
return type;
70+
}
71+
72+
public static List<TopologyLabel> initializeTopologyLabels() {
73+
// Read the labels to be used to build a topology from environment variables. Labels are
74+
// configured in the EnvVar "TOPOLOGY_LABELS". They should be specified in the form
75+
// "[node|pod]:<labelname>" and separated by ";". So a valid configuration that reads topology
76+
// information from the labels "kubernetes.io/zone" and "kubernetes.io/rack" on the k8s node
77+
// that is running a datanode pod would look like this:
78+
// "node:kubernetes.io/zone;node:kubernetes.io/rack" By default, there is an upper limit of 2 on
79+
// the number of labels that are processed, because this is what Hadoop traditionally allows -
80+
// this can be overridden via setting the EnvVar "MAX_TOPOLOGY_LEVELS".
81+
String topologyConfig = System.getenv(VARNAME_LABELS);
82+
83+
if (topologyConfig == null || topologyConfig.isEmpty()) {
84+
LOG.error(
85+
"Missing env var [{}] this is required for rack awareness to work.", VARNAME_LABELS);
86+
throw new RuntimeException("TOPOLOGY_LABELS environment variable not set");
87+
}
88+
89+
String[] labelConfigs = topologyConfig.split(";");
90+
91+
if (labelConfigs.length > getMaxLabels()) {
92+
LOG.error(
93+
"Found [{}] topology labels configured, but maximum allowed number is [{}]: "
94+
+ "please check your config or raise the number of allowed labels.",
95+
labelConfigs.length,
96+
getMaxLabels());
97+
throw new RuntimeException("Too many topology labels configured");
98+
}
99+
// Create TopologyLabels from config strings
100+
List<TopologyLabel> labels =
101+
Arrays.stream(labelConfigs).map(TopologyLabel::new).collect(Collectors.toList());
102+
103+
if (labels.stream().anyMatch(TopologyLabel::isUndefined)) {
104+
LOG.error(
105+
"Invalid topology label configuration - labels must be in format '[pod|node]:<label>'");
106+
throw new RuntimeException("Invalid topology label configuration");
107+
}
108+
109+
return labels;
110+
}
111+
112+
private static int getMaxLabels() {
113+
return TopologyUtils.parseIntFromEnv(
114+
VARNAME_MAX_LEVELS, MAX_LEVELS_DEFAULT, "maximum topology levels");
115+
}
116+
}
Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,56 @@
11
package tech.stackable.hadoop;
22

33
import io.fabric8.kubernetes.api.model.GenericKubernetesResource;
4+
import java.util.Collections;
45
import java.util.List;
56
import java.util.Map;
67
import java.util.stream.Collectors;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
710

811
public class TopologyUtils {
12+
private static final Logger LOG = LoggerFactory.getLogger(TopologyUtils.class);
13+
914
private static final String ADDRESS = "address";
1015
private static final String STATUS = "status";
1116
private static final String INGRESS_ADDRESSES = "ingressAddresses";
1217

1318
public static List<String> getIngressAddresses(GenericKubernetesResource listener) {
1419
// suppress warning as we know the structure of our own listener resource
20+
Object statusObj = listener.getAdditionalProperties().get(STATUS);
21+
if (statusObj == null) {
22+
LOG.warn("Listener {} has no status", listener.getMetadata().getName());
23+
return Collections.emptyList();
24+
}
25+
@SuppressWarnings("unchecked")
26+
Map<String, Object> status = (Map<String, Object>) statusObj;
27+
Object addressesObj = status.get(INGRESS_ADDRESSES);
28+
if (addressesObj == null) {
29+
LOG.warn("Listener {} has no ingress addresses", listener.getMetadata().getName());
30+
return Collections.emptyList();
31+
}
1532
@SuppressWarnings("unchecked")
16-
List<Map<String, Object>> ingressAddresses =
17-
((List<Map<String, Object>>)
18-
((Map<String, Object>) listener.getAdditionalProperties().get(STATUS))
19-
.get(INGRESS_ADDRESSES));
33+
List<Map<String, Object>> ingressAddresses = (List<Map<String, Object>>) addressesObj;
2034
return ingressAddresses.stream()
2135
.map(ingress -> (String) ingress.get(ADDRESS))
2236
.collect(Collectors.toList());
2337
}
38+
39+
public static int parseIntFromEnv(String varName, int defaultValue, String description) {
40+
String value = System.getenv(varName);
41+
if (value == null || value.isEmpty()) {
42+
LOG.info("Set {} to default value {}", description, defaultValue);
43+
return defaultValue;
44+
}
45+
46+
try {
47+
int parsed = Integer.parseInt(value);
48+
LOG.info("Set {} to {} from environment variable {}", description, parsed, varName);
49+
return parsed;
50+
} catch (NumberFormatException e) {
51+
LOG.warn(
52+
"Invalid integer value '{}' for {} - using default: {}", value, varName, defaultValue);
53+
return defaultValue;
54+
}
55+
}
2456
}

test/topology-provider/stack/01-install-krb5-kdc.yaml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ spec:
3131
spec:
3232
initContainers:
3333
- name: init
34-
image: oci.stackable.tech/sdp/krb5:1.18.2-stackable0.0.0-dev
34+
image: oci.stackable.tech/sdp/krb5:1.21.1-stackable0.0.0-dev
3535
args:
3636
- sh
3737
- -euo
@@ -52,7 +52,7 @@ spec:
5252
name: data
5353
containers:
5454
- name: kdc
55-
image: oci.stackable.tech/sdp/krb5:1.18.2-stackable0.0.0-dev
55+
image: oci.stackable.tech/sdp/krb5:1.21.1-stackable0.0.0-dev
5656
args:
5757
- krb5kdc
5858
- -n
@@ -65,7 +65,7 @@ spec:
6565
- mountPath: /var/kerberos/krb5kdc
6666
name: data
6767
- name: kadmind
68-
image: oci.stackable.tech/sdp/krb5:1.18.2-stackable0.0.0-dev
68+
image: oci.stackable.tech/sdp/krb5:1.21.1-stackable0.0.0-dev
6969
args:
7070
- kadmind
7171
- -nofork
@@ -78,7 +78,7 @@ spec:
7878
- mountPath: /var/kerberos/krb5kdc
7979
name: data
8080
- name: client
81-
image: oci.stackable.tech/sdp/krb5:1.18.2-stackable0.0.0-dev
81+
image: oci.stackable.tech/sdp/krb5:1.21.1-stackable0.0.0-dev
8282
tty: true
8383
stdin: true
8484
env:

test/topology-provider/stack/03-hdfs.yaml

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,51 +5,57 @@ metadata:
55
name: simple-zk
66
spec:
77
image:
8-
productVersion: 3.8.3
8+
productVersion: 3.9.4
9+
pullPolicy: IfNotPresent
910
servers:
1011
roleGroups:
1112
default:
1213
replicas: 1
1314
---
14-
apiVersion: zookeeper.stackable.tech/v1alpha1
15-
kind: ZookeeperZnode
16-
metadata:
17-
name: simple-hdfs-znode
18-
spec:
19-
clusterRef:
20-
name: simple-zk
21-
---
2215
apiVersion: hdfs.stackable.tech/v1alpha1
2316
kind: HdfsCluster
2417
metadata:
2518
name: simple-hdfs
2619
spec:
2720
image:
28-
productVersion: 3.4.0
29-
custom: hdfs # updated by tilt
21+
productVersion: 3.4.2
22+
custom: oci.stackable.tech/sandbox/hadoop:3.4.2-stackable0.0.0-dev
3023
pullPolicy: IfNotPresent
3124
clusterConfig:
3225
dfsReplication: 1
33-
zookeeperConfigMapName: simple-hdfs-znode
26+
zookeeperConfigMapName: simple-zk
3427
rackAwareness:
35-
- labelType: node
36-
labelName: kubernetes.io/hostname
37-
- labelType: pod
38-
labelName: app.kubernetes.io/role-group
28+
- nodeLabel: kubernetes.io/hostname
29+
- podLabel: app.kubernetes.io/role-group
3930
authentication:
4031
tlsSecretClass: tls
4132
kerberos:
4233
secretClass: kerberos-default
4334
nameNodes:
4435
config:
45-
listenerClass: external-stable # We want to access the Web UI
36+
listenerClass: external-stable
37+
logging:
38+
enableVectorAgent: false
39+
containers:
40+
hdfs:
41+
console:
42+
level: DEBUG
43+
file:
44+
level: DEBUG
45+
loggers:
46+
ROOT:
47+
level: DEBUG
4648
configOverrides: &configOverrides
4749
core-site.xml:
4850
hadoop.user.group.static.mapping.overrides: "dr.who=;nn=;nm=;jn=;testuser=supergroup;"
51+
envOverrides:
52+
HADOOP_NAMENODE_OPTS: -agentlib:jdwp=transport=dt_socket,address=*:5005,server=y,suspend=n
4953
roleGroups:
5054
default:
5155
replicas: 2
5256
dataNodes:
57+
config:
58+
listenerClass: external-stable
5359
configOverrides: *configOverrides
5460
roleGroups:
5561
default:

0 commit comments

Comments
 (0)