
A return type of a dict of jobs from a Job is interpreted as a replace #856

Open

vineetbansal wants to merge 6 commits into materialsproject:main from vineetbansal:vb/job_dict

Conversation

@vineetbansal
Contributor

@vineetbansal vineetbansal commented Mar 4, 2026

Summary

In a previously accepted PR, we added logic to check whether a Job (or a tuple/list of Jobs) is returned by a @job. In those cases it is automatically interpreted as a replace.

The motivation for that PR was that the @job specifies the dependencies, but the actual mechanics of replacement (Response/replace) are handled internally in jobflow. This has worked well for our suite of recipes, but we've stumbled on a use-case that might need one small enhancement to this behavior.

Since a @job that spawns other Jobs needs to return all of them, not just the ones of direct interest to the caller (otherwise the non-returned jobs are never added to the DAG), we're finding it useful in certain jobs to namespace the dependent Jobs by returning a dict, making them easier for the caller to consume. Something like:


    @job
    def add(a, b):
        return a + b

    @job
    def sub(a, b):
        return a - b

    @job
    def add_distributed(list_a):
        add_jobs = [add(val, 1) for val in list_a]
        sub_jobs = [sub(val, 1) for val in list_a]
        return {"add": add_jobs, "sub": sub_jobs}

This PR adds this functionality while preserving the output structure. I've included a test to demonstrate this.
This change allows jobflow to work with all our recipes.

Let me know if there are any questions or concerns with this. Thanks!
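To illustrate the idea, here is a minimal standalone sketch of how a dict return could be folded into a Flow while preserving the output structure, analogous to the existing list/tuple handling. This is not jobflow's actual implementation: `Job`, `Flow`, and `interpret_replace` below are simplified stand-ins for illustration only.

```python
class Job:
    """Stand-in for a jobflow Job; `output` mimics an OutputReference."""
    def __init__(self, name):
        self.name = name
        self.output = f"ref:{name}"

class Flow:
    """Stand-in for a jobflow Flow that carries jobs plus a structured output."""
    def __init__(self, jobs, output=None):
        self.jobs = jobs
        self.output = output

def interpret_replace(replace):
    """Convert a list/tuple/dict of Jobs into a Flow, keeping the output shape."""
    if isinstance(replace, (list, tuple)):
        # preserve the sequence type in the output
        seq_output = type(replace)(j.output for j in replace)
        return Flow(jobs=list(replace), output=seq_output)
    if isinstance(replace, dict):
        # namespace the dependent jobs: output mirrors the dict structure
        dict_output = {}
        jobs = []
        for key, val in replace.items():
            if isinstance(val, (list, tuple)):
                dict_output[key] = [j.output for j in val]
                jobs.extend(val)
            else:
                dict_output[key] = val.output
                jobs.append(val)
        return Flow(jobs=jobs, output=dict_output)
    return replace

flow = interpret_replace({"add": [Job("a1"), Job("a2")], "sub": [Job("s1")]})
print(flow.output)    # {'add': ['ref:a1', 'ref:a2'], 'sub': ['ref:s1']}
print(len(flow.jobs))  # 3
```

The caller can then dereference the replacement's output by namespace (e.g. `flow.output["add"]`) instead of relying on positional indexing into a flat list.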

Checklist

Work-in-progress pull requests are encouraged, but please put [WIP] in the pull request
title.

Before a pull request can be merged, the following items must be checked:

  • Code is in the standard Python style.
    The easiest way to handle this is to run the following in the correct sequence on
    your local machine. Start by running black on your new code; this will
    automatically reformat your code to PEP 8 conventions and remove most issues. Then run
    pycodestyle, followed by flake8.
  • Docstrings have been added in the NumPy docstring format.
    Run pydocstyle on your code.
  • Type annotations are highly encouraged. Run mypy to
    type check your code.
  • Tests have been added for any new functionality or bug fixes.
  • All linting and tests pass.

Note that the CI system will run all the above checks. But it will be much more
efficient if you already fix most errors prior to submitting the PR. It is highly
recommended that you use the pre-commit hook provided in the repository. Simply
pip install pre-commit and then pre-commit install and a check will be run
prior to allowing commits.

@vineetbansal
Contributor Author

vineetbansal commented Mar 4, 2026

I just saw that @gpetretto submitted a small fix to the previous PR. That particular issue does not exist in this PR since everything is inside the if block, but let me add a small check in the unit test for this as well...

Contributor

@gpetretto gpetretto left a comment


Hi @vineetbansal,

thanks for the addition. I have left a few small comments. In particular it would be good to address here the problem reported in #855.

@vineetbansal
Contributor Author

vineetbansal commented Mar 4, 2026

Thanks @gpetretto. I think I've addressed the concerns here.

Some tests had to be tweaked slightly, because after this change:

From:

    if isinstance(replace, (list, tuple)):
        replace = Flow(jobs=replace)

to:

    if isinstance(replace, (list, tuple)):
        if isinstance(replace, tuple):
            seq_output: tuple | list = tuple(j.output for j in replace)
        else:
            seq_output = [j.output for j in replace]
        replace = Flow(jobs=list(replace), output=seq_output)

The last entry in response.replace now becomes a store_inputs job.

However, in doing this, I noticed something that may or may not indicate something problematic. This snippet in one of the tests:

    test_job = Job(replace_flow, config=nopass_config)
    response = test_job.run(memory_jobstore)
    for j in response.replace:
        assert j.config.manager_config == {}

becomes:

    test_job = Job(replace_flow, config=nopass_config)
    response = test_job.run(memory_jobstore)
    for j in response.replace[:-1]:
        assert j.config.manager_config == {}
    assert response.replace[-1].config.manager_config == manager_config2

The fact that the last response.replace entry's config.manager_config is now manager_config2 doesn't make sense to me. It seems that by returning the output of each individual job in the iterable, we may have stumbled upon an unrelated bug?

@gpetretto
Contributor

gpetretto commented Mar 5, 2026

Thanks @vineetbansal for the updates. I think they are fine, but apparently I cannot mark the comments as resolved.

Concerning the change in the config manager, you are absolutely right, I think it is a bug. The problem is that the store_inputs job (as would be any other Job obtained from the @job decorator) shares the same instance of JobConfig if it is passed to the decorator itself. So, when job.config.manager_config is modified multiple times during the tests, it changes for all the other jobs as well. I think this functionality is not used very often, and if jobs are executed in isolated environments (as in fireworks and jobflow-remote) one would never see this issue. Still, it is a potential source of errors and it would be better to fix this.

I think that the correct solution would be to make this change:

--- a/src/jobflow/core/job.py
+++ b/src/jobflow/core/job.py
@@ -184,6 +184,7 @@ def job(
     --------
     Job, .Flow, .Response
     """
+    from copy import deepcopy

     def decorator(func):
         from functools import wraps
@@ -214,7 +215,7 @@ def job(
                         args = args[1:]

             return Job(
-                function=f, function_args=args, function_kwargs=kwargs, **job_kwargs
+                function=f, function_args=args, function_kwargs=kwargs, **deepcopy(job_kwargs)
             )

I briefly tested this, and with it the tests can go back to their original form. It makes sense to ensure that the same instances of the job_kwargs elements are not shared among the different Job instances created from a single decorated function. @utf do you see any problem with this, or do you have a better solution?
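The aliasing problem described above can be reproduced without jobflow at all. The sketch below uses a hypothetical `make_job` factory standing in for the @job decorator: kwargs captured once by the decorator are handed to every Job it creates, so without a deepcopy all jobs share one mutable config.

```python
from copy import deepcopy

def make_job(job_kwargs, fix=False):
    """Stand-in for the @job decorator creating a Job from captured kwargs.

    With fix=False, every call hands out the same config object; with
    fix=True, each call gets its own deep copy (the proposed change).
    """
    kwargs = deepcopy(job_kwargs) if fix else job_kwargs
    return kwargs["config"]  # the new job's config

# Without the fix: every job shares one config instance.
captured = {"config": {"manager_config": {}}}
c1 = make_job(captured)
c2 = make_job(captured)
c1["manager_config"]["store"] = True
print(c2["manager_config"])  # {'store': True} -- the mutation leaked to c2

# With deepcopy at creation time, each job gets an isolated copy.
captured = {"config": {"manager_config": {}}}
f1 = make_job(captured, fix=True)
f2 = make_job(captured, fix=True)
f1["manager_config"]["store"] = True
print(f2["manager_config"])  # {} -- unaffected
```

This mirrors why the test assertion on the last replaced job saw manager_config2: the shared JobConfig had been mutated by an earlier test.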

@vineetbansal
Contributor Author

Thanks @gpetretto - that makes sense. I've removed the last assert (which will change if/when this deepcopy fix is introduced), since it is unrelated to the change we're trying to make here. Without that assert, it is clear that what we're testing is that the config for all the replaced jobs (except the last store_inputs job) is empty.

I'll open up another issue on the deepcopy stuff in case it has deeper implications that we may want to investigate.

@vineetbansal
Contributor Author

@utf - perhaps you can take a look? We're not introducing any deep changes here other than interpreting a dict return of jobs as a replace (returning a list of jobs is already handled as a replace and was approved previously).
