aboutsummaryrefslogtreecommitdiffstats
path: root/test_gps.py
diff options
context:
space:
mode:
Diffstat (limited to 'test_gps.py')
-rw-r--r--test_gps.py42
1 files changed, 32 insertions, 10 deletions
diff --git a/test_gps.py b/test_gps.py
index 43834fe..a3b59c7 100644
--- a/test_gps.py
+++ b/test_gps.py
@@ -3,10 +3,11 @@ import os
import pytest
from gps import GPSService
import logging
-
+from aglbaseservice import AFBResponse, AFBT
logger = logging.getLogger('pytest-gps')
logger.setLevel(logging.DEBUG)
+pytestmark = pytest.mark.asyncio
@pytest.fixture(scope="module")
def event_loop():
@@ -21,15 +22,36 @@ async def service():
yield gpss
await gpss.websocket.close()
-@pytest.fixture(scope='module')
-async def listener(event_loop, service):
- listener = service.listener()
+# @pytest.fixture(scope='module')
+# async def response(event_loop, service):
+# async for _response in service.listener():
+# yield _response
-@pytest.mark.asyncio
+@pytest.mark.xfail # expecting this to fail because of "No 3D GNSS fix" and GPS is unavailable
async def test_location(event_loop, service: GPSService):
- await service.location()
- resp = await service.response()
- print(resp)
- assert isinstance(resp, list)
- assert resp[2]['request']['status'] == 'success', f"location() returned failure; {resp[2]['request']['info']}"
+ id = await service.location()
+ resp = AFBResponse(await service.response())
+ assert resp.status == 'success'
+
+async def test_subscribe_location(event_loop, service: GPSService):
+ id = await service.subscribe('location')
+ resp = AFBResponse(await service.response())
+ assert resp.msgid == id
+ assert resp.status == 'success'
+
+async def test_unsubscribe(event_loop, service: GPSService):
+ id = await service.unsubscribe('location')
+ resp = AFBResponse(await service.response())
+ assert resp.msgid == id
+ assert resp.status == 'success'
+
+async def test_location_events(event_loop, service: GPSService):
+ id = await service.subscribe('location')
+ resp = AFBResponse(await service.response())
+ assert resp.msgid == id
+ assert resp.status == 'success' # successful subscription
+
+ resp = await asyncio.wait_for(service.response(), 10)
+ resp = AFBResponse(resp)
+ assert resp.type == AFBT.EVENT, f'Expected EVENT response, got {resp.type.name} instead'