aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--pyagl/pytest.ini1
-rw-r--r--pyagl/services/can.py16
-rw-r--r--pyagl/tests/test_can.py474
-rw-r--r--pyagl/tests/test_signal_composer.py9
4 files changed, 491 insertions, 9 deletions
diff --git a/pyagl/pytest.ini b/pyagl/pytest.ini
index 6587041..9eec573 100644
--- a/pyagl/pytest.ini
+++ b/pyagl/pytest.ini
@@ -17,3 +17,4 @@ markers =
mediascanner: agl-service-mediascanner tests
signal_composer: agl-service-signal-composer tests
can_low_level: agl-service-can-low-level tests
+ can_j1939: J1939 CAN specific tests
diff --git a/pyagl/services/can.py b/pyagl/services/can.py
index 12c920a..91bd01d 100644
--- a/pyagl/services/can.py
+++ b/pyagl/services/can.py
@@ -15,6 +15,7 @@
from pyagl.services.base import AGLBaseService, AFBResponse
+from typing import Union
import asyncio
import json
import os
@@ -43,11 +44,18 @@ class CANService(AGLBaseService):
async def list(self):
return await self.request('list')
- async def subscribe(self, event):
- return await self.request(verb='subscribe', values={'event': event})
+ async def subscribe(self, values: Union[str, dict] = ""):
+ if isinstance(values, str):
+ return await self.request(verb='subscribe', values={'event': values})
+ else:
+ return await self.request(verb='subscribe', values=values)
+
+ async def unsubscribe(self, values: Union[str, dict] = ""):
+ if isinstance(values, str):
+ return await self.request(verb='unsubscribe', values={'event': values})
+ else:
+ return await self.request(verb='unsubscribe', values=values)
- async def unsubscribe(self, event):
- return await self.request(verb='unsubscribe', values={'event': event})
async def main(loop):
args = CANService.parser.parse_args()
diff --git a/pyagl/tests/test_can.py b/pyagl/tests/test_can.py
index a8ef8b3..ba41202 100644
--- a/pyagl/tests/test_can.py
+++ b/pyagl/tests/test_can.py
@@ -17,6 +17,9 @@ import asyncio
import os
import pytest
import logging
+import asyncssh
+import subprocess
+
from pyagl.services.base import AFBResponse, AFBT
from pyagl.services.can import CANService as cs
@@ -38,30 +41,493 @@ async def service():
await svc.websocket.close()
+@pytest.fixture(scope='module')
+async def prepare_replay_files(service: cs):
+ result = None
+
+ if service.ip != 'localhost':
+ async with asyncssh.connect(service.ip, username='root') as ssh:
+ result = await ssh.run('unzip -jo /usr/AGL/apps/test/agl-service-can-low-level-test.wgt var/*.canreplay -d /tmp/canlowfixtures/')
+ assert result.returncode == 0, "there was a problem during *.canreplay fixture extraction"
+ else:
+ result = subprocess.check_output('unzip -jo /usr/AGL/apps/test/agl-service-can-low-level-test.wgt var/*.canreplay -d /tmp/canlowfixtures/', shell=True)
+
+
+@pytest.fixture(scope='module')
+async def canplayer(service, prepare_replay_files, request):
+ if service.ip != 'localhost':
+ async with asyncssh.connect(service.ip, username='root') as ssh:
+ result = await ssh.run(f'nohup canplayer -I /tmp/canlowfixtures/{request.param} < /dev/null > /dev/null 2>&1 &')
+ yield None
+ pid = await ssh.run("killall -9 canplayer")
+ else: # running on target
+ proc = subprocess.Popen(f'nohup canplayer -I /tmp/canlowfixtures/{request.param} < /dev/null > /dev/null 2>&1 &', shell=True)
+ yield None
+ subprocess.check_output('killall -9 canplayer', shell=True)
+
+
+# basic tests
+hsmessage = {'bus_name': 'hs',
+ 'frame': {
+ 'can_id': 1568,
+ 'can_dlc': 8,
+ 'can_data': [255, 255, 255, 255, 255, 255, 255, 255]
+ }}
+
+@pytest.mark.dependency
@pytest.mark.regular
async def test_list(event_loop, service: cs):
msgid = await service.list()
resp = await service.afbresponse()
- assert resp.status == 'success'
+ assert resp.status == 'success', resp.info
@pytest.mark.regular
async def test_get(event_loop, service: cs):
msgid = await service.get("engine.speed")
resp = await service.afbresponse()
- assert resp.status == 'success'
+ assert resp.status == 'success', resp.info
+
+
+@pytest.mark.regular
+# @pytest.mark.dependency(depends=['test_list'])
+async def test_get_all_messages(event_loop, service: cs):
+ msgid = await service.list()
+ resp = await service.afbresponse()
+ messagelist = [m for m in resp.data if m.startswith('messages.')]
+ for m in messagelist:
+ msgid = await service.get(m)
+ resp = await service.afbresponse()
+ assert resp.status == 'success', f'.get() failed with message {m}'
@pytest.mark.regular
async def test_auth(event_loop, service: cs):
msgid = await service.auth()
resp = await service.afbresponse()
- assert resp.status == 'success'
+ assert resp.status == 'success', resp.info
+
+
+@pytest.mark.regular
+async def test_write_wo_auth(event_loop, service: cs):
+ msgid = await service.write({'signal_name': 'engine.speed', 'signal_value': 12})
+ resp = await service.afbresponse()
+ assert resp.type == AFBT.ERROR
@pytest.mark.regular
async def test_write(event_loop, service: cs):
msgid = await service.write({"signal_name": "hvac.temperature.left", "signal_value": 21})
resp = await service.afbresponse()
- assert resp.status == 'success'
+ assert resp.status == 'success', resp.info
+
+
+@pytest.mark.regular
+async def test_write_raw(event_loop, service: cs):
+ msgid = await service.write(hsmessage)
+ resp = await service.afbresponse()
+ assert resp.status == 'success', resp.info
+
+
+@pytest.mark.regular
+async def test_write_unwritable_signal(event_loop, service: cs):
+ msgid = await service.write({'signal_name': 'vehicle.average.speed', 'signal_value': 1234})
+ resp = await service.afbresponse()
+ assert resp.type == AFBT.ERROR
+
+
+@pytest.mark.regular
+async def test_write_wrong_value_key(event_loop, service: cs):
+ msgid = await service.write({'name': 'vehicle.average.speed', 'signal_value': 21})
+ resp = await service.afbresponse()
+ assert resp.type == AFBT.ERROR
+
+
+@pytest.mark.regular
+async def test_write_raw_invalid_bus_key(event_loop, service: cs):
+ message = dict(hsmessage)
+ message['bus'] = message.pop('bus_name')
+ msgid = await service.write(message)
+ resp = await service.afbresponse()
+ assert resp.type == AFBT.ERROR
+
+
+@pytest.mark.regular
+async def test_write_raw_invalid_frame_key(event_loop, service: cs):
+ message = dict(hsmessage)
+ message['fram'] = message.pop('frame')
+ msgid = await service.write(message)
+ resp = await service.afbresponse()
+ assert resp.type == AFBT.ERROR
+
+
+@pytest.mark.regular
+async def test_write_raw_invalid_can_id_key(event_loop, service: cs):
+ message = dict(hsmessage)
+ message['frame']['id'] = message['frame'].pop('can_id')
+ msgid = await service.write(message)
+ resp = await service.afbresponse()
+ assert resp.type == AFBT.ERROR
+
+
+@pytest.mark.regular
+async def test_write_raw_invalid_can_id_args(event_loop, service: cs):
+ message = dict(hsmessage)
+ message['frame']['can_id'] = "1568"
+ msgid = await service.write(message)
+ resp = await service.afbresponse()
+ assert resp.type == AFBT.ERROR
+
+
+@pytest.mark.regular
+async def test_write_raw_invalid_can_dlc_key(event_loop, service: cs):
+ message = dict(hsmessage)
+ message['frame']['dlc'] = message['frame'].pop('can_dlc')
+ msgid = await service.write(message)
+ resp = await service.afbresponse()
+ assert resp.type == AFBT.ERROR
+
+
+@pytest.mark.regular
+async def test_write_raw_invalid_can_dlc_args(event_loop, service: cs):
+ message = dict(hsmessage)
+ message['frame']['can_dlc'] = "8"
+ msgid = await service.write(message)
+ resp = await service.afbresponse()
+ assert resp.type == AFBT.ERROR
+
+
+@pytest.mark.regular
+async def test_write_raw_invalid_can_data_key(event_loop, service: cs):
+ message = dict(hsmessage)
+ message['frame']['data'] = message['frame'].pop('can_data')
+ msgid = await service.write(message)
+ resp = await service.afbresponse()
+ assert resp.type == AFBT.ERROR
+
+
+@pytest.mark.regular
+async def test_write_raw_invalid_can_data_value(event_loop, service: cs):
+ message = dict(hsmessage)
+ message['frame']['can_data'] = ["255", 255, 255, 255, 255, 255, 255, 255]
+ msgid = await service.write(message)
+ resp = await service.afbresponse()
+ assert resp.type == AFBT.ERROR
+
+
+@pytest.mark.regular
+async def test_get_written_message(event_loop, service: cs):
+ msgid = await service.get("hvac.temperature.left")
+ resp = await service.afbresponse()
+ assert resp.status == 'success', resp.info
+
+
+@pytest.mark.regular
+@pytest.mark.dependency
+async def test_subscribe(event_loop, service: cs):
+ msgid = await service.subscribe('*')
+ resp = await service.afbresponse()
+ assert resp.status == 'success', resp.info
+
+
+@pytest.mark.regular
+@pytest.mark.dependency(depends=['test_subscribe'])
+async def test_unsubscribe(event_loop, service: cs):
+ msgid = await service.unsubscribe('*')
+ resp = await service.afbresponse()
+ assert resp.status == 'success', resp.info
+
+
+@pytest.mark.regular
+@pytest.mark.parametrize('canplayer', ['test1.canreplay'], indirect=True)
+async def test_diagnostic_engine_speed_simulation(event_loop, service: cs, canplayer):
+ eventname = 'diagnostic_messages.engine.speed'
+ msgid = await service.subscribe(eventname)
+ resp = await service.afbresponse()
+ assert resp.status == 'success', f'Could not subscribe for {eventname}; info: {resp.info}'
+
+ async for resp in service.listener():
+ resp = await service.afbresponse()
+ assert 'name' in resp.data
+ assert resp.data['name'] == eventname
+ break
+
+ msgid = await service.unsubscribe(eventname)
+ async for resp in service.listener(): # wait until the event queue flushes out and we get unsubscribe confirmation
+ if resp.type != AFBT.RESPONSE: continue
+ if resp.msgid != msgid: continue
+ assert resp.status == 'success', f'Could not unsubscribe from {eventname}; info: {resp.info}'
+ break
+
+
+@pytest.mark.regular
+@pytest.mark.parametrize('canplayer', ['test2-3.canreplay'], indirect=True)
+async def test_Subscribe_all(event_loop, service: cs, canplayer):
+ eventname = 'messages.vehicle.average.speed'
+ msgid = await service.subscribe('*')
+ resp = await service.afbresponse()
+ assert resp.status == 'success', f'Could not subscribe for all events; info: {resp.info}'
+ async for resp in service.listener():
+ assert 'name' in resp.data
+ assert resp.data['name'] == eventname
+ break
+
+ msgid = await service.unsubscribe('*')
+ async for resp in service.listener(): # wait until the event queue flushes out and we get unsubscribe confirmation
+ if resp.type != AFBT.RESPONSE: continue
+ if resp.msgid != msgid: continue
+ assert resp.status == 'success', f'Could not unsubscribe from {eventname}; info: {resp.info}'
+ break
+
+# End of basic tests
+
+# Filter tests
+
+
+@pytest.mark.regular
+@pytest.mark.parametrize('canplayer', ['testFilter01filteredOut.canreplay'], indirect=True)
+async def test_Filter_Test_01_Step_1(event_loop, service: cs, canplayer):
+ minspeed = 30
+ maxspeed = 100
+ eventname = 'messages.engine.speed'
+ msgid = await service.subscribe({'event': eventname, 'filter': {'min': minspeed, 'max': maxspeed}})
+ resp = await service.afbresponse()
+ assert resp.status == 'success', f'Could not subscribe for {eventname} events; info: {resp.info}'
+
+ async for resp in service.listener():
+ assert resp is None, "Test failed, was supposed to timeout and return None on this fixture, but returned data"
+ break
+
+ msgid = await service.unsubscribe({'event': eventname, 'filter': {'min': minspeed, 'max': maxspeed}})
+ async for resp in service.listener(): # discard events in the queue until it flushes out and we get unsub confirm
+ if resp.type == AFBT.EVENT: continue
+ if resp.msgid != msgid: continue
+ assert resp.status == 'success', f'Could not unsubscribe from {eventname}; info: {resp.info}'
+ break
+
+
+
+@pytest.mark.regular
+@pytest.mark.parametrize('canplayer', ['testFilter01pass.canreplay'], indirect=True)
+async def test_Filter_Test_01_Step_2(event_loop, service: cs, canplayer):
+ minspeed = 30
+ maxspeed = 100
+ eventname = 'messages.engine.speed'
+ msgid = await service.subscribe({'event': eventname, 'filter': {'min': minspeed, 'max': maxspeed}})
+ resp = await service.afbresponse()
+ assert resp.status == 'success', f'Could not subscribe for {eventname} events; info: {resp.info}'
+
+ async for resp in service.listener():
+ assert 'name' in resp.data
+ assert resp.data['name'] == eventname
+ assert minspeed < resp.data['value'] < maxspeed
+ break
+
+ msgid = await service.unsubscribe({'event': eventname, 'filter': {'min': minspeed, 'max': maxspeed}})
+ async for resp in service.listener(): # discard events in the queue until it flushes out and we get unsub confirm
+ if resp.type == AFBT.EVENT: continue
+ if resp.msgid != msgid: continue
+ assert resp.status == 'success', f'Could not unsubscribe from {eventname}; info: {resp.info}'
+ break
+
+
+@pytest.mark.regular
+@pytest.mark.parametrize('canplayer', ['test2-3.canreplay'], indirect=True)
+async def test_Filter_Test_01_Step_3(event_loop, service: cs, canplayer):
+ # this testcase is supposed to test event filter frequency
+ minspeed = 30
+ maxspeed = 100
+ tolerance = 100000 # usec
+ eventname = 'messages.vehicle.average.speed'
+ msgid = await service.subscribe({'event': eventname, 'filter': {'frequency': 1, 'min': minspeed, 'max': maxspeed}})
+ resp = await service.afbresponse()
+ assert resp.status == 'success', f'Could not subscribe for {eventname} events; info: {resp.info}'
+
+ resplist = []
+ for i in range(2):
+ resplist.append(await service.afbresponse())
+
+ for r in resplist:
+ assert 'name' in r.data
+ assert r.data['name'] == eventname
+ assert 'value' in r.data
+ assert minspeed < r.data['value'] < maxspeed
+
+ # check whether the time delta between the two events is more than 1 second ('frequency' in filter above, in usec)
+ delta = resplist[-1].data['timestamp'] - resplist[0].data['timestamp']
+ assert 1000000 - tolerance < delta < 1000000 + tolerance
+
+ msgid = await service.unsubscribe({'event': eventname, 'filter': {'frequency': 1, 'min': minspeed, 'max': maxspeed}})
+ async for resp in service.listener(): # discard events in the queue until it flushes out and we get unsub confirm
+ if resp.type == AFBT.EVENT: continue
+ if resp.msgid != msgid: continue
+ assert resp.status == 'success', f'Could not unsubscribe from {eventname}; info: {resp.info}'
+ break
+
+# end of Filter tests
+
+# J1939 tests
+# Un/Subscription tests
+
+@pytest.mark.can_j1939
+@pytest.mark.xfail(reason="J1939 support may not be available in the kernel")
+async def test_low_can_subscribe_j1939_event(event_loop, service: cs):
+ msgid = await service.subscribe('Eng.Momentary.Overspeed.Enable')
+ resp = await service.afbresponse()
+ assert resp.type == AFBT.RESPONSE
+ assert resp.status == 'success', resp.info
+
+
+@pytest.mark.xfail(reason="J1939 support may not be available in the kernel")
+@pytest.mark.can_j1939
+async def test_low_can_unsubscribe_j1939_event(event_loop, service: cs):
+ msgid = await service.unsubscribe('Eng.Momentary.Overspeed.Enable')
+ resp = await service.afbresponse()
+ assert resp.type == AFBT.RESPONSE
+ assert resp.status == 'success', resp.info
+
+
+@pytest.mark.xfail(reason="J1939 support may not be available in the kernel")
+@pytest.mark.can_j1939
+async def test_low_can_subscribe_j1939_events(event_loop, service: cs):
+ msgid = await service.subscribe('Eng.*')
+ resp = await service.afbresponse()
+ assert resp.type == AFBT.RESPONSE
+ assert resp.status == 'success', resp.info
+
+
+@pytest.mark.xfail(reason="J1939 support may not be available in the kernel")
+@pytest.mark.can_j1939
+async def test_low_can_unsubscribe_j1939_events(event_loop, service: cs):
+ msgid = await service.unsubscribe('Eng.*')
+ resp = await service.afbresponse()
+ assert resp.type == AFBT.RESPONSE
+ assert resp.status == 'success', resp.info
+
+
+@pytest.mark.xfail(reason="J1939 support may not be available in the kernel")
+@pytest.mark.can_j1939
+async def test_low_can_subscribe_j1939_pgn(event_loop, service: cs):
+ msgid = await service.subscribe({'pgn': 61442})
+ resp = await service.afbresponse()
+ assert resp.type == AFBT.RESPONSE
+ assert resp.status == 'success', resp.info
+
+
+@pytest.mark.xfail(reason="J1939 support may not be available in the kernel")
+@pytest.mark.can_j1939
+async def test_low_can_unsubscribe_j1939_pgn(event_loop, service: cs):
+ msgid = await service.unsubscribe({'pgn': 61442})
+ resp = await service.afbresponse()
+ assert resp.type == AFBT.RESPONSE
+ assert resp.status == 'success', resp.info
+
+
+@pytest.mark.xfail(reason="J1939 support may not be available in the kernel")
+@pytest.mark.can_j1939
+async def test_low_can_subscribe_j1939_all_pgn(event_loop, service: cs):
+ msgid = await service.subscribe({'pgn': '*'})
+ resp = await service.afbresponse()
+ assert resp.type == AFBT.RESPONSE
+ assert resp.status == 'success', resp.info
+
+
+@pytest.mark.xfail(reason="J1939 support may not be available in the kernel")
+@pytest.mark.can_j1939
+async def test_low_can_unsubscribe_j1939_all_pgn(event_loop, service: cs):
+ msgid = await service.unsubscribe({'pgn': '*'})
+ resp = await service.afbresponse()
+ assert resp.type == AFBT.RESPONSE
+ assert resp.status == 'success', resp.info
+
+
+@pytest.mark.xfail(reason="J1939 support may not be available in the kernel")
+@pytest.mark.can_j1939
+async def test_low_can_subscribe_j1939_id(event_loop, service: cs):
+ msgid = await service.subscribe({'id': 61442})
+ resp = await service.afbresponse()
+ assert resp.type == AFBT.RESPONSE
+ assert resp.status == 'success', resp.info
+
+
+@pytest.mark.xfail(reason="J1939 support may not be available in the kernel")
+@pytest.mark.can_j1939
+async def test_low_can_unsubscribe_j1939_id(event_loop, service: cs):
+ msgid = await service.unsubscribe({'id': 61442})
+ resp = await service.afbresponse()
+ assert resp.type == AFBT.RESPONSE
+ assert resp.status == 'success', resp.info
+
+
+@pytest.mark.xfail(reason="J1939 support may not be available in the kernel")
+@pytest.mark.can_j1939
+async def test_low_can_subscribe_j1939_all_id(event_loop, service: cs):
+ msgid = await service.subscribe({'id': '*'})
+ resp = await service.afbresponse()
+ assert resp.type == AFBT.RESPONSE
+ assert resp.status == 'success', resp.info
+
+
+@pytest.mark.xfail(reason="J1939 support may not be available in the kernel")
+@pytest.mark.can_j1939
+async def test_low_can_unsubscribe_j1939_all_id(event_loop, service: cs):
+ msgid = await service.unsubscribe({'id': '*'})
+ resp = await service.afbresponse()
+ assert resp.type == AFBT.RESPONSE
+ assert resp.status == 'success', resp.info
+
+
+@pytest.mark.xfail(reason="J1939 support may not be available in the kernel")
+@pytest.mark.can_j1939
+async def test_low_can_subscribe_j1939_no_event(event_loop, service: cs):
+ msgid = await service.subscribe({'event': ''})
+ resp = await service.afbresponse()
+ assert resp.type == AFBT.ERROR, f'Expected ERROR type messsage, got {resp.type}'
+
+
+@pytest.mark.xfail(reason="J1939 support may not be available in the kernel")
+@pytest.mark.can_j1939
+async def test_low_can_subscribe_j1939_no_id(event_loop, service: cs):
+ msgid = await service.subscribe({'id': ''})
+ resp = await service.afbresponse()
+ assert resp.type == AFBT.ERROR, f'Expected ERROR type messsage, got {resp.type}'
+
+
+@pytest.mark.xfail(reason="J1939 support may not be available in the kernel")
+@pytest.mark.can_j1939
+async def test_low_can_subscribe_j1939_no_pgn(event_loop, service: cs):
+ msgid = await service.subscribe({'pgn': ''})
+ resp = await service.afbresponse()
+ assert resp.type == AFBT.ERROR, f'Expected ERROR type messsage, got {resp.type}'
+
+# End of Un/Subscription tests
+
+# Write tests
+#
+
+@pytest.mark.regular
+async def test_low_can_write_wo_auth(event_loop, service: cs):
+ msgid = await service.write({'signal_name': 'Eng.Momentary.Overspeed.Enable', 'signal_value': 1})
+ async for resp in service.listener(): # using a listener because sometimes there are events in the queue from
+ # previous tests or subscriptions even they are properly unsubscribed and awaited for confirmation
+ if resp is AFBResponse and resp.type is not AFBT.ERROR:
+ logging.warning(f'Expected Error response, got {resp}')
+ continue
+ assert resp.type == AFBT.ERROR, resp
+ assert resp.msgid == msgid
+ break
+
+@pytest.mark.regular
+async def test_low_can_write_auth(event_loop, service: cs):
+ msgid = await service.auth()
+ resp = await service.afbresponse()
+ assert resp.status == 'success', resp
+
+
+@pytest.mark.regular
+@pytest.mark.xfail(reason='J1939 write messages are failing')
+async def test_low_can_write_signal(event_loop, service: cs):
+ msgid = await service.write({'signal_name': 'Eng.Momentary.Overspeed.Enable', 'signal_value': 1})
+ resp = await service.afbresponse()
+ assert resp.type == AFBT.ERROR, resp
diff --git a/pyagl/tests/test_signal_composer.py b/pyagl/tests/test_signal_composer.py
index 5a96b45..bdb8edd 100644
--- a/pyagl/tests/test_signal_composer.py
+++ b/pyagl/tests/test_signal_composer.py
@@ -45,17 +45,20 @@ async def service():
async def test_list(event_loop, service: scs):
msgid = await service.list()
resp = await service.afbresponse()
+ assert resp is not None, 'list() timed out'
assert resp.status == 'success'
- print(resp)
+
@pytest.mark.regular
async def test_getNoFilter(event_loop, service: scs):
msgid = await service.get({'signal': 'fuel_level'})
resp = await service.afbresponse()
+ assert resp is not None, f'.get() timed out'
assert resp.status == 'success'
@pytest.mark.regular
@pytest.mark.dependency(depends=['test_list'])
+@pytest.mark.xfail(reason='on first boot enumerating through all signals causes the service to crash, restarting it fixes it')
async def test_getAllSignals(event_loop, service: scs):
msgid = await service.list()
resp = await service.afbresponse()
@@ -78,6 +81,7 @@ async def test_getAllSignals(event_loop, service: scs):
async def test_getFilterMin(event_loop, service: scs):
msgid = await service.get({'signal': 'latitude', 'options': {'minimum': 10}})
resp = await service.afbresponse()
+ assert resp is not None, '.get() timed out'
assert resp.status == 'success'
@@ -85,6 +89,7 @@ async def test_getFilterMin(event_loop, service: scs):
async def test_getFilterMax(event_loop, service: scs):
msgid = await service.get({'signal': 'vehicle_speed', 'options': {'maximum': 10}})
resp = await service.afbresponse()
+ assert resp is not None, '.get() timed out'
assert resp.status == 'success'
@@ -92,6 +97,7 @@ async def test_getFilterMax(event_loop, service: scs):
async def test_subscribe(event_loop, service: scs):
msgid = await service.subscribe({'service': 'longitude'})
resp = await service.afbresponse()
+ assert resp is not None, ".subscribe() timed out with {'service': 'longitude'}"
assert resp.status == 'success'
@@ -99,6 +105,7 @@ async def test_subscribe(event_loop, service: scs):
async def test_unsubscribe(event_loop, service: scs):
msgid = await service.unsubscribe({'service': 'longitude'})
resp = await service.afbresponse()
+ resp is not None, ".unsubscribe() timed out with {'service': 'longitude'}"
assert resp.status == 'success'