summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--pyagl/tests/test_can.py89
1 files changed, 89 insertions, 0 deletions
diff --git a/pyagl/tests/test_can.py b/pyagl/tests/test_can.py
index 1a9544f..ff164e5 100644
--- a/pyagl/tests/test_can.py
+++ b/pyagl/tests/test_can.py
@@ -578,3 +578,92 @@ async def test_low_can_write_j1939_signal(event_loop, service: cs):
msgid = await service.write({'signal_name': 'Eng.Momentary.Overspeed.Enable', 'signal_value': 1})
resp = await service.afbresponse()
assert resp.type == AFBT.ERROR, resp
+
+
+# Value tests
+@pytest.mark.parametrize('prepare_replay_file', ['testValueCAN_1.canreplay'], indirect=True)
+async def test_value_test_1(event_loop, service: cs, prepare_replay_file):
+ eventname = 'messages.vehicle.average.speed'
+ msgid = await service.subscribe(eventname)
+ resp = await service.afbresponse()
+ assert resp.status == 'success', f'Could not subscribe for {eventname} events; info: {resp.info}'
+
+ async def listen(service: cs):
+ async for resp in service.listener():
+ assert 'name' in resp.data
+ assert resp.data['name'] == eventname
+ break
+
+ listener = asyncio.create_task(listen(service))
+ player = canplayer(service)
+ assert player is not None
+ await player.play('testValueCAN_1.canreplay')
+ await listener
+ await player.stop()
+
+ msgid = await service.unsubscribe(eventname)
+ # wait until the event queue flushes out and we get unsubscribe confirmation
+ async for resp in service.listener():
+ if resp.type != AFBT.RESPONSE: continue
+ if resp.msgid != msgid: continue
+ assert resp.status == 'success', f'Could not unsubscribe from {eventname}; info: {resp.info}'
+ break
+
+
+@pytest.mark.parametrize('prepare_replay_file', ['testValueCAN_2.canreplay'], indirect=True)
+async def test_value_test_2(event_loop, service: cs, prepare_replay_file):
+ eventname = 'diagnostic_messages.engine.speed'
+ msgid = await service.subscribe(eventname)
+ resp = await service.afbresponse()
+ assert resp.status == 'success', f'Could not subscribe for {eventname} events; info: {resp.info}'
+
+ async def listen(service: cs):
+ async for resp in service.listener():
+ assert 'name' in resp.data
+ assert resp.data['name'] == eventname
+ break
+
+ listener = asyncio.create_task(listen(service))
+ player = canplayer(service)
+ assert player is not None
+ await player.play('testValueCAN_2.canreplay')
+ await listener
+ await player.stop()
+
+ msgid = await service.unsubscribe(eventname)
+ # wait until the event queue flushes out and we get unsubscribe confirmation
+ async for resp in service.listener():
+ if resp.type != AFBT.RESPONSE: continue
+ if resp.msgid != msgid: continue
+ assert resp.status == 'success', f'Could not unsubscribe from {eventname}; info: {resp.info}'
+ break
+
+
+@pytest.mark.parametrize('prepare_replay_file', ['testValueCAN_3.canreplay'], indirect=True)
+async def test_value_test_3(event_loop, service: cs, prepare_replay_file):
+ eventname = 'diagnostic_messages.engine.load'
+ msgid = await service.subscribe(eventname)
+ resp = await service.afbresponse()
+ assert resp.status == 'success', f'Could not subscribe for {eventname} events; info: {resp.info}'
+
+ async def listen(service: cs):
+ async for resp in service.listener():
+ assert 'name' in resp.data
+ assert resp.data['name'] == eventname
+ break
+
+ listener = asyncio.create_task(listen(service))
+ player = canplayer(service)
+ assert player is not None
+ await player.play('testValueCAN_3.canreplay')
+ await listener
+ await player.stop()
+
+ msgid = await service.unsubscribe(eventname)
+ # wait until the event queue flushes out and we get unsubscribe confirmation
+ async for resp in service.listener():
+ if resp.type != AFBT.RESPONSE: continue
+ if resp.msgid != msgid: continue
+ assert resp.status == 'success', f'Could not unsubscribe from {eventname}; info: {resp.info}'
+ break
+