From e190fff1a49499fc9e07931c641d8d110c3a595b Mon Sep 17 00:00:00 2001 From: Edi Feschiyan Date: Fri, 22 May 2020 15:09:59 +0300 Subject: Working base class, hopefully final reworks for the services --- aglbaseservice.py | 59 +++++++++++++++++------- geoclue.py | 33 ++++++++++---- gps.py | 32 ++++++++----- mediaplayer.py | 133 +++++++++++++++++++++++++++++++++++++++++++----------- test_gps.py | 38 ++++++---------- weather.py | 31 +++++++++++-- 6 files changed, 234 insertions(+), 92 deletions(-) diff --git a/aglbaseservice.py b/aglbaseservice.py index 7f532a5..bc4b1d2 100644 --- a/aglbaseservice.py +++ b/aglbaseservice.py @@ -17,6 +17,7 @@ import re # logging.getLogger('AGLBaseService') # logging.basicConfig(level=logging.DEBUG) +# AFB message type class AFBT(IntEnum): REQUEST = 2, RESPONSE = 3, @@ -27,13 +28,11 @@ msgq = {} AFBLEN = 3 -def betterrand(): - bs = os.urandom(5) - result = bs[0] * bs[1] * bs[2] * bs[3] + bs[4] +def newrand(): while True: - yield result bs = os.urandom(5) - result = bs[0]*bs[1]*bs[2]*bs[3]+bs[4] + result = bs[0] * bs[1] * bs[2] * bs[3] + bs[4] + yield result def addrequest(msgid, msg): msgq[msgid] = {'request': msg, 'response': None} @@ -47,26 +46,53 @@ class AFBResponse: msgid: int data: dict api: str + status: str + info: str def __init__(self, data: list): if type(data[0]) is not int: logging.debug(f'Received a response with non-integer message type {binascii.hexlify(data[0])}') raise ValueError('Received a response with non-integer message type') - if data[0] not in [AFBT.RESPONSE, AFBT.ERROR, AFBT.ERROR]: + if data[0] not in AFBT._value2member_map_: raise ValueError(f'Received a response with invalid message type {data[0]}') - self.type = data[0] + self.type = AFBT(data[0]) + if self.type == AFBT.RESPONSE: + if 'request' not in data[2]: + logging.error(f'Received malformed or invalid response without "request" dict - {data}') if not str.isnumeric(data[1]): raise ValueError(f'Received a response with non-numeric message id {data[1]}') - self.msgid = int(data[1]) + else: + self.msgid = int(data[1]) + self.status = data[2]['request']['status'] + if 'info' in data[2]['request']: + self.info = data[2]['request']['info'] + if 'response' in data[2]: + self.data = data[2]['response'] + elif self.type == AFBT.EVENT: self.api = data[1] + if 'data' in data[2]: + self.data = data[2]['data'] + elif self.type == AFBT.ERROR: logging.debug(f'AFB returned erroneous response {data}') - raise ValueError(f'AFB returned erroneous response {data}') - if 'request' not in data[2] or 'response' not in data[2]: - logging.error('Received malformed or invalid response') - self.data = data[2] + self.msgid = int(data[1]) + self.status = data[2]['request']['status'] + self.info = data[2]['request']['info'] + # raise ValueError(f'AFB returned erroneous response {data}') + # if 'request' not in data[2] or 'response' not in data[2]: + + if 'response' in data[2]: + self.data = data[2]['response'] + + def __str__(self): # for debugging purposes + if self.type == AFBT.EVENT: + return f'[{self.type.name}][{self.api}][Data: {self.data if hasattr(self, "data") else None}]' + else: + return f'[{self.type.name}][Status: {self.status}][{self.msgid}]' \ + f'[Info: {self.info if hasattr(self,"info") else None}]' \ + f'[Data: {self.data if hasattr(self, "data") else None}]' class AGLBaseService: api: str @@ -80,7 +106,7 @@ class AGLBaseService: @staticmethod def getparser(): - parser = argparse.ArgumentParser(description='Utility to interact with agl-service-gps via it\'s websocket') + parser = argparse.ArgumentParser(description='Utility to interact with agl-service-* via it\'s websocket') parser.add_argument('-l', '--loglevel', help='Level of logging verbosity', default='INFO', choices=list(logging._nameToLevel.keys())) parser.add_argument('ipaddr', default=os.environ.get('AGL_TGT_IP', 'localhost'), help='AGL host address') @@ -186,7 +212,8 @@ class AGLBaseService: async def listener(self, stdout: bool = False): while True: - data = await self.response() + raw = await self.response() + data = AFBResponse(raw) if stdout: print(data) yield data @@ -197,7 +224,7 @@ class AGLBaseService: data = json.loads(msg) self.logger.debug('[AGL] -> ' + msg) if isinstance(data, list): - + # check whether the received response is an answer to previous query and queue it for debugging if len(data) == AFBLEN and data[0] == AFBT.RESPONSE and str.isnumeric(data[1]): msgid = int(data[1]) if msgid in msgq: @@ -214,7 +241,7 @@ class AGLBaseService: self.logger.error("Unhandled seal: " + str(e)) async def request(self, verb: str, values: Union[str, dict] = "", msgid: int = None): - msgid = next(betterrand()) if msgid is None else msgid + msgid = next(newrand()) if msgid is None else msgid l = json.dumps([AFBT.REQUEST, str(msgid), f'{self.api}/{verb}', values]) self.logger.debug(f'[AGL] <- {l}') await self.send(l) diff --git a/geoclue.py b/geoclue.py index 1578b74..df1afd6 100644 --- a/geoclue.py +++ b/geoclue.py @@ -1,9 +1,13 @@ -from aglbaseservice import AGLBaseService +from aglbaseservice import AGLBaseService, AFBResponse import asyncio import os class GeoClueService(AGLBaseService): + service = 'agl-service-geoclue' + parser = AGLBaseService.getparser() + parser.add_argument('--location', help='Get current location', action='store_true') + def __init__(self, ip, port=None, api='geoclue'): super().__init__(ip=ip, port=port, api=api, service='agl-service-geoclue') @@ -11,20 +15,29 @@ class GeoClueService(AGLBaseService): return await self.request('location') async def subscribe(self, event='location'): - await super().subscribe(event=event) + return await super().subscribe(event=event) async def unsubscribe(self, event='location'): - await super().unsubscribe(event=event) + return await super().unsubscribe(event=event) async def main(loop): - addr = os.environ.get('AGL_TGT_IP', 'localhost') - GCS = await GeoClueService(ip=addr) - tasks = [] - tasks.append(loop.create_task(GCS.location())) - tasks.append(loop.create_task(GCS.listener())) - loop.run_until_complete(tasks) - + args = GeoClueService.parser.parse_args() + gcs = await GeoClueService(args.ipaddr) + + if args.location: + id = await gcs.location() + print(f'Sent location request with messageid {id}') + print(AFBResponse(await gcs.response())) + + if args.subscribe: + for event in args.subscribe: + id = await gcs.subscribe(event) + print(f"Subscribed for {event} with messageid {id}") + print(AFBResponse(await gcs.response())) + if args.listener: + async for response in gcs.listener(): + print(response) if __name__ == '__main__': loop = asyncio.get_event_loop() diff --git a/gps.py b/gps.py index 61e3153..c11b2a8 100644 --- a/gps.py +++ b/gps.py @@ -1,14 +1,13 @@ -from aglbaseservice import AGLBaseService +from aglbaseservice import AGLBaseService, AFBResponse import asyncio import os -from concurrent import futures -xc = futures.ThreadPoolExecutor(1) class GPSService(AGLBaseService): service = 'agl-service-gps' parser = AGLBaseService.getparser() - parser.add_argument('--location', help='Query location verb', action='store_true') + parser.add_argument('--record', help='Begin recording verb ') + parser.add_argument('--location', help='Get current location', action='store_true') def __init__(self, ip, port=None): super().__init__(api='gps', ip=ip, port=port, service='agl-service-gps') @@ -16,6 +15,9 @@ class GPSService(AGLBaseService): async def location(self): return await self.request('location') + async def record(self, state='on'): + return await self.request('record', {'state': state}) + async def subscribe(self, event='location'): return await super().subscribe(event=event) @@ -27,19 +29,27 @@ async def main(loop): args = GPSService.parser.parse_args() gpss = await GPSService(args.ipaddr) - l = await loop.run_in_executor(xc, gpss.listener) - # print(await r.__anext__()) - if args.loglevel: gpss.logger.setLevel(args.loglevel) + + if args.record: + id = await gpss.record(args.record) + print(f'Sent gps record request with value {args.record} with messageid {id}') + print(AFBResponse(await gpss.response())) + if args.location: msgid = await gpss.location() - print(await gpss.receive()) + print(AFBResponse(await gpss.response())) if args.subscribe: - await gpss.subscribe(args.subscribe) - - print(await l.__anext__()) + for event in args.subscribe: + id = await gpss.subscribe(event) + print(f'Subscribed for event {event} with messageid {id}') + print(AFBResponse(await gpss.response())) + + if args.listener: + async for response in gpss.listener(): + print(response) if __name__ == '__main__': loop = asyncio.get_event_loop() diff --git a/mediaplayer.py b/mediaplayer.py index 3466d06..9291981 100644 --- a/mediaplayer.py +++ b/mediaplayer.py @@ -1,26 +1,52 @@ -from aglbaseservice import AGLBaseService +from aglbaseservice import AGLBaseService, AFBResponse +from typing import Union +import logging import asyncio import os +class AFBMediaPlayerResponse(AFBResponse): + status: str + info: str + data = None + + def __init__(self, data: AFBResponse): + if isinstance(data, list): + super().__init__(data) + self.msgid = data.msgid + self.type = data.type + self.data = data.data + class MediaPlayerService(AGLBaseService): + service = 'agl-service-mediaplayer' + parser = AGLBaseService.getparser() + parser.add_argument('--playlist', help='Get current playlist', action='store_true') + parser.add_argument('--control', help='Play/Pause/Previous/Next') + parser.add_argument('--seek', help='Seek time through audio track', metavar='msec', type=int) + parser.add_argument('--rewind', help='Rewind time', metavar='msec', type=int) + parser.add_argument('--fastforward', help='Fast forward time', metavar='msec', type=int) + parser.add_argument('--picktrack', help='Play specific track in the playlist', metavar='index', type=int) + parser.add_argument('--volume', help='Volume control - <1-100>', metavar='int') + parser.add_argument('--loop', help='Set loop state - ', metavar='string') + parser.add_argument('--avrcp', help='AVRCP Controls') def __await__(self): return super()._async_init().__await__() - def __init__(self, ip, port = None): + def __init__(self, ip, port=None): super().__init__(api='mediaplayer', ip=ip, port=port, service='agl-service-mediaplayer') async def playlist(self): return await self.request('playlist') async def subscribe(self, event='metadata'): - await super().subscribe(event=event) + return await super().subscribe(event=event) async def unsubscribe(self, event='metadata'): - await super().subscribe(event=event) + return await super().subscribe(event=event) async def control(self, name, value=None): loopstate = ['off', 'playlist', 'track'] + avrcp_controls = ['next', 'previous', 'play', 'pause'] controls = { 'play': None, 'pause': None, @@ -31,19 +57,20 @@ class MediaPlayerService(AGLBaseService): 'rewind': 'position', 'pick-track': 'index', 'volume': 'volume', - 'loop': 'state' + 'loop': 'state', + # 'avrcp_controls': 'value' } - assert name in controls.keys(), 'Tried to use non-existent {name} as control for {self.api}' - + assert name in controls.keys(), f'Tried to use non-existent {name} as control for {self.api}' + msg = None if name in ['play', 'pause', 'previous', 'next']: msg = {'value': name} elif name in ['seek', 'fast-forward', 'rewind']: - assert value > 0, "Tried to seek with negative integer" + #assert value > 0, "Tried to seek with negative integer" msg = {'value': name, controls[name]: str(value)} elif name == 'pick-track': assert type(value) is int, "Try picking a song with an integer" assert value > 0, "Tried to pick a song with a negative integer" - msg = {'value': name, controls[value]: str(value)} + msg = {'value': name, controls[name]: str(value)} elif name == 'volume': assert type(value) is int, "Try setting the volume with an integer" assert value > 0, "Tried to set the volume with a negative integer, use values betwen 0-100" @@ -52,28 +79,80 @@ class MediaPlayerService(AGLBaseService): elif name == 'loop': assert value in loopstate, f'Tried to set invalid loopstate - {value}, use "off", "playlist" or "track"' msg = {'value': name, controls[name]: str(value)} + # elif name == 'avrcp_controls': + # msg = {'value': name, } + assert msg is not None, "Congratulations, somehow you made an invalid control request" - await self.request('controls', msg) + return await self.request('controls', msg) async def main(loop): - addr = os.environ.get('AGL_TGT_IP', '192.168.234.202') - port = os.environ.get('AGL_TGT_PORT', None) - - MPS = await MediaPlayerService(ip=addr, port=port) - # listener = loop.create_task(MPS.listener()) - try: - await MPS.subscribe('metadata') - await MPS.playlist() - await MPS.control('next') - - # await listener - - except KeyboardInterrupt: - pass - - # listener.cancel() - await MPS.unsubscribe('playlist') + args = MediaPlayerService.parser.parse_args() + MPS = await MediaPlayerService(ip=args.ipaddr) + + if args.playlist: + id = await MPS.playlist() + r = AFBResponse(await MPS.response()) + for l in r.data['list']: print(l) + + if args.control: + id = await MPS.control(args.control) + print(f'Sent {args.control} request with messageid {id}') + r = AFBResponse(await MPS.response()) + print(r) + + if args.seek: + id = await MPS.control('seek', args.seek) + print(f'Sent seek request to {args.seek} msec with messageid {id}') + r = AFBResponse(await MPS.response()) + print(r) + + if args.fastforward: + id = await MPS.control('fast-forward', args.fastforward) + print(f'Sent fast-forward request for {args.fastforward} msec with messageid {id}') + r = AFBResponse(await MPS.response()) + print(r) + + if args.rewind: + id = await MPS.control('rewind', -args.rewind) + print(f'Sent rewind request for {args.rewind} msec with messageid {id}') + r = AFBResponse(await MPS.response()) + print(r) + + if args.picktrack: + id = await MPS.control('pick-track', args.picktrack) + print(f'Sent pick-track request with index {args.rewind} with messageid {id}') + r = AFBResponse(await MPS.response()) + print(r) + + if args.volume: + id = await MPS.control('volume', int(args.volume)) + print(f'Sent volume request: {args.rewind} with messageid {id}') + r = AFBResponse(await MPS.response()) + print(r) + + if args.loop: + id = await MPS.control('loop', args.loop) + print(f'Sent loop-state request: {args.loop} with messageid {id}') + r = AFBResponse(await MPS.response()) + print(r) + + # if args.avrcp: + # id = await MPS.control('avrcp_controls', args.avrcp) + # print(f'Sent AVRCP control request: {args.loop} with messageid {id}') + # r = AFBResponse(await MPS.response()) + # print(r) + + if args.subscribe: + for event in args.subscribe: + id = await MPS.subscribe(event) + print(f"Subscribed for event {event} with messageid {id}") + r = await MPS.response() + print(r) + + if args.listener: + async for response in MPS.listener(): + print(response) if __name__ == '__main__': diff --git a/test_gps.py b/test_gps.py index fe27307..43834fe 100644 --- a/test_gps.py +++ b/test_gps.py @@ -2,42 +2,34 @@ import asyncio import os import pytest from gps import GPSService -from concurrent import futures import logging -from functools import partial logger = logging.getLogger('pytest-gps') logger.setLevel(logging.DEBUG) -@pytest.fixture +@pytest.fixture(scope="module") +def event_loop(): + loop = asyncio.get_event_loop() + yield loop + loop.close() + +@pytest.fixture(scope='module') async def service(): - event_loop = asyncio.get_running_loop() address = os.environ.get('AGL_TGT_IP', 'localhost') gpss = await GPSService(ip=address) - # gpss = await GPSService(ip=address) yield gpss await gpss.websocket.close() -# @pytest.fixture -# async def listener(service): -# loop = asyncio.get_event_loop() -# xc = futures.ThreadPoolExecutor(1) -# l = await loop.run_in_executor(xc, service.listener) -# while True: -# try: -# yield l.__anext__() -# except RuntimeError: -# xc.shutdown() -# except asyncio.CancelledError: -# logger.warning("Websocket listener coroutine stopped") -# except Exception as e: -# logger.error(e) +@pytest.fixture(scope='module') +async def listener(event_loop, service): + listener = service.listener() @pytest.mark.asyncio -async def test_location(event_loop, service): +async def test_location(event_loop, service: GPSService): await service.location() - r = await service.response() - print(r) - assert True + resp = await service.response() + print(resp) + assert isinstance(resp, list) + assert resp[2]['request']['status'] == 'success', f"location() returned failure; {resp[2]['request']['info']}" diff --git a/weather.py b/weather.py index ef1461a..de0223f 100644 --- a/weather.py +++ b/weather.py @@ -1,26 +1,47 @@ import asyncio -from aglbaseservice import AGLBaseService +import json +from aglbaseservice import AGLBaseService, AFBResponse class WeatherService(AGLBaseService): service = 'agl-service-weather' parser = AGLBaseService.getparser() + parser.add_argument('--current', default=True, help='Request current weather state', action='store_true') parser.add_argument('--apikey', default=False, help='Request weather API Key', action='store_true') def __init__(self, ip, port=None): super().__init__(api='weather', ip=ip, port=port, service='agl-service-weather') + async def current_weather(self): + return await self.request('current_weather', "") + async def apikey(self): return await self.request('api_key', "") async def main(): args = WeatherService.parser.parse_args() - ws = await WeatherService(ip=args.ipaddr) + aws = await WeatherService(ip=args.ipaddr) + if args.current: + id = await aws.current_weather() + resp = AFBResponse(await aws.response()) + print(json.dumps(resp.data, indent=2)) + if args.apikey: - id = await ws.apikey() - resp = await ws.response() - print(resp[2]['response']['api_key']) + id = await aws.apikey() + resp = AFBResponse(await aws.response()) + print(resp.data['api_key']) + + if args.subscribe: + for event in args.subscribe: + id = await aws.subscribe(event) + print(f'Subscribed for event {event} with messageid {id}') + resp = AFBResponse(await aws.response()) + print(resp) + + if args.listener: + async for response in aws.listener(): + print(response) if __name__ == '__main__': loop = asyncio.get_event_loop() -- cgit 1.2.3-korg