Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 127 additions & 33 deletions cmd/bgpfinder-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ func (c ResponseCollector) MarshalJSON() ([]byte, error) {
custom := map[string]interface{}{
"project": c.Project,
"dataTypes": dataTypes,
"aliases": c.Aliases,
}
return json.Marshal(custom)
}
Expand All @@ -247,6 +248,9 @@ type ResponseCollector struct {
// Name of the collector
Name string `json:"name"`

// List of aliases for the collector
Aliases []string `json:"aliases"`

OldestRibsDump string
OldestUpdatesDump string
LatestRibsDump string
Expand All @@ -271,6 +275,18 @@ func projectHandler(db *pgxpool.Pool, logger *logging.Logger) http.HandlerFunc {
return
}

aliases, err := bgpfinder.GetCollectorNameAliases("")
if err != nil {
http.Error(w, fmt.Sprintf("Error checking collector aliases: %v", err), http.StatusInternalServerError)
return
}
invertedAliases := make(map[string][]string)
for alias, realName := range aliases {
if realName != "" {
invertedAliases[realName] = append(invertedAliases[realName], alias)
}
}

projectsMap := make(map[string]map[string]map[string]ResponseCollector)
// Find matching projects
for _, project := range projects {
Expand All @@ -286,6 +302,7 @@ func projectHandler(db *pgxpool.Pool, logger *logging.Logger) http.HandlerFunc {
c := ResponseCollector{
Project: collector.Project,
Name: collector.Name,
Aliases: append([]string{collector.Name}, invertedAliases[collector.Name]...),
OldestRibsDump: oldestLatestDumps[collector.Name].OldestRibsDump,
OldestUpdatesDump: oldestLatestDumps[collector.Name].OldestUpdatesDump,
LatestRibsDump: oldestLatestDumps[collector.Name].LatestRibsDump,
Expand Down Expand Up @@ -327,13 +344,26 @@ func collectorHandler(db *pgxpool.Pool, logger *logging.Logger) http.HandlerFunc
return
}

aliases, err := bgpfinder.GetCollectorNameAliases("")
if err != nil {
http.Error(w, fmt.Sprintf("Error checking collector aliases status: %v", err), http.StatusInternalServerError)
return
}
invertedAliases := make(map[string][]string)
for alias, realName := range aliases {
if realName != "" {
invertedAliases[realName] = append(invertedAliases[realName], alias)
}
}

collectorsMap := make(map[string]ResponseCollector)

for _, collector := range collectors {
if _, ok := oldestLatestDumps[collector.Name]; ok {
collectorsMap[collector.Name] = ResponseCollector{
Project: collector.Project,
Name: collector.Name,
Aliases: append([]string{collector.Name}, invertedAliases[collector.Name]...),
OldestRibsDump: oldestLatestDumps[collector.Name].OldestRibsDump,
OldestUpdatesDump: oldestLatestDumps[collector.Name].OldestUpdatesDump,
LatestRibsDump: oldestLatestDumps[collector.Name].LatestRibsDump,
Expand All @@ -356,10 +386,15 @@ func collectorHandler(db *pgxpool.Pool, logger *logging.Logger) http.HandlerFunc
jsonResponse(w, collectorsResponse)
} else {
// Return specific collector if exists
for _, collector := range collectors {
if collector.Name == collectorName {
jsonResponse(w, collector)
return
if collector, exists := collectorsMap[collectorName]; exists {
jsonResponse(w, collector)
return
} else if alias, avail := aliases[collectorName]; avail {
if alias != "" {
if col, repl := collectorsMap[alias]; repl {
jsonResponse(w, col)
return
}
}
}
http.Error(w, "Collector not found", http.StatusNotFound)
Expand All @@ -374,33 +409,49 @@ func parseDataRequest(r *http.Request) (bgpfinder.Query, error) {
intervalsParams := queryParams["intervals[]"]
collectorsParams := queryParams["collectors[]"]
typesParams := queryParams["types[]"]
if len(typesParams) == 0 {
typesParams=queryParams["type"]
}

projectParams := queryParams["projects[]"]
if len(projectParams) == 0 {
if p := queryParams.Get("project"); p != "" {
projectParams = []string{p}
}
}
query.Projects = projectParams

collectorParam := queryParams.Get("collector")
minInitialTime := queryParams.Get("minInitialTime")
dataAddedSince := queryParams.Get("dataAddedSince")

// Parse interval
// Parse intervals
if len(intervalsParams) == 0 {
return query, fmt.Errorf("at least one interval is required")
}

times := strings.Split(intervalsParams[0], ",")
if len(times) != 2 {
return query, fmt.Errorf("invalid interval format. Expected format: start,end")
}
for _, intervalStr := range intervalsParams {
times := strings.Split(intervalStr, ",")
if len(times) != 2 {
return query, fmt.Errorf("invalid interval format. Expected format: start,end")
}

startInt, err := strconv.ParseInt(times[0], 10, 64)
if err != nil {
return query, fmt.Errorf("invalid start time: %v", err)
}
startInt, err := strconv.ParseInt(times[0], 10, 64)
if err != nil {
return query, fmt.Errorf("invalid start time: %v", err)
}

endInt, err := strconv.ParseInt(times[1], 10, 64)
if err != nil {
return query, fmt.Errorf("invalid end time: %v", err)
}
endInt, err := strconv.ParseInt(times[1], 10, 64)
if err != nil {
return query, fmt.Errorf("invalid end time: %v", err)
}

query.Intervals = append(query.Intervals, bgpfinder.Interval{
From: time.Unix(startInt, 0),
Until: time.Unix(endInt, 0),
})

query.From = time.Unix(startInt, 0)
query.Until = time.Unix(endInt, 0)
}

if minInitialTime != "" {
minInitialTimeInt, err := strconv.ParseInt(minInitialTime, 10, 64)
Expand Down Expand Up @@ -458,29 +509,57 @@ func parseDataRequest(r *http.Request) (bgpfinder.Query, error) {
}
}
} else if collectorParam != "" {
aliases, err := bgpfinder.GetCollectorNameAliases("")
if err != nil {
return query, fmt.Errorf("error fetching defunct collector aliases: %v", err)
}

allCollectors, err := bgpfinder.Collectors("")
found := false
if err != nil {
return query, fmt.Errorf("error fetching collectors: %v", err)
}

collectorMap := make(map[string]bgpfinder.Collector)
for _, c := range allCollectors {
if collectorParam == c.Name {
collectors = append(collectors, c)
found = true
break
}
collectorMap[c.Name] = c
}
if !found {

if collector, exists := collectorMap[collectorParam]; exists {
collectors = append(collectors, collector)
} else if alias, avail := aliases[collectorParam]; avail {
if alias != "" {
if col, repl := collectorMap[alias]; repl {
collectors = append(collectors, col)
} else {
return query, fmt.Errorf("unknown collector alias: %s -> %s", collectorParam, alias)
}
}
} else {
return query, fmt.Errorf("collector not found: %s", collectorParam)
}
} else {
// Use all collectors
var err error
collectors, err = bgpfinder.Collectors("")
if err != nil {
return query, fmt.Errorf("error fetching collectors: %v", err)
}
}

// exclude collectors that are not part of the included projects
if len(query.Projects) > 0 {
projectMap := make(map[string]bool)
for _, p := range query.Projects {
projectMap[p] = true
}
var filtered []bgpfinder.Collector
for _, c := range collectors {
if projectMap[c.Project] {
filtered = append(filtered, c)
}
}
collectors = filtered
}
query.Collectors = collectors

// Parse types
Expand Down Expand Up @@ -509,10 +588,17 @@ func dataHandler(db *pgxpool.Pool, logger *logging.Logger) http.HandlerFunc {

// Log the parsed query details in UTC
evt := logger.Info().
Time("from", query.From.UTC()).
Time("until", query.Until.UTC()).
Str("dump_type", query.DumpType.String()).
Int("collector_count", len(query.Collectors))
Int("collector_count", len(query.Collectors)).
Int("interval_count", len(query.Intervals))

if len(query.Intervals) > 0 {
first := query.Intervals[0]
last := query.Intervals[len(query.Intervals)-1]
evt.Time("first_from", first.From.UTC()).
Time("last_until", last.Until.UTC())
}


if query.MinInitialTime != nil {
evt.Time("minInitialTime", query.MinInitialTime.UTC())
Expand All @@ -523,11 +609,19 @@ func dataHandler(db *pgxpool.Pool, logger *logging.Logger) http.HandlerFunc {
// can be returned (so as to avoid unnecessary scraping
// attempts when a DB lookup returns no results).
results := []bgpfinder.BGPDump{}
if query.MinInitialTime != nil && query.Until.Unix() > 0 && query.MinInitialTime.UTC().After(query.Until.UTC()) {
populateDataResponse(w, Data{results}, query)
return
if query.MinInitialTime != nil && len(query.Intervals) > 0 {
allBefore := true
for _, interval := range query.Intervals {
if interval.Until.Unix() == 0 || !query.MinInitialTime.UTC().After(interval.Until.UTC()) {
allBefore = false
break
}
}
if allBefore {
populateDataResponse(w, Data{results}, query)
return
}
}

// Log collector details
for _, c := range query.Collectors {
logger.Info().
Expand Down
Loading