Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions docs/忙闲时切换事件.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# 在平台下发了忙闲时配置后,需要在后台运行一个协程任务,该任务主要执行如下流程:
- 检测当前处于哪种时间状态:忙时还是闲时
- 当当前时间从忙时变成了闲时,或者从闲时变成了忙时,均需要刷新updater中的相关限速配置。只有状态变化才刷新相关限速配置

这个任务的生命周期如下:
- 生命周期不得长于prepare_dist_upgrade这个下载任务
- 这个任务的运行依赖于平台下发的限速配置中,开启了闲时或者忙时限速。
- 当下载job结束后,这个忙闲时切换监控流程也需要结束
- 如果远程全局、闲时和忙时均没有开启,则使用本地限速

术语规范:闲时-offPeak,忙时-peak

```
applyOnlineRateLimit()
├── AllDayRateLimit.Enable == true → 使用全局限速
└── else → getCurrentTimeState()
├── PeakTimeRateLimit.Enable && 时间在忙时范围 → 返回 timeStateBusy → 使用忙时限速
├── OffPeakTimeRateLimit.Enable && 时间在闲时范围 → 返回 timeStateIdle → 使用闲时限速
└── 其他情况 → 返回 timeStateUnknown → getDownloadSpeedLimitConfigByTimeState() 的 default 分支 → 使用本地限速
```
95 changes: 51 additions & 44 deletions src/lastore-daemon/manager_download.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,26 +44,56 @@ func (m *Manager) applyOnlineRateLimit(downloadSpeed *downloadSpeedLimitConfig,
downloadSpeed.LimitSpeed = strconv.Itoa(onlineRateLimit.AllDayRateLimit.Bps)
downloadSpeed.IsOnlineSpeedLimit = true
downloadSpeed.DownloadSpeedLimitEnabled = true
} else if isTimeInRange(nowTime, onlineRateLimit.PeakTimeRateLimit.StartTime, onlineRateLimit.PeakTimeRateLimit.EndTime) &&
onlineRateLimit.PeakTimeRateLimit.Enable {
downloadSpeed.LimitSpeed = strconv.Itoa(onlineRateLimit.PeakTimeRateLimit.Bps)
downloadSpeed.IsOnlineSpeedLimit = true
downloadSpeed.DownloadSpeedLimitEnabled = true
} else if isTimeInRange(nowTime, onlineRateLimit.OffPeakTimeRateLimit.StartTime, onlineRateLimit.OffPeakTimeRateLimit.EndTime) &&
onlineRateLimit.OffPeakTimeRateLimit.Enable {
downloadSpeed.LimitSpeed = strconv.Itoa(onlineRateLimit.OffPeakTimeRateLimit.Bps)
downloadSpeed.IsOnlineSpeedLimit = true
downloadSpeed.DownloadSpeedLimitEnabled = true
} else {
err := json.Unmarshal([]byte(m.config.LocalDownloadSpeedLimitConfig), downloadSpeed)
timeState := m.getCurrentTimeState(nowTime)
*downloadSpeed = getDownloadSpeedLimitConfigByTimeState(m, timeState)
}
}

const (
timeStateUnknown = iota
timeStatePeak
timeStateOffPeak
)

func getDownloadSpeedLimitConfigByTimeState(m *Manager, timeState int) downloadSpeedLimitConfig {
switch timeState {
case timeStatePeak:
return downloadSpeedLimitConfig{
LimitSpeed: strconv.Itoa(m.updatePlatform.OnlineRateLimit.PeakTimeRateLimit.Bps),
IsOnlineSpeedLimit: true,
DownloadSpeedLimitEnabled: true,
}
case timeStateOffPeak:
return downloadSpeedLimitConfig{
LimitSpeed: strconv.Itoa(m.updatePlatform.OnlineRateLimit.OffPeakTimeRateLimit.Bps),
IsOnlineSpeedLimit: true,
DownloadSpeedLimitEnabled: true,
}
default:
var downloadSpeed downloadSpeedLimitConfig
err := json.Unmarshal([]byte(m.config.LocalDownloadSpeedLimitConfig), &downloadSpeed)
if err != nil {
downloadSpeed.IsOnlineSpeedLimit = false
downloadSpeed.LimitSpeed = strconv.FormatInt(defaultSpeedLimit, 10)
downloadSpeed.DownloadSpeedLimitEnabled = true
}
return downloadSpeed
}
}

func (m *Manager) getCurrentTimeState(nowTime string) int {
onlineRateLimit := m.updatePlatform.OnlineRateLimit
if isTimeInRange(nowTime, onlineRateLimit.PeakTimeRateLimit.StartTime, onlineRateLimit.PeakTimeRateLimit.EndTime) &&
onlineRateLimit.PeakTimeRateLimit.Enable {
return timeStatePeak
} else if isTimeInRange(nowTime, onlineRateLimit.OffPeakTimeRateLimit.StartTime, onlineRateLimit.OffPeakTimeRateLimit.EndTime) &&
onlineRateLimit.OffPeakTimeRateLimit.Enable {
return timeStateOffPeak
}
return timeStateUnknown
}

func isTimeInRange(nowTimeStr, startStr, endStr string) bool {
now, err := parseTime(nowTimeStr)
if err != nil {
Expand Down Expand Up @@ -166,42 +196,19 @@ func (m *Manager) prepareDistUpgrade(sender dbus.Sender, origin system.UpdateTyp
m.statusManager.SetUpdateStatus(mode, system.DownloadErr)
return nil, dbusutil.ToError(errors.New(string(errStr)))
}
done := make(chan struct{}, 1)
notifyDone := func() {
select {
case done <- struct{}{}:
default:
}
}
var peakOffPeakMonitor *PeakOffPeakMonitor
if m.config.IntranetUpdate {
//私有化更新有忙闲时下载限速的功能,需要在真正开始下载前刷新一下线上限速配置
if err = m.refreshThrottlingFromPlatform(); err != nil {
logger.Warning("updatePlatform gen download speed limit failed", err)
} else {
go func() {
ticker := time.NewTicker(5 * time.Second)
startTime := time.Now()
defer ticker.Stop()
var count int
layout := "15:04:05"
for {
select {
case <-done:
logger.Info("online rate limit ticker stopped")
return
case t := <-ticker.C:
count++
downloadStartServiceTime, err := time.ParseInLocation(layout, m.updatePlatform.OnlineRateLimit.ServerTime, time.Local)
if err != nil {
logger.Warningf("format OnlineRateLimit service time failed, %v", err)
return
}
logger.Infof("downloadStartServiceTime %v", downloadStartServiceTime)
nowTime := downloadStartServiceTime.Add(t.Sub(startTime))
m.setEffectiveOnlineRateLimit(nowTime.Format(layout))
}
}
}()
peakOffPeakMonitor = NewPeakOffPeakMonitor(m, m.updatePlatform.OnlineRateLimit.ServerTime)
peakOffPeakMonitor.Start()
}
}
stopPeakOffPeakMonitor := func() {
if peakOffPeakMonitor != nil {
peakOffPeakMonitor.Stop()
}
}

Expand Down Expand Up @@ -304,7 +311,7 @@ func (m *Manager) prepareDistUpgrade(sender dbus.Sender, origin system.UpdateTyp
},
string(system.FailedStatus): func() error {
if m.config.IntranetUpdate {
notifyDone()
stopPeakOffPeakMonitor()
cacheFile := "/tmp/checkpolicy.cache"
_ = os.RemoveAll(cacheFile)
}
Expand Down Expand Up @@ -476,7 +483,7 @@ func (m *Manager) prepareDistUpgrade(sender dbus.Sender, origin system.UpdateTyp
if j.next == nil {
logger.Info("running in last end hook")
if m.config.IntranetUpdate {
notifyDone()
stopPeakOffPeakMonitor()
}
// 如果出现单项失败,其他的状态需要修改,IsDownloading->notDownload
// 如果已经有单项下载完成,然后取消下载,DownloadPause->notDownload
Expand Down
118 changes: 118 additions & 0 deletions src/lastore-daemon/peakOffPeakMonitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// SPDX-FileCopyrightText: 2024 UnionTech Software Technology Co., Ltd.
//
// SPDX-License-Identifier: GPL-3.0-or-later

package main

import (
"sync"
"time"
)

type PeakOffPeakMonitor struct {
manager *Manager
done chan struct{}
wg sync.WaitGroup
startTime time.Time
serverTime string
checkInterval time.Duration
stopped bool
stopMu sync.Mutex
lastTimeState int
}

func NewPeakOffPeakMonitor(m *Manager, serverTime string) *PeakOffPeakMonitor {
return &PeakOffPeakMonitor{
manager: m,
done: make(chan struct{}),
startTime: time.Now(),
serverTime: serverTime,
checkInterval: 5 * time.Second,
}
}

func (m *PeakOffPeakMonitor) Start() {
if m.isAllDayRateLimit() {
logger.Info("all-day rate limit enabled, apply and skip monitor")
m.manager.setEffectiveOnlineRateLimit(m.serverTime)
m.stopMu.Lock()
m.stopped = true
m.stopMu.Unlock()
return
}

if !m.needMonitor() {
logger.Info("peak/off-peak rate limit not enabled, skip starting monitor")
m.stopMu.Lock()
m.stopped = true
m.stopMu.Unlock()
return
}

m.lastTimeState = m.manager.getCurrentTimeState(m.getCurrentTime())
logger.Infof("peak/off-peak monitor started, initial time state: %d", m.lastTimeState)
m.manager.setEffectiveOnlineRateLimit(m.serverTime)

m.wg.Add(1)
go m.run()
}

func (m *PeakOffPeakMonitor) Stop() {
m.stopMu.Lock()
defer m.stopMu.Unlock()
if m.stopped {
return
}
m.stopped = true
close(m.done)
m.wg.Wait()
logger.Info("peak/off-peak monitor stopped")
}

func (m *PeakOffPeakMonitor) isAllDayRateLimit() bool {
return m.manager.updatePlatform.OnlineRateLimit.AllDayRateLimit.Enable
}

func (m *PeakOffPeakMonitor) needMonitor() bool {
onlineRateLimit := m.manager.updatePlatform.OnlineRateLimit
return onlineRateLimit.PeakTimeRateLimit.Enable || onlineRateLimit.OffPeakTimeRateLimit.Enable
}

func (m *PeakOffPeakMonitor) run() {
defer m.wg.Done()

ticker := time.NewTicker(m.checkInterval)
defer ticker.Stop()

for {
select {
case <-m.done:
return
case <-ticker.C:
m.refresh()
}
}
}

func (m *PeakOffPeakMonitor) refresh() {
nowTime := m.getCurrentTime()
currentState := m.manager.getCurrentTimeState(nowTime)

if currentState != m.lastTimeState {
logger.Infof("time state changed: %d -> %d, refreshing rate limit", m.lastTimeState, currentState)
m.manager.setEffectiveOnlineRateLimit(nowTime)
m.lastTimeState = currentState
}
}

func (m *PeakOffPeakMonitor) getCurrentTime() string {
layout := "15:04:05"
downloadStartServiceTime, err := time.ParseInLocation(layout, m.serverTime, time.Local)
if err != nil {
logger.Warningf("format server time failed: %v", err)
return time.Now().Format(layout)
}
now := time.Now()
nowTime := downloadStartServiceTime.Add(now.Sub(m.startTime))
return nowTime.Format(layout)
}
Loading