
fix: dedup log collector #1522

Closed
Nexisato wants to merge 2 commits into legacy from fix/upload-dict-fix

Conversation

@Nexisato
Contributor

Description

  • Deduplicate metrics with identical indexes to support overwriting, and preserve the natural incremental growth of epochs in local logs

Closes: #(issue)

🎯 PRs Should Target Issues

None


@gemini-code-assist bot left a comment


Code Review

This pull request implements a deduplication strategy for metrics by introducing a write_epoch that tracks the latest write for each key-step pair. The dedupe_metrics_by_key_step function is used in the upload pipeline to filter out redundant data. The review feedback suggests using generator expressions for memory efficiency and merging a duplicated test class.
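The strategy described above can be sketched as follows. This is a minimal hypothetical implementation: the Metric dataclass, its fields, and the epoch tie-breaking rule are assumptions for illustration, not the PR's actual ScalarModel/MediaModel code.

```python
from dataclasses import dataclass

# Hypothetical minimal metric model; the PR's real models carry more fields.
@dataclass
class Metric:
    key: str
    step: int
    epoch: int  # monotonically increasing write counter
    value: float

def dedupe_metrics_by_key_step(metrics):
    """Keep only the latest write (highest epoch) for each (key, step) pair."""
    latest = {}
    for m in metrics:
        prev = latest.get((m.key, m.step))
        if prev is None or m.epoch > prev.epoch:
            latest[(m.key, m.step)] = m
    return list(latest.values())

metrics = [
    Metric("loss", 0, epoch=1, value=0.9),
    Metric("loss", 0, epoch=2, value=0.5),  # later write to the same (key, step)
    Metric("loss", 1, epoch=1, value=0.7),
]
deduped = dedupe_metrics_by_key_step(metrics)
assert len(deduped) == 2
assert deduped[0].value == 0.5  # the epoch-2 overwrite wins for ("loss", 0)
```

Because epochs only grow locally, the latest epoch for a (key, step) pair is the value the user most recently logged, so overwrites replace stale entries before upload.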

Comment on lines +544 to 549
scalars = dedupe_metrics_by_key_step(
list(filter(self._filter_scalar_by_step, [scalar.to_scalar_model() for scalar in self._scalars]))
)
medias = dedupe_metrics_by_key_step(
list(filter(self._filter_media_by_step, [media.to_media_model(self._run_store.media_dir) for media in self._medias]))
)

Severity: medium

Using list comprehensions [...] here creates full lists of ScalarModel and MediaModel objects in memory before filtering. For a large number of metrics, this can be memory-intensive. Using generator expressions (...) would be more memory-efficient as it would create the models one by one as filter consumes them.

Suggested change

Before:
scalars = dedupe_metrics_by_key_step(
    list(filter(self._filter_scalar_by_step, [scalar.to_scalar_model() for scalar in self._scalars]))
)
medias = dedupe_metrics_by_key_step(
    list(filter(self._filter_media_by_step, [media.to_media_model(self._run_store.media_dir) for media in self._medias]))
)

After:
scalars = dedupe_metrics_by_key_step(
    list(filter(self._filter_scalar_by_step, (scalar.to_scalar_model() for scalar in self._scalars)))
)
medias = dedupe_metrics_by_key_step(
    list(filter(self._filter_media_by_step, (media.to_media_model(self._run_store.media_dir) for media in self._medias)))
)
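The memory point behind this suggestion can be demonstrated with a toy example (not the PR's models): a list comprehension builds every object before filtering starts, while a generator expression lets filter() construct objects one at a time as it consumes them.

```python
built = []  # records each input as its "model" gets constructed

def to_model(x):
    # Stand-in for an expensive model constructor such as to_scalar_model().
    built.append(x)
    return x * 10

items = range(5)

# List comprehension: all five models exist in memory before filtering begins.
models = [to_model(x) for x in items]
assert len(built) == 5

built.clear()
# Generator expression: nothing is built until filter() pulls items.
gen = (to_model(x) for x in items)
assert built == []
first = next(filter(lambda m: m >= 20, gen))
assert first == 20
assert built == [0, 1, 2]  # only three models were ever constructed
```

Note that wrapping the filter in list(...) still materializes the surviving models, but the generator avoids holding the full unfiltered intermediate list at once.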

Comment on lines +355 to +366
def test_overwrite_uses_latest_write_epoch_but_same_storage_file(self):
with UseMockRunState() as run_state:
key_obj = self._new_key(run_state)

first = self._add_line(key_obj, 1, 0)
overwrite = self._add_line(key_obj, 2, 0)

assert overwrite.metric_overwrite is True
assert key_obj._step_epochs[0] == 1
assert key_obj._write_epoch == overwrite.metric_epoch
assert overwrite.metric_epoch == first.metric_epoch + 1
assert overwrite.metric_file_path == first.metric_file_path

Severity: medium

This test method is added to a duplicated TestKeySummary class: the class is defined first at line 204 and again at line 315. Please merge the methods from the duplicated class into the first definition and remove the second one to improve code maintainability.
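The duplication matters in Python because a second class definition with the same name silently replaces the first in the module namespace, so any test methods unique to the first class are never collected. A toy illustration (not the PR's actual test file):

```python
# When a class name is bound twice in one module, the second binding
# shadows the first; the earlier class object becomes unreachable by name.
class TestKeySummary:          # first definition
    def test_a(self):
        return "a"

class TestKeySummary:          # second definition shadows the first
    def test_b(self):
        return "b"

assert hasattr(TestKeySummary, "test_b")
assert not hasattr(TestKeySummary, "test_a")  # test_a is silently lost
```

Merging the methods into a single class definition removes the shadowing and ensures every test runs.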

@Nexisato closed this Apr 13, 2026
@Nexisato
Contributor Author

The local deduplication strategy is somewhat rough; deferring this until the new sdk refactor.

