Retry peeruserimport task on Database or connection errors#13821
Conversation
Force-pushed from 4e2fbc9 to 236f654
rtibbles left a comment
I think we can maintain the current separation of concerns, and it may be worth the effort of adding a new column to track the retries rather than keeping it in the extra_metadata.
To allow us to migrate the SQLAlchemy table, adding alembic as a dependency feels a bit heavy duty. So perhaps the answer is to clear the jobs table of any finished tasks, then dump the remainder to a temporary CSV, clear the table, recreate, and then reload the data?
    permission_classes=None,
    long_running=False,
    status_fn=None,
    retry_on=None,
Good job avoiding a classic Python gotcha! (Passing mutable values such as [] as default arguments is a very common mistake that can cause issues.)
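A minimal illustration of the gotcha, with generic names rather than the PR's actual code:

```python
def register(retry_on=[]):  # BAD: the default list is created once and shared
    retry_on.append(ValueError)
    return retry_on

def register_safe(retry_on=None):  # GOOD: None default, as in this PR
    if retry_on is None:
        retry_on = []
    retry_on.append(ValueError)
    return retry_on

assert register() == [ValueError]
assert register() == [ValueError, ValueError]  # state leaked between calls!
assert register_safe() == [ValueError]  # fresh list every time
```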
    total_progress=0,
    result=None,
    long_running=False,
    retry_on=None,
I feel like we don't need to store this in the job object - we're not allowing this to be customized per job, only per task - so I think we can just reference this from the task itself, rather than having to pass it in at job initialization. This also saves us having to coerce the exception classes to import paths.
    )
    setattr(current_state_tracker, "job", None)

def should_retry(self, exception):
I think I'd rather defer all this logic to the reschedule_finished_job_if_needed method on the storage class, rather than having it in the job class.
def should_retry(self, exception):
    retries = self.extra_metadata.get("retries", 0) + 1
    self.extra_metadata["retries"] = retries
I am a bit iffy about using extra_metadata for tracking this - if we want to hack the existing schema, I think 'repeat' is probably a better place for it, but I wonder whether we should instead add an error_retries column to the job table schema, so that we can put a sensible default in place for failing tasks and they don't retry endlessly.
I also think I'd rather have the retry interval defined by the task registration (we could also set a sensible default when retryable exceptions are set).
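As a sketch of that suggestion (the error_retries column, the task lookup, and the cap value are all illustrative assumptions, not the PR's final design):

```python
MAX_ERROR_RETRIES = 3  # illustrative bounded default

class Job:
    # ... other fields elided ...
    def should_retry(self, exception):
        # retry_on would come from the task registration, not the job
        retry_on = tuple(self.task.retry_on or ())
        if not isinstance(exception, retry_on):
            return False
        # error_retries would be a dedicated column on the job table,
        # replacing the extra_metadata counter shown in the diff above
        return self.error_retries < MAX_ERROR_RETRIES
```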
Force-pushed from 6a0c872 to 55e3dba
Force-pushed from a2765cb to 1a2f204
from django import db

# Destroy current connections and create new ones:
db.connections.close_all()
db.connections = db.ConnectionHandler()
I have removed these db.connections overrides and used patch("django.db.connections") instead. The overrides were having side effects on the job tests that involve multiple threads, and were messing things up in the teardown process.
However, I am not sure whether removing these lines might somehow cause a false positive in the test.
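A minimal sketch of the patching approach described, assuming a hypothetical run_import_job helper and an illustrative assertion:

```python
from unittest import mock

# Patch the handler for the duration of the test instead of reassigning
# django.db.connections, which leaked across threads and broke teardown.
with mock.patch("django.db.connections") as mock_connections:
    run_import_job()  # hypothetical helper exercising the retry path
    mock_connections.close_all.assert_called()  # illustrative assertion
```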
from django import db

db.connections["default"].connection = None
Same as above: I will rely on the django.db.connections patch instead, but I am not sure whether this may cause false positives.
Force-pushed from cd9821f to 402dadd
rtibbles left a comment
The Exception/BaseException validation needs to be cleaned up, as does the DatabaseLockedError, as I don't think it will catch what we are hoping it will catch.
The Pragma setting, if it's not being done for the additional databases, can be deferred to a follow-up.
Importing storage from main is not a blocker, just a thought.
if not isinstance(retry_on, list):
    raise TypeError("retry_on must be a list of exceptions")
for item in retry_on:
    if not issubclass(item, Exception):
We should change this to BaseException - it's a little uncommon, but sometimes exceptions are subclassed from this rather than the Exception class: https://docs.python.org/3/library/exceptions.html#BaseException
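A sketch of the suggested check; the isinstance(item, type) guard is an added assumption so that issubclass does not raise a TypeError on non-class values:

```python
def validate_retry_on(retry_on):
    if not isinstance(retry_on, list):
        raise TypeError("retry_on must be a list of exceptions")
    for item in retry_on:
        # BaseException also covers exception classes that deliberately
        # bypass Exception, e.g. KeyboardInterrupt and SystemExit.
        if not (isinstance(item, type) and issubclass(item, BaseException)):
            raise TypeError("retry_on must be a list of exception classes")
```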
 def set_sqlite_pragmas(self):
     """
-    Sets the connection PRAGMAs for the sqlalchemy engine stored in self.engine.
+    Sets the connection PRAGMAs for the sqlite database.
Now this is managed via Django... I think we should be doing this already, and if we're not doing it for all of the additional DBs, we should be.
Yes! I recall that we were just doing this for the default DB - that's why I kept this function here.
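One way to do this for every database via Django is the connection_created signal; a sketch, with illustrative pragma values:

```python
from django.db.backends.signals import connection_created

def apply_sqlite_pragmas(sender, connection, **kwargs):
    # Fires for every connection Django opens, so additional databases
    # receive the same pragmas as the default one.
    if connection.vendor == "sqlite":
        with connection.cursor() as cursor:
            cursor.execute("PRAGMA journal_mode = WAL;")

connection_created.connect(apply_sqlite_pragmas)
```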
 def _update_job(self, job_id, state=None, **kwargs):
-    with self.session_scope() as session:
+    with transaction.atomic(using=self._get_job_database_alias()):
I assume this is needed because transaction.atomic by default only operates on the default database?
| "saved_job": job.to_json(), | ||
| } | ||
|
|
||
| if orm_job: |
Could potentially use update_or_create here - but given that we already know whether the row exists, this seems fine to me.
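For reference, a sketch of the update_or_create alternative; ORMJob and the field names are assumptions based on the surrounding diff:

```python
# update_or_create collapses the "if orm_job: update / else: create"
# branches into a single call.
orm_job, created = ORMJob.objects.update_or_create(
    id=job.job_id,  # hypothetical primary key field
    defaults={
        "state": job.state,
        "saved_job": job.to_json(),
    },
)
```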
return executor(max_workers=max_workers)


class DatabaseLockedError(OperationalError):
I am not sure when this would ever get raised: we have defined it here, but we never actually use it.
For this to work, it would have to be raised by the sync task that has it as an exception it can retry on. We have some similar logic in our middleware that raises 502s on requests - perhaps we could create a broader context manager that catches OperationalErrors and reraises them as DatabaseLockedErrors if they meet the criterion?
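A minimal sketch of the suggested context manager; matching on the message text is an assumption about how SQLite reports lock contention:

```python
from contextlib import contextmanager
from django.db.utils import OperationalError

class DatabaseLockedError(OperationalError):
    """Narrowed, retryable variant of OperationalError."""

@contextmanager
def reraise_locked_errors():
    try:
        yield
    except OperationalError as e:
        # SQLite reports lock contention in the error message; other
        # OperationalErrors propagate unchanged.
        if "database is locked" in str(e):
            raise DatabaseLockedError(str(e)) from e
        raise
```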
I was a bit confused when creating this class, so I removed it and used the OperationalError class instead!
raise TypeError("time delay must be a datetime.timedelta object")


def validate_exception(value):
Is this being used? It seems that this validation was happening inline elsewhere? (noting that here BaseException is being used though!)
Yes! It is being used here: https://github.com/AlexVelezLl/kolibri/blob/402dadd608f01e48679bb4d528389d5ee93553f4/kolibri/core/tasks/storage.py#L517.
I think the inline validation you are talking about is this one: https://github.com/AlexVelezLl/kolibri/blob/fix-lod-import-multi-users/kolibri/core/tasks/registry.py#L270 - but that one validates the exception class, whereas this validate_exception validates the exception instance.
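To make the distinction concrete, a small illustrative sketch (the body is an assumption, not the linked code):

```python
def validate_exception(value):
    # storage-side check: value is an exception *instance*, e.g.
    # OperationalError("database is locked"), not an exception class
    if not isinstance(value, BaseException):
        raise TypeError("value must be an exception instance")
```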
-connection = db_connection()
-
-storage = Storage(connection)
+storage = Storage()
I wonder... could we just import the storage object from main here?
Seems like a good idea! 😅
 self.future_job_mapping = {}

-self.storage = Storage(connection)
+self.storage = Storage()
Thanks @rtibbles! I have addressed all your comments!
rtibbles left a comment
All my comments are addressed! Let's get this QAed.
@pcenov @radinamatic hopefully the issue has the details needed for replication. This has ended up being a slightly larger refactor, so it is worth doing some additional smoke tests of async tasks, like content imports, and also checking some different syncs. There's also a possibility of regression in the Android app, so if we can test the import workflows on Android as well, that would be very helpful.
Force-pushed from 6982ecd to 6c5c778
Hi @pcenov! This should be ready for another round of QA. I have made some changes that may have fixed these two comments: #13821 (comment) and #13821 (comment). In general, it should be okay if some tasks fail due to server overload; if we retry them manually, we should eventually be able to import the learners. Infinite loads should be fixed now.
Hi @AlexVelezLl - it seems that this improvement should be discussed further with @rtibbles so that we can get a better idea of exactly what the expected behaviour is and how many users it should be possible to import without too much trouble. Currently, while I am technically able to import a few users simultaneously, when I try to do that with, let's say, about 20 users, the import takes a very long time, it increases the CPU usage of the devices, I am still getting multiple errors in the console, and many of the users are not getting imported:
multiple.users.mp4
WindowsServer.zip
Unless we have clarity on the number of users we can handle reliably, I can't reliably confirm through manual testing that this is an actual improvement, as I am constantly getting inconsistent results.
Thanks @pcenov - yes, it will heavily depend on the device and server resources. We had discussed with @rtibbles that we could potentially build a bulk import flow, where the admin can select multiple users at the same time and import them in a single action, but that will require some more decisions. Another option, @rtibbles, would be to run just one task at a time and make the create-task request only after the previous task has completed; that would take much longer, but would reduce errors much more. The other thing is that if the devices don't have many resources, could we prompt the admin to adjust the …
Follow-up for further improvement: #14238
rtibbles left a comment
This is an improvement on the current workflow. We've added a follow-up to handle this completely robustly, but there's a lot of work here that will be more generally useful, and I don't want the perfect to be the enemy of the good.
Merged b1624d0 into learningequality:release-v0.19.x
Summary
- Adds a retry_on argument to the @task decorator to specify a list of potential non-deterministic exceptions that can be retried if the job failed because of them.
grab.mov
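For illustration, a hedged usage sketch of the new argument; the exception list is an assumption, and the @task import is omitted:

```python
from django.db.utils import OperationalError

@task(retry_on=[OperationalError, ConnectionError])
def peeruserimport(**kwargs):
    # If the job fails with one of the retry_on exceptions, it is
    # rescheduled instead of being marked permanently failed.
    ...
```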
References
Closes #11836.
Reviewer guidance