From 5f32c203e8124c87ead348d86ed4eb79223a94f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Bj=C3=A4reholt?= Date: Mon, 20 Aug 2018 23:28:54 +0200 Subject: [PATCH 1/5] started drafting something --- aw_sync/main.py | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) create mode 100644 aw_sync/main.py diff --git a/aw_sync/main.py b/aw_sync/main.py new file mode 100644 index 00000000..3fb1e29a --- /dev/null +++ b/aw_sync/main.py @@ -0,0 +1,33 @@ +from pathlib import Path +from datetime import datetime + +from aw_core import Event +from aw_datastore.storages import PeeweeStorage + + +def store_latest_to_syncdir(): + # Get the local db, export it + local_export = export_dbfile() + save_export_to_db(local_export) + + +def save_export_to_db(export): + p = PeeweeStorage(True, filepath='/home/erb/Cosmosync/test.sqlite') + now = datetime.now() + p.create_bucket('test', 'test', 'localhost', 'localhost', now) + print(p.buckets()) + # TODO: do export, see aw_server.api.ServerAPI + + +def export_dbfile(filepath): + """Open a db, return the export of that db""" + # TODO: Open db as read-only + p = PeeweeStorage(True) + now = datetime.now() + p.create_bucket('test', 'test', 'localhost', 'localhost', now) + print(p.buckets()) + # TODO: do export, see aw_server.api.ServerAPI + + +for filepath in Path('/home/erb/Cosmosync/ActivityWatch/').iterdir(): + print(filepath) From 3481a0d37c97e84b33d7b32b3349accd9d34f7c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Bj=C3=A4reholt?= Date: Fri, 24 Aug 2018 18:10:17 +0200 Subject: [PATCH 2/5] more progress on basic sync --- aw_sync/main.py | 62 +++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 52 insertions(+), 10 deletions(-) diff --git a/aw_sync/main.py b/aw_sync/main.py index 3fb1e29a..a5eef363 100644 --- a/aw_sync/main.py +++ b/aw_sync/main.py @@ -1,7 +1,10 @@ +import logging from pathlib import Path from datetime import datetime from aw_core import Event +from aw_server.api import ServerAPI +from aw_datastore.datastore import Datastore from aw_datastore.storages import PeeweeStorage @@ -12,22 +15,61 @@ def store_latest_to_syncdir(): def save_export_to_db(export): - p = PeeweeStorage(True, filepath='/home/erb/Cosmosync/test.sqlite') + p = PeeweeStorage(True, filepath='/home/erb/Cosmosync/master_sync.sqlite') now = datetime.now() p.create_bucket('test', 'test', 'localhost', 'localhost', now) print(p.buckets()) - # TODO: do export, see aw_server.api.ServerAPI -def export_dbfile(filepath): - """Open a db, return the export of that db""" +def get_apiobject(filepath: str=None): # TODO: Open db as read-only - p = PeeweeStorage(True) - now = datetime.now() - p.create_bucket('test', 'test', 'localhost', 'localhost', now) - print(p.buckets()) + db = Datastore(lambda testing: None, testing=True) + db.storage_strategy = PeeweeStorage(testing=True, filepath=filepath) + api = ServerAPI(db, testing=True) + return api + + +def create_testdb(filepath: str=None, hostname: str="unknown-host"): + api = get_apiobject(filepath) + bid = "test-" + hostname + api.create_bucket(bid, "test-type", "aw_sync", hostname) + print("Created test db") + + +def export_dbfile(filepath: str=None): + """Open a db, return the export of that db""" + api = get_apiobject(filepath) + export = api.export_all() + return export # TODO: do export, see aw_server.api.ServerAPI -for filepath in Path('/home/erb/Cosmosync/ActivityWatch/').iterdir(): - print(filepath) +def create_testdbs(testdir): + create_testdb(testdir + "/sync-test-1.db", hostname="host1") + create_testdb(testdir + "/sync-test-2.db", hostname="host2") + + +def merge_exports(exports): + master = {} + for export in exports: + for bid, bucket in export.items(): + if bid in master: + raise Exception('bucket collision') + master[bid] = bucket + return master + + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO) + sync_folder = "/home/erb/Cosmosync/ActivityWatch" + create_testdbs(sync_folder) + + exports = [] + for filepath in Path('/home/erb/Cosmosync/ActivityWatch/').iterdir(): + export = export_dbfile(str(filepath)) + exports.append(export) + + print("Successfully exported {} databases".format(len(exports))) + + merged_export = merge_exports(exports) + print(merged_export) From 2ee069830982d5a37b567cb0c7ac9f6793c0734e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Bj=C3=A4reholt?= Date: Mon, 6 May 2019 10:57:23 +0200 Subject: [PATCH 3/5] fixed type annotation --- aw_server/api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aw_server/api.py b/aw_server/api.py index cc060932..2204840d 100644 --- a/aw_server/api.py +++ b/aw_server/api.py @@ -33,7 +33,7 @@ class ServerAPI: def __init__(self, db, testing) -> None: self.db = db self.testing = testing - self.last_event = {} #type: dict + self.last_event: Dict[str, Event] = {} def get_info(self) -> Dict[str, Dict]: """Get server info""" From c1852a25964bdfc76d64b80abfb50f998bb1c12a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Bj=C3=A4reholt?= Date: Mon, 6 May 2019 13:19:16 +0200 Subject: [PATCH 4/5] major progress on sync --- aw_server/api.py | 5 +- aw_server/rest.py | 2 +- aw_sync/main.py | 151 +++++++++++++++++++++++++++++++++------------- 3 files changed, 112 insertions(+), 46 deletions(-) diff --git a/aw_server/api.py b/aw_server/api.py index 4980f8d8..704e9c1a 100644 --- a/aw_server/api.py +++ b/aw_server/api.py @@ -67,7 +67,7 @@ def get_bucket_metadata(self, bucket_id: str) -> Dict[str, Any]: def export_bucket(self, bucket_id: str) -> Dict[str, Any]: """Export a bucket to a dataformat consistent across versions, including all events in it.""" bucket = self.get_bucket_metadata(bucket_id) - bucket["events"] = self.get_events(bucket_id, limit=-1) + bucket["events"] = [e.to_json_dict() for e in self.get_events(bucket_id, limit=-1)] # Scrub event IDs for event in bucket["events"]: del event["id"] @@ -133,8 +133,7 @@ def get_events(self, bucket_id: str, limit: int = -1, logger.debug("Received get request for events in bucket '{}'".format(bucket_id)) if limit is None: # Let limit = None also mean "no limit" limit = -1 - events = [event.to_json_dict() for event in - self.db[bucket_id].get(limit, start, end)] + events = self.db[bucket_id].get(limit, start, end) return events @check_bucket_exists diff --git a/aw_server/rest.py b/aw_server/rest.py index 73ddc915..9be1d480 100644 --- a/aw_server/rest.py +++ b/aw_server/rest.py @@ -156,7 +156,7 @@ def get(self, bucket_id): start = iso8601.parse_date(args["start"]) if "start" in args else None end = iso8601.parse_date(args["end"]) if "end" in args else None - events = current_app.api.get_events(bucket_id, limit=limit, start=start, end=end) + events = [e.to_json_dict() for e in current_app.api.get_events(bucket_id, limit=limit, start=start, end=end)] return events, 200 # TODO: How to tell expect that it could be a list of events? Until then we can't use validate. diff --git a/aw_sync/main.py b/aw_sync/main.py index a5eef363..f985309f 100644 --- a/aw_sync/main.py +++ b/aw_sync/main.py @@ -1,75 +1,142 @@ import logging from pathlib import Path -from datetime import datetime +from datetime import datetime, timezone +from typing import Union +import os from aw_core import Event +from aw_client import ActivityWatchClient from aw_server.api import ServerAPI from aw_datastore.datastore import Datastore -from aw_datastore.storages import PeeweeStorage +from aw_datastore.storages import SqliteStorage +SYNC_FOLDER = "/home/erb/Cosmosync/ActivityWatch" -def store_latest_to_syncdir(): - # Get the local db, export it - local_export = export_dbfile() - save_export_to_db(local_export) +AWAPI = Union[ActivityWatchClient, ServerAPI] def save_export_to_db(export): - p = PeeweeStorage(True, filepath='/home/erb/Cosmosync/master_sync.sqlite') - now = datetime.now() + p = SqliteStorage(testing=True, filepath=SYNC_FOLDER + '/master_sync.sqlite') + now = datetime.now(tz=timezone.utc) p.create_bucket('test', 'test', 'localhost', 'localhost', now) print(p.buckets()) -def get_apiobject(filepath: str=None): +def get_apiobject(filepath: Path) -> ServerAPI: # TODO: Open db as read-only - db = Datastore(lambda testing: None, testing=True) - db.storage_strategy = PeeweeStorage(testing=True, filepath=filepath) + os.makedirs(filepath.parent, exist_ok=True) + db = Datastore((lambda testing: None), testing=True) + db.storage_strategy = SqliteStorage(testing=True, filepath=filepath, enable_lazy_commit=False) api = ServerAPI(db, testing=True) return api -def create_testdb(filepath: str=None, hostname: str="unknown-host"): +def create_testdbs(testpath: Path): + create_testdb(testpath / "sync-test-1.sqlite", hostname="host1") + create_testdb(testpath / "sync-test-2.sqlite", hostname="host2") + + +def create_testdb(filepath: Path, hostname: str="unknown-host"): api = get_apiobject(filepath) bid = "test-" + hostname - api.create_bucket(bid, "test-type", "aw_sync", hostname) - print("Created test db") + if bid not in api.get_buckets(): + api.create_bucket(bid, "test-type", "aw_sync", hostname) + api.create_events(bid, [Event(data={"test": 1})]) + print(f"Created test db {bid}") + + +# Sync new events +def sync_bucket(api_from: AWAPI, api_to: AWAPI, bucket_id_from: str, bucket_id_to: str) -> None: + api_from = universalize_api_accessor(api_from) + api_to = universalize_api_accessor(api_to) + + print(f"Syncing {bucket_id_from} to {api_to}...") + + buckets_to = api_to.get_buckets() + if bucket_id_to not in buckets_to: + # Do full first import + export = api_from.export_bucket(bucket_id_from) + export['id'] = bucket_id_to + api_to.import_bucket(export) + print(f"Imported new bucket {bucket_id_from} as {bucket_id_to}!") + else: + last_event_local = api_to.get_events(bucket_id_from, limit=1) or None + if last_event_local: + last_event_local = last_event_local[0] + synced_until = last_event_local.timestamp + else: + synced_until = None + new_events = sorted(api_from.get_events(bucket_id_from, start=synced_until, limit=-1), key=lambda e: e.timestamp) + + # Send the first event as a heartbeat, as it could be an updated version of the last local event + if len(new_events) > 0: + first_new_event = new_events[0] + if last_event_local.timestamp == first_new_event.timestamp: + api_to.heartbeat(bucket_id_to, first_new_event, 0) + + #for e in new_events: + # print(e) + + new_events = new_events[1:] + # Unset the ID for the new events + for e in new_events: + e['id'] = None + + api_to.insert_events(bucket_id_to, new_events) # type: ignore + + print(f"Fetched {len(new_events)} new events from {bucket_id_from}!") + + +# Used to universalize API of ActivityWatchClient and ServerAPI by monkeypatching +def universalize_api_accessor(api: AWAPI) -> AWAPI: + if isinstance(api, ActivityWatchClient): + api.create_events = api.insert_events + elif isinstance(api, ServerAPI): + api.insert_events = api.create_events # type: ignore + + if isinstance(api, ActivityWatchClient): + import types + + orig_export_bucket = api.export_bucket + + def export_bucket_new(self, bucket_id): + export = orig_export_bucket(bucket_id) + return export["buckets"][bucket_id] + + if api.export_bucket.__name__ != export_bucket_new.__name__: + print("monkeypatched export_bucket") + api.export_bucket = types.MethodType(export_bucket_new, api) + + return api -def export_dbfile(filepath: str=None): +def incremental_export() -> None: """Open a db, return the export of that db""" - api = get_apiobject(filepath) - export = api.export_all() - return export - # TODO: do export, see aw_server.api.ServerAPI + test_folder = Path(SYNC_FOLDER + "/test-incremental") + create_testdbs(test_folder) + # API of local sync database + # TODO: Give sync files unique, identifiable, names + filepath_local = test_folder / 'main.sqlite' + api_local = get_apiobject(filepath_local) -def create_testdbs(testdir): - create_testdb(testdir + "/sync-test-1.db", hostname="host1") - create_testdb(testdir + "/sync-test-2.db", hostname="host2") + # Push all changes to the sync db of localhost + awc = ActivityWatchClient(testing=True) + for bucket_id in awc.get_buckets(): + sync_bucket(awc, api_local, bucket_id, bucket_id) + # Fetch all changes to the local db of localhost + for filepath in Path(test_folder).glob('*.sqlite'): + if filepath == filepath_local: + continue + api_from = get_apiobject(Path(filepath)) + buckets_remote = api_from.get_buckets() -def merge_exports(exports): - master = {} - for export in exports: - for bid, bucket in export.items(): - if bid in master: - raise Exception('bucket collision') - master[bid] = bucket - return master + # TODO: Be careful which buckets get synced! There might be bucket-name collisions! + for bucket_id in buckets_remote: + sync_bucket(api_from, awc, bucket_id, bucket_id + "-remote-test") if __name__ == "__main__": logging.basicConfig(level=logging.INFO) - sync_folder = "/home/erb/Cosmosync/ActivityWatch" - create_testdbs(sync_folder) - - exports = [] - for filepath in Path('/home/erb/Cosmosync/ActivityWatch/').iterdir(): - export = export_dbfile(str(filepath)) - exports.append(export) - - print("Successfully exported {} databases".format(len(exports))) - - merged_export = merge_exports(exports) - print(merged_export) + incremental_export() From dd75507ef568a3288a8c343f73899fdc1c9d0b12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Bj=C3=A4reholt?= Date: Mon, 6 May 2019 18:12:43 +0200 Subject: [PATCH 5/5] improved syncing --- aw_sync/main.py | 59 ++++++++++++++++++++++++++++++++++++------------- 1 file changed, 44 insertions(+), 15 deletions(-) diff --git a/aw_sync/main.py b/aw_sync/main.py index f985309f..a75940e9 100644 --- a/aw_sync/main.py +++ b/aw_sync/main.py @@ -4,7 +4,10 @@ from typing import Union import os +from requests.exceptions import HTTPError + from aw_core import Event +from aw_core.log import setup_logging from aw_client import ActivityWatchClient from aw_server.api import ServerAPI from aw_datastore.datastore import Datastore @@ -14,12 +17,18 @@ AWAPI = Union[ActivityWatchClient, ServerAPI] +# Makes things log doubly +#setup_logging("aw-sync", testing=False, verbose=False, log_stderr=True, log_file=False) + +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) + def save_export_to_db(export): p = SqliteStorage(testing=True, filepath=SYNC_FOLDER + '/master_sync.sqlite') now = datetime.now(tz=timezone.utc) p.create_bucket('test', 'test', 'localhost', 'localhost', now) - print(p.buckets()) + logger.debug(p.buckets()) def get_apiobject(filepath: Path) -> ServerAPI: @@ -33,7 +42,7 @@ def get_apiobject(filepath: Path) -> ServerAPI: def create_testdbs(testpath: Path): create_testdb(testpath / "sync-test-1.sqlite", hostname="host1") - create_testdb(testpath / "sync-test-2.sqlite", hostname="host2") + #create_testdb(testpath / "sync-test-2.sqlite", hostname="host2") def create_testdb(filepath: Path, hostname: str="unknown-host"): @@ -42,7 +51,7 @@ def create_testdb(filepath: Path, hostname: str="unknown-host"): if bid not in api.get_buckets(): api.create_bucket(bid, "test-type", "aw_sync", hostname) api.create_events(bid, [Event(data={"test": 1})]) - print(f"Created test db {bid}") + logger.info(f"Created test db {bid}") # Sync new events @@ -50,41 +59,56 @@ def sync_bucket(api_from: AWAPI, api_to: AWAPI, bucket_id_from: str, bucket_id_t api_from = universalize_api_accessor(api_from) api_to = universalize_api_accessor(api_to) - print(f"Syncing {bucket_id_from} to {api_to}...") + logger.info(f"Syncing {bucket_id_from} to {bucket_id_to} in {api_to}...") + buckets_from = api_from.get_buckets() buckets_to = api_to.get_buckets() + + assert bucket_id_from in buckets_from + if bucket_id_to not in buckets_to: # Do full first import export = api_from.export_bucket(bucket_id_from) export['id'] = bucket_id_to api_to.import_bucket(export) - print(f"Imported new bucket {bucket_id_from} as {bucket_id_to}!") + logger.info(f"Imported new bucket {bucket_id_from} as {bucket_id_to}!") else: - last_event_local = api_to.get_events(bucket_id_from, limit=1) or None + c_from = api_from.get_eventcount(bucket_id_from) + c_to = api_to.get_eventcount(bucket_id_to) + if c_from != c_to: + # TODO: If this happens when buckets are up-to-date timewise + # then buckets have diverged (something must have gone wrong), + # and the sync should be redone in full. + logger.warning(f"Event count differed. From: {c_from} vs To: {c_to}") + + last_event_local = api_to.get_events(bucket_id_to, limit=1) or None + if last_event_local: last_event_local = last_event_local[0] synced_until = last_event_local.timestamp else: synced_until = None + new_events = sorted(api_from.get_events(bucket_id_from, start=synced_until, limit=-1), key=lambda e: e.timestamp) # Send the first event as a heartbeat, as it could be an updated version of the last local event - if len(new_events) > 0: + if last_event_local and len(new_events) > 0: first_new_event = new_events[0] - if last_event_local.timestamp == first_new_event.timestamp: + if last_event_local and last_event_local.timestamp == first_new_event.timestamp: api_to.heartbeat(bucket_id_to, first_new_event, 0) + new_events = new_events[1:] #for e in new_events: # print(e) - new_events = new_events[1:] # Unset the ID for the new events for e in new_events: e['id'] = None api_to.insert_events(bucket_id_to, new_events) # type: ignore - print(f"Fetched {len(new_events)} new events from {bucket_id_from}!") + if len(new_events) > 0: + logger.info(f"Fetched {len(new_events)} new events from {bucket_id_from}!") # Used to universalize API of ActivityWatchClient and ServerAPI by monkeypatching @@ -104,7 +128,7 @@ def export_bucket_new(self, bucket_id): return export["buckets"][bucket_id] if api.export_bucket.__name__ != export_bucket_new.__name__: - print("monkeypatched export_bucket") + logger.debug("monkeypatched export_bucket") api.export_bucket = types.MethodType(export_bucket_new, api) return api @@ -118,17 +142,22 @@ def incremental_export() -> None: # API of local sync database # TODO: Give sync files unique, identifiable, names filepath_local = test_folder / 'main.sqlite' - api_local = get_apiobject(filepath_local) + api_staging = get_apiobject(filepath_local) - # Push all changes to the sync db of localhost + # Push all changes to non-remote buckets to the sync db of localhost + logger.info("PUSHING") awc = ActivityWatchClient(testing=True) - for bucket_id in awc.get_buckets(): - sync_bucket(awc, api_local, bucket_id, bucket_id) + for bucket_id in sorted(awc.get_buckets().keys()): + # This would be better as a value set in the upcoming `data` attribute of buckets + if 'remote' not in bucket_id: + sync_bucket(awc, api_staging, bucket_id, bucket_id) # Fetch all changes to the local db of localhost + logger.info("PULLING") for filepath in Path(test_folder).glob('*.sqlite'): if filepath == filepath_local: continue + # print(filepath, filepath_local) api_from = get_apiobject(Path(filepath)) buckets_remote = api_from.get_buckets()