aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--pyagl/services/base.py99
1 files changed, 64 insertions, 35 deletions
diff --git a/pyagl/services/base.py b/pyagl/services/base.py
index e34d89c..39443a1 100644
--- a/pyagl/services/base.py
+++ b/pyagl/services/base.py
@@ -1,3 +1,4 @@
+from subprocess import check_output
from json import JSONDecodeError
from parse import Result, parse
from websockets import connect
@@ -134,13 +135,14 @@ class AGLBaseService:
return self._async_init()
async def _async_init(self):
- # setting ping_interval to None because AFB does not support websocket ping
+ # setting ping_timeout to None because AFB does not support websocket ping
# if set to !None, the library will close the socket after the default timeout
if self.port is None:
serviceport = await self.portfinder()
if serviceport is not None:
self.port = serviceport
else:
+ print("Service port: ", serviceport)
self.logger.error('Unable to find port')
exit(1)
@@ -162,50 +164,77 @@ class AGLBaseService:
return await self.websocket.recv()
async def portfinder(self):
+ fieldsstr = '{sl}: {local_address} {rem_address} {st} {tx_queue}:{rx_queue} {tr}:{tmwhen} {retrnsmt} {uid}' \
+ ' {timeout} {inode} {sref_cnt} {memloc} {rto} {pred_sclk} {ackquick} {congest} {slowstart}'
# TODO:handle ssh timeouts, asyncssh does not support it apparently, and connect returns context_manager which
# cannot be used with asyncio.wait_for
-
- async with asyncssh.connect(self.ip, username='root') as c:
-
- servicename = await c.run(f"systemctl --all | grep {self.service}-- | awk '{{print $1}}'", check=False)
- if self.service not in servicename.stdout:
- logging.error(f"Service matching pattern - '{self.service}' - NOT FOUND")
+ if self.ip == 'localhost' or self.ip == '127.0.0.1':
+ servicename = check_output(f"systemctl --all | grep {self.service}-- | awk '{{print $1}}'".encode(), shell=True)
+ servicename = servicename.decode().strip()
+ if self.service not in servicename:
+ self.logger.error(f"Service matching pattern - '{self.service}' - NOT FOUND")
exit(1)
- pidres = await c.run(f'systemctl show --property MainPID --value {servicename.stdout.strip()}')
- pid = int(pidres.stdout.strip(), 10)
+ pid = check_output(f'systemctl show --property MainPID --value {servicename}'.encode(), shell=True)
+ pid = int(pid.decode().strip(), 10)
if pid == 0:
- logging.warning(f'Service {servicename.stdout.strip()} is stopped')
+ self.logger.warning(f'Service {servicename} is stopped')
return None
else:
self.logger.debug(f'Service PID: {str(pid)}')
- sockets = await c.run(f'find /proc/{pid}/fd/ | xargs readlink | grep socket')
- inodes = frozenset(re.findall("socket:\\[(.*)\\]", sockets.stdout))
+ sockets = check_output(f'find /proc/{pid}/fd/ | xargs readlink | grep socket'.encode(), shell=True)
+ sockets = sockets.decode().strip()
+ inodes = frozenset(re.findall("socket:\\[(.*)\\]", sockets))
self.logger.debug(f"Socket inodes: {inodes}")
- procnettcp = await c.run('cat /proc/net/tcp')
- fieldsstr = '{sl}: {local_address} {rem_address} {st} {tx_queue}:{rx_queue} {tr}:{tmwhen} {retrnsmt} {uid}'\
- ' {timeout} {inode} {sref_cnt} {memloc} {rto} {pred_sclk} {ackquick} {congest} {slowstart}'
- tcpsockets = [' '.join(l.split()) for l in procnettcp.stdout.splitlines()[1:]]
- # different lines with less stats appear sometimes, parse will return None, so ignore 'None' lines
- parsedtcpsockets = []
- for l in tcpsockets:
- res = parse(fieldsstr, l)
- if isinstance(res, Result):
- parsedtcpsockets.append(res)
-
- socketinodesbythisprocess = [l for l in parsedtcpsockets if
- isinstance(l, Result) and
- l.named['inode'] in inodes and
- # 0A is listening state for the socket
- l.named['st'] == '0A']
-
- for s in socketinodesbythisprocess:
- _, port = tuple(parse('{}:{}', s['local_address']))
- port = int(port, 16)
- if port >= 30000: # the port is above 30000 range, 8080 is some kind of proxy
- self.logger.debug(f'Service running at port {port}')
- return port
+ procnettcp = check_output('cat /proc/net/tcp'.encode(), shell=True)
+ procnettcp = procnettcp.decode().splitlines()[1:]
+ self.logger.debug(procnettcp)
+
+ else:
+ async with asyncssh.connect(self.ip, username='root') as c:
+ servicename = await c.run(f"systemctl --all | grep {self.service}-- | awk '{{print $1}}'", check=False)
+ if self.service not in servicename.stdout:
+ self.logger.error(f"Service matching pattern - '{self.service}' - NOT FOUND")
+ exit(1)
+ pidres = await c.run(f'systemctl show --property MainPID --value {servicename.stdout.strip()}')
+ pid = int(pidres.stdout.strip(), 10)
+ if pid == 0:
+ self.logger.warning(f'Service {servicename.stdout.strip()} is stopped')
+ return None
+ else:
+ self.logger.debug(f'Service PID: {str(pid)}')
+
+ sockets = await c.run(f'find /proc/{pid}/fd/ | xargs readlink | grep socket')
+ inodes = frozenset(re.findall("socket:\\[(.*)\\]", sockets.stdout))
+ self.logger.debug(f"Socket inodes: {inodes}")
+
+ procnettcp = await c.run('cat /proc/net/tcp')
+ procnettcp = procnettcp.stdout.splitlines()[1:]
+
+ self.logger.debug(procnettcp)
+ tcpsockets = [' '.join(l.split()) for l in procnettcp]
+ # different lines with less stats appear sometimes, parse will return None, so ignore 'None' lines
+ parsedtcpsockets = []
+ for l in tcpsockets:
+ res = parse(fieldsstr, l)
+ if isinstance(res, Result):
+ parsedtcpsockets.append(res)
+
+ self.logger.debug(parsedtcpsockets)
+ socketinodesbythisprocess = [l for l in parsedtcpsockets if
+ isinstance(l, Result) and
+ l.named['inode'] in inodes and
+ # 0A is listening state for the socket
+ l.named['st'] == '0A']
+
+ self.logger.debug(socketinodesbythisprocess)
+ for s in socketinodesbythisprocess:
+ _, port = tuple(parse('{}:{}', s['local_address']))
+ port = int(port, 16)
+ if port >= 30000: # the port is above 30000 range, 8080 is some kind of proxy
+ self.logger.debug(f'Service running at port {port}')
+ return port
async def listener(self, stdout: bool = False):
while True: