Skip to content

Commit ddd4bf8

Browse files
committed
fix
1 parent 389e78a commit ddd4bf8

File tree

4 files changed

+30
-5
lines changed

4 files changed

+30
-5
lines changed

cozeloop/_client.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from cozeloop.internal.httpclient import Auth
2020
from cozeloop.internal.prompt import PromptProvider
2121
from cozeloop.internal.trace import TraceProvider
22-
from cozeloop.internal.trace.model.model import FinishEventInfo, TagTruncateConf
22+
from cozeloop.internal.trace.model.model import FinishEventInfo, TagTruncateConf, QueueConf
2323
from cozeloop.internal.trace.trace import default_finish_event_processor
2424
from cozeloop.span import SpanContext, Span
2525

@@ -71,6 +71,7 @@ def new_client(
7171
trace_finish_event_processor: Optional[Callable[[FinishEventInfo], None]] = None,
7272
tag_truncate_conf: Optional[TagTruncateConf] = None,
7373
api_base_path: Optional[APIBasePath] = None,
74+
queue_conf: Optional[QueueConf] = None,
7475
) -> Client:
7576
cache_key = _generate_cache_key( # all args are used to generate cache key
7677
api_base_url,
@@ -89,6 +90,7 @@ def new_client(
8990
trace_finish_event_processor,
9091
tag_truncate_conf,
9192
api_base_path,
93+
queue_conf,
9294
)
9395

9496
with _cache_lock:
@@ -113,6 +115,7 @@ def new_client(
113115
trace_finish_event_processor=trace_finish_event_processor,
114116
tag_truncate_conf=tag_truncate_conf,
115117
api_base_path=api_base_path,
118+
queue_conf=queue_conf,
116119
)
117120
_client_cache[cache_key] = client
118121
return client
@@ -143,6 +146,7 @@ def __init__(
143146
trace_finish_event_processor: Optional[Callable[[FinishEventInfo], None]] = None,
144147
tag_truncate_conf: Optional[TagTruncateConf] = None,
145148
api_base_path: Optional[APIBasePath] = None,
149+
queue_conf: Optional[QueueConf] = None,
146150
):
147151
workspace_id = self._get_from_env(workspace_id, ENV_WORKSPACE_ID)
148152
api_base_url = self._get_from_env(api_base_url, ENV_API_BASE_URL)
@@ -201,6 +205,7 @@ def combined_processor(event_info: FinishEventInfo):
201205
tag_truncate_conf=tag_truncate_conf,
202206
span_upload_path=span_upload_path,
203207
file_upload_path=file_upload_path,
208+
queue_conf=queue_conf,
204209
)
205210
self._prompt_provider = PromptProvider(
206211
workspace_id=workspace_id,

cozeloop/internal/trace/model/model.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,12 @@ class UploadType(str, Enum):
3333
"exporter.file_flush.rate"
3434
]
3535

36+
3637
@dataclass
3738
class FinishEventInfoExtra:
3839
is_root_span: bool
3940

41+
4042
@dataclass
4143
class FinishEventInfo:
4244
event_type: SpanFinishEvent
@@ -45,7 +47,14 @@ class FinishEventInfo:
4547
detail_msg: str
4648
extra_params: Optional[FinishEventInfoExtra] = None
4749

50+
4851
@dataclass
4952
class TagTruncateConf:
5053
normal_field_max_byte: int
5154
input_output_field_max_byte: int
55+
56+
57+
@dataclass
58+
class QueueConf:
59+
span_queue_length: int
60+
span_max_export_batch_length: int

cozeloop/internal/trace/span_processor.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from multipart import file_path
1414

1515
from cozeloop.internal.trace.exporter import *
16-
from cozeloop.internal.trace.model.model import FinishEventInfo
16+
from cozeloop.internal.trace.model.model import FinishEventInfo, QueueConf
1717
from cozeloop.internal.trace.queue_manager import BatchQueueManager, BatchQueueManagerOptions, QUEUE_NAME_FILE_RETRY, \
1818
QUEUE_NAME_FILE, QUEUE_NAME_SPAN_RETRY, QUEUE_NAME_SPAN
1919
from cozeloop.internal.trace.span import Span
@@ -49,6 +49,7 @@ def __init__(
4949
client,
5050
upload_path: UploadPath = None,
5151
finish_event_processor: Optional[Callable[[FinishEventInfo], None]] = None,
52+
queue_conf: Optional[QueueConf] = None,
5253
):
5354
span_upload_path = PATH_INGEST_TRACE
5455
file_upload_path = PATH_UPLOAD_FILE
@@ -65,6 +66,14 @@ def __init__(
6566
)
6667
)
6768

69+
span_queue_length = DEFAULT_MAX_QUEUE_LENGTH
70+
span_export_batch_size = DEFAULT_MAX_EXPORT_BATCH_LENGTH
71+
if queue_conf:
72+
if queue_conf.span_queue_length > 0:
73+
span_queue_length = queue_conf.span_queue_length
74+
if queue_conf.span_max_export_batch_length > 0:
75+
span_export_batch_size = queue_conf.span_max_export_batch_length
76+
6877
self.file_retry_qm = BatchQueueManager(
6978
BatchQueueManagerOptions(
7079
queue_name=QUEUE_NAME_FILE_RETRY,
@@ -105,8 +114,8 @@ def __init__(
105114
BatchQueueManagerOptions(
106115
queue_name=QUEUE_NAME_SPAN,
107116
batch_timeout=DEFAULT_SCHEDULE_DELAY,
108-
max_queue_length=DEFAULT_MAX_QUEUE_LENGTH,
109-
max_export_batch_length=DEFAULT_MAX_EXPORT_BATCH_LENGTH,
117+
max_queue_length=span_queue_length,
118+
max_export_batch_length=span_export_batch_size,
110119
max_export_batch_byte_size=DEFAULT_MAX_EXPORT_BATCH_BYTE_SIZE,
111120
export_func=self._new_export_spans_func(self.exporter, self.span_retry_qm, self.file_qm, finish_event_processor),
112121
finish_event_processor=finish_event_processor,

cozeloop/internal/trace/trace.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from typing import Dict, Optional, Callable
77

88
from cozeloop.internal.trace.exporter import UploadPath
9-
from cozeloop.internal.trace.model.model import FinishEventInfo, TagTruncateConf
9+
from cozeloop.internal.trace.model.model import FinishEventInfo, TagTruncateConf, QueueConf
1010
from cozeloop.spec.tracespec import Runtime, RUNTIME_
1111
from cozeloop.internal import consts
1212
from cozeloop.internal.trace.span import from_header, Span, SpanContext, \
@@ -41,6 +41,7 @@ def __init__(
4141
tag_truncate_conf: Optional[TagTruncateConf] = None,
4242
span_upload_path: str = None,
4343
file_upload_path: str = None,
44+
queue_conf: Optional[QueueConf] = None,
4445
):
4546
self.workspace_id = workspace_id
4647
self.ultra_large_report = ultra_large_report
@@ -54,6 +55,7 @@ def __init__(
5455
http_client,
5556
upload_path,
5657
finish_event_processor,
58+
queue_conf,
5759
)
5860
self.tag_truncate_conf = tag_truncate_conf
5961

0 commit comments

Comments
 (0)