diff --git a/command/server/config/config.go b/command/server/config/config.go index bc64e99b62..f9f7b30e32 100644 --- a/command/server/config/config.go +++ b/command/server/config/config.go @@ -13,28 +13,29 @@ import ( // Config defines the server configuration params type Config struct { - GenesisPath string `json:"chain_config" yaml:"chain_config"` - SecretsConfigPath string `json:"secrets_config" yaml:"secrets_config"` - DataDir string `json:"data_dir" yaml:"data_dir"` - BlockGasTarget string `json:"block_gas_target" yaml:"block_gas_target"` - GRPCAddr string `json:"grpc_addr" yaml:"grpc_addr"` - JSONRPCAddr string `json:"jsonrpc_addr" yaml:"jsonrpc_addr"` - Telemetry *Telemetry `json:"telemetry" yaml:"telemetry"` - Network *Network `json:"network" yaml:"network"` - DataFeed *DataFeed `json:"data_feed" yaml:"data_feed"` - ShouldSeal bool `json:"seal" yaml:"seal"` - TxPool *TxPool `json:"tx_pool" yaml:"tx_pool"` - LogLevel string `json:"log_level" yaml:"log_level"` - RestoreFile string `json:"restore_file" yaml:"restore_file"` - BlockTime uint64 `json:"block_time_s" yaml:"block_time_s"` - Headers *Headers `json:"headers" yaml:"headers"` - LogFilePath string `json:"log_to" yaml:"log_to"` - JSONRPCBatchRequestLimit uint64 `json:"json_rpc_batch_request_limit" yaml:"json_rpc_batch_request_limit"` - JSONRPCBlockRangeLimit uint64 `json:"json_rpc_block_range_limit" yaml:"json_rpc_block_range_limit"` - JSONLogFormat bool `json:"json_log_format" yaml:"json_log_format"` - RPCNrAppName string `json:"rpc_nr_app_name" yaml:"rpc_nr_app_name"` - RPCNrLicenseKey string `json:"rpc_nr_license_key" yaml:"rpc_nr_license_key"` - GasPriceBlockUtilizationMinimum float64 `json:"gasprice_block_utilization_threshold" yaml:"gasprice_block_utilization_threshold"` //nolint:lll + GenesisPath string `json:"chain_config" yaml:"chain_config"` + SecretsConfigPath string `json:"secrets_config" yaml:"secrets_config"` + DataDir string `json:"data_dir" yaml:"data_dir"` + BlockGasTarget string 
`json:"block_gas_target" yaml:"block_gas_target"` + GRPCAddr string `json:"grpc_addr" yaml:"grpc_addr"` + JSONRPCAddr string `json:"jsonrpc_addr" yaml:"jsonrpc_addr"` + Telemetry *Telemetry `json:"telemetry" yaml:"telemetry"` + Network *Network `json:"network" yaml:"network"` + DataFeed *DataFeed `json:"data_feed" yaml:"data_feed"` + Monitoring *Monitoring `json:"monitoring" yaml:"monitoring"` + ShouldSeal bool `json:"seal" yaml:"seal"` + TxPool *TxPool `json:"tx_pool" yaml:"tx_pool"` + LogLevel string `json:"log_level" yaml:"log_level"` + RestoreFile string `json:"restore_file" yaml:"restore_file"` + BlockTime uint64 `json:"block_time_s" yaml:"block_time_s"` + Headers *Headers `json:"headers" yaml:"headers"` + LogFilePath string `json:"log_to" yaml:"log_to"` + JSONRPCBatchRequestLimit uint64 `json:"json_rpc_batch_request_limit" yaml:"json_rpc_batch_request_limit"` + JSONRPCBlockRangeLimit uint64 `json:"json_rpc_block_range_limit" yaml:"json_rpc_block_range_limit"` + JSONLogFormat bool `json:"json_log_format" yaml:"json_log_format"` + RPCNrAppName string `json:"rpc_nr_app_name" yaml:"rpc_nr_app_name"` + RPCNrLicenseKey string `json:"rpc_nr_license_key" yaml:"rpc_nr_license_key"` + GasPriceBlockUtilizationMinimum float64 `json:"gasprice_block_utilization_threshold" yaml:"gasprice_block_utilization_threshold"` //nolint:lll } // Telemetry holds the config details for metric services. 
@@ -64,6 +65,14 @@ type DataFeed struct { SXNodeAddress string `json:"sx_node_address" yaml:"sx_node_address"` } +type Monitoring struct { + IsEnable bool `json:"is_enable" yaml:"is_enable"` + DelayInSecondsProfile uint64 `json:"delay_in_seconds_profile" yaml:"delay_in_seconds_profile"` + IsMemStressTestEnable bool `json:"is_mem_stress_test_enable" yaml:"is_mem_stress_test_enable"` + DelayInSecondsStats uint64 `json:"delay_in_seconds_stats" yaml:"delay_in_seconds_stats"` + Threshold float64 `json:"threshold" yaml:"threshold"` +} + // TxPool defines the TxPool configuration params type TxPool struct { PriceLimit uint64 `json:"price_limit" yaml:"price_limit"` @@ -125,6 +134,13 @@ func DefaultConfig() *Config { OutcomeReporterAddress: "", SXNodeAddress: "", }, + Monitoring: &Monitoring{ + IsEnable: false, + DelayInSecondsProfile: 60, + IsMemStressTestEnable: false, + DelayInSecondsStats: 60, + Threshold: 0.8, // 80% + }, LogLevel: "INFO", RestoreFile: "", BlockTime: DefaultBlockTime, diff --git a/command/server/init.go b/command/server/init.go index f5c9f939df..21dc5c7e95 100644 --- a/command/server/init.go +++ b/command/server/init.go @@ -232,6 +232,10 @@ func (p *serverParams) initAddresses() error { return err } + if err := p.initMonitoringParams(); err != nil { + return err + } + // if err := p.initCustomContractAddress(); err != nil { // return err // } @@ -326,6 +330,16 @@ func (p *serverParams) initDataFeedParams() error { return nil } +func (p *serverParams) initMonitoringParams() error { + p.isEnable = p.rawConfig.Monitoring.IsEnable + p.delayInSecondsProfile = p.rawConfig.Monitoring.DelayInSecondsProfile + p.isMemStressTestEnable = p.rawConfig.Monitoring.IsMemStressTestEnable + p.delayInSecondsStats = p.rawConfig.Monitoring.DelayInSecondsStats + p.threshold = p.rawConfig.Monitoring.Threshold + + return nil +} + func (p *serverParams) initGRPCAddress() error { var parseErr error diff --git a/command/server/params.go b/command/server/params.go index 
7ae6a4eabd..57fd633f0a 100644 --- a/command/server/params.go +++ b/command/server/params.go @@ -47,6 +47,11 @@ const ( verifyOutcomeAPIURLFlag = "verify-outcome-api-url" outcomeReporterAddressFlag = "outcome-reporter-address" sxNodeAddressFlag = "sx-node-address" + isEnableFlag = "is-enable" + delayInSecondsProfileFlag = "delay-in-seconds" + isMemStressTestEnableFlag = "is-mem-stress-test-enable" + delayInSecondsStatsFlag = "ticker-in-seconds" + thresholdFlag = "threshold" ) // Flags that are deprecated, but need to be preserved for @@ -102,6 +107,12 @@ type serverParams struct { dataFeedOutcomeReporterAddress string dataFeedSXNodeAddress string + isEnable bool + delayInSecondsProfile uint64 + isMemStressTestEnable bool + delayInSecondsStats uint64 + threshold float64 + ibftBaseTimeoutLegacy uint64 genesisConfig *chain.Chain @@ -197,6 +208,13 @@ func (p *serverParams) generateConfig() *server.Config { OutcomeReporterAddress: p.dataFeedOutcomeReporterAddress, SXNodeAddress: p.dataFeedSXNodeAddress, }, + Monitoring: &server.Monitoring{ + IsEnable: p.isEnable, + DelayInSecondsProfile: p.delayInSecondsProfile, + IsMemStressTestEnable: p.isMemStressTestEnable, + DelayInSecondsStats: p.delayInSecondsStats, + Threshold: p.threshold, + }, DataDir: p.rawConfig.DataDir, Seal: p.rawConfig.ShouldSeal, PriceLimit: p.rawConfig.TxPool.PriceLimit, diff --git a/command/server/server.go b/command/server/server.go index c346ff2d65..6744cd5abf 100644 --- a/command/server/server.go +++ b/command/server/server.go @@ -292,6 +292,41 @@ func setFlags(cmd *cobra.Command) { "the address of the SXNode contract, used by DataFeedService reporting txs", ) + cmd.Flags().BoolVar( + ¶ms.isEnable, + isEnableFlag, + defaultConfig.Monitoring.IsEnable, + "indicates when pprof should start the snapshot profile routine", + ) + + cmd.Flags().Uint64Var( + ¶ms.delayInSecondsProfile, + delayInSecondsProfileFlag, + defaultConfig.Monitoring.DelayInSecondsProfile, + "indicates the delay time the prof will get 
from the snapshot profile", + ) + + cmd.Flags().BoolVar( + ¶ms.isMemStressTestEnable, + isMemStressTestEnableFlag, + defaultConfig.Monitoring.IsMemStressTestEnable, + "flag that triggers the memory stress test function", + ) + + cmd.Flags().Uint64Var( + ¶ms.delayInSecondsStats, + delayInSecondsStatsFlag, + defaultConfig.Monitoring.DelayInSecondsStats, + "indicates the delay time that the memory stress test function will allocate memory", + ) + + cmd.Flags().Float64Var( + ¶ms.threshold, + thresholdFlag, + defaultConfig.Monitoring.Threshold, + "the threshold for memory usage, specified as a percentage between 0 and 1", + ) + setLegacyFlags(cmd) setDevFlags(cmd) diff --git a/consensus/ibft/verifier.go b/consensus/ibft/verifier.go index 3761075616..3311e60875 100644 --- a/consensus/ibft/verifier.go +++ b/consensus/ibft/verifier.go @@ -124,14 +124,14 @@ func (i *backendIBFT) IsProposer(id []byte, height, round uint64) bool { return false } - + nextProposer := CalcProposer( i.currentValidators, round, previousProposer, ) - // store nextProposer in case we need to ID them as offline + // store nextProposer in case we need to ID them as offline i.nextProposer = nextProposer.Addr() return types.BytesToAddress(id) == nextProposer.Addr() diff --git a/monitoring/pprof.go b/monitoring/pprof.go new file mode 100644 index 0000000000..4fd3e4b9c1 --- /dev/null +++ b/monitoring/pprof.go @@ -0,0 +1,79 @@ +package monitoring + +import ( + "fmt" + "os" + "path/filepath" + "runtime/pprof" + "time" + + "github.com/hashicorp/go-hclog" +) + +type Profile struct { + Logger hclog.Logger + IsEnable bool + DelayInSecondsProfile uint64 + Goroutine *pprof.Profile + Heap *pprof.Profile +} + +/* + SetupPprofProfiles configures the pprof profiles for monitoring different aspects of the program. 
+ It creates a directory to store the profile files and starts collecting + the specified profiles in separate goroutines +*/ +func (profile *Profile) SetupPprofProfiles() { + // Get the directory of the executable + exeDir, err := filepath.Abs(filepath.Dir(os.Args[0])) + if err != nil { + profile.Logger.Error("Error getting executable directory", "error", err) + return + } + + // Directory to store profiling files + profileDir := filepath.Join(exeDir, "../pprof") + if err := os.MkdirAll(profileDir, 0755); err != nil { + profile.Logger.Error("Error creating profile directory", "error", err) + return + } + + // Start collecting profiles + if profile.IsEnable { + go collectProfile(profile.Heap, filepath.Join(profileDir, "heap"), profile.Logger, profile.DelayInSecondsProfile) + go collectProfile(profile.Goroutine, filepath.Join(profileDir, "goroutine"), profile.Logger, profile.DelayInSecondsProfile) + } +} + +/* + Collects the specified pprof profile data at regular intervals + and writes it to a new file in the specified profile directory +*/ +func collectProfile(profile *pprof.Profile, profileDir string, logger hclog.Logger, delayInSecondsProfile uint64) { + for { + // Generate a unique filename for the profile + profileFileName := fmt.Sprintf(filepath.Join(profileDir, "%s_%s.prof"), profile.Name(), time.Now().Format("20060102-1504")) + + // Create the profile directory if it does not exist + if err := os.MkdirAll(profileDir, 0755); err != nil { + logger.Error("Error creating profile directory", "error", err) + return + } + + // Create a new file to write the profile data + profileFile, err := os.Create(profileFileName) + if err != nil { + logger.Error("Error creating profile file", "error", err) + return + } + + // Write the profile data to the file + if err := profile.WriteTo(profileFile, 0); err != nil { + logger.Error("Error writing profile data", "error", err) + } + + profileFile.Close() + logger.Info(fmt.Sprintf("Profile file created: %s", profileFileName)) + time.Sleep(time.Second * 
time.Duration(delayInSecondsProfile)) + } +} diff --git a/monitoring/stats.go b/monitoring/stats.go new file mode 100644 index 0000000000..3b2b906ee5 --- /dev/null +++ b/monitoring/stats.go @@ -0,0 +1,67 @@ +package monitoring + +import ( + "fmt" + "os" + "time" + + "github.com/hashicorp/go-hclog" + "github.com/shirou/gopsutil/mem" +) + +type Stats struct { + Logger hclog.Logger + IsMemStressTestEnable bool + DelayInSecondsStats uint64 + Threshold float64 +} + +func (stats *Stats) TrackMemoryUsage() { + if stats.IsMemStressTestEnable { + go func() { + heapMemoryStressTest(stats.Logger) + }() + } + + for { + // Get the virtual memory stats + vm, err := mem.VirtualMemory() + if err != nil { + stats.Logger.Error("Error getting VirtualMemory", "error", err) + time.Sleep(time.Second * time.Duration(stats.DelayInSecondsStats)) // Wait for the specified interval + continue + } + + // Calculate the memory usage percentage + memUsage := float64(vm.Used) / float64(vm.Total) + stats.Logger.Info(fmt.Sprintf("Memory usage: %.2f%% (%v bytes), Total Memory: %v bytes, Threshold: %.2f%%", memUsage*100, vm.Used, vm.Total, stats.Threshold*100)) + + // Check if memory usage exceeds the threshold + if memUsage > stats.Threshold { + stats.Logger.Warn("Memory usage exceeds threshold. 
Performing graceful shutdown...") + gracefulShutdown(stats.Logger) + return + } + + time.Sleep(time.Second * time.Duration(stats.DelayInSecondsStats)) // Wait for the specified interval + } +} + +func gracefulShutdown(logger hclog.Logger) { + logger.Warn("Graceful shutdown completed") + os.Exit(0) +} + +func heapMemoryStressTest(logger hclog.Logger) { + var memorySlice [][]byte + + // Loop 1000 times to force memory allocations + for i := 0; i < 1000; i++ { + // Allocate a large slice of bytes (100 MB) + memory := make([]byte, 1024*1024*100) // 100 MB + memorySlice = append(memorySlice, memory) + + logger.Info(fmt.Sprintf("Iteration %d - Allocated %d MB", i+1, len(memorySlice)*100)) + time.Sleep(time.Second * 5) + } +} diff --git a/server/config.go b/server/config.go index f7925af8c4..11a5ef2083 100644 --- a/server/config.go +++ b/server/config.go @@ -38,6 +38,8 @@ type Config struct { DataFeed *DataFeed + Monitoring *Monitoring + Seal bool SecretsManager *secrets.SecretsManagerConfig @@ -71,3 +73,11 @@ type DataFeed struct { OutcomeReporterAddress string SXNodeAddress string } + +type Monitoring struct { + IsEnable bool + DelayInSecondsProfile uint64 + IsMemStressTestEnable bool + DelayInSecondsStats uint64 + Threshold float64 +} diff --git a/server/server.go b/server/server.go index c46cd0d2ba..7c792e1298 100644 --- a/server/server.go +++ b/server/server.go @@ -21,6 +21,7 @@ import ( configHelper "github.com/0xPolygon/polygon-edge/helper/config" "github.com/0xPolygon/polygon-edge/helper/progress" "github.com/0xPolygon/polygon-edge/jsonrpc" + "github.com/0xPolygon/polygon-edge/monitoring" "github.com/0xPolygon/polygon-edge/network" "github.com/0xPolygon/polygon-edge/secrets" "github.com/0xPolygon/polygon-edge/server/proto" @@ -34,6 +35,9 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "google.golang.org/grpc" + + _ "net/http/pprof" + "runtime/pprof" ) // Server is the central manager of the 
blockchain client @@ -279,6 +283,11 @@ func NewServer(config *Config) (*Server, error) { return nil, err } + // setup and start monitoring consumer + if err := m.setupMonitoring(); err != nil { + return nil, err + } + // setup and start grpc server if err := m.setupGRPC(); err != nil { return nil, err @@ -718,9 +727,9 @@ func (s *Server) setupDataFeedService() error { QueueName: s.config.DataFeed.DataFeedAMQPQueueName, }, }, - VerifyOutcomeURI: s.config.DataFeed.VerifyOutcomeURI, - OutcomeReporterAddress: s.config.DataFeed.OutcomeReporterAddress, - SXNodeAddress: s.config.DataFeed.SXNodeAddress, + VerifyOutcomeURI: s.config.DataFeed.VerifyOutcomeURI, + OutcomeReporterAddress: s.config.DataFeed.OutcomeReporterAddress, + SXNodeAddress: s.config.DataFeed.SXNodeAddress, } datafeedService, err := datafeed.NewDataFeedService( @@ -738,6 +747,28 @@ func (s *Server) setupDataFeedService() error { return nil } +func (s *Server) setupMonitoring() error { + profile := &monitoring.Profile{ + Logger: s.logger.Named("monitoring.pprof"), + IsEnable: s.config.Monitoring.IsEnable, + DelayInSecondsProfile: s.config.Monitoring.DelayInSecondsProfile, + Goroutine: pprof.Lookup("goroutine"), + Heap: pprof.Lookup("heap"), + } + + stats := &monitoring.Stats{ + Logger: s.logger.Named("monitoring.stats"), + IsMemStressTestEnable: s.config.Monitoring.IsMemStressTestEnable, + DelayInSecondsStats: s.config.Monitoring.DelayInSecondsStats, + Threshold: s.config.Monitoring.Threshold, + } + + go profile.SetupPprofProfiles() + go stats.TrackMemoryUsage() + + return nil +} + // setupGRPC sets up the grpc server and listens on tcp func (s *Server) setupGRPC() error { proto.RegisterSystemServer(s.grpcServer, &systemService{server: s})