A modern Python SDK for connecting to FFXIV EventStream with async/await support, comprehensive type safety, and powerful analytics integration.
🚀 Modern Python: Built for Python 3.10+ with full type hints and async/await support
🔌 WebSocket Client: Async WebSocket connection with automatic reconnection and exponential backoff
📊 Type Safety: Pydantic models for complete type validation and IDE support
🔄 Event Streaming: Iterator-based streaming with async for loops
⚡ Rich Filtering: Chainable operators for event filtering and transformation
📈 Analytics Ready: DuckDB integration for data science workflows
🛡️ Error Handling: Comprehensive error handling with recoverable/non-recoverable classification
📝 Excellent DX: Great developer experience with clear APIs and comprehensive documentation
pip install eventstream-ffxiv
pip install eventstream-ffxiv[analytics]
pip install eventstream-ffxiv[dev]

import asyncio
from eventstream_ffxiv import EventStreamClient, EventType
async def main():
# Connect to EventStream server
client = EventStreamClient("ws://localhost:8080/events")
async with client.stream():
async for event in client.events():
if event.type == EventType.ACTION_HIT:
data = event.data
print(f"{data.actor.name} -> {data.target.name}: {data.result.damage} damage")
asyncio.run(main())

import asyncio
from eventstream_ffxiv import EventStreamClient, EventType, filters
async def damage_tracker():
client = EventStreamClient("ws://localhost:8080/events")
# Create filtered stream for high-damage party events
stream = (filters.from_client(client)
.where_type(EventType.ACTION_HIT)
.where_party_only()
.where_damage_threshold(10000)
.throttle(1.0)) # Limit to 1 event per second
async with client.stream():
async for event in stream:
data = event.data
damage_text = f"{data.result.damage:,}"
if data.result.crit:
damage_text += " CRIT!"
print(f"💥 {data.actor.name}: {damage_text} damage")
asyncio.run(damage_tracker())

The main client for connecting to EventStream servers:
from eventstream_ffxiv import EventStreamClient, EventType, FFXIVJob
# Basic client
client = EventStreamClient("ws://localhost:8080/events")
# Advanced configuration
client = EventStreamClient(
"wss://my-server.com/events",
client_name="MyOverlay",
event_filter=[EventType.ACTION_HIT, EventType.DEATH], # Only these events
use_message_pack=True, # Use MessagePack for efficiency
auto_reconnect=True,
max_reconnect_attempts=10,
debug=True
)
# Connection management
async with client.stream():
# Client automatically connects and disconnects
stats = client.get_stats()
print(f"Events received: {stats.events_received}")
async for event in client.events():
print(f"Event: {event.type}")

All events are strongly typed with Pydantic models:
from eventstream_ffxiv import EventEnvelope, ActionHitEvent, get_typed_envelope
async for event in client.events():
# Generic envelope
print(f"Event type: {event.type}")
print(f"Sequence: {event.seq}")
print(f"Timestamp: {event.ts}")
print(f"Encounter: {event.encounter_id}")
# Type-safe event handling
if event.type == EventType.ACTION_HIT:
# Get typed envelope for better IDE support
typed_event = get_typed_envelope(event)
# Now you have full type safety
actor = typed_event.data.actor
if actor.job == FFXIVJob.WHM:
print(f"Healer {actor.name} did {typed_event.data.result.damage} damage")

Rich filtering and transformation capabilities:
from eventstream_ffxiv import filters, EventType, FFXIVJob, ChatChannel
# Create stream from client
stream = filters.from_client(client)
# Type filtering
damage_events = stream.where_type(EventType.ACTION_HIT)
chat_events = stream.where_type(EventType.CHAT_MESSAGE)
# Actor filtering
self_events = stream.where_self_only()
party_events = stream.where_party_only()
specific_actor = stream.where_actor("p:slot-1")
# Advanced filtering
high_damage = (stream
.where_type(EventType.ACTION_HIT)
.where_damage_threshold(50000)
.where(lambda e: e.data.result.crit))
healer_actions = (stream
.where_type(EventType.ACTION_CAST_START)
.where_job(FFXIVJob.WHM))
party_chat = stream.where_chat_channel(ChatChannel.PARTY)
# Timing operations
throttled = stream.throttle(0.5) # Max 2 events per second
debounced = stream.debounce(1.0) # Only after 1s of silence
sampled = stream.sample(5.0) # Sample every 5 seconds
# Windowing
async for batch in stream.buffer_time(10.0): # 10-second batches
print(f"Received {len(batch)} events in 10 seconds")
async for batch in stream.buffer_count(100): # 100-event batches
print(f"Batch of 100 events")
# Transformation
damage_amounts = stream.select(lambda e: e.data.result.damage if e.type == EventType.ACTION_HIT else 0)
actor_names = stream.select_actors().select(lambda actors: [a.name for a in actors])
# Utility operations
first_10 = stream.take(10)
skip_first_5 = stream.skip(5)
unique_events = stream.distinct(lambda e: (e.type, e.seq))
# Terminal operations
first_event = await stream.first()
first_damage = await stream.first(lambda e: e.type == EventType.ACTION_HIT)
event_count = await stream.count()
all_events = await stream.to_list(max_count=1000)

Convenient factory functions for common filters:
from eventstream_ffxiv import filters
# Use factory functions
stream = (filters.from_client(client)
.pipe(filters.action_hit()) # ACTION_HIT events only
.pipe(filters.party_only()) # Party members only
.pipe(filters.high_damage(25000)) # >25k damage
.pipe(filters.critical_hits())) # Critical hits only
# Combine multiple filters
healing_stream = (filters.from_client(client)
.pipe(filters.action_hit())
.where(lambda e: e.data.result.healing and e.data.result.healing > 0))

Store and analyze events with DuckDB:
import asyncio
from eventstream_ffxiv import EventStreamClient, EventType, filters, sinks
async def analytics_example():
client = EventStreamClient("ws://localhost:8080/events")
# Create filtered stream for analysis
combat_stream = (filters.from_client(client)
.where_type(EventType.ACTION_HIT)
.where_party_only()
.take(10000)) # Collect 10k events
# Store in DuckDB
async with sinks.create_file_sink("combat_analysis.db") as sink:
await sink.sink_stream(combat_stream)
# Analyze damage by actor
damage_summary = sink.get_damage_summary()
print(damage_summary)
# Custom analytics queries
dps_by_job = sink.query("""
SELECT
a.job,
COUNT(*) as hit_count,
SUM(ah.damage) as total_damage,
AVG(ah.damage) as avg_damage,
SUM(ah.damage) / (MAX(e.ts) - MIN(e.ts)) * 1000 as dps
FROM action_hit ah
JOIN events e ON ah.event_id = e.id
JOIN actors a ON ah.actor_id = a.id
WHERE ah.damage > 0 AND a.job IS NOT NULL
GROUP BY a.job
ORDER BY dps DESC
""")
print(dps_by_job)
asyncio.run(analytics_example())

Combine streaming with analytics:
async def real_time_analytics():
client = EventStreamClient("ws://localhost:8080/events")
sink = sinks.create_memory_sink(batch_size=100)
# Stream damage events to analytics
damage_stream = (filters.from_client(client)
.where_type(EventType.ACTION_HIT)
.where_party_only())
async with client.stream():
# Process events in real-time
async for event in damage_stream:
# Store in analytics DB
await sink.insert_event(event)
# Real-time summary every 100 events
if sink.get_stats().events_received % 100 == 0:
summary = sink.get_damage_summary()
top_dps = summary.iloc[0] if not summary.empty else None
if top_dps is not None:
print(f"Current top DPS: {top_dps['actor_name']} ({top_dps['total_damage']:,})")

from eventstream_ffxiv import EventStreamClient, ConnectionState
client = EventStreamClient("ws://localhost:8080/events", debug=True)
# Manual connection management
await client.connect()
await client.wait_connected(timeout=10.0)
# Monitor connection state
async for state in client.subscribe_state_changes():
if state == ConnectionState.CONNECTED:
print("✅ Connected to EventStream")
elif state == ConnectionState.RECONNECTING:
print("🔄 Reconnecting...")
elif state == ConnectionState.ERROR:
print("❌ Connection error")
# Get connection statistics
stats = client.get_stats()
print(f"Uptime: {stats.uptime_ms}ms")
print(f"Events received: {stats.events_received}")
print(f"Using MessagePack: {stats.using_message_pack}")

from eventstream_ffxiv import EventStreamClient, EventStreamError
try:
client = EventStreamClient("ws://invalid-url")
await client.connect()
except EventStreamError as e:
if e.recoverable:
print(f"Recoverable error: {e}")
# Could retry later
else:
print(f"Fatal error: {e}")
# Should not retry

from eventstream_ffxiv import PerformanceMonitor, timed_operation
monitor = PerformanceMonitor()
async with timed_operation(monitor, "event_processing"):
async for event in client.events():
monitor.increment("events_processed")
# Process event...
if monitor.get_counter("events_processed") % 1000 == 0:
stats = monitor.get_timer_stats("event_processing")
print(f"Processed 1000 events in {stats['avg']:.3f}s avg")

from eventstream_ffxiv import utils, EventType
async def process_events():
async for event in client.events():
# Validate event
issues = utils.validate_event_envelope(event)
if issues:
print(f"Invalid event: {issues}")
continue
# Extract summary
summary = utils.extract_event_summary(event)
age = utils.event_age_seconds(event)
print(f"[{age:.1f}s ago] {summary}")
# Actor analysis
if event.type == EventType.ACTION_HIT:
actor = event.data.actor
if utils.is_party_actor(actor):
role = utils.get_job_role(actor.job) if actor.job else "unknown"
slot = utils.get_party_slot(actor)
print(f" Party {role} in slot {slot}")

Fired when an actor begins casting an action:
{
"type": "actionCastStart",
"data": {
"actor": {"id": "p:self", "job": "WHM", "level": 90},
"action": {"id": 25859, "name": "Glare III"},
"target": {"id": "e:1"},
"castTimeMs": 1400
}
}

Fired when an action connects with a target:
{
"type": "actionHit",
"data": {
"actor": {"id": "p:self"},
"target": {"id": "e:1"},
"action": {"id": 25859},
"result": {
"damage": 42125,
"crit": true,
"dh": false
}
}
}

Fired when an entity gains a status effect:
{
"type": "statusGain",
"data": {
"target": {"id": "p:self"},
"status": {
"id": 1876,
"stacks": 1,
"durationMs": 15000,
"src": {"id": "p:self"}
}
}
}

Fired when an entity loses a status effect:
{
"type": "statusLose",
"data": {
"target": {"id": "p:self"},
"status": {
"id": 1876,
"remainingStacks": 0
}
}
}

Fired when an entity dies:
{
"type": "death",
"data": {
"actor": {"id": "p:slot-1"},
"by": {
"actionId": 12345,
"src": {"id": "e:1"}
}
}
}

Chat messages from allowed channels:
{
"type": "chatMessage",
"data": {
"channel": "party",
"sender": {"id": "p:self"},
"text": "LB ready!",
"timestamp": "2024-01-15T10:30:00Z"
}
}

The SDK provides complete type safety with Pydantic models:
from eventstream_ffxiv import EventEnvelope, ActionHitData, EventType
# Type-safe event handling
def handle_damage_event(event: EventEnvelope) -> None:
if event.type == EventType.ACTION_HIT:
# Type checker knows this is ActionHitData
data: ActionHitData = event.data
# Full autocompletion and type checking
actor_name = data.actor.name or data.actor.id
damage = data.result.damage
was_crit = data.result.crit or False
print(f"{actor_name} dealt {damage} damage{' (CRIT)' if was_crit else ''}")

Always use context managers for automatic cleanup:
# ✅ Good - automatic cleanup
async with client.stream():
async for event in client.events():
process_event(event)
# ✅ Good - manual cleanup
client = EventStreamClient("ws://localhost:8080/events")
try:
await client.connect()
# Process events...
finally:
await client.disconnect()

Use appropriate batch sizes and filtering:
# ✅ Good - filter early
stream = (filters.from_client(client)
.where_type(EventType.ACTION_HIT) # Filter early
.where_party_only() # Reduce data volume
.throttle(0.1)) # Prevent spam
# ✅ Good - batch analytics writes
async with sinks.create_file_sink("data.db", batch_size=1000) as sink:
await sink.sink_stream(stream)

Handle both recoverable and non-recoverable errors:
from eventstream_ffxiv import EventStreamError
try:
await client.connect()
except EventStreamError as e:
if e.recoverable:
# Log and potentially retry
logger.warning(f"Connection failed: {e}")
await asyncio.sleep(5)
# Retry logic...
else:
# Fatal error, don't retry
logger.error(f"Fatal connection error: {e}")
raise

Find more examples in the examples directory:
- Basic Overlay - Simple damage meter overlay
- Combat Logger - Log combat events to file
- Real-time Analytics - Live DPS tracking
- Chat Bot - Respond to party chat
- Raid Analysis - Post-raid performance analysis
We welcome contributions! Please see CONTRIBUTING.md for guidelines.
This project is licensed under the MIT License - see the LICENSE file for details.
- EventStream Plugin - FFXIV Dalamud plugin that generates events
- EventStream TypeScript SDK - TypeScript/JavaScript SDK for web applications
- EventStream C# SDK - C# SDK for .NET applications