A Rust library and CLI tool for historical BGP data analysis.
Similar to bgpreader, it streams BGP elements in chronological order from:
- Multiple RIPE RIS and RouteViews route collectors
- Arbitrary time ranges
- Both RIBs and updates
- Archive & Live streaming: Query historical archives via BGPKIT Broker, or tap into real-time feeds from RIS Live (WebSocket) and RouteViews Live (Kafka)
- Chronologically sorted output: Elements from multiple collectors are merged in timestamp order via k-way merge
- High performance: Throughput on par with bgpreader
- Filtering: Origin ASN, prefix, peer IP/ASN, AS path regex, community regex, IP version
- Caching: Optional local file caching to skip re-downloading archive data
- Jitter buffer: Reorder live stream elements into chronological order using a configurable delay window
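The k-way merge mentioned in the feature list can be pictured with the standard library alone. The sketch below is not bgpflux's implementation: `Elem` and `k_way_merge` are hypothetical names, and each source is assumed to be already sorted by timestamp.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical stand-in for a BGP element: (timestamp, payload).
type Elem = (u64, &'static str);

/// Merge several individually sorted sources into one timestamp-ordered vector.
fn k_way_merge(sources: Vec<Vec<Elem>>) -> Vec<Elem> {
    let mut iters: Vec<_> = sources.into_iter().map(|s| s.into_iter()).collect();
    // BinaryHeap is a max-heap; Reverse turns it into a min-heap keyed on
    // (timestamp, source index, payload).
    let mut heap = BinaryHeap::new();
    for (idx, it) in iters.iter_mut().enumerate() {
        if let Some((ts, payload)) = it.next() {
            heap.push(Reverse((ts, idx, payload)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((ts, idx, payload))) = heap.pop() {
        out.push((ts, payload));
        // Refill the heap from the source that just yielded an element.
        if let Some((next_ts, next_payload)) = iters[idx].next() {
            heap.push(Reverse((next_ts, idx, next_payload)));
        }
    }
    out
}

fn main() {
    let rrc04 = vec![(1, "rrc04 a"), (4, "rrc04 b")];
    let wide = vec![(2, "wide a"), (3, "wide b")];
    for (ts, payload) in k_way_merge(vec![rrc04, wide]) {
        println!("{ts} {payload}");
    }
}
```

The heap never holds more than one element per source, so memory grows with the number of collectors rather than with the size of the archives.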
# Archive mode only (default — no WebSocket/Kafka dependencies)
cargo install bgpflux
# With live streaming support (RIS Live + RouteViews Live)
cargo install bgpflux --features live

# Archive-only (default)
[dependencies]
bgpflux = "0.2"
# With live streaming
[dependencies]
bgpflux = { version = "0.2", features = ["live"] }

You can also enable backends individually with live-ris (RIPE RIS Live via WebSocket) or live-routeviews (RouteViews Live via Kafka).
Stream BGP updates from two collectors over a one-hour window:
bgpflux \
--start "2025-01-15T12:00:00Z" \
--end "2025-01-15T13:00:00Z" \
-c route-views.wide,rrc04 \
-t update

Stream RIB dumps:
bgpflux \
--start "2025-01-15T12:00:00Z" \
--end "2025-01-15T13:00:00Z" \
-c route-views.wide,rrc04 \
-t rib

Stream both RIBs and updates together:
bgpflux \
--start "2025-01-15T12:00:00Z" \
--end "2025-01-15T13:00:00Z" \
-c route-views.wide,rrc04 \
-t rib,update

Enable local caching to avoid re-downloading on repeated runs:
bgpflux \
--start "2025-01-15T12:00:00Z" \
--end "2025-01-15T13:00:00Z" \
-c route-views.wide,rrc04 \
-t update \
--cache-dir ./bgp_cache

Stream real-time BGP updates from RIS and RouteViews collectors:
bgpflux --live -c rrc00,route-views2

Use a jitter buffer (in seconds) to reorder elements by timestamp:
bgpflux --live -c rrc00,route-views2 --delay 15

Filter by origin ASN:
bgpflux --start "2025-01-15T12:00:00Z" --end "2025-01-15T13:00:00Z" \
-c rrc00 -t update -o 13335

Filter by prefix:
bgpflux --start "2025-01-15T12:00:00Z" --end "2025-01-15T13:00:00Z" \
-c rrc00 -t update -p 1.1.1.0/24

Filter by AS path regex:
bgpflux --start "2025-01-15T12:00:00Z" --end "2025-01-15T13:00:00Z" \
-c rrc00 -t update -a "13335$"

IPv6 only:
bgpflux --start "2025-01-15T12:00:00Z" --end "2025-01-15T13:00:00Z" \
-c rrc00 -t update -6

Generic filter expressions:
bgpflux --start "2025-01-15T12:00:00Z" --end "2025-01-15T13:00:00Z" \
-c rrc00 -t update \
-f "origin_asn!=13335" -f "peer_asn=15169"

use bgpflux::{BgpStream, BgpStreamConfig, DataType};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = BgpStreamConfig::new(
        "2025-01-15T12:00:00Z",
        "2025-01-15T13:00:00Z",
        &["route-views.wide", "rrc04"],
        DataType::Update,
    )?;
    let stream = BgpStream::new(config).build();
    for elem in stream {
        println!("{}", elem);
    }
    Ok(())
}

See the examples/ directory for more:
| Example | Description |
|---|---|
| basic_stream | Stream updates from multiple collectors |
| cached_stream | Use local caching to speed up repeated queries |
| filtered_stream | Apply filters (origin ASN, prefix, AS path) |
| rib_snapshot | Collect unique prefixes from a RIB dump |
| live_stream | Real-time BGP streaming with a jitter buffer (requires live feature) |
| elem_fields | Access individual fields of BGP elements |

bgpflux outputs pipe-delimited records, one per line:
Type|Timestamp|PeerIP|PeerASN|Prefix|AsPath|Origin|NextHop|LocalPref|MED|Communities|Atomic|AggrASN|AggrIP|Collector
Where Type is A (announce), W (withdraw), or R (RIB entry). Example:
A|1736942400.469656|80.249.211.96|50629|193.186.6.0/24|50629 6939 47541|IGP|80.249.211.96|0|0|50629:500 50629:2000 50629:6939|false|||rrc04
W|1736942400.469656|80.249.211.96|50629|2a0e:b107:1a0::/44||||||||||rrc04
R|1736942400|80.249.211.96|50629|193.186.6.0/24|50629 6939 47541|IGP|80.249.211.96|0|0||false|||rrc04
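For downstream tooling, a record in this format can be split on the pipe character. The following is a minimal sketch that assumes exactly the 15 columns listed in the header above; the `Record` struct and `parse_record` function are hypothetical and not part of bgpflux, and only a subset of the columns is kept.

```rust
/// A subset of one output line. Field names mirror the header row.
#[derive(Debug, PartialEq)]
struct Record<'a> {
    elem_type: char,
    timestamp: f64,
    peer_ip: &'a str,
    peer_asn: u32,
    prefix: &'a str,
    as_path: Vec<&'a str>,
    collector: &'a str,
}

/// Parse one pipe-delimited line; returns None if the shape is unexpected.
fn parse_record(line: &str) -> Option<Record<'_>> {
    let f: Vec<&str> = line.trim_end().split('|').collect();
    if f.len() != 15 {
        return None;
    }
    let elem_type = f[0].chars().next()?;
    if !matches!(elem_type, 'A' | 'W' | 'R') {
        return None;
    }
    Some(Record {
        elem_type,
        timestamp: f[1].parse().ok()?,
        peer_ip: f[2],
        peer_asn: f[3].parse().ok()?,
        prefix: f[4],
        // Hops stay as strings; withdrawals carry an empty path.
        as_path: f[5].split_whitespace().collect(),
        collector: f[14],
    })
}

fn main() {
    let line = "R|1736942400|80.249.211.96|50629|193.186.6.0/24|50629 6939 47541|IGP|80.249.211.96|0|0||false|||rrc04";
    if let Some(rec) = parse_record(line) {
        println!("{rec:?}");
    }
}
```

Empty columns (for example the aggregator fields, or most columns of a withdrawal) simply come back as empty strings, so the column count stays fixed at 15.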
BgpStream is the main interface for streaming historical BGP data. It queries BGPKIT Broker to discover archive files, downloads and parses them, and merges elements from all collectors in timestamp order.
use bgpflux::{BgpStream, BgpStreamConfig, DataType};
let config = BgpStreamConfig::new(
"2025-01-15T12:00:00Z",
"2025-01-15T13:00:00Z",
&["route-views.wide"],
DataType::Update,
).unwrap();
let stream = BgpStream::new(config)
.cache_dir("./bgp_cache") // optional
.build();

BgpStreamConfig is the configuration builder for archive streams. It supports multiple timestamp formats:
- RFC 3339: "2025-01-15T12:00:00Z", "2025-01-15T12:00:00+02:00"
- ISO 8601 without timezone (assumed UTC): "2025-01-15T12:00:00"
- Date only (midnight UTC): "2025-01-15", "2025/01/15", "20250115"
- Unix timestamp: "1736942400"
BgpStreamElem represents a single BGP element. It wraps bgpkit_parser::BgpElem and adds a collector_id and elem_type. It implements Deref<Target = BgpElem>, so all BgpElem fields (timestamp, prefix, as_path, peer_ip, etc.) are directly accessible.
for elem in stream {
    println!(
        "{} {} via AS{} from {}",
        elem.elem_type, elem.prefix, elem.peer_asn, elem.collector_id
    );
}

LiveBgpStream provides real-time streaming from RIS Live (WebSocket) and RouteViews Live (Kafka). It automatically routes collectors to the right backend based on their name (rrc* → RIS, route-views* → RouteViews).
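The name-based routing rule can be sketched as a simple prefix match. The `Backend` enum and `backend_for` function below are hypothetical, and returning `None` for an unrecognized name is an assumption; the library may handle unknown collectors differently.

```rust
/// Hypothetical label for the two live backends.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum Backend {
    RisLive,
    RouteViewsLive,
}

/// Pick a backend from the collector name: rrc* or route-views*.
fn backend_for(collector: &str) -> Option<Backend> {
    if collector.starts_with("rrc") {
        Some(Backend::RisLive)
    } else if collector.starts_with("route-views") {
        Some(Backend::RouteViewsLive)
    } else {
        None // assumption: unknown names are rejected
    }
}

fn main() {
    for name in ["rrc00", "route-views2", "route-views.wide", "unknown"] {
        println!("{name} -> {:?}", backend_for(name));
    }
}
```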
use bgpflux::{LiveBgpStream, LiveConfig, JitterBufferExt};
use std::time::Duration;
let config = LiveConfig::new(&["rrc00", "route-views2"]).unwrap();
let stream = LiveBgpStream::new(config)
.build()
.jitter_buffer(Duration::from_secs(15));
for elem in stream {
    println!("{}", elem);
}

JitterBufferExt is an extension trait on any Iterator<Item = BgpStreamElem>. It wraps the iterator in a binary-heap-based reordering buffer that holds elements for a configurable delay window before emitting them in timestamp order. This is useful for live streams, where elements from different collectors may arrive slightly out of order.
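The reordering idea can be sketched with a min-heap and an extension trait. This is a simplified illustration rather than the library's code: `Elem`, `Jitter`, and `JitterExt` are hypothetical names, and the delay is measured against the newest timestamp seen so far (a watermark) instead of wall-clock time, which keeps the example deterministic.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical stand-in for a BGP element: (timestamp in seconds, payload).
type Elem = (u64, &'static str);

struct Jitter<I: Iterator<Item = Elem>> {
    inner: I,
    delay: u64,
    newest: u64, // largest timestamp observed so far
    heap: BinaryHeap<Reverse<Elem>>,
}

impl<I: Iterator<Item = Elem>> Iterator for Jitter<I> {
    type Item = Elem;

    fn next(&mut self) -> Option<Elem> {
        loop {
            // Release the oldest buffered element once it lags the newest
            // observed timestamp by at least `delay` seconds.
            let ready = matches!(
                self.heap.peek(),
                Some(Reverse((ts, _))) if ts + self.delay <= self.newest
            );
            if ready {
                return self.heap.pop().map(|Reverse(e)| e);
            }
            match self.inner.next() {
                Some(e) => {
                    self.newest = self.newest.max(e.0);
                    self.heap.push(Reverse(e));
                }
                // Input exhausted: drain the remaining elements in order.
                None => return self.heap.pop().map(|Reverse(e)| e),
            }
        }
    }
}

/// Extension trait so any matching iterator gains `.jitter(delay)`.
trait JitterExt: Iterator<Item = Elem> + Sized {
    fn jitter(self, delay: u64) -> Jitter<Self> {
        Jitter { inner: self, delay, newest: 0, heap: BinaryHeap::new() }
    }
}

impl<I: Iterator<Item = Elem>> JitterExt for I {}

fn main() {
    let arrivals = vec![(10, "a"), (12, "c"), (11, "b"), (30, "e"), (29, "d")];
    for (ts, payload) in arrivals.into_iter().jitter(5) {
        println!("{ts} {payload}");
    }
}
```

An element that arrives more than the delay window late is still emitted, just out of order, so the window trades latency against ordering guarantees.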
Filters are applied at the parser level using bgpkit-parser filters. Both BgpStreamConfig and LiveConfig support .with_filters() and .add_filter():
use bgpkit_parser::Filter;
use bgpflux::{BgpStreamConfig, DataType};
let config = BgpStreamConfig::new(
"2025-01-15T12:00:00Z",
"2025-01-15T13:00:00Z",
&["rrc00"],
DataType::Update,
)?
.add_filter(Filter::new("origin_asn", "13335")?)
.add_filter(Filter::new("ip_version", "ipv6")?);

Supported filter keys: origin_asn, prefix, prefix_sub, prefix_super, peer_ip, peer_ips, peer_asn, as_path, community, ip_version, type.
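The generic `-f` expressions shown earlier (`origin_asn!=13335`, `peer_asn=15169`) suggest a key, an operator, and a value. As a rough sketch of how such a string could be split and checked against the supported keys, the hypothetical `parse_filter_expr` below uses only the standard library. It is not the CLI's actual parser, and whether every key accepts `!=` is an assumption.

```rust
/// Keys taken from the list of supported filter keys.
const SUPPORTED_KEYS: &[&str] = &[
    "origin_asn", "prefix", "prefix_sub", "prefix_super", "peer_ip", "peer_ips",
    "peer_asn", "as_path", "community", "ip_version", "type",
];

/// Hypothetical parsed form of a "key=value" or "key!=value" expression.
#[derive(Debug, PartialEq)]
struct FilterExpr<'a> {
    key: &'a str,
    negated: bool,
    value: &'a str,
}

fn parse_filter_expr(expr: &str) -> Result<FilterExpr<'_>, String> {
    // Split at the first '=' so that values (e.g. regexes) may contain '='.
    let eq = expr
        .find('=')
        .ok_or_else(|| format!("missing '=' or '!=' in {expr:?}"))?;
    let lhs = &expr[..eq];
    // A '!' directly before the '=' marks a negated filter.
    let (key, negated) = match lhs.strip_suffix('!') {
        Some(k) => (k.trim(), true),
        None => (lhs.trim(), false),
    };
    let value = expr[eq + 1..].trim();
    if !SUPPORTED_KEYS.contains(&key) {
        return Err(format!("unsupported filter key {key:?}"));
    }
    if value.is_empty() {
        return Err(format!("empty value for filter key {key:?}"));
    }
    Ok(FilterExpr { key, negated, value })
}

fn main() {
    for expr in ["origin_asn!=13335", "peer_asn=15169", "bogus=1"] {
        println!("{expr} -> {:?}", parse_filter_expr(expr));
    }
}
```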
bgpflux achieves throughput on par with bgpreader. See performance.md.
This project uses code copied and adapted from: