Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/47543-android-staggered-job-queuing
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- Refactored `makeAndroidAppAvailable` to use staggered job queuing instead of sleeping between batches inside a single worker job.
62 changes: 38 additions & 24 deletions server/service/integration_mdm_setup_experience_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4526,31 +4526,45 @@ func (s *integrationMDMTestSuite) TestAndroidAppConfiguration() {

s.runWorkerUntilDoneWithChecks(true)

// worker should have:
// 1. made each app available to the included hosts (for self-service), so 2 entries for that (from the PATCH apps to set the config)
// (this is because I made the worker run after host enrollment, if there were no host, the task would have nothing to do)
// 2. added the Fleet agent to the host's policy (from the host enrollment, via ensureHostSpecificPolicyIsApplied)
// 3. made all apps available to the enrolled host (for self-service), from the host enrollment
// 4. installed the apps, from the host enrollment
// worker should have (in any order due to staggered job queuing):
// - made each app available to the included hosts (for self-service), so 2 entries for that (from the PATCH apps to set the config)
// - added the Fleet agent to the host's policy (from the host enrollment, via ensureHostSpecificPolicyIsApplied)
// - made all apps available to the enrolled host (for self-service), from the host enrollment
// - installed the apps, from the host enrollment
require.Len(t, patchAppsPolicies, 5)
require.ElementsMatch(t, []*androidmanagement.ApplicationPolicy{
{PackageName: app1.VPPAppID.AdamID, InstallType: "AVAILABLE", ManagedConfiguration: googleapi.RawMessage(`1`)},
}, patchAppsPolicies[0])
require.ElementsMatch(t, []*androidmanagement.ApplicationPolicy{
{PackageName: app2.VPPAppID.AdamID, InstallType: "AVAILABLE", ManagedConfiguration: googleapi.RawMessage(`2`)},
}, patchAppsPolicies[1])
// Fleet agent is added during enrollment before self-service apps
require.Len(t, patchAppsPolicies[2], 1)
require.Equal(t, "com.fleetdm.agent", patchAppsPolicies[2][0].PackageName)
require.Equal(t, "FORCE_INSTALLED", patchAppsPolicies[2][0].InstallType)
require.ElementsMatch(t, []*androidmanagement.ApplicationPolicy{
{PackageName: app1.VPPAppID.AdamID, InstallType: "AVAILABLE", ManagedConfiguration: googleapi.RawMessage(`1`)},
{PackageName: app2.VPPAppID.AdamID, InstallType: "AVAILABLE", ManagedConfiguration: googleapi.RawMessage(`2`)},
}, patchAppsPolicies[3])
require.ElementsMatch(t, []*androidmanagement.ApplicationPolicy{
{PackageName: app1.VPPAppID.AdamID, InstallType: "PREINSTALLED", ManagedConfiguration: googleapi.RawMessage(`1`)},
{PackageName: app2.VPPAppID.AdamID, InstallType: "PREINSTALLED", ManagedConfiguration: googleapi.RawMessage(`2`)},
}, patchAppsPolicies[4])

type appCall struct {
PackageName string
InstallType string
ManagedConfiguration string
}
var appCalls []appCall
var fleetAgentCount int
for _, policies := range patchAppsPolicies {
for _, p := range policies {
if p.PackageName == "com.fleetdm.agent" {
fleetAgentCount++
require.Equal(t, "FORCE_INSTALLED", p.InstallType)
require.Contains(t, string(p.ManagedConfiguration), "server_url")
require.Contains(t, string(p.ManagedConfiguration), "host_uuid")
continue
}
appCalls = append(appCalls, appCall{p.PackageName, p.InstallType, string(p.ManagedConfiguration)})
}
}
require.Equal(t, 1, fleetAgentCount, "fleet agent should be added exactly once")
require.ElementsMatch(t, []appCall{
// app1 made available individually (from PATCH config change)
{app1.VPPAppID.AdamID, "AVAILABLE", "1"},
// app2 made available individually (from PATCH config change)
{app2.VPPAppID.AdamID, "AVAILABLE", "2"},
// app1+app2 made available during enrollment (self-service)
{app1.VPPAppID.AdamID, "AVAILABLE", "1"},
{app2.VPPAppID.AdamID, "AVAILABLE", "2"},
// app1+app2 installed during enrollment (setup experience)
{app1.VPPAppID.AdamID, "PREINSTALLED", "1"},
{app2.VPPAppID.AdamID, "PREINSTALLED", "2"},
}, appCalls)

patchAppsPolicies = nil

Expand Down
196 changes: 94 additions & 102 deletions server/worker/software_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func (v *SoftwareWorker) Name() string {
const (
makeAndroidAppsAvailableForHostTask SoftwareWorkerTask = "make_android_apps_available_for_host" // deprecated
makeAndroidAppAvailableTask SoftwareWorkerTask = "make_android_app_available"
makeAndroidAppAvailableBatchTask SoftwareWorkerTask = "make_android_app_available_batch"
makeAndroidAppUnavailableTask SoftwareWorkerTask = "make_android_app_unavailable"
runAndroidSetupExperienceTask SoftwareWorkerTask = "run_android_setup_experience"
bulkSetAndroidAppsAvailableForHostTask SoftwareWorkerTask = "bulk_set_android_apps_available_for_host"
Expand Down Expand Up @@ -97,6 +98,14 @@ func (v *SoftwareWorker) Run(ctx context.Context, argsJSON json.RawMessage) erro
makeAndroidAppAvailableTask,
)

case makeAndroidAppAvailableBatchTask:
return ctxerr.Wrapf(
ctx,
v.makeAndroidAppAvailableBatch(ctx, args.ApplicationID, args.AppTeamID, args.HostUUIDToPolicyID, args.EnterpriseName, args.AppConfigChanged),
"running %s task",
makeAndroidAppAvailableBatchTask,
)

case makeAndroidAppUnavailableTask:
return ctxerr.Wrapf(
ctx,
Expand Down Expand Up @@ -144,137 +153,104 @@ func (v *SoftwareWorker) makeAndroidAppAvailable(ctx context.Context, applicatio
if err != nil {
return ctxerr.Wrap(ctx, err, "add app store app: getting android hosts in scope")
}
if len(hosts) == 0 {
return nil
}

// Queue staggered batch jobs. The phase-2 handler handles per-host
// variable substitution within each batch, so we always chunk the same way.
batchSize := v.AndroidBatchSize
if batchSize <= 0 {
batchSize = defaultAndroidBatchSize
}
batches := splitHostMap(hosts, batchSize)
for i, batch := range batches {
delay := time.Duration(i) * androidSoftwareInstallStaggerInterval
if err := queueMakeAndroidAppAvailableBatch(ctx, v.Datastore, applicationID, appTeamID, batch, enterpriseName, appConfigChanged, delay); err != nil {
return ctxerr.Wrap(ctx, err, "queue batch for make android app available")
}
}

return nil
}

func (v *SoftwareWorker) makeAndroidAppAvailableBatch(ctx context.Context, applicationID string, appTeamID uint, hostUUIDToPolicyID map[string]string, enterpriseName string, appConfigChanged bool) error {
config, err := v.Datastore.GetAndroidAppConfigurationByAppTeamID(ctx, appTeamID)
if err != nil && !fleet.IsNotFound(err) {
return ctxerr.Wrap(ctx, err, "get android app configuration")
}
Comment thread
ksykulev marked this conversation as resolved.
var configByAppID map[string][]byte
if config != nil {
configByAppID = map[string][]byte{
applicationID: config,
}
configByAppID = map[string][]byte{applicationID: config}
}

needsPerHostSubstitution := config != nil && variables.ContainsBytes(config)

if needsPerHostSubstitution {
return v.makeAndroidAppAvailablePerHost(ctx, applicationID, configByAppID, hosts, enterpriseName, appConfigChanged)
}

appPolicies, err := buildApplicationPolicyWithConfig(ctx, []string{applicationID}, configByAppID, "AVAILABLE")
if err != nil {
return ctxerr.Wrap(ctx, err, "building application policies with config")
}

// Process hosts in batches to avoid overwhelming the AMAPI. ~10K hosts max
batches := splitHostMap(hosts, v.AndroidBatchSize)
for i, batch := range batches {
if i > 0 {
timer := time.NewTimer(androidSoftwareInstallStaggerInterval)
select {
case <-ctx.Done():
timer.Stop()
return ctxerr.Wrap(ctx, ctx.Err(), "context done between batches")
case <-timer.C:
}
hostUUIDs := make([]string, 0, len(hostUUIDToPolicyID))
for uuid := range hostUUIDToPolicyID {
hostUUIDs = append(hostUUIDs, uuid)
}

policyRequestsByHost, err := v.AndroidModule.AddAppsToAndroidPolicy(ctx, enterpriseName, appPolicies, batch)
filter := fleet.TeamFilter{User: &fleet.User{GlobalRole: new("admin")}}
Comment thread
ksykulev marked this conversation as resolved.
hostDetails, err := v.Datastore.ListHostsLiteByUUIDs(ctx, filter, hostUUIDs)
if err != nil {
return ctxerr.Wrap(ctx, err, "add app store app: add app to android policy")
return ctxerr.Wrap(ctx, err, "batch fetch host details for variable substitution")
}

// if this is called from an UPDATE (config changed), mark existing installs
// as "pending" (unless already "failed") and with the correct policy version to verify
if appConfigChanged {
for hostUUID, policyRequest := range policyRequestsByHost {
err := v.Datastore.SetAndroidAppInstallPendingApplyConfig(ctx, hostUUID, applicationID, policyRequest.PolicyVersion.V)
if err != nil {
return ctxerr.Wrapf(ctx, err, "set android app install pending apply config for host %s and app %s", hostUUID, applicationID)
}
}
hostByUUID := make(map[string]*fleet.Host, len(hostDetails))
for _, h := range hostDetails {
hostByUUID[h.UUID] = h
}
}

return nil
}
for hostUUID := range hostUUIDToPolicyID {
h, ok := hostByUUID[hostUUID]
if !ok {
continue // host deleted since the job was queued
}

// makeAndroidAppAvailablePerHost handles the case where the app config
// contains $FLEET_VAR_HOST_* tokens that must be substituted per-host.
func (v *SoftwareWorker) makeAndroidAppAvailablePerHost(
ctx context.Context,
applicationID string,
configByAppID map[string][]byte,
hosts map[string]string,
enterpriseName string,
appConfigChanged bool,
) error {
// Batch-fetch host details for substitution.
hostUUIDs := make([]string, 0, len(hosts))
for uuid := range hosts {
hostUUIDs = append(hostUUIDs, uuid)
}
filter := fleet.TeamFilter{User: &fleet.User{GlobalRole: new("admin")}}
hostDetails, err := v.Datastore.ListHostsLiteByUUIDs(ctx, filter, hostUUIDs)
if err != nil {
return ctxerr.Wrap(ctx, err, "list hosts lite by uuids for fleet var substitution")
}
hostByUUID := make(map[string]*fleet.Host, len(hostDetails))
for _, h := range hostDetails {
hostByUUID[h.UUID] = h
}
subHost := profiles.AndroidAppConfigSubstitutionHost{
UUID: h.UUID,
HardwareSerial: h.HardwareSerial,
Platform: h.Platform,
}
substituted, err := v.substituteFleetVarsInConfigs(ctx, configByAppID, subHost)
if err != nil {
return ctxerr.Wrapf(ctx, err, "substitute fleet vars for host %s", hostUUID)
}

// TODO(#47543): refactor to use staggered job queuing instead of in-job sleep.
batchSize := v.AndroidBatchSize
if batchSize <= 0 {
batchSize = len(hostByUUID)
}
hostCount := 0
for hostUUID := range hosts {
h, ok := hostByUUID[hostUUID]
if !ok {
continue // host may have been deleted since the job was queued
}

if hostCount > 0 && hostCount%batchSize == 0 {
timer := time.NewTimer(androidSoftwareInstallStaggerInterval)
select {
case <-ctx.Done():
timer.Stop()
return ctxerr.Wrap(ctx, ctx.Err(), "context done between batches")
case <-timer.C:
appPolicies, err := buildApplicationPolicyWithConfig(ctx, []string{applicationID}, substituted, "AVAILABLE")
if err != nil {
return ctxerr.Wrapf(ctx, err, "building application policies with config for host %s", hostUUID)
}
}
hostCount++

subHost := profiles.AndroidAppConfigSubstitutionHost{
UUID: h.UUID,
HardwareSerial: h.HardwareSerial,
Platform: h.Platform,
}
singleHost := map[string]string{hostUUID: hostUUIDToPolicyID[hostUUID]}
policyRequestsByHost, err := v.AndroidModule.AddAppsToAndroidPolicy(ctx, enterpriseName, appPolicies, singleHost)
if err != nil {
return ctxerr.Wrapf(ctx, err, "add app to android policy for host %s", hostUUID)
}

substituted, err := v.substituteFleetVarsInConfigs(ctx, configByAppID, subHost)
if err != nil {
return ctxerr.Wrapf(ctx, err, "substitute fleet vars for host %s", hostUUID)
if appConfigChanged {
for uuid, policyRequest := range policyRequestsByHost {
if err := v.Datastore.SetAndroidAppInstallPendingApplyConfig(ctx, uuid, applicationID, policyRequest.PolicyVersion.V); err != nil {
return ctxerr.Wrapf(ctx, err, "set android app install pending apply config for host %s and app %s", uuid, applicationID)
}
}
}
}

appPolicies, err := buildApplicationPolicyWithConfig(ctx, []string{applicationID}, substituted, "AVAILABLE")
} else {
appPolicies, err := buildApplicationPolicyWithConfig(ctx, []string{applicationID}, configByAppID, "AVAILABLE")
if err != nil {
return ctxerr.Wrapf(ctx, err, "building application policies with config for host %s", hostUUID)
return ctxerr.Wrap(ctx, err, "building application policies with config")
}

singleHost := map[string]string{hostUUID: hosts[hostUUID]}
policyRequestsByHost, err := v.AndroidModule.AddAppsToAndroidPolicy(ctx, enterpriseName, appPolicies, singleHost)
policyRequestsByHost, err := v.AndroidModule.AddAppsToAndroidPolicy(ctx, enterpriseName, appPolicies, hostUUIDToPolicyID)
if err != nil {
return ctxerr.Wrapf(ctx, err, "add app to android policy for host %s", hostUUID)
return ctxerr.Wrap(ctx, err, "add app store app: add app to android policy")
}

if appConfigChanged {
for uuid, policyRequest := range policyRequestsByHost {
err := v.Datastore.SetAndroidAppInstallPendingApplyConfig(ctx, uuid, applicationID, policyRequest.PolicyVersion.V)
if err != nil {
return ctxerr.Wrapf(ctx, err, "set android app install pending apply config for host %s and app %s", uuid, applicationID)
for hostUUID, policyRequest := range policyRequestsByHost {
if err := v.Datastore.SetAndroidAppInstallPendingApplyConfig(ctx, hostUUID, applicationID, policyRequest.PolicyVersion.V); err != nil {
return ctxerr.Wrapf(ctx, err, "set android app install pending apply config for host %s and app %s", hostUUID, applicationID)
}
}
}
Expand All @@ -283,6 +259,19 @@ func (v *SoftwareWorker) makeAndroidAppAvailablePerHost(
return nil
}

func queueMakeAndroidAppAvailableBatch(ctx context.Context, ds fleet.Datastore, applicationID string, appTeamID uint, hostUUIDToPolicyID map[string]string, enterpriseName string, appConfigChanged bool, delay time.Duration) error {
args := &softwareWorkerArgs{
Task: makeAndroidAppAvailableBatchTask,
ApplicationID: applicationID,
AppTeamID: appTeamID,
HostUUIDToPolicyID: hostUUIDToPolicyID,
EnterpriseName: enterpriseName,
AppConfigChanged: appConfigChanged,
}
_, err := QueueJobWithDelay(ctx, ds, softwareWorkerJobName, args, delay)
return err
}

// this is called when an app is removed from Fleet.
func (v *SoftwareWorker) makeAndroidAppUnavailable(ctx context.Context, applicationID string, hostUUIDToPolicyID map[string]string, enterpriseName string) error {
// Update Android MDM policy to remove the app from the hosts
Expand Down Expand Up @@ -759,7 +748,10 @@ func (v *SoftwareWorker) bulkSetAndroidAppsAvailableForHosts(ctx context.Context
return nil
}

const androidSoftwareInstallStaggerInterval = 60 * time.Second
const (
androidSoftwareInstallStaggerInterval = 60 * time.Second
defaultAndroidBatchSize = 1000

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given current rate limits should this batch size perhaps be smaller? Feels like this leaves little overhead

)

func QueueBulkSetAndroidAppsAvailableForHosts(
ctx context.Context,
Expand Down
Loading
Loading