Files
Hydra-Engine/tests/test_websocket_reconnect.py
2026-03-30 13:19:11 +09:00

102 lines
2.9 KiB
Python

import pytest
from unittest.mock import AsyncMock, MagicMock
from hydra.data.models import Candle
from hydra.data.collector import ExchangeCollector
def make_candle() -> Candle:
return Candle(
market="binance", symbol="BTC/USDT", timeframe="1m",
open_time=1_000_000, open=50000.0, high=50100.0,
low=49900.0, close=50050.0, volume=1.5,
close_time=1_059_999,
)
async def _gen_one(candle: Candle):
yield candle
async def _gen_raise(exc: Exception):
raise exc
yield # makes this an async generator
@pytest.fixture
def mock_store():
s = AsyncMock()
s.save = AsyncMock()
s.get_last_time = AsyncMock(return_value=None)
return s
@pytest.fixture
def mock_telegram():
t = AsyncMock()
t.send_message = AsyncMock()
return t
async def test_saves_candle_on_receive(mock_store, mock_telegram):
"""Received candle is forwarded to store.save."""
candle = make_candle()
handler = MagicMock()
handler.listen = MagicMock(return_value=_gen_one(candle))
collector = ExchangeCollector(
market="binance", handler=handler, store=mock_store,
backfiller=AsyncMock(), telegram=mock_telegram,
)
await collector._run_once()
mock_store.save.assert_awaited_once_with([candle])
async def test_telegram_alert_after_3_consecutive_failures(mock_store, mock_telegram):
"""Three consecutive disconnects trigger a Telegram alert."""
handler = MagicMock()
handler.listen = MagicMock(
side_effect=lambda: _gen_raise(ConnectionError("down"))
)
backfiller = AsyncMock()
collector = ExchangeCollector(
market="binance", handler=handler, store=mock_store,
backfiller=backfiller, telegram=mock_telegram,
max_delay=0.001,
)
await collector._run_with_retry(max_attempts=3)
mock_telegram.send_message.assert_awaited_once()
assert "binance" in mock_telegram.send_message.call_args[0][0]
async def test_gap_backfill_called_on_reconnect(mock_store, mock_telegram):
"""After a disconnect, gap_backfill is called on the next connection."""
candle = make_candle()
call_count = 0
async def listen_impl():
nonlocal call_count
call_count += 1
if call_count == 1:
raise ConnectionError("first attempt fails")
yield candle
handler = MagicMock()
handler.listen = MagicMock(side_effect=listen_impl)
backfiller = AsyncMock()
backfiller.gap_backfill = AsyncMock()
fetch_fn_factory = MagicMock(return_value=AsyncMock(return_value=[]))
collector = ExchangeCollector(
market="binance", handler=handler, store=mock_store,
backfiller=backfiller, telegram=mock_telegram,
symbols=["BTC/USDT"], timeframes=["1m"],
fetch_fn_factory=fetch_fn_factory,
max_delay=0.001,
)
await collector._run_with_retry(max_attempts=2)
backfiller.gap_backfill.assert_awaited()