135 lines
3.3 KiB
Python
135 lines
3.3 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from logging import DEBUG
|
|
from typing import Dict, Any, List
|
|
|
|
import pytest
|
|
|
|
from main import read_config
|
|
from zvk.bot.bot import Bot
|
|
from zvk.event.event import Event
|
|
from zvk.plugins.vk.event_type import ParsedEventType
|
|
from zvk.plugins.vk.message_parser import Message
|
|
from zvk.util.db import Database
|
|
from zvk.util.zlogging import logger
|
|
from zvk.plugins.vk.api import VKApi
|
|
|
|
# Lower the project logger's threshold to DEBUG so that logger.debug() records
# (such as the ones emitted by TestingVKApi.call_method) pass the level check.
logger.setLevel(DEBUG)
|
|
|
|
|
|
@pytest.fixture(scope='function')
def db(bot) -> Database:
    """Expose the ``db`` attribute of the per-test ``bot`` fixture."""
    database = bot.db
    return database
|
|
|
|
|
|
@pytest.fixture(scope='function')
def api() -> TestingVKApi:
    """Build a mocked VK API with a single ``users.get`` expectation queued.

    The canned ``users.get`` answer describes a user with id 111.
    NOTE(review): presumably consumed by the ``init.identify_self`` plugin
    during bot start-up -- confirm against that plugin.
    """
    canned_user = {'id': 111, 'first_name': 'testing', 'last_name': 'testing'}

    mocked_api = TestingVKApi()
    users_get_call = mocked_api.expect('users.get')
    users_get_call.set_result([canned_user])

    return mocked_api
|
|
|
|
|
|
@pytest.fixture(scope='function')
def bot(api) -> TestingBot:
    """Create a fresh ``TestingBot`` wired to the mocked ``api`` fixture."""
    testing_bot = TestingBot(api)
    return testing_bot
|
|
|
|
|
|
class TestingBot(Bot):
    """``Bot`` subclass preconfigured for tests.

    Starts from the regular configuration returned by ``read_config()``,
    then overrides the database URL and the plugin whitelist and swaps in
    the mocked API object supplied by the caller.
    """

    def __init__(self, api: TestingVKApi):
        """Configure the bot for testing and attach the mocked ``api``.

        Args:
            api: the mock API object stored on ``self.api``.
        """
        config = read_config()

        # In-memory SQLite URL instead of whatever the config file specifies.
        config['db_url'] = 'sqlite:///:memory:'
        config['plugins']['whitelist'] = [
            'vk.command_parser',
            'vk.event_saver',
            'vk.message_parser',
            'init.identify_self',
            'init.permissions'
        ]

        super().__init__(config=config)
        self.db.create_all()

        # Initialised here; not otherwise used within this file.
        self.testing_counter = 0

        # Assigned after Bot.__init__ so the mock is what ends up on self.api.
        self.api = api

    def dummy_message_event(self, text: str, from_id=None):
        """Return a ``ParsedEventType.MESSAGE`` event carrying a stub message.

        Args:
            text: message text to place in the stub.
            from_id: value used for both ``from_id`` and ``peer_id``;
                123 is used when it is ``None``.

        Returns:
            An ``Event`` whose ``message`` keyword is the constructed
            ``Message``.
        """
        sender_id = 123 if from_id is None else from_id

        stub_message = Message(
            message_id=123,
            from_id=sender_id,
            peer_id=sender_id,
            to_id=0,
            flags=0,
            timestamp=datetime.utcnow(),  # naive UTC timestamp
            text=text,
            extra_fields_json='{}',
            attachments_json='{}',
            random_id=0,
            is_outgoing=False,
            is_bot_message=False,
        )

        return Event(ParsedEventType.MESSAGE, message=stub_message)
|
|
|
|
|
|
@dataclass
class MockVKApiCall:
    """One expected API call together with the canned result to return.

    ``params`` maps parameter names to expected values. The string ``'*'``
    as an expected value acts as a wildcard: the parameter must be present
    in the actual call, but its value is not compared.
    """

    method_name: str
    params: Dict[str, Any]
    result: Any = None
    single_use: bool = True

    def does_match(self, method_name: str, params: Dict[str, Any]) -> bool:
        """Tell whether an actual call satisfies this expectation.

        The call matches when the method name is the same, the set of
        parameter names is exactly the same, and every non-wildcard
        expected value equals the actual one.
        """
        names_differ = self.method_name != method_name
        if names_differ or set(self.params) != set(params):
            return False

        for name, expected in self.params.items():
            is_wildcard = expected == '*'
            if not is_wildcard and expected != params[name]:
                return False

        return True

    def set_result(self, result) -> MockVKApiCall:
        """Store the value to return for this call; returns ``self`` for chaining."""
        self.result = result
        return self

    def set_single_use(self, single_use) -> MockVKApiCall:
        """Set the ``single_use`` flag; returns ``self`` for chaining."""
        self.single_use = single_use
        return self
|
|
|
|
|
|
class TestingVKApi(VKApi):
    """``VKApi`` stand-in that answers calls from a list of expectations.

    Expectations are registered with ``expect`` and consulted in
    registration order by ``call_method``; a call that matches none of
    them raises ``ValueError``.
    """

    _expectations: List[MockVKApiCall]

    # noinspection PyMissingConstructor
    def __init__(self):
        """Start with no expectations; the base constructor is not called."""
        self._expectations = []

    def expect(self, method_name, **params):
        """Register an expected call and return it for further configuration.

        Keyword arguments become the expected parameters of the call.
        """
        registered = MockVKApiCall(method_name=method_name, params=params)
        self._expectations.append(registered)
        return registered

    async def call_method(self, method_name, **params):
        """Return the canned result of the first expectation matching the call.

        A matching expectation whose ``single_use`` flag is set is removed
        from the list before its result is returned.

        Raises:
            ValueError: if no registered expectation matches.
        """
        expectation = next(
            (
                candidate
                for candidate in self._expectations
                if candidate.does_match(method_name, params)
            ),
            None,
        )

        if expectation is None:
            raise ValueError(f'Unexpected api call {method_name}({params})')

        if expectation.single_use:
            self._expectations.remove(expectation)

        logger.debug(f'Expected api call {method_name}({params}) -> {expectation.result}')
        return expectation.result
|