diff options
-rw-r--r-- | pyagl/conftest.py | 2 | ||||
-rw-r--r-- | pyagl/services/radio.py | 137 | ||||
-rw-r--r-- | pyagl/tests/test_radio.py | 604 |
3 files changed, 741 insertions, 2 deletions
diff --git a/pyagl/conftest.py b/pyagl/conftest.py index 0d470d6..1cefe24 100644 --- a/pyagl/conftest.py +++ b/pyagl/conftest.py @@ -86,8 +86,6 @@ def pytest_report_teststatus(config, report): if test_file.endswith('.py'): test_file = test_file[:-3] test_name = test_file + '_' + report.location[2][5:] - # Strip any fixture parameters - test_name = re.sub('\[.*\]$', '', test_name) test_result = lava_result_convert(report.outcome) # Generate expected LAVA testcase output diff --git a/pyagl/services/radio.py b/pyagl/services/radio.py new file mode 100644 index 0000000..a509526 --- /dev/null +++ b/pyagl/services/radio.py @@ -0,0 +1,137 @@ +# Copyright (C) 2020 Konsulko Group +# Author: Scott Murray +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from pyagl.services.base import AGLBaseService, AFBResponse +from typing import Union +import logging +import asyncio +import os + +class AFBRadioResponse(AFBResponse): + status: str + info: str + data = None + + def __init__(self, data: AFBResponse): + if isinstance(data, list): + super().__init__(data) + self.msgid = data.msgid + self.type = data.type + self.data = data.data + + +class RadioService(AGLBaseService): + service = 'agl-service-radio' + parser = AGLBaseService.getparser() + parser.add_argument('--start', help='Start radio playback') + parser.add_argument('--stop', help='Stop radio playback') + parser.add_argument('--scan-start', help='Start radio scanning') + parser.add_argument('--scan-stop', help='Stop radio scanning') + # FIXME: Add rest of verb arguments... + + def __await__(self): + return super()._async_init().__await__() + + def __init__(self, ip, port=None): + super().__init__(api='radio', ip=ip, port=port, service='agl-service-radio') + + async def frequency(self, value=None): + msg = None + if value is not None: + msg = {'value': value} + return await self.request('frequency', msg) + + async def band(self, value=None): + msg = None + if value is not None: + msg = {'value': value} + return await self.request('band', msg) + + async def band_supported(self, band=None): + msg = None + if band is not None: + msg = {'band': band} + return await self.request('band_supported', msg) + + async def frequency_range(self, band=None): + msg = None + if band is not None: + msg = {'band': band} + return await self.request('frequency_range', msg) + + async def frequency_step(self, band=None): + msg = None + if band is not None: + msg = {'band': band} + return await self.request('frequency_step', msg) + + async def start(self): + return await self.request('start') + + async def scan_start(self, direction=None): + msg = None + if direction is not None: + msg = {'direction': direction} + return await self.request('scan_start', msg) + + async def stop(self): + return await self.request('stop') + + async def scan_stop(self): + return await self.request('scan_stop') + + async def stereo_mode(self, value=None): + msg = None + if value is not None: + msg = {'value': value} + return await self.request('stereo_mode', msg) + + async def subscribe(self, event=None): + return await super().subscribe(event=event) + + async def unsubscribe(self, event=None): + return await super().unsubscribe(event=event) + + +async def main(loop): + args = RadioService.parser.parse_args() + RS = await RadioService(ip=args.ipaddr) + + if args.start: + msgid = await RS.start() + r = await RS.afbresponse() + print(r) + + if args.stop: + msgid = await RS.start() + r = await RS.afbresponse() + print(r) + + if args.subscribe: + for event in args.subscribe: + msgid = await RS.subscribe(event) + print(f"Subscribed for event {event} with messageid {msgid}") + r = await RS.afbresponse() + print(r) + + if args.listener: + async for response in RS.listener(): + print(response) + + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + loop.run_until_complete(main(loop)) diff --git a/pyagl/tests/test_radio.py b/pyagl/tests/test_radio.py new file mode 100644 index 0000000..b8d397e --- /dev/null +++ b/pyagl/tests/test_radio.py @@ -0,0 +1,604 @@ +# Copyright (C) 2020 Konsulko Group +# Author: Scott Murray <scott.murray@konsulko.com> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# +# Note that these tests make the following assumptions: +# - The radio backend implementation only supports FM. +# - The radio backend implementation only supports stereo mode. +# - The radio environment/implementation is such that scanning from +# the middle of the FM band will find a station. +# - The radio backend implementation will wrap from the low/high ends +# of a band when seeking, and that scanning from the low/high values +# in the backward/forward directions will find a station. +# +# The FM stereo operation assumption could be changed if it can be +# assumed that the null/mock backend is used, currently there is no +# mechanism to force that. If that changes, the null backend could +# be updated to reflect the expected behavior, and the tests for +# setting frequency and scanning would need additional +# parameterization by frequency band. +# +# Known limitations: +# - The assumed behavior is that a scan_start when a scan is running +# has no effect, but it is difficult to enable reliably testing that +# is actually the case. It would perhaps be preferable if the +# binding returned an error instead, but the current implementations +# of the RTL-SDR and Kingfisher backends need some potentially +# involved rework to enable doing so. Additionally, a decision would +# need to be made on how to enable mocking it in the null backend +# (e.g. rely on a fixed significant delay in scanning or providing an +# interface to adjust mocked scan behavior). +# - There currently are no tests of the RDS functionality. Once some +# rework is done to simplify mocking something up in the null +# backend, the intent is some will be added. +# + +import asyncio +import os +import pytest +import logging +from pyagl.services.base import AFBResponse, AFBT +from pyagl.services.radio import RadioService as rs + +pytestmark = [pytest.mark.asyncio, pytest.mark.radio] +events = ['frequency', 'station_found', 'status'] + + +@pytest.fixture(scope='module') +def event_loop(): + loop = asyncio.get_event_loop() + yield loop + + +@pytest.fixture(scope='module') +async def service(): + address = os.environ.get('AGL_TGT_IP', 'localhost') + port = os.environ.get('AGL_TGT_PORT', None) + ns = await rs(ip=address, port=port) + yield ns + await ns.websocket.close() + + +@pytest.fixture(scope='module') +async def band_config(event_loop, service: rs): + # Get frequency range + msgid = await service.frequency_range('FM') + resp = await service.afbresponse() + assert resp.status == 'success' + assert 'min' in resp.data.keys() + assert 'max' in resp.data.keys() + freq_min = resp.data['min'] + freq_max = resp.data['max'] + assert freq_max > freq_min + + # Get frequency step + msgid = await service.frequency_step('FM') + resp = await service.afbresponse() + assert resp.status == 'success' + assert 'step' in resp.data.keys() + freq_step = resp.data['step'] + + return { 'min': freq_min, 'max': freq_max, 'step': freq_step } + + +# subscribe/unsubscribe helper functions +async def subscribe_helper(service: rs, eventname: str): + msgid = await service.subscribe(eventname) + resp = await service.afbresponse() + assert resp.status == 'success', f'Could not subscribe for {eventname}; info: {resp.info}' + + +async def unsubscribe_helper(service: rs, eventname: str): + msgid = await service.unsubscribe(eventname) + + # wait until the event queue flushes out and we get unsubscribe confirmation + async for resp in service.listener(): + if resp.type != AFBT.RESPONSE: + continue + if resp.msgid != msgid: + continue + assert resp.status == 'success', f'Could not unsubscribe from {eventname}; info: {resp.info}' + break + + +# Fixture to initialize to a known state for scan tests +# +# The expected resulting state is: +# - frequency configured to center of band, or optionally to min or max +# frequency if 'frequency' is 'min'/'max' in the dict passed in as +# request parameter. +# - frequency and station_found events subscribed +# - play state will be playing, this can optionally be manually specified +# with the 'playing' boolean in the dict passed in as request parameter. + +@pytest.fixture() +async def scan_setup(event_loop, service: rs, band_config, request): + assert 'min' in band_config.keys() + assert 'max' in band_config.keys() + assert 'step' in band_config.keys() + freq_min = band_config['min'] + assert freq_min is not None + freq_max = band_config['max'] + assert freq_max is not None + freq_step = band_config['step'] + assert freq_step is not None + + # Start frequency to middle unless parameter indicates otherwise + freq = None + if hasattr(request, 'param'): + if 'frequency' in request.param.keys(): + if request.param['frequency'] == 'min': + freq = freq_min + elif request.param['frequency'] == 'max': + freq = freq_max + if freq is None: + # Set the frequency to the middle of the range + freq = int(freq_min + (((freq_max - freq_min) / 2) / freq_step) * freq_step) + + logging.debug(f"min = {freq_min}, max = {freq_max}, step = {freq_step}, setting {freq}") + msgid = await service.frequency(freq) + resp = await service.afbresponse() + assert resp.msgid == msgid + assert resp.status == 'success' + assert 'frequency' in resp.data.keys() + + # Start playing before test unless parameter indicates otherwise + start = True + if hasattr(request, 'param'): + if 'playing' in request.param.keys(): + start = request.param['playing'] + if start: + msgid = await service.start() + else: + msgid = await service.stop() + resp = await service.afbresponse() + assert resp.msgid == msgid + assert resp.status == 'success' + + await subscribe_helper(service, 'frequency') + await subscribe_helper(service, 'station_found') + + yield { 'min': freq_min, 'max': freq_max, 'step': freq_step, 'start': freq, 'playing': start } + + # Cleanup after test + await unsubscribe_helper(service, 'frequency') + await unsubscribe_helper(service, 'station_found') + msgid = await service.stop() + resp = await service.afbresponse() + assert resp.msgid == msgid + assert resp.status == 'success' + + +# Async listener helper function for scan tests +# NOTE: Two events and a response are expected are expected after +# starting the scan, so need to check for all of them in our +# asynchronous listener so we don't miss anything. + +async def scan_listener(service: rs, freq: int): + resp_received = False + freq_event_received = False + station_event_received = False + async for resp in service.listener(): + assert resp.type == AFBT.EVENT or resp.type == AFBT.RESPONSE + if resp.type == AFBT.EVENT: + assert resp.api == 'radio/frequency' or resp.api == 'radio/station_found' + assert 'value' in resp.data.keys() + assert resp.data['value'] != freq + if resp.api == 'radio/frequency': + freq_event_received = True + else: + station_event_received = True + else: + assert resp.status == 'success' + resp_received = True + if resp_received and \ + freq_event_received and \ + station_event_received: + break + + +# +# Tests +# + + +@pytest.mark.parametrize('band,supported', [('AM', False), ('am', False), (0, False), ('FM', True), ('fm', True), (1, True)]) +async def test_band_supported(event_loop, service: rs, band, supported: bool): + assert band is not None + assert supported is not None + + msgid = await service.band_supported(band) + resp = await service.afbresponse() + assert resp.msgid == msgid + assert resp.status == 'success' + assert 'supported' in resp.data.keys() + assert resp.data['supported'] == supported + + +async def test_band_supported_invalid(event_loop, service: rs): + msgid = await service.band_supported('foo') + resp = await service.afbresponse() + assert resp.msgid == msgid + assert resp.type == AFBT.ERROR + + +async def test_band_get(event_loop, service: rs): + msgid = await service.band() + resp = await service.afbresponse() + assert resp.msgid == msgid + assert resp.status == 'success' + assert 'band' in resp.data.keys() + assert resp.data['band'] == 'FM' + + +@pytest.mark.parametrize('band,result', [('AM', 'FM'), ('am', 'FM'), (0, 'FM'), ('FM', 'FM'), ('fm', 'FM'), (1, 'FM')]) +async def test_band_set(event_loop, service: rs, band, result): + assert band is not None + assert result is not None + + msgid = await service.band('AM') + resp = await service.afbresponse() + # NOTE: If the band is not supported there will not be an error + assert resp.msgid == msgid + assert resp.status == 'success' + assert 'band' in resp.data.keys() + assert resp.data['band'] == result + + +async def test_band_set_invalid(event_loop, service: rs): + msgid = await service.band('foo') + resp = await service.afbresponse() + assert resp.msgid == msgid + assert resp.type == AFBT.ERROR + + +async def test_stereo_mode_get(event_loop, service: rs): + msgid = await service.stereo_mode() + resp = await service.afbresponse() + assert resp.msgid == msgid + assert resp.status == 'success' + assert 'mode' in resp.data.keys() + assert resp.data['mode'] == 'stereo' + + +@pytest.mark.parametrize('mode', ['MONO', 'mono', 0, 'STEREO', 'stereo', 1]) +async def test_stereo_mode_set_MONO(event_loop, service: rs, mode: str): + assert mode is not None + + msgid = await service.stereo_mode(mode) + resp = await service.afbresponse() + # NOTE: There will never be an error, and it is assumed that + # the mode will remain stereo. + assert resp.status == 'success' + assert 'mode' in resp.data.keys() + assert resp.data['mode'] == 'stereo' + + +async def test_stereo_mode_set_invalid(event_loop, service: rs): + msgid = await service.stereo_mode('foo') + resp = await service.afbresponse() + assert resp.msgid == msgid + assert resp.type == AFBT.ERROR + + +@pytest.mark.parametrize('band', ['AM', 'am', 0, 'FM', 'fm', 1]) +async def test_frequency_range(event_loop, service: rs, band): + assert band is not None + + msgid = await service.frequency_range(band) + resp = await service.afbresponse() + assert resp.msgid == msgid + assert resp.status == 'success' + assert 'min' in resp.data.keys() + assert 'max' in resp.data.keys() + assert resp.data['max'] > resp.data['min'] + + +async def test_frequency_range_invalid(event_loop, service: rs): + msgid = await service.frequency_range('foo') + resp = await service.afbresponse() + assert resp.msgid == msgid + assert resp.type == AFBT.ERROR + + +@pytest.mark.parametrize('band', ['AM', 'am', 0, 'FM', 'fm', 1]) +async def test_frequency_step(event_loop, service: rs, band): + assert band is not None + + msgid = await service.frequency_step(band) + resp = await service.afbresponse() + assert resp.msgid == msgid + assert resp.status == 'success' + assert 'step' in resp.data.keys() + + +async def test_frequency_step_invalid(event_loop, service: rs): + msgid = await service.frequency_step('foo') + resp = await service.afbresponse() + assert resp.msgid == msgid + assert resp.type == AFBT.ERROR + + +async def test_frequency_get(event_loop, service: rs): + msgid = await service.frequency() + resp = await service.afbresponse() + assert resp.msgid == msgid + assert resp.status == 'success' + assert 'frequency' in resp.data.keys() + + +@pytest.mark.parametrize('frequency', ['center', 'min', 'max']) +async def test_frequency_set(event_loop, service: rs, band_config, frequency: str): + assert 'min' in band_config.keys() + assert 'max' in band_config.keys() + assert 'step' in band_config.keys() + freq_min = band_config['min'] + assert freq_min is not None + freq_max = band_config['max'] + assert freq_max is not None + freq_step = band_config['step'] + assert freq_step is not None + assert frequency in ['min', 'max', 'center'] + + # Get the current frequency + msgid = await service.frequency() + resp = await service.afbresponse() + assert resp.msgid == msgid + assert resp.status == 'success' + assert 'frequency' in resp.data.keys() + freq = resp.data['frequency'] + + eventname = 'frequency' + await subscribe_helper(service, eventname) + + # NOTE: An event and response are expected are expected one after + # another after setting the frequency, so handle both in our + # asynchronous listener so we don't miss anything. + async def listen(service: rs, freq: int, new_freq: int): + resp_received = False + event_received = False + async for resp in service.listener(): + assert resp.type == AFBT.EVENT or resp.type == AFBT.RESPONSE + if resp.type == AFBT.EVENT: + assert resp.api == 'radio/frequency' + assert 'value' in resp.data.keys() + assert resp.data['value'] == new_freq + event_received = True + else: + assert resp.status == 'success' + assert 'frequency' in resp.data.keys() + assert resp.data['frequency'] != freq + assert resp.data['frequency'] == new_freq + resp_received = True + if resp_received and event_received: + break + + # Set the frequency based on parameter + new_freq = None + if frequency == 'min': + new_freq = freq_min + elif frequency == 'max': + new_freq = freq_max + elif frequency == 'center': + # middle of the range + new_freq = int(freq_min + (((freq_max - freq_min) / 2) / freq_step) * freq_step) + # Handle collision + if new_freq == freq: + new_freq += freq_step + assert new_freq is not None + logging.debug(f"min = {freq_min}, max = {freq_max}, step = {freq_step}, setting {new_freq}") + listener = asyncio.create_task(listen(service, freq, new_freq)) + msgid = await service.frequency(new_freq) + await listener + + await unsubscribe_helper(service, eventname) + + # Get the current frequency to make sure it changed + msgid = await service.frequency() + resp = await service.afbresponse() + assert resp.msgid == msgid + assert resp.status == 'success' + assert 'frequency' in resp.data.keys() + assert resp.data['frequency'] == new_freq + + +@pytest.mark.parametrize('error', ['low', 'high', 'offset']) +async def test_frequency_set_invalid_frequency(event_loop, service: rs, band_config, error: str): + assert 'min' in band_config.keys() + assert 'max' in band_config.keys() + assert 'step' in band_config.keys() + freq_min = band_config['min'] + assert freq_min is not None + freq_max = band_config['max'] + assert freq_max is not None + freq_step = band_config['step'] + assert freq_step is not None + assert error in ['low', 'high', 'offset'] + + # Get the current frequency + msgid = await service.frequency() + resp = await service.afbresponse() + assert resp.msgid == msgid + assert resp.status == 'success' + assert 'frequency' in resp.data.keys() + freq = resp.data['frequency'] + + # Set the frequency based on parameter + new_freq = None + if error == 'low': + # Set the frequency to the minimum minus one step + new_freq = int(freq_min - freq_step) + elif error == 'high': + # Set the frequency to the maximum plus one step + new_freq = int(freq_max + freq_step) + elif error == 'offset': + # Set the frequency to the middle of the range + half a step + new_freq = freq_min + (((freq_max - freq_min) / 2) / freq_step) * freq_step + new_freq = int(freq + freq_step / 2) + assert new_freq is not None + logging.debug(f"min = {freq_min}, max = {freq_max}, step = {freq_step}, freq {freq}, setting {new_freq}") + msgid = await service.frequency(new_freq) + resp = await service.afbresponse() + assert resp.msgid == msgid + assert resp.type == AFBT.ERROR + + # Get the current frequency to make sure it did not change + msgid = await service.frequency() + resp = await service.afbresponse() + assert resp.status == 'success' + assert 'frequency' in resp.data.keys() + assert resp.data['frequency'] == freq + + +async def test_frequency_set_invalid(event_loop, service: rs): + msgid = await service.frequency('foo') + resp = await service.afbresponse() + logging.debug(f"{resp}") + assert resp.msgid == msgid + assert resp.type == AFBT.ERROR + + +async def test_start(event_loop, service: rs): + msgid = await service.start() + resp = await service.afbresponse() + assert resp.msgid == msgid + assert resp.status == 'success' + + +async def test_stop(event_loop, service: rs): + msgid = await service.stop() + resp = await service.afbresponse() + assert resp.msgid == msgid + assert resp.status == 'success' + + +@pytest.mark.parametrize('scan_setup', [{'playing': False}], indirect=True) +async def test_scan_start_not_playing(event_loop, service: rs, scan_setup): + assert 'playing' in scan_setup.keys() + assert scan_setup['playing'] != True + assert 'start' in scan_setup.keys() + start = scan_setup['start'] + assert start is not None + + msgid = await service.scan_start('FORWARD') + resp = await service.afbresponse() + assert resp.msgid == msgid + assert resp.status == 'success' + + # NOTE: Ideally we should check that there are no unexpected + # frequency or station_found events generated here, but it + # seems perhaps more trouble than it's worth wrt waiting... + + # Get the current frequency to make sure there's been no change + msgid = await service.frequency() + resp = await service.afbresponse() + assert resp.msgid == msgid + assert resp.status == 'success' + assert 'frequency' in resp.data.keys() + assert resp.data['frequency'] == start + + +@pytest.mark.parametrize('direction', ['FORWARD', 'forward', 0, 'BACKWARD', 'backward', 1]) +async def test_scan_start(event_loop, service: rs, scan_setup, direction): + assert 'playing' in scan_setup.keys() + assert scan_setup['playing'] == True + assert 'start' in scan_setup.keys() + start = scan_setup['start'] + assert start is not None + assert direction is not None + + listener = asyncio.create_task(scan_listener(service, start)) + msgid = await service.scan_start(direction) + await listener + + # Get the current frequency to make sure there's been a change + msgid = await service.frequency() + resp = await service.afbresponse() + assert resp.msgid == msgid + assert resp.status == 'success' + assert 'frequency' in resp.data.keys() + assert resp.data['frequency'] != start + + +@pytest.mark.parametrize('scan_setup', [{'frequency': 'max'}], indirect=True) +async def test_scan_start_wrap_high(event_loop, service: rs, scan_setup): + assert 'playing' in scan_setup.keys() + assert scan_setup['playing'] == True + assert 'start' in scan_setup.keys() + start = scan_setup['start'] + assert start is not None + + listener = asyncio.create_task(scan_listener(service, start)) + msgid = await service.scan_start('FORWARD') + await listener + + # Get the current frequency to make sure there's been a change + msgid = await service.frequency() + resp = await service.afbresponse() + assert resp.msgid == msgid + assert resp.status == 'success' + assert 'frequency' in resp.data.keys() + freq = resp.data['frequency'] + assert freq != start and freq < start + + +@pytest.mark.parametrize('scan_setup', [{'frequency': 'min'}], indirect=True) +async def test_scan_start_wrap_low(event_loop, service: rs, scan_setup): + assert 'playing' in scan_setup.keys() + assert scan_setup['playing'] == True + assert 'start' in scan_setup.keys() + start = scan_setup['start'] + assert start is not None + + listener = asyncio.create_task(scan_listener(service, start)) + msgid = await service.scan_start('BACKWARD') + await listener + + # Get the current frequency to make sure there's been a change + msgid = await service.frequency() + resp = await service.afbresponse() + assert resp.msgid == msgid + assert resp.status == 'success' + assert 'frequency' in resp.data.keys() + freq = resp.data['frequency'] + assert freq != start and freq > start + + +async def test_scan_stop(event_loop, service: rs, scan_setup): + assert 'playing' in scan_setup.keys() + assert scan_setup['playing'] == True + + # NOTE: Without specific mocking support in the binding, testing + # stopping during a scan does not seem practical currently. + msgid = await service.scan_stop() + resp = await service.afbresponse() + assert resp.msgid == msgid + assert resp.status == 'success' + + +async def test_subscribe_all(event_loop, service: rs): + for e in events: + msgid = await service.subscribe(e) + resp = await service.afbresponse() + assert resp.msgid == msgid + assert resp.status == 'success' + + +async def test_unsubscribe_all(event_loop, service: rs): + for e in events: + msgid = await service.unsubscribe(e) + resp = await service.afbresponse() + assert resp.msgid == msgid + assert resp.status == 'success' |