From c01603b57d97eba4aac9e62aca9f22da77a2ada0 Mon Sep 17 00:00:00 2001 From: Edi Feschiyan Date: Fri, 22 May 2020 17:05:56 +0300 Subject: updating gps tests --- test_gps.py | 42 ++++++++++++++++++++++++++++++++---------- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/test_gps.py b/test_gps.py index 43834fe..a3b59c7 100644 --- a/test_gps.py +++ b/test_gps.py @@ -3,10 +3,11 @@ import os import pytest from gps import GPSService import logging - +from aglbaseservice import AFBResponse, AFBT logger = logging.getLogger('pytest-gps') logger.setLevel(logging.DEBUG) +pytestmark = pytest.mark.asyncio @pytest.fixture(scope="module") def event_loop(): @@ -21,15 +22,36 @@ async def service(): yield gpss await gpss.websocket.close() -@pytest.fixture(scope='module') -async def listener(event_loop, service): - listener = service.listener() +# @pytest.fixture(scope='module') +# async def response(event_loop, service): +# async for _response in service.listener(): +# yield _response -@pytest.mark.asyncio +@pytest.mark.xfail # expecting this to fail because of "No 3D GNSS fix" and GPS is unavailable async def test_location(event_loop, service: GPSService): - await service.location() - resp = await service.response() - print(resp) - assert isinstance(resp, list) - assert resp[2]['request']['status'] == 'success', f"location() returned failure; {resp[2]['request']['info']}" + id = await service.location() + resp = AFBResponse(await service.response()) + assert resp.status == 'success' + +async def test_subscribe_location(event_loop, service: GPSService): + id = await service.subscribe('location') + resp = AFBResponse(await service.response()) + assert resp.msgid == id + assert resp.status == 'success' + +async def test_unsubscribe(event_loop, service: GPSService): + id = await service.unsubscribe('location') + resp = AFBResponse(await service.response()) + assert resp.msgid == id + assert resp.status == 'success' + +async def test_location_events(event_loop, service: GPSService): + id = await service.subscribe('location') + resp = AFBResponse(await service.response()) + assert resp.msgid == id + assert resp.status == 'success' # successful subscription + + resp = await asyncio.wait_for(service.response(), 10) + resp = AFBResponse(resp) + assert resp.type == AFBT.EVENT, f'Expected EVENT response, got {resp.type.name} instead' -- cgit 1.2.3-korg