|
10 | 10 |
|
11 | 11 | import tableauserverclient as TSC |
12 | 12 | from tableauserverclient.datetime_helpers import format_datetime, parse_datetime |
| 13 | +from tableauserverclient.server.endpoint.users_endpoint import create_users_csv, remove_users_csv |
13 | 14 |
|
14 | 15 | TEST_ASSET_DIR = Path(__file__).resolve().parent / "assets" |
15 | 16 |
|
|
28 | 29 | USERS = os.path.join(TEST_ASSET_DIR, "Data", "user_details.csv") |
29 | 30 |
|
30 | 31 |
|
| 32 | +def make_user( |
| 33 | + name: str, |
| 34 | + site_role: str = "", |
| 35 | + auth_setting: str = "", |
| 36 | + domain: str = "", |
| 37 | + fullname: str = "", |
| 38 | + email: str = "", |
| 39 | +) -> TSC.UserItem: |
| 40 | + user = TSC.UserItem(name, site_role or None) |
| 41 | + if auth_setting: |
| 42 | + user.auth_setting = auth_setting |
| 43 | + if domain: |
| 44 | + user._domain_name = domain |
| 45 | + if fullname: |
| 46 | + user.fullname = fullname |
| 47 | + if email: |
| 48 | + user.email = email |
| 49 | + return user |
| 50 | + |
| 51 | + |
31 | 52 | class UserTests(unittest.TestCase): |
32 | 53 | def setUp(self) -> None: |
33 | 54 | self.server = TSC.Server("http://test", False) |
@@ -328,26 +349,64 @@ def test_update_user_idp_configuration(self) -> None: |
328 | 349 | assert user_elem is not None |
329 | 350 | assert user_elem.attrib["idpConfigurationId"] == "012345" |
330 | 351 |
|
331 | | - def test_bulk_add(self): |
332 | | - def make_user( |
333 | | - name: str, |
334 | | - site_role: str = "", |
335 | | - auth_setting: str = "", |
336 | | - domain: str = "", |
337 | | - fullname: str = "", |
338 | | - email: str = "", |
339 | | - ) -> TSC.UserItem: |
340 | | - user = TSC.UserItem(name, site_role or None) |
341 | | - if auth_setting: |
342 | | - user.auth_setting = auth_setting |
343 | | - if domain: |
344 | | - user._domain_name = domain |
345 | | - if fullname: |
346 | | - user.fullname = fullname |
347 | | - if email: |
348 | | - user.email = email |
349 | | - return user |
| 352 | + def test_create_users_csv(self): |
| 353 | + users = [ |
| 354 | + make_user("Alice", "Viewer"), |
| 355 | + make_user("Bob", "Explorer"), |
| 356 | + make_user("Charlie", "Creator", "SAML"), |
| 357 | + make_user("Dave"), |
| 358 | + make_user("Eve", "ServerAdministrator", "OpenID", "example.com", "Eve Example", "Eve@example.com"), |
| 359 | + make_user("Frank", "SiteAdministratorExplorer", "TableauIDWithMFA", email="Frank@example.com"), |
| 360 | + make_user("Grace", "SiteAdministratorCreator", "SAML", "example.com", "Grace Example", "gex@example.com"), |
| 361 | + make_user("Hank", "Unlicensed"), |
| 362 | + ] |
| 363 | + |
| 364 | + license_map = { |
| 365 | + "Viewer": "Viewer", |
| 366 | + "Explorer": "Explorer", |
| 367 | + "ExplorerCanPublish": "Explorer", |
| 368 | + "Creator": "Creator", |
| 369 | + "SiteAdministratorExplorer": "Explorer", |
| 370 | + "SiteAdministratorCreator": "Creator", |
| 371 | + "ServerAdministrator": "Creator", |
| 372 | + "Unlicensed": "Unlicensed", |
| 373 | + } |
| 374 | + publish_map = { |
| 375 | + "Unlicensed": 0, |
| 376 | + "Viewer": 0, |
| 377 | + "Explorer": 0, |
| 378 | + "Creator": 1, |
| 379 | + "ExplorerCanPublish": 1, |
| 380 | + "SiteAdministratorExplorer": 1, |
| 381 | + "SiteAdministratorCreator": 1, |
| 382 | + "ServerAdministrator": 1, |
| 383 | + } |
| 384 | + admin_map = { |
| 385 | + "SiteAdministratorExplorer": "Site", |
| 386 | + "SiteAdministratorCreator": "Site", |
| 387 | + "ServerAdministrator": "System", |
| 388 | + } |
| 389 | + |
| 390 | + csv_columns = ["name", "password", "fullname", "license", "admin", "publish", "email"] |
| 391 | + csv_data = create_users_csv(users) |
| 392 | + csv_file = io.StringIO(csv_data.decode("utf-8")) |
| 393 | + csv_reader = csv.reader(csv_file) |
| 394 | + for user, row in zip(users, csv_reader): |
| 395 | + with self.subTest(user=user): |
| 396 | + site_role = user.site_role or "Unlicensed" |
| 397 | + name = f"{user.domain_name}\\{user.name}" if user.domain_name else user.name |
| 398 | + csv_user = dict(zip(csv_columns, row)) |
| 399 | + assert name == csv_user["name"] |
| 400 | + assert (user.fullname or "") == csv_user["fullname"] |
| 401 | + assert (user.email or "") == csv_user["email"] |
| 402 | + assert license_map[site_role] == csv_user["license"] |
| 403 | + assert admin_map.get(site_role, "") == csv_user["admin"] |
| 404 | + assert publish_map[site_role] == int(csv_user["publish"]) |
350 | 405 |
|
| 406 | + |
| 407 | + |
| 408 | + |
| 409 | + def test_bulk_add(self): |
351 | 410 | self.server.version = "3.15" |
352 | 411 | users = [ |
353 | 412 | make_user("Alice", "Viewer"), |
@@ -391,45 +450,8 @@ def make_user( |
391 | 450 | assert user.name == xml_user.get("name") |
392 | 451 | assert xml_user.get("authSetting") == (user.auth_setting or "ServerDefault") |
393 | 452 |
|
394 | | - license_map = { |
395 | | - "Viewer": "Viewer", |
396 | | - "Explorer": "Explorer", |
397 | | - "ExplorerCanPublish": "Explorer", |
398 | | - "Creator": "Creator", |
399 | | - "SiteAdministratorExplorer": "Explorer", |
400 | | - "SiteAdministratorCreator": "Creator", |
401 | | - "ServerAdministrator": "Creator", |
402 | | - "Unlicensed": "Unlicensed", |
403 | | - } |
404 | | - publish_map = { |
405 | | - "Unlicensed": 0, |
406 | | - "Viewer": 0, |
407 | | - "Explorer": 0, |
408 | | - "Creator": 1, |
409 | | - "ExplorerCanPublish": 1, |
410 | | - "SiteAdministratorExplorer": 1, |
411 | | - "SiteAdministratorCreator": 1, |
412 | | - "ServerAdministrator": 1, |
413 | | - } |
414 | | - admin_map = { |
415 | | - "SiteAdministratorExplorer": "Site", |
416 | | - "SiteAdministratorCreator": "Site", |
417 | | - "ServerAdministrator": "System", |
418 | | - } |
419 | | - |
420 | | - csv_columns = ["name", "password", "fullname", "license", "admin", "publish", "email"] |
421 | | - csv_file = io.StringIO(segments[0].split(b"\n\n")[1].decode("utf-8")) |
422 | | - csv_reader = csv.reader(csv_file) |
423 | | - for user, row in zip(users, csv_reader): |
424 | | - site_role = user.site_role or "Unlicensed" |
425 | | - name = f"{user.domain_name}\\{user.name}" if user.domain_name else user.name |
426 | | - csv_user = dict(zip(csv_columns, row)) |
427 | | - assert name == csv_user["name"] |
428 | | - assert (user.fullname or "") == csv_user["fullname"] |
429 | | - assert (user.email or "") == csv_user["email"] |
430 | | - assert license_map[site_role] == csv_user["license"] |
431 | | - assert admin_map.get(site_role, "") == csv_user["admin"] |
432 | | - assert publish_map[site_role] == int(csv_user["publish"]) |
| 453 | + csv_data = create_users_csv(users).replace(b"\r\n", b"\n") |
| 454 | + assert csv_data.strip() == segments[0].split(b"\n\n")[1].strip() |
433 | 455 |
|
434 | 456 | def test_bulk_add_no_name(self): |
435 | 457 | self.server.version = "3.15" |
|
0 commit comments