From ba24711f8d09e9ee20afcb5ae49d647511246a65 Mon Sep 17 00:00:00 2001 From: Alexander Oganezov Date: Fri, 5 Jun 2026 09:30:03 -0700 Subject: [PATCH 01/11] DAOS-19031 cart: Fix test_ep_cred_client race condition (#18439) Fix test race condition where status was checked outside of the completion callback. Signed-off-by: Alexander A Oganezov --- src/tests/ftest/cart/test_ep_cred_client.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/tests/ftest/cart/test_ep_cred_client.c b/src/tests/ftest/cart/test_ep_cred_client.c index 83686667d76..558e5809b60 100644 --- a/src/tests/ftest/cart/test_ep_cred_client.c +++ b/src/tests/ftest/cart/test_ep_cred_client.c @@ -37,8 +37,10 @@ static void rpc_handle_ping_front_q(const struct crt_cb_info *info) { DBG_PRINT("Response from front queued rpc\n"); - D_ASSERTF(info->cci_rc == 0, "rpc response failed. rc: %d\n", - info->cci_rc); + D_ASSERTF(info->cci_rc == 0, "rpc response failed. rc: %d\n", info->cci_rc); + + /* sent_count == resp_count means rpc didn't get queued in the front */ + D_ASSERTF(sent_count != resp_count, "Send count matches response count\n"); sem_post(&test.tg_queue_front_token); } @@ -130,8 +132,6 @@ test_run() D_ASSERTF(rc == 0, "crt_req_send() failed. rc: %d\n", rc); crtu_sem_timedwait(&test.tg_queue_front_token, 61, __LINE__); - D_ASSERTF(sent_count != resp_count, - "Send count matches response count\n"); } DBG_PRINT("Waiting for responses to %d rpcs\n", From 3ed11cd452f0e772c01bd56860f8d81f726eeafc Mon Sep 17 00:00:00 2001 From: Cedric Koch-Hofer <94527853+knard38@users.noreply.github.com> Date: Sat, 6 Jun 2026 01:12:20 +0200 Subject: [PATCH 02/11] DAOS-19019 spdk: fix two memory leaks in NVMe control bindings (#18427) opts_add_pci_addr() allocates opts.pci_allowed via realloc() but daos_spdk_init() never frees it after calling spdk_env_init(). Add free(opts.pci_allowed) at the out: label so both the success and error paths release the buffer. _collect() allocates ctrlr->pci_addr via calloc() for each controller, but free_ctrlr_fields() freed all other heap fields (model, serial, fw_rev, vendor_id, pci_type) while omitting pci_addr. Add the missing free(). Two new unit tests are added to nvme_control_ut.c to verify under ASan that opts.pci_allowed and pci_addr are both freed on return. Also fix mock_spdk_nvme_ctrlr_get_pci_device() to use a static local pool instead of calloc(), which was itself leaking under ASan. Signed-off-by: Cedric Koch-Hofer --- src/control/lib/spdk/ctests/nvme_control_ut.c | 72 +++++++++++++++---- src/control/lib/spdk/src/nvme_control.c | 2 + .../lib/spdk/src/nvme_control_common.c | 2 + 3 files changed, 63 insertions(+), 13 deletions(-) diff --git a/src/control/lib/spdk/ctests/nvme_control_ut.c b/src/control/lib/spdk/ctests/nvme_control_ut.c index 94eb5745ab7..ac5727af821 100644 --- a/src/control/lib/spdk/ctests/nvme_control_ut.c +++ b/src/control/lib/spdk/ctests/nvme_control_ut.c @@ -17,12 +17,16 @@ #include "nvme_internal.h" #include "../include/nvme_control_common.h" +#include "../include/nvme_control.h" #if D_HAS_WARNING(4, "-Wframe-larger-than=") #pragma GCC diagnostic ignored "-Wframe-larger-than=" #endif +#define MAX_MOCK_PCI_DEVS 16 + static struct ret_t *test_ret; +static int mock_pci_dev_count; /** * ============================== @@ -85,15 +89,18 @@ mock_copy_ctrlr_data(struct nvme_ctrlr_t *ctrlr, const struct spdk_nvme_ctrlr_da return 0; } +/* + * Return a unique device per call from a static local pool (indexed by + * mock_pci_dev_count, reset in teardown). + */ static struct spdk_pci_device * mock_spdk_nvme_ctrlr_get_pci_device(struct spdk_nvme_ctrlr *ctrlr) { - struct spdk_pci_device *dev; - - dev = calloc(1, sizeof(struct spdk_pci_device)); + static struct spdk_pci_device devs[MAX_MOCK_PCI_DEVS]; (void)ctrlr; - return dev; + assert_true(mock_pci_dev_count < MAX_MOCK_PCI_DEVS); + return &devs[mock_pci_dev_count++]; } static int @@ -274,6 +281,46 @@ test_get_controller(void **state) assert_return_code(rc, 0); } +/* + * This test should be run with ASan to detect the memory leak fixed by DAOS-19019. It verifies + * that the opts.pci_allowed buffer built by opts_add_pci_addr() is freed before daos_spdk_init() + * returns. + */ +static void +test_daos_spdk_init_pci_freed(void **state) +{ + char *pcil[] = {"0000:01:00.0"}; + + (void)state; + + test_ret = daos_spdk_init(0, NULL, 1, pcil); + /* spdk_env_init() is expected to fail in a unit-test context (no hugepages) */ + assert_non_null(test_ret); +} + +/* + * This test should be run with ASan to detect the memory leak fixed by DAOS-19019. It verifies + * that pci_addr is freed by teardown via clean_ret() -> free_ctrlr_fields(). + */ +static void +test_collect_pci_addr_freed(void **state) +{ + (void)state; + + /* + * Inject two mock controllers and drive _collect() via function pointers instead of real + * SPDK calls. + */ + attach_mock_controllers(); + test_ret = init_ret(); + _collect(test_ret, &mock_copy_ctrlr_data, &mock_spdk_nvme_ctrlr_get_pci_device, + &mock_spdk_pci_device_get_numa_id, &mock_spdk_pci_device_get_type); + assert_int_equal(test_ret->rc, 0); + assert_non_null(test_ret->ctrlrs); + assert_non_null(test_ret->ctrlrs->pci_addr); + assert_non_null(test_ret->ctrlrs->next->pci_addr); +} + static int setup(void **state) { @@ -291,6 +338,7 @@ teardown(void **state) free(test_ret); test_ret = NULL; cleanup(false); + mock_pci_dev_count = 0; return 0; } @@ -299,15 +347,13 @@ int main(void) { const struct CMUnitTest tests[] = { - cmocka_unit_test_setup_teardown(test_discover_null_controllers, - setup, teardown), - cmocka_unit_test_setup_teardown(test_discover_set_controllers, - setup, teardown), - cmocka_unit_test_setup_teardown(test_discover_probe_fail, setup, - teardown), - cmocka_unit_test_setup_teardown(test_collect, setup, teardown), - cmocka_unit_test_setup_teardown(test_get_controller, setup, - teardown), + cmocka_unit_test_setup_teardown(test_discover_null_controllers, setup, teardown), + cmocka_unit_test_setup_teardown(test_discover_set_controllers, setup, teardown), + cmocka_unit_test_setup_teardown(test_discover_probe_fail, setup, teardown), + cmocka_unit_test_setup_teardown(test_collect, setup, teardown), + cmocka_unit_test_setup_teardown(test_get_controller, setup, teardown), + cmocka_unit_test_setup_teardown(test_daos_spdk_init_pci_freed, setup, teardown), + cmocka_unit_test_setup_teardown(test_collect_pci_addr_freed, setup, teardown), }; return cmocka_run_group_tests_name("control_nvme_control_ut", tests, diff --git a/src/control/lib/spdk/src/nvme_control.c b/src/control/lib/spdk/src/nvme_control.c index d609e485726..bc2b0019ab5 100644 --- a/src/control/lib/spdk/src/nvme_control.c +++ b/src/control/lib/spdk/src/nvme_control.c @@ -1,5 +1,6 @@ /** * (C) Copyright 2018-2022 Intel Corporation. + * (C) Copyright 2026 Hewlett Packard Enterprise Development LP * (C) Copyright 2025 Google LLC * * SPDX-License-Identifier: BSD-2-Clause-Patent @@ -522,6 +523,7 @@ daos_spdk_init(int mem_sz, char *env_ctx, size_t nr_pcil, char **pcil) } out: + free(opts.pci_allowed); ret->rc = rc; return ret; } diff --git a/src/control/lib/spdk/src/nvme_control_common.c b/src/control/lib/spdk/src/nvme_control_common.c index 11bd00b4bc4..ac0d8eaea88 100644 --- a/src/control/lib/spdk/src/nvme_control_common.c +++ b/src/control/lib/spdk/src/nvme_control_common.c @@ -138,6 +138,8 @@ free_ctrlr_fields(struct nvme_ctrlr_t *ctrlr) free(ctrlr->vendor_id); if (ctrlr->pci_type != NULL) free(ctrlr->pci_type); + if (ctrlr->pci_addr != NULL) + free(ctrlr->pci_addr); } void From 0ef5142a6a1d12a936482e6341a376f7103c7e54 Mon Sep 17 00:00:00 2001 From: Nasf-Fan Date: Mon, 8 Jun 2026 12:59:11 +0800 Subject: [PATCH 03/11] DAOS-19059 vos: cache vos object after DTX commit (#18416) It is unnecessary to evict the vos object from cache after related DTX committed; otherwise, other concurrent modification against the same object shard maybe required to retry. Signed-off-by: Fan Yong --- src/vos/vos_dtx.c | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/vos/vos_dtx.c b/src/vos/vos_dtx.c index 55bf163f2a6..359ae7114f6 100644 --- a/src/vos/vos_dtx.c +++ b/src/vos/vos_dtx.c @@ -2688,6 +2688,8 @@ vos_dtx_post_handle(struct vos_container *cont, } for (i = 0; i < count; i++) { + struct vos_dtx_act_ent *dae = NULL; + if (daes[i] == NULL) continue; @@ -2704,9 +2706,18 @@ vos_dtx_post_handle(struct vos_container *cont, } d_iov_set(&kiov, &DAE_XID(daes[i]), sizeof(DAE_XID(daes[i]))); - rc = dbtree_delete(cont->vc_dtx_active_hdl, BTR_PROBE_EQ, - &kiov, NULL); + /* + * For abort case, set @args as NULL, then related vos object will be evicted from + * cache via dbtree_delete(). + */ + rc = dbtree_delete(cont->vc_dtx_active_hdl, BTR_PROBE_EQ, &kiov, + abort ? NULL : &dae); if (rc == 0 || rc == -DER_NONEXIST) { + if (dae != NULL) { + D_ASSERT(dae == daes[i]); + dtx_act_ent_cleanup(cont, dae, false, false); + } + dtx_evict_lid(cont, daes[i]); } else { /* The DTX entry has been committed or aborted, but we From 5500b145b4406fd27809461672bfa7f48fd3a657 Mon Sep 17 00:00:00 2001 From: Dalton Bohning Date: Mon, 8 Jun 2026 08:17:17 -0700 Subject: [PATCH 04/11] DAOS-19085 cq: retire the release/2.6.4-* branches (#18452) Retire these release branches now that 2.6.5 is released. Signed-off-by: Dalton Bohning --- .github/dependabot.yml | 28 ---------------------------- 1 file changed, 28 deletions(-) diff --git a/.github/dependabot.yml b/.github/dependabot.yml index d51495a070b..44696777ada 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -55,31 +55,3 @@ updates: - daos-stack/actions-watchers commit-message: prefix: "Doc-only: true \n" - - - package-ecosystem: github-actions - target-branch: release/2.6.4-aurora - directory: / - schedule: - interval: weekly - groups: - gha-versions: - patterns: - - "*" - reviewers: - - daos-stack/actions-watchers - commit-message: - prefix: "Doc-only: true \n" - - - package-ecosystem: github-actions - target-branch: release/2.6.4-mfg - directory: / - schedule: - interval: weekly - groups: - gha-versions: - patterns: - - "*" - reviewers: - - daos-stack/actions-watchers - commit-message: - prefix: "Doc-only: true \n" From 1e28381f68b1da9c60df3e13417146e7cecd38c0 Mon Sep 17 00:00:00 2001 From: Dalton Bohning Date: Mon, 8 Jun 2026 11:37:32 -0700 Subject: [PATCH 05/11] DAOS-18903 test: speedup ftest/nvme/io.py (#18370) Speedup ftest/nvme/io.py by: - Increasing the client processes - Reducing some aggregate file sizes Signed-off-by: Dalton Bohning --- src/tests/ftest/nvme/io.py | 12 ++++++++---- src/tests/ftest/nvme/io.yaml | 27 +++++++++++++-------------- 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/src/tests/ftest/nvme/io.py b/src/tests/ftest/nvme/io.py index 1a26b2c180e..a4238eb4c21 100644 --- a/src/tests/ftest/nvme/io.py +++ b/src/tests/ftest/nvme/io.py @@ -1,5 +1,6 @@ """ (C) Copyright 2020-2023 Intel Corporation. + (C) Copyright 2026 Hewlett Packard Enterprise Development LP SPDX-License-Identifier: BSD-2-Clause-Patent """ @@ -47,10 +48,13 @@ def test_nvme_io(self): size_before_ior = self.pool.info # Run ior with the parameters specified for this pass - self.ior_cmd.transfer_size.update(ior_param[1]) - self.ior_cmd.block_size.update(ior_param[2]) - self.ior_cmd.dfs_oclass.update(obj_type) - self.ior_cmd.set_daos_params(self.pool, self.container.identifier) + self.ior_cmd.update_params( + transfer_size=ior_param[1], + block_size=ior_param[2], + dfs_oclass=obj_type, + dfs_pool=self.pool.identifier, + dfs_cont=self.container.identifier + ) self.run_ior(self.get_ior_job_manager_command(), ior_param[3]) # Verify IOR consumed the expected amount from the pool diff --git a/src/tests/ftest/nvme/io.yaml b/src/tests/ftest/nvme/io.yaml index 972149d633b..7e02abfc8df 100644 --- a/src/tests/ftest/nvme/io.yaml +++ b/src/tests/ftest/nvme/io.yaml @@ -2,7 +2,7 @@ hosts: test_servers: 4 test_clients: 1 -timeout: 28800 +timeout: 10800 server_config: name: daos_server @@ -14,14 +14,13 @@ server_config: pool: properties: rd_fac:0,space_rb:0,reclaim:disabled + set_logmasks: False # The test creates ~100 pools so do not set log masks for each pool container: - control_method: daos type: POSIX ior: flags: -w -r -k -vv - repetitions: 1 test_file: /testFile object_type: - SX @@ -36,14 +35,14 @@ ior: dfs_destroy: false ior_sequence: # - [pool_size, transfer_size, block_size, client_slots] - - [75%, 1048576, 17179869184, 1] # [75%, 1M, 16G, 1] - - [900G, 4096, 1073741824, 1] # [900G, 4k, 1G, 1] - - [900G, 2048, 838860800, 1] # [900G, 2k, 800M, 1] - - [4%, 8, 10485760, 1] # [4%, 8B, 10M, 1] - - [4%, 24, 25165824, 1] # [4%, 24B, 24M, 1] - - [4%, 2056, 4227136, 1] # [4%, 2056B, 4M, 1] - - [95%, 134217728, 34359738368, 1] # [95%, 128M, 32G, 1] - - [40%, 32, 44040192, 1] # [40%, 32B, 42M, 1] - - [95%, 2048, 1073741824, 2] # [95%, 2k, 1G, 2] - - [95%, 8, 10485760, 2] # [95%, 8B, 10M, 2] - - [95%, 33554432, 8589934592, 5] # [95%, 32M, 8G, 5] + - [75%, 1048576, 2147483648, 8] # [75%, 1M, 2G, 8] + - [900G, 4096, 134217728, 8] # [900G, 4k, 128K, 8] + - [900G, 2048, 104857600, 8] # [900G, 2k, 100M, 8] + - [4%, 8, 1048576, 8] # [4%, 8B, 1M, 8] + - [4%, 24, 3145728, 8] # [4%, 24B, 3M, 8] + - [4%, 2056, 528392, 8] # [4%, 2056B, 512K, 8] + - [95%, 134217728, 4294967296, 8] # [95%, 128M, 4G, 8] + - [40%, 32, 4194304, 8] # [40%, 32B, 4M, 8] + - [95%, 2048, 268435456, 8] # [95%, 2k, 256K, 8] + - [95%, 8, 1048576, 8] # [95%, 8B, 1M, 8] + - [95%, 33554432, 4294967296, 8] # [95%, 32M, 4G, 8] From 04cdf46643398079b0418c117068f06c4d34663c Mon Sep 17 00:00:00 2001 From: Cedric Koch-Hofer Date: Tue, 9 Jun 2026 12:18:06 +0000 Subject: [PATCH 06/11] DAOS-19122 ddb: Use idiomatic short variable declaration for logger init Replace the two-statement var declaration + assignment with a single ':=' short variable declaration, as suggested in code review. Signed-off-by: Cedric Koch-Hofer --- src/control/cmd/ddb/main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/control/cmd/ddb/main.go b/src/control/cmd/ddb/main.go index 97e2603b76a..4983a4f32db 100644 --- a/src/control/cmd/ddb/main.go +++ b/src/control/cmd/ddb/main.go @@ -383,8 +383,8 @@ func runDdb(ctx *DdbContext, args []string) error { return nil } - var log *logging.LeveledLogger - if log, err = newLogger(opts); err != nil { + log, err := newLogger(opts) + if err != nil { return errors.Wrap(err, loggerInitErr) } log.Debug("Logging facilities initialized") From 84f72b5134478f9d5e0f41924e41ed15409844ea Mon Sep 17 00:00:00 2001 From: Cedric Koch-Hofer Date: Tue, 9 Jun 2026 07:52:46 +0000 Subject: [PATCH 07/11] DAOS-19122 ddb: Fix flag name collision between global and subcommand parsers go-flags processes the full argument list before dispatching to grumble. When a subcommand defines a flag with the same name as a global option (e.g. 'rm_pool --db_path', 'open -w', 'feature -s'), go-flags intercepts it as the global option and the subcommand never receives it. This was introduced as a regression by commit 86f31a2bd3 (DAOS-18304), which added a validation that rejected any invocation where --db_path was set (globally, by go-flags) without --vos_path -- even when the user correctly passed --db_path as a subcommand-level argument. Fix: add flags.PassAfterNonOption to the go-flags parser so it stops consuming flags after the first positional argument (the subcommand name). Everything after the subcommand lands in RunCmdArgs and is forwarded to grumble's own flag parser. Also: - Move vosPathMissErr to ddb_commands.go (sole use site after this fix) and add dtxAggrMutuallyExclusiveErr and dtxAggrRequiredOptErr constants to eliminate raw string literals in Go-layer error messages. - Fix a typo in the CLI long description (--vos-path -> --vos_path) and accurately list commands that manage their own pool lifecycle. - Update the two TestDdb_parseOpts cases whose behavior changed with PassAfterNonOption (--help after a subcommand now lands in RunCmdArgs instead of being processed by go-flags). Signed-off-by: Cedric Koch-Hofer --- src/control/cmd/ddb/ddb_commands.go | 8 +++++-- src/control/cmd/ddb/main.go | 11 +++++----- src/control/cmd/ddb/main_test.go | 33 +++++++++++++++++++++++++---- 3 files changed, 41 insertions(+), 11 deletions(-) diff --git a/src/control/cmd/ddb/ddb_commands.go b/src/control/cmd/ddb/ddb_commands.go index aa7f245be9e..a0e9a2a25d0 100644 --- a/src/control/cmd/ddb/ddb_commands.go +++ b/src/control/cmd/ddb/ddb_commands.go @@ -15,6 +15,10 @@ import ( "github.com/desertbit/grumble" ) +const vosPathMissErr = "Cannot use sys db path without a VOS path" +const dtxAggrMutuallyExclusiveErr = "'--cmt_time' and '--cmt_date' options are mutually exclusive" +const dtxAggrRequiredOptErr = "'--cmt_time' or '--cmt_date' option has to be defined" + func addAppCommands(app *grumble.App, ctx *DdbContext) { // Command: ls app.AddCommand(&grumble.Command{ @@ -445,10 +449,10 @@ the path must include the extent, otherwise, it must not.`, cmtTime := c.Flags.Uint64("cmt_time") cmtDate := c.Flags.String("cmt_date") if cmtTime != math.MaxUint64 && cmtDate != "" { - return fmt.Errorf("'--cmt_time' and '--cmt_date' options are mutually exclusive") + return fmt.Errorf(dtxAggrMutuallyExclusiveErr) } if cmtTime == math.MaxUint64 && cmtDate == "" { - return fmt.Errorf("'--cmt_time' or '--cmt_date' option has to be defined") + return fmt.Errorf(dtxAggrRequiredOptErr) } return ctx.DtxAggr(c.Args.String("path"), cmtTime, cmtDate) }, diff --git a/src/control/cmd/ddb/main.go b/src/control/cmd/ddb/main.go index 4983a4f32db..1fc2f98c3cb 100644 --- a/src/control/cmd/ddb/main.go +++ b/src/control/cmd/ddb/main.go @@ -92,9 +92,11 @@ shell mode. If neither a single command or '-f' option is provided, then the tool will run in interactive mode. In order to modify the VOS file, the '-w' option must be included. -If the command requires it, the VOS file must be provided with the parameter ---vos-path. The VOS file will be opened before any commands are executed. See -the command‑specific help for details. +If the command requires it, the VOS file must be provided with the parameter +--vos_path. The VOS file will be opened before any commands are executed, +except for commands that manage their own pool lifecycle (open, close, feature, +rm_pool, prov_mem, smd_sync, dev_list, dev_replace). See the command-specific +help for details. A DAOS file system can operate in different modes depending on the available hardware resources. The two primary modes are MD-on-SSD and PMEM. In MD-on-SSD mode (the default), metadata is stored @@ -104,7 +106,6 @@ MODE section of the manpage for details. const grumbleUnknownCmdErr = "unknown command, try 'help'" const runCmdArgsErr = "Cannot use both command file and a command string" -const vosPathMissErr = "Cannot use sys db path without a VOS path" const loggerInitErr = "Logging facilities cannot be initialized" const ctxInitErr = "DDB Context cannot be initialized" const vosPathOpenErr = "Error opening VOS path '%s'" @@ -286,7 +287,7 @@ func closePoolIfOpen(ctx *DdbContext, log *logging.LeveledLogger) { func parseOpts(args []string, ctx *DdbContext) (cliOptions, *flags.Parser, error) { var opts cliOptions - parser := flags.NewParser(&opts, flags.HelpFlag|flags.IgnoreUnknown) + parser := flags.NewParser(&opts, flags.HelpFlag|flags.IgnoreUnknown|flags.PassAfterNonOption) parser.Name = "ddb" parser.Usage = "[OPTIONS]" parser.ShortDescription = "daos debug tool" diff --git a/src/control/cmd/ddb/main_test.go b/src/control/cmd/ddb/main_test.go index 03c761b94b8..ba04196ad00 100644 --- a/src/control/cmd/ddb/main_test.go +++ b/src/control/cmd/ddb/main_test.go @@ -46,12 +46,37 @@ func TestDdb_parseOpts(t *testing.T) { expErr: errHelpRequested, }, "Unknown commands with help": { - args: []string{"foo", "--help"}, - expErr: errUnknownCmd, + // With PassAfterNonOption, --help that appears after the subcommand name is no longer + // processed by go-flags. It lands in RunCmdArgs and is forwarded to grumble, which + // handles it (and returns an unknown-command error for "foo"). The full flow is + // exercised in TestDdb_runDdb. + args: []string{"foo", "--help"}, + checkFunc: func(opts *cliOptions) error { + if opts.Args.RunCmd != "foo" { + return fmt.Errorf("expected RunCmd to be 'foo', got %q", opts.Args.RunCmd) + } + if len(opts.Args.RunCmdArgs) == 0 || opts.Args.RunCmdArgs[0] != "--help" { + return fmt.Errorf("expected RunCmdArgs[0] to be '--help', got %v", opts.Args.RunCmdArgs) + } + return nil + }, }, "Unknown commands with help and opt": { - args: []string{"-w", "foo", "--help"}, - expErr: errUnknownCmd, + // Same as above: -w is consumed globally (it appears before the subcommand), + // while --help after "foo" goes into RunCmdArgs. + args: []string{"-w", "foo", "--help"}, + checkFunc: func(opts *cliOptions) error { + if !opts.WriteMode { + return fmt.Errorf("expected WriteMode to be true") + } + if opts.Args.RunCmd != "foo" { + return fmt.Errorf("expected RunCmd to be 'foo', got %q", opts.Args.RunCmd) + } + if len(opts.Args.RunCmdArgs) == 0 || opts.Args.RunCmdArgs[0] != "--help" { + return fmt.Errorf("expected RunCmdArgs[0] to be '--help', got %v", opts.Args.RunCmdArgs) + } + return nil + }, }, "Default option values": { args: []string{"ls", "-d", "-r"}, From 8b05435c28745b28836ef77f485be808b71140f4 Mon Sep 17 00:00:00 2001 From: Cedric Koch-Hofer Date: Tue, 9 Jun 2026 07:54:28 +0000 Subject: [PATCH 08/11] DAOS-19122 ddb: Add regression tests for flag name collision fix - TestDdb_parseOpts: add two cases verifying PassAfterNonOption behavior: - flags after the subcommand name are NOT consumed by go-flags (regression case) - flags before the subcommand name are still consumed globally (validation still fires) - TestDdb_runDdb: complete noAutoOpen coverage for all 8 commands in the noAutoOpen list (close, prov_mem, dev_list, dev_replace were missing). - TestDdb_Cmds: remove the now-obsolete skipCmdLine escape hatch and add regression coverage for the fixed flag conflicts: - open: long/short forms of -w/--write_mode and -p/--db_path now work correctly in command-line mode - rm_pool: --db_path after subcommand correctly reaches grumble - prov_mem: -s/--tmpfs_size flag no longer consumed as global VosPath Also update dtxAggr error assertions to use the new named constants. Signed-off-by: Cedric Koch-Hofer --- src/control/cmd/ddb/ddb_commands_test.go | 89 ++++++++++++++++++------ src/control/cmd/ddb/main_test.go | 60 ++++++++++++++++ 2 files changed, 126 insertions(+), 23 deletions(-) diff --git a/src/control/cmd/ddb/ddb_commands_test.go b/src/control/cmd/ddb/ddb_commands_test.go index a01c5c16d05..47d0768fa92 100644 --- a/src/control/cmd/ddb/ddb_commands_test.go +++ b/src/control/cmd/ddb/ddb_commands_test.go @@ -120,12 +120,8 @@ func TestDdb_Cmds(t *testing.T) { setup func(*testing.T) expStdout []string expErr error - // skipCmdLine skips the command-line sub-test with a message. Use when - // a flag is shared between the CLI layer and the grumble command: go-flags - // consumes it before grumble can see it, making a clean command-line test - // impossible for that particular flag. - skipCmdLine string }{ + // --- ls command --- "ls invalid options": { args: []string{"ls", "--bar"}, expErr: ddbTestErr("invalid flag: --bar"), @@ -167,11 +163,6 @@ func TestDdb_Cmds(t *testing.T) { }, // --- open command --- - // Note: the -w/--write_mode and -p/--db_path flags of the grumble 'open' - // command share names with CLI-level flags that are consumed by go-flags - // before reaching grumble in command-line mode. The command-line test for - // those flags would silently test wrong values. They are correctly exercised - // in command-file mode; see TestRun for CLI-level flag coverage. "open default": { args: []string{"open", "/path/to/vos-0"}, setup: func(t *testing.T) { @@ -179,17 +170,29 @@ func TestDdb_Cmds(t *testing.T) { }, expStdout: []string{"open called"}, }, - "open write mode": { - args: []string{"open", "-w", "/path/to/vos-0"}, - skipCmdLine: "-w is consumed by the CLI write_mode flag before reaching grumble", + "open short write mode": { + args: []string{"open", "-w", "/path/to/vos-0"}, setup: func(t *testing.T) { ddb_run_open_Fn = openFnChecking(t, "/path/to/vos-0", "", true) }, expStdout: []string{"open called"}, }, - "open with db path": { - args: []string{"open", "-p", "/sysdb", "/path/to/vos-0"}, - skipCmdLine: "-p is consumed by the CLI db_path flag before reaching grumble", + "open long write mode": { + args: []string{"open", "--write_mode", "/path/to/vos-0"}, + setup: func(t *testing.T) { + ddb_run_open_Fn = openFnChecking(t, "/path/to/vos-0", "", true) + }, + expStdout: []string{"open called"}, + }, + "open with short db path": { + args: []string{"open", "-p", "/sysdb", "/path/to/vos-0"}, + setup: func(t *testing.T) { + ddb_run_open_Fn = openFnChecking(t, "/path/to/vos-0", "/sysdb", false) + }, + expStdout: []string{"open called"}, + }, + "open with long db path": { + args: []string{"open", "--db_path", "/sysdb", "/path/to/vos-0"}, setup: func(t *testing.T) { ddb_run_open_Fn = openFnChecking(t, "/path/to/vos-0", "/sysdb", false) }, @@ -245,11 +248,11 @@ func TestDdb_Cmds(t *testing.T) { // --cmt_date is provided. These tests exercise that Go-layer validation. "dtx_aggr both cmt_time and cmt_date": { args: []string{"dtx_aggr", "--cmt_time=0", "--cmt_date=2024-01-01"}, - expErr: ddbTestErr("mutually exclusive"), + expErr: ddbTestErr(dtxAggrMutuallyExclusiveErr), }, "dtx_aggr neither cmt_time nor cmt_date": { args: []string{"dtx_aggr"}, - expErr: ddbTestErr("has to be defined"), + expErr: ddbTestErr(dtxAggrRequiredOptErr), }, "dtx_aggr cmt_time": { args: []string{"dtx_aggr", "--cmt_time=1000"}, @@ -297,14 +300,57 @@ func TestDdb_Cmds(t *testing.T) { expStdout: []string{"version called"}, }, + // --- rm_pool command --- + "rm_pool with db_path": { + args: []string{"rm_pool", "--db_path", "/sysdb", "/mnt/pool/rdb-pool"}, + setup: func(t *testing.T) { + ddb_run_rm_pool_Fn = func(path, dbPath string) error { + fmt.Println("rm_pool called") + test.CmpAny(t, "path", "/mnt/pool/rdb-pool", path) + test.CmpAny(t, "dbPath", "/sysdb", dbPath) + return nil + } + }, + expStdout: []string{"rm_pool called"}, + }, + "rm_pool without db_path": { + args: []string{"rm_pool", "/mnt/pool/rdb-pool"}, + setup: func(t *testing.T) { + ddb_run_rm_pool_Fn = func(path, dbPath string) error { + fmt.Println("rm_pool called") + test.CmpAny(t, "path", "/mnt/pool/rdb-pool", path) + test.CmpAny(t, "dbPath", "", dbPath) + return nil + } + }, + expStdout: []string{"rm_pool called"}, + }, + + // --- prov_mem command: flag conflict --- + // -s / --tmpfs_size: short flag -s was consumed as global VosPath before PassAfterNonOption. + "prov_mem with tmpfs_size short flag": { + args: []string{"prov_mem", "-s", "10", "/db", "/mnt"}, + setup: func(t *testing.T) { + ddb_run_prov_mem_Fn = func(dbPath, tmpfsMount string, tmpfsMountSize uint) error { + fmt.Println("prov_mem called") + test.CmpAny(t, "dbPath", "/db", dbPath) + test.CmpAny(t, "tmpfsMount", "/mnt", tmpfsMount) + test.CmpAny(t, "tmpfsMountSize", uint(10), tmpfsMountSize) + return nil + } + }, + expStdout: []string{"prov_mem called"}, + }, + // TODO(follow-up PR): Add TestCmds cases for the remaining commands. // Each new test case follows the same pattern as the cases above: set the // corresponding ddb_run__Fn hook in setup() to verify argument passing, // then add the case to this table. // Commands still to be covered: superblock_dump, value_dump, rm, // value_load, ilog_dump, ilog_commit, ilog_clear, dtx_dump, dtx_cmt_clear, - // smd_sync, vea_dump, vea_update, dtx_act_commit, dtx_act_abort, rm_pool, - // dtx_act_discard_invalid, dev_list, dev_replace, dtx_stat, prov_mem. + // smd_sync, vea_dump, vea_update, dtx_act_commit, dtx_act_abort, + // dtx_act_discard_invalid, dev_list, dev_replace, dtx_stat, + // prov_mem (default, no flag). } { t.Run(name, func(t *testing.T) { checkCmd := func(t *testing.T, stdout string, err error) { @@ -320,9 +366,6 @@ func TestDdb_Cmds(t *testing.T) { } t.Run("command-line", func(t *testing.T) { - if tc.skipCmdLine != "" { - t.Skipf("skipping command-line mode: %s", tc.skipCmdLine) - } ctx := newTestContext(t) if tc.setup != nil { tc.setup(t) diff --git a/src/control/cmd/ddb/main_test.go b/src/control/cmd/ddb/main_test.go index ba04196ad00..fb5b0176e3c 100644 --- a/src/control/cmd/ddb/main_test.go +++ b/src/control/cmd/ddb/main_test.go @@ -219,6 +219,36 @@ func TestDdb_parseOpts(t *testing.T) { return nil }, }, + // PassAfterNonOption regression: a known global flag (--db_path) that appears AFTER + // the subcommand name must NOT be consumed by go-flags. It should land in RunCmdArgs + // so grumble can process it as a command-level flag. + "cmd-level --db_path after subcommand not consumed globally": { + args: []string{"rm_pool", "--db_path", "/sysdb", "/mnt/pool/rdb-pool"}, + checkFunc: func(opts *cliOptions) error { + if opts.SysdbPath != "" { + return fmt.Errorf("SysdbPath should be empty (PassAfterNonOption), got %q", opts.SysdbPath) + } + if opts.Args.RunCmd != "rm_pool" { + return fmt.Errorf("expected RunCmd to be 'rm_pool', got %q", opts.Args.RunCmd) + } + want := []string{"--db_path", "/sysdb", "/mnt/pool/rdb-pool"} + if len(opts.Args.RunCmdArgs) != len(want) { + return fmt.Errorf("expected RunCmdArgs %v, got %v", want, opts.Args.RunCmdArgs) + } + for i, w := range want { + if opts.Args.RunCmdArgs[i] != w { + return fmt.Errorf("RunCmdArgs[%d]: want %q, got %q", i, w, opts.Args.RunCmdArgs[i]) + } + } + return nil + }, + }, + // PassAfterNonOption does not affect flags that appear BEFORE the subcommand: those + // are still consumed globally, so the existing vosPathMissErr validation still fires. + "global --db_path before subcommand still consumed and validation fires": { + args: []string{"--db_path=/sysdb", "rm_pool", "/mnt/pool/rdb-pool"}, + expErr: ddbTestErr(vosPathMissErr), + }, } { t.Run(name, func(t *testing.T) { ctx := newTestContext(t) @@ -366,6 +396,36 @@ func TestDdb_runDdb(t *testing.T) { ddb_run_open_Fn = openFnMustNotBeCalled }, }, + "No auto-open for rm_pool": { + args: []string{"-s", "/foo/vos-0", "rm_pool", "/mnt/rdb-pool"}, + setup: func(t *testing.T) { + ddb_run_open_Fn = openFnMustNotBeCalled + }, + }, + "No auto-open for close": { + args: []string{"-s", "/foo/vos-0", "close"}, + setup: func(t *testing.T) { + ddb_run_open_Fn = openFnMustNotBeCalled + }, + }, + "No auto-open for prov_mem": { + args: []string{"-s", "/foo/vos-0", "prov_mem", "/db", "/mnt"}, + setup: func(t *testing.T) { + ddb_run_open_Fn = openFnMustNotBeCalled + }, + }, + "No auto-open for dev_list": { + args: []string{"-s", "/foo/vos-0", "dev_list", "/db"}, + setup: func(t *testing.T) { + ddb_run_open_Fn = openFnMustNotBeCalled + }, + }, + "No auto-open for dev_replace": { + args: []string{"-s", "/foo/vos-0", "dev_replace", "/db", "old-uuid", "new-uuid"}, + setup: func(t *testing.T) { + ddb_run_open_Fn = openFnMustNotBeCalled + }, + }, "Init failure": { args: []string{"ls"}, expErr: ddbTestErr(ctxInitErr), From c101fde7a1ffb22114113d90c61d6c25f72d63b7 Mon Sep 17 00:00:00 2001 From: Cedric Koch-Hofer Date: Tue, 9 Jun 2026 07:55:49 +0000 Subject: [PATCH 09/11] DAOS-19122 ddb: Improve feature command validation and error messages - Add Go-layer input validation to the feature command Run handler: - Enforce that exactly one of --enable, --disable, --show is provided - Reject --db_path when no VOS path argument is given - Add featureOnlyOneOptErr constant and onlyOne() helper - Add LongHelp to the feature command documenting the validation rules - Replace featureFnCheckingShow with the more general featureFnChecking factory and add string2FlagsCapturing to verify enable/disable routing - Add full test coverage for all feature flags (short/long enable, disable, show, db_path) and all validation error paths Features: recovery Signed-off-by: Cedric Koch-Hofer --- src/control/cmd/ddb/ddb_commands.go | 33 +++++-- src/control/cmd/ddb/ddb_commands_test.go | 108 +++++++++++++++++------ 2 files changed, 108 insertions(+), 33 deletions(-) diff --git a/src/control/cmd/ddb/ddb_commands.go b/src/control/cmd/ddb/ddb_commands.go index a0e9a2a25d0..ee6850e9ef7 100644 --- a/src/control/cmd/ddb/ddb_commands.go +++ b/src/control/cmd/ddb/ddb_commands.go @@ -18,6 +18,17 @@ import ( const vosPathMissErr = "Cannot use sys db path without a VOS path" const dtxAggrMutuallyExclusiveErr = "'--cmt_time' and '--cmt_date' options are mutually exclusive" const dtxAggrRequiredOptErr = "'--cmt_time' or '--cmt_date' option has to be defined" +const featureOnlyOneOptErr = "exactly one of --enable, --disable, --show must be provided" + +func onlyOne(bools ...bool) bool { + count := 0 + for _, b := range bools { + if b { + count++ + } + } + return count == 1 +} func addAppCommands(app *grumble.App, ctx *DdbContext) { // Command: ls @@ -309,10 +320,11 @@ the path must include the extent, otherwise, it must not.`, }) // Command: feature app.AddCommand(&grumble.Command{ - Name: "feature", - Aliases: nil, - Help: "Manage VOS pool features", - LongHelp: "", + Name: "feature", + Aliases: nil, + Help: "Manage VOS pool features", + LongHelp: `Manage VOS pool features. Exactly one of --enable, --disable, or --show must be provided. +If --db_path is provided, a VOS file path must also be given as a positional argument.`, HelpGroup: "vos", Flags: func(f *grumble.Flags) { f.String("e", "enable", "", "Enable VOS pool features") @@ -324,7 +336,18 @@ the path must include the extent, otherwise, it must not.`, a.String("path", "Optional, Path to the VOS file", grumble.Default("")) }, Run: func(c *grumble.Context) error { - return ctx.Feature(c.Args.String("path"), c.Flags.String("db_path"), c.Flags.String("enable"), c.Flags.String("disable"), c.Flags.Bool("show")) + path := c.Args.String("path") + dbPath := c.Flags.String("db_path") + enable := c.Flags.String("enable") + disable := c.Flags.String("disable") + show := c.Flags.Bool("show") + if path == "" && dbPath != "" { + return fmt.Errorf(vosPathMissErr) + } + if !onlyOne(enable != "", disable != "", show) { + return fmt.Errorf(featureOnlyOneOptErr) + } + return ctx.Feature(path, dbPath, enable, disable, show) }, Completer: featureCompleter, }) diff --git a/src/control/cmd/ddb/ddb_commands_test.go b/src/control/cmd/ddb/ddb_commands_test.go index 47d0768fa92..8e865497766 100644 --- a/src/control/cmd/ddb/ddb_commands_test.go +++ b/src/control/cmd/ddb/ddb_commands_test.go @@ -97,9 +97,26 @@ func TestDdb_Cmds(t *testing.T) { } } - featureFnCheckingShow := func(t *testing.T, wantShow bool) func(string, string, string, string, bool) error { - return func(_, _, _, _ string, show bool) error { + string2FlagsCapturing := func(captured *string) func(string) (uint64, uint64, error) { + return func(s string) (uint64, uint64, error) { + *captured = s + return 0, 0, nil + } + } + + featureFnChecking := func(t *testing.T, wantPath, wantDbPath string, + capturedEnable *string, capturedDisable *string, wantFlagValue string, + wantShow bool) func(string, string, string, string, bool) error { + return func(path, dbPath, enable, disable string, show bool) error { fmt.Println("feature called") + test.CmpAny(t, "path", wantPath, path) + test.CmpAny(t, "dbPath", wantDbPath, dbPath) + if capturedEnable != nil { + test.CmpAny(t, "enable", wantFlagValue, *capturedEnable) + } + if capturedDisable != nil { + test.CmpAny(t, "disable", wantFlagValue, *capturedDisable) + } test.CmpAny(t, "show", wantShow, show) return nil } @@ -200,45 +217,80 @@ func TestDdb_Cmds(t *testing.T) { }, // --- feature command --- - // feature --show: verifies the show flag is forwarded to the C layer. - "feature show": { + "feature without flags": { + args: []string{"feature"}, + expErr: ddbTestErr(featureOnlyOneOptErr), + }, + "feature with enable and disable flags": { + args: []string{"feature", "--enable=a", "--disable=b"}, + expErr: ddbTestErr(featureOnlyOneOptErr), + }, + "feature with enable and show flags": { + args: []string{"feature", "--enable=a", "--show"}, + expErr: ddbTestErr(featureOnlyOneOptErr), + }, + "feature with disable and show flags": { + args: []string{"feature", "--disable=a", "--show"}, + expErr: ddbTestErr(featureOnlyOneOptErr), + }, + "feature with db_path but no path": { + args: []string{"feature", "--db_path=/sysdb", "--show"}, + expErr: ddbTestErr(vosPathMissErr), + }, + "feature with long show flag": { args: []string{"feature", "--show"}, setup: func(t *testing.T) { - ddb_run_feature_Fn = featureFnCheckingShow(t, true) + ddb_run_feature_Fn = featureFnChecking(t, "", "", nil, nil, "", true) }, expStdout: []string{"feature called"}, }, - // feature --enable: verifies that the enable string reaches ddb_feature_string2flags. - "feature enable": { + "feature with short show flag": { + args: []string{"feature", "-s"}, + setup: func(t *testing.T) { + ddb_run_feature_Fn = featureFnChecking(t, "", "", nil, nil, "", true) + }, + expStdout: []string{"feature called"}, + }, + "feature with long enable flag": { args: []string{"feature", "--enable=myflag"}, setup: func(t *testing.T) { var capturedFlag string - ddb_feature_string2flags_Fn = func(s string) (uint64, uint64, error) { - capturedFlag = s - return 0, 0, nil - } - ddb_run_feature_Fn = func(path, dbPath, enable, disable string, show bool) error { - fmt.Println("feature called") - test.CmpAny(t, "enable flag string", "myflag", capturedFlag) - return nil - } + ddb_feature_string2flags_Fn = string2FlagsCapturing(&capturedFlag) + ddb_run_feature_Fn = featureFnChecking(t, "", "", &capturedFlag, nil, "myflag", false) }, expStdout: []string{"feature called"}, }, - // feature --disable: verifies that the disable string reaches ddb_feature_string2flags. - "feature disable": { - args: []string{"feature", "--disable=otherflag"}, + "feature with short enable flag": { + args: []string{"feature", "-e", "myflag"}, setup: func(t *testing.T) { var capturedFlag string - ddb_feature_string2flags_Fn = func(s string) (uint64, uint64, error) { - capturedFlag = s - return 0, 0, nil - } - ddb_run_feature_Fn = func(path, dbPath, enable, disable string, show bool) error { - fmt.Println("feature called") - test.CmpAny(t, "disable flag string", "otherflag", capturedFlag) - return nil - } + ddb_feature_string2flags_Fn = string2FlagsCapturing(&capturedFlag) + ddb_run_feature_Fn = featureFnChecking(t, "", "", &capturedFlag, nil, "myflag", false) + }, + expStdout: []string{"feature called"}, + }, + "feature with long disable flag": { + args: []string{"feature", "--disable=myflag"}, + setup: func(t *testing.T) { + var capturedFlag string + ddb_feature_string2flags_Fn = string2FlagsCapturing(&capturedFlag) + ddb_run_feature_Fn = featureFnChecking(t, "", "", nil, &capturedFlag, "myflag", false) + }, + expStdout: []string{"feature called"}, + }, + "feature with short disable flag": { + args: []string{"feature", "-d", "myflag"}, + setup: func(t *testing.T) { + var capturedFlag string + ddb_feature_string2flags_Fn = string2FlagsCapturing(&capturedFlag) + ddb_run_feature_Fn = featureFnChecking(t, "", "", nil, &capturedFlag, "myflag", false) + }, + expStdout: []string{"feature called"}, + }, + "feature with cmd-level db_path": { + args: []string{"feature", "--db_path=/sysdb", "--show", "/path/to/vos-0"}, + setup: func(t *testing.T) { + ddb_run_feature_Fn = featureFnChecking(t, "/path/to/vos-0", "/sysdb", nil, nil, "", true) }, expStdout: []string{"feature called"}, }, From 05984ffd13e8f3ee97901b62e821766be2bfb846 Mon Sep 17 00:00:00 2001 From: Kris Jacque Date: Tue, 9 Jun 2026 09:20:32 -0600 Subject: [PATCH 10/11] DAOS-19084 test: Suppress Cgo syscall false positives (#18448) The syscall function signature changed, so suppressions needed to be updated. Also added a suppression related to an internal runtime context.Context implementation. Signed-off-by: Kris Jacque --- src/cart/utils/memcheck-cart.supp | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/src/cart/utils/memcheck-cart.supp b/src/cart/utils/memcheck-cart.supp index f79813604ed..5bdf8278a33 100644 --- a/src/cart/utils/memcheck-cart.supp +++ b/src/cart/utils/memcheck-cart.supp @@ -552,10 +552,21 @@ ... } { - Go syscall. + Go syscall write Memcheck:Param write(buf) - fun:syscall.Syscall.abi0 + fun:internal/runtime/*Syscall6 +} +{ + Go syscall read + Memcheck:Param + read(buf) + fun:internal/runtime/*Syscall6 +} +{ + context Err() + Memcheck:Addr8 + fun:context.(*valueCtx).Err } { Racecall cgo malloc @@ -582,12 +593,6 @@ Memcheck:Value8 fun:aeshashbody } -{ - DAOS-15159 - Memcheck:Param - write(buf) - fun:internal/runtime/syscall.Syscall6 -} { DAOS-15548 Memcheck:Addr1 @@ -696,12 +701,6 @@ Memcheck:Addr8 fun:racecalladdr } -{ - Syscall6 param - Memcheck:Param - read(buf) - fun:internal/runtime/syscall.Syscall6 -} { __tsan_go_atomic32_load Memcheck:Addr4 From f8ecfa8aa77dddc4daa8f39ea4fc5d24484bf67f Mon Sep 17 00:00:00 2001 From: enakta <140368024+enakta@users.noreply.github.com> Date: Fri, 12 Jun 2026 02:11:22 +1000 Subject: [PATCH 11/11] DAOS-19058 pydaos: torch surface worker errors in parallel_list (#18414) Worker processes spawned by _Dfs.parallel_list may raise exceptions that never reached the calling process. This results in indefinite hang during Dataset and IterableDataset construction with no surfaced error to the user. Replacing manual Process + Queue scheme and its queued/processed counter with a multiprocessing.Pool driven by imap_unordered. Pool re-raises worker exceptions in the parent when their results are consumed, so a worker error now propagates as a raised OSError instead of a deadlock, and the Pool context manager reaps all workers on any exit path. `concurrent.futures.ProcessPoolExecutor` would be even better but its initializer/initargs arguments are unavailable before Python 3.7, and the target runtime includes EL8.8 / Python 3.6. Signed-off-by: Denis Barakhtanov --- src/client/pydaos/torch/torch_api.py | 166 +++++++++++++++------------ 1 file changed, 92 insertions(+), 74 deletions(-) diff --git a/src/client/pydaos/torch/torch_api.py b/src/client/pydaos/torch/torch_api.py index 922d4e84a40..fe5a1f43cd5 100644 --- a/src/client/pydaos/torch/torch_api.py +++ b/src/client/pydaos/torch/torch_api.py @@ -17,7 +17,7 @@ import os import stat import sys -from multiprocessing import Process, Queue +from multiprocessing import Pool, Process, Queue, current_process from pathlib import Path from torch.utils.data import Dataset as TorchDataset @@ -69,7 +69,8 @@ class Dataset(TorchDataset): Number of directory entries to read for each readdir call. dir_cache_size: int (optional) Number of directory object entries to cache in memory. - + readdir_workers: int (optional) + Number of parallel workers for namespace scanning. Methods ------- @@ -92,7 +93,8 @@ class Dataset(TorchDataset): def __init__(self, pool=None, cont=None, path=None, transform_fn=transform_fn_default, readdir_batch_size=READDIR_BATCH_SIZE, - dir_cache_size=DIR_CACHE_SIZE): + dir_cache_size=DIR_CACHE_SIZE, + readdir_workers=PARALLEL_SCAN_WORKERS): super().__init__() self._pool = pool @@ -102,7 +104,8 @@ def __init__(self, pool=None, cont=None, path=None, self._readdir_batch_size = readdir_batch_size self._closed = False - self.objects = self._dfs.parallel_list(path, readdir_batch_size=self._readdir_batch_size) + self.objects = self._dfs.parallel_list( + path, readdir_batch_size=self._readdir_batch_size, workers=readdir_workers) def __len__(self): """ Returns number of items in this dataset """ @@ -216,6 +219,8 @@ class IterableDataset(TorchIterableDataset): Number of samples to fetch per iteration. dir_cache_size: int (optional) Number of directory object entries to cache in memory. + readdir_workers: int (optional) + Number of parallel workers for namespace scanning. Methods @@ -233,7 +238,8 @@ def __init__(self, pool=None, cont=None, path=None, transform_fn=transform_fn_default, readdir_batch_size=READDIR_BATCH_SIZE, batch_size=ITER_BATCH_SIZE, - dir_cache_size=DIR_CACHE_SIZE): + dir_cache_size=DIR_CACHE_SIZE, + readdir_workers=PARALLEL_SCAN_WORKERS): super().__init__() self._pool = pool @@ -244,7 +250,8 @@ def __init__(self, pool=None, cont=None, path=None, self._batch_size = batch_size self._closed = False - self.objects = self._dfs.parallel_list(path, readdir_batch_size=self._readdir_batch_size) + self.objects = self._dfs.parallel_list( + path, readdir_batch_size=self._readdir_batch_size, workers=readdir_workers) self.workset = self.objects def __iter__(self): @@ -646,6 +653,35 @@ def writer(self, file, ensure_path=True): self._chunks_limit, self._workers) +def _readdir_worker_init(dfs, readdir_batch_size): + """ + Worker init for parallel readdir. + + Receives `self` as an argument to re-init DAOS after fork, per worker process. + + It has to be module function since the multiprocessing.Pool methods to init workers + will pickle instance method with main process's _Dfs class reference. + """ + + dfs.worker_init() + proc = current_process() + proc.dfs = dfs + proc.readdir_batch_size = readdir_batch_size + + +def _readdir_batch(work): + """ + Reads the anchored directory at `path` with `anchor_index` and returns + list of discovered directories and files. + + It has to be module function since the multiprocessing.Pool methods to submit jobs + will pickle instance method with main process's _Dfs class reference. + """ + path, anchor_index = work + proc = current_process() + return proc.dfs.readdir_anchored(path, anchor_index, proc.readdir_batch_size) + + class _Dfs(): """ Class encapsulating libdfs interface to load PyTorch Dataset @@ -676,49 +712,10 @@ def disconnect(self): raise OSError(ret, os.strerror(ret)) self._dfs = None - def list_worker_fn(self, in_work, out_dirs, out_files, readdir_batch_size=READDIR_BATCH_SIZE): - """ - Worker function to scan directory in parallel. - It expects to receive tuples (path, index) to scan the directory with an anchor index, - from the `in_work` queue. - It should emit tuples (scanned, to_scan) to the `out_dirs` queue, where `scanned` is the - number of scanned directories and `to_scan` is the list of directories to scan in parallel. - Upon completion it should emit the list of files in the `out_files` queue. - """ - - self.worker_init() - - result = [] - while True: - work = in_work.get() - if work is None: - break - - (path, index) = work - - dirs = [] - files = [] - ret = torch_shim.torch_list_with_anchor(DAOS_MAGIC, self._dfs, - path, index, files, dirs, readdir_batch_size - ) - if ret != 0: - raise OSError(ret, os.strerror(ret), path) - - dirs = [chunk for d in dirs for chunk in self.split_dir_for_parallel_scan( - os.path.join(path, d)) - ] - # Even if there are no dirs, we should emit the tuple to notify the main process - out_dirs.put((1, dirs)) - - files = [(os.path.join(path, file), size) for (file, size) in files] - result.extend(files) - - out_files.put(result) - def split_dir_for_parallel_scan(self, path): """ Splits dir for parallel readdir. - It returns list of tuples (dirname, anchor index) to be consumed by worker function + It returns list of tuples (dirname, anchor_index) to be consumed by workers """ ret, splits = torch_shim.torch_recommended_dir_split(DAOS_MAGIC, self._dfs, path) @@ -727,6 +724,28 @@ def split_dir_for_parallel_scan(self, path): return [(path, idx) for idx in range(0, splits)] + def readdir_anchored(self, path, anchor_index, readdir_batch_size): + """ + Scans one anchored by index directory at `path`. + + Returns (dirs, files): + `dirs` are (path, anchor_index) work items for directories found in this batch, + `files` is a list of resulting tuples: (full_path, size). + """ + dirs = [] + files = [] + ret = torch_shim.torch_list_with_anchor( + DAOS_MAGIC, self._dfs, path, anchor_index, files, dirs, readdir_batch_size) + if ret != 0: + raise OSError(ret, os.strerror(ret), path) + + subdirs = [split + for name in dirs + for split in self.split_dir_for_parallel_scan(os.path.join(path, name))] + + files = [(os.path.join(path, name), size) for (name, size) in files] + return subdirs, files + def parallel_list(self, path=None, readdir_batch_size=READDIR_BATCH_SIZE, workers=PARALLEL_SCAN_WORKERS): @@ -736,43 +755,42 @@ def parallel_list(self, path=None, To fully use this feature the container should be configured with directory object classes supporting this mode, e.g. OC_SX. + + Using multiprocessing.Pool ensures propagation of errors in the workers and cleaning up + resources, regardless of operation outcome. + + It would be even better to use `concurrent.futures.ProcessPoolExecutor`; however, + its `initializer` and `initargs` arguments are available only in Python 3.7+. + + Although Python 3.6 is EOL, many distributions still ship it by default. + Keeping `_readdir_worker_init` and `_readdir_batch` as module-level functions + instead of private class methods, is a small price that allows us to support + a much broader range of platforms. """ + if path is None: path = os.sep if not path.startswith(os.sep): raise ValueError("relative path is unacceptable") - procs = [] - work = Queue() - dirs = Queue() - files = Queue() - for _ in range(workers): - worker = Process(target=self.list_worker_fn, args=( - work, dirs, files, readdir_batch_size)) - worker.start() - procs.append(worker) - - queued = 0 - processed = 0 - for anchored_dir in self.split_dir_for_parallel_scan(path): - work.put(anchored_dir) - queued += 1 - - while processed < queued: - (scanned, to_scan) = dirs.get() - processed += scanned - for d in to_scan: - work.put(d) - queued += 1 + if readdir_batch_size <= 0: + raise ValueError("readdir batch size should be a positive number") - result = [] - for _ in range(workers): - work.put(None) - result.extend(files.get()) + if workers <= 0: + raise ValueError("at least one worker is required for namespace scanning") - for worker in procs: - worker.join() + result = [] + batch = self.split_dir_for_parallel_scan(path) + with Pool(workers, + initializer=_readdir_worker_init, + initargs=(self, readdir_batch_size)) as pool: + while batch: + next_batch = [] + for dirs, files in pool.imap_unordered(_readdir_batch, batch): + next_batch.extend(dirs) + result.extend(files) + batch = next_batch return result