Skip to content

Commit 76f2753

Browse files
jacalataclaude
andcommitted
fix: treat duplicate extract job (409093) as warning in datasources and flows
Fixes #1090 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 387d043 commit 76f2753

8 files changed

Lines changed: 67 additions & 7 deletions

File tree

tableauserverclient/server/endpoint/datasources_endpoint.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,12 @@
1919

2020
from tableauserverclient.server.endpoint.dqw_endpoint import _DataQualityWarningEndpoint
2121
from tableauserverclient.server.endpoint.endpoint import QuerysetEndpoint, api, parameter_added_in
22-
from tableauserverclient.server.endpoint.exceptions import InternalServerError, MissingRequiredFieldError
22+
from tableauserverclient.server.endpoint.exceptions import (
23+
DUPLICATE_EXTRACT_JOB_CODE,
24+
InternalServerError,
25+
MissingRequiredFieldError,
26+
ServerResponseError,
27+
)
2328
from tableauserverclient.server.endpoint.permissions_endpoint import _PermissionsEndpoint
2429
from tableauserverclient.server.endpoint.resource_tagger import TaggingMixin
2530

@@ -433,7 +438,7 @@ def update_connections(
433438
return connection_items
434439

435440
@api(version="2.8")
436-
def refresh(self, datasource_item: DatasourceItem | str, incremental: bool = False) -> JobItem:
441+
def refresh(self, datasource_item: DatasourceItem | str, incremental: bool = False) -> JobItem | None:
437442
"""
438443
Refreshes the extract of an existing workbook.
439444
@@ -454,7 +459,13 @@ def refresh(self, datasource_item: DatasourceItem | str, incremental: bool = Fal
454459
id_ = getattr(datasource_item, "id", datasource_item)
455460
url = f"{self.baseurl}/{id_}/refresh"
456461
refresh_req = RequestFactory.Task.refresh_req(incremental, self.parent_srv)
457-
server_response = self.post_request(url, refresh_req)
462+
try:
463+
server_response = self.post_request(url, refresh_req)
464+
except ServerResponseError as e:
465+
if e.code == DUPLICATE_EXTRACT_JOB_CODE:
466+
logger.warning(f"{e.summary} {e.detail}")
467+
return None
468+
raise
458469
new_job = JobItem.from_response(server_response.content, self.parent_srv.namespace)[0]
459470
return new_job
460471

tableauserverclient/server/endpoint/exceptions.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
from defusedxml.ElementTree import fromstring
22
from typing import TypeVar
33

4+
# Server error code for "extract refresh already queued" — treated as a
5+
# non-fatal warning rather than an exception in refresh() methods.
6+
DUPLICATE_EXTRACT_JOB_CODE = "409093"
7+
48

59
def split_pascal_case(s: str) -> str:
610
return "".join([f" {c}" if c.isupper() else c for c in s]).strip()

tableauserverclient/server/endpoint/flows_endpoint.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,12 @@
1010

1111
from tableauserverclient.server.endpoint.dqw_endpoint import _DataQualityWarningEndpoint
1212
from tableauserverclient.server.endpoint.endpoint import QuerysetEndpoint, api
13-
from tableauserverclient.server.endpoint.exceptions import InternalServerError, MissingRequiredFieldError
13+
from tableauserverclient.server.endpoint.exceptions import (
14+
DUPLICATE_EXTRACT_JOB_CODE,
15+
InternalServerError,
16+
MissingRequiredFieldError,
17+
ServerResponseError,
18+
)
1419
from tableauserverclient.server.endpoint.permissions_endpoint import _PermissionsEndpoint
1520
from tableauserverclient.server.endpoint.resource_tagger import _ResourceTagger, TaggingMixin
1621
from tableauserverclient.models import FlowItem, PaginationItem, ConnectionItem, JobItem
@@ -305,7 +310,7 @@ def update_connection(self, flow_item: FlowItem, connection_item: ConnectionItem
305310
return connection
306311

307312
@api(version="3.3")
308-
def refresh(self, flow_item: FlowItem | str) -> JobItem:
313+
def refresh(self, flow_item: FlowItem | str) -> JobItem | None:
309314
"""
310315
Runs the flow to refresh the data.
311316
@@ -324,7 +329,13 @@ def refresh(self, flow_item: FlowItem | str) -> JobItem:
324329
flow_id = getattr(flow_item, "id", flow_item)
325330
url = f"{self.baseurl}/{flow_id}/run"
326331
empty_req = RequestFactory.Empty.empty_req()
327-
server_response = self.post_request(url, empty_req)
332+
try:
333+
server_response = self.post_request(url, empty_req)
334+
except ServerResponseError as e:
335+
if e.code == DUPLICATE_EXTRACT_JOB_CODE:
336+
logger.warning(f"{e.summary} {e.detail}")
337+
return None
338+
raise
328339
new_job = JobItem.from_response(server_response.content, self.parent_srv.namespace)[0]
329340
return new_job
330341

tableauserverclient/server/endpoint/workbooks_endpoint.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
from tableauserverclient.server.endpoint.endpoint import QuerysetEndpoint, api, parameter_added_in
1313
from tableauserverclient.server.endpoint.exceptions import (
14+
DUPLICATE_EXTRACT_JOB_CODE,
1415
InternalServerError,
1516
MissingRequiredFieldError,
1617
ServerResponseError,
@@ -147,7 +148,7 @@ def refresh(self, workbook_item: WorkbookItem | str, incremental: bool = False)
147148
try:
148149
server_response = self.post_request(url, refresh_req)
149150
except ServerResponseError as e:
150-
if e.code.startswith("409") and "already" in e.detail:
151+
if e.code == DUPLICATE_EXTRACT_JOB_CODE:
151152
logger.warning(f"{e.summary} {e.detail}")
152153
return None
153154
raise
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
<?xml version='1.0' encoding='UTF-8'?>
2+
<tsResponse xmlns="http://tableau.com/api" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tableau.com/api https://help.tableau.com/samples/en-us/rest_api/ts-api_3_27.xsd">
3+
<error code="409093"><summary>Resource Conflict</summary><detail>Job for \'extract\' is already queued. Not queuing a duplicate.</detail></error></tsResponse>
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
<?xml version='1.0' encoding='UTF-8'?>
2+
<tsResponse xmlns="http://tableau.com/api" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tableau.com/api https://help.tableau.com/samples/en-us/rest_api/ts-api_3_27.xsd">
3+
<error code="409093"><summary>Resource Conflict</summary><detail>Job for \'extract\' is already queued. Not queuing a duplicate.</detail></error></tsResponse>

test/test_datasource.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
UPDATE_CONNECTION_XML = TEST_ASSET_DIR / "datasource_connection_update.xml"
3636
UPDATE_CONNECTIONS_XML = TEST_ASSET_DIR / "datasource_connections_update.xml"
3737
UPDATE_CONNECTIONS_NO_AUTH_XML = TEST_ASSET_DIR / "datasource_connections_update_no_auth.xml"
38+
REFRESH_DUPLICATE_XML = TEST_ASSET_DIR / "datasource_refresh_duplicate.xml"
3839

3940

4041
@pytest.fixture(scope="function")
@@ -473,6 +474,19 @@ def test_refresh_object(server) -> None:
473474
assert "7c3d599e-949f-44c3-94a1-f30ba85757e4" == new_job.id
474475

475476

477+
def test_refresh_already_running(server) -> None:
478+
server.version = "2.8"
479+
response_xml = REFRESH_DUPLICATE_XML.read_text()
480+
with requests_mock.mock() as m:
481+
m.post(
482+
server.datasources.baseurl + "/9dbd2263-16b5-46e1-9c43-a76bb8ab65fb/refresh",
483+
status_code=409,
484+
text=response_xml,
485+
)
486+
refresh_job = server.datasources.refresh("9dbd2263-16b5-46e1-9c43-a76bb8ab65fb")
487+
assert refresh_job is None
488+
489+
476490
def test_datasource_refresh_request_empty(server) -> None:
477491
server.version = "2.8"
478492
item = TSC.DatasourceItem("")

test/test_flow.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
PUBLISH_XML = TEST_ASSET_DIR / "flow_publish.xml"
1818
UPDATE_XML = TEST_ASSET_DIR / "flow_update.xml"
1919
REFRESH_XML = TEST_ASSET_DIR / "flow_refresh.xml"
20+
REFRESH_DUPLICATE_XML = TEST_ASSET_DIR / "flow_refresh_duplicate.xml"
2021

2122

2223
@pytest.fixture(scope="function")
@@ -232,6 +233,18 @@ def test_refresh_id_str(server: TSC.Server) -> None:
232233
assert format_datetime(refresh_job.flow_run.started_at) == "2018-05-22T13:00:29Z"
233234

234235

236+
def test_refresh_already_running(server: TSC.Server) -> None:
237+
response_xml = REFRESH_DUPLICATE_XML.read_text()
238+
with requests_mock.mock() as m:
239+
m.post(
240+
server.flows.baseurl + "/92967d2d-c7e2-46d0-8847-4802df58f484/run",
241+
status_code=409,
242+
text=response_xml,
243+
)
244+
refresh_job = server.flows.refresh("92967d2d-c7e2-46d0-8847-4802df58f484")
245+
assert refresh_job is None
246+
247+
235248
def test_bad_download_response(server: TSC.Server) -> None:
236249
with requests_mock.mock() as m, tempfile.TemporaryDirectory() as td:
237250
m.get(

0 commit comments

Comments
 (0)