From 01da8491017157355dadf47176ebc5588564270a Mon Sep 17 00:00:00 2001 From: Shane Alcock Date: Fri, 27 Feb 2026 12:31:46 +1300 Subject: [PATCH 1/8] Limit query responses to 500 resource URLs (by default) The limit can be changed through an environment variable, if required. Limit is applied post-filtering (unlike the bgpstream broker), but otherwise replicates the observed behavior of bgpstream (i.e. avoiding the inclusion of partial resource lists for the last timestamp). Also added a performance improvement to the main SQL query in the finder so that the index on the timestamp is better utilized. --- db.go | 122 +++++++++++++++++++++++++++++++----------------- finder.go | 70 +++++++++++++++++++++++++++ multi_finder.go | 2 +- 3 files changed, 149 insertions(+), 45 deletions(-) diff --git a/db.go b/db.go index 06f4db6..8c66a79 100644 --- a/db.go +++ b/db.go @@ -130,17 +130,19 @@ func UpsertBGPDumps(ctx context.Context, logger *logging.Logger, db *pgxpool.Poo return nil } -// FetchDataFromDB retrieves BGP dump data filtered by collector names and dump types. -func FetchDataFromDB(ctx context.Context, db *pgxpool.Pool, query Query) ([]BGPDump, error) { - +// fetchDataFromDBInternal retrieves BGP dump data for a specific time range. +func fetchDataFromDBInternal(ctx context.Context, db *pgxpool.Pool, query Query, until time.Time) ([]BGPDump, error) { var args []interface{} paramCounter := 1 + // Use a 24-hour buffer on the start of the timestamp scan to allow Postgres + // to use the B-tree index on the timestamp column. sqlQuery := ` SELECT url, dump_type, duration, collector_name, EXTRACT(EPOCH FROM timestamp)::bigint FROM bgp_dumps WHERE collector_name = ANY($1) + AND timestamp >= to_timestamp($2) - interval '24 hours' AND timestamp + duration >= to_timestamp($2) - ` + ` // Extract collector names from the query collectorNames := make([]string, len(query.Collectors)) @@ -151,10 +153,9 @@ func FetchDataFromDB(ctx context.Context, db *pgxpool.Pool, query Query) ([]BGPD paramCounter = 3 // Check if Until is 0, if not, add an end range - if query.Until.Unix() != 0 { - sqlQuery += `` + if until.Unix() != 0 { sqlQuery += fmt.Sprintf(" AND timestamp <= to_timestamp($%d)", paramCounter) - args = append(args, query.Until.Unix()) + args = append(args, until.Unix()) paramCounter++ } @@ -166,9 +167,9 @@ func FetchDataFromDB(ctx context.Context, db *pgxpool.Pool, query Query) ([]BGPD } if query.MinInitialTime != nil { - if query.Until.Unix() == 0 || query.MinInitialTime.Unix() <= query.Until.Unix() { + if until.Unix() == 0 || query.MinInitialTime.Unix() <= until.Unix() { sqlQuery += fmt.Sprintf(" AND timestamp >= to_timestamp($%d)", paramCounter) - paramCounter ++ + paramCounter++ args = append(args, query.MinInitialTime.Unix()) } } @@ -182,42 +183,43 @@ func FetchDataFromDB(ctx context.Context, db *pgxpool.Pool, query Query) ([]BGPD // Alternative implementation to above?: /* - if query.MinInitialTime != nil { - if query.DataAddedSince == nil { - sqlQuery += fmt.Sprintf(" AND timestamp >= to_timestamp($%d) AND timestamp <= to_timestamp($%d)", paramCounter, paramCounter+1) - paramCounter += 2 - args = append(args, query.MinInitialTime.Unix(), query.MinInitialTime.Add(time.Duration(2)*time.Hour) - } else { - sqlQuery += fmt.Sprintf( - ` AND ( - (timestamp >= to_timestamp($%d) AND timestamp <= to_timestamp($%d)) - OR - (timestamp < to_timestamp($%d) AND timestamp > to_timestamp($%d) AND cdate > to_timestamp($%d) AND cdate < to_timestamp($%d)) - ) - `, - paramCounter, - paramCounter+1, - paramCounter+2, - paramCounter+3, - paramCounter+4, - paramCounter+5) - - paramCounter += 6 - - args = append(args, - query.MinInitialTime.Unix(), - query.MinInitialTime.Add(time.Duration(2)*time.Hour).Unix(), - query.MinInitialTime.Unix(), - query.MinInitialTime.Add(-time.Duration(86400)*time.Second).Unix(), - query.DataAddedSince.Unix(), - query.ResponseTime.Unix(), - ) - } - } - */ + if query.MinInitialTime != nil { + if query.DataAddedSince == nil { + sqlQuery += fmt.Sprintf(" AND timestamp >= to_timestamp($%d) AND timestamp <= to_timestamp($%d)", paramCounter, paramCounter+1) + paramCounter += 2 + args = append(args, query.MinInitialTime.Unix(), query.MinInitialTime.Add(time.Duration(2)*time.Hour) + } else { + sqlQuery += fmt.Sprintf( + ` AND ( + (timestamp >= to_timestamp($%d) AND timestamp <= to_timestamp($%d)) + OR + (timestamp < to_timestamp($%d) AND timestamp > to_timestamp($%d) AND cdate > to_timestamp($%d) AND cdate < to_timestamp($%d)) + ) + `, + paramCounter, + paramCounter+1, + paramCounter+2, + paramCounter+3, + paramCounter+4, + paramCounter+5) + + paramCounter += 6 + + args = append(args, + query.MinInitialTime.Unix(), + query.MinInitialTime.Add(time.Duration(2)*time.Hour).Unix(), + query.MinInitialTime.Unix(), + query.MinInitialTime.Add(-time.Duration(86400)*time.Second).Unix(), + query.DataAddedSince.Unix(), + query.ResponseTime.Unix(), + ) + } + } + */ - sqlQuery += " ORDER BY timestamp ASC, dump_type ASC" // This ORDER BY may be bad for performance? But putting it there to match bgpstream ordering (which I think this is) + sqlQuery += " ORDER BY timestamp ASC, dump_type ASC" + sqlQuery += fmt.Sprintf(" LIMIT %d", HardLimit) rows, err := db.Query(ctx, sqlQuery, args...) if err != nil { @@ -249,10 +251,42 @@ func FetchDataFromDB(ctx context.Context, db *pgxpool.Pool, query Query) ([]BGPD Project: determineProjectName(collectorName), }) } - return results, nil } +// FetchDataFromDB retrieves BGP dump data filtered by collector names and dump types. +func FetchDataFromDB(ctx context.Context, db *pgxpool.Pool, query Query) ([]BGPDump, error) { + internalUntil := query.Until + isWindowed := false + + // If the requested range is excessively large (or open-ended), try a smaller initial window + // to reduce database scan load if the result cap is quickly met. + if query.Until.IsZero() || query.Until.Sub(query.From) > MaxScanWindow { + internalUntil = query.From.Add(MaxScanWindow) + isWindowed = true + } + + results, err := fetchDataFromDBInternal(ctx, db, query, internalUntil) + if err != nil { + return nil, err + } + + // If the windowed query didn't fill the target limit (and it wasn't the full requested range), + // retry once with the full range. + if isWindowed && len(results) < TargetLimit && !query.Until.IsZero() { + results, err = fetchDataFromDBInternal(ctx, db, query, query.Until) + if err != nil { + return nil, err + } + } else if isWindowed && len(results) < TargetLimit && query.Until.IsZero() { + // For open-ended queries, we could potentially keep windowing or just return what we found. + // For now, let's just do one more larger window or just return. + // Usually a 1-month window is plenty for most BGP data requests. + } + + return ApplyResultCap(results), nil +} + func determineProjectName(collector string) string { if strings.HasPrefix(collector, "rrc") { return "ris" diff --git a/finder.go b/finder.go index 8bb5de8..ce2480a 100644 --- a/finder.go +++ b/finder.go @@ -3,11 +3,47 @@ package bgpfinder import ( "encoding/json" "fmt" + "os" + "sort" "strconv" "strings" "time" ) +var ( + // TargetLimit is the preferred maximum number of results to return. + // Can be overridden by BGPFINDER_TARGET_LIMIT env var. + TargetLimit = 500 + + // HardLimit is the absolute maximum number of results to return, + // even when extending to include all results with the same timestamp. + // Can be overridden by BGPFINDER_HARD_LIMIT env var. + HardLimit = 1000 + + // MaxScanWindow is the maximum time range to scan in a single database query. + // Queries exceeding this will be windowed for efficiency. + // Can be overridden by BGPFINDER_MAX_SCAN_WINDOW (in days) env var. + MaxScanWindow = 30 * 24 * time.Hour +) + +func init() { + if tl := os.Getenv("BGPFINDER_TARGET_LIMIT"); tl != "" { + if val, err := strconv.Atoi(tl); err == nil { + TargetLimit = val + } + } + if hl := os.Getenv("BGPFINDER_HARD_LIMIT"); hl != "" { + if val, err := strconv.Atoi(hl); err == nil { + HardLimit = val + } + } + if mw := os.Getenv("BGPFINDER_MAX_SCAN_WINDOW"); mw != "" { + if val, err := strconv.Atoi(mw); err == nil { + MaxScanWindow = time.Duration(val) * 24 * time.Hour + } + } +} + // Finder Just a sketch of what the base Finder interface might look like. Everything // gets built on top of (or under, I guess) this. type Finder interface { @@ -173,3 +209,37 @@ func dateInRange(date time.Time, query Query) bool { } return unixTime >= startTime } + +// ApplyResultCap applies the pagination capping logic to a list of BGP dumps. +func ApplyResultCap(results []BGPDump) []BGPDump { + if len(results) <= TargetLimit { + return results + } + + // Ensure results are sorted by timestamp and then dump type + sort.Slice(results, func(i, j int) bool { + if results[i].Timestamp == results[j].Timestamp { + return results[i].DumpType < results[j].DumpType + } + return results[i].Timestamp < results[j].Timestamp + }) + + limitTs := results[TargetLimit].Timestamp + + if results[0].Timestamp == limitTs { + // The first timestamp already crosses/hits the target limit. + // Include all results with this timestamp, up to HardLimit. + i := 0 + for i < len(results) && results[i].Timestamp == limitTs && i < HardLimit { + i++ + } + return results[:i] + } + + // Otherwise, exclude all results that have the timestamp that crosses the limit. + j := TargetLimit + for j > 0 && results[j-1].Timestamp == limitTs { + j-- + } + return results[:j] +} diff --git a/multi_finder.go b/multi_finder.go index 113632f..3f2eb53 100644 --- a/multi_finder.go +++ b/multi_finder.go @@ -173,7 +173,7 @@ func (m *MultiFinder) Find(query Query) ([]BGPDump, error) { dumps = append(dumps, dump...) } - return dumps, nil + return ApplyResultCap(dumps), nil } func (m *MultiFinder) getFinderByProject(projName string) (Finder, bool) { From ca4a8b5010e5b7661e0f720e970c464c100db216 Mon Sep 17 00:00:00 2001 From: Shane Alcock Date: Mon, 2 Mar 2026 19:46:54 -0500 Subject: [PATCH 2/8] Add missing support for "type" parameter to data/ API service --- cmd/bgpfinder-server/main.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cmd/bgpfinder-server/main.go b/cmd/bgpfinder-server/main.go index 6f3cd90..7116929 100644 --- a/cmd/bgpfinder-server/main.go +++ b/cmd/bgpfinder-server/main.go @@ -374,6 +374,9 @@ func parseDataRequest(r *http.Request) (bgpfinder.Query, error) { intervalsParams := queryParams["intervals[]"] collectorsParams := queryParams["collectors[]"] typesParams := queryParams["types[]"] + if len(typesParams) == 0 { + typesParams=queryParams["type"] + } collectorParam := queryParams.Get("collector") minInitialTime := queryParams.Get("minInitialTime") From 4ba09ebdda31cb18e9a707e243d5280ac95b20d5 Mon Sep 17 00:00:00 2001 From: Shane Alcock Date: Mon, 2 Mar 2026 20:02:57 -0500 Subject: [PATCH 3/8] Add support for project(s) query parameter for data/ API --- cmd/bgpfinder-server/main.go | 23 +++++++++++++++++++++++ finder.go | 28 +++++++++++++++++++++++++++- 2 files changed, 50 insertions(+), 1 deletion(-) diff --git a/cmd/bgpfinder-server/main.go b/cmd/bgpfinder-server/main.go index 7116929..7b0bf1a 100644 --- a/cmd/bgpfinder-server/main.go +++ b/cmd/bgpfinder-server/main.go @@ -378,6 +378,14 @@ func parseDataRequest(r *http.Request) (bgpfinder.Query, error) { typesParams=queryParams["type"] } + projectParams := queryParams["projects[]"] + if len(projectParams) == 0 { + if p := queryParams.Get("project"); p != "" { + projectParams = []string{p} + } + } + query.Projects = projectParams + collectorParam := queryParams.Get("collector") minInitialTime := queryParams.Get("minInitialTime") dataAddedSince := queryParams.Get("dataAddedSince") @@ -484,6 +492,21 @@ func parseDataRequest(r *http.Request) (bgpfinder.Query, error) { return query, fmt.Errorf("error fetching collectors: %v", err) } } + + // exclude collectors that are not part of the included projects + if len(query.Projects) > 0 { + projectMap := make(map[string]bool) + for _, p := range query.Projects { + projectMap[p] = true + } + var filtered []bgpfinder.Collector + for _, c := range collectors { + if projectMap[c.Project] { + filtered = append(filtered, c) + } + } + collectors = filtered + } query.Collectors = collectors // Parse types diff --git a/finder.go b/finder.go index ce2480a..335c78b 100644 --- a/finder.go +++ b/finder.go @@ -132,6 +132,9 @@ type Query struct { // Dump type to search for. Any type if unset DumpType DumpType + // Projects to search for. All projects if empty or unset + Projects []string + // Min initial time MinInitialTime *time.Time @@ -147,9 +150,32 @@ type Query struct { func (q Query) MarshalJSON() ([]byte, error) { custom := make(map[string]interface{}) if !(q.From.IsZero() && q.Until.IsZero()) { - custom["intervals"] = strconv.FormatInt(q.From.Unix(), 10) + "," + strconv.FormatInt(q.Until.Unix(), 10) + custom["intervals"] = []string{strconv.FormatInt(q.From.Unix(), 10) + "," + strconv.FormatInt(q.Until.Unix(), 10)} } custom["human"] = false + custom["projects"] = q.Projects + if len(q.Projects) == 1 { + custom["project"] = q.Projects[0] + } else { + custom["project"] = nil + } + collectorNames := make([]string, len(q.Collectors)) + for i, c := range q.Collectors { + collectorNames[i] = c.Name + } + custom["collectors"] = collectorNames + if len(q.Collectors) == 1 { + custom["collector"] = q.Collectors[0].Name + } else { + custom["collector"] = nil + } + + custom["types"] = []string{q.DumpType.String()} + if q.DumpType != DumpTypeAny { + custom["type"] = q.DumpType.String() + } else { + custom["type"] = nil + } return json.Marshal(custom) } From 91f107504639876e9047fb724204e6a185a4239e Mon Sep 17 00:00:00 2001 From: Shane Alcock Date: Mon, 2 Mar 2026 23:19:16 -0500 Subject: [PATCH 4/8] Add support for multiple intervals Also remove unnecessary duplication in the queryParameters returned by the finder service. Keeping track of whether the client used the array notation or just a single string in their query just for the sake of being consistent with the older broker seems like a waste of effort to me. --- cmd/bgpfinder-server/main.go | 65 ++++++++++++++++--------- db.go | 94 ++++++++++++++++++++++++------------ finder.go | 92 ++++++++++++++++++++++------------- scrape_collectors.go | 6 ++- 4 files changed, 170 insertions(+), 87 deletions(-) diff --git a/cmd/bgpfinder-server/main.go b/cmd/bgpfinder-server/main.go index 7b0bf1a..89c5aa7 100644 --- a/cmd/bgpfinder-server/main.go +++ b/cmd/bgpfinder-server/main.go @@ -390,28 +390,33 @@ func parseDataRequest(r *http.Request) (bgpfinder.Query, error) { minInitialTime := queryParams.Get("minInitialTime") dataAddedSince := queryParams.Get("dataAddedSince") - // Parse interval + // Parse intervals if len(intervalsParams) == 0 { return query, fmt.Errorf("at least one interval is required") } - times := strings.Split(intervalsParams[0], ",") - if len(times) != 2 { - return query, fmt.Errorf("invalid interval format. Expected format: start,end") - } + for _, intervalStr := range intervalsParams { + times := strings.Split(intervalStr, ",") + if len(times) != 2 { + return query, fmt.Errorf("invalid interval format. Expected format: start,end") + } - startInt, err := strconv.ParseInt(times[0], 10, 64) - if err != nil { - return query, fmt.Errorf("invalid start time: %v", err) - } + startInt, err := strconv.ParseInt(times[0], 10, 64) + if err != nil { + return query, fmt.Errorf("invalid start time: %v", err) + } - endInt, err := strconv.ParseInt(times[1], 10, 64) - if err != nil { - return query, fmt.Errorf("invalid end time: %v", err) - } + endInt, err := strconv.ParseInt(times[1], 10, 64) + if err != nil { + return query, fmt.Errorf("invalid end time: %v", err) + } - query.From = time.Unix(startInt, 0) - query.Until = time.Unix(endInt, 0) + query.Intervals = append(query.Intervals, bgpfinder.Interval{ + From: time.Unix(startInt, 0), + Until: time.Unix(endInt, 0), + }) + + } if minInitialTime != "" { minInitialTimeInt, err := strconv.ParseInt(minInitialTime, 10, 64) @@ -487,6 +492,7 @@ func parseDataRequest(r *http.Request) (bgpfinder.Query, error) { } } else { // Use all collectors + var err error collectors, err = bgpfinder.Collectors("") if err != nil { return query, fmt.Errorf("error fetching collectors: %v", err) @@ -535,10 +541,17 @@ func dataHandler(db *pgxpool.Pool, logger *logging.Logger) http.HandlerFunc { // Log the parsed query details in UTC evt := logger.Info(). - Time("from", query.From.UTC()). - Time("until", query.Until.UTC()). Str("dump_type", query.DumpType.String()). - Int("collector_count", len(query.Collectors)) + Int("collector_count", len(query.Collectors)). + Int("interval_count", len(query.Intervals)) + + if len(query.Intervals) > 0 { + first := query.Intervals[0] + last := query.Intervals[len(query.Intervals)-1] + evt.Time("first_from", first.From.UTC()). + Time("last_until", last.Until.UTC()) + } + if query.MinInitialTime != nil { evt.Time("minInitialTime", query.MinInitialTime.UTC()) @@ -549,11 +562,19 @@ func dataHandler(db *pgxpool.Pool, logger *logging.Logger) http.HandlerFunc { // can be returned (so as to avoid unnecessary scraping // attempts when a DB lookup returns no results). results := []bgpfinder.BGPDump{} - if query.MinInitialTime != nil && query.Until.Unix() > 0 && query.MinInitialTime.UTC().After(query.Until.UTC()) { - populateDataResponse(w, Data{results}, query) - return + if query.MinInitialTime != nil && len(query.Intervals) > 0 { + allBefore := true + for _, interval := range query.Intervals { + if interval.Until.Unix() == 0 || !query.MinInitialTime.UTC().After(interval.Until.UTC()) { + allBefore = false + break + } + } + if allBefore { + populateDataResponse(w, Data{results}, query) + return + } } - // Log collector details for _, c := range query.Collectors { logger.Info(). diff --git a/db.go b/db.go index 8c66a79..164fd6d 100644 --- a/db.go +++ b/db.go @@ -134,29 +134,48 @@ func UpsertBGPDumps(ctx context.Context, logger *logging.Logger, db *pgxpool.Poo func fetchDataFromDBInternal(ctx context.Context, db *pgxpool.Pool, query Query, until time.Time) ([]BGPDump, error) { var args []interface{} paramCounter := 1 + + // Extract collector names from the query + collectorNames := make([]string, len(query.Collectors)) + for i, c := range query.Collectors { + collectorNames[i] = c.Name + } + args = append(args, collectorNames) + paramCounter++ + // Use a 24-hour buffer on the start of the timestamp scan to allow Postgres // to use the B-tree index on the timestamp column. sqlQuery := ` SELECT url, dump_type, duration, collector_name, EXTRACT(EPOCH FROM timestamp)::bigint FROM bgp_dumps WHERE collector_name = ANY($1) - AND timestamp >= to_timestamp($2) - interval '24 hours' - AND timestamp + duration >= to_timestamp($2) ` - // Extract collector names from the query - collectorNames := make([]string, len(query.Collectors)) - for i, c := range query.Collectors { - collectorNames[i] = c.Name + // Construct the interval conditions + var intervalClauses []string + for _, interval := range query.Intervals { + intUntil := interval.Until + if until.Unix() != 0 && (intUntil.IsZero() || intUntil.After(until)) { + intUntil = until + } + + if intUntil.Unix() != 0 { + clause := fmt.Sprintf("(timestamp >= to_timestamp($%d) - interval '24 hours' AND timestamp + duration >= to_timestamp($%d) AND timestamp <= to_timestamp($%d))", + paramCounter, paramCounter, paramCounter+1) + intervalClauses = append(intervalClauses, clause) + args = append(args, interval.From.Unix(), intUntil.Unix()) + paramCounter += 2 + } else { + clause := fmt.Sprintf("(timestamp >= to_timestamp($%d) - interval '24 hours' AND timestamp + duration >= to_timestamp($%d))", + paramCounter, paramCounter) + intervalClauses = append(intervalClauses, clause) + args = append(args, interval.From.Unix()) + paramCounter ++ + } } - args = append(args, collectorNames, query.From.Unix()) - paramCounter = 3 - // Check if Until is 0, if not, add an end range - if until.Unix() != 0 { - sqlQuery += fmt.Sprintf(" AND timestamp <= to_timestamp($%d)", paramCounter) - args = append(args, until.Unix()) - paramCounter++ + if len(intervalClauses) > 0 { + sqlQuery += " AND (" + strings.Join(intervalClauses, " OR ") + ")" } // Check if dump type was specified, if so query for just that type of file @@ -167,11 +186,9 @@ func fetchDataFromDBInternal(ctx context.Context, db *pgxpool.Pool, query Query, } if query.MinInitialTime != nil { - if until.Unix() == 0 || query.MinInitialTime.Unix() <= until.Unix() { - sqlQuery += fmt.Sprintf(" AND timestamp >= to_timestamp($%d)", paramCounter) - paramCounter++ - args = append(args, query.MinInitialTime.Unix()) - } + sqlQuery += fmt.Sprintf(" AND timestamp >= to_timestamp($%d)", paramCounter) + paramCounter++ + args = append(args, query.MinInitialTime.Unix()) } if query.DataAddedSince != nil { @@ -256,16 +273,37 @@ func fetchDataFromDBInternal(ctx context.Context, db *pgxpool.Pool, query Query, // FetchDataFromDB retrieves BGP dump data filtered by collector names and dump types. func FetchDataFromDB(ctx context.Context, db *pgxpool.Pool, query Query) ([]BGPDump, error) { - internalUntil := query.Until + + if len(query.Intervals) == 0 { + return nil, nil + } + + var minFrom time.Time + var maxUntil time.Time + isOpenEnded := false + + for _, interval := range query.Intervals { + if minFrom.IsZero() || interval.From.Before(minFrom) { + minFrom = interval.From + } + if interval.Until.IsZero() { + isOpenEnded = true + } else if !isOpenEnded && (maxUntil.IsZero() || interval.Until.After(maxUntil)) { + maxUntil = interval.Until + } + } + internalUntil := maxUntil isWindowed := false - // If the requested range is excessively large (or open-ended), try a smaller initial window - // to reduce database scan load if the result cap is quickly met. - if query.Until.IsZero() || query.Until.Sub(query.From) > MaxScanWindow { - internalUntil = query.From.Add(MaxScanWindow) + // If the requested range is excessively large (or open-ended), + // try a smaller initial window to reduce database scan load if the + // result cap is quickly met. + // Only do this if we have at least one interval. + if isOpenEnded || maxUntil.Sub(minFrom) > MaxScanWindow { + internalUntil = minFrom.Add(MaxScanWindow) isWindowed = true } - + results, err := fetchDataFromDBInternal(ctx, db, query, internalUntil) if err != nil { return nil, err @@ -273,15 +311,11 @@ func FetchDataFromDB(ctx context.Context, db *pgxpool.Pool, query Query) ([]BGPD // If the windowed query didn't fill the target limit (and it wasn't the full requested range), // retry once with the full range. - if isWindowed && len(results) < TargetLimit && !query.Until.IsZero() { - results, err = fetchDataFromDBInternal(ctx, db, query, query.Until) + if isWindowed && len(results) < TargetLimit { + results, err = fetchDataFromDBInternal(ctx, db, query, maxUntil) if err != nil { return nil, err } - } else if isWindowed && len(results) < TargetLimit && query.Until.IsZero() { - // For open-ended queries, we could potentially keep windowing or just return what we found. - // For now, let's just do one more larger window or just return. - // Usually a 1-month window is plenty for most BGP data requests. } return ApplyResultCap(results), nil diff --git a/finder.go b/finder.go index 335c78b..b29b5e2 100644 --- a/finder.go +++ b/finder.go @@ -117,17 +117,21 @@ const ( DumpTypeUpdates DumpType = 2 // updates ) +type Interval struct { + // start time (inclusive) + From time.Time + // end time (exclusive) + Until time.Time +} + // TODO: think about how this should work -- just keep it simple! no complex query structures // TODO: add Validate method (e.g., From is before Until, IsADumpType, etc.) type Query struct { // Collectors to search for. All collectors if unset/empty Collectors []Collector - // Query window start time (inclusive) - From time.Time - - // Query window end time (exclusive) - Until time.Time + // Query window intervals + Intervals []Interval // Dump type to search for. Any type if unset DumpType DumpType @@ -147,28 +151,31 @@ type Query struct { ResponseTime time.Time } +func (q Query) FirstInterval() Interval { + if len(q.Intervals) > 0 { + return q.Intervals[0] + } + return Interval{} +} + func (q Query) MarshalJSON() ([]byte, error) { custom := make(map[string]interface{}) - if !(q.From.IsZero() && q.Until.IsZero()) { - custom["intervals"] = []string{strconv.FormatInt(q.From.Unix(), 10) + "," + strconv.FormatInt(q.Until.Unix(), 10)} + + if len(q.Intervals) > 0 { + intervals := make([]string, len(q.Intervals)) + for i, interval := range q.Intervals { + intervals[i] = strconv.FormatInt(interval.From.Unix(), 10) + "," + strconv.FormatInt(interval.Until.Unix(), 10) + } + custom["intervals"] = intervals } + custom["human"] = false custom["projects"] = q.Projects - if len(q.Projects) == 1 { - custom["project"] = q.Projects[0] - } else { - custom["project"] = nil - } collectorNames := make([]string, len(q.Collectors)) for i, c := range q.Collectors { collectorNames[i] = c.Name } custom["collectors"] = collectorNames - if len(q.Collectors) == 1 { - custom["collector"] = q.Collectors[0].Name - } else { - custom["collector"] = nil - } custom["types"] = []string{q.DumpType.String()} if q.DumpType != DumpTypeAny { @@ -206,34 +213,53 @@ type BGPDump struct { Project string `json:"project"` } -// monthInRange checks if any part of the month overlaps with the query range +// monthInRange checks if any part of the month overlaps with any of the +// query intervals func monthInRange(date time.Time, query Query) bool { monthStart := date monthEnd := date.AddDate(0, 1, 0) - start := query.From - if query.MinInitialTime != nil { - start = *query.MinInitialTime - } + for _, interval := range query.Intervals { + start := interval.From + if query.MinInitialTime != nil { + start = *query.MinInitialTime + } - if query.Until.Unix() != 0 { - return monthEnd.After(start) && monthStart.Before(query.Until) + if interval.Until.Unix() != 0 { + if monthEnd.After(start) && monthStart.Before(interval.Until) { + return true + } + } else { + if monthEnd.After(start) { + return true + } + } } - return monthEnd.After(start) + + return len(query.Intervals) == 0 } -// dateInRange checks if a specific timestamp falls within the query range +// dateInRange checks if a specific timestamp falls within any of the query +// intervals func dateInRange(date time.Time, query Query) bool { unixTime := date.Unix() - startTime := query.From.Unix() - if query.MinInitialTime != nil { - startTime = query.MinInitialTime.Unix() - } - if query.Until.Unix() != 0 { - return unixTime >= startTime && unixTime < query.Until.Unix() + for _, interval := range query.Intervals { + startTime := interval.From.Unix() + if query.MinInitialTime != nil { + startTime = query.MinInitialTime.Unix() + } + if interval.Until.Unix() != 0 { + if unixTime >= startTime && unixTime < interval.Until.Unix() { + return true + } + } else { + if unixTime >= startTime { + return true; + } + } } - return unixTime >= startTime + return len(query.Intervals) == 0 } // ApplyResultCap applies the pagination capping logic to a list of BGP dumps. diff --git a/scrape_collectors.go b/scrape_collectors.go index a6aa995..a55be05 100644 --- a/scrape_collectors.go +++ b/scrape_collectors.go @@ -73,8 +73,10 @@ func UpdateCollectorsData(ctx context.Context, logger *logging.Logger, db *pgxpo query := Query{ Collectors: []Collector{collector}, DumpType: DumpTypeAny, - From: time.Unix(0, 0), // Start from Unix epoch (1970-01-01) - Until: time.Now().AddDate(0, 0, 1), // Until tomorrow (to ensure we get today's data) + Intervals: []Interval{{ + From: time.Unix(0, 0), // Start from Unix epoch (1970-01-01) + Until: time.Now().AddDate(0, 0, 1), // Until tomorrow (to ensure we get today's data) + }}, } dumps, err := finder.Find(query) From 034cbaed14def598bbfd304b6746923db985edf5 Mon Sep 17 00:00:00 2001 From: Shane Alcock Date: Tue, 10 Mar 2026 17:37:40 -0400 Subject: [PATCH 5/8] Fix bad Query arguments in periodic scraper The change to the Query class to support multiple intervals was not properly ported over to the periodic scraper, so it was still constructing Query instances with a single From and To parameter. --- periodicscraper/periodic_scraper_per_collector.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/periodicscraper/periodic_scraper_per_collector.go b/periodicscraper/periodic_scraper_per_collector.go index f6da0ba..f529106 100644 --- a/periodicscraper/periodic_scraper_per_collector.go +++ b/periodicscraper/periodic_scraper_per_collector.go @@ -114,12 +114,17 @@ func getDumps(ctx context.Context, logger.Info().Str("collector", collector.Name).Msg("Starting to scrape collector data") dumpType := getDumpTypeFromBool(isRibsData) + untilNextDay := time.Now().AddDate(0, 0, 1) query := bgpfinder.Query{ Collectors: []bgpfinder.Collector{collector}, DumpType: dumpType, - From: prevRunTimeEnd, // Start from prevRuntime - Until: time.Now().AddDate(0, 0, 1), // Until tomorrow (to ensure we get today's data) + Intervals: []bgpfinder.Interval{ + { + From: prevRunTimeEnd, // Start from prevRuntime + Until: untilNextDay, // Until tomorrow (to ensure we get today's data) + }, + }, } dumps, err := finder.Find(query) From 019863f1219146f745fc29bcc97c98ec4b77842b Mon Sep 17 00:00:00 2001 From: Shane Alcock Date: Mon, 30 Mar 2026 19:36:55 -0400 Subject: [PATCH 6/8] Add route-views.amsix -> locix.fra alias Fix missing '/' in getCollectorURL for aliased names --- routeviews_finder.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/routeviews_finder.go b/routeviews_finder.go index 224b2b8..83df23e 100644 --- a/routeviews_finder.go +++ b/routeviews_finder.go @@ -99,6 +99,7 @@ func (f *RouteViewsFinder) GetCollectorNameAliases(project string) (map[string]s return map[string]string{ "route-views2.saopaulo": "ix-br2.gru", "route-views.saopaulo": "ix-br.gru", + "route-views.amsix": "locix.fra", }, nil } @@ -171,7 +172,7 @@ func (f *RouteViewsFinder) getCollectorURL(collector Collector) string { collectorNameOverrides["route-views2"] = "" if override, exists := collectorNameOverrides[collector.Name]; exists { - return RouteviewsArchiveUrl + override + "bgpdata/" + return RouteviewsArchiveUrl + override + "/bgpdata/" } return RouteviewsArchiveUrl + collector.Name + "/bgpdata/" From 99ea2703bd179bbb24c23a519d64403571f203d7 Mon Sep 17 00:00:00 2001 From: Shane Alcock Date: Tue, 31 Mar 2026 12:53:42 +1300 Subject: [PATCH 7/8] Add alias checking to singular collector parameters This resolves an inconsistency between using a defunct collector name in the query parameters `collectors[]=XXX` vs `collector=XXX` --- cmd/bgpfinder-server/main.go | 49 ++++++++++++++++++++++++++++-------- 1 file changed, 38 insertions(+), 11 deletions(-) diff --git a/cmd/bgpfinder-server/main.go b/cmd/bgpfinder-server/main.go index 89c5aa7..923a688 100644 --- a/cmd/bgpfinder-server/main.go +++ b/cmd/bgpfinder-server/main.go @@ -356,10 +356,25 @@ func collectorHandler(db *pgxpool.Pool, logger *logging.Logger) http.HandlerFunc jsonResponse(w, collectorsResponse) } else { // Return specific collector if exists - for _, collector := range collectors { - if collector.Name == collectorName { - jsonResponse(w, collector) - return + aliases, err := bgpfinder.GetCollectorNameAliases("") + if err != nil { + http.Error(w, fmt.Sprintf("Error fetching defunct collector aliases: %v", err), http.StatusInternalServerError) + return + } + collectorMap := make(map[string]bgpfinder.Collector) + for _, c := range collectors { + collectorMap[c.Name] = c + } + + if collector, exists := collectorMap[collectorName]; exists { + jsonResponse(w, collector) + return + } else if alias, avail := aliases[collectorName]; avail { + if alias != "" { + if col, repl := collectorMap[alias]; repl { + jsonResponse(w, col) + return + } } } http.Error(w, "Collector not found", http.StatusNotFound) @@ -474,20 +489,32 @@ func parseDataRequest(r *http.Request) (bgpfinder.Query, error) { } } } else if collectorParam != "" { + aliases, err := bgpfinder.GetCollectorNameAliases("") + if err != nil { + return query, fmt.Errorf("error fetching defunct collector aliases: %v", err) + } + allCollectors, err := bgpfinder.Collectors("") - found := false if err != nil { return query, fmt.Errorf("error fetching collectors: %v", err) } + collectorMap := make(map[string]bgpfinder.Collector) for _, c := range allCollectors { - if collectorParam == c.Name { - collectors = append(collectors, c) - found = true - break - } + collectorMap[c.Name] = c } - if !found { + + if collector, exists := collectorMap[collectorParam]; exists { + collectors = append(collectors, collector) + } else if alias, avail := aliases[collectorParam]; avail { + if alias != "" { + if col, repl := collectorMap[alias]; repl { + collectors = append(collectors, col) + } else { + return query, fmt.Errorf("unknown collector alias: %s -> %s", collectorParam, alias) + } + } + } else { return query, fmt.Errorf("collector not found: %s", collectorParam) } } else { From e508132d8f6bff0a77cf367b3dcdebc93223c28d Mon Sep 17 00:00:00 2001 From: Shane Alcock Date: Tue, 31 Mar 2026 13:13:47 +1300 Subject: [PATCH 8/8] Add 'aliases' field to responses for meta/collectors/X This change also returns the dump properties for the current iteration of the collector when the request is for a former name. --- cmd/bgpfinder-server/main.go | 44 ++++++++++++++++++++++++++---------- 1 file changed, 32 insertions(+), 12 deletions(-) diff --git a/cmd/bgpfinder-server/main.go b/cmd/bgpfinder-server/main.go index 923a688..09eaf87 100644 --- a/cmd/bgpfinder-server/main.go +++ b/cmd/bgpfinder-server/main.go @@ -236,6 +236,7 @@ func (c ResponseCollector) MarshalJSON() ([]byte, error) { custom := map[string]interface{}{ "project": c.Project, "dataTypes": dataTypes, + "aliases": c.Aliases, } return json.Marshal(custom) } @@ -247,6 +248,9 @@ type ResponseCollector struct { // Name of the collector Name string `json:"name"` + // List of aliases for the collector + Aliases []string `json:"aliases"` + OldestRibsDump string OldestUpdatesDump string LatestRibsDump string @@ -271,6 +275,18 @@ func projectHandler(db *pgxpool.Pool, logger *logging.Logger) http.HandlerFunc { return } + aliases, err := bgpfinder.GetCollectorNameAliases("") + if err != nil { + http.Error(w, fmt.Sprintf("Error checking collector aliases: %v", err), http.StatusInternalServerError) + return + } + invertedAliases := make(map[string][]string) + for alias, realName := range aliases { + if realName != "" { + invertedAliases[realName] = append(invertedAliases[realName], alias) + } + } + projectsMap := make(map[string]map[string]map[string]ResponseCollector) // Find matching projects for _, project := range projects { @@ -286,6 +302,7 @@ func projectHandler(db *pgxpool.Pool, logger *logging.Logger) http.HandlerFunc { c := ResponseCollector{ Project: collector.Project, Name: collector.Name, + Aliases: append([]string{collector.Name}, invertedAliases[collector.Name]...), OldestRibsDump: oldestLatestDumps[collector.Name].OldestRibsDump, OldestUpdatesDump: oldestLatestDumps[collector.Name].OldestUpdatesDump, LatestRibsDump: oldestLatestDumps[collector.Name].LatestRibsDump, @@ -327,6 +344,18 @@ func collectorHandler(db *pgxpool.Pool, logger *logging.Logger) http.HandlerFunc return } + aliases, err := bgpfinder.GetCollectorNameAliases("") + if err != nil { + http.Error(w, fmt.Sprintf("Error checking collector aliases status: %v", err), http.StatusInternalServerError) + return + } + invertedAliases := make(map[string][]string) + for alias, realName := range aliases { + if realName != "" { + invertedAliases[realName] = append(invertedAliases[realName], alias) + } + } + collectorsMap := make(map[string]ResponseCollector) for _, collector := range collectors { @@ -334,6 +363,7 @@ func collectorHandler(db *pgxpool.Pool, logger *logging.Logger) http.HandlerFunc collectorsMap[collector.Name] = ResponseCollector{ Project: collector.Project, Name: collector.Name, + Aliases: append([]string{collector.Name}, invertedAliases[collector.Name]...), OldestRibsDump: oldestLatestDumps[collector.Name].OldestRibsDump, OldestUpdatesDump: oldestLatestDumps[collector.Name].OldestUpdatesDump, LatestRibsDump: oldestLatestDumps[collector.Name].LatestRibsDump, @@ -356,22 +386,12 @@ func collectorHandler(db *pgxpool.Pool, logger *logging.Logger) http.HandlerFunc jsonResponse(w, collectorsResponse) } else { // Return specific collector if exists - aliases, err := bgpfinder.GetCollectorNameAliases("") - if err != nil { - http.Error(w, fmt.Sprintf("Error fetching defunct collector aliases: %v", err), http.StatusInternalServerError) - return - } - collectorMap := make(map[string]bgpfinder.Collector) - for _, c := range collectors { - collectorMap[c.Name] = c - } - - if collector, exists := collectorMap[collectorName]; exists { + if collector, exists := collectorsMap[collectorName]; exists { jsonResponse(w, collector) return } else if alias, avail := aliases[collectorName]; avail { if alias != "" { - if col, repl := collectorMap[alias]; repl { + if col, repl := collectorsMap[alias]; repl { jsonResponse(w, col) return }