diff options
-rw-r--r-- | pyagl/tests/test_can.py | 89 |
1 files changed, 89 insertions, 0 deletions
diff --git a/pyagl/tests/test_can.py b/pyagl/tests/test_can.py index 1a9544f..ff164e5 100644 --- a/pyagl/tests/test_can.py +++ b/pyagl/tests/test_can.py @@ -578,3 +578,92 @@ async def test_low_can_write_j1939_signal(event_loop, service: cs): msgid = await service.write({'signal_name': 'Eng.Momentary.Overspeed.Enable', 'signal_value': 1}) resp = await service.afbresponse() assert resp.type == AFBT.ERROR, resp + + +# Value tests +@pytest.mark.parametrize('prepare_replay_file', ['testValueCAN_1.canreplay'], indirect=True) +async def test_value_test_1(event_loop, service: cs, prepare_replay_file): + eventname = 'messages.vehicle.average.speed' + msgid = await service.subscribe(eventname) + resp = await service.afbresponse() + assert resp.status == 'success', f'Could not subscribe for {eventname} events; info: {resp.info}' + + async def listen(service: cs): + async for resp in service.listener(): + assert 'name' in resp.data + assert resp.data['name'] == eventname + break + + listener = asyncio.create_task(listen(service)) + player = canplayer(service) + assert player is not None + await player.play('testValueCAN_1.canreplay') + await listener + await player.stop() + + msgid = await service.unsubscribe(eventname) + # wait until the event queue flushes out and we get unsubscribe confirmation + async for resp in service.listener(): + if resp.type != AFBT.RESPONSE: continue + if resp.msgid != msgid: continue + assert resp.status == 'success', f'Could not unsubscribe from {eventname}; info: {resp.info}' + break + + +@pytest.mark.parametrize('prepare_replay_file', ['testValueCAN_2.canreplay'], indirect=True) +async def test_value_test_2(event_loop, service: cs, prepare_replay_file): + eventname = 'diagnostic_messages.engine.speed' + msgid = await service.subscribe(eventname) + resp = await service.afbresponse() + assert resp.status == 'success', f'Could not subscribe for {eventname} events; info: {resp.info}' + + async def listen(service: cs): + async for resp in service.listener(): + assert 'name' in resp.data + assert resp.data['name'] == eventname + break + + listener = asyncio.create_task(listen(service)) + player = canplayer(service) + assert player is not None + await player.play('testValueCAN_2.canreplay') + await listener + await player.stop() + + msgid = await service.unsubscribe(eventname) + # wait until the event queue flushes out and we get unsubscribe confirmation + async for resp in service.listener(): + if resp.type != AFBT.RESPONSE: continue + if resp.msgid != msgid: continue + assert resp.status == 'success', f'Could not unsubscribe from {eventname}; info: {resp.info}' + break + + +@pytest.mark.parametrize('prepare_replay_file', ['testValueCAN_3.canreplay'], indirect=True) +async def test_value_test_3(event_loop, service: cs, prepare_replay_file): + eventname = 'diagnostic_messages.engine.load' + msgid = await service.subscribe(eventname) + resp = await service.afbresponse() + assert resp.status == 'success', f'Could not subscribe for {eventname} events; info: {resp.info}' + + async def listen(service: cs): + async for resp in service.listener(): + assert 'name' in resp.data + assert resp.data['name'] == eventname + break + + listener = asyncio.create_task(listen(service)) + player = canplayer(service) + assert player is not None + await player.play('testValueCAN_3.canreplay') + await listener + await player.stop() + + msgid = await service.unsubscribe(eventname) + # wait until the event queue flushes out and we get unsubscribe confirmation + async for resp in service.listener(): + if resp.type != AFBT.RESPONSE: continue + if resp.msgid != msgid: continue + assert resp.status == 'success', f'Could not unsubscribe from {eventname}; info: {resp.info}' + break + |