From dcc2f91e5ab3b70202e78ec7164a8e9d7501d70c Mon Sep 17 00:00:00 2001 From: Edi Feschiyan Date: Thu, 22 Oct 2020 12:57:43 +0300 Subject: Add HVAC bindings and tests The current implementation of agl-service-hvac does not have test widgets implemented as reference point so these tests are new. Writing to verbs related to changing LED brightness results in I2C errors and therefore the tests are marked as hwrequired and xfail, as no hardware demo units are available to test with. The markers will stay until it is decided whether it is enough for testing or mock tests should be implemented. Bug-AGL: SPEC-3660 Signed-off-by: Edi Feschiyan Change-Id: Ibc051c6b4a06e0d126be9f8a0e0efe5876244671 --- pyagl/services/hvac.py | 102 +++++++++++++++++++++++++++++++++++++++++++++++ pyagl/tests/test_hvac.py | 74 ++++++++++++++++++++++++++++++++++ 2 files changed, 176 insertions(+) create mode 100644 pyagl/services/hvac.py create mode 100644 pyagl/tests/test_hvac.py diff --git a/pyagl/services/hvac.py b/pyagl/services/hvac.py new file mode 100644 index 0000000..324ed11 --- /dev/null +++ b/pyagl/services/hvac.py @@ -0,0 +1,102 @@ +# Copyright (C) 2020 Konsulko Group +# Author: Edi Feschiyan +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pyagl.services.base import AGLBaseService, AFBResponse +import asyncio +import os +import json + +class HVACService(AGLBaseService): + service = 'agl-service-hvac' + parser = AGLBaseService.getparser() + parser.add_argument('--temp_left_zone', action='store_true') + parser.add_argument('--temp_right_zone', action='store_true') + parser.add_argument('--temp_left_led', action='store_true') + parser.add_argument('--temp_right_led', action='store_true') + parser.add_argument('--get_fanspeed', action='store_true') + parser.add_argument('--get', action='store_true') + parser.add_argument('--set') # this accepts JSON formatted string, beware with shell input and quote escaping + # python3 -m pyagl.services.hvac --set "{\"FanSpeed\": 100}" + + def __init__(self, ip, port=None, service='agl-service-hvac'): + super().__init__(api='hvac', ip=ip, port=port, service=service) + + async def get_temp_left_zone(self): + return await self.request('get_temp_left_zone') + + async def get_temp_right_zone(self): + return await self.request('get_temp_right_zone') + + async def get_fanspeed(self): + return await self.request('get_fanspeed') + + async def temp_left_zone_led(self): + return await self.request('temp_left_zone_led') + + async def temp_right_zone_led(self): + return await self.request('temp_right_zone_led') + + async def get(self, values=None): + return await self.request('get', values) + + async def set(self, values): + return await self.request('set', values) + + +async def main(loop): + args = HVACService.parser.parse_args() + hvs = await HVACService(args.ipaddr) + + if args.loglevel: + hvs.logger.setLevel(args.loglevel) + + if args.temp_left_zone: + msgid = await hvs.get_temp_left_zone() + resp = await hvs.afbresponse() + print(resp) + + if args.temp_right_zone: + msgid = await hvs.get_temp_right_zone() + resp = await hvs.afbresponse() + print(resp) + + if args.get_fanspeed: + msgid = await hvs.get_fanspeed() + resp = await hvs.afbresponse() + print(resp) + + if args.temp_left_led: + msgid = await hvs.temp_left_zone_led() + resp = await hvs.afbresponse() + print(resp) + + if args.temp_right_led: + msgid = await hvs.temp_right_zone_led() + resp = await hvs.afbresponse() + print(resp) + + if args.get: + msgid = await hvs.get(args.get) + resp = await hvs.afbresponse() + print(resp) + + if args.set: + msgid = await hvs.set(json.loads(args.set)) + resp = await hvs.afbresponse() + print(resp) + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + loop.run_until_complete(main(loop)) diff --git a/pyagl/tests/test_hvac.py b/pyagl/tests/test_hvac.py new file mode 100644 index 0000000..b265819 --- /dev/null +++ b/pyagl/tests/test_hvac.py @@ -0,0 +1,74 @@ +import asyncio +import os +import pytest +import logging +from pyagl.services.base import AFBResponse, AFBT + +from pyagl.services.hvac import HVACService as hvs +pytestmark = [pytest.mark.asyncio, pytest.mark.hvac] + + +@pytest.fixture(scope='module') +def event_loop(): + loop = asyncio.get_event_loop() + yield loop + + +@pytest.fixture(scope='module') +async def service(): + address = os.environ.get('AGL_TGT_IP', 'localhost') + port = os.environ.get('AGL_TGT_PORT', None) + svc = await hvs(ip=address, port=port) + yield svc + await svc.websocket.close() + + +async def test_get_temp_left_zone(event_loop, service: hvs): + msgid = await service.get_temp_left_zone() + resp = await service.afbresponse() + assert resp.status == 'success' + assert 'LeftTemperature' in resp.data + + +async def test_get_temp_right_zone(event_loop, service: hvs): + msgid = await service.get_temp_right_zone() + resp = await service.afbresponse() + assert resp.status == 'success' + assert 'RightTemperature' in resp.data + + +async def test_get_fanspeed(event_loop, service: hvs): + msgid = await service.get_fanspeed() + resp = await service.afbresponse() + assert resp.status == 'success' + assert 'FanSpeed' in resp.data + + +@pytest.mark.hwrequired +@pytest.mark.xfail(reason='This fails with I2C error due to missing /sys/class/leds/blinkm-3-9-[red,blue,green]') +async def test_temp_left_zone_led(event_loop, service: hvs): + msgid = await service.temp_left_zone_led() + resp = await service.afbresponse() + assert resp.status == 'success', resp.info + + +@pytest.mark.hwrequired +@pytest.mark.xfail(reason='This fails with I2C error due to missing /sys/class/leds/blinkm-3-9-[red,blue,green]') +async def test_temp_right_zone_led(event_loop, service: hvs): + msgid = await service.temp_right_zone_led() + resp = await service.afbresponse() + assert resp.status == 'success', resp.info + + +async def test_get(event_loop, service: hvs): + msgid = await service.get() + resp = await service.afbresponse() + for property in ['FanSpeed', 'LeftTemperature', 'RightTemperature']: + assert property in resp.data + + +async def test_set(event_loop, service: hvs): + msgid = await service.set({'FanSpeed': 15}) + resp = await service.afbresponse() + assert resp.status == 'success' + -- cgit 1.2.3-korg