diff options
Diffstat (limited to 'agl_service_voiceagent/utils/vss_interface.py')
-rw-r--r-- | agl_service_voiceagent/utils/vss_interface.py | 236 |
1 files changed, 236 insertions, 0 deletions
diff --git a/agl_service_voiceagent/utils/vss_interface.py b/agl_service_voiceagent/utils/vss_interface.py new file mode 100644 index 0000000..a77e52c --- /dev/null +++ b/agl_service_voiceagent/utils/vss_interface.py @@ -0,0 +1,236 @@ +import json +import threading +from kuksa_client import KuksaClientThread +import sys +from pathlib import Path +import asyncio +import concurrent.futures +from kuksa_client.grpc.aio import VSSClient +from kuksa_client.grpc import Datapoint +import time +from agl_service_voiceagent.utils.config import get_config_value, get_logger + + +class VSSInterface: + """ + VSS Interface + + This class provides methods to initialize, authorize, connect, send values, + check the status, and close the Kuksa client. + """ + + _instance = None + _lock = threading.Lock() + + def __new__(cls): + """ + Get the unique instance of the class. + + Returns: + KuksaInterface: The instance of the class. + """ + with cls._lock: + if cls._instance is None: + cls._instance = super(VSSInterface, cls).__new__(cls) + cls._instance.init_client() + return cls._instance + + def init_client(self): + """ + Initialize the Kuksa client. + """ + # Defaults + self.hostname = str(get_config_value("hostname", "VSS")) + self.port = str(get_config_value("port", "VSS")) + self.token_filename = str(get_config_value("token_filename", "VSS")) + self.tls_server_name = str(get_config_value("tls_server_name", "VSS")) + self.verbose = False + self.insecure = bool(int(get_config_value("insecure", "VSS"))) + self.protocol = str(get_config_value("protocol", "VSS")) + self.ca_cert_filename = str(get_config_value("ca_cert_filename", "VSS")) + self.token = None + self.is_connected = False + self.logger = get_logger() + + self.set_token() + + # validate config + if not self.validate_config(): + exit(1) + + # define class methods + self.vss_client = None + + def validate_config(self): + """ + Validate the Kuksa client configuration. + + Returns: + bool: True if the configuration is valid, False otherwise. + """ + if self.hostname is None: + print("[-] Error: Kuksa IP address is not set.") + self.logger.error("Kuksa IP address is not set.") + return False + + if self.port is None: + print("[-] Error: Kuksa port is not set.") + self.logger.error("Kuksa port is not set.") + return False + + if self.token is None: + print("[-] Warning: Kuksa auth token is not set.") + self.logger.warning("Kuksa auth token is not set.") + + if self.protocol != "ws" and self.protocol != "grpc": + print("[-] Error: Invalid Kuksa protocol. Only 'ws' and 'grpc' are supported.") + self.logger.error("Invalid Kuksa protocol. Only 'ws' and 'grpc' are supported.") + return False + + return True + + def set_token(self): + """ + Set the Kuksa auth token. + """ + if self.token_filename != "": + token_file = open(self.token_filename, "r") + self.token = token_file.read() + else: + self.token = "" + + def get_vss_client(self): + """ + Get the VSS client instance. + + Returns: + VSSClientThread: The VSS client instance. + """ + if self.vss_client is None: + return None + return self.vss_client + + async def authorize_vss_client(self): + """ + Authorize the VSS client. + """ + if self.vss_client is None: + print("[-] Error: Failed to authorize Kuksa client. Kuksa client is not initialized.") + self.logger.error("Failed to authorize Kuksa client. Kuksa client is not initialized.") + return False + try: + await self.vss_client.authorize(self.token) + print(f"Authorized Kuksa client with token {self.token}") + return True + except Exception as e: + print(f"[-] Error: Failed to authorize Kuksa client: {e}") + self.logger.error(f"Failed to authorize Kuksa client: {e}") + return False + + async def get_server_info(self): + """ + Get the server information. + + Returns: + dict: The server information. + """ + if self.vss_client is None: + return None + try: + return await self.vss_client.get_server_info() + except Exception as e: + print(f"[-] Error: Failed to get server info: {e}") + self.logger.error(f"Failed to get server info: {e}") + return None + + async def connect_vss_client(self): + """ + Connect the VSS client. + """ + print(f"Connecting to KUKSA.val databroker at {self.hostname}:{self.port}") + try: + self.vss_client = VSSClient( + self.hostname, + self.port, + root_certificates=Path(self.ca_cert_filename), + token=self.token, + tls_server_name=self.tls_server_name, + ensure_startup_connection=True) + await self.vss_client.connect() + print(f"[+] Connected to KUKSA.val databroker at {self.hostname}:{self.port}") + self.is_connected = True + return True + except Exception as e: + print(f"[-] Error: Failed to connect to Kuksa val databroker: {e}") + self.logger.error(f"Failed to connect to Kuksa val databroker: {e}") + self.is_connected = False + return False + + + async def set_current_values(self, path=None, value=None): + """ + Set the current values. + + Args: + updates (dict): The updates to set. + """ + result = False + if self.vss_client is None: + print(f"[-] Error: Failed to send value '{value}' to Kuksa. Kuksa client is not initialized.") + self.logger.error(f"Failed to send value '{value}' to Kuksa. Kuksa client is not initialized.") + return result + try: + await self.vss_client.set_current_values({path: Datapoint(value)}) + result = True + except Exception as e: + print(f"[-] Error: Failed to send value '{value}' to Kuksa: {e}") + self.logger.error(f"Failed to send value '{value}' to Kuksa: {e}") + return result + + + async def get_current_values(self, path=None): + """ + Get the current values. + + Args: + paths (list): The paths to get. + + Returns: + dict: The current values. + + current_values = await client.get_current_values([ + 'Vehicle.Speed', + 'Vehicle.ADAS.ABS.IsActive', + ]) + speed_value = current_values['Vehicle.Speed'].value + """ + + if self.vss_client is None or self.is_connected is False: + return None + try: + result = await self.vss_client.get_current_values([path]) + return result[path].value + except Exception as e: + print(f"[-] Error: Failed to get current values: {e}") + self.logger.error(f"Failed to get current values: {e}") + return None + + async def disconnect_vss_client(self): + """ + Disconnect the VSS client. + """ + if self.vss_client is None: + print("[-] Error: Failed to disconnect Kuksa client. Kuksa client is not initialized.") + self.logger.error("Failed to disconnect Kuksa client. Kuksa client is not initialized.") + return False + try: + await self.vss_client.disconnect() + print("Disconnected from Kuksa val databroker.") + self.is_connected = False + return True + except Exception as e: + print(f"[-] Error: Failed to disconnect from Kuksa val databroker: {e}") + self.logger.error(f"Failed to disconnect from Kuksa val databroker: {e}") + return False + + |