Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,16 @@ error = '''
failed to convert a path to absolute path
'''

["PD:forcemerge:ErrForceMergeRangeContent"]
error = '''
invalid force merge range content, %s
'''

["PD:forcemerge:ErrLoadForceMergeRange"]
error = '''
load force merge range failed
'''

["PD:gin:ErrBindJSON"]
error = '''
bind JSON error
Expand Down
6 changes: 6 additions & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ var (
ErrBuildRuleList = errors.Normalize("build rule list failed, %s", errors.RFCCodeText("PD:placement:ErrBuildRuleList"))
)

// force merge errors
var (
ErrForceMergeRangeContent = errors.Normalize("invalid force merge range content, %s", errors.RFCCodeText("PD:forcemerge:ErrForceMergeRangeContent"))
ErrLoadForceMergeRange = errors.Normalize("load force merge range failed", errors.RFCCodeText("PD:forcemerge:ErrLoadForceMergeRange"))
)

// region label errors
var (
ErrRegionRuleContent = errors.Normalize("invalid region rule content, %s", errors.RFCCodeText("PD:region:ErrRegionRuleContent"))
Expand Down
11 changes: 10 additions & 1 deletion pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/tikv/pd/server/core/storelimit"
"github.com/tikv/pd/server/id"
"github.com/tikv/pd/server/schedule/labeler"
"github.com/tikv/pd/server/schedule/pkdbforcemerge"
"github.com/tikv/pd/server/schedule/placement"
"github.com/tikv/pd/server/statistics"
"github.com/tikv/pd/server/statistics/buckets"
Expand All @@ -49,6 +50,7 @@ type Cluster struct {
*core.BasicCluster
*mockid.IDAllocator
*placement.RuleManager
ForceMergeManager *pkdbforcemerge.Manager
*labeler.RegionLabeler
*statistics.HotStat
*config.PersistOptions
Expand All @@ -61,6 +63,7 @@ type Cluster struct {

// NewCluster creates a new Cluster
func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster {
store := storage.NewStorageWithMemoryBackend()
clus := &Cluster{
BasicCluster: core.NewBasicCluster(),
IDAllocator: mockid.NewIDAllocator(),
Expand All @@ -76,7 +79,8 @@ func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster {
}
// It should be updated to the latest feature version.
clus.PersistOptions.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.HotScheduleWithQuery))
clus.RegionLabeler, _ = labeler.NewRegionLabeler(ctx, storage.NewStorageWithMemoryBackend(), time.Second*5)
clus.ForceMergeManager, _ = pkdbforcemerge.NewManager(store)
clus.RegionLabeler, _ = labeler.NewRegionLabeler(ctx, store, time.Second*5)
return clus
}

Expand Down Expand Up @@ -200,6 +204,11 @@ func (mc *Cluster) GetRuleManager() *placement.RuleManager {
return mc.RuleManager
}

// GetForceMergeManager returns the force merge manager of the cluster.
func (mc *Cluster) GetForceMergeManager() *pkdbforcemerge.Manager {
return mc.ForceMergeManager
}

// GetRegionLabeler returns the region labeler of the cluster.
func (mc *Cluster) GetRegionLabeler() *labeler.RegionLabeler {
return mc.RegionLabeler
Expand Down
66 changes: 66 additions & 0 deletions server/api/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/kvproto/pkg/replication_modepb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/apiutil"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/typeutil"
"github.com/tikv/pd/server"
"github.com/tikv/pd/server/core"
Expand Down Expand Up @@ -231,6 +232,11 @@ type regionHandler struct {
rd *render.Render
}

type addForceMergeRangesRequest struct {
StartKeysHex []string `json:"start_keys"`
EndKeysHex []string `json:"end_keys"`
}

func newRegionHandler(svr *server.Server, rd *render.Render) *regionHandler {
return &regionHandler{
svr: svr,
Expand Down Expand Up @@ -941,6 +947,66 @@ func (h *regionsHandler) AccelerateRegionsScheduleInRanges(w http.ResponseWriter
h.rd.Text(w, http.StatusOK, msgBuilder.String())
}

// @Tags region
// @Summary Add force merge ranges.
// @Accept json
// @Param body body object true "json params"
// @Produce plain
// @Success 200 {string} string "Add force merge ranges successfully."
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /regions/force-merge [post]
func (h *regionsHandler) AddForceMergeRanges(w http.ResponseWriter, r *http.Request) {
rc := getCluster(r)
var input addForceMergeRangesRequest
if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &input); err != nil {
return
}
if rc.GetOpts().IsCrossTableMergeEnabled() {
h.rd.JSON(w, http.StatusBadRequest, "enable-cross-table-merge is true")
return
}
if keyType := rc.GetOpts().GetKeyType(); keyType != core.Table {
h.rd.JSON(w, http.StatusBadRequest, fmt.Sprintf("key-type %s does not support force merge", keyType.String()))
return
}

manager := rc.GetForceMergeManager()
if manager == nil {
h.rd.JSON(w, http.StatusInternalServerError, "force merge manager is not initialized")
return
}
if err := manager.AddRanges(input.StartKeysHex, input.EndKeysHex); err != nil {
if errs.ErrForceMergeRangeContent.Equal(err) || errs.ErrHexDecodingString.Equal(err) {
h.rd.JSON(w, http.StatusBadRequest, err.Error())
} else {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
}
return
}
h.rd.Text(w, http.StatusOK, "Add force merge ranges successfully.")
}

// @Tags region
// @Summary Clear force merge ranges.
// @Produce plain
// @Success 200 {string} string "Clear force merge ranges successfully."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /regions/force-merge [delete]
func (h *regionsHandler) DeleteForceMergeRanges(w http.ResponseWriter, r *http.Request) {
rc := getCluster(r)
manager := rc.GetForceMergeManager()
if manager == nil {
h.rd.JSON(w, http.StatusInternalServerError, "force merge manager is not initialized")
return
}
if err := manager.ClearRanges(); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
h.rd.Text(w, http.StatusOK, "Clear force merge ranges successfully.")
}

func (h *regionsHandler) GetTopNRegions(w http.ResponseWriter, r *http.Request, less func(a, b *core.RegionInfo) bool) {
rc := getCluster(r)
limit := defaultRegionLimit
Expand Down
160 changes: 160 additions & 0 deletions server/api/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,166 @@ func (suite *regionTestSuite) TestAccelerateRegionsScheduleInRanges() {
suite.Len(idList, 4)
}

func (suite *regionTestSuite) TestAddForceMergeRanges() {
re := suite.Require()
suite.NoError(tu.CheckPostJSON(testDialClient, suite.urlPrefix+"/config", []byte(`{"enable-cross-table-merge":"false"}`), tu.StatusOK(re)))
defer func() {
suite.NoError(tu.CheckPostJSON(testDialClient, suite.urlPrefix+"/config", []byte(`{"enable-cross-table-merge":"true"}`), tu.StatusOK(re)))
}()

manager := suite.svr.GetRaftCluster().GetForceMergeManager()
re.NotNil(manager)

startKey := []byte("force-merge/a")
middleKey := []byte("force-merge/b")
endKey := []byte("force-merge/c")
cleanupEndKey := []byte("force-merge/d")
body, err := json.Marshal(addForceMergeRangesRequest{
StartKeysHex: []string{
hex.EncodeToString(startKey),
hex.EncodeToString(middleKey),
},
EndKeysHex: []string{
hex.EncodeToString(middleKey),
hex.EncodeToString(endKey),
},
})
suite.NoError(err)

err = tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/regions/force-merge", suite.urlPrefix), body, tu.StatusOK(re))
suite.NoError(err)

exactRegion := core.NewRegionInfo(&metapb.Region{Id: 10001, StartKey: startKey, EndKey: endKey}, nil)
suite.True(manager.SolveRegion(exactRegion))

cleanupRegion := core.NewRegionInfo(&metapb.Region{Id: 10002, StartKey: startKey, EndKey: cleanupEndKey}, nil)
suite.False(manager.SolveRegion(cleanupRegion))
suite.False(manager.SolveRegion(exactRegion))
}

func (suite *regionTestSuite) TestAddForceMergeRangesValidation() {
re := suite.Require()
suite.NoError(tu.CheckPostJSON(testDialClient, suite.urlPrefix+"/config", []byte(`{"enable-cross-table-merge":"false"}`), tu.StatusOK(re)))
defer func() {
suite.NoError(tu.CheckPostJSON(testDialClient, suite.urlPrefix+"/config", []byte(`{"enable-cross-table-merge":"true"}`), tu.StatusOK(re)))
}()

startKey := []byte("force-merge/x")
endKey := []byte("force-merge/y")
body, err := json.Marshal(addForceMergeRangesRequest{
StartKeysHex: []string{hex.EncodeToString(startKey)},
})
suite.NoError(err)

err = tu.CheckPostJSON(
testDialClient,
fmt.Sprintf("%s/regions/force-merge", suite.urlPrefix),
body,
tu.Status(re, http.StatusBadRequest),
tu.StringContain(re, "start key count 1 does not match end key count 0"),
)
suite.NoError(err)

exactRegion := core.NewRegionInfo(&metapb.Region{Id: 10003, StartKey: startKey, EndKey: endKey}, nil)
suite.False(suite.svr.GetRaftCluster().GetForceMergeManager().SolveRegion(exactRegion))
}

func (suite *regionTestSuite) TestAddForceMergeRangesCrossTableMergeEnabled() {
re := suite.Require()
startKey := []byte("force-merge/enable/x")
endKey := []byte("force-merge/enable/y")
body, err := json.Marshal(addForceMergeRangesRequest{
StartKeysHex: []string{hex.EncodeToString(startKey)},
EndKeysHex: []string{hex.EncodeToString(endKey)},
})
suite.NoError(err)

err = tu.CheckPostJSON(
testDialClient,
fmt.Sprintf("%s/regions/force-merge", suite.urlPrefix),
body,
tu.Status(re, http.StatusBadRequest),
tu.StringContain(re, "enable-cross-table-merge is true"),
)
suite.NoError(err)

exactRegion := core.NewRegionInfo(&metapb.Region{Id: 10004, StartKey: startKey, EndKey: endKey}, nil)
suite.False(suite.svr.GetRaftCluster().GetForceMergeManager().SolveRegion(exactRegion))
}

func (suite *regionTestSuite) TestAddForceMergeRangesNonTableKeyType() {
re := suite.Require()
suite.NoError(tu.CheckPostJSON(testDialClient, suite.urlPrefix+"/config", []byte(`{"enable-cross-table-merge":"false"}`), tu.StatusOK(re)))
defer func() {
suite.NoError(tu.CheckPostJSON(testDialClient, suite.urlPrefix+"/config", []byte(`{"enable-cross-table-merge":"true"}`), tu.StatusOK(re)))
}()

originalCfg := suite.svr.GetConfig().PDServerCfg
rawCfg := originalCfg
rawCfg.KeyType = core.Raw.String()
suite.NoError(suite.svr.SetPDServerConfig(rawCfg))
defer func() {
suite.NoError(suite.svr.SetPDServerConfig(originalCfg))
}()

startKey := []byte("force-merge/raw/x")
endKey := []byte("force-merge/raw/y")
body, err := json.Marshal(addForceMergeRangesRequest{
StartKeysHex: []string{hex.EncodeToString(startKey)},
EndKeysHex: []string{hex.EncodeToString(endKey)},
})
suite.NoError(err)

err = tu.CheckPostJSON(
testDialClient,
fmt.Sprintf("%s/regions/force-merge", suite.urlPrefix),
body,
tu.Status(re, http.StatusBadRequest),
tu.StringContain(re, "key-type raw does not support force merge"),
)
suite.NoError(err)

exactRegion := core.NewRegionInfo(&metapb.Region{Id: 10005, StartKey: startKey, EndKey: endKey}, nil)
suite.False(suite.svr.GetRaftCluster().GetForceMergeManager().SolveRegion(exactRegion))
}

func (suite *regionTestSuite) TestDeleteForceMergeRanges() {
re := suite.Require()
suite.NoError(tu.CheckPostJSON(testDialClient, suite.urlPrefix+"/config", []byte(`{"enable-cross-table-merge":"false"}`), tu.StatusOK(re)))
defer func() {
suite.NoError(tu.CheckPostJSON(testDialClient, suite.urlPrefix+"/config", []byte(`{"enable-cross-table-merge":"true"}`), tu.StatusOK(re)))
}()

startKey := []byte("force-merge/delete/x")
endKey := []byte("force-merge/delete/y")
body, err := json.Marshal(addForceMergeRangesRequest{
StartKeysHex: []string{hex.EncodeToString(startKey)},
EndKeysHex: []string{hex.EncodeToString(endKey)},
})
suite.NoError(err)
suite.NoError(tu.CheckPostJSON(
testDialClient,
fmt.Sprintf("%s/regions/force-merge", suite.urlPrefix),
body,
tu.StatusOK(re),
))

manager := suite.svr.GetRaftCluster().GetForceMergeManager()
exactRegion := core.NewRegionInfo(&metapb.Region{Id: 10006, StartKey: startKey, EndKey: endKey}, nil)
suite.True(manager.SolveRegion(exactRegion))

suite.NoError(tu.CheckPostJSON(testDialClient, suite.urlPrefix+"/config", []byte(`{"enable-cross-table-merge":"true"}`), tu.StatusOK(re)))

req, err := http.NewRequest(http.MethodDelete, fmt.Sprintf("%s/regions/force-merge", suite.urlPrefix), nil)
suite.NoError(err)
resp, err := testDialClient.Do(req)
suite.NoError(err)
defer resp.Body.Close()
suite.Equal(http.StatusOK, resp.StatusCode)

suite.False(manager.SolveRegion(exactRegion))
}

func (suite *regionTestSuite) TestScatterRegions() {
re := suite.Require()
r1 := newTestRegionInfo(601, 13, []byte("b1"), []byte("b2"))
Expand Down
2 changes: 2 additions & 0 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
registerFunc(clusterRouter, "/regions/sibling/{id}", regionsHandler.GetRegionSiblings, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(clusterRouter, "/regions/accelerate-schedule", regionsHandler.AccelerateRegionsScheduleInRange, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus))
registerFunc(clusterRouter, "/regions/accelerate-schedule/batch", regionsHandler.AccelerateRegionsScheduleInRanges, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus))
registerFunc(clusterRouter, "/regions/force-merge", regionsHandler.AddForceMergeRanges, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus))
registerFunc(clusterRouter, "/regions/force-merge", regionsHandler.DeleteForceMergeRanges, setMethods(http.MethodDelete), setAuditBackend(localLog, prometheus))
registerFunc(clusterRouter, "/regions/scatter", regionsHandler.ScatterRegions, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus))
registerFunc(clusterRouter, "/regions/split", regionsHandler.SplitRegions, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus))
registerFunc(clusterRouter, "/regions/range-holes", regionsHandler.GetRangeHoles, setMethods(http.MethodGet), setAuditBackend(prometheus))
Expand Down
11 changes: 11 additions & 0 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/tikv/pd/server/schedule/checker"
"github.com/tikv/pd/server/schedule/hbstream"
"github.com/tikv/pd/server/schedule/labeler"
"github.com/tikv/pd/server/schedule/pkdbforcemerge"
"github.com/tikv/pd/server/schedule/placement"
"github.com/tikv/pd/server/schedulers"
"github.com/tikv/pd/server/statistics"
Expand Down Expand Up @@ -143,6 +144,7 @@ type RaftCluster struct {
hotStat *statistics.HotStat
hotBuckets *buckets.HotBucketCache
ruleManager *placement.RuleManager
forceMergeManager *pkdbforcemerge.Manager
regionLabeler *labeler.RegionLabeler
replicationMode *replication.ModeManager
unsafeRecoveryController *unsafeRecoveryController
Expand Down Expand Up @@ -264,6 +266,10 @@ func (c *RaftCluster) Start(s Server) error {
return err
}
}
c.forceMergeManager, err = pkdbforcemerge.NewManager(c.storage)
if err != nil {
return err
}

c.regionLabeler, err = labeler.NewRegionLabeler(c.ctx, c.storage, regionLabelGCInterval)
if err != nil {
Expand Down Expand Up @@ -646,6 +652,11 @@ func (c *RaftCluster) GetRuleManager() *placement.RuleManager {
return c.ruleManager
}

// GetForceMergeManager returns the force merge manager reference.
func (c *RaftCluster) GetForceMergeManager() *pkdbforcemerge.Manager {
return c.forceMergeManager
}

// GetRegionLabeler returns the region labeler.
func (c *RaftCluster) GetRegionLabeler() *labeler.RegionLabeler {
return c.regionLabeler
Expand Down
Loading
Loading