Skip to content

JustinLoye/bgpflux

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

29 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

bgpflux

Docs Crates.io CI License

A Rust library and CLI tool for historical BGP data analysis.
Similar to bgpreader, it streams BGP elements in chronological order from:

  • Multiple RIPE RIS and RouteViews route collectors
  • Arbitrary time ranges
  • Both RIBs and updates

Features

  • Archive & Live streaming: Query historical archives via BGPKIT Broker, or tap into real-time feeds from RIS Live (WebSocket) and RouteViews Live (Kafka)
  • Chronologically sorted output: Elements from multiple collectors are merged in timestamp order via k-way merge
  • High performance: Throughput on par with bgpreader
  • Filtering: Origin ASN, prefix, peer IP/ASN, AS path regex, community regex, IP version
  • Caching: Optional local file caching to skip re-downloading archive data
  • Jitter buffer: Reorder live stream elements into chronological order using a configurable delay window

Installation

As a CLI tool

# Archive mode only (default — no WebSocket/Kafka dependencies)
cargo install bgpflux

# With live streaming support (RIS Live + RouteViews Live)
cargo install bgpflux --features live

As a library

# Archive-only (default)
[dependencies]
bgpflux = "0.2"

# With live streaming
[dependencies]
bgpflux = { version = "0.2", features = ["live"] }

You can also enable backends individually with live-ris (RIPE RIS Live via WebSocket) or live-routeviews (RouteViews Live via Kafka).

Quick Start

CLI — Archive Mode

Stream BGP updates from two collectors over a one-hour window:

bgpflux \
  --start "2025-01-15T12:00:00Z" \
  --end "2025-01-15T13:00:00Z" \
  -c route-views.wide,rrc04 \
  -t update

Stream RIB dumps:

bgpflux \
  --start "2025-01-15T12:00:00Z" \
  --end "2025-01-15T13:00:00Z" \
  -c route-views.wide,rrc04 \
  -t rib

Stream both RIBs and updates together:

bgpflux \
  --start "2025-01-15T12:00:00Z" \
  --end "2025-01-15T13:00:00Z" \
  -c route-views.wide,rrc04 \
  -t rib,update

Enable local caching to avoid re-downloading on repeated runs:

bgpflux \
  --start "2025-01-15T12:00:00Z" \
  --end "2025-01-15T13:00:00Z" \
  -c route-views.wide,rrc04 \
  -t update \
  --cache-dir ./bgp_cache

CLI — Live Mode (requires --features live)

Stream real-time BGP updates from RIS and RouteViews collectors:

bgpflux --live -c rrc00,route-views2

Use a jitter buffer (in seconds) to reorder elements by timestamp:

bgpflux --live -c rrc00,route-views2 --delay 15

CLI — Filtering

Filter by origin ASN:

bgpflux --start "2025-01-15T12:00:00Z" --end "2025-01-15T13:00:00Z" \
  -c rrc00 -t update -o 13335

Filter by prefix:

bgpflux --start "2025-01-15T12:00:00Z" --end "2025-01-15T13:00:00Z" \
  -c rrc00 -t update -p 1.1.1.0/24

Filter by AS path regex:

bgpflux --start "2025-01-15T12:00:00Z" --end "2025-01-15T13:00:00Z" \
  -c rrc00 -t update -a "13335$"

IPv6 only:

bgpflux --start "2025-01-15T12:00:00Z" --end "2025-01-15T13:00:00Z" \
  -c rrc00 -t update -6

Generic filter expressions:

bgpflux --start "2025-01-15T12:00:00Z" --end "2025-01-15T13:00:00Z" \
  -c rrc00 -t update \
  -f "origin_asn!=13335" -f "peer_asn=15169"

As a Library

use bgpflux::{BgpStream, BgpStreamConfig, DataType};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = BgpStreamConfig::new(
        "2025-01-15T12:00:00Z",
        "2025-01-15T13:00:00Z",
        &["route-views.wide", "rrc04"],
        DataType::Update,
    )?;

    let stream = BgpStream::new(config).build();

    for elem in stream {
        println!("{}", elem);
    }

    Ok(())
}

See the examples/ directory for more:

Example Description
basic_stream Stream updates from multiple collectors
cached_stream Use local caching to speed up repeated queries
filtered_stream Apply filters (origin ASN, prefix, AS path)
rib_snapshot Collect unique prefixes from a RIB dump
live_stream Real-time BGP streaming with a jitter buffer (requires live feature)
elem_fields Access individual fields of BGP elements

Output Format

bgpflux outputs pipe-delimited records, one per line:

Type|Timestamp|PeerIP|PeerASN|Prefix|AsPath|Origin|NextHop|LocalPref|MED|Communities|Atomic|AggrASN|AggrIP|Collector

Where Type is A (announce), W (withdraw), or R (RIB entry). Example:

A|1736942400.469656|80.249.211.96|50629|193.186.6.0/24|50629 6939 47541|IGP|80.249.211.96|0|0|50629:500 50629:2000 50629:6939|false|||rrc04
W|1736942400.469656|80.249.211.96|50629|2a0e:b107:1a0::/44||||||||||rrc04
R|1736942400|80.249.211.96|50629|193.186.6.0/24|50629 6939 47541|IGP|80.249.211.96|0|0||false|||rrc04

Core Components

BgpStream

Main interface for streaming historical BGP data. Queries BGPKIT Broker to discover archive files, downloads and parses them, and merges elements from all collectors in timestamp order.

use bgpflux::{BgpStream, BgpStreamConfig, DataType};

let config = BgpStreamConfig::new(
    "2025-01-15T12:00:00Z",
    "2025-01-15T13:00:00Z",
    &["route-views.wide"],
    DataType::Update,
).unwrap();

let stream = BgpStream::new(config)
    .cache_dir("./bgp_cache")  // optional
    .build();

BgpStreamConfig

Configuration builder for archive streams. Supports multiple timestamp formats:

  • RFC 3339: "2025-01-15T12:00:00Z", "2025-01-15T12:00:00+02:00"
  • ISO 8601 without timezone (assumed UTC): "2025-01-15T12:00:00"
  • Date only (midnight UTC): "2025-01-15", "2025/01/15", "20250115"
  • Unix timestamp: "1736942400"

BgpStreamElem

A single BGP element. Wraps bgpkit_parser::BgpElem and adds a collector_id and elem_type. Implements Deref<Target = BgpElem>, so all BgpElem fields (timestamp, prefix, as_path, peer_ip, etc.) are directly accessible.

for elem in stream {
    println!(
        "{} {} via AS{} from {}",
        elem.elem_type, elem.prefix, elem.peer_asn, elem.collector_id
    );
}

LiveBgpStream (requires live feature)

Real-time streaming from RIS Live (WebSocket) and RouteViews Live (Kafka). Automatically routes collectors to the right backend based on their name (rrc* → RIS, route-views* → RouteViews).

use bgpflux::{LiveBgpStream, LiveConfig, JitterBufferExt};
use std::time::Duration;

let config = LiveConfig::new(&["rrc00", "route-views2"]).unwrap();
let stream = LiveBgpStream::new(config)
    .build()
    .jitter_buffer(Duration::from_secs(15));

for elem in stream {
    println!("{}", elem);
}

JitterBufferExt

Extension trait on any Iterator<Item = BgpStreamElem>. Wraps the iterator in a binary-heap-based reordering buffer that holds elements for a configurable delay window before emitting them in timestamp order. Useful for live streams where elements from different collectors may arrive slightly out of order.

Filtering

Filters are applied at the parser level using bgpkit-parser filters. Both BgpStreamConfig and LiveConfig support .with_filters() and .add_filter():

use bgpkit_parser::Filter;
use bgpflux::{BgpStreamConfig, DataType};

let config = BgpStreamConfig::new(
    "2025-01-15T12:00:00Z",
    "2025-01-15T13:00:00Z",
    &["rrc00"],
    DataType::Update,
)?
.add_filter(Filter::new("origin_asn", "13335")?)
.add_filter(Filter::new("ip_version", "ipv6")?);

Supported filter keys: origin_asn, prefix, prefix_sub, prefix_super, peer_ip, peer_ips, peer_asn, as_path, community, ip_version, type.

Performance

bgpflux achieves throughput on par with bgpreader. See performance.md.

Acknowledgments

This project uses code copied and adapted from:

License

MIT

About

BGP live and historical data preprocessing. Similar to BGPStream, but written in Rust on top of BGPKIT.

Resources

License

Stars

Watchers

Forks

Packages

 
 
 

Languages