102 lines
2.9 KiB
Python
102 lines
2.9 KiB
Python
import pytest
|
|
from unittest.mock import AsyncMock, MagicMock
|
|
from hydra.data.models import Candle
|
|
from hydra.data.collector import ExchangeCollector
|
|
|
|
|
|
def make_candle() -> Candle:
    """Return the fixed sample candle (binance, BTC/USDT, 1m) used as test input."""
    fields = {
        "market": "binance",
        "symbol": "BTC/USDT",
        "timeframe": "1m",
        "open_time": 1_000_000,
        "open": 50000.0,
        "high": 50100.0,
        "low": 49900.0,
        "close": 50050.0,
        "volume": 1.5,
        "close_time": 1_059_999,
    }
    return Candle(**fields)
|
|
|
|
|
|
async def _gen_one(candle: Candle):
    """Async generator that produces ``candle`` exactly once and then finishes."""
    for item in (candle,):
        yield item
|
|
|
|
|
|
async def _gen_raise(exc: Exception):
    """Async generator that raises ``exc`` on first iteration, yielding nothing."""
    if False:
        # Never executed: the ``yield`` only exists so that Python compiles
        # this function as an async generator rather than a coroutine.
        yield
    raise exc
|
|
|
|
|
|
@pytest.fixture
def mock_store():
    """Provide an async mock store whose ``get_last_time`` resolves to ``None``.

    ``save`` is an explicit ``AsyncMock`` so tests can assert on its awaits.
    """
    return AsyncMock(
        save=AsyncMock(),
        get_last_time=AsyncMock(return_value=None),
    )
|
|
|
|
|
|
@pytest.fixture
def mock_telegram():
    """Provide an async mock Telegram client with an awaitable ``send_message``."""
    return AsyncMock(send_message=AsyncMock())
|
|
|
|
|
|
async def test_saves_candle_on_receive(mock_store, mock_telegram):
    """A candle emitted by the handler is passed to ``store.save`` as a one-item list."""
    expected = make_candle()

    # ``listen()`` hands back a one-shot async generator carrying the candle.
    stream_handler = MagicMock(
        listen=MagicMock(return_value=_gen_one(expected)),
    )

    collector_kwargs = {
        "market": "binance",
        "handler": stream_handler,
        "store": mock_store,
        "backfiller": AsyncMock(),
        "telegram": mock_telegram,
    }
    collector = ExchangeCollector(**collector_kwargs)

    await collector._run_once()

    mock_store.save.assert_awaited_once_with([expected])
|
|
|
|
|
|
async def test_telegram_alert_after_3_consecutive_failures(mock_store, mock_telegram):
    """Three failing attempts in a row send exactly one Telegram message naming the market."""

    def failing_listen():
        # A fresh generator per call, so every attempt fails the same way.
        return _gen_raise(ConnectionError("down"))

    stream_handler = MagicMock(listen=MagicMock(side_effect=failing_listen))
    gap_filler = AsyncMock()

    collector = ExchangeCollector(
        market="binance",
        handler=stream_handler,
        store=mock_store,
        backfiller=gap_filler,
        telegram=mock_telegram,
        max_delay=0.001,
    )

    await collector._run_with_retry(max_attempts=3)

    send = mock_telegram.send_message
    send.assert_awaited_once()
    alert_text = send.call_args.args[0]
    assert "binance" in alert_text
|
|
|
|
|
|
async def test_gap_backfill_called_on_reconnect(mock_store, mock_telegram):
    """When the first listen attempt fails, ``gap_backfill`` is awaited during the retry run."""
    sample = make_candle()
    state = {"attempts": 0}

    async def listen_impl():
        # The first iteration raises; every later one yields the sample candle.
        state["attempts"] += 1
        if state["attempts"] == 1:
            raise ConnectionError("first attempt fails")
        yield sample

    stream_handler = MagicMock(listen=MagicMock(side_effect=listen_impl))
    gap_filler = AsyncMock(gap_backfill=AsyncMock())

    # The factory returns an async fetch function that resolves to no candles.
    empty_fetch = AsyncMock(return_value=[])
    fetch_fn_factory = MagicMock(return_value=empty_fetch)

    collector = ExchangeCollector(
        market="binance",
        handler=stream_handler,
        store=mock_store,
        backfiller=gap_filler,
        telegram=mock_telegram,
        symbols=["BTC/USDT"],
        timeframes=["1m"],
        fetch_fn_factory=fetch_fn_factory,
        max_delay=0.001,
    )

    await collector._run_with_retry(max_attempts=2)

    gap_filler.gap_backfill.assert_awaited()
|