diff options
author | Edi Feschiyan <edi.feschiyan@konsulko.com> | 2020-04-15 15:04:02 +0300 |
---|---|---|
committer | Edi Feschiyan <edi.feschiyan@konsulko.com> | 2020-04-15 15:04:02 +0300 |
commit | 7519a78657685f14f93983f474d75ae0efb4a2ad (patch) | |
tree | 29461ca66ec4b07e4d2ab89e25c2c5aaf34049e6 | |
parent | 43d73e9cd7fe1ee5f08a8a62feb96c35750ca588 (diff) |
Cleaning up services' classes
-rw-r--r-- | aglbaseservice.py | 68 | ||||
-rw-r--r-- | bluetooth.py | 36 | ||||
-rw-r--r-- | geoclue.py | 19 | ||||
-rw-r--r-- | gps.py | 84 | ||||
-rw-r--r-- | mediaplayer.py | 51 |
5 files changed, 95 insertions, 163 deletions
diff --git a/aglbaseservice.py b/aglbaseservice.py index 81a868e..fb905ca 100644 --- a/aglbaseservice.py +++ b/aglbaseservice.py @@ -9,10 +9,12 @@ from websockets import connect import logging import asyncssh import re -from parse import * - +from parse import Result, parse from typing import Union +logging.getLogger('AGLBaseService') +logging.basicConfig(level=logging.DEBUG) + class AFBT(IntEnum): REQUEST = 2, RESPONSE = 3, @@ -37,6 +39,7 @@ class AGLBaseService: token = None uuid = None service = None + logger = None def __init__(self, api: str, ip: str, port: str = None, url: str = None, token: str = 'HELLO', uuid: str = 'magic', service: str = None): @@ -47,6 +50,7 @@ class AGLBaseService: self.token = token self.uuid = uuid self.service = service + self.logger = logging.getLogger(service) def __await__(self): return self._async_init().__await__() @@ -57,6 +61,14 @@ class AGLBaseService: async def _async_init(self): # setting ping_interval to None because AFB does not support websocket ping # if set to !None, the library will close the socket after the default timeout + if self.port is None: + serviceport = await self.portfinder() + if serviceport is not None: + self.port = serviceport + else: + self.logger('Unable to find port') + exit(1) + URL = f'ws://{self.ip}:{self.port}/api?token={self.token}&uuid={self.uuid}' self._conn = connect(close_timeout=0, uri=URL, subprotocols=['x-afb-ws-json1'], ping_interval=None) self.websocket = await self._conn.__aenter__() @@ -76,44 +88,51 @@ class AGLBaseService: async def portfinder(self): async with asyncssh.connect(self.ip, username='root') as c: - servicename = await c.run(f"systemctl --all | grep {self.service} | awk '{{print $1}}'", check=False) + servicename = await c.run(f"systemctl --all | grep {self.service}-- | awk '{{print $1}}'", check=False) if self.service not in servicename.stdout: - print(f"Unable to find service matching pattern '{self.service}'") + logging.error(f"Service matching pattern - '{self.service}' - NOT FOUND") exit(1) - # TODO decide what to do if the service is not started - scan for disabled units/run service via afm-util - print(f"Found service name: {servicename.stdout.strip()}") - pidres = await c.run(f'systemctl show --property MainPID --value {servicename.stdout}') + pidres = await c.run(f'systemctl show --property MainPID --value {servicename.stdout.strip()}') pid = int(pidres.stdout.strip(), 10) if pid is 0: - print(f'Service {servicename.stdout.strip()} is stopped') + logging.warning(f'Service {servicename.stdout.strip()} is stopped') return None else: - print(f'Service PID: {pidres.stdout.strip()}') + logging.debug(f'Service PID: {str(pid)}') - sockets = await c.run(f'find /proc/{pidres.stdout.strip()}/fd/ | xargs readlink | grep socket') + sockets = await c.run(f'find /proc/{pid}/fd/ | xargs readlink | grep socket') inodes = frozenset(re.findall('socket:\[(.*)\]', sockets.stdout)) + logging.debug(f"Socket inodes: {inodes}") - print(f"Socket inodes: {inodes}") - - alltcp = await c.run('cat /proc/net/tcp') - fieldsstr = '{sl}: {local_address} {rem_address} {st} {tx_queue}:{rx_queue} {tr}:{tmwhen} {retrnsmt} {uid}' \ + procnettcp = await c.run('cat /proc/net/tcp') + fieldsstr = '{sl}: {local_address} {rem_address} {st} {tx_queue}:{rx_queue} {tr}:{tmwhen} {retrnsmt} {uid}'\ ' {timeout} {inode} {sref_cnt} {memloc} {rto} {pred_sclk} {ackquick} {congest} {slowstart}' - tcpsockets = [' '.join(l.split()) for l in alltcp.stdout.splitlines()[1:]] - parsedtcpsockets = [parse(fieldsstr, l) for l in tcpsockets if l is not None] + tcpsockets = [' '.join(l.split()) for l in procnettcp.stdout.splitlines()[1:]] + # different lines with less stats appear sometimes, parse will return None, so ignore 'None' lines + parsedtcpsockets = [] + for l in tcpsockets: + res = parse(fieldsstr, l) + if isinstance(res, Result): + parsedtcpsockets.append(res) + socketinodesbythisprocess = [l for l in parsedtcpsockets if - l is isinstance(l, Result) and l.named['inode'] in inodes] + isinstance(l, Result) and + l.named['inode'] in inodes and + # 0A is listening state for the socket + l.named['st'] == '0A'] + for s in socketinodesbythisprocess: _, port = tuple(parse('{}:{}', s['local_address'])) port = int(port, 16) - if port > 30000: - print(f'found port {port}') + if port >= 30000: # the port is above 30000 range, 8080 is some kind of proxy + logging.debug(f'Service running at port {port}') return port async def listener(self): try: while True: msg = await self.receive() - print(f"Received {msg}") + # self.logger.debug(f"Received websocket{msg}") try: data = json.loads(msg) if isinstance(data, list): @@ -123,18 +142,19 @@ class AGLBaseService: addresponse(msgid, data) except JSONDecodeError: - print("Not decoding a non-json message") + self.logger.warning("Not decoding a non-json message") except KeyboardInterrupt: - print("Received keyboard interrupt, exiting") + logging.debug("Received keyboard interrupt, exiting") except asyncio.CancelledError: - print("Websocket listener coroutine stopped") + logging.warning("Websocket listener coroutine stopped") except Exception as e: - print("Unhandled seal: " + str(e)) + logging.error("Unhandled seal: " + str(e)) async def request(self, verb: str, values: Union[str, dict] = "", msgid: int = randint(0, 9999999), waitresponse: bool = False): l = json.dumps([AFBT.REQUEST, str(msgid), f'{self.api}/{verb}', values]) + self.logger.debug(f'[AGL] <- {l}') await self.send(l) if waitresponse: return await self.receive() diff --git a/bluetooth.py b/bluetooth.py index c5394bb..95aa51d 100644 --- a/bluetooth.py +++ b/bluetooth.py @@ -3,6 +3,7 @@ import json import os from random import randint from aglbaseservice import AGLBaseService +import logging Verbs = ['subscribe', 'unsubscribe', 'managed_objects', 'adapter_state', 'default_adapter', 'avrcp_controls', 'connect', 'disconnect', 'pair', 'cancel_pairing', 'confirm_pairing', 'remove_device'] @@ -11,8 +12,8 @@ BTEventType = ['adapter_changes', 'device_changes', 'media', 'agent'] class BluetoothService(AGLBaseService): - def __init__(self, ip, port): - super().__init__(api='Bluetooth-Manager', ip=ip, port=port) + def __init__(self, ip, port=None, service='agl-service-bluetooth'): + super().__init__(api='Bluetooth-Manager', ip=ip, port=port, service=service) async def subscribe(self, event='device_changes'): await super().subscribe(event=event) @@ -20,30 +21,21 @@ class BluetoothService(AGLBaseService): async def unsubscribe(self, event='device_changes'): await super().unsubscribe(event=event) - async def managed_objects(self): - verb = 'managed_objects' - msgid = randint(0, 999999) - msg = f'[2,"{msgid}","{self.api}/{verb}",""]' - # print(msg) - await self.send(msg) - return await self.receive() + async def managed_objects(self, waitresponse=False): + return await self.request('managed_objects', waitresponse=waitresponse) - async def adapter_state(self, param=None, value=None): - verb = 'adapter_state' - msgid = randint(0, 999999) - if param: - p = {'adapter': param} + async def adapter_state(self, adapter=None, value=None, waitresponse=False): + p = {} + if adapter: + p = {'adapter': adapter} if isinstance(value, dict): p = {**p, **value} - # msg = f'[2,"{msgid}","{self.api}/{verb}","{param}": {value if value is not None else ""}]' - msg = f'[2,"{msgid}","{self.api}/{verb}", {json.dumps(p)}]' + return await self.request('adapter_state', p, waitresponse=waitresponse) + if waitresponse: + return await self.request('adapter_state', p) else: - msg = f'[2,"{msgid}","{self.api}/{verb}", ""]' - - print(msg) - await self.send(msg) - return await self.receive() + await self.request('adapter_state', p) async def default_adapter(self): verb = 'default_adapter' @@ -87,7 +79,7 @@ class BluetoothService(AGLBaseService): async def main(loop): addr = os.environ.get('AGL_TGT_IP', 'localhost') - port = os.environ.get('AGL_TGT_PORT', '30005') + #port = os.environ.get('AGL_TGT_PORT', '30005') BTS = await BluetoothService(ip=addr, port=port) print(await BTS.adapter_state('hci1', {'uuids': ['0000110e-0000-1000-8000-00805f9b34fb']})) @@ -4,15 +4,16 @@ from aglbaseservice import AGLBaseService class GeoClueService(AGLBaseService): - def __init__(self, ip, port, api='geoclue'): - super().__init__(ip=ip, port=port, api=api) + def __init__(self, ip, port=None, api='geoclue', service='agl-service-geoclue'): + super().__init__(ip=ip, port=port, api=api, service=service) - async def location(self): - verb = 'location' - msgid = randint(0, 999999) - - await self.send(f'[2,"{msgid}","{self.api}/{verb}",""]') - return await self.receive() + async def location(self, waitresponse=False): + return await self.request('location', waitresponse=waitresponse) + # verb = 'location' + # msgid = randint(0, 999999) + # + # await self.send(f'[2,"{msgid}","{self.api}/{verb}",""]') + # return await self.receive() async def subscribe(self, event='location'): super().subscribe(event=event) @@ -22,7 +23,7 @@ class GeoClueService(AGLBaseService): async def main(loop): - GCS = await GeoClueService(ip='192.168.234.202', port='30009') + GCS = await GeoClueService(ip='192.168.128.13') print(await GCS.location()) listener = loop.create_task(GCS.listener()) await listener @@ -8,11 +8,11 @@ import re import argparse class GPSService(AGLBaseService): - def __init__(self, ip, port): + def __init__(self, ip, port=None): super().__init__(api='gps', ip=ip, port=port, service='agl-service-gps') async def location(self): - return await self.request('location',waitresponse=True) + return await self.request('location', waitresponse=True) async def subscribe(self, event='location'): await super().subscribe(event=event) @@ -24,83 +24,13 @@ class GPSService(AGLBaseService): async def main(loop): addr = os.environ.get('AGL_TGT_IP', 'localhost') port = os.environ.get('AGL_TGT_PORT', '30011') + jsonpayload = os.environ.get('AGL_TGT_JSON_PAYLOAD', None) - # gpss = await GPSService(ip=addr, port=port) - async with asyncssh.connect(addr, username='root') as c: - # find the name of the service since it is dynamically generated every time - #TODO CHANGE ME to use the name of the service dynamically after cleaning this crap here - servicestr = 'agl-service-gps' - servicename = await c.run(f"systemctl --all | grep {servicestr} | awk '{{print $1}}'", check=False) - if servicestr not in servicename.stdout: - print(f"Unable to find service matching pattern '{servicestr}'") + gpss = await GPSService(addr) + print(await gpss.location()) + listener = loop.create_task(gpss.listener()) + await listener - #TODO decide what to do if the service is not started - scan for disabled units/run service via afm-util - print(f"Found service name: {servicename.stdout.strip()}") - # get the pid - pidres = await c.run(f'systemctl show --property MainPID --value {servicename.stdout}') - pid = int(pidres.stdout.strip(), 10) - if pid is 0: - print(f'Service {servicename.stdout.strip()} is stopped') - exit(1) - else: - print(f'Service PID: {pidres.stdout.strip()}') - - # get all sockets in the process' fd directory and their respective inodes - sockets = await c.run(f'find /proc/{pidres.stdout.strip()}/fd/ | xargs readlink | grep socket') - inodes = frozenset(re.findall('socket:\[(.*)\]', sockets.stdout)) - - print(f"Socket inodes: {inodes}") - - alltcp = await c.run('cat /proc/net/tcp') - # fieldsstr = ' '.join(alltcp.stdout.strip().splitlines()[0].strip().split()) + ' sref_cnt memloc rto pred_sclk ack_quick congest slowstart' - - # https://www.kernel.org/doc/Documentation/networking/proc_net_tcp.txt - # ['sl', 'local_address', 'rem_address', 'st', 'tx_queue:rx_queue', 'tr:tm->when', 'retrnsmt', 'uid', - # '0: 00000000:753E 00000000:0000 0A 00000000:00000000 00:00000000 00000000 1001 - - # 'timeout', 'inode', 'sref_cnt', 'memloc', 'rto', 'pred_sclk', 'ackquick', 'congest', 'slowstart' ] - # 0 20062 1 0000000095c038d6 100 0 0 10 0' - # fields = fieldsstr.split() - - fieldsstr = '{sl}: {local_address} {rem_address} {st} {tx_queue}:{rx_queue} {tr}:{tmwhen} {retrnsmt} {uid}' \ - ' {timeout} {inode} {sref_cnt} {memloc} {rto} {pred_sclk} {ackquick} {congest} {slowstart}' - tcpsockets = [' '.join(l.split()) for l in alltcp.stdout.splitlines()[1:]] - - # seen once an irregular line "65: 0D80A8C0:D5BE 8410A6BC:0050 06 00000000:00000000 03:000000F8 00000000 0 0 0 3 0000000083dad9fb" - # parsing could break at some point, because returns None and cannot be parsed - - parsedtcpsockets = [parse(fieldsstr, l) for l in tcpsockets if l is not None] - socketinodesbythisprocess = [l for l in parsedtcpsockets if l is isinstance(l,Result) and l.named['inode'] in inodes] - # got dem sockets - # expecting >1 because the process could be listening on 8080, all api services' ports are in 30000 port range - for s in socketinodesbythisprocess: - _, port = tuple(parse('{}:{}', s['local_address'])) - port = int(port,16) - if port > 30000: - print(f'found port {port}') - break - - - - #thesocketswearelookingfor = list(filter(lambda x: ( l for l in parsed if l.named['inode'] in inodes), inodes )) - - # result = parse(fieldsstr, l) - # if isinstance(Result, result): - # result.named['inode'] in inodes - # - # print(result) - - # print(' '.join(alltcp.stdout.strip().splitlines()[1].strip().split())) - # result = findall('{}: {}:{} {} {} {} {} {} {} ') - - # serviceport = await c.run(f'journalctl -u {servicename.stdout}') - # print(serviceport.stdout) - # matches = re.findall('Listening interface \*:(.*) \[',serviceport.stdout) - - print("breaketh pointeth h're") - # print(await gpss.location()) - - # listener = loop.create_task(gpss.listener()) if __name__ == '__main__': loop = asyncio.get_event_loop() diff --git a/mediaplayer.py b/mediaplayer.py index 21f9801..e7059b6 100644 --- a/mediaplayer.py +++ b/mediaplayer.py @@ -1,32 +1,22 @@ -import json -from json import JSONDecodeError import os -import sys import asyncio -from random import randint +import json -from websockets import connect, ConnectionClosedError -import concurrent -from enum import IntEnum from aglbaseservice import AGLBaseService -global DEBUG -DEBUG = True - class MediaPlayerService(AGLBaseService): def __await__(self): return super()._async_init().__await__() - def __init__(self, ip, port): - super().__init__(api='mediaplayer', ip=ip, port=port) + def __init__(self, ip, port = None): + super().__init__(api='mediaplayer', ip=ip, port=port, service='agl-service-mediaplayer') - async def playlist(self): - self.request('playlist') - verb = 'playlist' - msgid = randint(0, 999999) - - await self.send(f'[2,"{msgid}","{self.api}/{verb}",""]') + async def playlist(self, waitresponse=False): + if waitresponse: + return await self.request('playlist', waitresponse=waitresponse) + else: + await self.request('playlist') async def subscribe(self, event='metadata'): await super().subscribe(event=event) @@ -37,7 +27,6 @@ class MediaPlayerService(AGLBaseService): async def control(self, name, value=None): verb = 'controls' loopstate = ['off', 'playlist', 'track'] - controls = { 'play': None, 'pause': None, @@ -50,46 +39,46 @@ class MediaPlayerService(AGLBaseService): 'volume': 'volume', 'loop': 'state' } - assert name in controls.keys(), 'Tried to use non-existant {name} as control for {self.api}' + assert name in controls.keys(), 'Tried to use non-existent {name} as control for {self.api}' - msgid = randint(0, 999999) if name in ['play', 'pause', 'previous', 'next']: - msg = f'[2,"{msgid}","{self.api}/{verb}", {{"value": "{name}"}}]' + msg = {'value': name} elif name in ['seek', 'fast-forward', 'rewind']: assert value > 0, "Tried to seek with negative integer" - msg = f'[2,"{msgid}","{self.api}/{verb}", {{"value": "{name}", "position": "{str(value)}"}}]' + msg = {'value': name, controls[name]: str(value)} elif name == 'pick-track': assert type(value) is int, "Try picking a song with an integer" assert value > 0, "Tried to pick a song with a negative integer" - msg = f'[2,"{msgid}","{self.api}/{verb}", {{"value": "{name}", "index": {str(value)}}}]' + msg = {'value': name, controls[value]: str(value)} elif name == 'volume': assert type(value) is int, "Try setting the volume with an integer" assert value > 0, "Tried to set the volume with a negative integer, use values betwen 0-100" assert value < 100, "Tried to set the volume over 100%, use values betwen 0-100" - msg = f'[2,"{msgid}","{self.api}/{verb}", {{"value": "{name}", "{name}": {str(value)}}}]' + msg = {'value': name, name: str(value)} elif name == 'loop': assert value in loopstate, f'Tried to set invalid loopstate - {value}, use "off", "playlist" or "track"' - msg = f'[2,"{msgid}","{self.api}/{verb}", {{"value": "{name}", "{controls[name]}": {str(value)}}}]' + msg = {'value': name, controls[name]: str(value)} - await self.send(msg) + await self.request(verb, msg) async def main(loop): addr = os.environ.get('AGL_TGT_IP', '192.168.234.202') - port = os.environ.get('AGL_TGT_PORT', '30016') + port = os.environ.get('AGL_TGT_PORT', None) MPS = await MediaPlayerService(ip=addr, port=port) - listener = loop.create_task(MPS.listener()) + # listener = loop.create_task(MPS.listener()) try: await MPS.subscribe('metadata') + print(await MPS.playlist(waitresponse=True)) await MPS.control('next') - await listener + # await listener except KeyboardInterrupt: pass - listener.cancel() + # listener.cancel() await MPS.unsubscribe('playlist') |