Skip to content

Commit a2b0437

Browse files
committed
add example
1 parent a2c8692 commit a2b0437

File tree

7 files changed

+93
-64
lines changed

7 files changed

+93
-64
lines changed

examples/metrics/metrics_configs.yaml

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@ metric_prefix: "ucm:"
1919

2020
# Histogram metrics configuration
2121
histogram:
22-
- name: "load_requests_num"
22+
- name: "load_requests_total"
2323
documentation: "Number of requests loaded from ucm"
2424
buckets: [1, 5, 10, 20, 50, 100, 200, 500, 1000]
25-
- name: "load_blocks_num"
25+
- name: "load_blocks_total"
2626
documentation: "Number of blocks loaded from ucm"
2727
buckets: [0, 50, 100, 150, 200, 250, 300, 350, 400, 550, 600, 750, 800, 850, 900, 950, 1000]
2828
- name: "load_duration"
@@ -31,10 +31,10 @@ histogram:
3131
- name: "load_speed"
3232
documentation: "Speed of loading from ucm (GB/s)"
3333
buckets: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 50, 60, 70, 80, 90, 100]
34-
- name: "save_requests_num"
34+
- name: "save_requests_total"
3535
documentation: "Number of requests saved to ucm"
3636
buckets: [1, 5, 10, 20, 50, 100, 200, 500, 1000]
37-
- name: "save_blocks_num"
37+
- name: "save_blocks_total"
3838
documentation: "Number of blocks saved to ucm"
3939
buckets: [0, 50, 100, 150, 200, 250, 300, 350, 400, 550, 600, 750, 800, 850, 900, 950, 1000]
4040
- name: "save_duration"
@@ -46,4 +46,9 @@ histogram:
4646
- name: "interval_lookup_hit_rates"
4747
documentation: "Hit rates of ucm lookup requests"
4848
buckets: [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
49-
49+
- name: "s2d_bandwidth"
50+
documentation: "Bandwidth of uc store task s2d, copy tensors from storage to device"
51+
buckets: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
52+
- name: "d2s_bandwidth"
53+
documentation: "Bandwidth of uc store task d2s, copy tensors from device to storage"
54+
buckets: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]

ucm/integration/vllm/ucm_connector.py

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
from vllm.v1.core.sched.output import SchedulerOutput
1919

2020
from ucm.logger import init_logger
21-
from ucm.shared.metrics import ucmmonitor
22-
from ucm.shared.metrics.observability import UCMStatsLogger
21+
from ucm.observability import PrometheusStatsLogger
22+
from ucm.shared.metrics import ucmmetrics
2323
from ucm.store.factory_v1 import UcmConnectorFactoryV1
2424
from ucm.store.ucmstore_v1 import Task, UcmKVStoreBaseV1
2525
from ucm.utils import Config
@@ -160,12 +160,11 @@ def __init__(self, vllm_config: "VllmConfig", role: KVConnectorRole):
160160

161161
self.metrics_config = self.launch_config.get("metrics_config_path", "")
162162
if self.metrics_config:
163-
self.stats_logger = UCMStatsLogger(
163+
self.stats_logger = PrometheusStatsLogger(
164164
vllm_config.model_config.served_model_name,
165165
self.global_rank,
166166
self.metrics_config,
167167
)
168-
self.monitor = ucmmonitor.StatsMonitor.get_instance()
169168

170169
self.synchronize = (
171170
torch.cuda.synchronize
@@ -295,8 +294,7 @@ def get_num_new_matched_tokens(
295294
f"hit external: {external_hit_blocks}"
296295
)
297296
if self.metrics_config:
298-
self.monitor.update_stats(
299-
"ConnStats",
297+
ucmmetrics.update_stats(
300298
{"interval_lookup_hit_rates": external_hit_blocks / len(ucm_block_ids)},
301299
)
302300

@@ -539,14 +537,13 @@ def start_load_kv(self, forward_context: "ForwardContext", **kwargs) -> None:
539537
/ 1024
540538
) # GB/s
541539
if self.metrics_config and is_load:
542-
self.monitor.update_stats(
543-
"ConnStats",
540+
ucmmetrics.update_stats(
544541
{
545-
"load_requests_num": num_loaded_request,
546-
"load_blocks_num": num_loaded_block,
542+
"load_requests_total": num_loaded_request,
543+
"load_blocks_total": num_loaded_block,
547544
"load_duration": load_end_time - load_start_time,
548545
"load_speed": load_speed,
549-
},
546+
}
550547
)
551548

552549
def wait_for_layer_load(self, layer_name: str) -> None:
@@ -615,11 +612,10 @@ def wait_for_save(self) -> None:
615612
/ 1024
616613
) # GB/s
617614
if self.metrics_config and is_save:
618-
self.monitor.update_stats(
619-
"ConnStats",
615+
ucmmetrics.update_stats(
620616
{
621-
"save_requests_num": num_saved_request,
622-
"save_blocks_num": num_saved_block,
617+
"save_requests_total": num_saved_request,
618+
"save_blocks_total": num_saved_block,
623619
"save_duration": save_end_time - save_start_time,
624620
"save_speed": save_speed,
625621
},

ucm/observability.py

Lines changed: 49 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,17 @@
2828
import time
2929
from typing import Any, Union
3030

31-
from prometheus_client import Gauge, Counter, Histogram
3231
import yaml
32+
from prometheus_client import Counter, Gauge, Histogram
3333

3434
from ucm.logger import init_logger
3535
from ucm.shared.metrics import ucmmetrics
3636

3737
logger = init_logger(__name__)
3838

39+
3940
class PrometheusStatsLogger:
40-
41+
4142
def _load_config(self, config_path: str) -> dict[str, Any]:
4243
"""Load configuration from YAML file"""
4344
try:
@@ -57,9 +58,14 @@ def _load_config(self, config_path: str) -> dict[str, Any]:
5758
return {}
5859

5960
def __init__(self, model_name, worker_id, config_path):
60-
# Ensure PROMETHEUS_MULTIPROC_DIR is set before any metric registration
61+
"""
62+
Load metrics config from YAML file (config_path),
63+
register metrics using prometheus_client, and start a thread to get updated metrics.
64+
"""
65+
# Load metrics config
6166
self.config = self._load_config(config_path)
6267
self.log_interval = self.config.get("log_interval", 10)
68+
6369
multiproc_dir = self.config.get("multiproc_dir", "/vllm-workspace")
6470
if "PROMETHEUS_MULTIPROC_DIR" not in os.environ:
6571
os.environ["PROMETHEUS_MULTIPROC_DIR"] = multiproc_dir
@@ -71,60 +77,56 @@ def __init__(self, model_name, worker_id, config_path):
7177
"worker_id": worker_id,
7278
}
7379
self.labelnames = list(self.labels.keys())
74-
75-
# Initialize metrics based on configuration
80+
7681
self.metric_type_config = {
77-
"counter": (
78-
Counter,
79-
{}
80-
),
81-
"gauge": (
82-
Gauge,
83-
{"multiprocess_mode": "all"}
84-
),
85-
"histogram": (
86-
Histogram,
87-
{"buckets": []}
88-
)
82+
"counter": (Counter, {}),
83+
"gauge": (Gauge, {"multiprocess_mode": "all"}),
84+
"histogram": (Histogram, {"buckets": []}),
8985
}
86+
# Initialize metrics based on config
9087
self._init_metrics_from_config()
88+
89+
# Start thread to update metrics
9190
self.is_running = True
92-
self.thread = threading.Thread(target=self.obtain_stats_thread, daemon=True)
91+
self.thread = threading.Thread(target=self.update_stats_loop, daemon=True)
9392
self.thread.start()
94-
95-
def _process_metric_group(self, group_name):
96-
metric_cls, default_kwargs = self.metric_type_config[group_name]
97-
cfg_list = self.config.get(group_name, [])
98-
93+
94+
def _register_metrics_by_type(self, metric_type):
95+
"""
96+
Register metrics by different metric types.
97+
"""
98+
metric_cls, default_kwargs = self.metric_type_config[metric_type]
99+
cfg_list = self.config.get(metric_type, [])
100+
99101
for cfg in cfg_list:
100102
name = cfg.get("name")
101103
doc = cfg.get("documentation", "")
102104
# Prometheus metric name with prefix
103105
prometheus_name = f"{self.metric_prefix}{name}"
104-
ucmmetrics.create_stats(name, group_name)
105-
106+
ucmmetrics.create_stats(name, metric_type)
107+
106108
metric_kwargs = {
107109
"name": prometheus_name,
108110
"documentation": doc,
109111
"labelnames": self.labelnames,
110112
**default_kwargs,
111-
**{k: v for k, v in cfg.items() if k in default_kwargs}
113+
**{k: v for k, v in cfg.items() if k in default_kwargs},
112114
}
113-
115+
114116
self.metric_mappings[name] = metric_cls(**metric_kwargs)
115117

116118
def _init_metrics_from_config(self):
117-
"""Initialize metrics based on configuration"""
119+
"""Initialize metrics based on config"""
118120
# Get metric name prefix from config (e.g., "ucm:")
119121
self.metric_prefix = self.config.get("metric_prefix", "ucm:")
120122

121123
# Store metric mapping: metric_name -> Union[Counter, Gauge, Histogram]
122124
# This mapping will be used in update_stats to dynamically log metrics
123125
self.metric_mappings: dict[str, Union[Counter, Gauge, Histogram]] = {}
124126

125-
for group_name in self.metric_type_config.keys():
126-
self._process_metric_group(group_name)
127-
127+
for metric_type in self.metric_type_config.keys():
128+
self._register_metrics_by_type(metric_type)
129+
128130
def _update_counter(self, metric, value):
129131
if value < 0:
130132
return
@@ -136,13 +138,17 @@ def _update_gauge(self, metric, value):
136138
def _update_histogram(self, metric, value):
137139
for data in value:
138140
metric.observe(data)
139-
141+
140142
def _update_with_func(self, update_func, stats: dict[str, Any], op_desc: str):
143+
"""
144+
Generic update for Prometheus metrics: match metrics by name, bind labels,
145+
and update values via the specified function (update_func).
146+
"""
141147
for stat_name, value in stats.items():
142148
if stat_name not in self.metric_mappings:
143149
logger.error(f"Metric {stat_name} not found")
144150
continue
145-
151+
146152
metric = self.metric_mappings[stat_name]
147153
try:
148154
metric_with_labels = metric.labels(**self.labels)
@@ -153,8 +159,9 @@ def _update_with_func(self, update_func, stats: dict[str, Any], op_desc: str):
153159
logger.debug(f"Failed to {op_desc} {stat_name}: {e}")
154160

155161
def update_stats(self, counter_stats, gauge_stats, histogram_stats):
156-
"""Log metrics to Prometheus based on configuration file"""
157-
# Dynamically log metrics based on what's configured in YAML
162+
"""
163+
Update all Prometheus metrics (Counter/Gauge/Histogram) with given stats.
164+
"""
158165
update_tasks = [
159166
(self._update_counter, counter_stats, "increment"),
160167
(self._update_gauge, gauge_stats, "set"),
@@ -163,9 +170,14 @@ def update_stats(self, counter_stats, gauge_stats, histogram_stats):
163170
for update_func, stats, op_desc in update_tasks:
164171
self._update_with_func(update_func, stats, op_desc)
165172

166-
def obtain_stats_thread(self):
173+
def update_stats_loop(self):
174+
"""
175+
Periodically update Prometheus metrics in a loop until stopped.
176+
"""
167177
while self.is_running:
168-
counter_stats, gauge_stats, histogram_stats = ucmmetrics.get_all_stats_and_clear()
178+
counter_stats, gauge_stats, histogram_stats = (
179+
ucmmetrics.get_all_stats_and_clear()
180+
)
169181
self.update_stats(counter_stats, gauge_stats, histogram_stats)
170182
time.sleep(self.log_interval)
171183

ucm/shared/metrics/cc/domain/metrics.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
2222
* SOFTWARE.
2323
* */
24-
#ifndef UNIFIEDCACHE_MONITOR_H
25-
#define UNIFIEDCACHE_MONITOR_H
24+
#ifndef UNIFIEDCACHE_METRICS_H
25+
#define UNIFIEDCACHE_METRICS_H
2626

2727
#include <memory>
2828
#include <mutex>

ucm/shared/test/example/metrics/monitor_stats_example.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,17 @@ def wrapper(*args, **kwargs):
4242
def metrics_with_update_stats():
4343
ucmmetrics.create_stats("counter_1", "counter")
4444
ucmmetrics.update_stats("counter_1", 1.2)
45-
45+
4646
ucmmetrics.create_stats("gauge_1", "gauge")
4747
ucmmetrics.update_stats("gauge_1", 2.2)
48-
48+
4949
ucmmetrics.create_stats("histogram_1", "histogram")
5050
ucmmetrics.update_stats("histogram_1", 1)
5151

5252
counters, gauges, histograms = ucmmetrics.get_all_stats_and_clear()
53-
print(f"After clear then get counters: {counters}, gauges: {gauges}, histograms: {histograms}")
53+
print(
54+
f"After clear then get counters: {counters}, gauges: {gauges}, histograms: {histograms}"
55+
)
5456
ucmmetrics.update_stats(
5557
{
5658
"counter_1": 5,
@@ -59,7 +61,9 @@ def metrics_with_update_stats():
5961
}
6062
)
6163
counters, gauges, histograms = ucmmetrics.get_all_stats_and_clear()
62-
print(f"After clear then get counters: {counters}, gauges: {gauges}, histograms: {histograms}")
64+
print(
65+
f"After clear then get counters: {counters}, gauges: {gauges}, histograms: {histograms}"
66+
)
6367

6468

6569
if __name__ == "__main__":

ucm/store/detail/task/task_shard.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
#include <vector>
3636
#include "logger/logger.h"
3737
#include "task_waiter.h"
38+
#include "metrics_api.h"
3839

3940
namespace UC {
4041

@@ -129,6 +130,17 @@ class Task {
129130
auto wait = execTp_ - startTp_;
130131
auto exec = NowTp() - execTp_;
131132
auto bw = size_ / exec / 1024 / 1024 / 1024;
133+
switch (type_)
134+
{
135+
case Type::DUMP:
136+
UC::Metrics::UpdateStats("d2s_bandwidth", bw);
137+
break;
138+
case Type::LOAD:
139+
UC::Metrics::UpdateStats("s2d_bandwidth", bw);
140+
break;
141+
default:
142+
break;
143+
}
132144
return fmt::format("wait={:.06f}s, exec={:.06f}s, bw={:.06f}GB/s", wait, exec, bw);
133145
}
134146

ucm/store/nfsstore/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ target_include_directories(nfsstore PUBLIC
66
${CMAKE_CURRENT_SOURCE_DIR}/cc/api
77
${CMAKE_CURRENT_SOURCE_DIR}/cc/domain
88
)
9-
target_link_libraries(nfsstore PUBLIC storeintf storedevice infra_logger)
9+
target_link_libraries(nfsstore PUBLIC storeintf storedevice infra_logger metrics)
1010

1111
file(GLOB_RECURSE UCMSTORE_NFS_CPY_SOURCE_FILES "./cpy/*.cc")
1212
pybind11_add_module(ucmnfsstore ${UCMSTORE_NFS_CPY_SOURCE_FILES})

0 commit comments

Comments
 (0)