Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
363041b
internal complete code migration
pavan-adari-meesho Jan 30, 2026
614d483
internal components interface methods
pavan-adari-meesho Feb 2, 2026
c3762fd
revert pre-commit config
pavan-adari-meesho Feb 2, 2026
bb796ed
removed redundant structs
pavan-adari-meesho Feb 2, 2026
f380623
error formatting fixes
pavan-adari-meesho Feb 3, 2026
c7ee967
predator sync
pavan-adari-meesho Feb 3, 2026
1aa8d32
predator init fixes
pavan-adari-meesho Feb 3, 2026
19c0d77
code rabbit issues
pavan-adari-meesho Feb 3, 2026
0423a9e
bulk delete changes
pavan-adari-meesho Feb 3, 2026
9754fcb
schema client separation and fixes
pavan-adari-meesho Feb 3, 2026
b845141
removing redundant functions and error formatting
pavan-adari-meesho Feb 3, 2026
2199ed2
minor bug fix
pavan-adari-meesho Feb 3, 2026
8a8157a
further refractoring for coderabbit changes
pavan-adari-meesho Feb 3, 2026
340983e
model name extraction fix
pavan-adari-meesho Feb 4, 2026
8ecb0ea
return error on no model files found
pavan-adari-meesho Feb 4, 2026
649e89d
capitilization
pavan-adari-meesho Feb 4, 2026
4ec6b12
schema client refractor
pavan-adari-meesho Feb 5, 2026
1303975
int to preprod name convention
pavan-adari-meesho Feb 5, 2026
7135c92
etcd name fixes and refractors
pavan-adari-meesho Feb 9, 2026
a07f6f2
refractors and dev toggle script fix
pavan-adari-meesho Feb 9, 2026
16aeb6c
predator handler refractor and gcs client minor fixes
pavan-adari-meesho Feb 10, 2026
2968ade
inferflow refractor into multiple files
pavan-adari-meesho Feb 10, 2026
a254d78
Edit instance count created
paras-agarwal-meesho Feb 10, 2026
bac72ce
Instance count updation while preserving original formatting
paras-agarwal-meesho Feb 11, 2026
17f3abf
replaceInstanceCountInConfigPreservingFormat refined and test cases c…
paras-agarwal-meesho Feb 11, 2026
b83d40a
Real config.pbtxt test cases added
paras-agarwal-meesho Feb 12, 2026
efcb9b6
Merge branch 'develop' into feature/edit-instance-count
paras-agarwal-meesho Feb 13, 2026
9b9f087
Outdated code removed
paras-agarwal-meesho Feb 13, 2026
57f1b94
Regex usage replaced with proto for horizon
paras-agarwal-meesho Feb 20, 2026
d16a7f2
Refactoring
paras-agarwal-meesho Feb 20, 2026
dec6fbc
Merge branch 'develop' into feature/edit-instance-count
paras-agarwal-meesho Feb 20, 2026
f5993a1
Improvements
paras-agarwal-meesho Feb 20, 2026
0aeabbf
Merge branch 'develop' into feature/edit-instance-count
pavan-adari-meesho Mar 2, 2026
d97b8f0
Add Edit Instance Count flow for promote requests
paras-agarwal-meesho Mar 3, 2026
721efd6
Add Intance Count Text Field in Edit Model Modal
paras-agarwal-meesho Mar 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 33 additions & 42 deletions horizon/internal/externalcall/gcs_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ import (
"os"
"path"
"path/filepath"
"regexp"
"strings"
"sync"
"time"

"cloud.google.com/go/storage"
"github.com/Meesho/BharatMLStack/horizon/internal/predator/proto/protogen"
"github.com/rs/zerolog/log"
"google.golang.org/api/iterator"
"google.golang.org/protobuf/encoding/prototext"
)

type GCSClientInterface interface {
Expand Down Expand Up @@ -269,7 +270,10 @@ func (g *GCSClient) transferSingleConfigFile(objAttrs storage.ObjectAttrs, srcBu

// Replace model name
log.Info().Msgf("Processing config.pbtxt file: %s -> %s", objAttrs.Name, destObjectPath)
modified := replaceModelNameInConfig(content, destModelName)
modified, err := ReplaceModelNameInConfig(content, destModelName)
if err != nil {
return fmt.Errorf("failed to replace model name: %w", err)
}

// Upload modified content
destWriter := g.client.Bucket(destBucket).Object(destObjectPath).NewWriter(g.ctx)
Expand Down Expand Up @@ -465,49 +469,36 @@ func (g *GCSClient) TransferAndDeleteFolder(srcBucket, srcPath, srcModelName, de

// replaceModelNameInConfig modifies only the top-level `name:` field in config.pbtxt content
// It replaces only the first occurrence to avoid modifying nested names in inputs/outputs/instance_groups
func replaceModelNameInConfig(data []byte, destModelName string) []byte {
content := string(data)
lines := strings.Split(content, "\n")

for i, line := range lines {
trimmed := strings.TrimSpace(line)
// Match top-level "name:" field - should be at the start of line (or minimal indentation)
// Skip nested names which are typically indented with 2+ spaces
if strings.HasPrefix(trimmed, "name:") {
// Check indentation: top-level fields have minimal/no indentation
leadingWhitespace := len(line) - len(strings.TrimLeft(line, " \t"))
// Skip if heavily indented (nested field)
if leadingWhitespace >= 2 {
continue
}
func ReplaceModelNameInConfig(data []byte, destModelName string) ([]byte, error) {
if destModelName == "" {
return nil, fmt.Errorf("destination model name cannot be empty")
}

// Match the first occurrence of name: "value" pattern
namePattern := regexp.MustCompile(`name\s*:\s*"([^"]+)"`)
matches := namePattern.FindStringSubmatch(line)
if len(matches) > 1 {
oldModelName := matches[1]
// Replace only the FIRST occurrence to avoid replacing nested names
loc := namePattern.FindStringIndex(line)
if loc != nil {
// Replace only the matched portion (first occurrence)
before := line[:loc[0]]
matched := line[loc[0]:loc[1]]
after := line[loc[1]:]
// Replace the value inside quotes while preserving the "name:" format
valuePattern := regexp.MustCompile(`"([^"]+)"`)
valueReplaced := valuePattern.ReplaceAllString(matched, fmt.Sprintf(`"%s"`, destModelName))
lines[i] = before + valueReplaced + after
} else {
// Fallback: replace all (shouldn't happen with valid input)
lines[i] = namePattern.ReplaceAllString(line, fmt.Sprintf(`name: "%s"`, destModelName))
}
log.Info().Msgf("Replacing top-level model name in config.pbtxt: '%s' -> '%s'", oldModelName, destModelName)
break
}
}
var modelConfig protogen.ModelConfig

if err := prototext.Unmarshal(data, &modelConfig); err != nil {
return nil, fmt.Errorf("failed to parse config.pbtxt: %w", err)
}

return []byte(strings.Join(lines, "\n"))
oldModelName := modelConfig.Name
modelConfig.Name = destModelName

opts := prototext.MarshalOptions{
Multiline: true,
Indent: " ",
}

out, err := opts.Marshal(&modelConfig)
if err != nil {
return nil, fmt.Errorf("failed to marshal updated config.pbtxt: %w", err)
}

log.Info().
Str("old_model_name", oldModelName).
Str("new_model_name", destModelName).
Msg("replaced top-level model name in config.pbtxt")

return out, nil
}

func (g *GCSClient) ListFolders(bucket, prefix string) ([]string, error) {
Expand Down
64 changes: 37 additions & 27 deletions horizon/internal/externalcall/gcs_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,50 +10,60 @@ import (

func TestReplaceModelNameInConfig(t *testing.T) {
tests := []struct {
name string
data []byte
destModelName string
expectContains string
name string
data []byte
destModelName string
wantTopLevel string
wantNested string
expectError bool
}{
{
name: "replaces top-level name only",
data: []byte(`name: "old_model"
data: []byte(`
name: "old_model"
instance_group {
name: "old_model"
name: "nested_model"
}
`),
destModelName: "new_model",
expectContains: `name: "new_model"`,
destModelName: "new_model",
wantTopLevel: `"new_model"`,
wantNested: "nested_model",
},
{
name: "preserves nested name with indentation",
data: []byte(`name: "top_level"
instance_group {
name: "nested_name"
}
`),
destModelName: "replaced",
expectContains: `name: "replaced"`,
name: "single line config",
data: []byte(`name: "single_model"` + "\n"),
destModelName: "replaced_model",
wantTopLevel: `"replaced_model"`,
},
{
name: "single line config",
data: []byte(`name: "single_model"` + "\n"),
destModelName: "replaced_model",
expectContains: `name: "replaced_model"`,
name: "empty dest model name returns error",
data: []byte(`name: "some_model"`),
destModelName: "",
expectError: true,
},
{
name: "no name field returns unchanged",
data: []byte(`platform: "tensorflow"
version: 1
`),
name: "malformed pbtxt returns error",
data: []byte(`name: "unclosed`),
destModelName: "any",
expectContains: `platform: "tensorflow"`,
expectError: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := replaceModelNameInConfig(tt.data, tt.destModelName)
assert.Contains(t, string(got), tt.expectContains)
got, err := ReplaceModelNameInConfig(tt.data, tt.destModelName)

if tt.expectError {
require.Error(t, err)
return
}

require.NoError(t, err)
assert.Contains(t, string(got), tt.wantTopLevel)

if tt.wantNested != "" {
assert.Contains(t, string(got), tt.wantNested)
}
})
}
}
Expand Down
4 changes: 3 additions & 1 deletion horizon/internal/predator/handler/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package handler
import (
"encoding/json"
"time"

"github.com/Meesho/BharatMLStack/horizon/internal/predator/proto/protogen"
)

type Payload struct {
Expand Down Expand Up @@ -169,7 +171,7 @@ type ModelParamsResponse struct {
Backend string `json:"backend"`
DynamicBatchingEnabled bool `json:"dynamic_batching_enabled"`
Platform string `json:"platform"`
EnsembleScheduling *ModelEnsembling `json:"ensemble_scheduling,omitempty"`
EnsembleScheduling *protogen.ModelEnsembling `json:"ensemble_scheduling,omitempty"`
}

type RequestGenerationRequest struct {
Expand Down
11 changes: 6 additions & 5 deletions horizon/internal/predator/handler/predator.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/Meesho/BharatMLStack/horizon/pkg/random"
"github.com/Meesho/BharatMLStack/horizon/pkg/serializer"
"github.com/rs/zerolog/log"
"github.com/Meesho/BharatMLStack/horizon/internal/predator/proto/protogen"
)

type Predator struct {
Expand Down Expand Up @@ -353,7 +354,7 @@ func (p *Predator) FetchModelConfig(req FetchModelConfigRequest) (ModelParamsRes
}
}

var modelConfig ModelConfig
var modelConfig protogen.ModelConfig
if err := prototext.Unmarshal(configData, &modelConfig); err != nil {
return ModelParamsResponse{}, http.StatusInternalServerError, fmt.Errorf(errUnmarshalProtoFormat, err)
}
Expand Down Expand Up @@ -391,7 +392,7 @@ func parseModelPath(modelPath string) (bucket, objectPath string) {
return parts[0], parts[1]
}

func validateModelConfig(cfg *ModelConfig) error {
func validateModelConfig(cfg *protogen.ModelConfig) error {
switch {
case cfg.Name == constant.EmptyString:
return errors.New(errModelNameMissing)
Expand Down Expand Up @@ -423,7 +424,7 @@ func convertFields(name string, dims []int64, dataType string) (IO, bool) {
}, true
}

func convertInputWithFeatures(fields []*ModelInput, featureMap map[string][]string) []IO {
func convertInputWithFeatures(fields []*protogen.ModelInput, featureMap map[string][]string) []IO {
ios := make([]IO, 0, len(fields))
for _, f := range fields {
if io, ok := convertFields(f.Name, f.Dims, f.DataType.String()); ok {
Expand All @@ -437,7 +438,7 @@ func convertInputWithFeatures(fields []*ModelInput, featureMap map[string][]stri
return ios
}

func convertOutput(fields []*ModelOutput) []IO {
func convertOutput(fields []*protogen.ModelOutput) []IO {
ios := make([]IO, 0, len(fields))
for _, f := range fields {
if io, ok := convertFields(f.Name, f.Dims, f.DataType.String()); ok {
Expand All @@ -447,7 +448,7 @@ func convertOutput(fields []*ModelOutput) []IO {
return ios
}

func createModelParamsResponse(modelConfig *ModelConfig, objectPath string, inputs, outputs []IO) ModelParamsResponse {
func createModelParamsResponse(modelConfig *protogen.ModelConfig, objectPath string, inputs, outputs []IO) ModelParamsResponse {
var resp ModelParamsResponse

if len(modelConfig.InstanceGroup) > 0 {
Expand Down
73 changes: 73 additions & 0 deletions horizon/internal/predator/handler/predator_approval.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"errors"
"fmt"
"path"
"strings"
"time"

Expand All @@ -13,7 +14,9 @@ import (
"github.com/Meesho/BharatMLStack/horizon/internal/repositories/sql/predatorconfig"
"github.com/Meesho/BharatMLStack/horizon/internal/repositories/sql/predatorrequest"
"github.com/rs/zerolog/log"
"google.golang.org/protobuf/encoding/prototext"
"gorm.io/gorm"
"github.com/Meesho/BharatMLStack/horizon/internal/predator/proto/protogen"
)

func (p *Predator) processRequest(requestIdPayloadMap map[uint]*Payload, predatorRequestList []predatorrequest.PredatorRequest, req ApproveRequest) {
Expand Down Expand Up @@ -179,6 +182,10 @@ func (p *Predator) processEditGCSCopyStage(requestIdPayloadMap map[uint]*Payload
// Extract model folder name from source path and copy to target with the same model name
pathSegments := strings.Split(strings.TrimSuffix(sourcePath, "/"), "/")
sourceModelName := pathSegments[len(pathSegments)-1]
if sourceModelName == "" {
log.Error().Msgf("Source model name is empty for request ID %d (path: %s)", requestModel.RequestID, normalizedModelSource)
return transferredGcsModelData, fmt.Errorf("source model name is empty for request ID %d", requestModel.RequestID)
}
sourceBasePath := strings.TrimSuffix(sourcePath, "/"+sourceModelName)

if isNotProd {
Expand All @@ -191,6 +198,12 @@ func (p *Predator) processEditGCSCopyStage(requestIdPayloadMap map[uint]*Payload
} else {
configBucket := pred.GcsConfigBucket
configPath := pred.GcsConfigBasePath
if configBucket != "" && configPath != "" && payload.MetaData.InstanceCount > 0 {
if err := p.updateInstanceCountInConfigSource(configBucket, configPath, sourceModelName, payload.MetaData.InstanceCount); err != nil {
log.Error().Err(err).Msgf("Failed to update instance count in config-source for model %s", sourceModelName)
return transferredGcsModelData, err
}
}
if err := p.GcsClient.TransferFolderWithSplitSources(
sourceBucket, sourceBasePath, configBucket, configPath,
sourceModelName, targetBucket, targetPath, modelName,
Expand All @@ -212,6 +225,53 @@ func (p *Predator) processEditGCSCopyStage(requestIdPayloadMap map[uint]*Payload
return transferredGcsModelData, nil
}

func (p *Predator) updateInstanceCountInConfigSource(bucket, basePath, modelName string, instanceCount int) error {
if modelName == "" {
return fmt.Errorf("model name is empty, required to update instance count in config-source")
}

configPath := path.Join(basePath, modelName, configFile)
configData, err := p.GcsClient.ReadFile(bucket, configPath)
if err != nil {
return fmt.Errorf("failed to read config.pbtxt from config-source for model %s: %w", modelName, err)
}

var modelConfig protogen.ModelConfig
if err := prototext.Unmarshal(configData, &modelConfig); err != nil {
return fmt.Errorf("failed to parse config.pbtxt from config-source for model %s: %w", modelName, err)
}
if len(modelConfig.InstanceGroup) == 0 {
return fmt.Errorf("%s (model %s)", errNoInstanceGroup, modelName)
}

currentCount := modelConfig.InstanceGroup[0].Count
if currentCount == int32(instanceCount) {
log.Info().
Str("model", modelName).
Int("instance_count", instanceCount).
Msg("instance_count unchanged, skipping config update")
return nil
}

modelConfig.InstanceGroup[0].Count = int32(instanceCount)

opts := prototext.MarshalOptions{
Multiline: true,
Indent: " ",
}

newConfigData, err := opts.Marshal(&modelConfig)
if err != nil {
return fmt.Errorf("failed to marshal config.pbtxt for model %s: %w", modelName, err)
}
if err := p.GcsClient.UploadFile(bucket, configPath, newConfigData); err != nil {
return fmt.Errorf("failed to upload config.pbtxt to config-source for model %s: %w", modelName, err)
}

log.Info().Msgf("Updated instance_count to %d in config-source for model %s", instanceCount, modelName)
return nil
}

// processEditDBUpdateStage updates predator config for edit approval
// This updates the existing predator config with new config.pbtxt and metadata.json changes
func (p *Predator) processEditDBUpdateStage(requestIdPayloadMap map[uint]*Payload, predatorRequestList []predatorrequest.PredatorRequest, approvedBy string) error {
Expand Down Expand Up @@ -498,6 +558,19 @@ func (p *Predator) processGCSCloneStage(requestIdPayloadMap map[uint]*Payload, p
return transferredGcsModelData, err
}
} else {
payload := requestIdPayloadMap[requestModel.RequestID]
if payload == nil {
log.Error().Msgf("Payload not found for request ID %d", requestModel.RequestID)
return transferredGcsModelData, fmt.Errorf("payload not found for request ID %d", requestModel.RequestID)
}
if requestModel.RequestType == PromoteRequestType &&
pred.GcsConfigBucket != "" && pred.GcsConfigBasePath != "" &&
payload.MetaData.InstanceCount > 0 {
if err := p.updateInstanceCountInConfigSource(pred.GcsConfigBucket, pred.GcsConfigBasePath, srcModelName, payload.MetaData.InstanceCount); err != nil {
log.Error().Err(err).Msgf("Failed to update instance count in config-source for model %s", srcModelName)
return transferredGcsModelData, err
}
}
if err := p.GcsClient.TransferFolderWithSplitSources(
srcBucket, srcPath, pred.GcsConfigBucket, pred.GcsConfigBasePath,
srcModelName, destBucket, destPath, destModelName,
Expand Down
Loading