summaryrefslogtreecommitdiffstats
path: root/test_gps.py
diff options
context:
space:
mode:
authorEdi Feschiyan <edi.feschiyan@konsulko.com>2020-05-22 17:05:56 +0300
committerEdi Feschiyan <edi.feschiyan@konsulko.com>2020-06-09 09:30:12 +0300
commitc01603b57d97eba4aac9e62aca9f22da77a2ada0 (patch)
treef445a3e879fa84dd52a5cf6fecfb20045957a182 /test_gps.py
parente190fff1a49499fc9e07931c641d8d110c3a595b (diff)
updating gps tests
Diffstat (limited to 'test_gps.py')
-rw-r--r--test_gps.py42
1 files changed, 32 insertions, 10 deletions
diff --git a/test_gps.py b/test_gps.py
index 43834fe..a3b59c7 100644
--- a/test_gps.py
+++ b/test_gps.py
@@ -3,10 +3,11 @@ import os
import pytest
from gps import GPSService
import logging
-
+from aglbaseservice import AFBResponse, AFBT
logger = logging.getLogger('pytest-gps')
logger.setLevel(logging.DEBUG)
+pytestmark = pytest.mark.asyncio
@pytest.fixture(scope="module")
def event_loop():
@@ -21,15 +22,36 @@ async def service():
yield gpss
await gpss.websocket.close()
-@pytest.fixture(scope='module')
-async def listener(event_loop, service):
- listener = service.listener()
+# @pytest.fixture(scope='module')
+# async def response(event_loop, service):
+# async for _response in service.listener():
+# yield _response
-@pytest.mark.asyncio
+@pytest.mark.xfail # expecting this to fail because of "No 3D GNSS fix" and GPS is unavailable
async def test_location(event_loop, service: GPSService):
- await service.location()
- resp = await service.response()
- print(resp)
- assert isinstance(resp, list)
- assert resp[2]['request']['status'] == 'success', f"location() returned failure; {resp[2]['request']['info']}"
+ id = await service.location()
+ resp = AFBResponse(await service.response())
+ assert resp.status == 'success'
+
+async def test_subscribe_location(event_loop, service: GPSService):
+ id = await service.subscribe('location')
+ resp = AFBResponse(await service.response())
+ assert resp.msgid == id
+ assert resp.status == 'success'
+
+async def test_unsubscribe(event_loop, service: GPSService):
+ id = await service.unsubscribe('location')
+ resp = AFBResponse(await service.response())
+ assert resp.msgid == id
+ assert resp.status == 'success'
+
+async def test_location_events(event_loop, service: GPSService):
+ id = await service.subscribe('location')
+ resp = AFBResponse(await service.response())
+ assert resp.msgid == id
+ assert resp.status == 'success' # successful subscription
+
+ resp = await asyncio.wait_for(service.response(), 10)
+ resp = AFBResponse(resp)
+ assert resp.type == AFBT.EVENT, f'Expected EVENT response, got {resp.type.name} instead'