aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEdi Feschiyan <edi.feschiyan@konsulko.com>2020-04-15 15:04:02 +0300
committerEdi Feschiyan <edi.feschiyan@konsulko.com>2020-04-15 15:04:02 +0300
commit7519a78657685f14f93983f474d75ae0efb4a2ad (patch)
tree29461ca66ec4b07e4d2ab89e25c2c5aaf34049e6
parent43d73e9cd7fe1ee5f08a8a62feb96c35750ca588 (diff)
Cleaning up services' classes
-rw-r--r--aglbaseservice.py68
-rw-r--r--bluetooth.py36
-rw-r--r--geoclue.py19
-rw-r--r--gps.py84
-rw-r--r--mediaplayer.py51
5 files changed, 95 insertions, 163 deletions
diff --git a/aglbaseservice.py b/aglbaseservice.py
index 81a868e..fb905ca 100644
--- a/aglbaseservice.py
+++ b/aglbaseservice.py
@@ -9,10 +9,12 @@ from websockets import connect
import logging
import asyncssh
import re
-from parse import *
-
+from parse import Result, parse
from typing import Union
+logging.getLogger('AGLBaseService')
+logging.basicConfig(level=logging.DEBUG)
+
class AFBT(IntEnum):
REQUEST = 2,
RESPONSE = 3,
@@ -37,6 +39,7 @@ class AGLBaseService:
token = None
uuid = None
service = None
+ logger = None
def __init__(self, api: str, ip: str, port: str = None, url: str = None,
token: str = 'HELLO', uuid: str = 'magic', service: str = None):
@@ -47,6 +50,7 @@ class AGLBaseService:
self.token = token
self.uuid = uuid
self.service = service
+ self.logger = logging.getLogger(service)
def __await__(self):
return self._async_init().__await__()
@@ -57,6 +61,14 @@ class AGLBaseService:
async def _async_init(self):
# setting ping_interval to None because AFB does not support websocket ping
# if set to !None, the library will close the socket after the default timeout
+ if self.port is None:
+ serviceport = await self.portfinder()
+ if serviceport is not None:
+ self.port = serviceport
+ else:
+ self.logger('Unable to find port')
+ exit(1)
+
URL = f'ws://{self.ip}:{self.port}/api?token={self.token}&uuid={self.uuid}'
self._conn = connect(close_timeout=0, uri=URL, subprotocols=['x-afb-ws-json1'], ping_interval=None)
self.websocket = await self._conn.__aenter__()
@@ -76,44 +88,51 @@ class AGLBaseService:
async def portfinder(self):
async with asyncssh.connect(self.ip, username='root') as c:
- servicename = await c.run(f"systemctl --all | grep {self.service} | awk '{{print $1}}'", check=False)
+ servicename = await c.run(f"systemctl --all | grep {self.service}-- | awk '{{print $1}}'", check=False)
if self.service not in servicename.stdout:
- print(f"Unable to find service matching pattern '{self.service}'")
+ logging.error(f"Service matching pattern - '{self.service}' - NOT FOUND")
exit(1)
- # TODO decide what to do if the service is not started - scan for disabled units/run service via afm-util
- print(f"Found service name: {servicename.stdout.strip()}")
- pidres = await c.run(f'systemctl show --property MainPID --value {servicename.stdout}')
+ pidres = await c.run(f'systemctl show --property MainPID --value {servicename.stdout.strip()}')
pid = int(pidres.stdout.strip(), 10)
if pid is 0:
- print(f'Service {servicename.stdout.strip()} is stopped')
+ logging.warning(f'Service {servicename.stdout.strip()} is stopped')
return None
else:
- print(f'Service PID: {pidres.stdout.strip()}')
+ logging.debug(f'Service PID: {str(pid)}')
- sockets = await c.run(f'find /proc/{pidres.stdout.strip()}/fd/ | xargs readlink | grep socket')
+ sockets = await c.run(f'find /proc/{pid}/fd/ | xargs readlink | grep socket')
inodes = frozenset(re.findall('socket:\[(.*)\]', sockets.stdout))
+ logging.debug(f"Socket inodes: {inodes}")
- print(f"Socket inodes: {inodes}")
-
- alltcp = await c.run('cat /proc/net/tcp')
- fieldsstr = '{sl}: {local_address} {rem_address} {st} {tx_queue}:{rx_queue} {tr}:{tmwhen} {retrnsmt} {uid}' \
+ procnettcp = await c.run('cat /proc/net/tcp')
+ fieldsstr = '{sl}: {local_address} {rem_address} {st} {tx_queue}:{rx_queue} {tr}:{tmwhen} {retrnsmt} {uid}'\
' {timeout} {inode} {sref_cnt} {memloc} {rto} {pred_sclk} {ackquick} {congest} {slowstart}'
- tcpsockets = [' '.join(l.split()) for l in alltcp.stdout.splitlines()[1:]]
- parsedtcpsockets = [parse(fieldsstr, l) for l in tcpsockets if l is not None]
+ tcpsockets = [' '.join(l.split()) for l in procnettcp.stdout.splitlines()[1:]]
+ # different lines with less stats appear sometimes, parse will return None, so ignore 'None' lines
+ parsedtcpsockets = []
+ for l in tcpsockets:
+ res = parse(fieldsstr, l)
+ if isinstance(res, Result):
+ parsedtcpsockets.append(res)
+
socketinodesbythisprocess = [l for l in parsedtcpsockets if
- l is isinstance(l, Result) and l.named['inode'] in inodes]
+ isinstance(l, Result) and
+ l.named['inode'] in inodes and
+ # 0A is listening state for the socket
+ l.named['st'] == '0A']
+
for s in socketinodesbythisprocess:
_, port = tuple(parse('{}:{}', s['local_address']))
port = int(port, 16)
- if port > 30000:
- print(f'found port {port}')
+ if port >= 30000: # the port is above 30000 range, 8080 is some kind of proxy
+ logging.debug(f'Service running at port {port}')
return port
async def listener(self):
try:
while True:
msg = await self.receive()
- print(f"Received {msg}")
+ # self.logger.debug(f"Received websocket{msg}")
try:
data = json.loads(msg)
if isinstance(data, list):
@@ -123,18 +142,19 @@ class AGLBaseService:
addresponse(msgid, data)
except JSONDecodeError:
- print("Not decoding a non-json message")
+ self.logger.warning("Not decoding a non-json message")
except KeyboardInterrupt:
- print("Received keyboard interrupt, exiting")
+ logging.debug("Received keyboard interrupt, exiting")
except asyncio.CancelledError:
- print("Websocket listener coroutine stopped")
+ logging.warning("Websocket listener coroutine stopped")
except Exception as e:
- print("Unhandled seal: " + str(e))
+ logging.error("Unhandled seal: " + str(e))
async def request(self, verb: str, values: Union[str, dict] = "", msgid: int = randint(0, 9999999),
waitresponse: bool = False):
l = json.dumps([AFBT.REQUEST, str(msgid), f'{self.api}/{verb}', values])
+ self.logger.debug(f'[AGL] <- {l}')
await self.send(l)
if waitresponse:
return await self.receive()
diff --git a/bluetooth.py b/bluetooth.py
index c5394bb..95aa51d 100644
--- a/bluetooth.py
+++ b/bluetooth.py
@@ -3,6 +3,7 @@ import json
import os
from random import randint
from aglbaseservice import AGLBaseService
+import logging
Verbs = ['subscribe', 'unsubscribe', 'managed_objects', 'adapter_state', 'default_adapter', 'avrcp_controls',
'connect', 'disconnect', 'pair', 'cancel_pairing', 'confirm_pairing', 'remove_device']
@@ -11,8 +12,8 @@ BTEventType = ['adapter_changes', 'device_changes', 'media', 'agent']
class BluetoothService(AGLBaseService):
- def __init__(self, ip, port):
- super().__init__(api='Bluetooth-Manager', ip=ip, port=port)
+ def __init__(self, ip, port=None, service='agl-service-bluetooth'):
+ super().__init__(api='Bluetooth-Manager', ip=ip, port=port, service=service)
async def subscribe(self, event='device_changes'):
await super().subscribe(event=event)
@@ -20,30 +21,21 @@ class BluetoothService(AGLBaseService):
async def unsubscribe(self, event='device_changes'):
await super().unsubscribe(event=event)
- async def managed_objects(self):
- verb = 'managed_objects'
- msgid = randint(0, 999999)
- msg = f'[2,"{msgid}","{self.api}/{verb}",""]'
- # print(msg)
- await self.send(msg)
- return await self.receive()
+ async def managed_objects(self, waitresponse=False):
+ return await self.request('managed_objects', waitresponse=waitresponse)
- async def adapter_state(self, param=None, value=None):
- verb = 'adapter_state'
- msgid = randint(0, 999999)
- if param:
- p = {'adapter': param}
+ async def adapter_state(self, adapter=None, value=None, waitresponse=False):
+ p = {}
+ if adapter:
+ p = {'adapter': adapter}
if isinstance(value, dict):
p = {**p, **value}
- # msg = f'[2,"{msgid}","{self.api}/{verb}","{param}": {value if value is not None else ""}]'
- msg = f'[2,"{msgid}","{self.api}/{verb}", {json.dumps(p)}]'
+ return await self.request('adapter_state', p, waitresponse=waitresponse)
+ if waitresponse:
+ return await self.request('adapter_state', p)
else:
- msg = f'[2,"{msgid}","{self.api}/{verb}", ""]'
-
- print(msg)
- await self.send(msg)
- return await self.receive()
+ await self.request('adapter_state', p)
async def default_adapter(self):
verb = 'default_adapter'
@@ -87,7 +79,7 @@ class BluetoothService(AGLBaseService):
async def main(loop):
addr = os.environ.get('AGL_TGT_IP', 'localhost')
- port = os.environ.get('AGL_TGT_PORT', '30005')
+ #port = os.environ.get('AGL_TGT_PORT', '30005')
BTS = await BluetoothService(ip=addr, port=port)
print(await BTS.adapter_state('hci1', {'uuids': ['0000110e-0000-1000-8000-00805f9b34fb']}))
diff --git a/geoclue.py b/geoclue.py
index f31d91e..ab4eba2 100644
--- a/geoclue.py
+++ b/geoclue.py
@@ -4,15 +4,16 @@ from aglbaseservice import AGLBaseService
class GeoClueService(AGLBaseService):
- def __init__(self, ip, port, api='geoclue'):
- super().__init__(ip=ip, port=port, api=api)
+ def __init__(self, ip, port=None, api='geoclue', service='agl-service-geoclue'):
+ super().__init__(ip=ip, port=port, api=api, service=service)
- async def location(self):
- verb = 'location'
- msgid = randint(0, 999999)
-
- await self.send(f'[2,"{msgid}","{self.api}/{verb}",""]')
- return await self.receive()
+ async def location(self, waitresponse=False):
+ return await self.request('location', waitresponse=waitresponse)
+ # verb = 'location'
+ # msgid = randint(0, 999999)
+ #
+ # await self.send(f'[2,"{msgid}","{self.api}/{verb}",""]')
+ # return await self.receive()
async def subscribe(self, event='location'):
super().subscribe(event=event)
@@ -22,7 +23,7 @@ class GeoClueService(AGLBaseService):
async def main(loop):
- GCS = await GeoClueService(ip='192.168.234.202', port='30009')
+ GCS = await GeoClueService(ip='192.168.128.13')
print(await GCS.location())
listener = loop.create_task(GCS.listener())
await listener
diff --git a/gps.py b/gps.py
index 217a34a..de4b1ba 100644
--- a/gps.py
+++ b/gps.py
@@ -8,11 +8,11 @@ import re
import argparse
class GPSService(AGLBaseService):
- def __init__(self, ip, port):
+ def __init__(self, ip, port=None):
super().__init__(api='gps', ip=ip, port=port, service='agl-service-gps')
async def location(self):
- return await self.request('location',waitresponse=True)
+ return await self.request('location', waitresponse=True)
async def subscribe(self, event='location'):
await super().subscribe(event=event)
@@ -24,83 +24,13 @@ class GPSService(AGLBaseService):
async def main(loop):
addr = os.environ.get('AGL_TGT_IP', 'localhost')
port = os.environ.get('AGL_TGT_PORT', '30011')
+ jsonpayload = os.environ.get('AGL_TGT_JSON_PAYLOAD', None)
- # gpss = await GPSService(ip=addr, port=port)
- async with asyncssh.connect(addr, username='root') as c:
- # find the name of the service since it is dynamically generated every time
- #TODO CHANGE ME to use the name of the service dynamically after cleaning this crap here
- servicestr = 'agl-service-gps'
- servicename = await c.run(f"systemctl --all | grep {servicestr} | awk '{{print $1}}'", check=False)
- if servicestr not in servicename.stdout:
- print(f"Unable to find service matching pattern '{servicestr}'")
+ gpss = await GPSService(addr)
+ print(await gpss.location())
+ listener = loop.create_task(gpss.listener())
+ await listener
- #TODO decide what to do if the service is not started - scan for disabled units/run service via afm-util
- print(f"Found service name: {servicename.stdout.strip()}")
- # get the pid
- pidres = await c.run(f'systemctl show --property MainPID --value {servicename.stdout}')
- pid = int(pidres.stdout.strip(), 10)
- if pid is 0:
- print(f'Service {servicename.stdout.strip()} is stopped')
- exit(1)
- else:
- print(f'Service PID: {pidres.stdout.strip()}')
-
- # get all sockets in the process' fd directory and their respective inodes
- sockets = await c.run(f'find /proc/{pidres.stdout.strip()}/fd/ | xargs readlink | grep socket')
- inodes = frozenset(re.findall('socket:\[(.*)\]', sockets.stdout))
-
- print(f"Socket inodes: {inodes}")
-
- alltcp = await c.run('cat /proc/net/tcp')
- # fieldsstr = ' '.join(alltcp.stdout.strip().splitlines()[0].strip().split()) + ' sref_cnt memloc rto pred_sclk ack_quick congest slowstart'
-
- # https://www.kernel.org/doc/Documentation/networking/proc_net_tcp.txt
- # ['sl', 'local_address', 'rem_address', 'st', 'tx_queue:rx_queue', 'tr:tm->when', 'retrnsmt', 'uid',
- # '0: 00000000:753E 00000000:0000 0A 00000000:00000000 00:00000000 00000000 1001
-
- # 'timeout', 'inode', 'sref_cnt', 'memloc', 'rto', 'pred_sclk', 'ackquick', 'congest', 'slowstart' ]
- # 0 20062 1 0000000095c038d6 100 0 0 10 0'
- # fields = fieldsstr.split()
-
- fieldsstr = '{sl}: {local_address} {rem_address} {st} {tx_queue}:{rx_queue} {tr}:{tmwhen} {retrnsmt} {uid}' \
- ' {timeout} {inode} {sref_cnt} {memloc} {rto} {pred_sclk} {ackquick} {congest} {slowstart}'
- tcpsockets = [' '.join(l.split()) for l in alltcp.stdout.splitlines()[1:]]
-
- # seen once an irregular line "65: 0D80A8C0:D5BE 8410A6BC:0050 06 00000000:00000000 03:000000F8 00000000 0 0 0 3 0000000083dad9fb"
- # parsing could break at some point, because returns None and cannot be parsed
-
- parsedtcpsockets = [parse(fieldsstr, l) for l in tcpsockets if l is not None]
- socketinodesbythisprocess = [l for l in parsedtcpsockets if l is isinstance(l,Result) and l.named['inode'] in inodes]
- # got dem sockets
- # expecting >1 because the process could be listening on 8080, all api services' ports are in 30000 port range
- for s in socketinodesbythisprocess:
- _, port = tuple(parse('{}:{}', s['local_address']))
- port = int(port,16)
- if port > 30000:
- print(f'found port {port}')
- break
-
-
-
- #thesocketswearelookingfor = list(filter(lambda x: ( l for l in parsed if l.named['inode'] in inodes), inodes ))
-
- # result = parse(fieldsstr, l)
- # if isinstance(Result, result):
- # result.named['inode'] in inodes
- #
- # print(result)
-
- # print(' '.join(alltcp.stdout.strip().splitlines()[1].strip().split()))
- # result = findall('{}: {}:{} {} {} {} {} {} {} ')
-
- # serviceport = await c.run(f'journalctl -u {servicename.stdout}')
- # print(serviceport.stdout)
- # matches = re.findall('Listening interface \*:(.*) \[',serviceport.stdout)
-
- print("breaketh pointeth h're")
- # print(await gpss.location())
-
- # listener = loop.create_task(gpss.listener())
if __name__ == '__main__':
loop = asyncio.get_event_loop()
diff --git a/mediaplayer.py b/mediaplayer.py
index 21f9801..e7059b6 100644
--- a/mediaplayer.py
+++ b/mediaplayer.py
@@ -1,32 +1,22 @@
-import json
-from json import JSONDecodeError
import os
-import sys
import asyncio
-from random import randint
+import json
-from websockets import connect, ConnectionClosedError
-import concurrent
-from enum import IntEnum
from aglbaseservice import AGLBaseService
-global DEBUG
-DEBUG = True
-
class MediaPlayerService(AGLBaseService):
def __await__(self):
return super()._async_init().__await__()
- def __init__(self, ip, port):
- super().__init__(api='mediaplayer', ip=ip, port=port)
+ def __init__(self, ip, port = None):
+ super().__init__(api='mediaplayer', ip=ip, port=port, service='agl-service-mediaplayer')
- async def playlist(self):
- self.request('playlist')
- verb = 'playlist'
- msgid = randint(0, 999999)
-
- await self.send(f'[2,"{msgid}","{self.api}/{verb}",""]')
+ async def playlist(self, waitresponse=False):
+ if waitresponse:
+ return await self.request('playlist', waitresponse=waitresponse)
+ else:
+ await self.request('playlist')
async def subscribe(self, event='metadata'):
await super().subscribe(event=event)
@@ -37,7 +27,6 @@ class MediaPlayerService(AGLBaseService):
async def control(self, name, value=None):
verb = 'controls'
loopstate = ['off', 'playlist', 'track']
-
controls = {
'play': None,
'pause': None,
@@ -50,46 +39,46 @@ class MediaPlayerService(AGLBaseService):
'volume': 'volume',
'loop': 'state'
}
- assert name in controls.keys(), 'Tried to use non-existant {name} as control for {self.api}'
+ assert name in controls.keys(), 'Tried to use non-existent {name} as control for {self.api}'
- msgid = randint(0, 999999)
if name in ['play', 'pause', 'previous', 'next']:
- msg = f'[2,"{msgid}","{self.api}/{verb}", {{"value": "{name}"}}]'
+ msg = {'value': name}
elif name in ['seek', 'fast-forward', 'rewind']:
assert value > 0, "Tried to seek with negative integer"
- msg = f'[2,"{msgid}","{self.api}/{verb}", {{"value": "{name}", "position": "{str(value)}"}}]'
+ msg = {'value': name, controls[name]: str(value)}
elif name == 'pick-track':
assert type(value) is int, "Try picking a song with an integer"
assert value > 0, "Tried to pick a song with a negative integer"
- msg = f'[2,"{msgid}","{self.api}/{verb}", {{"value": "{name}", "index": {str(value)}}}]'
+ msg = {'value': name, controls[value]: str(value)}
elif name == 'volume':
assert type(value) is int, "Try setting the volume with an integer"
assert value > 0, "Tried to set the volume with a negative integer, use values betwen 0-100"
assert value < 100, "Tried to set the volume over 100%, use values betwen 0-100"
- msg = f'[2,"{msgid}","{self.api}/{verb}", {{"value": "{name}", "{name}": {str(value)}}}]'
+ msg = {'value': name, name: str(value)}
elif name == 'loop':
assert value in loopstate, f'Tried to set invalid loopstate - {value}, use "off", "playlist" or "track"'
- msg = f'[2,"{msgid}","{self.api}/{verb}", {{"value": "{name}", "{controls[name]}": {str(value)}}}]'
+ msg = {'value': name, controls[name]: str(value)}
- await self.send(msg)
+ await self.request(verb, msg)
async def main(loop):
addr = os.environ.get('AGL_TGT_IP', '192.168.234.202')
- port = os.environ.get('AGL_TGT_PORT', '30016')
+ port = os.environ.get('AGL_TGT_PORT', None)
MPS = await MediaPlayerService(ip=addr, port=port)
- listener = loop.create_task(MPS.listener())
+ # listener = loop.create_task(MPS.listener())
try:
await MPS.subscribe('metadata')
+ print(await MPS.playlist(waitresponse=True))
await MPS.control('next')
- await listener
+ # await listener
except KeyboardInterrupt:
pass
- listener.cancel()
+ # listener.cancel()
await MPS.unsubscribe('playlist')