aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEdi Feschiyan <edi.feschiyan@konsulko.com>2020-10-21 14:45:19 +0300
committerScott Murray <scott.murray@konsulko.com>2020-11-10 23:52:39 +0000
commite1e230049f6fdaf0f03dbc5d37797b1605a6e969 (patch)
treefaf7b5893738e585e054f70d760b853696c72b60
parent8f44a95e7626e67cda3e3a05c3fa1a9298c02809 (diff)
Add CAN value tests
Porting CAN value tests from lua test widgets to complete the CAN testing suite. Bug-AGL: SPEC-3660 Signed-off-by: Edi Feschiyan <edi.feschiyan@konsulko.com> Change-Id: I1f977e00fd69c3dac5a34746e514c262647461f5
-rw-r--r--pyagl/tests/test_can.py89
1 files changed, 89 insertions, 0 deletions
diff --git a/pyagl/tests/test_can.py b/pyagl/tests/test_can.py
index 1a9544f..ff164e5 100644
--- a/pyagl/tests/test_can.py
+++ b/pyagl/tests/test_can.py
@@ -578,3 +578,92 @@ async def test_low_can_write_j1939_signal(event_loop, service: cs):
msgid = await service.write({'signal_name': 'Eng.Momentary.Overspeed.Enable', 'signal_value': 1})
resp = await service.afbresponse()
assert resp.type == AFBT.ERROR, resp
+
+
+# Value tests
+@pytest.mark.parametrize('prepare_replay_file', ['testValueCAN_1.canreplay'], indirect=True)
+async def test_value_test_1(event_loop, service: cs, prepare_replay_file):
+ eventname = 'messages.vehicle.average.speed'
+ msgid = await service.subscribe(eventname)
+ resp = await service.afbresponse()
+ assert resp.status == 'success', f'Could not subscribe for {eventname} events; info: {resp.info}'
+
+ async def listen(service: cs):
+ async for resp in service.listener():
+ assert 'name' in resp.data
+ assert resp.data['name'] == eventname
+ break
+
+ listener = asyncio.create_task(listen(service))
+ player = canplayer(service)
+ assert player is not None
+ await player.play('testValueCAN_1.canreplay')
+ await listener
+ await player.stop()
+
+ msgid = await service.unsubscribe(eventname)
+ # wait until the event queue flushes out and we get unsubscribe confirmation
+ async for resp in service.listener():
+ if resp.type != AFBT.RESPONSE: continue
+ if resp.msgid != msgid: continue
+ assert resp.status == 'success', f'Could not unsubscribe from {eventname}; info: {resp.info}'
+ break
+
+
+@pytest.mark.parametrize('prepare_replay_file', ['testValueCAN_2.canreplay'], indirect=True)
+async def test_value_test_2(event_loop, service: cs, prepare_replay_file):
+ eventname = 'diagnostic_messages.engine.speed'
+ msgid = await service.subscribe(eventname)
+ resp = await service.afbresponse()
+ assert resp.status == 'success', f'Could not subscribe for {eventname} events; info: {resp.info}'
+
+ async def listen(service: cs):
+ async for resp in service.listener():
+ assert 'name' in resp.data
+ assert resp.data['name'] == eventname
+ break
+
+ listener = asyncio.create_task(listen(service))
+ player = canplayer(service)
+ assert player is not None
+ await player.play('testValueCAN_2.canreplay')
+ await listener
+ await player.stop()
+
+ msgid = await service.unsubscribe(eventname)
+ # wait until the event queue flushes out and we get unsubscribe confirmation
+ async for resp in service.listener():
+ if resp.type != AFBT.RESPONSE: continue
+ if resp.msgid != msgid: continue
+ assert resp.status == 'success', f'Could not unsubscribe from {eventname}; info: {resp.info}'
+ break
+
+
+@pytest.mark.parametrize('prepare_replay_file', ['testValueCAN_3.canreplay'], indirect=True)
+async def test_value_test_3(event_loop, service: cs, prepare_replay_file):
+ eventname = 'diagnostic_messages.engine.load'
+ msgid = await service.subscribe(eventname)
+ resp = await service.afbresponse()
+ assert resp.status == 'success', f'Could not subscribe for {eventname} events; info: {resp.info}'
+
+ async def listen(service: cs):
+ async for resp in service.listener():
+ assert 'name' in resp.data
+ assert resp.data['name'] == eventname
+ break
+
+ listener = asyncio.create_task(listen(service))
+ player = canplayer(service)
+ assert player is not None
+ await player.play('testValueCAN_3.canreplay')
+ await listener
+ await player.stop()
+
+ msgid = await service.unsubscribe(eventname)
+ # wait until the event queue flushes out and we get unsubscribe confirmation
+ async for resp in service.listener():
+ if resp.type != AFBT.RESPONSE: continue
+ if resp.msgid != msgid: continue
+ assert resp.status == 'success', f'Could not unsubscribe from {eventname}; info: {resp.info}'
+ break
+