Skip to content

Commit 4202fae

Browse files
committed
update
1 parent 424d832 commit 4202fae

File tree

2 files changed

+46
-31
lines changed

2 files changed

+46
-31
lines changed

cozeloop/internal/trace/exporter.py

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@
1818
logger = logging.getLogger(__name__)
1919

2020
class Exporter:
21-
def export_spans(self, ctx: dict, spans: List['UploadSpan']) -> (bool, str):
21+
def export_spans(self, ctx: dict, spans: List['UploadSpan']):
2222
raise NotImplementedError
2323

24-
def export_files(self, ctx: dict, files: List['UploadFile']) -> (bool, str):
24+
def export_files(self, ctx: dict, files: List['UploadFile']):
2525
raise NotImplementedError
2626

2727

@@ -54,7 +54,7 @@ def __init__(
5454
self.client = client
5555
self.upload_path = upload_path
5656

57-
def export_files(self, ctx: dict, files: List['UploadFile']) -> (bool, str):
57+
def export_files(self, ctx: dict, files: List['UploadFile']):
5858
for file in files:
5959
if not file:
6060
continue
@@ -69,16 +69,16 @@ def export_files(self, ctx: dict, files: List['UploadFile']) -> (bool, str):
6969
{"workspace_id": file.space_id},
7070
)
7171
if resp.code != 0: # todo: some err code do not need retry
72-
return False, f"export files[{file.tos_key}] fail, code:[{resp.code}], msg:[{resp.msg}]"
72+
raise Exception(f"code:[{resp.code}], msg:[{resp.msg}]")
7373
except Exception as e:
74-
return False, f"export files[{file.tos_key}] fail, err:[{e}], file.name:[{file.name}]"
74+
raise Exception(f"export files[{file.tos_key}] fail, err:[{e}], file.name:[{file.name}]")
7575

7676
logger.debug(f"uploadFile end, file name: {file.name}")
77-
return True, ''
7877

79-
def export_spans(self, ctx: dict, spans: List['UploadSpan']) -> (bool, str):
80-
if not spans:
81-
return True, ''
78+
79+
def export_spans(self, ctx: dict, spans: List['UploadSpan']):
80+
if not spans or len(spans) == 0:
81+
return
8282

8383
try:
8484
resp = self.client.post(
@@ -87,11 +87,9 @@ def export_spans(self, ctx: dict, spans: List['UploadSpan']) -> (bool, str):
8787
UploadSpanData(spans=spans),
8888
)
8989
if resp.code != 0: # todo: some err code do not need retry
90-
return False, f"export spans fail, code:[{resp.code}], msg:[{resp.msg}]"
90+
raise Exception(f"code:[{resp.code}], msg:[{resp.msg}]")
9191
except Exception as e:
92-
return False, f"export spans fail, err:[{e}]"
93-
94-
return True, ""
92+
raise Exception(f"export spans fail, err:[{e}]")
9593

9694

9795
class UploadSpanData(BaseModel):

cozeloop/internal/trace/span_processor.py

Lines changed: 35 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -153,34 +153,43 @@ def _new_export_spans_func(
153153
):
154154
def export_func(ctx: dict, items: List[Any]):
155155
spans = [s for s in items if isinstance(s, Span)]
156+
if not spans or len(spans) == 0:
157+
return
156158
try:
157159
upload_spans, upload_files = transfer_to_upload_span_and_file(spans)
158160
except Exception as e:
159161
logger.warning(f"transfer_to_upload_span_and_file fail")
162+
return
160163

161-
is_fail = False
162-
err_msg = ""
164+
event_err_msg = ""
163165
before = time.perf_counter()
164-
is_pass, msg = exporter.export_spans(ctx, upload_spans)
166+
167+
is_export_pass = True
168+
export_msg = ""
169+
try:
170+
exporter.export_spans(ctx, upload_spans)
171+
except Exception as e:
172+
is_export_pass = False
173+
export_msg = f"{e}"
174+
165175
elapsed_time_ms = (time.perf_counter() - before) * 1000
166-
if not is_pass:
176+
if not is_export_pass:
167177
if span_retry_queue:
168178
for span in spans:
169179
span_retry_queue.enqueue(span, span.bytes_size)
170-
err_msg = f'{msg}, retry later'
180+
event_err_msg = f'{export_msg}, retry later'
171181
else:
172-
err_msg = f'{msg}, retry second time failed'
173-
is_fail = True
182+
event_err_msg = f'{export_msg}, retry second time failed'
174183
else:
175184
for file in upload_files:
176185
if file and file_queue:
177186
file_queue.enqueue(file, len(file.data))
178187
if finish_event_processor:
179188
finish_event_processor(FinishEventInfo(
180189
event_type="exporter.span_flush.rate",
181-
is_event_fail=is_fail,
190+
is_event_fail=not is_export_pass,
182191
item_num=len(spans),
183-
detail_msg=err_msg,
192+
detail_msg=event_err_msg,
184193
extra_params=FinishEventInfoExtra(
185194
latency_ms=int(elapsed_time_ms)
186195
)
@@ -196,26 +205,34 @@ def _new_export_files_func(
196205
):
197206
def export_func(ctx: dict, items: List[Any]):
198207
files = [f for f in items if isinstance(f, UploadFile)]
208+
if not files or len(files) == 0:
209+
return
199210

200-
is_fail = False
201-
err_msg = ""
211+
event_err_msg = ""
202212
before = time.perf_counter()
203-
is_pass, msg = exporter.export_files(ctx, files)
213+
214+
is_export_pass = True
215+
export_msg = ""
216+
try:
217+
exporter.export_files(ctx, files)
218+
except Exception as e:
219+
is_export_pass = True
220+
export_msg = f"{e}"
221+
204222
elapsed_time_ms = (time.perf_counter() - before) * 1000
205-
if not is_pass:
223+
if not is_export_pass:
206224
if file_retry_queue:
207225
for file in files:
208226
file_retry_queue.enqueue(file, len(file.data))
209-
err_msg = f'{msg}, retry later'
227+
event_err_msg = f'{export_msg}, retry later'
210228
else:
211-
err_msg = f'{msg}, retry second time failed'
212-
is_fail = True
229+
event_err_msg = f'{export_msg}, retry second time failed'
213230
if finish_event_processor:
214231
finish_event_processor(FinishEventInfo(
215232
event_type="exporter.file_flush.rate",
216-
is_event_fail=is_fail,
233+
is_event_fail=not is_export_pass,
217234
item_num=len(files),
218-
detail_msg=err_msg,
235+
detail_msg=event_err_msg,
219236
extra_params=FinishEventInfoExtra(
220237
latency_ms=int(elapsed_time_ms)
221238
)

0 commit comments

Comments
 (0)