diff options
Diffstat (limited to 'test_gps.py')
-rw-r--r-- | test_gps.py | 42 |
1 files changed, 32 insertions, 10 deletions
diff --git a/test_gps.py b/test_gps.py index 43834fe..a3b59c7 100644 --- a/test_gps.py +++ b/test_gps.py @@ -3,10 +3,11 @@ import os import pytest from gps import GPSService import logging - +from aglbaseservice import AFBResponse, AFBT logger = logging.getLogger('pytest-gps') logger.setLevel(logging.DEBUG) +pytestmark = pytest.mark.asyncio @pytest.fixture(scope="module") def event_loop(): @@ -21,15 +22,36 @@ async def service(): yield gpss await gpss.websocket.close() -@pytest.fixture(scope='module') -async def listener(event_loop, service): - listener = service.listener() +# @pytest.fixture(scope='module') +# async def response(event_loop, service): +# async for _response in service.listener(): +# yield _response -@pytest.mark.asyncio +@pytest.mark.xfail # expecting this to fail because of "No 3D GNSS fix" and GPS is unavailable async def test_location(event_loop, service: GPSService): - await service.location() - resp = await service.response() - print(resp) - assert isinstance(resp, list) - assert resp[2]['request']['status'] == 'success', f"location() returned failure; {resp[2]['request']['info']}" + id = await service.location() + resp = AFBResponse(await service.response()) + assert resp.status == 'success' + +async def test_subscribe_location(event_loop, service: GPSService): + id = await service.subscribe('location') + resp = AFBResponse(await service.response()) + assert resp.msgid == id + assert resp.status == 'success' + +async def test_unsubscribe(event_loop, service: GPSService): + id = await service.unsubscribe('location') + resp = AFBResponse(await service.response()) + assert resp.msgid == id + assert resp.status == 'success' + +async def test_location_events(event_loop, service: GPSService): + id = await service.subscribe('location') + resp = AFBResponse(await service.response()) + assert resp.msgid == id + assert resp.status == 'success' # successful subscription + + resp = await asyncio.wait_for(service.response(), 10) + resp = AFBResponse(resp) + assert resp.type == AFBT.EVENT, f'Expected EVENT response, got {resp.type.name} instead' |