diff options
-rw-r--r-- | pyagl/services/base.py | 99 |
1 files changed, 64 insertions, 35 deletions
diff --git a/pyagl/services/base.py b/pyagl/services/base.py index e34d89c..39443a1 100644 --- a/pyagl/services/base.py +++ b/pyagl/services/base.py @@ -1,3 +1,4 @@ +from subprocess import check_output from json import JSONDecodeError from parse import Result, parse from websockets import connect @@ -134,13 +135,14 @@ class AGLBaseService: return self._async_init() async def _async_init(self): - # setting ping_interval to None because AFB does not support websocket ping + # setting ping_timeout to None because AFB does not support websocket ping # if set to !None, the library will close the socket after the default timeout if self.port is None: serviceport = await self.portfinder() if serviceport is not None: self.port = serviceport else: + print("Service port: ", serviceport) self.logger.error('Unable to find port') exit(1) @@ -162,50 +164,77 @@ class AGLBaseService: return await self.websocket.recv() async def portfinder(self): + fieldsstr = '{sl}: {local_address} {rem_address} {st} {tx_queue}:{rx_queue} {tr}:{tmwhen} {retrnsmt} {uid}' \ + ' {timeout} {inode} {sref_cnt} {memloc} {rto} {pred_sclk} {ackquick} {congest} {slowstart}' # TODO:handle ssh timeouts, asyncssh does not support it apparently, and connect returns context_manager which # cannot be used with asyncio.wait_for - - async with asyncssh.connect(self.ip, username='root') as c: - - servicename = await c.run(f"systemctl --all | grep {self.service}-- | awk '{{print $1}}'", check=False) - if self.service not in servicename.stdout: - logging.error(f"Service matching pattern - '{self.service}' - NOT FOUND") + if self.ip == 'localhost' or self.ip == '127.0.0.1': + servicename = check_output(f"systemctl --all | grep {self.service}-- | awk '{{print $1}}'".encode(), shell=True) + servicename = servicename.decode().strip() + if self.service not in servicename: + self.logger.error(f"Service matching pattern - '{self.service}' - NOT FOUND") exit(1) - pidres = await c.run(f'systemctl show --property MainPID --value {servicename.stdout.strip()}') - pid = int(pidres.stdout.strip(), 10) + pid = check_output(f'systemctl show --property MainPID --value {servicename}'.encode(), shell=True) + pid = int(pid.decode().strip(), 10) if pid == 0: - logging.warning(f'Service {servicename.stdout.strip()} is stopped') + self.logger.warning(f'Service {servicename} is stopped') return None else: self.logger.debug(f'Service PID: {str(pid)}') - sockets = await c.run(f'find /proc/{pid}/fd/ | xargs readlink | grep socket') - inodes = frozenset(re.findall("socket:\\[(.*)\\]", sockets.stdout)) + sockets = check_output(f'find /proc/{pid}/fd/ | xargs readlink | grep socket'.encode(), shell=True) + sockets = sockets.decode().strip() + inodes = frozenset(re.findall("socket:\\[(.*)\\]", sockets)) self.logger.debug(f"Socket inodes: {inodes}") - procnettcp = await c.run('cat /proc/net/tcp') - fieldsstr = '{sl}: {local_address} {rem_address} {st} {tx_queue}:{rx_queue} {tr}:{tmwhen} {retrnsmt} {uid}'\ - ' {timeout} {inode} {sref_cnt} {memloc} {rto} {pred_sclk} {ackquick} {congest} {slowstart}' - tcpsockets = [' '.join(l.split()) for l in procnettcp.stdout.splitlines()[1:]] - # different lines with less stats appear sometimes, parse will return None, so ignore 'None' lines - parsedtcpsockets = [] - for l in tcpsockets: - res = parse(fieldsstr, l) - if isinstance(res, Result): - parsedtcpsockets.append(res) - - socketinodesbythisprocess = [l for l in parsedtcpsockets if - isinstance(l, Result) and - l.named['inode'] in inodes and - # 0A is listening state for the socket - l.named['st'] == '0A'] - - for s in socketinodesbythisprocess: - _, port = tuple(parse('{}:{}', s['local_address'])) - port = int(port, 16) - if port >= 30000: # the port is above 30000 range, 8080 is some kind of proxy - self.logger.debug(f'Service running at port {port}') - return port + procnettcp = check_output('cat /proc/net/tcp'.encode(), shell=True) + procnettcp = procnettcp.decode().splitlines()[1:] + self.logger.debug(procnettcp) + + else: + async with asyncssh.connect(self.ip, username='root') as c: + servicename = await c.run(f"systemctl --all | grep {self.service}-- | awk '{{print $1}}'", check=False) + if self.service not in servicename.stdout: + self.logger.error(f"Service matching pattern - '{self.service}' - NOT FOUND") + exit(1) + pidres = await c.run(f'systemctl show --property MainPID --value {servicename.stdout.strip()}') + pid = int(pidres.stdout.strip(), 10) + if pid == 0: + self.logger.warning(f'Service {servicename.stdout.strip()} is stopped') + return None + else: + self.logger.debug(f'Service PID: {str(pid)}') + + sockets = await c.run(f'find /proc/{pid}/fd/ | xargs readlink | grep socket') + inodes = frozenset(re.findall("socket:\\[(.*)\\]", sockets.stdout)) + self.logger.debug(f"Socket inodes: {inodes}") + + procnettcp = await c.run('cat /proc/net/tcp') + procnettcp = procnettcp.stdout.splitlines()[1:] + + self.logger.debug(procnettcp) + tcpsockets = [' '.join(l.split()) for l in procnettcp] + # different lines with less stats appear sometimes, parse will return None, so ignore 'None' lines + parsedtcpsockets = [] + for l in tcpsockets: + res = parse(fieldsstr, l) + if isinstance(res, Result): + parsedtcpsockets.append(res) + + self.logger.debug(parsedtcpsockets) + socketinodesbythisprocess = [l for l in parsedtcpsockets if + isinstance(l, Result) and + l.named['inode'] in inodes and + # 0A is listening state for the socket + l.named['st'] == '0A'] + + self.logger.debug(socketinodesbythisprocess) + for s in socketinodesbythisprocess: + _, port = tuple(parse('{}:{}', s['local_address'])) + port = int(port, 16) + if port >= 30000: # the port is above 30000 range, 8080 is some kind of proxy + self.logger.debug(f'Service running at port {port}') + return port async def listener(self, stdout: bool = False): while True: |