aboutsummaryrefslogtreecommitdiffstats
path: root/extras/Kuksa_Instance.py
diff options
context:
space:
mode:
authorsuchinton2001 <suchinton.2001@gmail.com>2023-07-22 18:39:14 +0530
committersuchinton2001 <suchinton.2001@gmail.com>2023-09-07 18:31:07 +0530
commitdb9f586a19fed7bcd04be3596fc30dc53f61b1db (patch)
tree476d86c085137779f47ee6b409e3a8aaac68991d /extras/Kuksa_Instance.py
parentf9b00b992d88edc0e9c31de809a1a981139c4fde (diff)
Upload progress on AGL demo control panel in one batch
AGL Demo Control Panel is a PyQt5 application used to simulate CAN bus signals using Kuksa.val v1: Initial commit v2: Remove unused assets v3: Add Opensans fonts, remove un-used styles and add Lisences as attributions v4: - Remove Opensans fonts, default to Dejavu fonts - Replace feather icons with carbon icons. - Reusing AGL demo app assests for HVAC and Steering wheel inputs. v5: Remove assets/Images/Lisences.md attribution file Signed-off-by: suchinton2001 <suchinton.2001@gmail.com> Change-Id: I1529495deff6fc27eacb92f7a29c4f71f8c8d5d9
Diffstat (limited to 'extras/Kuksa_Instance.py')
-rw-r--r--extras/Kuksa_Instance.py65
1 files changed, 65 insertions, 0 deletions
diff --git a/extras/Kuksa_Instance.py b/extras/Kuksa_Instance.py
new file mode 100644
index 0000000..a36ae2c
--- /dev/null
+++ b/extras/Kuksa_Instance.py
@@ -0,0 +1,65 @@
+from typing import Optional
+import kuksa_client as kuksa
+import threading
+import time
+
+from extras import config
+
+class KuksaClientSingleton:
+ __instance: Optional["KuksaClientSingleton"] = None
+ __lock = threading.Lock()
+
+ @staticmethod
+ def get_instance() -> "KuksaClientSingleton":
+ if KuksaClientSingleton.__instance is None:
+ with KuksaClientSingleton.__lock:
+ if KuksaClientSingleton.__instance is None:
+ KuksaClientSingleton.__instance = KuksaClientSingleton()
+ return KuksaClientSingleton.__instance
+
+ def __init__(self):
+ if KuksaClientSingleton.__instance is not None:
+ raise Exception("This class is a singleton!")
+ else:
+
+ self.default_Config = config.KUKSA_CONFIG
+ self.token = config.TOKEN_PATH
+
+ try:
+ self.client = kuksa.KuksaClientThread(self.default_Config)
+ self.client.authorize(self.token)
+ time.sleep(2)
+ if self.client.checkConnection() == False:
+ self.client = None
+ except Exception as e:
+ print(e)
+
+
+ KuksaClientSingleton.__instance = self
+
+ def reconnect_client(self, new_Config, new_Token):
+ if self.client is not None:
+ self.client.stop()
+ self.client = kuksa.KuksaClientThread(new_Config)
+ self.client.authorize(new_Token)
+ return self.client
+
+ def get_client(self):
+ return self.client
+
+ def get_config(self):
+ return self.default_Config
+
+ def get_token(self):
+ return self.token
+
+ def get_status(self):
+ if self.client is not None:
+ return self.client.checkConnection()
+ else:
+ return False
+
+ def __del__(self):
+ if self.client is not None:
+ self.client.stop()
+ return None