From e29a6e10bb687cb2a4dca84e8c41bd50948ee534 Mon Sep 17 00:00:00 2001 From: Edi Feschiyan Date: Mon, 14 Sep 2020 21:21:32 +0300 Subject: Add CANService tests Porting most of the lua tests for agl-service-can-low to pyagl Handle None in AGLBaseService.afbresponse() and AFBResponse class for timeouts Add new can_j1939 marker for J1939 CAN tests, also xfail marked until a way to determine if J1939 kernel support is available signal-composer verb queries tend to crash/choke after service startup right after boot, but when the service is restarted, tests run fine - so adding a timeout handling on verb queries in tests Bug-AGL: SPEC-3585 Signed-off-by: Edi Feschiyan Change-Id: Ib1fc259f2b28fb11c5f89b61df0b066d2c02045e --- pyagl/pytest.ini | 1 + pyagl/services/can.py | 16 +- pyagl/tests/test_can.py | 474 +++++++++++++++++++++++++++++++++++- pyagl/tests/test_signal_composer.py | 9 +- 4 files changed, 491 insertions(+), 9 deletions(-) diff --git a/pyagl/pytest.ini b/pyagl/pytest.ini index 6587041..9eec573 100644 --- a/pyagl/pytest.ini +++ b/pyagl/pytest.ini @@ -17,3 +17,4 @@ markers = mediascanner: agl-service-mediascanner tests signal_composer: agl-service-signal-composer tests can_low_level: agl-service-can-low-level tests + can_j1939: J1939 CAN specific tests diff --git a/pyagl/services/can.py b/pyagl/services/can.py index 12c920a..91bd01d 100644 --- a/pyagl/services/can.py +++ b/pyagl/services/can.py @@ -15,6 +15,7 @@ from pyagl.services.base import AGLBaseService, AFBResponse +from typing import Union import asyncio import json import os @@ -43,11 +44,18 @@ class CANService(AGLBaseService): async def list(self): return await self.request('list') - async def subscribe(self, event): - return await self.request(verb='subscribe', values={'event': event}) + async def subscribe(self, values: Union[str, dict] = ""): + if isinstance(values, str): + return await self.request(verb='subscribe', values={'event': values}) + else: + return await self.request(verb='subscribe', values=values) + + async def unsubscribe(self, values: Union[str, dict] = ""): + if isinstance(values, str): + return await self.request(verb='unsubscribe', values={'event': values}) + else: + return await self.request(verb='unsubscribe', values=values) - async def unsubscribe(self, event): - return await self.request(verb='unsubscribe', values={'event': event}) async def main(loop): args = CANService.parser.parse_args() diff --git a/pyagl/tests/test_can.py b/pyagl/tests/test_can.py index a8ef8b3..ba41202 100644 --- a/pyagl/tests/test_can.py +++ b/pyagl/tests/test_can.py @@ -17,6 +17,9 @@ import asyncio import os import pytest import logging +import asyncssh +import subprocess + from pyagl.services.base import AFBResponse, AFBT from pyagl.services.can import CANService as cs @@ -38,30 +41,493 @@ async def service(): await svc.websocket.close() +@pytest.fixture(scope='module') +async def prepare_replay_files(service: cs): + result = None + + if service.ip != 'localhost': + async with asyncssh.connect(service.ip, username='root') as ssh: + result = await ssh.run('unzip -jo /usr/AGL/apps/test/agl-service-can-low-level-test.wgt var/*.canreplay -d /tmp/canlowfixtures/') + assert result.returncode == 0, "there was a problem during *.canreplay fixture extraction" + else: + result = subprocess.check_output('unzip -jo /usr/AGL/apps/test/agl-service-can-low-level-test.wgt var/*.canreplay -d /tmp/canlowfixtures/', shell=True) + + +@pytest.fixture(scope='module') +async def canplayer(service, prepare_replay_files, request): + if service.ip != 'localhost': + async with asyncssh.connect(service.ip, username='root') as ssh: + result = await ssh.run(f'nohup canplayer -I /tmp/canlowfixtures/{request.param} < /dev/null > /dev/null 2>&1 &') + yield None + pid = await ssh.run("killall -9 canplayer") + else: # running on target + proc = subprocess.Popen(f'nohup canplayer -I /tmp/canlowfixtures/{request.param} < /dev/null > /dev/null 2>&1 &', shell=True) + yield None + subprocess.check_output('killall -9 canplayer', shell=True) + + +# basic tests +hsmessage = {'bus_name': 'hs', + 'frame': { + 'can_id': 1568, + 'can_dlc': 8, + 'can_data': [255, 255, 255, 255, 255, 255, 255, 255] + }} + +@pytest.mark.dependency @pytest.mark.regular async def test_list(event_loop, service: cs): msgid = await service.list() resp = await service.afbresponse() - assert resp.status == 'success' + assert resp.status == 'success', resp.info @pytest.mark.regular async def test_get(event_loop, service: cs): msgid = await service.get("engine.speed") resp = await service.afbresponse() - assert resp.status == 'success' + assert resp.status == 'success', resp.info + + +@pytest.mark.regular +# @pytest.mark.dependency(depends=['test_list']) +async def test_get_all_messages(event_loop, service: cs): + msgid = await service.list() + resp = await service.afbresponse() + messagelist = [m for m in resp.data if m.startswith('messages.')] + for m in messagelist: + msgid = await service.get(m) + resp = await service.afbresponse() + assert resp.status == 'success', f'.get() failed with message {m}' @pytest.mark.regular async def test_auth(event_loop, service: cs): msgid = await service.auth() resp = await service.afbresponse() - assert resp.status == 'success' + assert resp.status == 'success', resp.info + + +@pytest.mark.regular +async def test_write_wo_auth(event_loop, service: cs): + msgid = await service.write({'signal_name': 'engine.speed', 'signal_value': 12}) + resp = await service.afbresponse() + assert resp.type == AFBT.ERROR @pytest.mark.regular async def test_write(event_loop, service: cs): msgid = await service.write({"signal_name": "hvac.temperature.left", "signal_value": 21}) resp = await service.afbresponse() - assert resp.status == 'success' + assert resp.status == 'success', resp.info + + +@pytest.mark.regular +async def test_write_raw(event_loop, service: cs): + msgid = await service.write(hsmessage) + resp = await service.afbresponse() + assert resp.status == 'success', resp.info + + +@pytest.mark.regular +async def test_write_unwritable_signal(event_loop, service: cs): + msgid = await service.write({'signal_name': 'vehicle.average.speed', 'signal_value': 1234}) + resp = await service.afbresponse() + assert resp.type == AFBT.ERROR + + +@pytest.mark.regular +async def test_write_wrong_value_key(event_loop, service: cs): + msgid = await service.write({'name': 'vehicle.average.speed', 'signal_value': 21}) + resp = await service.afbresponse() + assert resp.type == AFBT.ERROR + + +@pytest.mark.regular +async def test_write_raw_invalid_bus_key(event_loop, service: cs): + message = dict(hsmessage) + message['bus'] = message.pop('bus_name') + msgid = await service.write(message) + resp = await service.afbresponse() + assert resp.type == AFBT.ERROR + + +@pytest.mark.regular +async def test_write_raw_invalid_frame_key(event_loop, service: cs): + message = dict(hsmessage) + message['fram'] = message.pop('frame') + msgid = await service.write(message) + resp = await service.afbresponse() + assert resp.type == AFBT.ERROR + + +@pytest.mark.regular +async def test_write_raw_invalid_can_id_key(event_loop, service: cs): + message = dict(hsmessage) + message['frame']['id'] = message['frame'].pop('can_id') + msgid = await service.write(message) + resp = await service.afbresponse() + assert resp.type == AFBT.ERROR + + +@pytest.mark.regular +async def test_write_raw_invalid_can_id_args(event_loop, service: cs): + message = dict(hsmessage) + message['frame']['can_id'] = "1568" + msgid = await service.write(message) + resp = await service.afbresponse() + assert resp.type == AFBT.ERROR + + +@pytest.mark.regular +async def test_write_raw_invalid_can_dlc_key(event_loop, service: cs): + message = dict(hsmessage) + message['frame']['dlc'] = message['frame'].pop('can_dlc') + msgid = await service.write(message) + resp = await service.afbresponse() + assert resp.type == AFBT.ERROR + + +@pytest.mark.regular +async def test_write_raw_invalid_can_dlc_args(event_loop, service: cs): + message = dict(hsmessage) + message['frame']['can_dlc'] = "8" + msgid = await service.write(message) + resp = await service.afbresponse() + assert resp.type == AFBT.ERROR + + +@pytest.mark.regular +async def test_write_raw_invalid_can_data_key(event_loop, service: cs): + message = dict(hsmessage) + message['frame']['data'] = message['frame'].pop('can_data') + msgid = await service.write(message) + resp = await service.afbresponse() + assert resp.type == AFBT.ERROR + + +@pytest.mark.regular +async def test_write_raw_invalid_can_data_value(event_loop, service: cs): + message = dict(hsmessage) + message['frame']['can_data'] = ["255", 255, 255, 255, 255, 255, 255, 255] + msgid = await service.write(message) + resp = await service.afbresponse() + assert resp.type == AFBT.ERROR + + +@pytest.mark.regular +async def test_get_written_message(event_loop, service: cs): + msgid = await service.get("hvac.temperature.left") + resp = await service.afbresponse() + assert resp.status == 'success', resp.info + + +@pytest.mark.regular +@pytest.mark.dependency +async def test_subscribe(event_loop, service: cs): + msgid = await service.subscribe('*') + resp = await service.afbresponse() + assert resp.status == 'success', resp.info + + +@pytest.mark.regular +@pytest.mark.dependency(depends=['test_subscribe']) +async def test_unsubscribe(event_loop, service: cs): + msgid = await service.unsubscribe('*') + resp = await service.afbresponse() + assert resp.status == 'success', resp.info + + +@pytest.mark.regular +@pytest.mark.parametrize('canplayer', ['test1.canreplay'], indirect=True) +async def test_diagnostic_engine_speed_simulation(event_loop, service: cs, canplayer): + eventname = 'diagnostic_messages.engine.speed' + msgid = await service.subscribe(eventname) + resp = await service.afbresponse() + assert resp.status == 'success', f'Could not subscribe for {eventname}; info: {resp.info}' + + async for resp in service.listener(): + resp = await service.afbresponse() + assert 'name' in resp.data + assert resp.data['name'] == eventname + break + + msgid = await service.unsubscribe(eventname) + async for resp in service.listener(): # wait until the event queue flushes out and we get unsubscribe confirmation + if resp.type != AFBT.RESPONSE: continue + if resp.msgid != msgid: continue + assert resp.status == 'success', f'Could not unsubscribe from {eventname}; info: {resp.info}' + break + + +@pytest.mark.regular +@pytest.mark.parametrize('canplayer', ['test2-3.canreplay'], indirect=True) +async def test_Subscribe_all(event_loop, service: cs, canplayer): + eventname = 'messages.vehicle.average.speed' + msgid = await service.subscribe('*') + resp = await service.afbresponse() + assert resp.status == 'success', f'Could not subscribe for all events; info: {resp.info}' + async for resp in service.listener(): + assert 'name' in resp.data + assert resp.data['name'] == eventname + break + + msgid = await service.unsubscribe('*') + async for resp in service.listener(): # wait until the event queue flushes out and we get unsubscribe confirmation + if resp.type != AFBT.RESPONSE: continue + if resp.msgid != msgid: continue + assert resp.status == 'success', f'Could not unsubscribe from {eventname}; info: {resp.info}' + break + +# End of basic tests + +# Filter tests + + +@pytest.mark.regular +@pytest.mark.parametrize('canplayer', ['testFilter01filteredOut.canreplay'], indirect=True) +async def test_Filter_Test_01_Step_1(event_loop, service: cs, canplayer): + minspeed = 30 + maxspeed = 100 + eventname = 'messages.engine.speed' + msgid = await service.subscribe({'event': eventname, 'filter': {'min': minspeed, 'max': maxspeed}}) + resp = await service.afbresponse() + assert resp.status == 'success', f'Could not subscribe for {eventname} events; info: {resp.info}' + + async for resp in service.listener(): + assert resp is None, "Test failed, was supposed to timeout and return None on this fixture, but returned data" + break + + msgid = await service.unsubscribe({'event': eventname, 'filter': {'min': minspeed, 'max': maxspeed}}) + async for resp in service.listener(): # discard events in the queue until it flushes out and we get unsub confirm + if resp.type == AFBT.EVENT: continue + if resp.msgid != msgid: continue + assert resp.status == 'success', f'Could not unsubscribe from {eventname}; info: {resp.info}' + break + + + +@pytest.mark.regular +@pytest.mark.parametrize('canplayer', ['testFilter01pass.canreplay'], indirect=True) +async def test_Filter_Test_01_Step_2(event_loop, service: cs, canplayer): + minspeed = 30 + maxspeed = 100 + eventname = 'messages.engine.speed' + msgid = await service.subscribe({'event': eventname, 'filter': {'min': minspeed, 'max': maxspeed}}) + resp = await service.afbresponse() + assert resp.status == 'success', f'Could not subscribe for {eventname} events; info: {resp.info}' + + async for resp in service.listener(): + assert 'name' in resp.data + assert resp.data['name'] == eventname + assert minspeed < resp.data['value'] < maxspeed + break + + msgid = await service.unsubscribe({'event': eventname, 'filter': {'min': minspeed, 'max': maxspeed}}) + async for resp in service.listener(): # discard events in the queue until it flushes out and we get unsub confirm + if resp.type == AFBT.EVENT: continue + if resp.msgid != msgid: continue + assert resp.status == 'success', f'Could not unsubscribe from {eventname}; info: {resp.info}' + break + + +@pytest.mark.regular +@pytest.mark.parametrize('canplayer', ['test2-3.canreplay'], indirect=True) +async def test_Filter_Test_01_Step_3(event_loop, service: cs, canplayer): + # this testcase is supposed to test event filter frequency + minspeed = 30 + maxspeed = 100 + tolerance = 100000 # usec + eventname = 'messages.vehicle.average.speed' + msgid = await service.subscribe({'event': eventname, 'filter': {'frequency': 1, 'min': minspeed, 'max': maxspeed}}) + resp = await service.afbresponse() + assert resp.status == 'success', f'Could not subscribe for {eventname} events; info: {resp.info}' + + resplist = [] + for i in range(2): + resplist.append(await service.afbresponse()) + + for r in resplist: + assert 'name' in r.data + assert r.data['name'] == eventname + assert 'value' in r.data + assert minspeed < r.data['value'] < maxspeed + + # check whether the time delta between the two events is more than 1 second ('frequency' in filter above, in usec) + delta = resplist[-1].data['timestamp'] - resplist[0].data['timestamp'] + assert 1000000 - tolerance < delta < 1000000 + tolerance + + msgid = await service.unsubscribe({'event': eventname, 'filter': {'frequency': 1, 'min': minspeed, 'max': maxspeed}}) + async for resp in service.listener(): # discard events in the queue until it flushes out and we get unsub confirm + if resp.type == AFBT.EVENT: continue + if resp.msgid != msgid: continue + assert resp.status == 'success', f'Could not unsubscribe from {eventname}; info: {resp.info}' + break + +# end of Filter tests + +# J1939 tests +# Un/Subscription tests + +@pytest.mark.can_j1939 +@pytest.mark.xfail(reason="J1939 support may not be available in the kernel") +async def test_low_can_subscribe_j1939_event(event_loop, service: cs): + msgid = await service.subscribe('Eng.Momentary.Overspeed.Enable') + resp = await service.afbresponse() + assert resp.type == AFBT.RESPONSE + assert resp.status == 'success', resp.info + + +@pytest.mark.xfail(reason="J1939 support may not be available in the kernel") +@pytest.mark.can_j1939 +async def test_low_can_unsubscribe_j1939_event(event_loop, service: cs): + msgid = await service.unsubscribe('Eng.Momentary.Overspeed.Enable') + resp = await service.afbresponse() + assert resp.type == AFBT.RESPONSE + assert resp.status == 'success', resp.info + + +@pytest.mark.xfail(reason="J1939 support may not be available in the kernel") +@pytest.mark.can_j1939 +async def test_low_can_subscribe_j1939_events(event_loop, service: cs): + msgid = await service.subscribe('Eng.*') + resp = await service.afbresponse() + assert resp.type == AFBT.RESPONSE + assert resp.status == 'success', resp.info + + +@pytest.mark.xfail(reason="J1939 support may not be available in the kernel") +@pytest.mark.can_j1939 +async def test_low_can_unsubscribe_j1939_events(event_loop, service: cs): + msgid = await service.unsubscribe('Eng.*') + resp = await service.afbresponse() + assert resp.type == AFBT.RESPONSE + assert resp.status == 'success', resp.info + + +@pytest.mark.xfail(reason="J1939 support may not be available in the kernel") +@pytest.mark.can_j1939 +async def test_low_can_subscribe_j1939_pgn(event_loop, service: cs): + msgid = await service.subscribe({'pgn': 61442}) + resp = await service.afbresponse() + assert resp.type == AFBT.RESPONSE + assert resp.status == 'success', resp.info + + +@pytest.mark.xfail(reason="J1939 support may not be available in the kernel") +@pytest.mark.can_j1939 +async def test_low_can_unsubscribe_j1939_pgn(event_loop, service: cs): + msgid = await service.unsubscribe({'pgn': 61442}) + resp = await service.afbresponse() + assert resp.type == AFBT.RESPONSE + assert resp.status == 'success', resp.info + + +@pytest.mark.xfail(reason="J1939 support may not be available in the kernel") +@pytest.mark.can_j1939 +async def test_low_can_subscribe_j1939_all_pgn(event_loop, service: cs): + msgid = await service.subscribe({'pgn': '*'}) + resp = await service.afbresponse() + assert resp.type == AFBT.RESPONSE + assert resp.status == 'success', resp.info + + +@pytest.mark.xfail(reason="J1939 support may not be available in the kernel") +@pytest.mark.can_j1939 +async def test_low_can_unsubscribe_j1939_all_pgn(event_loop, service: cs): + msgid = await service.unsubscribe({'pgn': '*'}) + resp = await service.afbresponse() + assert resp.type == AFBT.RESPONSE + assert resp.status == 'success', resp.info + + +@pytest.mark.xfail(reason="J1939 support may not be available in the kernel") +@pytest.mark.can_j1939 +async def test_low_can_subscribe_j1939_id(event_loop, service: cs): + msgid = await service.subscribe({'id': 61442}) + resp = await service.afbresponse() + assert resp.type == AFBT.RESPONSE + assert resp.status == 'success', resp.info + + +@pytest.mark.xfail(reason="J1939 support may not be available in the kernel") +@pytest.mark.can_j1939 +async def test_low_can_unsubscribe_j1939_id(event_loop, service: cs): + msgid = await service.unsubscribe({'id': 61442}) + resp = await service.afbresponse() + assert resp.type == AFBT.RESPONSE + assert resp.status == 'success', resp.info + + +@pytest.mark.xfail(reason="J1939 support may not be available in the kernel") +@pytest.mark.can_j1939 +async def test_low_can_subscribe_j1939_all_id(event_loop, service: cs): + msgid = await service.subscribe({'id': '*'}) + resp = await service.afbresponse() + assert resp.type == AFBT.RESPONSE + assert resp.status == 'success', resp.info + + +@pytest.mark.xfail(reason="J1939 support may not be available in the kernel") +@pytest.mark.can_j1939 +async def test_low_can_unsubscribe_j1939_all_id(event_loop, service: cs): + msgid = await service.unsubscribe({'id': '*'}) + resp = await service.afbresponse() + assert resp.type == AFBT.RESPONSE + assert resp.status == 'success', resp.info + + +@pytest.mark.xfail(reason="J1939 support may not be available in the kernel") +@pytest.mark.can_j1939 +async def test_low_can_subscribe_j1939_no_event(event_loop, service: cs): + msgid = await service.subscribe({'event': ''}) + resp = await service.afbresponse() + assert resp.type == AFBT.ERROR, f'Expected ERROR type messsage, got {resp.type}' + + +@pytest.mark.xfail(reason="J1939 support may not be available in the kernel") +@pytest.mark.can_j1939 +async def test_low_can_subscribe_j1939_no_id(event_loop, service: cs): + msgid = await service.subscribe({'id': ''}) + resp = await service.afbresponse() + assert resp.type == AFBT.ERROR, f'Expected ERROR type messsage, got {resp.type}' + + +@pytest.mark.xfail(reason="J1939 support may not be available in the kernel") +@pytest.mark.can_j1939 +async def test_low_can_subscribe_j1939_no_pgn(event_loop, service: cs): + msgid = await service.subscribe({'pgn': ''}) + resp = await service.afbresponse() + assert resp.type == AFBT.ERROR, f'Expected ERROR type messsage, got {resp.type}' + +# End of Un/Subscription tests + +# Write tests +# + +@pytest.mark.regular +async def test_low_can_write_wo_auth(event_loop, service: cs): + msgid = await service.write({'signal_name': 'Eng.Momentary.Overspeed.Enable', 'signal_value': 1}) + async for resp in service.listener(): # using a listener because sometimes there are events in the queue from + # previous tests or subscriptions even they are properly unsubscribed and awaited for confirmation + if resp is AFBResponse and resp.type is not AFBT.ERROR: + logging.warning(f'Expected Error response, got {resp}') + continue + assert resp.type == AFBT.ERROR, resp + assert resp.msgid == msgid + break + +@pytest.mark.regular +async def test_low_can_write_auth(event_loop, service: cs): + msgid = await service.auth() + resp = await service.afbresponse() + assert resp.status == 'success', resp + + +@pytest.mark.regular +@pytest.mark.xfail(reason='J1939 write messages are failing') +async def test_low_can_write_signal(event_loop, service: cs): + msgid = await service.write({'signal_name': 'Eng.Momentary.Overspeed.Enable', 'signal_value': 1}) + resp = await service.afbresponse() + assert resp.type == AFBT.ERROR, resp diff --git a/pyagl/tests/test_signal_composer.py b/pyagl/tests/test_signal_composer.py index 5a96b45..bdb8edd 100644 --- a/pyagl/tests/test_signal_composer.py +++ b/pyagl/tests/test_signal_composer.py @@ -45,17 +45,20 @@ async def service(): async def test_list(event_loop, service: scs): msgid = await service.list() resp = await service.afbresponse() + assert resp is not None, 'list() timed out' assert resp.status == 'success' - print(resp) + @pytest.mark.regular async def test_getNoFilter(event_loop, service: scs): msgid = await service.get({'signal': 'fuel_level'}) resp = await service.afbresponse() + assert resp is not None, f'.get() timed out' assert resp.status == 'success' @pytest.mark.regular @pytest.mark.dependency(depends=['test_list']) +@pytest.mark.xfail(reason='on first boot enumerating through all signals causes the service to crash, restarting it fixes it') async def test_getAllSignals(event_loop, service: scs): msgid = await service.list() resp = await service.afbresponse() @@ -78,6 +81,7 @@ async def test_getAllSignals(event_loop, service: scs): async def test_getFilterMin(event_loop, service: scs): msgid = await service.get({'signal': 'latitude', 'options': {'minimum': 10}}) resp = await service.afbresponse() + assert resp is not None, '.get() timed out' assert resp.status == 'success' @@ -85,6 +89,7 @@ async def test_getFilterMin(event_loop, service: scs): async def test_getFilterMax(event_loop, service: scs): msgid = await service.get({'signal': 'vehicle_speed', 'options': {'maximum': 10}}) resp = await service.afbresponse() + assert resp is not None, '.get() timed out' assert resp.status == 'success' @@ -92,6 +97,7 @@ async def test_getFilterMax(event_loop, service: scs): async def test_subscribe(event_loop, service: scs): msgid = await service.subscribe({'service': 'longitude'}) resp = await service.afbresponse() + assert resp is not None, ".subscribe() timed out with {'service': 'longitude'}" assert resp.status == 'success' @@ -99,6 +105,7 @@ async def test_subscribe(event_loop, service: scs): async def test_unsubscribe(event_loop, service: scs): msgid = await service.unsubscribe({'service': 'longitude'}) resp = await service.afbresponse() + resp is not None, ".unsubscribe() timed out with {'service': 'longitude'}" assert resp.status == 'success' -- cgit 1.2.3-korg