summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsuchinton2001 <suchinton.2001@gmail.com>2023-09-15 23:08:52 +0530
committerJan-Simon Moeller <jsmoeller@linuxfoundation.org>2023-09-18 15:57:18 +0000
commit746451c1587a2830b47904b5abb9f0b6c1b10641 (patch)
tree4c2531b4740f4b7c61f2e245636c40df9d9c9844
parentc9e50f62b3a59133b64d98d15bfe18c52b2ba7f5 (diff)
Update extra modules
V1: - Add module to feed Steering Control CAN messages - Add License headers to files - Modify UI_Handler to accommodate new Navigation bar animations - Minor code refactoring V2: - Add subscription handling in UI_Handler module - Add documentation for code - Remove redundant code Bug-AGL: SPEC-4905 Signed-off-by: suchinton2001 <suchinton.2001@gmail.com> Change-Id: I6f7c2c2fb00885064c7894329329f13e447648d3
-rw-r--r--extras/FeedCAN.py66
-rw-r--r--extras/FeedKuksa.py81
-rw-r--r--extras/Kuksa_Instance.py160
-rw-r--r--extras/UI_Handeler.py250
4 files changed, 447 insertions, 110 deletions
diff --git a/extras/FeedCAN.py b/extras/FeedCAN.py
new file mode 100644
index 0000000..92649f4
--- /dev/null
+++ b/extras/FeedCAN.py
@@ -0,0 +1,66 @@
+"""
+ Copyright 2023 Suchinton Chakravarty
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+"""
+
+import can
+
+def send_can_signal(frame):
+ """
+ Sends a CAN signal to the CAN bus using the given frame.
+ Args:
+ frame (str): The frame to be sent to the CAN bus.
+ Returns:
+ None
+ """
+ msg = separate_can_frame(frame)
+ bus = can.interface.Bus(channel='can0', bustype='socketcan')
+ #msg = can.Message(arbitration_id=can_id, data=data, is_extended_id=False)
+ try:
+ bus.send(msg)
+ print("CAN signal sent successfully:")
+ print("CAN ID:", hex(msg.arbitration_id))
+ print("Data:", msg.data)
+ if frame!="021#FFFFFFFF00000000":
+ # Turn off signal
+ send_can_signal("021#FFFFFFFF00000000")
+
+ except can.CanError:
+ print("Failed to send CAN signal")
+ finally:
+ bus.shutdown()
+
+def separate_can_frame(frame):
+ """
+ Separates a CAN frame into its arbitration ID and data parts.
+
+ Args:
+ frame (str): A string representing the CAN frame in the format "ARBID#DATA".
+
+ Returns:
+ can.Message: A can.Message object with the arbitration ID and data extracted from the input frame.
+ """
+ arb_id, data = frame.split("#")
+ arb_id = int(arb_id, 16)
+ data = bytes.fromhex(data)
+ message = can.Message(arbitration_id=arb_id, data=data)
+ return message
+
+
+def main():
+ frame = "021#FFFFFFFF10000000"
+ send_can_signal(frame)
+
+if __name__ == "__main__":
+ main()
diff --git a/extras/FeedKuksa.py b/extras/FeedKuksa.py
index d75bda0..903b442 100644
--- a/extras/FeedKuksa.py
+++ b/extras/FeedKuksa.py
@@ -15,41 +15,86 @@
"""
import time
+import logging
from PyQt5.QtCore import QThread
from . import Kuksa_Instance as kuksa_instance
class FeedKuksa(QThread):
+ """
+ A class to handle sending values to Kuksa.
+
+ Attributes:
+ -----------
+ stop_flag : bool
+ A flag to stop the thread.
+ kuksa : kuksa_instance.KuksaClientSingleton.instance()
+ An instance of the Kuksa client.
+ client : kuksa_instance.KuksaClientSingleton.instance().client
+ A client object to interact with the Kuksa server.
+ """
+
def __init__(self, parent=None):
+ """
+ Constructs all the necessary attributes for the FeedKuksa object.
+
+ Parameters:
+ -----------
+ parent : QObject
+ The parent object of the FeedKuksa object.
+ """
QThread.__init__(self,parent)
self.stop_flag = False
- self.set_instance()
def run(self):
- print("Starting thread")
+ """
+ Starts the thread and sets the instance of the Kuksa client.
+ """
+ logging.info("Starting thread")
self.set_instance()
- while not self.stop_flag:
- self.send_values()
def stop(self):
+ """
+ Stops the thread.
+ """
self.stop_flag = True
- print("Stopping thread")
+ logging.info("Stopping thread")
def set_instance(self):
- self.kuksa = kuksa_instance.KuksaClientSingleton.get_instance()
- self.client = self.kuksa.get_client()
+ """
+ Sets the instance of the Kuksa client.
+ """
+ self.kuksa = kuksa_instance.KuksaClientSingleton.instance()
+ self.client = self.kuksa.client
+
+ def send_values(self, path=None, value=None, attribute=None):
+ """
+ Sends values to Kuksa.
+
+ Parameters:
+ -----------
+ path : str
+ The path to the value in Kuksa.
+ value : str
+ The value to be sent to Kuksa.
+ attribute : str
+ The attribute of the value in Kuksa.
- def send_values(self, Path=None, Value=None, Attribute=None):
+ Raises:
+ -------
+ Exception
+ If there is an error sending values to Kuksa.
+ """
if self.client is not None:
- if self.client.checkConnection() is True:
-
- if Attribute is not None:
- self.client.setValue(Path, Value, Attribute)
- else:
- self.client.setValue(Path, Value)
- else:
- print("Could not connect to Kuksa")
- self.set_instance()
+ if self.client.checkConnection():
+ try:
+ if attribute is not None:
+ self.client.setValue(path, str(value), attribute)
+ else:
+ self.client.setValue(path, str(value))
+ except Exception as e:
+ logging.error(f"Error sending values to kuksa {e}")
+ self.set_instance()
else:
- print("Kuksa client is None, try reconnecting")
+ logging.error("Kuksa client is None, try reconnecting")
time.sleep(2)
self.set_instance() \ No newline at end of file
diff --git a/extras/Kuksa_Instance.py b/extras/Kuksa_Instance.py
index a36ae2c..49662bd 100644
--- a/extras/Kuksa_Instance.py
+++ b/extras/Kuksa_Instance.py
@@ -1,3 +1,19 @@
+"""
+ Copyright 2023 Suchinton Chakravarty
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+"""
+
from typing import Optional
import kuksa_client as kuksa
import threading
@@ -6,60 +22,130 @@ import time
from extras import config
class KuksaClientSingleton:
- __instance: Optional["KuksaClientSingleton"] = None
- __lock = threading.Lock()
+ """
+ A singleton class that provides a single instance of KuksaClientThread.
+
+ This class is thread-safe and ensures that only one instance of KuksaClientThread is created.
+
+ Attributes:
+ _instance (Optional[KuksaClientSingleton]): The instance of the class.
+ _lock (threading.Lock): A lock to ensure thread-safety.
+ config (dict): The configuration for KuksaClientThread.
+ token (str): The path to the token file.
+ client (KuksaClientThread): The instance of KuksaClientThread.
+
+ Methods:
+ instance() -> KuksaClientSingleton: Returns the instance of the class.
+ reconnect(config, token) -> KuksaClientThread: Reconnects the client with the given configuration and token.
+ get_client() -> Optional[KuksaClientThread]: Returns the client instance.
+ get_config() -> dict: Returns the configuration.
+ get_token() -> str: Returns the path to the token file.
+ status() -> bool: Returns the status of the client connection.
+ """
+
+ _instance: Optional["KuksaClientSingleton"] = None
+ _lock = threading.Lock()
@staticmethod
- def get_instance() -> "KuksaClientSingleton":
- if KuksaClientSingleton.__instance is None:
- with KuksaClientSingleton.__lock:
- if KuksaClientSingleton.__instance is None:
- KuksaClientSingleton.__instance = KuksaClientSingleton()
- return KuksaClientSingleton.__instance
+ def instance() -> "KuksaClientSingleton":
+ """
+ Returns the instance of the class.
+
+ If the instance does not exist, it creates a new instance.
+
+ Returns:
+ KuksaClientSingleton: The instance of the class.
+ """
+ if KuksaClientSingleton._instance is None:
+ with KuksaClientSingleton._lock:
+ if KuksaClientSingleton._instance is None:
+ KuksaClientSingleton._instance = KuksaClientSingleton()
+ return KuksaClientSingleton._instance
def __init__(self):
- if KuksaClientSingleton.__instance is not None:
+ """
+ Initializes the class.
+
+ If the instance already exists, it raises an exception.
+
+ It initializes the configuration, token and client instance.
+ """
+ if KuksaClientSingleton._instance is not None:
raise Exception("This class is a singleton!")
- else:
- self.default_Config = config.KUKSA_CONFIG
- self.token = config.TOKEN_PATH
+ self.config = config.KUKSA_CONFIG
+ self.token = config.TOKEN_PATH
+
+ try:
+ self.client = kuksa.KuksaClientThread(self.config)
+ self.client.authorize(self.token)
+ time.sleep(2)
+ if not self.client.checkConnection():
+ self.client = None
+ except Exception as e:
+ print(e)
+
+ KuksaClientSingleton._instance = self
- try:
- self.client = kuksa.KuksaClientThread(self.default_Config)
- self.client.authorize(self.token)
- time.sleep(2)
- if self.client.checkConnection() == False:
- self.client = None
- except Exception as e:
- print(e)
-
+ def reconnect(self, config, token):
+ """
+ Reconnects the client with the given configuration and token.
- KuksaClientSingleton.__instance = self
+ Args:
+ config (dict): The configuration for KuksaClientThread.
+ token (str): The path to the token file.
- def reconnect_client(self, new_Config, new_Token):
- if self.client is not None:
+ Returns:
+ KuksaClientThread: The instance of KuksaClientThread.
+ """
+ if self.client:
self.client.stop()
- self.client = kuksa.KuksaClientThread(new_Config)
- self.client.authorize(new_Token)
+ self.client = kuksa.KuksaClientThread(config)
+ self.client.authorize(token)
return self.client
def get_client(self):
- return self.client
-
+ """
+ Returns the client instance.
+
+ Returns:
+ Optional[KuksaClientThread]: The instance of KuksaClientThread.
+ """
+ if self.client:
+ return self.client
+ else:
+ return None
+
def get_config(self):
- return self.default_Config
+ """
+ Returns the configuration.
+
+ Returns:
+ dict: The configuration for KuksaClientThread.
+ """
+ return self.config
def get_token(self):
+ """
+ Returns the path to the token file.
+
+ Returns:
+ str: The path to the token file.
+ """
return self.token
- def get_status(self):
- if self.client is not None:
- return self.client.checkConnection()
- else:
- return False
+ def status(self):
+ """
+ Returns the status of the client connection.
+
+ Returns:
+ bool: The status of the client connection.
+ """
+ return self.client.checkConnection() if self.client else False
def __del__(self):
- if self.client is not None:
- self.client.stop()
- return None
+ """
+ Stops the client instance.
+ """
+ if self.client:
+ self.client.stop() \ No newline at end of file
diff --git a/extras/UI_Handeler.py b/extras/UI_Handeler.py
index d591766..cbbaefc 100644
--- a/extras/UI_Handeler.py
+++ b/extras/UI_Handeler.py
@@ -1,81 +1,221 @@
+"""
+ Copyright 2023 Suchinton Chakravarty
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+"""
+
from main import *
+from PyQt5 import QtCore
from PyQt5.QtCore import QPropertyAnimation
-from PyQt5 import QtCore, QtWidgets
-from PyQt5.QtGui import QPixmap, QPainter
-from PyQt5.QtCore import QTimeLine
-from PyQt5.QtWidgets import QWidget, QStackedWidget, QPushButton
-from functools import partial
+from PyQt5.QtWidgets import QWidget
+from PyQt5.QtCore import QEasingCurve
+from PyQt5.QtWidgets import QGraphicsOpacityEffect
+from PyQt5.QtWidgets import QDesktopWidget
+import logging
+import json
+
+from . import Kuksa_Instance as kuksa_instance
+
+# Global variables
+subscribed = False
+should_execute_callback = True
class UI_Handeler(MainWindow):
- def toggleNavigationBar(self, maxWidth, enable):
- if enable:
- width = self.leftMenuSubContainer.width()
- maxExtend = maxWidth
- standard = 80
-
- if width == 80:
- widthExtended = maxExtend
- else:
- widthExtended = standard
+ """
+ This class handles the UI of the AGL Demo Control Panel application.
+ """
+
+ def Hide_Navbar(self, bool_arg):
+ """
+ This method hides the navigation bar of the UI.
+
+ Args:
+ - bool_arg: A boolean value indicating whether to hide the navigation bar or not.
+ """
+ height = self.BottomMenuSubContainer.height()
+ heightExtended = 75 if bool_arg else 0
- self.animation = QPropertyAnimation(self.leftMenuSubContainer, b"minimumWidth")
- self.animation.setDuration(400)
- self.animation.setStartValue(width)
- self.animation.setEndValue(widthExtended)
- self.animation.setEasingCurve(QtCore.QEasingCurve.InOutQuart)
- self.animation.start()
+ self.animation = QPropertyAnimation(self.BottomMenuSubContainer, b"minimumHeight")
+ self.animation.setDuration(400)
+ self.animation.setStartValue(height)
+ self.animation.setEndValue(heightExtended)
+ self.animation.setEasingCurve(QtCore.QEasingCurve.InOutQuart)
+ self.animation.start()
- # animate switching pages for QstackedWidget with the animation being a fade in and out
def animateSwitch(self, index):
+ """
+ This method animates the switching of pages for QstackedWidget with the animation being a fade in and out.
+
+ Args:
+ - index: The index of the page to switch to.
+ """
self.fader_widget = FaderWidget(self.stackedWidget.currentWidget(), self.stackedWidget.widget(index))
self.stackedWidget.setCurrentIndex(index)
- # add window resizing to the UI
def toggleMaximized(self):
+ """
+ This method toggles the maximized state of the window.
+ """
if self.isMaximized():
self.showNormal()
else:
self.showMaximized()
- # move the window by dragging the header
def moveWindow(self, event):
+ """
+ This method moves the window by dragging the header.
+ Args:
+ - event: The event object containing information about the mouse event.
+ """
if event.buttons() == QtCore.Qt.LeftButton:
self.move(self.pos() + event.globalPos() - self.clickPosition)
self.clickPosition = event.globalPos()
event.accept()
- # get the position of the mouse when clicked
- def mousePressEvent(self, event):
- self.clickPosition = event.globalPos()
- event.accept()
+ def set_instance(self):
+ """
+ This method sets the instance of the Kuksa client.
+
+ Returns:
+ - True if the client is connected to Kuksa, False otherwise.
+ """
+ self.kuksa = kuksa_instance.KuksaClientSingleton.instance()
+ self.client = self.kuksa.get_client()
+ if self.client is not None and self.client.checkConnection():
+ return True
+ else:
+ print("No connection to Kuksa")
+ return False
+
+ def subscribe_VSS_Signals(self):
+ """
+ This method subscribes to the VSS signals from Kuksa.
+ """
+ global subscribed
+ if not subscribed:
+ self.kuksa = kuksa_instance.KuksaClientSingleton.instance()
+ self.client = self.kuksa.get_client()
+ if self.client is not None and self.client.checkConnection():
+ signals = [
+ "Vehicle.Speed",
+ "Vehicle.Powertrain.CombustionEngine.Speed",
+ "Vehicle.Body.Lights.DirectionIndicator.Left.IsSignaling",
+ "Vehicle.Body.Lights.DirectionIndicator.Right.IsSignaling",
+ "Vehicle.Body.Lights.Hazard.IsSignaling",
+ "Vehicle.Powertrain.FuelSystem.Level",
+ "Vehicle.Powertrain.CombustionEngine.ECT",
+ "Vehicle.Powertrain.Transmission.SelectedGear",
+ "Vehicle.Cabin.HVAC.Station.Row1.Left.Temperature",
+ "Vehicle.Cabin.HVAC.Station.Row1.Left.FanSpeed",
+ "Vehicle.Cabin.HVAC.Station.Row1.Right.Temperature",
+ "Vehicle.Cabin.HVAC.Station.Row1.Right.FanSpeed"]
+
+ for signal in signals:
+ self.client.subscribe(signal, lambda data: UI_Handeler.VSS_callback(self,data), 'value')
+ subscribed = True
+ else:
+ subscribed = False
+ print("No connection to Kuksa")
+
+ def VSS_callback(self,data):
+ """
+ This method is the callback function for the VSS signals from Kuksa.
+
+ Args:
+ - data: The data received from the signal.
+ """
+ global should_execute_callback
+ if should_execute_callback is False:
+ return
+ IC_Page = self.stackedWidget.widget(1)
+ HVAC_Page = self.stackedWidget.widget(2)
+
+ info = json.loads(data)
+ path = info.get('data', {}).get('path')
+ value = info.get('data', {}).get('dp', {}).get('value')
+
+ print(f"Received subscription event: {path} {value}")
+
+ if path == "Vehicle.Speed":
+ IC_Page.Speed_monitor.display(int(IC_Page.Speed_slider.value()))
+ IC_Page.Speed_slider.setValue(int(value))
+
+ if path == "Vehicle.Powertrain.CombustionEngine.Speed":
+ IC_Page.RPM_slider.setValue(int(value))
+ IC_Page.RPM_monitor.display(int(IC_Page.RPM_slider.value()))
+
+ if path == "Vehicle.Body.Lights.DirectionIndicator.Left.IsSignaling":
+ IC_Page.leftIndicatorBtn.setChecked(bool(value))
+
+ if path == "Vehicle.Body.Lights.DirectionIndicator.Right.IsSignaling":
+ IC_Page.rightIndicatorBtn.setChecked(bool(value))
+
+ if path == "Vehicle.Body.Lights.Hazard.IsSignaling":
+ IC_Page.hazardBtn.setChecked(bool(value))
+
+ if path == "Vehicle.Powertrain.FuelSystem.Level":
+ IC_Page.fuelLevel_slider.setValue(int(value))
+
+ if path == "Vehicle.Powertrain.CombustionEngine.ECT":
+ IC_Page.coolantTemp_slider.setValue(int(value))
+
+ if path == "Vehicle.Powertrain.Transmission.SelectedGear":
+ if int(value) == 127:
+ IC_Page.driveBtn.setChecked(True)
+ elif int(value) == 126:
+ IC_Page.parkBtn.setChecked(True)
+ elif int(value) == -1:
+ IC_Page.reverseBtn.setChecked(True)
+ elif int(value) == 0:
+ IC_Page.neutralBtn.setChecked(True)
+
+ if path == "Vehicle.Cabin.HVAC.Station.Row1.Left.Temperature":
+ HVAC_Page.left_temp.setValue(int(value))
+
+ if path == "Vehicle.Cabin.HVAC.Station.Row1.Left.FanSpeed":
+ HVAC_Page.left_fan.setValue(int(value))
+
+ if path == "Vehicle.Cabin.HVAC.Station.Row1.Right.Temperature":
+ HVAC_Page.right_temp.setValue(int(value))
+
+ if path == "Vehicle.Cabin.HVAC.Station.Row1.Right.FanSpeed":
+ HVAC_Page.right_fan.setValue(int(value))
- # get the position of the mouse when released
- def mouseReleaseEvent(self, event):
- self.clickPosition = None
- event.accept()
class FaderWidget(QWidget):
- def __init__(self, old_widget, new_widget):
- QWidget.__init__(self, new_widget)
- self.old_pixmap = QPixmap(new_widget.size())
- old_widget.render(self.old_pixmap)
- self.pixmap_opacity = 1.0
- self.timeline = QTimeLine()
- self.timeline.valueChanged.connect(self.animate)
- self.timeline.finished.connect(self.close)
- self.timeline.setDuration(250)
- self.timeline.start()
- self.resize(new_widget.size())
- self.show()
-
- def paintEvent(self, event):
- painter = QPainter()
- painter.begin(self)
- painter.setOpacity(self.pixmap_opacity)
- painter.drawPixmap(0, 0, self.old_pixmap)
- painter.end()
-
- def animate(self, value):
- self.pixmap_opacity = 1.0 - value
- self.repaint() \ No newline at end of file
+ def __init__(self, old_widget, new_widget):
+ super().__init__(new_widget)
+
+ self.old_widget = old_widget
+ self.new_widget = new_widget
+
+ self.effect = QGraphicsOpacityEffect()
+ self.new_widget.setGraphicsEffect(self.effect)
+
+ self.animation = QPropertyAnimation(self.effect, b"opacity")
+ self.animation.setDuration(300)
+ self.animation.setStartValue(0)
+ self.animation.setEndValue(1)
+ self.animation.setEasingCurve(QEasingCurve.OutCubic)
+ self.animation.finished.connect(self.close)
+
+ self.animate()
+
+ def animate(self):
+ self.animation.start()
+
+ def close(self):
+ self.old_widget.close()
+ self.new_widget.show()
+ super().close() \ No newline at end of file