diff options
Diffstat (limited to 'agl_service_voiceagent/utils/kuksa_interface.py')
-rw-r--r-- | agl_service_voiceagent/utils/kuksa_interface.py | 178 |
1 files changed, 161 insertions, 17 deletions
diff --git a/agl_service_voiceagent/utils/kuksa_interface.py b/agl_service_voiceagent/utils/kuksa_interface.py index 3e1c045..9270379 100644 --- a/agl_service_voiceagent/utils/kuksa_interface.py +++ b/agl_service_voiceagent/utils/kuksa_interface.py @@ -14,53 +14,197 @@ # See the License for the specific language governing permissions and # limitations under the License. +import time +import json +import threading from kuksa_client import KuksaClientThread from agl_service_voiceagent.utils.config import get_config_value class KuksaInterface: - def __init__(self): + """ + Kuksa Interface + + This class provides methods to initialize, authorize, connect, send values, + check the status, and close the Kuksa client. + """ + + _instance = None + _lock = threading.Lock() + + def __new__(cls): + """ + Get the unique instance of the class. + + Returns: + KuksaInterface: The instance of the class. + """ + with cls._lock: + if cls._instance is None: + cls._instance = super(KuksaInterface, cls).__new__(cls) + cls._instance.init_client() + return cls._instance + + + def init_client(self): + """ + Initialize the Kuksa client configuration. + """ # get config values - self.ip = get_config_value("ip", "Kuksa") - self.port = get_config_value("port", "Kuksa") + self.ip = str(get_config_value("ip", "Kuksa")) + self.port = str(get_config_value("port", "Kuksa")) self.insecure = get_config_value("insecure", "Kuksa") self.protocol = get_config_value("protocol", "Kuksa") self.token = get_config_value("token", "Kuksa") + print(self.ip, self.port, self.insecure, self.protocol, self.token) + # define class methods self.kuksa_client = None def get_kuksa_client(self): + """ + Get the Kuksa client instance. + + Returns: + KuksaClientThread: The Kuksa client instance. + """ return self.kuksa_client - + def get_kuksa_status(self): + """ + Check the status of the Kuksa client connection. + + Returns: + bool: True if the client is connected, False otherwise. + """ if self.kuksa_client: return self.kuksa_client.checkConnection() - return False def connect_kuksa_client(self): + """ + Connect and start the Kuksa client. + """ try: - self.kuksa_client = KuksaClientThread({ - "ip": self.ip, - "port": self.port, - "insecure": self.insecure, - "protocol": self.protocol, - }) - self.kuksa_client.authorize(self.token) + with self._lock: + if self.kuksa_client is None: + self.kuksa_client = KuksaClientThread({ + "ip": self.ip, + "port": self.port, + "insecure": self.insecure, + "protocol": self.protocol, + }) + self.kuksa_client.start() + time.sleep(2) # Give the thread time to start + + if not self.get_kuksa_status(): + print("[-] Error: Connection to Kuksa server failed.") + else: + print("[+] Connection to Kuksa established.") except Exception as e: - print("Error: ", e) + print("[-] Error: Connection to Kuksa server failed. ", str(e)) + + + def authorize_kuksa_client(self): + """ + Authorize the Kuksa client with the provided token. + """ + if self.kuksa_client: + response = self.kuksa_client.authorize(self.token) + response = json.loads(response) + if "error" in response: + error_message = response.get("error", "Unknown error") + print(f"[-] Error: Authorization failed. {error_message}") + else: + print("[+] Kuksa client authorized successfully.") + else: + print("[-] Error: Kuksa client is not initialized. Call `connect_kuksa_client` first.") + + + def send_values(self, path=None, value=None): + """ + Send values to the Kuksa server. + + Args: + path (str): The path to the value. + value (str): The value to be sent. + Returns: + bool: True if the value was set, False otherwise. + """ + result = False + if self.kuksa_client is None: + print("[-] Error: Kuksa client is not initialized.") + return + + if self.get_kuksa_status(): + try: + response = self.kuksa_client.setValue(path, value) + response = json.loads(response) + if not "error" in response: + print(f"[+] Value '{value}' sent to Kuksa successfully.") + result = True + else: + error_message = response.get("error", "Unknown error") + print(f"[-] Error: Failed to send value '{value}' to Kuksa. {error_message}") + + except Exception as e: + print("[-] Error: Failed to send values to Kuksa. ", str(e)) + + else: + print("[-] Error: Connection to Kuksa failed.") + + return result - def send_values(self, Path=None, Value=None): + def get_value(self, path=None): + """ + Get values from the Kuksa server. + + Args: + path (str): The path to the value. + Returns: + str: The value if the path is valid, None otherwise. + """ + result = None if self.kuksa_client is None: - print("Error: Kuksa client is not initialized.") + print("[-] Error: Kuksa client is not initialized.") + return if self.get_kuksa_status(): - self.kuksa_client.setValue(Path, Value) + try: + response = self.kuksa_client.getValue(path) + response = json.loads(response) + if not "error" in response: + result = response.get("data", None) + result = result.get("dp", None) + result = result.get("value", None) + + else: + error_message = response.get("error", "Unknown error") + print(f"[-] Error: Failed to get value from Kuksa. {error_message}") + + except Exception as e: + print("[-] Error: Failed to get values from Kuksa. ", str(e)) else: - print("Error: Connection to Kuksa failed.") + print("[-] Error: Connection to Kuksa failed.") + + return result + + + def close_kuksa_client(self): + """ + Close and stop the Kuksa client. + """ + try: + with self._lock: + if self.kuksa_client: + self.kuksa_client.stop() + self.kuksa_client = None + print("[+] Kuksa client stopped.") + except Exception as e: + print("[-] Error: Failed to close Kuksa client. ", str(e))
\ No newline at end of file |