1616import re
1717import typing
1818import warnings
19+ from time import sleep
20+ from enum import Enum
21+ from typing import Optional , Type , Union
1922
2023from .common import (
24+ SYNC_ATTEMPT_WAIT ,
25+ SYNC_ATTEMPTS ,
26+ SYNC_CALLBACK_WAIT ,
2127 ClientError ,
2228 LoginError ,
2329 WorkspaceRole ,
3844 download_diffs_finalize ,
3945)
4046from .client_pull import pull_project_async , pull_project_wait , pull_project_finalize
41- from .client_push import push_project_async , push_project_wait , push_project_finalize
47+ from .client_push import (
48+ get_push_changes_batch ,
49+ push_project_async ,
50+ push_project_is_running ,
51+ push_project_wait ,
52+ push_project_finalize ,
53+ UploadChunksCache ,
54+ )
4255from .utils import DateTimeEncoder , get_versions_with_file_changes , int_version , is_version_acceptable
56+ from .utils import (
57+ DateTimeEncoder ,
58+ get_versions_with_file_changes ,
59+ int_version ,
60+ is_version_acceptable ,
61+ normalize_role ,
62+ )
4363from .version import __version__
4464
4565this_dir = os .path .dirname (os .path .realpath (__file__ ))
@@ -119,6 +139,8 @@ def __init__(
119139 self ._user_info = None
120140 self ._server_type = None
121141 self ._server_version = None
142+ self ._server_features = {}
143+ self .upload_chunks_cache = UploadChunksCache ()
122144 self .client_version = "Python-client/" + __version__
123145 if plugin_version is not None : # this could be e.g. "Plugin/2020.1 QGIS/3.14"
124146 self .client_version += " " + plugin_version
@@ -378,8 +400,7 @@ def server_type(self):
378400 """
379401 if not self ._server_type :
380402 try :
381- resp = self .get ("/config" , validate_auth = False )
382- config = json .load (resp )
403+ config = self .server_config ()
383404 stype = config .get ("server_type" )
384405 if stype == "ce" :
385406 self ._server_type = ServerType .CE
@@ -404,14 +425,26 @@ def server_version(self):
404425 """
405426 if self ._server_version is None :
406427 try :
407- resp = self .get ("/config" , validate_auth = False )
408- config = json .load (resp )
428+ config = self .server_config ()
409429 self ._server_version = config ["version" ]
410430 except (ClientError , KeyError ):
411431 self ._server_version = ""
412432
413433 return self ._server_version
414434
435+ def server_features (self ):
436+ """
437+ Returns feature flags of the server.
438+ """
439+ if self ._server_features :
440+ return self ._server_features
441+ config = self .server_config ()
442+ self ._server_features = {
443+ "v2_push_enabled" : config .get ("v2_push_enabled" , False ),
444+ "v2_pull_enabled" : config .get ("v2_pull_enabled" , False ),
445+ }
446+ return self ._server_features
447+
415448 def workspaces_list (self ):
416449 """
417450 Find all available workspaces
@@ -532,7 +565,7 @@ def create_project_and_push(self, project_name, directory, is_public=False, name
532565 MerginProject .write_metadata (directory , project_info )
533566 mp = MerginProject (directory )
534567 if mp .inspect_files ():
535- self .push_project (directory )
568+ self .sync_project (directory )
536569
537570 def paginated_projects_list (
538571 self ,
@@ -802,7 +835,7 @@ def download_project(self, project_path, directory, version=None):
802835 def user_info (self ):
803836 server_type = self .server_type ()
804837 if server_type == ServerType .OLD :
805- resp = self .get ("/v1/user/" + self .username ())
838+ resp = self .get (f "/v1/user/{ self .username ()} " )
806839 else :
807840 resp = self .get ("/v1/user/profile" )
808841 return json .load (resp )
@@ -1313,7 +1346,7 @@ def create_user(
13131346 email : str ,
13141347 password : str ,
13151348 workspace_id : int ,
1316- workspace_role : WorkspaceRole ,
1349+ workspace_role : Union [ str , WorkspaceRole ] ,
13171350 username : str = None ,
13181351 notify_user : bool = False ,
13191352 ) -> dict :
@@ -1328,11 +1361,15 @@ def create_user(
13281361 param notify_user: flag for email notifications - confirmation email will be sent
13291362 """
13301363 self .check_collaborators_members_support ()
1364+ role_enum = normalize_role (workspace_role , WorkspaceRole )
1365+ if role_enum is None :
1366+ raise ValueError (f"Invalid role: { workspace_role } " )
1367+
13311368 params = {
13321369 "email" : email ,
13331370 "password" : password ,
13341371 "workspace_id" : workspace_id ,
1335- "role" : workspace_role .value ,
1372+ "role" : role_enum .value ,
13361373 "notify_user" : notify_user ,
13371374 }
13381375 if username :
@@ -1357,17 +1394,26 @@ def list_workspace_members(self, workspace_id: int) -> typing.List[dict]:
13571394 return json .load (resp )
13581395
13591396 def update_workspace_member (
1360- self , workspace_id : int , user_id : int , workspace_role : WorkspaceRole , reset_projects_roles : bool = False
1397+ self ,
1398+ workspace_id : int ,
1399+ user_id : int ,
1400+ workspace_role : Union [str , WorkspaceRole ],
1401+ reset_projects_roles : bool = False ,
13611402 ) -> dict :
13621403 """
13631404 Update workspace role of a workspace member, optionally resets the projects role
13641405
13651406 param reset_projects_roles: all project specific roles will be removed
13661407 """
13671408 self .check_collaborators_members_support ()
1409+
1410+ role_enum = normalize_role (workspace_role , WorkspaceRole )
1411+ if role_enum is None :
1412+ raise ValueError (f"Invalid role: { workspace_role } " )
1413+
13681414 params = {
13691415 "reset_projects_roles" : reset_projects_roles ,
1370- "workspace_role" : workspace_role .value ,
1416+ "workspace_role" : role_enum .value ,
13711417 }
13721418 workspace_member = self .patch (f"v2/workspaces/{ workspace_id } /members/{ user_id } " , params , json_headers )
13731419 return json .load (workspace_member )
@@ -1387,25 +1433,35 @@ def list_project_collaborators(self, project_id: str) -> typing.List[dict]:
13871433 project_collaborators = self .get (f"v2/projects/{ project_id } /collaborators" )
13881434 return json .load (project_collaborators )
13891435
1390- def add_project_collaborator (self , project_id : str , user : str , project_role : ProjectRole ) -> dict :
1436+ def add_project_collaborator (self , project_id : str , user : str , project_role : Union [ str , ProjectRole ] ) -> dict :
13911437 """
13921438 Add a user to project collaborators and grant them a project role.
13931439 Fails if user is already a member of the project.
13941440
13951441 param user: login (username or email) of the user
13961442 """
13971443 self .check_collaborators_members_support ()
1444+
1445+ role_enum = normalize_role (project_role , ProjectRole )
1446+ if role_enum is None :
1447+ raise ValueError (f"Invalid role: { project_role } " )
1448+
13981449 params = {"role" : project_role .value , "user" : user }
13991450 project_collaborator = self .post (f"v2/projects/{ project_id } /collaborators" , params , json_headers )
14001451 return json .load (project_collaborator )
14011452
1402- def update_project_collaborator (self , project_id : str , user_id : int , project_role : ProjectRole ) -> dict :
1453+ def update_project_collaborator (self , project_id : str , user_id : int , project_role : Union [ str , ProjectRole ] ) -> dict :
14031454 """
14041455 Update project role of the existing project collaborator.
14051456 Fails if user is not a member of the project yet.
14061457 """
14071458 self .check_collaborators_members_support ()
1459+
1460+ role_enum = normalize_role (project_role , ProjectRole )
1461+ if role_enum is None :
1462+ raise ValueError (f"Invalid role: { project_role } " )
14081463 params = {"role" : project_role .value }
1464+
14091465 project_collaborator = self .patch (f"v2/projects/{ project_id } /collaborators/{ user_id } " , params , json_headers )
14101466 return json .load (project_collaborator )
14111467
@@ -1481,13 +1537,71 @@ def send_logs(
14811537 request = urllib .request .Request (url , data = payload , headers = header )
14821538 return self ._do_request (request )
14831539
1484- def create_invitation (self , workspace_id : int , email : str , workspace_role : WorkspaceRole ):
1540+ def create_invitation (self , workspace_id : int , email : str , workspace_role : Union [ str , WorkspaceRole ] ):
14851541 """
14861542 Create invitation to workspace for specific role
14871543 """
14881544 min_version = "2025.6.1"
14891545 if not is_version_acceptable (self .server_version (), min_version ):
14901546 raise NotImplementedError (f"This needs server at version { min_version } or later" )
1491- params = {"email" : email , "role" : workspace_role .value }
1547+
1548+ role_enum = normalize_role (workspace_role , WorkspaceRole )
1549+ if role_enum is None :
1550+ raise ValueError (f"Invalid role: { workspace_role } " )
1551+
1552+ params = {"email" : email , "role" : role_enum .value }
14921553 ws_inv = self .post (f"v2/workspaces/{ workspace_id } /invitations" , params , json_headers )
14931554 return json .load (ws_inv )
1555+
1556+ def sync_project_generator (self , project_directory ):
1557+ """
1558+ Syncs project by loop with these steps:
1559+ 1. Pull server version
1560+ 2. Get local changes
1561+ 3. Push first change batch
1562+ Repeat if there are more local changes.
1563+
1564+ :param project_directory: Project's directory
1565+ """
1566+ mp = MerginProject (project_directory )
1567+ has_changes = True
1568+ server_conflict_attempts = 0
1569+ while has_changes :
1570+ self .pull_project (project_directory )
1571+ try :
1572+ job = push_project_async (self , project_directory )
1573+ if not job :
1574+ break
1575+ # waiting for progress
1576+ last_size = 0
1577+ while push_project_is_running (job ):
1578+ sleep (SYNC_CALLBACK_WAIT )
1579+ current_size = job .transferred_size
1580+ yield (current_size - last_size , job ) # Yields the size change and the job object
1581+ last_size = current_size
1582+ push_project_finalize (job )
1583+ _ , has_changes = get_push_changes_batch (self , project_directory )
1584+ server_conflict_attempts = 0
1585+ except ClientError as e :
1586+ if e .is_retryable_sync () and server_conflict_attempts < SYNC_ATTEMPTS - 1 :
1587+ # retry on conflict, e.g. when server has changes that we do not have yet
1588+ mp .log .info (
1589+ f"Restarting sync process (conflict on server) - { server_conflict_attempts + 1 } /{ SYNC_ATTEMPTS } "
1590+ )
1591+ server_conflict_attempts += 1
1592+ sleep (SYNC_ATTEMPT_WAIT )
1593+ continue
1594+ raise e
1595+
1596+ def sync_project (self , project_directory ):
1597+ """
1598+ Syncs project by pulling server changes and pushing local changes. There is intorduced retry mechanism
1599+ for handling server conflicts (when server has changes that we do not have yet or somebody else is syncing).
1600+ See description of _sync_project_generator().
1601+
1602+ :param project_directory: Project's directory
1603+ """
1604+ # walk through the generator to perform the sync
1605+ # in this method we do not yield anything to the caller
1606+ for _ in self .sync_project_generator (project_directory ):
1607+ pass
0 commit comments