From ce0540f50be9b59ef556080df0d53edff729f313 Mon Sep 17 00:00:00 2001 From: Konstantin Sykulev Date: Thu, 18 Jun 2026 16:45:30 -0500 Subject: [PATCH 1/3] Refactor makeAndroidAppAvailable to use staggered job queuing --- changes/47543-android-staggered-job-queuing | 1 + server/worker/software_worker.go | 188 ++++++++++---------- server/worker/software_worker_test.go | 100 ++++++----- 3 files changed, 142 insertions(+), 147 deletions(-) create mode 100644 changes/47543-android-staggered-job-queuing diff --git a/changes/47543-android-staggered-job-queuing b/changes/47543-android-staggered-job-queuing new file mode 100644 index 00000000000..7b608cfd8a5 --- /dev/null +++ b/changes/47543-android-staggered-job-queuing @@ -0,0 +1 @@ +- Refactored `makeAndroidAppAvailable` to use staggered job queuing instead of sleeping between batches inside a single worker job. \ No newline at end of file diff --git a/server/worker/software_worker.go b/server/worker/software_worker.go index 334bcea70bd..4b1b2e9582e 100644 --- a/server/worker/software_worker.go +++ b/server/worker/software_worker.go @@ -36,6 +36,7 @@ func (v *SoftwareWorker) Name() string { const ( makeAndroidAppsAvailableForHostTask SoftwareWorkerTask = "make_android_apps_available_for_host" // deprecated makeAndroidAppAvailableTask SoftwareWorkerTask = "make_android_app_available" + makeAndroidAppAvailableBatchTask SoftwareWorkerTask = "make_android_app_available_batch" makeAndroidAppUnavailableTask SoftwareWorkerTask = "make_android_app_unavailable" runAndroidSetupExperienceTask SoftwareWorkerTask = "run_android_setup_experience" bulkSetAndroidAppsAvailableForHostTask SoftwareWorkerTask = "bulk_set_android_apps_available_for_host" @@ -97,6 +98,14 @@ func (v *SoftwareWorker) Run(ctx context.Context, argsJSON json.RawMessage) erro makeAndroidAppAvailableTask, ) + case makeAndroidAppAvailableBatchTask: + return ctxerr.Wrapf( + ctx, + v.makeAndroidAppAvailableBatch(ctx, args.ApplicationID, args.AppTeamID, args.HostUUIDToPolicyID, args.EnterpriseName, args.AppConfigChanged), + "running %s task", + makeAndroidAppAvailableBatchTask, + ) + case makeAndroidAppUnavailableTask: return ctxerr.Wrapf( ctx, @@ -144,55 +153,34 @@ func (v *SoftwareWorker) makeAndroidAppAvailable(ctx context.Context, applicatio if err != nil { return ctxerr.Wrap(ctx, err, "add app store app: getting android hosts in scope") } + if len(hosts) == 0 { + return nil + } config, err := v.Datastore.GetAndroidAppConfigurationByAppTeamID(ctx, appTeamID) if err != nil && !fleet.IsNotFound(err) { return ctxerr.Wrap(ctx, err, "get android app configuration") } - var configByAppID map[string][]byte - if config != nil { - configByAppID = map[string][]byte{ - applicationID: config, - } - } needsPerHostSubstitution := config != nil && variables.ContainsBytes(config) if needsPerHostSubstitution { - return v.makeAndroidAppAvailablePerHost(ctx, applicationID, configByAppID, hosts, enterpriseName, appConfigChanged) - } - - appPolicies, err := buildApplicationPolicyWithConfig(ctx, []string{applicationID}, configByAppID, "AVAILABLE") - if err != nil { - return ctxerr.Wrap(ctx, err, "building application policies with config") - } - - // Process hosts in batches to avoid overwhelming the AMAPI. ~10K hosts max - batches := splitHostMap(hosts, v.AndroidBatchSize) - for i, batch := range batches { - if i > 0 { - timer := time.NewTimer(androidSoftwareInstallStaggerInterval) - select { - case <-ctx.Done(): - timer.Stop() - return ctxerr.Wrap(ctx, ctx.Err(), "context done between batches") - case <-timer.C: + batchIdx := 0 + for hostUUID := range hosts { + singleHost := map[string]string{hostUUID: hosts[hostUUID]} + delay := time.Duration(batchIdx) * androidSoftwareInstallStaggerInterval + if err := queueMakeAndroidAppAvailableBatch(ctx, v.Datastore, applicationID, appTeamID, singleHost, enterpriseName, appConfigChanged, delay); err != nil { + return ctxerr.Wrapf(ctx, err, "queue per-host batch for host %s", hostUUID) } + batchIdx++ } - - policyRequestsByHost, err := v.AndroidModule.AddAppsToAndroidPolicy(ctx, enterpriseName, appPolicies, batch) - if err != nil { - return ctxerr.Wrap(ctx, err, "add app store app: add app to android policy") - } - - // if this is called from an UPDATE (config changed), mark existing installs - // as "pending" (unless already "failed") and with the correct policy version to verify - if appConfigChanged { - for hostUUID, policyRequest := range policyRequestsByHost { - err := v.Datastore.SetAndroidAppInstallPendingApplyConfig(ctx, hostUUID, applicationID, policyRequest.PolicyVersion.V) - if err != nil { - return ctxerr.Wrapf(ctx, err, "set android app install pending apply config for host %s and app %s", hostUUID, applicationID) - } + } else { + // No variables: batch hosts normally. + batches := splitHostMap(hosts, v.AndroidBatchSize) + for i, batch := range batches { + delay := time.Duration(i) * androidSoftwareInstallStaggerInterval + if err := queueMakeAndroidAppAvailableBatch(ctx, v.Datastore, applicationID, appTeamID, batch, enterpriseName, appConfigChanged, delay); err != nil { + return ctxerr.Wrap(ctx, err, "queue batch for make android app available") } } } @@ -200,81 +188,72 @@ func (v *SoftwareWorker) makeAndroidAppAvailable(ctx context.Context, applicatio return nil } -// makeAndroidAppAvailablePerHost handles the case where the app config -// contains $FLEET_VAR_HOST_* tokens that must be substituted per-host. -func (v *SoftwareWorker) makeAndroidAppAvailablePerHost( - ctx context.Context, - applicationID string, - configByAppID map[string][]byte, - hosts map[string]string, - enterpriseName string, - appConfigChanged bool, -) error { - // Batch-fetch host details for substitution. - hostUUIDs := make([]string, 0, len(hosts)) - for uuid := range hosts { - hostUUIDs = append(hostUUIDs, uuid) - } - filter := fleet.TeamFilter{User: &fleet.User{GlobalRole: new("admin")}} - hostDetails, err := v.Datastore.ListHostsLiteByUUIDs(ctx, filter, hostUUIDs) - if err != nil { - return ctxerr.Wrap(ctx, err, "list hosts lite by uuids for fleet var substitution") +func (v *SoftwareWorker) makeAndroidAppAvailableBatch(ctx context.Context, applicationID string, appTeamID uint, hostUUIDToPolicyID map[string]string, enterpriseName string, appConfigChanged bool) error { + config, err := v.Datastore.GetAndroidAppConfigurationByAppTeamID(ctx, appTeamID) + if err != nil && !fleet.IsNotFound(err) { + return ctxerr.Wrap(ctx, err, "get android app configuration") } - hostByUUID := make(map[string]*fleet.Host, len(hostDetails)) - for _, h := range hostDetails { - hostByUUID[h.UUID] = h + var configByAppID map[string][]byte + if config != nil { + configByAppID = map[string][]byte{applicationID: config} } - // TODO(#47543): refactor to use staggered job queuing instead of in-job sleep. - batchSize := v.AndroidBatchSize - if batchSize <= 0 { - batchSize = len(hostByUUID) - } - hostCount := 0 - for hostUUID := range hosts { - h, ok := hostByUUID[hostUUID] - if !ok { - continue // host may have been deleted since the job was queued - } + needsPerHostSubstitution := config != nil && variables.ContainsBytes(config) - if hostCount > 0 && hostCount%batchSize == 0 { - timer := time.NewTimer(androidSoftwareInstallStaggerInterval) - select { - case <-ctx.Done(): - timer.Stop() - return ctxerr.Wrap(ctx, ctx.Err(), "context done between batches") - case <-timer.C: + if needsPerHostSubstitution { + for hostUUID := range hostUUIDToPolicyID { + androidHost, err := v.Datastore.AndroidHostLiteByHostUUID(ctx, hostUUID) + if err != nil { + if fleet.IsNotFound(err) { + continue // host deleted since the job was queued + } + return ctxerr.Wrapf(ctx, err, "get android host for variable substitution (host %s)", hostUUID) } - } - hostCount++ - subHost := profiles.AndroidAppConfigSubstitutionHost{ - UUID: h.UUID, - HardwareSerial: h.HardwareSerial, - Platform: h.Platform, - } + subHost := profiles.AndroidAppConfigSubstitutionHost{ + UUID: androidHost.Host.UUID, + HardwareSerial: androidHost.Host.HardwareSerial, + Platform: androidHost.Host.Platform, + } + substituted, err := v.substituteFleetVarsInConfigs(ctx, configByAppID, subHost) + if err != nil { + return ctxerr.Wrapf(ctx, err, "substitute fleet vars for host %s", hostUUID) + } - substituted, err := v.substituteFleetVarsInConfigs(ctx, configByAppID, subHost) - if err != nil { - return ctxerr.Wrapf(ctx, err, "substitute fleet vars for host %s", hostUUID) - } + appPolicies, err := buildApplicationPolicyWithConfig(ctx, []string{applicationID}, substituted, "AVAILABLE") + if err != nil { + return ctxerr.Wrapf(ctx, err, "building application policies with config for host %s", hostUUID) + } - appPolicies, err := buildApplicationPolicyWithConfig(ctx, []string{applicationID}, substituted, "AVAILABLE") + singleHost := map[string]string{hostUUID: hostUUIDToPolicyID[hostUUID]} + policyRequestsByHost, err := v.AndroidModule.AddAppsToAndroidPolicy(ctx, enterpriseName, appPolicies, singleHost) + if err != nil { + return ctxerr.Wrapf(ctx, err, "add app to android policy for host %s", hostUUID) + } + + if appConfigChanged { + for uuid, policyRequest := range policyRequestsByHost { + if err := v.Datastore.SetAndroidAppInstallPendingApplyConfig(ctx, uuid, applicationID, policyRequest.PolicyVersion.V); err != nil { + return ctxerr.Wrapf(ctx, err, "set android app install pending apply config for host %s and app %s", uuid, applicationID) + } + } + } + } + } else { + appPolicies, err := buildApplicationPolicyWithConfig(ctx, []string{applicationID}, configByAppID, "AVAILABLE") if err != nil { - return ctxerr.Wrapf(ctx, err, "building application policies with config for host %s", hostUUID) + return ctxerr.Wrap(ctx, err, "building application policies with config") } - singleHost := map[string]string{hostUUID: hosts[hostUUID]} - policyRequestsByHost, err := v.AndroidModule.AddAppsToAndroidPolicy(ctx, enterpriseName, appPolicies, singleHost) + policyRequestsByHost, err := v.AndroidModule.AddAppsToAndroidPolicy(ctx, enterpriseName, appPolicies, hostUUIDToPolicyID) if err != nil { - return ctxerr.Wrapf(ctx, err, "add app to android policy for host %s", hostUUID) + return ctxerr.Wrap(ctx, err, "add app store app: add app to android policy") } if appConfigChanged { - for uuid, policyRequest := range policyRequestsByHost { - err := v.Datastore.SetAndroidAppInstallPendingApplyConfig(ctx, uuid, applicationID, policyRequest.PolicyVersion.V) - if err != nil { - return ctxerr.Wrapf(ctx, err, "set android app install pending apply config for host %s and app %s", uuid, applicationID) + for hostUUID, policyRequest := range policyRequestsByHost { + if err := v.Datastore.SetAndroidAppInstallPendingApplyConfig(ctx, hostUUID, applicationID, policyRequest.PolicyVersion.V); err != nil { + return ctxerr.Wrapf(ctx, err, "set android app install pending apply config for host %s and app %s", hostUUID, applicationID) } } } @@ -283,6 +262,19 @@ func (v *SoftwareWorker) makeAndroidAppAvailablePerHost( return nil } +func queueMakeAndroidAppAvailableBatch(ctx context.Context, ds fleet.Datastore, applicationID string, appTeamID uint, hostUUIDToPolicyID map[string]string, enterpriseName string, appConfigChanged bool, delay time.Duration) error { + args := &softwareWorkerArgs{ + Task: makeAndroidAppAvailableBatchTask, + ApplicationID: applicationID, + AppTeamID: appTeamID, + HostUUIDToPolicyID: hostUUIDToPolicyID, + EnterpriseName: enterpriseName, + AppConfigChanged: appConfigChanged, + } + _, err := QueueJobWithDelay(ctx, ds, softwareWorkerJobName, args, delay) + return err +} + // this is called when an app is removed from Fleet. func (v *SoftwareWorker) makeAndroidAppUnavailable(ctx context.Context, applicationID string, hostUUIDToPolicyID map[string]string, enterpriseName string) error { // Update Android MDM policy to remove the app from the hosts diff --git a/server/worker/software_worker_test.go b/server/worker/software_worker_test.go index f667fa78a79..3fac832bc78 100644 --- a/server/worker/software_worker_test.go +++ b/server/worker/software_worker_test.go @@ -5,9 +5,7 @@ import ( "encoding/json" "fmt" "log/slog" - "sync/atomic" "testing" - "testing/synctest" "time" "github.com/fleetdm/fleet/v4/server/datastore/mysql/mysqltest" @@ -226,61 +224,65 @@ func TestSplitHostMap(t *testing.T) { } func TestMakeAndroidAppAvailableBatching(t *testing.T) { - synctest.Test(t, func(t *testing.T) { - var callCount atomic.Int32 - var totalHosts atomic.Int32 - - androidModule := &mockAndroidModule{ - addAppsToAndroidPolicyFunc: func(ctx context.Context, enterpriseName string, appPolicies []*androidmanagement.ApplicationPolicy, hostUUIDs map[string]string) (map[string]*android.MDMAndroidPolicyRequest, error) { - callCount.Add(1) - totalHosts.Add(int32(len(hostUUIDs))) //nolint:gosec // test with small host counts - return make(map[string]*android.MDMAndroidPolicyRequest), nil - }, - } + ds := new(mock.Store) - ds := new(mock.Store) - // 5 hosts in scope - ds.GetIncludedHostUUIDMapForAppStoreAppFunc = func(ctx context.Context, appTeamID uint) (map[string]string, error) { - hosts := make(map[string]string, 5) - for i := range 5 { - hosts[fmt.Sprintf("host-%d", i)] = fmt.Sprintf("host-%d", i) - } - return hosts, nil - } - ds.GetAndroidAppConfigurationByAppTeamIDFunc = func(ctx context.Context, appTeamID uint) ([]byte, error) { - return nil, nil + // 5 hosts in scope + ds.GetIncludedHostUUIDMapForAppStoreAppFunc = func(ctx context.Context, appTeamID uint) (map[string]string, error) { + hosts := make(map[string]string, 5) + for i := range 5 { + hosts[fmt.Sprintf("host-%d", i)] = fmt.Sprintf("host-%d", i) } + return hosts, nil + } + ds.GetAndroidAppConfigurationByAppTeamIDFunc = func(ctx context.Context, appTeamID uint) ([]byte, error) { + return nil, nil // no config, no variables + } - w := &SoftwareWorker{ - Datastore: ds, - AndroidModule: androidModule, - Log: slog.New(slog.DiscardHandler), - AndroidBatchSize: 2, // batch size of 2 → 3 batches (2+2+1) - } + var jobs []*fleet.Job + ds.NewJobFunc = func(ctx context.Context, job *fleet.Job) (*fleet.Job, error) { + job.ID = uint(len(jobs) + 1) + jobs = append(jobs, job) + return job, nil + } - ctx := t.Context() - errCh := make(chan error, 1) - go func() { - errCh <- w.makeAndroidAppAvailable(ctx, "com.example.app", 1, "enterprises/test", false) - }() + w := &SoftwareWorker{ + Datastore: ds, + AndroidModule: &mockAndroidModule{}, + Log: slog.New(slog.DiscardHandler), + AndroidBatchSize: 2, // batch size of 2 → 3 batches (2+2+1) + } - // First batch runs immediately. - synctest.Wait() - require.Equal(t, int32(1), callCount.Load(), "first batch should run immediately") + err := w.makeAndroidAppAvailable(t.Context(), "com.example.app", 1, "enterprises/test", false) + require.NoError(t, err) - // Advance past first stagger interval → second batch. - time.Sleep(androidSoftwareInstallStaggerInterval) - synctest.Wait() - require.Equal(t, int32(2), callCount.Load(), "second batch after first sleep") + // Phase 1 should queue 3 batch jobs (2+2+1 hosts), no AMAPI calls. + require.Len(t, jobs, 3, "expected 3 batch jobs queued") - // Advance past second stagger interval → third batch. - time.Sleep(androidSoftwareInstallStaggerInterval) - synctest.Wait() - require.Equal(t, int32(3), callCount.Load(), "third batch after second sleep") + // Verify staggered delays: 0s, 60s, 120s + for i, job := range jobs { + var args softwareWorkerArgs + require.NoError(t, json.Unmarshal(*job.Args, &args)) + require.Equal(t, makeAndroidAppAvailableBatchTask, args.Task) + require.Equal(t, "com.example.app", args.ApplicationID) + require.Equal(t, "enterprises/test", args.EnterpriseName) + + if i == 0 { + require.True(t, job.NotBefore.IsZero(), "first batch should have no delay") + } else { + expectedDelay := time.Duration(i) * androidSoftwareInstallStaggerInterval + require.WithinDuration(t, time.Now().Add(expectedDelay), job.NotBefore, 5*time.Second, + "batch %d should be delayed by %s", i, expectedDelay) + } + } - require.NoError(t, <-errCh) - require.Equal(t, int32(5), totalHosts.Load(), "all 5 hosts processed") - }) + // Count total hosts across all batches + totalHosts := 0 + for _, job := range jobs { + var args softwareWorkerArgs + require.NoError(t, json.Unmarshal(*job.Args, &args)) + totalHosts += len(args.HostUUIDToPolicyID) + } + require.Equal(t, 5, totalHosts, "all 5 hosts should be distributed across batches") } func TestQueueBulkSetAndroidAppsAvailableForHostsChunking(t *testing.T) { From 54a84901a57c6018649d2162e6e09c9e7dff0757 Mon Sep 17 00:00:00 2001 From: Konstantin Sykulev Date: Fri, 19 Jun 2026 10:40:21 -0500 Subject: [PATCH 2/3] ai feedback --- .../integration_mdm_setup_experience_test.go | 56 +++++++----- server/worker/software_worker.go | 61 ++++++------- server/worker/software_worker_test.go | 89 +++++++++++++++++++ 3 files changed, 148 insertions(+), 58 deletions(-) diff --git a/server/service/integration_mdm_setup_experience_test.go b/server/service/integration_mdm_setup_experience_test.go index 1bca0142abc..c7c4eb176b9 100644 --- a/server/service/integration_mdm_setup_experience_test.go +++ b/server/service/integration_mdm_setup_experience_test.go @@ -4526,31 +4526,39 @@ func (s *integrationMDMTestSuite) TestAndroidAppConfiguration() { s.runWorkerUntilDoneWithChecks(true) - // worker should have: - // 1. made each app available to the included hosts (for self-service), so 2 entries for that (from the PATCH apps to set the config) - // (this is because I made the worker run after host enrollment, if there were no host, the task would have nothing to do) - // 2. added the Fleet agent to the host's policy (from the host enrollment, via ensureHostSpecificPolicyIsApplied) - // 3. made all apps available to the enrolled host (for self-service), from the host enrollment - // 4. installed the apps, from the host enrollment + // worker should have (in any order due to staggered job queuing): + // - made each app available to the included hosts (for self-service), so 2 entries for that (from the PATCH apps to set the config) + // - added the Fleet agent to the host's policy (from the host enrollment, via ensureHostSpecificPolicyIsApplied) + // - made all apps available to the enrolled host (for self-service), from the host enrollment + // - installed the apps, from the host enrollment require.Len(t, patchAppsPolicies, 5) - require.ElementsMatch(t, []*androidmanagement.ApplicationPolicy{ - {PackageName: app1.VPPAppID.AdamID, InstallType: "AVAILABLE", ManagedConfiguration: googleapi.RawMessage(`1`)}, - }, patchAppsPolicies[0]) - require.ElementsMatch(t, []*androidmanagement.ApplicationPolicy{ - {PackageName: app2.VPPAppID.AdamID, InstallType: "AVAILABLE", ManagedConfiguration: googleapi.RawMessage(`2`)}, - }, patchAppsPolicies[1]) - // Fleet agent is added during enrollment before self-service apps - require.Len(t, patchAppsPolicies[2], 1) - require.Equal(t, "com.fleetdm.agent", patchAppsPolicies[2][0].PackageName) - require.Equal(t, "FORCE_INSTALLED", patchAppsPolicies[2][0].InstallType) - require.ElementsMatch(t, []*androidmanagement.ApplicationPolicy{ - {PackageName: app1.VPPAppID.AdamID, InstallType: "AVAILABLE", ManagedConfiguration: googleapi.RawMessage(`1`)}, - {PackageName: app2.VPPAppID.AdamID, InstallType: "AVAILABLE", ManagedConfiguration: googleapi.RawMessage(`2`)}, - }, patchAppsPolicies[3]) - require.ElementsMatch(t, []*androidmanagement.ApplicationPolicy{ - {PackageName: app1.VPPAppID.AdamID, InstallType: "PREINSTALLED", ManagedConfiguration: googleapi.RawMessage(`1`)}, - {PackageName: app2.VPPAppID.AdamID, InstallType: "PREINSTALLED", ManagedConfiguration: googleapi.RawMessage(`2`)}, - }, patchAppsPolicies[4]) + + // Flatten all AMAPI calls into a set of (packageName, installType) pairs + // to verify the right calls were made regardless of order. + type appCall struct { + PackageName string + InstallType string + } + var allCalls []appCall + for _, policies := range patchAppsPolicies { + for _, p := range policies { + allCalls = append(allCalls, appCall{p.PackageName, p.InstallType}) + } + } + require.ElementsMatch(t, []appCall{ + // app1 made available individually (from PATCH config change) + {app1.VPPAppID.AdamID, "AVAILABLE"}, + // app2 made available individually (from PATCH config change) + {app2.VPPAppID.AdamID, "AVAILABLE"}, + // Fleet agent added during enrollment + {"com.fleetdm.agent", "FORCE_INSTALLED"}, + // app1+app2 made available during enrollment (self-service) + {app1.VPPAppID.AdamID, "AVAILABLE"}, + {app2.VPPAppID.AdamID, "AVAILABLE"}, + // app1+app2 installed during enrollment (setup experience) + {app1.VPPAppID.AdamID, "PREINSTALLED"}, + {app2.VPPAppID.AdamID, "PREINSTALLED"}, + }, allCalls) patchAppsPolicies = nil diff --git a/server/worker/software_worker.go b/server/worker/software_worker.go index 4b1b2e9582e..8a300b9412f 100644 --- a/server/worker/software_worker.go +++ b/server/worker/software_worker.go @@ -157,31 +157,13 @@ func (v *SoftwareWorker) makeAndroidAppAvailable(ctx context.Context, applicatio return nil } - config, err := v.Datastore.GetAndroidAppConfigurationByAppTeamID(ctx, appTeamID) - if err != nil && !fleet.IsNotFound(err) { - return ctxerr.Wrap(ctx, err, "get android app configuration") - } - - needsPerHostSubstitution := config != nil && variables.ContainsBytes(config) - - if needsPerHostSubstitution { - batchIdx := 0 - for hostUUID := range hosts { - singleHost := map[string]string{hostUUID: hosts[hostUUID]} - delay := time.Duration(batchIdx) * androidSoftwareInstallStaggerInterval - if err := queueMakeAndroidAppAvailableBatch(ctx, v.Datastore, applicationID, appTeamID, singleHost, enterpriseName, appConfigChanged, delay); err != nil { - return ctxerr.Wrapf(ctx, err, "queue per-host batch for host %s", hostUUID) - } - batchIdx++ - } - } else { - // No variables: batch hosts normally. - batches := splitHostMap(hosts, v.AndroidBatchSize) - for i, batch := range batches { - delay := time.Duration(i) * androidSoftwareInstallStaggerInterval - if err := queueMakeAndroidAppAvailableBatch(ctx, v.Datastore, applicationID, appTeamID, batch, enterpriseName, appConfigChanged, delay); err != nil { - return ctxerr.Wrap(ctx, err, "queue batch for make android app available") - } + // Queue staggered batch jobs. The phase-2 handler handles per-host + // variable substitution within each batch, so we always chunk the same way. + batches := splitHostMap(hosts, v.AndroidBatchSize) + for i, batch := range batches { + delay := time.Duration(i) * androidSoftwareInstallStaggerInterval + if err := queueMakeAndroidAppAvailableBatch(ctx, v.Datastore, applicationID, appTeamID, batch, enterpriseName, appConfigChanged, delay); err != nil { + return ctxerr.Wrap(ctx, err, "queue batch for make android app available") } } @@ -201,19 +183,30 @@ func (v *SoftwareWorker) makeAndroidAppAvailableBatch(ctx context.Context, appli needsPerHostSubstitution := config != nil && variables.ContainsBytes(config) if needsPerHostSubstitution { + hostUUIDs := make([]string, 0, len(hostUUIDToPolicyID)) + for uuid := range hostUUIDToPolicyID { + hostUUIDs = append(hostUUIDs, uuid) + } + filter := fleet.TeamFilter{User: &fleet.User{GlobalRole: new("admin")}} + hostDetails, err := v.Datastore.ListHostsLiteByUUIDs(ctx, filter, hostUUIDs) + if err != nil { + return ctxerr.Wrap(ctx, err, "batch fetch host details for variable substitution") + } + hostByUUID := make(map[string]*fleet.Host, len(hostDetails)) + for _, h := range hostDetails { + hostByUUID[h.UUID] = h + } + for hostUUID := range hostUUIDToPolicyID { - androidHost, err := v.Datastore.AndroidHostLiteByHostUUID(ctx, hostUUID) - if err != nil { - if fleet.IsNotFound(err) { - continue // host deleted since the job was queued - } - return ctxerr.Wrapf(ctx, err, "get android host for variable substitution (host %s)", hostUUID) + h, ok := hostByUUID[hostUUID] + if !ok { + continue // host deleted since the job was queued } subHost := profiles.AndroidAppConfigSubstitutionHost{ - UUID: androidHost.Host.UUID, - HardwareSerial: androidHost.Host.HardwareSerial, - Platform: androidHost.Host.Platform, + UUID: h.UUID, + HardwareSerial: h.HardwareSerial, + Platform: h.Platform, } substituted, err := v.substituteFleetVarsInConfigs(ctx, configByAppID, subHost) if err != nil { diff --git a/server/worker/software_worker_test.go b/server/worker/software_worker_test.go index 3fac832bc78..d20aec5def6 100644 --- a/server/worker/software_worker_test.go +++ b/server/worker/software_worker_test.go @@ -2,6 +2,7 @@ package worker import ( "context" + "database/sql" "encoding/json" "fmt" "log/slog" @@ -285,6 +286,94 @@ func TestMakeAndroidAppAvailableBatching(t *testing.T) { require.Equal(t, 5, totalHosts, "all 5 hosts should be distributed across batches") } +func TestMakeAndroidAppAvailableBatchNoVars(t *testing.T) { + var addAppsCalled bool + var capturedHosts map[string]string + + androidModule := &mockAndroidModule{ + addAppsToAndroidPolicyFunc: func(ctx context.Context, enterpriseName string, appPolicies []*androidmanagement.ApplicationPolicy, hostUUIDs map[string]string) (map[string]*android.MDMAndroidPolicyRequest, error) { + addAppsCalled = true + capturedHosts = hostUUIDs + result := make(map[string]*android.MDMAndroidPolicyRequest) + for uuid := range hostUUIDs { + result[uuid] = &android.MDMAndroidPolicyRequest{PolicyVersion: sql.Null[int64]{V: 42, Valid: true}} + } + return result, nil + }, + } + + ds := new(mock.Store) + ds.GetAndroidAppConfigurationByAppTeamIDFunc = func(ctx context.Context, appTeamID uint) ([]byte, error) { + return nil, nil // no config + } + + var pendingConfigs []string + ds.SetAndroidAppInstallPendingApplyConfigFunc = func(ctx context.Context, hostUUID, applicationID string, policyVersion int64) error { + pendingConfigs = append(pendingConfigs, hostUUID) + return nil + } + + w := &SoftwareWorker{Datastore: ds, AndroidModule: androidModule, Log: slog.New(slog.DiscardHandler)} + + hosts := map[string]string{"host-1": "host-1", "host-2": "host-2"} + err := w.makeAndroidAppAvailableBatch(t.Context(), "com.example.app", 1, hosts, "enterprises/test", true) + require.NoError(t, err) + + require.True(t, addAppsCalled, "should call AddAppsToAndroidPolicy") + require.Equal(t, hosts, capturedHosts, "all hosts should be sent in one call") + require.Len(t, pendingConfigs, 2, "appConfigChanged=true should update both hosts") +} + +func TestMakeAndroidAppAvailableBatchWithVars(t *testing.T) { + var addAppsCalls int + var capturedConfigs []string + + androidModule := &mockAndroidModule{ + addAppsToAndroidPolicyFunc: func(ctx context.Context, enterpriseName string, appPolicies []*androidmanagement.ApplicationPolicy, hostUUIDs map[string]string) (map[string]*android.MDMAndroidPolicyRequest, error) { + addAppsCalls++ + if len(appPolicies) > 0 { + capturedConfigs = append(capturedConfigs, string(appPolicies[0].ManagedConfiguration)) + } + result := make(map[string]*android.MDMAndroidPolicyRequest) + for uuid := range hostUUIDs { + result[uuid] = &android.MDMAndroidPolicyRequest{PolicyVersion: sql.Null[int64]{V: 1, Valid: true}} + } + return result, nil + }, + } + + ds := new(mock.Store) + // Config with a fleet variable + ds.GetAndroidAppConfigurationByAppTeamIDFunc = func(ctx context.Context, appTeamID uint) ([]byte, error) { + return []byte(`{"managedConfiguration": {"deviceId": "$FLEET_VAR_HOST_UUID"}}`), nil + } + // Batch fetch returns two hosts with different UUIDs + ds.ListHostsLiteByUUIDsFunc = func(ctx context.Context, filter fleet.TeamFilter, uuids []string) ([]*fleet.Host, error) { + var hosts []*fleet.Host + for _, uuid := range uuids { + hosts = append(hosts, &fleet.Host{UUID: uuid, Platform: "android", HardwareSerial: "SN-" + uuid}) + } + return hosts, nil + } + ds.SetAndroidAppInstallPendingApplyConfigFunc = func(ctx context.Context, hostUUID, applicationID string, policyVersion int64) error { + return nil + } + + w := &SoftwareWorker{Datastore: ds, AndroidModule: androidModule, Log: slog.New(slog.DiscardHandler)} + + hosts := map[string]string{"uuid-aaa": "uuid-aaa", "uuid-bbb": "uuid-bbb"} + err := w.makeAndroidAppAvailableBatch(t.Context(), "com.example.app", 1, hosts, "enterprises/test", false) + require.NoError(t, err) + + // Per-host substitution: one AMAPI call per host + require.Equal(t, 2, addAppsCalls, "should call AddAppsToAndroidPolicy once per host") + + // Each call should have the host's UUID substituted in, not the literal variable + for _, cfg := range capturedConfigs { + require.NotContains(t, cfg, "$FLEET_VAR_HOST_UUID", "variable should be substituted") + } +} + func TestQueueBulkSetAndroidAppsAvailableForHostsChunking(t *testing.T) { ds := new(mock.Store) From eaf0f008670a8d5251e58d582b15585c8fc03124 Mon Sep 17 00:00:00 2001 From: Konstantin Sykulev Date: Fri, 19 Jun 2026 13:31:52 -0500 Subject: [PATCH 3/3] ai feedback --- .../integration_mdm_setup_experience_test.go | 36 +++++++++++-------- server/worker/software_worker.go | 11 ++++-- server/worker/software_worker_test.go | 26 +++++++------- 3 files changed, 42 insertions(+), 31 deletions(-) diff --git a/server/service/integration_mdm_setup_experience_test.go b/server/service/integration_mdm_setup_experience_test.go index c7c4eb176b9..ae8bc0588c0 100644 --- a/server/service/integration_mdm_setup_experience_test.go +++ b/server/service/integration_mdm_setup_experience_test.go @@ -4533,32 +4533,38 @@ func (s *integrationMDMTestSuite) TestAndroidAppConfiguration() { // - installed the apps, from the host enrollment require.Len(t, patchAppsPolicies, 5) - // Flatten all AMAPI calls into a set of (packageName, installType) pairs - // to verify the right calls were made regardless of order. type appCall struct { - PackageName string - InstallType string + PackageName string + InstallType string + ManagedConfiguration string } - var allCalls []appCall + var appCalls []appCall + var fleetAgentCount int for _, policies := range patchAppsPolicies { for _, p := range policies { - allCalls = append(allCalls, appCall{p.PackageName, p.InstallType}) + if p.PackageName == "com.fleetdm.agent" { + fleetAgentCount++ + require.Equal(t, "FORCE_INSTALLED", p.InstallType) + require.Contains(t, string(p.ManagedConfiguration), "server_url") + require.Contains(t, string(p.ManagedConfiguration), "host_uuid") + continue + } + appCalls = append(appCalls, appCall{p.PackageName, p.InstallType, string(p.ManagedConfiguration)}) } } + require.Equal(t, 1, fleetAgentCount, "fleet agent should be added exactly once") require.ElementsMatch(t, []appCall{ // app1 made available individually (from PATCH config change) - {app1.VPPAppID.AdamID, "AVAILABLE"}, + {app1.VPPAppID.AdamID, "AVAILABLE", "1"}, // app2 made available individually (from PATCH config change) - {app2.VPPAppID.AdamID, "AVAILABLE"}, - // Fleet agent added during enrollment - {"com.fleetdm.agent", "FORCE_INSTALLED"}, + {app2.VPPAppID.AdamID, "AVAILABLE", "2"}, // app1+app2 made available during enrollment (self-service) - {app1.VPPAppID.AdamID, "AVAILABLE"}, - {app2.VPPAppID.AdamID, "AVAILABLE"}, + {app1.VPPAppID.AdamID, "AVAILABLE", "1"}, + {app2.VPPAppID.AdamID, "AVAILABLE", "2"}, // app1+app2 installed during enrollment (setup experience) - {app1.VPPAppID.AdamID, "PREINSTALLED"}, - {app2.VPPAppID.AdamID, "PREINSTALLED"}, - }, allCalls) + {app1.VPPAppID.AdamID, "PREINSTALLED", "1"}, + {app2.VPPAppID.AdamID, "PREINSTALLED", "2"}, + }, appCalls) patchAppsPolicies = nil diff --git a/server/worker/software_worker.go b/server/worker/software_worker.go index 8a300b9412f..2cdb93f5372 100644 --- a/server/worker/software_worker.go +++ b/server/worker/software_worker.go @@ -159,7 +159,11 @@ func (v *SoftwareWorker) makeAndroidAppAvailable(ctx context.Context, applicatio // Queue staggered batch jobs. The phase-2 handler handles per-host // variable substitution within each batch, so we always chunk the same way. - batches := splitHostMap(hosts, v.AndroidBatchSize) + batchSize := v.AndroidBatchSize + if batchSize <= 0 { + batchSize = defaultAndroidBatchSize + } + batches := splitHostMap(hosts, batchSize) for i, batch := range batches { delay := time.Duration(i) * androidSoftwareInstallStaggerInterval if err := queueMakeAndroidAppAvailableBatch(ctx, v.Datastore, applicationID, appTeamID, batch, enterpriseName, appConfigChanged, delay); err != nil { @@ -744,7 +748,10 @@ func (v *SoftwareWorker) bulkSetAndroidAppsAvailableForHosts(ctx context.Context return nil } -const androidSoftwareInstallStaggerInterval = 60 * time.Second +const ( + androidSoftwareInstallStaggerInterval = 60 * time.Second + defaultAndroidBatchSize = 1000 +) func QueueBulkSetAndroidAppsAvailableForHosts( ctx context.Context, diff --git a/server/worker/software_worker_test.go b/server/worker/software_worker_test.go index d20aec5def6..05f4e394375 100644 --- a/server/worker/software_worker_test.go +++ b/server/worker/software_worker_test.go @@ -325,14 +325,16 @@ func TestMakeAndroidAppAvailableBatchNoVars(t *testing.T) { } func TestMakeAndroidAppAvailableBatchWithVars(t *testing.T) { - var addAppsCalls int - var capturedConfigs []string + // Capture per-host AMAPI calls: host UUID → rendered managed config + capturedConfigByHost := make(map[string]string) androidModule := &mockAndroidModule{ addAppsToAndroidPolicyFunc: func(ctx context.Context, enterpriseName string, appPolicies []*androidmanagement.ApplicationPolicy, hostUUIDs map[string]string) (map[string]*android.MDMAndroidPolicyRequest, error) { - addAppsCalls++ - if len(appPolicies) > 0 { - capturedConfigs = append(capturedConfigs, string(appPolicies[0].ManagedConfiguration)) + // Each call should target exactly one host (per-host substitution) + require.Len(t, hostUUIDs, 1) + require.Len(t, appPolicies, 1) + for uuid := range hostUUIDs { + capturedConfigByHost[uuid] = string(appPolicies[0].ManagedConfiguration) } result := make(map[string]*android.MDMAndroidPolicyRequest) for uuid := range hostUUIDs { @@ -343,11 +345,9 @@ func TestMakeAndroidAppAvailableBatchWithVars(t *testing.T) { } ds := new(mock.Store) - // Config with a fleet variable ds.GetAndroidAppConfigurationByAppTeamIDFunc = func(ctx context.Context, appTeamID uint) ([]byte, error) { return []byte(`{"managedConfiguration": {"deviceId": "$FLEET_VAR_HOST_UUID"}}`), nil } - // Batch fetch returns two hosts with different UUIDs ds.ListHostsLiteByUUIDsFunc = func(ctx context.Context, filter fleet.TeamFilter, uuids []string) ([]*fleet.Host, error) { var hosts []*fleet.Host for _, uuid := range uuids { @@ -365,13 +365,11 @@ func TestMakeAndroidAppAvailableBatchWithVars(t *testing.T) { err := w.makeAndroidAppAvailableBatch(t.Context(), "com.example.app", 1, hosts, "enterprises/test", false) require.NoError(t, err) - // Per-host substitution: one AMAPI call per host - require.Equal(t, 2, addAppsCalls, "should call AddAppsToAndroidPolicy once per host") - - // Each call should have the host's UUID substituted in, not the literal variable - for _, cfg := range capturedConfigs { - require.NotContains(t, cfg, "$FLEET_VAR_HOST_UUID", "variable should be substituted") - } + require.Len(t, capturedConfigByHost, 2, "should have called AMAPI for both hosts") + require.Contains(t, capturedConfigByHost["uuid-aaa"], "uuid-aaa", "host uuid-aaa should appear in its config") + require.NotContains(t, capturedConfigByHost["uuid-aaa"], "$FLEET_VAR_HOST_UUID") + require.Contains(t, capturedConfigByHost["uuid-bbb"], "uuid-bbb", "host uuid-bbb should appear in its config") + require.NotContains(t, capturedConfigByHost["uuid-bbb"], "$FLEET_VAR_HOST_UUID") } func TestQueueBulkSetAndroidAppsAvailableForHostsChunking(t *testing.T) {