diff options
author | Edi Feschiyan <edi.feschiyan@konsulko.com> | 2020-10-21 14:45:19 +0300 |
---|---|---|
committer | Jan-Simon Moeller <jsmoeller@linuxfoundation.org> | 2020-11-18 17:15:22 +0100 |
commit | ca505ea0fa560d9715e636ddbeab9e1986838d9f (patch) | |
tree | 361e6aa0bef6ccd2ea4ab4a8fcf2bacd1b742689 | |
parent | 10a486735977ca141c5d6b3d0a3a59956ad274a9 (diff) |
Add CAN value tests
Porting CAN value tests from lua test widgets to complete the CAN
testing suite.
Bug-AGL: SPEC-3660
Signed-off-by: Edi Feschiyan <edi.feschiyan@konsulko.com>
Change-Id: I1f977e00fd69c3dac5a34746e514c262647461f5
-rw-r--r-- | pyagl/tests/test_can.py | 89 |
1 files changed, 89 insertions, 0 deletions
diff --git a/pyagl/tests/test_can.py b/pyagl/tests/test_can.py index 1a9544f..ff164e5 100644 --- a/pyagl/tests/test_can.py +++ b/pyagl/tests/test_can.py @@ -578,3 +578,92 @@ async def test_low_can_write_j1939_signal(event_loop, service: cs): msgid = await service.write({'signal_name': 'Eng.Momentary.Overspeed.Enable', 'signal_value': 1}) resp = await service.afbresponse() assert resp.type == AFBT.ERROR, resp + + +# Value tests +@pytest.mark.parametrize('prepare_replay_file', ['testValueCAN_1.canreplay'], indirect=True) +async def test_value_test_1(event_loop, service: cs, prepare_replay_file): + eventname = 'messages.vehicle.average.speed' + msgid = await service.subscribe(eventname) + resp = await service.afbresponse() + assert resp.status == 'success', f'Could not subscribe for {eventname} events; info: {resp.info}' + + async def listen(service: cs): + async for resp in service.listener(): + assert 'name' in resp.data + assert resp.data['name'] == eventname + break + + listener = asyncio.create_task(listen(service)) + player = canplayer(service) + assert player is not None + await player.play('testValueCAN_1.canreplay') + await listener + await player.stop() + + msgid = await service.unsubscribe(eventname) + # wait until the event queue flushes out and we get unsubscribe confirmation + async for resp in service.listener(): + if resp.type != AFBT.RESPONSE: continue + if resp.msgid != msgid: continue + assert resp.status == 'success', f'Could not unsubscribe from {eventname}; info: {resp.info}' + break + + +@pytest.mark.parametrize('prepare_replay_file', ['testValueCAN_2.canreplay'], indirect=True) +async def test_value_test_2(event_loop, service: cs, prepare_replay_file): + eventname = 'diagnostic_messages.engine.speed' + msgid = await service.subscribe(eventname) + resp = await service.afbresponse() + assert resp.status == 'success', f'Could not subscribe for {eventname} events; info: {resp.info}' + + async def listen(service: cs): + async for resp in service.listener(): + assert 'name' in resp.data + assert resp.data['name'] == eventname + break + + listener = asyncio.create_task(listen(service)) + player = canplayer(service) + assert player is not None + await player.play('testValueCAN_2.canreplay') + await listener + await player.stop() + + msgid = await service.unsubscribe(eventname) + # wait until the event queue flushes out and we get unsubscribe confirmation + async for resp in service.listener(): + if resp.type != AFBT.RESPONSE: continue + if resp.msgid != msgid: continue + assert resp.status == 'success', f'Could not unsubscribe from {eventname}; info: {resp.info}' + break + + +@pytest.mark.parametrize('prepare_replay_file', ['testValueCAN_3.canreplay'], indirect=True) +async def test_value_test_3(event_loop, service: cs, prepare_replay_file): + eventname = 'diagnostic_messages.engine.load' + msgid = await service.subscribe(eventname) + resp = await service.afbresponse() + assert resp.status == 'success', f'Could not subscribe for {eventname} events; info: {resp.info}' + + async def listen(service: cs): + async for resp in service.listener(): + assert 'name' in resp.data + assert resp.data['name'] == eventname + break + + listener = asyncio.create_task(listen(service)) + player = canplayer(service) + assert player is not None + await player.play('testValueCAN_3.canreplay') + await listener + await player.stop() + + msgid = await service.unsubscribe(eventname) + # wait until the event queue flushes out and we get unsubscribe confirmation + async for resp in service.listener(): + if resp.type != AFBT.RESPONSE: continue + if resp.msgid != msgid: continue + assert resp.status == 'success', f'Could not unsubscribe from {eventname}; info: {resp.info}' + break + |