diff --git "a/docs/\345\277\231\351\227\262\346\227\266\345\210\207\346\215\242\344\272\213\344\273\266.md" "b/docs/\345\277\231\351\227\262\346\227\266\345\210\207\346\215\242\344\272\213\344\273\266.md" new file mode 100644 index 000000000..b153e1000 --- /dev/null +++ "b/docs/\345\277\231\351\227\262\346\227\266\345\210\207\346\215\242\344\272\213\344\273\266.md" @@ -0,0 +1,20 @@ +# 在平台下发了忙闲时配置后,需要在后台运行一个协程任务,该任务主要执行如下流程: +- 检测当前处于哪种时间状态:忙时还是闲时 +- 当当前时间从忙时变成了闲时,或者从闲时变成了忙时,均需要刷新updater中的相关限速配置。只有状态变化才刷新相关限速配置 + +这个任务的生命周期如下: +- 生命周期不得长于prepare_dist_upgrade这个下载任务 +- 这个任务的运行依赖于平台下发的限速配置中,开启了闲时或者忙时限速。 +- 当下载job结束后,这个忙闲时切换监控流程也需要结束 +- 如果远程全局、闲时和忙时均没有开启,则使用本地限速 + +术语规范:闲时-offPeak,忙时-peak + +``` +applyOnlineRateLimit() + ├── AllDayRateLimit.Enable == true → 使用全局限速 + └── else → getCurrentTimeState() + ├── PeakTimeRateLimit.Enable && 时间在忙时范围 → 返回 timeStateBusy → 使用忙时限速 + ├── OffPeakTimeRateLimit.Enable && 时间在闲时范围 → 返回 timeStateIdle → 使用闲时限速 + └── 其他情况 → 返回 timeStateUnknown → getDownloadSpeedLimitConfigByTimeState() 的 default 分支 → 使用本地限速 +``` diff --git a/src/lastore-daemon/manager_download.go b/src/lastore-daemon/manager_download.go index 8fbd00ae1..a030a82eb 100644 --- a/src/lastore-daemon/manager_download.go +++ b/src/lastore-daemon/manager_download.go @@ -44,26 +44,56 @@ func (m *Manager) applyOnlineRateLimit(downloadSpeed *downloadSpeedLimitConfig, downloadSpeed.LimitSpeed = strconv.Itoa(onlineRateLimit.AllDayRateLimit.Bps) downloadSpeed.IsOnlineSpeedLimit = true downloadSpeed.DownloadSpeedLimitEnabled = true - } else if isTimeInRange(nowTime, onlineRateLimit.PeakTimeRateLimit.StartTime, onlineRateLimit.PeakTimeRateLimit.EndTime) && - onlineRateLimit.PeakTimeRateLimit.Enable { - downloadSpeed.LimitSpeed = strconv.Itoa(onlineRateLimit.PeakTimeRateLimit.Bps) - downloadSpeed.IsOnlineSpeedLimit = true - downloadSpeed.DownloadSpeedLimitEnabled = true - } else if isTimeInRange(nowTime, onlineRateLimit.OffPeakTimeRateLimit.StartTime, onlineRateLimit.OffPeakTimeRateLimit.EndTime) && - onlineRateLimit.OffPeakTimeRateLimit.Enable { - downloadSpeed.LimitSpeed = strconv.Itoa(onlineRateLimit.OffPeakTimeRateLimit.Bps) - downloadSpeed.IsOnlineSpeedLimit = true - downloadSpeed.DownloadSpeedLimitEnabled = true } else { - err := json.Unmarshal([]byte(m.config.LocalDownloadSpeedLimitConfig), downloadSpeed) + timeState := m.getCurrentTimeState(nowTime) + *downloadSpeed = getDownloadSpeedLimitConfigByTimeState(m, timeState) + } +} + +const ( + timeStateUnknown = iota + timeStatePeak + timeStateOffPeak +) + +func getDownloadSpeedLimitConfigByTimeState(m *Manager, timeState int) downloadSpeedLimitConfig { + switch timeState { + case timeStatePeak: + return downloadSpeedLimitConfig{ + LimitSpeed: strconv.Itoa(m.updatePlatform.OnlineRateLimit.PeakTimeRateLimit.Bps), + IsOnlineSpeedLimit: true, + DownloadSpeedLimitEnabled: true, + } + case timeStateOffPeak: + return downloadSpeedLimitConfig{ + LimitSpeed: strconv.Itoa(m.updatePlatform.OnlineRateLimit.OffPeakTimeRateLimit.Bps), + IsOnlineSpeedLimit: true, + DownloadSpeedLimitEnabled: true, + } + default: + var downloadSpeed downloadSpeedLimitConfig + err := json.Unmarshal([]byte(m.config.LocalDownloadSpeedLimitConfig), &downloadSpeed) if err != nil { downloadSpeed.IsOnlineSpeedLimit = false downloadSpeed.LimitSpeed = strconv.FormatInt(defaultSpeedLimit, 10) downloadSpeed.DownloadSpeedLimitEnabled = true } + return downloadSpeed } } +func (m *Manager) getCurrentTimeState(nowTime string) int { + onlineRateLimit := m.updatePlatform.OnlineRateLimit + if isTimeInRange(nowTime, onlineRateLimit.PeakTimeRateLimit.StartTime, onlineRateLimit.PeakTimeRateLimit.EndTime) && + onlineRateLimit.PeakTimeRateLimit.Enable { + return timeStatePeak + } else if isTimeInRange(nowTime, onlineRateLimit.OffPeakTimeRateLimit.StartTime, onlineRateLimit.OffPeakTimeRateLimit.EndTime) && + onlineRateLimit.OffPeakTimeRateLimit.Enable { + return timeStateOffPeak + } + return timeStateUnknown +} + func isTimeInRange(nowTimeStr, startStr, endStr string) bool { now, err := parseTime(nowTimeStr) if err != nil { @@ -166,42 +196,19 @@ func (m *Manager) prepareDistUpgrade(sender dbus.Sender, origin system.UpdateTyp m.statusManager.SetUpdateStatus(mode, system.DownloadErr) return nil, dbusutil.ToError(errors.New(string(errStr))) } - done := make(chan struct{}, 1) - notifyDone := func() { - select { - case done <- struct{}{}: - default: - } - } + var peakOffPeakMonitor *PeakOffPeakMonitor if m.config.IntranetUpdate { //私有化更新有忙闲时下载限速的功能,需要在真正开始下载前刷新一下线上限速配置 if err = m.refreshThrottlingFromPlatform(); err != nil { logger.Warning("updatePlatform gen download speed limit failed", err) } else { - go func() { - ticker := time.NewTicker(5 * time.Second) - startTime := time.Now() - defer ticker.Stop() - var count int - layout := "15:04:05" - for { - select { - case <-done: - logger.Info("online rate limit ticker stopped") - return - case t := <-ticker.C: - count++ - downloadStartServiceTime, err := time.ParseInLocation(layout, m.updatePlatform.OnlineRateLimit.ServerTime, time.Local) - if err != nil { - logger.Warningf("format OnlineRateLimit service time failed, %v", err) - return - } - logger.Infof("downloadStartServiceTime %v", downloadStartServiceTime) - nowTime := downloadStartServiceTime.Add(t.Sub(startTime)) - m.setEffectiveOnlineRateLimit(nowTime.Format(layout)) - } - } - }() + peakOffPeakMonitor = NewPeakOffPeakMonitor(m, m.updatePlatform.OnlineRateLimit.ServerTime) + peakOffPeakMonitor.Start() + } + } + stopPeakOffPeakMonitor := func() { + if peakOffPeakMonitor != nil { + peakOffPeakMonitor.Stop() } } @@ -304,7 +311,7 @@ func (m *Manager) prepareDistUpgrade(sender dbus.Sender, origin system.UpdateTyp }, string(system.FailedStatus): func() error { if m.config.IntranetUpdate { - notifyDone() + stopPeakOffPeakMonitor() cacheFile := "/tmp/checkpolicy.cache" _ = os.RemoveAll(cacheFile) } @@ -476,7 +483,7 @@ func (m *Manager) prepareDistUpgrade(sender dbus.Sender, origin system.UpdateTyp if j.next == nil { logger.Info("running in last end hook") if m.config.IntranetUpdate { - notifyDone() + stopPeakOffPeakMonitor() } // 如果出现单项失败,其他的状态需要修改,IsDownloading->notDownload // 如果已经有单项下载完成,然后取消下载,DownloadPause->notDownload diff --git a/src/lastore-daemon/peakOffPeakMonitor.go b/src/lastore-daemon/peakOffPeakMonitor.go new file mode 100644 index 000000000..c65f57b81 --- /dev/null +++ b/src/lastore-daemon/peakOffPeakMonitor.go @@ -0,0 +1,118 @@ +// SPDX-FileCopyrightText: 2024 UnionTech Software Technology Co., Ltd. +// +// SPDX-License-Identifier: GPL-3.0-or-later + +package main + +import ( + "sync" + "time" +) + +type PeakOffPeakMonitor struct { + manager *Manager + done chan struct{} + wg sync.WaitGroup + startTime time.Time + serverTime string + checkInterval time.Duration + stopped bool + stopMu sync.Mutex + lastTimeState int +} + +func NewPeakOffPeakMonitor(m *Manager, serverTime string) *PeakOffPeakMonitor { + return &PeakOffPeakMonitor{ + manager: m, + done: make(chan struct{}), + startTime: time.Now(), + serverTime: serverTime, + checkInterval: 5 * time.Second, + } +} + +func (m *PeakOffPeakMonitor) Start() { + if m.isAllDayRateLimit() { + logger.Info("all-day rate limit enabled, apply and skip monitor") + m.manager.setEffectiveOnlineRateLimit(m.serverTime) + m.stopMu.Lock() + m.stopped = true + m.stopMu.Unlock() + return + } + + if !m.needMonitor() { + logger.Info("peak/off-peak rate limit not enabled, skip starting monitor") + m.stopMu.Lock() + m.stopped = true + m.stopMu.Unlock() + return + } + + m.lastTimeState = m.manager.getCurrentTimeState(m.getCurrentTime()) + logger.Infof("peak/off-peak monitor started, initial time state: %d", m.lastTimeState) + m.manager.setEffectiveOnlineRateLimit(m.serverTime) + + m.wg.Add(1) + go m.run() +} + +func (m *PeakOffPeakMonitor) Stop() { + m.stopMu.Lock() + defer m.stopMu.Unlock() + if m.stopped { + return + } + m.stopped = true + close(m.done) + m.wg.Wait() + logger.Info("peak/off-peak monitor stopped") +} + +func (m *PeakOffPeakMonitor) isAllDayRateLimit() bool { + return m.manager.updatePlatform.OnlineRateLimit.AllDayRateLimit.Enable +} + +func (m *PeakOffPeakMonitor) needMonitor() bool { + onlineRateLimit := m.manager.updatePlatform.OnlineRateLimit + return onlineRateLimit.PeakTimeRateLimit.Enable || onlineRateLimit.OffPeakTimeRateLimit.Enable +} + +func (m *PeakOffPeakMonitor) run() { + defer m.wg.Done() + + ticker := time.NewTicker(m.checkInterval) + defer ticker.Stop() + + for { + select { + case <-m.done: + return + case <-ticker.C: + m.refresh() + } + } +} + +func (m *PeakOffPeakMonitor) refresh() { + nowTime := m.getCurrentTime() + currentState := m.manager.getCurrentTimeState(nowTime) + + if currentState != m.lastTimeState { + logger.Infof("time state changed: %d -> %d, refreshing rate limit", m.lastTimeState, currentState) + m.manager.setEffectiveOnlineRateLimit(nowTime) + m.lastTimeState = currentState + } +} + +func (m *PeakOffPeakMonitor) getCurrentTime() string { + layout := "15:04:05" + downloadStartServiceTime, err := time.ParseInLocation(layout, m.serverTime, time.Local) + if err != nil { + logger.Warningf("format server time failed: %v", err) + return time.Now().Format(layout) + } + now := time.Now() + nowTime := downloadStartServiceTime.Add(now.Sub(m.startTime)) + return nowTime.Format(layout) +}