summaryrefslogtreecommitdiffstats
path: root/aglbaseservice.py
diff options
context:
space:
mode:
authorEdi Feschiyan <edi.feschiyan@konsulko.com>2020-04-15 15:04:02 +0300
committerEdi Feschiyan <edi.feschiyan@konsulko.com>2020-04-15 15:04:02 +0300
commit7519a78657685f14f93983f474d75ae0efb4a2ad (patch)
tree29461ca66ec4b07e4d2ab89e25c2c5aaf34049e6 /aglbaseservice.py
parent43d73e9cd7fe1ee5f08a8a62feb96c35750ca588 (diff)
Cleaning up services' classes
Diffstat (limited to 'aglbaseservice.py')
-rw-r--r--aglbaseservice.py68
1 files changed, 44 insertions, 24 deletions
diff --git a/aglbaseservice.py b/aglbaseservice.py
index 81a868e..fb905ca 100644
--- a/aglbaseservice.py
+++ b/aglbaseservice.py
@@ -9,10 +9,12 @@ from websockets import connect
import logging
import asyncssh
import re
-from parse import *
-
+from parse import Result, parse
from typing import Union
+logging.getLogger('AGLBaseService')
+logging.basicConfig(level=logging.DEBUG)
+
class AFBT(IntEnum):
REQUEST = 2,
RESPONSE = 3,
@@ -37,6 +39,7 @@ class AGLBaseService:
token = None
uuid = None
service = None
+ logger = None
def __init__(self, api: str, ip: str, port: str = None, url: str = None,
token: str = 'HELLO', uuid: str = 'magic', service: str = None):
@@ -47,6 +50,7 @@ class AGLBaseService:
self.token = token
self.uuid = uuid
self.service = service
+ self.logger = logging.getLogger(service)
def __await__(self):
return self._async_init().__await__()
@@ -57,6 +61,14 @@ class AGLBaseService:
async def _async_init(self):
# setting ping_interval to None because AFB does not support websocket ping
# if set to !None, the library will close the socket after the default timeout
+ if self.port is None:
+ serviceport = await self.portfinder()
+ if serviceport is not None:
+ self.port = serviceport
+ else:
+ self.logger('Unable to find port')
+ exit(1)
+
URL = f'ws://{self.ip}:{self.port}/api?token={self.token}&uuid={self.uuid}'
self._conn = connect(close_timeout=0, uri=URL, subprotocols=['x-afb-ws-json1'], ping_interval=None)
self.websocket = await self._conn.__aenter__()
@@ -76,44 +88,51 @@ class AGLBaseService:
async def portfinder(self):
async with asyncssh.connect(self.ip, username='root') as c:
- servicename = await c.run(f"systemctl --all | grep {self.service} | awk '{{print $1}}'", check=False)
+ servicename = await c.run(f"systemctl --all | grep {self.service}-- | awk '{{print $1}}'", check=False)
if self.service not in servicename.stdout:
- print(f"Unable to find service matching pattern '{self.service}'")
+ logging.error(f"Service matching pattern - '{self.service}' - NOT FOUND")
exit(1)
- # TODO decide what to do if the service is not started - scan for disabled units/run service via afm-util
- print(f"Found service name: {servicename.stdout.strip()}")
- pidres = await c.run(f'systemctl show --property MainPID --value {servicename.stdout}')
+ pidres = await c.run(f'systemctl show --property MainPID --value {servicename.stdout.strip()}')
pid = int(pidres.stdout.strip(), 10)
if pid is 0:
- print(f'Service {servicename.stdout.strip()} is stopped')
+ logging.warning(f'Service {servicename.stdout.strip()} is stopped')
return None
else:
- print(f'Service PID: {pidres.stdout.strip()}')
+ logging.debug(f'Service PID: {str(pid)}')
- sockets = await c.run(f'find /proc/{pidres.stdout.strip()}/fd/ | xargs readlink | grep socket')
+ sockets = await c.run(f'find /proc/{pid}/fd/ | xargs readlink | grep socket')
inodes = frozenset(re.findall('socket:\[(.*)\]', sockets.stdout))
+ logging.debug(f"Socket inodes: {inodes}")
- print(f"Socket inodes: {inodes}")
-
- alltcp = await c.run('cat /proc/net/tcp')
- fieldsstr = '{sl}: {local_address} {rem_address} {st} {tx_queue}:{rx_queue} {tr}:{tmwhen} {retrnsmt} {uid}' \
+ procnettcp = await c.run('cat /proc/net/tcp')
+ fieldsstr = '{sl}: {local_address} {rem_address} {st} {tx_queue}:{rx_queue} {tr}:{tmwhen} {retrnsmt} {uid}'\
' {timeout} {inode} {sref_cnt} {memloc} {rto} {pred_sclk} {ackquick} {congest} {slowstart}'
- tcpsockets = [' '.join(l.split()) for l in alltcp.stdout.splitlines()[1:]]
- parsedtcpsockets = [parse(fieldsstr, l) for l in tcpsockets if l is not None]
+ tcpsockets = [' '.join(l.split()) for l in procnettcp.stdout.splitlines()[1:]]
+ # different lines with less stats appear sometimes, parse will return None, so ignore 'None' lines
+ parsedtcpsockets = []
+ for l in tcpsockets:
+ res = parse(fieldsstr, l)
+ if isinstance(res, Result):
+ parsedtcpsockets.append(res)
+
socketinodesbythisprocess = [l for l in parsedtcpsockets if
- l is isinstance(l, Result) and l.named['inode'] in inodes]
+ isinstance(l, Result) and
+ l.named['inode'] in inodes and
+ # 0A is listening state for the socket
+ l.named['st'] == '0A']
+
for s in socketinodesbythisprocess:
_, port = tuple(parse('{}:{}', s['local_address']))
port = int(port, 16)
- if port > 30000:
- print(f'found port {port}')
+ if port >= 30000: # the port is above 30000 range, 8080 is some kind of proxy
+ logging.debug(f'Service running at port {port}')
return port
async def listener(self):
try:
while True:
msg = await self.receive()
- print(f"Received {msg}")
+ # self.logger.debug(f"Received websocket{msg}")
try:
data = json.loads(msg)
if isinstance(data, list):
@@ -123,18 +142,19 @@ class AGLBaseService:
addresponse(msgid, data)
except JSONDecodeError:
- print("Not decoding a non-json message")
+ self.logger.warning("Not decoding a non-json message")
except KeyboardInterrupt:
- print("Received keyboard interrupt, exiting")
+ logging.debug("Received keyboard interrupt, exiting")
except asyncio.CancelledError:
- print("Websocket listener coroutine stopped")
+ logging.warning("Websocket listener coroutine stopped")
except Exception as e:
- print("Unhandled seal: " + str(e))
+ logging.error("Unhandled seal: " + str(e))
async def request(self, verb: str, values: Union[str, dict] = "", msgid: int = randint(0, 9999999),
waitresponse: bool = False):
l = json.dumps([AFBT.REQUEST, str(msgid), f'{self.api}/{verb}', values])
+ self.logger.debug(f'[AGL] <- {l}')
await self.send(l)
if waitresponse:
return await self.receive()