Skip to content

Latest commit

 

History

History
594 lines (464 loc) · 16.7 KB

File metadata and controls

594 lines (464 loc) · 16.7 KB

EventStream FFXIV Python SDK

A modern Python SDK for connecting to FFXIV EventStream with async/await support, comprehensive type safety, and powerful analytics integration.

Python 3.10+ · AsyncIO · Type Hints · License: MIT

Features

🚀 Modern Python: Built for Python 3.10+ with full type hints and async/await support
🔌 WebSocket Client: Async WebSocket connection with automatic reconnection and exponential backoff
📊 Type Safety: Pydantic models for complete type validation and IDE support
🔄 Event Streaming: Iterator-based streaming with async for loops
🔍 Rich Filtering: Chainable operators for event filtering and transformation
📈 Analytics Ready: DuckDB integration for data science workflows
🛡️ Error Handling: Comprehensive error handling with recoverable/non-recoverable classification
📝 Excellent DX: Great developer experience with clear APIs and comprehensive documentation

Installation

Basic Installation

pip install eventstream-ffxiv

With Analytics Support

pip install eventstream-ffxiv[analytics]

Development Installation

pip install eventstream-ffxiv[dev]

Quick Start

Basic Event Streaming

import asyncio
from eventstream_ffxiv import EventStreamClient, EventType

async def main():
    """Stream events from the EventStream server and print every action hit."""
    client = EventStreamClient("ws://localhost:8080/events")

    async with client.stream():
        async for event in client.events():
            # Anything that is not an action hit is ignored
            if event.type != EventType.ACTION_HIT:
                continue
            data = event.data
            print(f"{data.actor.name} -> {data.target.name}: {data.result.damage} damage")

asyncio.run(main())

Advanced Filtering

import asyncio
from eventstream_ffxiv import EventStreamClient, EventType, filters

async def damage_tracker():
    """Print throttled party action hits that pass the damage threshold, marking crits."""
    client = EventStreamClient("ws://localhost:8080/events")

    # Build the filtered stream one stage at a time
    hits = filters.from_client(client).where_type(EventType.ACTION_HIT)
    party_hits = hits.where_party_only()
    heavy_hits = party_hits.where_damage_threshold(10000)
    stream = heavy_hits.throttle(1.0)  # Limit to 1 event per second

    async with client.stream():
        async for event in stream:
            data = event.data
            damage_text = f"{data.result.damage:,}"
            damage_text += " CRIT!" if data.result.crit else ""

            print(f"💥 {data.actor.name}: {damage_text} damage")

asyncio.run(damage_tracker())

Core Components

EventStreamClient

The main client for connecting to EventStream servers:

from eventstream_ffxiv import EventStreamClient, EventType, FFXIVJob

# Basic client
client = EventStreamClient("ws://localhost:8080/events")

# Advanced configuration
client = EventStreamClient(
    "wss://my-server.com/events",
    client_name="MyOverlay",
    event_filter=[EventType.ACTION_HIT, EventType.DEATH],  # Only these events
    use_message_pack=True,  # Use MessagePack for efficiency
    auto_reconnect=True,
    max_reconnect_attempts=10,
    debug=True
)

# Connection management
async with client.stream():
    # Client automatically connects and disconnects
    stats = client.get_stats()
    print(f"Events received: {stats.events_received}")
    
    async for event in client.events():
        print(f"Event: {event.type}")

Event Models

All events are strongly typed with Pydantic models:

from eventstream_ffxiv import EventEnvelope, ActionHitEvent, get_typed_envelope

async for event in client.events():
    # Generic envelope
    print(f"Event type: {event.type}")
    print(f"Sequence: {event.seq}")
    print(f"Timestamp: {event.ts}")
    print(f"Encounter: {event.encounter_id}")
    
    # Type-safe event handling
    if event.type == EventType.ACTION_HIT:
        # Get typed envelope for better IDE support
        typed_event = get_typed_envelope(event)
        
        # Now you have full type safety
        actor = typed_event.data.actor
        if actor.job == FFXIVJob.WHM:
            print(f"Healer {actor.name} did {typed_event.data.result.damage} damage")

Event Filtering

Rich filtering and transformation capabilities:

from eventstream_ffxiv import filters, EventType, FFXIVJob, ChatChannel

# Create stream from client
stream = filters.from_client(client)

# Type filtering
damage_events = stream.where_type(EventType.ACTION_HIT)
chat_events = stream.where_type(EventType.CHAT_MESSAGE)

# Actor filtering
self_events = stream.where_self_only()
party_events = stream.where_party_only()
specific_actor = stream.where_actor("p:slot-1")

# Advanced filtering
high_damage = (stream
               .where_type(EventType.ACTION_HIT)
               .where_damage_threshold(50000)
               .where(lambda e: e.data.result.crit))

healer_actions = (stream
                  .where_type(EventType.ACTION_CAST_START)
                  .where_job(FFXIVJob.WHM))

party_chat = stream.where_chat_channel(ChatChannel.PARTY)

# Timing operations
throttled = stream.throttle(0.5)  # Max 2 events per second
debounced = stream.debounce(1.0)  # Only after 1s of silence
sampled = stream.sample(5.0)      # Sample every 5 seconds

# Windowing
async for batch in stream.buffer_time(10.0):  # 10-second batches
    print(f"Received {len(batch)} events in 10 seconds")

async for batch in stream.buffer_count(100):  # 100-event batches
    print(f"Batch of 100 events")

# Transformation
damage_amounts = stream.select(lambda e: e.data.result.damage if e.type == EventType.ACTION_HIT else 0)
actor_names = stream.select_actors().select(lambda actors: [a.name for a in actors])

# Utility operations
first_10 = stream.take(10)
skip_first_5 = stream.skip(5)
unique_events = stream.distinct(lambda e: (e.type, e.seq))

# Terminal operations
first_event = await stream.first()
first_damage = await stream.first(lambda e: e.type == EventType.ACTION_HIT)
event_count = await stream.count()
all_events = await stream.to_list(max_count=1000)

Factory Functions

Convenient factory functions for common filters:

from eventstream_ffxiv import filters

# Use factory functions
stream = (filters.from_client(client)
          .pipe(filters.action_hit())        # ACTION_HIT events only
          .pipe(filters.party_only())        # Party members only  
          .pipe(filters.high_damage(25000))  # >25k damage
          .pipe(filters.critical_hits()))    # Critical hits only

# Combine multiple filters
healing_stream = (filters.from_client(client)
                  .pipe(filters.action_hit())
                  .where(lambda e: e.data.result.healing and e.data.result.healing > 0))

Analytics Integration

DuckDB Integration

Store and analyze events with DuckDB:

import asyncio
from eventstream_ffxiv import EventStreamClient, EventType, filters, sinks

async def analytics_example():
    """Store party ACTION_HIT events in a DuckDB file and print damage analytics."""
    client = EventStreamClient("ws://localhost:8080/events")

    # Create filtered stream for analysis
    combat_stream = (filters.from_client(client)
                     .where_type(EventType.ACTION_HIT)
                     .where_party_only()
                     .take(10000))  # Collect 10k events

    # NOTE(review): unlike the streaming examples, nothing here enters
    # client.stream(); confirm that sink_stream() drives the connection itself.
    # Store in DuckDB
    async with sinks.create_file_sink("combat_analysis.db") as sink:
        await sink.sink_stream(combat_stream)

        # Analyze damage by actor
        damage_summary = sink.get_damage_summary()
        print(damage_summary)

        # Custom analytics queries.
        # NULLIF turns a zero time span (e.g. a job with a single hit) into
        # NULL so the DPS division cannot divide by zero.
        # The "* 1000" presumably converts millisecond timestamps to seconds —
        # TODO confirm the unit of events.ts.
        dps_by_job = sink.query("""
            SELECT 
                a.job,
                COUNT(*) as hit_count,
                SUM(ah.damage) as total_damage,
                AVG(ah.damage) as avg_damage,
                SUM(ah.damage) / NULLIF(MAX(e.ts) - MIN(e.ts), 0) * 1000 as dps
            FROM action_hit ah
            JOIN events e ON ah.event_id = e.id
            JOIN actors a ON ah.actor_id = a.id
            WHERE ah.damage > 0 AND a.job IS NOT NULL
            GROUP BY a.job
            ORDER BY dps DESC
        """)
        print(dps_by_job)

asyncio.run(analytics_example())

Real-time Analytics

Combine streaming with analytics:

async def real_time_analytics():
    """Insert party ACTION_HIT events into a sink and periodically print the top summary row.

    Every time the sink's received-event count is a multiple of 100, the
    damage summary is fetched and its first row is printed.
    """
    client = EventStreamClient("ws://localhost:8080/events")
    sink = sinks.create_memory_sink(batch_size=100)

    # Stream damage events to analytics
    damage_stream = (filters.from_client(client)
                     .where_type(EventType.ACTION_HIT)
                     .where_party_only())

    async with client.stream():
        # Process events in real-time
        async for event in damage_stream:
            # Store in analytics DB
            await sink.insert_event(event)

            # Real-time summary every 100 events
            if sink.get_stats().events_received % 100 == 0:
                summary = sink.get_damage_summary()
                # Test emptiness on the frame itself: a row taken with
                # `.iloc[0]` is a pandas Series, and truth-testing a Series
                # (`if top_dps:`) raises ValueError.
                if not summary.empty:
                    top_dps = summary.iloc[0]
                    print(f"Current top DPS: {top_dps['actor_name']} ({top_dps['total_damage']:,})")

Advanced Usage

Connection Management

from eventstream_ffxiv import EventStreamClient, ConnectionState

client = EventStreamClient("ws://localhost:8080/events", debug=True)

# Manual connection management
await client.connect()
await client.wait_connected(timeout=10.0)

# Monitor connection state
async for state in client.subscribe_state_changes():
    if state == ConnectionState.CONNECTED:
        print("✅ Connected to EventStream")
    elif state == ConnectionState.RECONNECTING:
        print("🔄 Reconnecting...")
    elif state == ConnectionState.ERROR:
        print("❌ Connection error")

# Get connection statistics
stats = client.get_stats()
print(f"Uptime: {stats.uptime_ms}ms")
print(f"Events received: {stats.events_received}")
print(f"Using MessagePack: {stats.using_message_pack}")

Error Handling

from eventstream_ffxiv import EventStreamClient, EventStreamError

try:
    client = EventStreamClient("ws://invalid-url")
    await client.connect()
except EventStreamError as e:
    if e.recoverable:
        print(f"Recoverable error: {e}")
        # Could retry later
    else:
        print(f"Fatal error: {e}")
        # Should not retry

Performance Monitoring

from eventstream_ffxiv import PerformanceMonitor, timed_operation

monitor = PerformanceMonitor()

# Time the whole event loop under the "event_processing" timer while counting
# processed events.
# NOTE(review): the timed block wraps the entire loop, so it presumably has not
# completed when get_timer_stats() is read below — confirm that the
# "event_processing" timer already has samples at that point.
async with timed_operation(monitor, "event_processing"):
    async for event in client.events():
        monitor.increment("events_processed")
        
        # Process event...
        
        # Print timer statistics once every 1000 processed events
        if monitor.get_counter("events_processed") % 1000 == 0:
            stats = monitor.get_timer_stats("event_processing")
            print(f"Processed 1000 events in {stats['avg']:.3f}s avg")

Custom Event Processing

from eventstream_ffxiv import utils, EventType

async def process_events():
    """Validate each event, print its age and summary, and describe party actors on action hits."""
    async for event in client.events():
        # Report and skip envelopes that fail validation
        if issues := utils.validate_event_envelope(event):
            print(f"Invalid event: {issues}")
            continue

        # One-line summary prefixed with the event's age
        summary = utils.extract_event_summary(event)
        age = utils.event_age_seconds(event)
        print(f"[{age:.1f}s ago] {summary}")

        # Actor analysis applies to action hits from party actors only
        if event.type != EventType.ACTION_HIT:
            continue
        actor = event.data.actor
        if not utils.is_party_actor(actor):
            continue

        if actor.job:
            role = utils.get_job_role(actor.job)
        else:
            role = "unknown"
        slot = utils.get_party_slot(actor)
        print(f"  Party {role} in slot {slot}")

Event Types Reference

ActionCastStart

Fired when an actor begins casting an action:

{
    "type": "actionCastStart",
    "data": {
        "actor": {"id": "p:self", "job": "WHM", "level": 90},
        "action": {"id": 25859, "name": "Glare III"},
        "target": {"id": "e:1"},
        "castTimeMs": 1400
    }
}

ActionHit

Fired when an action connects with a target:

{
    "type": "actionHit", 
    "data": {
        "actor": {"id": "p:self"},
        "target": {"id": "e:1"},
        "action": {"id": 25859},
        "result": {
            "damage": 42125,
            "crit": true,
            "dh": false
        }
    }
}

StatusGain

Fired when an entity gains a status effect:

{
    "type": "statusGain",
    "data": {
        "target": {"id": "p:self"},
        "status": {
            "id": 1876,
            "stacks": 1,
            "durationMs": 15000,
            "src": {"id": "p:self"}
        }
    }
}

StatusLose

Fired when an entity loses a status effect:

{
    "type": "statusLose",
    "data": {
        "target": {"id": "p:self"},
        "status": {
            "id": 1876,
            "remainingStacks": 0
        }
    }
}

Death

Fired when an entity dies:

{
    "type": "death",
    "data": {
        "actor": {"id": "p:slot-1"},
        "by": {
            "actionId": 12345,
            "src": {"id": "e:1"}
        }
    }
}

ChatMessage

Fired when a chat message is received on an allowed channel:

{
    "type": "chatMessage",
    "data": {
        "channel": "party",
        "sender": {"id": "p:self"},
        "text": "LB ready!",
        "timestamp": "2024-01-15T10:30:00Z"
    }
}

Type Safety

The SDK provides complete type safety with Pydantic models:

from eventstream_ffxiv import EventEnvelope, ActionHitData, EventType

# Type-safe event handling
def handle_damage_event(event: EventEnvelope) -> None:
    """Print a one-line damage report for an ACTION_HIT event; do nothing for other types."""
    if event.type != EventType.ACTION_HIT:
        return

    # The annotation tells the type checker to treat the payload as ActionHitData
    data: ActionHitData = event.data

    # Fall back to the actor id when the name is missing or empty
    actor_name = data.actor.name or data.actor.id
    was_crit = data.result.crit or False
    damage = data.result.damage

    print(f"{actor_name} dealt {damage} damage{' (CRIT)' if was_crit else ''}")

Best Practices

Resource Management

Always use context managers for automatic cleanup:

# ✅ Good - automatic cleanup
async with client.stream():
    async for event in client.events():
        process_event(event)

# ✅ Good - manual cleanup
client = EventStreamClient("ws://localhost:8080/events")
try:
    await client.connect()
    # Process events...
finally:
    await client.disconnect()

Performance

Use appropriate batch sizes and filtering:

# ✅ Good - filter early
stream = (filters.from_client(client)
          .where_type(EventType.ACTION_HIT)  # Filter early
          .where_party_only()                # Reduce data volume
          .throttle(0.1))                    # Prevent spam

# ✅ Good - batch analytics writes
async with sinks.create_file_sink("data.db", batch_size=1000) as sink:
    await sink.sink_stream(stream)

Error Handling

Handle both recoverable and non-recoverable errors:

from eventstream_ffxiv import EventStreamError

try:
    await client.connect()
except EventStreamError as e:
    if e.recoverable:
        # Log and potentially retry
        logger.warning(f"Connection failed: {e}")
        await asyncio.sleep(5)
        # Retry logic...
    else:
        # Fatal error, don't retry
        logger.error(f"Fatal connection error: {e}")
        raise

Examples Repository

Find more examples in the examples directory.

Contributing

We welcome contributions! Please see CONTRIBUTING.md for guidelines.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Support

Related Projects