Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2026 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"context"
"sync/atomic"
"testing"

"github.com/stretchr/testify/require"

"github.com/tikv/pd/pkg/mcs/utils/constant"
servercluster "github.com/tikv/pd/server/cluster"
"github.com/tikv/pd/server/config"
)

func newTestServer(t *testing.T, keyspaceGroupEnabled, tsoDynamicSwitchingEnabled bool) *Server {
t.Helper()

cfg := config.NewConfig()
cfg.Microservice.EnableTSODynamicSwitching = tsoDynamicSwitchingEnabled
s := &Server{
ctx: context.Background(),
cfg: cfg,
persistOptions: config.NewPersistOptions(cfg),
isKeyspaceGroupEnabled: keyspaceGroupEnabled,
}
atomic.StoreInt64(&s.isRunning, 1)
return s
}

func TestIsServiceIndependent(t *testing.T) {
re := require.New(t)

// Keyspace groups disabled: always false even if cluster says independent.
s := newTestServer(t, false, false)
s.cluster = &servercluster.RaftCluster{}
s.cluster.SetServiceIndependent(constant.TSOServiceName)
re.False(s.IsServiceIndependent(constant.TSOServiceName))

// Keyspace groups enabled, dynamic switching disabled: TSO always independent
// (microservice is expected to be running).
s2 := newTestServer(t, true, false)
re.True(s2.IsServiceIndependent(constant.TSOServiceName))

// Keyspace groups enabled, dynamic switching enabled: depends on cluster state.
s3 := newTestServer(t, true, true)
s3.cluster = &servercluster.RaftCluster{}
re.False(s3.IsServiceIndependent(constant.TSOServiceName))

// Service set independent: true.
s3.cluster.SetServiceIndependent(constant.TSOServiceName)
re.True(s3.IsServiceIndependent(constant.TSOServiceName))

// Server closed: false.
atomic.StoreInt64(&s3.isRunning, 0)
re.False(s3.IsServiceIndependent(constant.TSOServiceName))
}
211 changes: 211 additions & 0 deletions tests/integrations/tso/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
sd "github.com/tikv/pd/client/servicediscovery"
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/keyspace/constant"
mcsconst "github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/tempurl"
Expand Down Expand Up @@ -712,6 +713,216 @@
re.NoError(failpoint.Disable("github.com/tikv/pd/client/servicediscovery/usePDServiceMode"))
}

// TestDynamicSwitchingPDToTSO tests that when dynamic switching is enabled and a TSO
// microservice starts, PD stops serving TSO locally, sets ServiceIndependent,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All three new integration tests (TestDynamicSwitchingPDToTSO, TestDynamicSwitchingTSOToPDFallback, TestDynamicSwitchingWithLeaderTransfer) enable the usePDServiceMode failpoint, which pins the client to PD_SVC_MODE by short-circuiting updateServiceModeLoop(). This means the tests do not exercise the client-side service-mode discovery path at all.

If the intent is to test server-side dynamic switching behavior only, please add a comment clarifying this scope limitation and noting that client-side service-mode discovery is covered by TestTSOServiceSwitch in tests/integrations/mcs/tso/server_test.go.

Otherwise, if these tests are meant to be end-to-end dynamic switching tests, the failpoint should be removed.

// and the TSO microservice serves timestamps successfully.
//
// NOTE: This test uses the usePDServiceMode failpoint to pin the client to PD_SVC_MODE,
// so it only validates server-side dynamic switching behavior. Client-side service-mode
// discovery is covered by TestTSOServiceSwitch in tests/integrations/mcs/tso/server_test.go.
func TestDynamicSwitchingPDToTSO(t *testing.T) {
re := require.New(t)
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/servicediscovery/usePDServiceMode", "return(true)"))
defer func() {
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/servicediscovery/usePDServiceMode"))
}()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Create PD cluster with dynamic switching enabled.
pdCluster, err := tests.NewTestClusterWithKeyspaceGroup(ctx, 1, func(conf *config.Config, _ string) {
conf.Microservice.EnableTSODynamicSwitching = true
})
re.NoError(err)
defer pdCluster.Destroy()
re.NoError(pdCluster.RunInitialServers())
leaderName := pdCluster.WaitLeader()
re.NotEmpty(leaderName)
pdLeader := pdCluster.GetServer(leaderName)
re.NoError(pdLeader.BootstrapCluster())
backendEndpoints := pdLeader.GetAddr()

// Create a PD client.
pdClient, err := pd.NewClientWithContext(ctx,
caller.TestComponent,
[]string{backendEndpoints}, pd.SecurityOption{}, opt.WithMaxErrorRetry(1))
re.NoError(err)
defer pdClient.Close()

// Without TSO microservice, PD should serve TSO locally.
var globalLastTS uint64
waitAndCheckTSOMonotonic(ctx, re, pdClient, &globalLastTS, 10)
re.False(pdLeader.GetServer().IsServiceIndependent(mcsconst.TSOServiceName))

// Start TSO microservice.
tsoCluster, err := tests.NewTestTSOCluster(ctx, 1, backendEndpoints)
re.NoError(err)
defer tsoCluster.Destroy()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test (and the two below) only uses WaitForTSOServiceAvailable() which is essentially Eventually(client.GetTS() == nil) — it proves TSO becomes available at some point, but does not verify that timestamps remain monotonically increasing across the switch.

The repo already has a stronger helper checkTSOMonotonic() (used in TestTSOServiceSwitch). Consider tracking a globalLastTS across the switch boundary and asserting monotonicity:

var globalLastTS uint64
re.NoError(checkTSOMonotonic(ctx, pdClient, &globalLastTS, 10)) // before switch
// ... start TSO microservice ...
re.NoError(checkTSOMonotonic(ctx, pdClient, &globalLastTS, 10)) // after switch

Without this, a timestamp regression during the switch would not be caught.

tsoCluster.WaitForDefaultPrimaryServing(re)

// ServiceIndependent should be set and TSO should remain monotonically increasing.
testutil.Eventually(re, func() bool {
return pdLeader.GetServer().IsServiceIndependent(mcsconst.TSOServiceName)
})
waitAndCheckTSOMonotonic(ctx, re, pdClient, &globalLastTS, 10)
}

// TestDynamicSwitchingTSOToPDFallback tests that when a TSO microservice goes away,
// PD resumes serving TSO locally and timestamps remain monotonically increasing.
//
// NOTE: This test uses the usePDServiceMode failpoint to pin the client to PD_SVC_MODE,
// so it only validates server-side dynamic switching behavior. Client-side service-mode
// discovery is covered by TestTSOServiceSwitch in tests/integrations/mcs/tso/server_test.go.
func TestDynamicSwitchingTSOToPDFallback(t *testing.T) {
re := require.New(t)
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/servicediscovery/usePDServiceMode", "return(true)"))
defer func() {
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/servicediscovery/usePDServiceMode"))
}()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Create PD cluster with dynamic switching enabled.
pdCluster, err := tests.NewTestClusterWithKeyspaceGroup(ctx, 1, func(conf *config.Config, _ string) {
conf.Microservice.EnableTSODynamicSwitching = true
})
re.NoError(err)
defer pdCluster.Destroy()
re.NoError(pdCluster.RunInitialServers())
leaderName := pdCluster.WaitLeader()
re.NotEmpty(leaderName)
pdLeader := pdCluster.GetServer(leaderName)
re.NoError(pdLeader.BootstrapCluster())
backendEndpoints := pdLeader.GetAddr()

// Create a PD client.
pdClient, err := pd.NewClientWithContext(ctx,
caller.TestComponent,
[]string{backendEndpoints}, pd.SecurityOption{}, opt.WithMaxErrorRetry(1))
re.NoError(err)
defer pdClient.Close()

// Start TSO microservice and wait for transition.
var globalLastTS uint64
tsoCluster, err := tests.NewTestTSOCluster(ctx, 1, backendEndpoints)
re.NoError(err)
tsoCluster.WaitForDefaultPrimaryServing(re)
testutil.Eventually(re, func() bool {
return pdLeader.GetServer().IsServiceIndependent(mcsconst.TSOServiceName)
})
waitAndCheckTSOMonotonic(ctx, re, pdClient, &globalLastTS, 10)

// Destroy TSO microservice.
tsoCluster.Destroy()

// PD should resume serving TSO locally with monotonically increasing timestamps.
testutil.Eventually(re, func() bool {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Missing blank line between TestDynamicSwitchingTSOToPDFallback and TestDynamicSwitchingWithLeaderTransfer.

return !pdLeader.GetServer().IsServiceIndependent(mcsconst.TSOServiceName)
})
waitAndCheckTSOMonotonic(ctx, re, pdClient, &globalLastTS, 10)
}

// TestDynamicSwitchingWithLeaderTransfer tests that TSO service survives
// PD leader transfers when a TSO microservice is active, and timestamps
// remain monotonically increasing across transfers.
//
// NOTE: This test uses the usePDServiceMode failpoint to pin the client to PD_SVC_MODE,
// so it only validates server-side dynamic switching behavior. Client-side service-mode
// discovery is covered by TestTSOServiceSwitch in tests/integrations/mcs/tso/server_test.go.
func TestDynamicSwitchingWithLeaderTransfer(t *testing.T) {
re := require.New(t)
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/servicediscovery/usePDServiceMode", "return(true)"))
defer func() {
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/servicediscovery/usePDServiceMode"))
}()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Create a 3-node PD cluster with dynamic switching enabled.
pdCluster, err := tests.NewTestClusterWithKeyspaceGroup(ctx, 3, func(conf *config.Config, _ string) {
conf.Microservice.EnableTSODynamicSwitching = true
})
re.NoError(err)
defer pdCluster.Destroy()
re.NoError(pdCluster.RunInitialServers())
leaderName := pdCluster.WaitLeader()
re.NotEmpty(leaderName)
pdLeader := pdCluster.GetServer(leaderName)
re.NoError(pdLeader.BootstrapCluster())
backendEndpoints := pdLeader.GetAddr()

// PD client.
pdClient, err := pd.NewClientWithContext(ctx,
caller.TestComponent,
[]string{backendEndpoints}, pd.SecurityOption{}, opt.WithMaxErrorRetry(1))
re.NoError(err)
defer pdClient.Close()

// PD should start serving TSO.
var globalLastTS uint64
waitAndCheckTSOMonotonic(ctx, re, pdClient, &globalLastTS, 10)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: The loop doesn't assert that leadership actually changed after ResignLeaderWithRetry(). Consider capturing oldLeaderName before resign and asserting re.NotEqual(oldLeaderName, leaderName) to ensure the test is truly exercising a leader transfer scenario.

oldLeaderName := pdCluster.WaitLeader()
err = pdCluster.GetServer(oldLeaderName).ResignLeaderWithRetry()
re.NoError(err)
leaderName = pdCluster.WaitLeader()
re.NotEqual(oldLeaderName, leaderName)

// Start TSO microservice and wait for ServiceIndependent.
tsoCluster, err := tests.NewTestTSOCluster(ctx, 1, backendEndpoints)
re.NoError(err)
defer tsoCluster.Destroy()
tsoCluster.WaitForDefaultPrimaryServing(re)
testutil.Eventually(re, func() bool {
return pdLeader.GetServer().IsServiceIndependent(mcsconst.TSOServiceName)
})

// Resign the leader and verify TSO service stays available with monotonic timestamps.
for range 2 {
leaderName = pdCluster.WaitLeader()
re.NotEmpty(leaderName)
err = pdCluster.GetServer(leaderName).ResignLeaderWithRetry()
re.NoError(err)
leaderName = pdCluster.WaitLeader()
re.NotEmpty(leaderName)

// ServiceIndependent must remain set after leader resignation.
newLeader := pdCluster.GetServer(leaderName)
testutil.Eventually(re, func() bool {
return newLeader.GetServer().IsServiceIndependent(mcsconst.TSOServiceName)
})
// TSO should remain available and monotonic after each leader transfer.
waitAndCheckTSOMonotonic(ctx, re, pdClient, &globalLastTS, 10)
}
}

// waitAndCheckTSOMonotonic waits for TSO to become available and then verifies that
// `successCount` consecutive successful TSO requests return globally increasing timestamps
// relative to *globalLastTS. Unlike calling WaitForTSOServiceAvailable + a separate
// monotonicity check, this function validates every successful GetTS (including the first
// one after a switchover) against globalLastTS, so a regression at the exact transition
// boundary cannot slip through.
func waitAndCheckTSOMonotonic(
ctx context.Context, re *require.Assertions, pdClient pd.Client, globalLastTS *uint64, successCount int,

Check failure on line 909 in tests/integrations/tso/client_test.go

View workflow job for this annotation

GitHub Actions / statics

waitAndCheckTSOMonotonic - successCount always receives 10 (unparam)
) {
var successes int
testutil.Eventually(re, func() bool {
physical, logical, err := pdClient.GetTS(ctx)
if err != nil {
return false
}
ts := tsoutil.ComposeTS(physical, logical)
re.Greater(ts, *globalLastTS,
"TSO is not globally increasing: last %d, current %d", *globalLastTS, ts)
*globalLastTS = ts
successes++
return successes >= successCount
})
}

func checkTSO(
ctx context.Context, re *require.Assertions, wg *sync.WaitGroup, backendEndpoints string,
) {
Expand Down
Loading