aboutsummaryrefslogtreecommitdiffstats
path: root/agl_service_voiceagent/utils/vss_interface.py
blob: a77e52c4c7e736279f0fb35b74062e53e858f55e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
import json
import threading
from kuksa_client import KuksaClientThread
import sys
from pathlib import Path
import asyncio
import concurrent.futures
from kuksa_client.grpc.aio import VSSClient
from kuksa_client.grpc import Datapoint
import time
from agl_service_voiceagent.utils.config import get_config_value, get_logger


class VSSInterface:
    """
    Process-wide singleton wrapper around the asynchronous KUKSA.val ``VSSClient``.

    The instance loads its settings from the "VSS" configuration group, reads
    the auth token from disk, and exposes coroutines to connect, authorize,
    query server info, read and write single signal values, and disconnect.
    Every coroutine reports failure through its return value (``False`` or
    ``None``) after printing and logging the error; none of them re-raise.
    """

    _instance = None
    _lock = threading.Lock()

    # init_client() passes every setting through str(), so a value that was
    # never configured can reach validation as None, "" or the text "None".
    _UNSET_VALUES = (None, "", "None")

    def __new__(cls):
        """
        Get the unique instance of the class, creating it on first use.

        Returns:
            VSSInterface: The shared instance.
        """
        with cls._lock:
            if cls._instance is None:
                # Publish the instance only after initialisation succeeded, so
                # a failure in init_client() cannot leave a half-built object
                # behind as the singleton.
                instance = super(VSSInterface, cls).__new__(cls)
                instance.init_client()
                cls._instance = instance
        return cls._instance

    def init_client(self):
        """
        Load the configuration, read the auth token and validate the settings.

        Terminates the process with exit status 1 when validate_config()
        rejects the configuration. No connection is opened here; see
        connect_vss_client().
        """
        # The client object is only created by connect_vss_client().
        self.vss_client = None
        self.token = None
        self.is_connected = False
        self.verbose = False
        self.logger = get_logger()

        # Settings from the "VSS" configuration group.
        self.hostname = str(get_config_value("hostname", "VSS"))
        self.port = str(get_config_value("port", "VSS"))
        self.token_filename = str(get_config_value("token_filename", "VSS"))
        self.tls_server_name = str(get_config_value("tls_server_name", "VSS"))
        self.insecure = bool(int(get_config_value("insecure", "VSS")))
        self.protocol = str(get_config_value("protocol", "VSS"))
        self.ca_cert_filename = str(get_config_value("ca_cert_filename", "VSS"))

        self.set_token()

        if not self.validate_config():
            # sys.exit instead of the interactive-only `exit` helper, which is
            # injected by the `site` module and is not always available.
            sys.exit(1)

    def validate_config(self):
        """
        Validate the Kuksa client configuration.

        A missing hostname or port, or a protocol other than 'ws' / 'grpc',
        is an error. A missing auth token only produces a warning.

        Returns:
            bool: True if the configuration is valid, False otherwise.
        """
        if self.hostname in self._UNSET_VALUES:
            print("[-] Error: Kuksa IP address is not set.")
            self.logger.error("Kuksa IP address is not set.")
            return False

        if self.port in self._UNSET_VALUES:
            print("[-] Error: Kuksa port is not set.")
            self.logger.error("Kuksa port is not set.")
            return False

        # set_token() stores "" when no token file is configured, so test for
        # emptiness rather than None.
        if not self.token:
            print("[-] Warning: Kuksa auth token is not set.")
            self.logger.warning("Kuksa auth token is not set.")

        if self.protocol not in ("ws", "grpc"):
            print("[-] Error: Invalid Kuksa protocol. Only 'ws' and 'grpc' are supported.")
            self.logger.error("Invalid Kuksa protocol. Only 'ws' and 'grpc' are supported.")
            return False

        return True

    def set_token(self):
        """
        Load the Kuksa auth token from ``self.token_filename``.

        Stores an empty string when no token file is configured.

        Raises:
            OSError: If the configured token file cannot be opened or read.
        """
        if not self.token_filename:
            self.token = ""
            return

        with open(self.token_filename, "r") as token_file:
            # Drop surrounding whitespace: a trailing newline from the file
            # would otherwise be sent as part of the token.
            self.token = token_file.read().strip()

    def get_vss_client(self):
        """
        Get the underlying VSS client.

        Returns:
            VSSClient | None: The client created by connect_vss_client(), or
            None if no client has been created yet.
        """
        return self.vss_client

    async def authorize_vss_client(self):
        """
        Authorize the VSS client with the loaded token.

        Returns:
            bool: True on success, False if the client is not initialized or
            the authorization call raised.
        """
        if self.vss_client is None:
            print("[-] Error: Failed to authorize Kuksa client. Kuksa client is not initialized.")
            self.logger.error("Failed to authorize Kuksa client. Kuksa client is not initialized.")
            return False
        try:
            await self.vss_client.authorize(self.token)
            # Never echo the token itself: it is a credential.
            print("Authorized Kuksa client.")
            return True
        except Exception as e:
            print(f"[-] Error: Failed to authorize Kuksa client: {e}")
            self.logger.error(f"Failed to authorize Kuksa client: {e}")
            return False

    async def get_server_info(self):
        """
        Get the server information.

        Returns:
            The value returned by the client's ``get_server_info()``, or None
            if the client is not initialized or the call raised.
        """
        if self.vss_client is None:
            return None
        try:
            return await self.vss_client.get_server_info()
        except Exception as e:
            print(f"[-] Error: Failed to get server info: {e}")
            self.logger.error(f"Failed to get server info: {e}")
            return None

    async def connect_vss_client(self):
        """
        Create the VSS client and connect it to the databroker.

        Updates ``self.is_connected`` to reflect the outcome.

        Returns:
            bool: True if the connection was established, False otherwise.
        """
        # NOTE(review): self.insecure and self.protocol are loaded by
        # init_client() but not consulted here; the client is always built
        # with a root certificate path. Confirm this is intended.
        print(f"Connecting to KUKSA.val databroker at {self.hostname}:{self.port}")
        try:
            self.vss_client = VSSClient(
                self.hostname,
                self.port,
                root_certificates=Path(self.ca_cert_filename),
                token=self.token,
                tls_server_name=self.tls_server_name,
                ensure_startup_connection=True,
            )
            await self.vss_client.connect()
            print(f"[+] Connected to KUKSA.val databroker at {self.hostname}:{self.port}")
            self.is_connected = True
            return True
        except Exception as e:
            print(f"[-] Error: Failed to connect to Kuksa val databroker: {e}")
            self.logger.error(f"Failed to connect to Kuksa val databroker: {e}")
            self.is_connected = False
            return False

    async def set_current_values(self, path=None, value=None):
        """
        Set the current value of a single signal.

        Args:
            path (str): The VSS path of the signal to update.
            value: The value to store; it is wrapped in a ``Datapoint``.

        Returns:
            bool: True if the value was sent, False if the client is not
            initialized or the call raised.
        """
        if self.vss_client is None:
            print(f"[-] Error: Failed to send value '{value}' to Kuksa. Kuksa client is not initialized.")
            self.logger.error(f"Failed to send value '{value}' to Kuksa. Kuksa client is not initialized.")
            return False
        try:
            await self.vss_client.set_current_values({path: Datapoint(value)})
            return True
        except Exception as e:
            print(f"[-] Error: Failed to send value '{value}' to Kuksa: {e}")
            self.logger.error(f"Failed to send value '{value}' to Kuksa: {e}")
            return False

    async def get_current_values(self, path=None):
        """
        Get the current value of a single signal.

        Args:
            path (str): The VSS path to read, e.g. 'Vehicle.Speed'.

        Returns:
            The ``value`` of the datapoint reported for ``path``, or None if
            the client is not connected, no datapoint is reported for the
            path, or the call raised.
        """
        if self.vss_client is None or self.is_connected is False:
            return None
        try:
            result = await self.vss_client.get_current_values([path])
            datapoint = result[path]
            # A path without a datapoint is not a failure; return None
            # directly instead of tripping an AttributeError that would be
            # logged as an error below.
            if datapoint is None:
                return None
            return datapoint.value
        except Exception as e:
            print(f"[-] Error: Failed to get current values: {e}")
            self.logger.error(f"Failed to get current values: {e}")
            return None

    async def disconnect_vss_client(self):
        """
        Disconnect the VSS client.

        Returns:
            bool: True if the client disconnected, False if it is not
            initialized or the call raised.
        """
        if self.vss_client is None:
            print("[-] Error: Failed to disconnect Kuksa client. Kuksa client is not initialized.")
            self.logger.error("Failed to disconnect Kuksa client. Kuksa client is not initialized.")
            return False
        try:
            await self.vss_client.disconnect()
            print("Disconnected from Kuksa val databroker.")
            self.is_connected = False
            return True
        except Exception as e:
            print(f"[-] Error: Failed to disconnect from Kuksa val databroker: {e}")
            self.logger.error(f"Failed to disconnect from Kuksa val databroker: {e}")
            return False