diff --git a/cmd/bgpfinder-server/main.go b/cmd/bgpfinder-server/main.go index 6f3cd90..09eaf87 100644 --- a/cmd/bgpfinder-server/main.go +++ b/cmd/bgpfinder-server/main.go @@ -236,6 +236,7 @@ func (c ResponseCollector) MarshalJSON() ([]byte, error) { custom := map[string]interface{}{ "project": c.Project, "dataTypes": dataTypes, + "aliases": c.Aliases, } return json.Marshal(custom) } @@ -247,6 +248,9 @@ type ResponseCollector struct { // Name of the collector Name string `json:"name"` + // List of aliases for the collector + Aliases []string `json:"aliases"` + OldestRibsDump string OldestUpdatesDump string LatestRibsDump string @@ -271,6 +275,18 @@ func projectHandler(db *pgxpool.Pool, logger *logging.Logger) http.HandlerFunc { return } + aliases, err := bgpfinder.GetCollectorNameAliases("") + if err != nil { + http.Error(w, fmt.Sprintf("Error checking collector aliases: %v", err), http.StatusInternalServerError) + return + } + invertedAliases := make(map[string][]string) + for alias, realName := range aliases { + if realName != "" { + invertedAliases[realName] = append(invertedAliases[realName], alias) + } + } + projectsMap := make(map[string]map[string]map[string]ResponseCollector) // Find matching projects for _, project := range projects { @@ -286,6 +302,7 @@ func projectHandler(db *pgxpool.Pool, logger *logging.Logger) http.HandlerFunc { c := ResponseCollector{ Project: collector.Project, Name: collector.Name, + Aliases: append([]string{collector.Name}, invertedAliases[collector.Name]...), OldestRibsDump: oldestLatestDumps[collector.Name].OldestRibsDump, OldestUpdatesDump: oldestLatestDumps[collector.Name].OldestUpdatesDump, LatestRibsDump: oldestLatestDumps[collector.Name].LatestRibsDump, @@ -327,6 +344,18 @@ func collectorHandler(db *pgxpool.Pool, logger *logging.Logger) http.HandlerFunc return } + aliases, err := bgpfinder.GetCollectorNameAliases("") + if err != nil { + http.Error(w, fmt.Sprintf("Error checking collector aliases status: %v", err), http.StatusInternalServerError) + return + } + invertedAliases := make(map[string][]string) + for alias, realName := range aliases { + if realName != "" { + invertedAliases[realName] = append(invertedAliases[realName], alias) + } + } + collectorsMap := make(map[string]ResponseCollector) for _, collector := range collectors { @@ -334,6 +363,7 @@ func collectorHandler(db *pgxpool.Pool, logger *logging.Logger) http.HandlerFunc collectorsMap[collector.Name] = ResponseCollector{ Project: collector.Project, Name: collector.Name, + Aliases: append([]string{collector.Name}, invertedAliases[collector.Name]...), OldestRibsDump: oldestLatestDumps[collector.Name].OldestRibsDump, OldestUpdatesDump: oldestLatestDumps[collector.Name].OldestUpdatesDump, LatestRibsDump: oldestLatestDumps[collector.Name].LatestRibsDump, @@ -356,10 +386,15 @@ func collectorHandler(db *pgxpool.Pool, logger *logging.Logger) http.HandlerFunc jsonResponse(w, collectorsResponse) } else { // Return specific collector if exists - for _, collector := range collectors { - if collector.Name == collectorName { - jsonResponse(w, collector) - return + if collector, exists := collectorsMap[collectorName]; exists { + jsonResponse(w, collector) + return + } else if alias, avail := aliases[collectorName]; avail { + if alias != "" { + if col, repl := collectorsMap[alias]; repl { + jsonResponse(w, col) + return + } } } http.Error(w, "Collector not found", http.StatusNotFound) @@ -374,33 +409,49 @@ func parseDataRequest(r *http.Request) (bgpfinder.Query, error) { intervalsParams := queryParams["intervals[]"] collectorsParams := queryParams["collectors[]"] typesParams := queryParams["types[]"] + if len(typesParams) == 0 { + typesParams=queryParams["type"] + } + + projectParams := queryParams["projects[]"] + if len(projectParams) == 0 { + if p := queryParams.Get("project"); p != "" { + projectParams = []string{p} + } + } + query.Projects = projectParams collectorParam := queryParams.Get("collector") minInitialTime := queryParams.Get("minInitialTime") dataAddedSince := queryParams.Get("dataAddedSince") - // Parse interval + // Parse intervals if len(intervalsParams) == 0 { return query, fmt.Errorf("at least one interval is required") } - times := strings.Split(intervalsParams[0], ",") - if len(times) != 2 { - return query, fmt.Errorf("invalid interval format. Expected format: start,end") - } + for _, intervalStr := range intervalsParams { + times := strings.Split(intervalStr, ",") + if len(times) != 2 { + return query, fmt.Errorf("invalid interval format. Expected format: start,end") + } - startInt, err := strconv.ParseInt(times[0], 10, 64) - if err != nil { - return query, fmt.Errorf("invalid start time: %v", err) - } + startInt, err := strconv.ParseInt(times[0], 10, 64) + if err != nil { + return query, fmt.Errorf("invalid start time: %v", err) + } - endInt, err := strconv.ParseInt(times[1], 10, 64) - if err != nil { - return query, fmt.Errorf("invalid end time: %v", err) - } + endInt, err := strconv.ParseInt(times[1], 10, 64) + if err != nil { + return query, fmt.Errorf("invalid end time: %v", err) + } + + query.Intervals = append(query.Intervals, bgpfinder.Interval{ + From: time.Unix(startInt, 0), + Until: time.Unix(endInt, 0), + }) - query.From = time.Unix(startInt, 0) - query.Until = time.Unix(endInt, 0) + } if minInitialTime != "" { minInitialTimeInt, err := strconv.ParseInt(minInitialTime, 10, 64) @@ -458,29 +509,57 @@ func parseDataRequest(r *http.Request) (bgpfinder.Query, error) { } } } else if collectorParam != "" { + aliases, err := bgpfinder.GetCollectorNameAliases("") + if err != nil { + return query, fmt.Errorf("error fetching defunct collector aliases: %v", err) + } + allCollectors, err := bgpfinder.Collectors("") - found := false if err != nil { return query, fmt.Errorf("error fetching collectors: %v", err) } + collectorMap := make(map[string]bgpfinder.Collector) for _, c := range allCollectors { - if collectorParam == c.Name { - collectors = append(collectors, c) - found = true - break - } + collectorMap[c.Name] = c } - if !found { + + if collector, exists := collectorMap[collectorParam]; exists { + collectors = append(collectors, collector) + } else if alias, avail := aliases[collectorParam]; avail { + if alias != "" { + if col, repl := collectorMap[alias]; repl { + collectors = append(collectors, col) + } else { + return query, fmt.Errorf("unknown collector alias: %s -> %s", collectorParam, alias) + } + } + } else { return query, fmt.Errorf("collector not found: %s", collectorParam) } } else { // Use all collectors + var err error collectors, err = bgpfinder.Collectors("") if err != nil { return query, fmt.Errorf("error fetching collectors: %v", err) } } + + // exclude collectors that are not part of the included projects + if len(query.Projects) > 0 { + projectMap := make(map[string]bool) + for _, p := range query.Projects { + projectMap[p] = true + } + var filtered []bgpfinder.Collector + for _, c := range collectors { + if projectMap[c.Project] { + filtered = append(filtered, c) + } + } + collectors = filtered + } query.Collectors = collectors // Parse types @@ -509,10 +588,17 @@ func dataHandler(db *pgxpool.Pool, logger *logging.Logger) http.HandlerFunc { // Log the parsed query details in UTC evt := logger.Info(). - Time("from", query.From.UTC()). - Time("until", query.Until.UTC()). Str("dump_type", query.DumpType.String()). - Int("collector_count", len(query.Collectors)) + Int("collector_count", len(query.Collectors)). + Int("interval_count", len(query.Intervals)) + + if len(query.Intervals) > 0 { + first := query.Intervals[0] + last := query.Intervals[len(query.Intervals)-1] + evt.Time("first_from", first.From.UTC()). + Time("last_until", last.Until.UTC()) + } + if query.MinInitialTime != nil { evt.Time("minInitialTime", query.MinInitialTime.UTC()) @@ -523,11 +609,19 @@ func dataHandler(db *pgxpool.Pool, logger *logging.Logger) http.HandlerFunc { // can be returned (so as to avoid unnecessary scraping // attempts when a DB lookup returns no results). results := []bgpfinder.BGPDump{} - if query.MinInitialTime != nil && query.Until.Unix() > 0 && query.MinInitialTime.UTC().After(query.Until.UTC()) { - populateDataResponse(w, Data{results}, query) - return + if query.MinInitialTime != nil && len(query.Intervals) > 0 { + allBefore := true + for _, interval := range query.Intervals { + if interval.Until.Unix() == 0 || !query.MinInitialTime.UTC().After(interval.Until.UTC()) { + allBefore = false + break + } + } + if allBefore { + populateDataResponse(w, Data{results}, query) + return + } } - // Log collector details for _, c := range query.Collectors { logger.Info(). diff --git a/db.go b/db.go index 06f4db6..164fd6d 100644 --- a/db.go +++ b/db.go @@ -130,32 +130,52 @@ func UpsertBGPDumps(ctx context.Context, logger *logging.Logger, db *pgxpool.Poo return nil } -// FetchDataFromDB retrieves BGP dump data filtered by collector names and dump types. -func FetchDataFromDB(ctx context.Context, db *pgxpool.Pool, query Query) ([]BGPDump, error) { - +// fetchDataFromDBInternal retrieves BGP dump data for a specific time range. +func fetchDataFromDBInternal(ctx context.Context, db *pgxpool.Pool, query Query, until time.Time) ([]BGPDump, error) { var args []interface{} paramCounter := 1 - sqlQuery := ` - SELECT url, dump_type, duration, collector_name, EXTRACT(EPOCH FROM timestamp)::bigint - FROM bgp_dumps - WHERE collector_name = ANY($1) - AND timestamp + duration >= to_timestamp($2) - ` // Extract collector names from the query collectorNames := make([]string, len(query.Collectors)) for i, c := range query.Collectors { collectorNames[i] = c.Name } - args = append(args, collectorNames, query.From.Unix()) - paramCounter = 3 - - // Check if Until is 0, if not, add an end range - if query.Until.Unix() != 0 { - sqlQuery += `` - sqlQuery += fmt.Sprintf(" AND timestamp <= to_timestamp($%d)", paramCounter) - args = append(args, query.Until.Unix()) - paramCounter++ + args = append(args, collectorNames) + paramCounter++ + + // Use a 24-hour buffer on the start of the timestamp scan to allow Postgres + // to use the B-tree index on the timestamp column. + sqlQuery := ` + SELECT url, dump_type, duration, collector_name, EXTRACT(EPOCH FROM timestamp)::bigint + FROM bgp_dumps + WHERE collector_name = ANY($1) + ` + + // Construct the interval conditions + var intervalClauses []string + for _, interval := range query.Intervals { + intUntil := interval.Until + if until.Unix() != 0 && (intUntil.IsZero() || intUntil.After(until)) { + intUntil = until + } + + if intUntil.Unix() != 0 { + clause := fmt.Sprintf("(timestamp >= to_timestamp($%d) - interval '24 hours' AND timestamp + duration >= to_timestamp($%d) AND timestamp <= to_timestamp($%d))", + paramCounter, paramCounter, paramCounter+1) + intervalClauses = append(intervalClauses, clause) + args = append(args, interval.From.Unix(), intUntil.Unix()) + paramCounter += 2 + } else { + clause := fmt.Sprintf("(timestamp >= to_timestamp($%d) - interval '24 hours' AND timestamp + duration >= to_timestamp($%d))", + paramCounter, paramCounter) + intervalClauses = append(intervalClauses, clause) + args = append(args, interval.From.Unix()) + paramCounter ++ + } + } + + if len(intervalClauses) > 0 { + sqlQuery += " AND (" + strings.Join(intervalClauses, " OR ") + ")" } // Check if dump type was specified, if so query for just that type of file @@ -166,11 +186,9 @@ func FetchDataFromDB(ctx context.Context, db *pgxpool.Pool, query Query) ([]BGPD } if query.MinInitialTime != nil { - if query.Until.Unix() == 0 || query.MinInitialTime.Unix() <= query.Until.Unix() { - sqlQuery += fmt.Sprintf(" AND timestamp >= to_timestamp($%d)", paramCounter) - paramCounter ++ - args = append(args, query.MinInitialTime.Unix()) - } + sqlQuery += fmt.Sprintf(" AND timestamp >= to_timestamp($%d)", paramCounter) + paramCounter++ + args = append(args, query.MinInitialTime.Unix()) } if query.DataAddedSince != nil { @@ -182,42 +200,43 @@ func FetchDataFromDB(ctx context.Context, db *pgxpool.Pool, query Query) ([]BGPD // Alternative implementation to above?: /* - if query.MinInitialTime != nil { - if query.DataAddedSince == nil { - sqlQuery += fmt.Sprintf(" AND timestamp >= to_timestamp($%d) AND timestamp <= to_timestamp($%d)", paramCounter, paramCounter+1) - paramCounter += 2 - args = append(args, query.MinInitialTime.Unix(), query.MinInitialTime.Add(time.Duration(2)*time.Hour) - } else { - sqlQuery += fmt.Sprintf( - ` AND ( - (timestamp >= to_timestamp($%d) AND timestamp <= to_timestamp($%d)) - OR - (timestamp < to_timestamp($%d) AND timestamp > to_timestamp($%d) AND cdate > to_timestamp($%d) AND cdate < to_timestamp($%d)) - ) - `, - paramCounter, - paramCounter+1, - paramCounter+2, - paramCounter+3, - paramCounter+4, - paramCounter+5) - - paramCounter += 6 - - args = append(args, - query.MinInitialTime.Unix(), - query.MinInitialTime.Add(time.Duration(2)*time.Hour).Unix(), - query.MinInitialTime.Unix(), - query.MinInitialTime.Add(-time.Duration(86400)*time.Second).Unix(), - query.DataAddedSince.Unix(), - query.ResponseTime.Unix(), - ) - } - } - */ + if query.MinInitialTime != nil { + if query.DataAddedSince == nil { + sqlQuery += fmt.Sprintf(" AND timestamp >= to_timestamp($%d) AND timestamp <= to_timestamp($%d)", paramCounter, paramCounter+1) + paramCounter += 2 + args = append(args, query.MinInitialTime.Unix(), query.MinInitialTime.Add(time.Duration(2)*time.Hour) + } else { + sqlQuery += fmt.Sprintf( + ` AND ( + (timestamp >= to_timestamp($%d) AND timestamp <= to_timestamp($%d)) + OR + (timestamp < to_timestamp($%d) AND timestamp > to_timestamp($%d) AND cdate > to_timestamp($%d) AND cdate < to_timestamp($%d)) + ) + `, + paramCounter, + paramCounter+1, + paramCounter+2, + paramCounter+3, + paramCounter+4, + paramCounter+5) + + paramCounter += 6 + + args = append(args, + query.MinInitialTime.Unix(), + query.MinInitialTime.Add(time.Duration(2)*time.Hour).Unix(), + query.MinInitialTime.Unix(), + query.MinInitialTime.Add(-time.Duration(86400)*time.Second).Unix(), + query.DataAddedSince.Unix(), + query.ResponseTime.Unix(), + ) + } + } + */ - sqlQuery += " ORDER BY timestamp ASC, dump_type ASC" // This ORDER BY may be bad for performance? But putting it there to match bgpstream ordering (which I think this is) + sqlQuery += " ORDER BY timestamp ASC, dump_type ASC" + sqlQuery += fmt.Sprintf(" LIMIT %d", HardLimit) rows, err := db.Query(ctx, sqlQuery, args...) if err != nil { @@ -249,10 +268,59 @@ func FetchDataFromDB(ctx context.Context, db *pgxpool.Pool, query Query) ([]BGPD Project: determineProjectName(collectorName), }) } - return results, nil } +// FetchDataFromDB retrieves BGP dump data filtered by collector names and dump types. +func FetchDataFromDB(ctx context.Context, db *pgxpool.Pool, query Query) ([]BGPDump, error) { + + if len(query.Intervals) == 0 { + return nil, nil + } + + var minFrom time.Time + var maxUntil time.Time + isOpenEnded := false + + for _, interval := range query.Intervals { + if minFrom.IsZero() || interval.From.Before(minFrom) { + minFrom = interval.From + } + if interval.Until.IsZero() { + isOpenEnded = true + } else if !isOpenEnded && (maxUntil.IsZero() || interval.Until.After(maxUntil)) { + maxUntil = interval.Until + } + } + internalUntil := maxUntil + isWindowed := false + + // If the requested range is excessively large (or open-ended), + // try a smaller initial window to reduce database scan load if the + // result cap is quickly met. + // Only do this if we have at least one interval. + if isOpenEnded || maxUntil.Sub(minFrom) > MaxScanWindow { + internalUntil = minFrom.Add(MaxScanWindow) + isWindowed = true + } + + results, err := fetchDataFromDBInternal(ctx, db, query, internalUntil) + if err != nil { + return nil, err + } + + // If the windowed query didn't fill the target limit (and it wasn't the full requested range), + // retry once with the full range. + if isWindowed && len(results) < TargetLimit { + results, err = fetchDataFromDBInternal(ctx, db, query, maxUntil) + if err != nil { + return nil, err + } + } + + return ApplyResultCap(results), nil +} + func determineProjectName(collector string) string { if strings.HasPrefix(collector, "rrc") { return "ris" diff --git a/finder.go b/finder.go index 8bb5de8..b29b5e2 100644 --- a/finder.go +++ b/finder.go @@ -3,11 +3,47 @@ package bgpfinder import ( "encoding/json" "fmt" + "os" + "sort" "strconv" "strings" "time" ) +var ( + // TargetLimit is the preferred maximum number of results to return. + // Can be overridden by BGPFINDER_TARGET_LIMIT env var. + TargetLimit = 500 + + // HardLimit is the absolute maximum number of results to return, + // even when extending to include all results with the same timestamp. + // Can be overridden by BGPFINDER_HARD_LIMIT env var. + HardLimit = 1000 + + // MaxScanWindow is the maximum time range to scan in a single database query. + // Queries exceeding this will be windowed for efficiency. + // Can be overridden by BGPFINDER_MAX_SCAN_WINDOW (in days) env var. + MaxScanWindow = 30 * 24 * time.Hour +) + +func init() { + if tl := os.Getenv("BGPFINDER_TARGET_LIMIT"); tl != "" { + if val, err := strconv.Atoi(tl); err == nil { + TargetLimit = val + } + } + if hl := os.Getenv("BGPFINDER_HARD_LIMIT"); hl != "" { + if val, err := strconv.Atoi(hl); err == nil { + HardLimit = val + } + } + if mw := os.Getenv("BGPFINDER_MAX_SCAN_WINDOW"); mw != "" { + if val, err := strconv.Atoi(mw); err == nil { + MaxScanWindow = time.Duration(val) * 24 * time.Hour + } + } +} + // Finder Just a sketch of what the base Finder interface might look like. Everything // gets built on top of (or under, I guess) this. type Finder interface { @@ -81,21 +117,28 @@ const ( DumpTypeUpdates DumpType = 2 // updates ) +type Interval struct { + // start time (inclusive) + From time.Time + // end time (exclusive) + Until time.Time +} + // TODO: think about how this should work -- just keep it simple! no complex query structures // TODO: add Validate method (e.g., From is before Until, IsADumpType, etc.) type Query struct { // Collectors to search for. All collectors if unset/empty Collectors []Collector - // Query window start time (inclusive) - From time.Time - - // Query window end time (exclusive) - Until time.Time + // Query window intervals + Intervals []Interval // Dump type to search for. Any type if unset DumpType DumpType + // Projects to search for. All projects if empty or unset + Projects []string + // Min initial time MinInitialTime *time.Time @@ -108,12 +151,38 @@ type Query struct { ResponseTime time.Time } +func (q Query) FirstInterval() Interval { + if len(q.Intervals) > 0 { + return q.Intervals[0] + } + return Interval{} +} + func (q Query) MarshalJSON() ([]byte, error) { custom := make(map[string]interface{}) - if !(q.From.IsZero() && q.Until.IsZero()) { - custom["intervals"] = strconv.FormatInt(q.From.Unix(), 10) + "," + strconv.FormatInt(q.Until.Unix(), 10) + + if len(q.Intervals) > 0 { + intervals := make([]string, len(q.Intervals)) + for i, interval := range q.Intervals { + intervals[i] = strconv.FormatInt(interval.From.Unix(), 10) + "," + strconv.FormatInt(interval.Until.Unix(), 10) + } + custom["intervals"] = intervals } + custom["human"] = false + custom["projects"] = q.Projects + collectorNames := make([]string, len(q.Collectors)) + for i, c := range q.Collectors { + collectorNames[i] = c.Name + } + custom["collectors"] = collectorNames + + custom["types"] = []string{q.DumpType.String()} + if q.DumpType != DumpTypeAny { + custom["type"] = q.DumpType.String() + } else { + custom["type"] = nil + } return json.Marshal(custom) } @@ -144,32 +213,85 @@ type BGPDump struct { Project string `json:"project"` } -// monthInRange checks if any part of the month overlaps with the query range +// monthInRange checks if any part of the month overlaps with any of the +// query intervals func monthInRange(date time.Time, query Query) bool { monthStart := date monthEnd := date.AddDate(0, 1, 0) - start := query.From - if query.MinInitialTime != nil { - start = *query.MinInitialTime + for _, interval := range query.Intervals { + start := interval.From + if query.MinInitialTime != nil { + start = *query.MinInitialTime + } + + if interval.Until.Unix() != 0 { + if monthEnd.After(start) && monthStart.Before(interval.Until) { + return true + } + } else { + if monthEnd.After(start) { + return true + } + } } - if query.Until.Unix() != 0 { - return monthEnd.After(start) && monthStart.Before(query.Until) - } - return monthEnd.After(start) + return len(query.Intervals) == 0 } -// dateInRange checks if a specific timestamp falls within the query range +// dateInRange checks if a specific timestamp falls within any of the query +// intervals func dateInRange(date time.Time, query Query) bool { unixTime := date.Unix() - startTime := query.From.Unix() - if query.MinInitialTime != nil { - startTime = query.MinInitialTime.Unix() + + for _, interval := range query.Intervals { + startTime := interval.From.Unix() + if query.MinInitialTime != nil { + startTime = query.MinInitialTime.Unix() + } + if interval.Until.Unix() != 0 { + if unixTime >= startTime && unixTime < interval.Until.Unix() { + return true + } + } else { + if unixTime >= startTime { + return true; + } + } + } + return len(query.Intervals) == 0 +} + +// ApplyResultCap applies the pagination capping logic to a list of BGP dumps. +func ApplyResultCap(results []BGPDump) []BGPDump { + if len(results) <= TargetLimit { + return results + } + + // Ensure results are sorted by timestamp and then dump type + sort.Slice(results, func(i, j int) bool { + if results[i].Timestamp == results[j].Timestamp { + return results[i].DumpType < results[j].DumpType + } + return results[i].Timestamp < results[j].Timestamp + }) + + limitTs := results[TargetLimit].Timestamp + + if results[0].Timestamp == limitTs { + // The first timestamp already crosses/hits the target limit. + // Include all results with this timestamp, up to HardLimit. + i := 0 + for i < len(results) && results[i].Timestamp == limitTs && i < HardLimit { + i++ + } + return results[:i] } - if query.Until.Unix() != 0 { - return unixTime >= startTime && unixTime < query.Until.Unix() + // Otherwise, exclude all results that have the timestamp that crosses the limit. + j := TargetLimit + for j > 0 && results[j-1].Timestamp == limitTs { + j-- } - return unixTime >= startTime + return results[:j] } diff --git a/multi_finder.go b/multi_finder.go index 113632f..3f2eb53 100644 --- a/multi_finder.go +++ b/multi_finder.go @@ -173,7 +173,7 @@ func (m *MultiFinder) Find(query Query) ([]BGPDump, error) { dumps = append(dumps, dump...) } - return dumps, nil + return ApplyResultCap(dumps), nil } func (m *MultiFinder) getFinderByProject(projName string) (Finder, bool) { diff --git a/periodicscraper/periodic_scraper_per_collector.go b/periodicscraper/periodic_scraper_per_collector.go index f6da0ba..f529106 100644 --- a/periodicscraper/periodic_scraper_per_collector.go +++ b/periodicscraper/periodic_scraper_per_collector.go @@ -114,12 +114,17 @@ func getDumps(ctx context.Context, logger.Info().Str("collector", collector.Name).Msg("Starting to scrape collector data") dumpType := getDumpTypeFromBool(isRibsData) + untilNextDay := time.Now().AddDate(0, 0, 1) query := bgpfinder.Query{ Collectors: []bgpfinder.Collector{collector}, DumpType: dumpType, - From: prevRunTimeEnd, // Start from prevRuntime - Until: time.Now().AddDate(0, 0, 1), // Until tomorrow (to ensure we get today's data) + Intervals: []bgpfinder.Interval{ + { + From: prevRunTimeEnd, // Start from prevRuntime + Until: untilNextDay, // Until tomorrow (to ensure we get today's data) + }, + }, } dumps, err := finder.Find(query) diff --git a/routeviews_finder.go b/routeviews_finder.go index 224b2b8..83df23e 100644 --- a/routeviews_finder.go +++ b/routeviews_finder.go @@ -99,6 +99,7 @@ func (f *RouteViewsFinder) GetCollectorNameAliases(project string) (map[string]s return map[string]string{ "route-views2.saopaulo": "ix-br2.gru", "route-views.saopaulo": "ix-br.gru", + "route-views.amsix": "locix.fra", }, nil } @@ -171,7 +172,7 @@ func (f *RouteViewsFinder) getCollectorURL(collector Collector) string { collectorNameOverrides["route-views2"] = "" if override, exists := collectorNameOverrides[collector.Name]; exists { - return RouteviewsArchiveUrl + override + "bgpdata/" + return RouteviewsArchiveUrl + override + "/bgpdata/" } return RouteviewsArchiveUrl + collector.Name + "/bgpdata/" diff --git a/scrape_collectors.go b/scrape_collectors.go index a6aa995..a55be05 100644 --- a/scrape_collectors.go +++ b/scrape_collectors.go @@ -73,8 +73,10 @@ func UpdateCollectorsData(ctx context.Context, logger *logging.Logger, db *pgxpo query := Query{ Collectors: []Collector{collector}, DumpType: DumpTypeAny, - From: time.Unix(0, 0), // Start from Unix epoch (1970-01-01) - Until: time.Now().AddDate(0, 0, 1), // Until tomorrow (to ensure we get today's data) + Intervals: []Interval{{ + From: time.Unix(0, 0), // Start from Unix epoch (1970-01-01) + Until: time.Now().AddDate(0, 0, 1), // Until tomorrow (to ensure we get today's data) + }}, } dumps, err := finder.Find(query)