-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathwebsocket.py
More file actions
78 lines (59 loc) · 2.3 KB
/
websocket.py
File metadata and controls
78 lines (59 loc) · 2.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
"""WebSocket gerçek zamanlı örnek.
Çalıştırma:
ALTINAPI_KEY=hapi_xxx python examples/websocket.py
"""
from __future__ import annotations
import os
import signal
import sys
from datetime import datetime
from altinapi import AltinapiClient
def main() -> None:
api_key = os.environ.get("ALTINAPI_KEY")
if not api_key:
print("ALTINAPI_KEY environment değişkeni eksik", file=sys.stderr)
sys.exit(1)
client = AltinapiClient(api_key=api_key)
# Handler'ları connect()'ten önce kaydedebilirsiniz; SDK biriktirir
@client.on("connect")
def _on_connect() -> None:
print("✅ altinapi WebSocket'e bağlandı")
client.subscribe(["USDTRY", "EURTRY", "ALTIN", "CEYREK_YENI"])
print("📡 Abone olundu: USDTRY, EURTRY, ALTIN, CEYREK_YENI")
@client.on("prices:snapshot")
def _on_snapshot(fiyatlar: list) -> None:
print(f"\n📸 Snapshot alındı ({len(fiyatlar)} sembol)")
for f in fiyatlar[:5]:
print(f" {f['symbol']:<15} {f['bid']} / {f['ask']}")
@client.on("prices:update")
def _on_update(guncellemeler: list) -> None:
for f in guncellemeler:
# ISO 8601 timestamp'i okuyup yerel saate çevir
try:
ts = datetime.fromisoformat(f["timestamp"].replace("Z", "+00:00"))
saat = ts.astimezone().strftime("%H:%M:%S")
except (ValueError, KeyError):
saat = "??:??:??"
print(f"[{saat}] {f['symbol']:<15} alış={f['bid']} satış={f['ask']}")
@client.on("data:stale")
def _on_stale() -> None:
print("⚠️ Kaynak bağlantısı koptu — fiyatlar eski olabilir")
@client.on("data:live")
def _on_live() -> None:
print("✅ Kaynak yeniden bağlandı")
@client.on("disconnect")
def _on_disconnect() -> None:
print("❌ Bağlantı koptu")
@client.on("connect_error")
def _on_error(err) -> None:
print(f"Bağlantı hatası: {err}", file=sys.stderr)
def _shutdown(_signum, _frame) -> None:
print("\n👋 Kapanıyor...")
client.disconnect()
sys.exit(0)
signal.signal(signal.SIGINT, _shutdown)
print("🔌 altinapi'ye bağlanılıyor... (çıkmak için Ctrl+C)")
client.connect()
client.wait()
if __name__ == "__main__":
main()