diff options
author | Edi Feschiyan <edi.feschiyan@konsulko.com> | 2020-04-15 15:04:02 +0300 |
---|---|---|
committer | Edi Feschiyan <edi.feschiyan@konsulko.com> | 2020-04-15 15:04:02 +0300 |
commit | 7519a78657685f14f93983f474d75ae0efb4a2ad (patch) | |
tree | 29461ca66ec4b07e4d2ab89e25c2c5aaf34049e6 /aglbaseservice.py | |
parent | 43d73e9cd7fe1ee5f08a8a62feb96c35750ca588 (diff) |
Cleaning up services' classes
Diffstat (limited to 'aglbaseservice.py')
-rw-r--r-- | aglbaseservice.py | 68 |
1 files changed, 44 insertions, 24 deletions
diff --git a/aglbaseservice.py b/aglbaseservice.py index 81a868e..fb905ca 100644 --- a/aglbaseservice.py +++ b/aglbaseservice.py @@ -9,10 +9,12 @@ from websockets import connect import logging import asyncssh import re -from parse import * - +from parse import Result, parse from typing import Union +logging.getLogger('AGLBaseService') +logging.basicConfig(level=logging.DEBUG) + class AFBT(IntEnum): REQUEST = 2, RESPONSE = 3, @@ -37,6 +39,7 @@ class AGLBaseService: token = None uuid = None service = None + logger = None def __init__(self, api: str, ip: str, port: str = None, url: str = None, token: str = 'HELLO', uuid: str = 'magic', service: str = None): @@ -47,6 +50,7 @@ class AGLBaseService: self.token = token self.uuid = uuid self.service = service + self.logger = logging.getLogger(service) def __await__(self): return self._async_init().__await__() @@ -57,6 +61,14 @@ class AGLBaseService: async def _async_init(self): # setting ping_interval to None because AFB does not support websocket ping # if set to !None, the library will close the socket after the default timeout + if self.port is None: + serviceport = await self.portfinder() + if serviceport is not None: + self.port = serviceport + else: + self.logger('Unable to find port') + exit(1) + URL = f'ws://{self.ip}:{self.port}/api?token={self.token}&uuid={self.uuid}' self._conn = connect(close_timeout=0, uri=URL, subprotocols=['x-afb-ws-json1'], ping_interval=None) self.websocket = await self._conn.__aenter__() @@ -76,44 +88,51 @@ class AGLBaseService: async def portfinder(self): async with asyncssh.connect(self.ip, username='root') as c: - servicename = await c.run(f"systemctl --all | grep {self.service} | awk '{{print $1}}'", check=False) + servicename = await c.run(f"systemctl --all | grep {self.service}-- | awk '{{print $1}}'", check=False) if self.service not in servicename.stdout: - print(f"Unable to find service matching pattern '{self.service}'") + logging.error(f"Service matching pattern - '{self.service}' - NOT FOUND") exit(1) - # TODO decide what to do if the service is not started - scan for disabled units/run service via afm-util - print(f"Found service name: {servicename.stdout.strip()}") - pidres = await c.run(f'systemctl show --property MainPID --value {servicename.stdout}') + pidres = await c.run(f'systemctl show --property MainPID --value {servicename.stdout.strip()}') pid = int(pidres.stdout.strip(), 10) if pid is 0: - print(f'Service {servicename.stdout.strip()} is stopped') + logging.warning(f'Service {servicename.stdout.strip()} is stopped') return None else: - print(f'Service PID: {pidres.stdout.strip()}') + logging.debug(f'Service PID: {str(pid)}') - sockets = await c.run(f'find /proc/{pidres.stdout.strip()}/fd/ | xargs readlink | grep socket') + sockets = await c.run(f'find /proc/{pid}/fd/ | xargs readlink | grep socket') inodes = frozenset(re.findall('socket:\[(.*)\]', sockets.stdout)) + logging.debug(f"Socket inodes: {inodes}") - print(f"Socket inodes: {inodes}") - - alltcp = await c.run('cat /proc/net/tcp') - fieldsstr = '{sl}: {local_address} {rem_address} {st} {tx_queue}:{rx_queue} {tr}:{tmwhen} {retrnsmt} {uid}' \ + procnettcp = await c.run('cat /proc/net/tcp') + fieldsstr = '{sl}: {local_address} {rem_address} {st} {tx_queue}:{rx_queue} {tr}:{tmwhen} {retrnsmt} {uid}'\ ' {timeout} {inode} {sref_cnt} {memloc} {rto} {pred_sclk} {ackquick} {congest} {slowstart}' - tcpsockets = [' '.join(l.split()) for l in alltcp.stdout.splitlines()[1:]] - parsedtcpsockets = [parse(fieldsstr, l) for l in tcpsockets if l is not None] + tcpsockets = [' '.join(l.split()) for l in procnettcp.stdout.splitlines()[1:]] + # different lines with less stats appear sometimes, parse will return None, so ignore 'None' lines + parsedtcpsockets = [] + for l in tcpsockets: + res = parse(fieldsstr, l) + if isinstance(res, Result): + parsedtcpsockets.append(res) + socketinodesbythisprocess = [l for l in parsedtcpsockets if - l is isinstance(l, Result) and l.named['inode'] in inodes] + isinstance(l, Result) and + l.named['inode'] in inodes and + # 0A is listening state for the socket + l.named['st'] == '0A'] + for s in socketinodesbythisprocess: _, port = tuple(parse('{}:{}', s['local_address'])) port = int(port, 16) - if port > 30000: - print(f'found port {port}') + if port >= 30000: # the port is above 30000 range, 8080 is some kind of proxy + logging.debug(f'Service running at port {port}') return port async def listener(self): try: while True: msg = await self.receive() - print(f"Received {msg}") + # self.logger.debug(f"Received websocket{msg}") try: data = json.loads(msg) if isinstance(data, list): @@ -123,18 +142,19 @@ class AGLBaseService: addresponse(msgid, data) except JSONDecodeError: - print("Not decoding a non-json message") + self.logger.warning("Not decoding a non-json message") except KeyboardInterrupt: - print("Received keyboard interrupt, exiting") + logging.debug("Received keyboard interrupt, exiting") except asyncio.CancelledError: - print("Websocket listener coroutine stopped") + logging.warning("Websocket listener coroutine stopped") except Exception as e: - print("Unhandled seal: " + str(e)) + logging.error("Unhandled seal: " + str(e)) async def request(self, verb: str, values: Union[str, dict] = "", msgid: int = randint(0, 9999999), waitresponse: bool = False): l = json.dumps([AFBT.REQUEST, str(msgid), f'{self.api}/{verb}', values]) + self.logger.debug(f'[AGL] <- {l}') await self.send(l) if waitresponse: return await self.receive() |