diff options
author | suchinton2001 <suchinton.2001@gmail.com> | 2023-09-15 23:08:52 +0530 |
---|---|---|
committer | Jan-Simon Moeller <jsmoeller@linuxfoundation.org> | 2023-09-18 15:57:18 +0000 |
commit | 746451c1587a2830b47904b5abb9f0b6c1b10641 (patch) | |
tree | 4c2531b4740f4b7c61f2e245636c40df9d9c9844 | |
parent | c9e50f62b3a59133b64d98d15bfe18c52b2ba7f5 (diff) |
Update extra modules
V1:
- Add module to feed Steering Control CAN messages
- Add License headers to files
- Modify UI_Handler to accommodate new Navigation bar animations
- Minor code refactoring
V2:
- Add subscription handling in UI_Handler module
- Add documentation for code
- Remove redundant code
Bug-AGL: SPEC-4905
Signed-off-by: suchinton2001 <suchinton.2001@gmail.com>
Change-Id: I6f7c2c2fb00885064c7894329329f13e447648d3
-rw-r--r-- | extras/FeedCAN.py | 66 | ||||
-rw-r--r-- | extras/FeedKuksa.py | 81 | ||||
-rw-r--r-- | extras/Kuksa_Instance.py | 160 | ||||
-rw-r--r-- | extras/UI_Handeler.py | 250 |
4 files changed, 447 insertions, 110 deletions
diff --git a/extras/FeedCAN.py b/extras/FeedCAN.py new file mode 100644 index 0000000..92649f4 --- /dev/null +++ b/extras/FeedCAN.py @@ -0,0 +1,66 @@ +""" + Copyright 2023 Suchinton Chakravarty + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +""" + +import can + +def send_can_signal(frame): + """ + Sends a CAN signal to the CAN bus using the given frame. + Args: + frame (str): The frame to be sent to the CAN bus. + Returns: + None + """ + msg = separate_can_frame(frame) + bus = can.interface.Bus(channel='can0', bustype='socketcan') + #msg = can.Message(arbitration_id=can_id, data=data, is_extended_id=False) + try: + bus.send(msg) + print("CAN signal sent successfully:") + print("CAN ID:", hex(msg.arbitration_id)) + print("Data:", msg.data) + if frame!="021#FFFFFFFF00000000": + # Turn off signal + send_can_signal("021#FFFFFFFF00000000") + + except can.CanError: + print("Failed to send CAN signal") + finally: + bus.shutdown() + +def separate_can_frame(frame): + """ + Separates a CAN frame into its arbitration ID and data parts. + + Args: + frame (str): A string representing the CAN frame in the format "ARBID#DATA". + + Returns: + can.Message: A can.Message object with the arbitration ID and data extracted from the input frame. + """ + arb_id, data = frame.split("#") + arb_id = int(arb_id, 16) + data = bytes.fromhex(data) + message = can.Message(arbitration_id=arb_id, data=data) + return message + + +def main(): + frame = "021#FFFFFFFF10000000" + send_can_signal(frame) + +if __name__ == "__main__": + main() diff --git a/extras/FeedKuksa.py b/extras/FeedKuksa.py index d75bda0..903b442 100644 --- a/extras/FeedKuksa.py +++ b/extras/FeedKuksa.py @@ -15,41 +15,86 @@ """ import time +import logging from PyQt5.QtCore import QThread from . import Kuksa_Instance as kuksa_instance class FeedKuksa(QThread): + """ + A class to handle sending values to Kuksa. + + Attributes: + ----------- + stop_flag : bool + A flag to stop the thread. + kuksa : kuksa_instance.KuksaClientSingleton.instance() + An instance of the Kuksa client. + client : kuksa_instance.KuksaClientSingleton.instance().client + A client object to interact with the Kuksa server. + """ + def __init__(self, parent=None): + """ + Constructs all the necessary attributes for the FeedKuksa object. + + Parameters: + ----------- + parent : QObject + The parent object of the FeedKuksa object. + """ QThread.__init__(self,parent) self.stop_flag = False - self.set_instance() def run(self): - print("Starting thread") + """ + Starts the thread and sets the instance of the Kuksa client. + """ + logging.info("Starting thread") self.set_instance() - while not self.stop_flag: - self.send_values() def stop(self): + """ + Stops the thread. + """ self.stop_flag = True - print("Stopping thread") + logging.info("Stopping thread") def set_instance(self): - self.kuksa = kuksa_instance.KuksaClientSingleton.get_instance() - self.client = self.kuksa.get_client() + """ + Sets the instance of the Kuksa client. + """ + self.kuksa = kuksa_instance.KuksaClientSingleton.instance() + self.client = self.kuksa.client + + def send_values(self, path=None, value=None, attribute=None): + """ + Sends values to Kuksa. + + Parameters: + ----------- + path : str + The path to the value in Kuksa. + value : str + The value to be sent to Kuksa. + attribute : str + The attribute of the value in Kuksa. - def send_values(self, Path=None, Value=None, Attribute=None): + Raises: + ------- + Exception + If there is an error sending values to Kuksa. + """ if self.client is not None: - if self.client.checkConnection() is True: - - if Attribute is not None: - self.client.setValue(Path, Value, Attribute) - else: - self.client.setValue(Path, Value) - else: - print("Could not connect to Kuksa") - self.set_instance() + if self.client.checkConnection(): + try: + if attribute is not None: + self.client.setValue(path, str(value), attribute) + else: + self.client.setValue(path, str(value)) + except Exception as e: + logging.error(f"Error sending values to kuksa {e}") + self.set_instance() else: - print("Kuksa client is None, try reconnecting") + logging.error("Kuksa client is None, try reconnecting") time.sleep(2) self.set_instance()
\ No newline at end of file diff --git a/extras/Kuksa_Instance.py b/extras/Kuksa_Instance.py index a36ae2c..49662bd 100644 --- a/extras/Kuksa_Instance.py +++ b/extras/Kuksa_Instance.py @@ -1,3 +1,19 @@ +""" + Copyright 2023 Suchinton Chakravarty + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +""" + from typing import Optional import kuksa_client as kuksa import threading @@ -6,60 +22,130 @@ import time from extras import config class KuksaClientSingleton: - __instance: Optional["KuksaClientSingleton"] = None - __lock = threading.Lock() + """ + A singleton class that provides a single instance of KuksaClientThread. + + This class is thread-safe and ensures that only one instance of KuksaClientThread is created. + + Attributes: + _instance (Optional[KuksaClientSingleton]): The instance of the class. + _lock (threading.Lock): A lock to ensure thread-safety. + config (dict): The configuration for KuksaClientThread. + token (str): The path to the token file. + client (KuksaClientThread): The instance of KuksaClientThread. + + Methods: + instance() -> KuksaClientSingleton: Returns the instance of the class. + reconnect(config, token) -> KuksaClientThread: Reconnects the client with the given configuration and token. + get_client() -> Optional[KuksaClientThread]: Returns the client instance. + get_config() -> dict: Returns the configuration. + get_token() -> str: Returns the path to the token file. + status() -> bool: Returns the status of the client connection. + """ + + _instance: Optional["KuksaClientSingleton"] = None + _lock = threading.Lock() @staticmethod - def get_instance() -> "KuksaClientSingleton": - if KuksaClientSingleton.__instance is None: - with KuksaClientSingleton.__lock: - if KuksaClientSingleton.__instance is None: - KuksaClientSingleton.__instance = KuksaClientSingleton() - return KuksaClientSingleton.__instance + def instance() -> "KuksaClientSingleton": + """ + Returns the instance of the class. + + If the instance does not exist, it creates a new instance. + + Returns: + KuksaClientSingleton: The instance of the class. + """ + if KuksaClientSingleton._instance is None: + with KuksaClientSingleton._lock: + if KuksaClientSingleton._instance is None: + KuksaClientSingleton._instance = KuksaClientSingleton() + return KuksaClientSingleton._instance def __init__(self): - if KuksaClientSingleton.__instance is not None: + """ + Initializes the class. + + If the instance already exists, it raises an exception. + + It initializes the configuration, token and client instance. + """ + if KuksaClientSingleton._instance is not None: raise Exception("This class is a singleton!") - else: - self.default_Config = config.KUKSA_CONFIG - self.token = config.TOKEN_PATH + self.config = config.KUKSA_CONFIG + self.token = config.TOKEN_PATH + + try: + self.client = kuksa.KuksaClientThread(self.config) + self.client.authorize(self.token) + time.sleep(2) + if not self.client.checkConnection(): + self.client = None + except Exception as e: + print(e) + + KuksaClientSingleton._instance = self - try: - self.client = kuksa.KuksaClientThread(self.default_Config) - self.client.authorize(self.token) - time.sleep(2) - if self.client.checkConnection() == False: - self.client = None - except Exception as e: - print(e) - + def reconnect(self, config, token): + """ + Reconnects the client with the given configuration and token. - KuksaClientSingleton.__instance = self + Args: + config (dict): The configuration for KuksaClientThread. + token (str): The path to the token file. - def reconnect_client(self, new_Config, new_Token): - if self.client is not None: + Returns: + KuksaClientThread: The instance of KuksaClientThread. + """ + if self.client: self.client.stop() - self.client = kuksa.KuksaClientThread(new_Config) - self.client.authorize(new_Token) + self.client = kuksa.KuksaClientThread(config) + self.client.authorize(token) return self.client def get_client(self): - return self.client - + """ + Returns the client instance. + + Returns: + Optional[KuksaClientThread]: The instance of KuksaClientThread. + """ + if self.client: + return self.client + else: + return None + def get_config(self): - return self.default_Config + """ + Returns the configuration. + + Returns: + dict: The configuration for KuksaClientThread. + """ + return self.config def get_token(self): + """ + Returns the path to the token file. + + Returns: + str: The path to the token file. + """ return self.token - def get_status(self): - if self.client is not None: - return self.client.checkConnection() - else: - return False + def status(self): + """ + Returns the status of the client connection. + + Returns: + bool: The status of the client connection. + """ + return self.client.checkConnection() if self.client else False def __del__(self): - if self.client is not None: - self.client.stop() - return None + """ + Stops the client instance. + """ + if self.client: + self.client.stop()
\ No newline at end of file diff --git a/extras/UI_Handeler.py b/extras/UI_Handeler.py index d591766..cbbaefc 100644 --- a/extras/UI_Handeler.py +++ b/extras/UI_Handeler.py @@ -1,81 +1,221 @@ +""" + Copyright 2023 Suchinton Chakravarty + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +""" + from main import * +from PyQt5 import QtCore from PyQt5.QtCore import QPropertyAnimation -from PyQt5 import QtCore, QtWidgets -from PyQt5.QtGui import QPixmap, QPainter -from PyQt5.QtCore import QTimeLine -from PyQt5.QtWidgets import QWidget, QStackedWidget, QPushButton -from functools import partial +from PyQt5.QtWidgets import QWidget +from PyQt5.QtCore import QEasingCurve +from PyQt5.QtWidgets import QGraphicsOpacityEffect +from PyQt5.QtWidgets import QDesktopWidget +import logging +import json + +from . import Kuksa_Instance as kuksa_instance + +# Global variables +subscribed = False +should_execute_callback = True class UI_Handeler(MainWindow): - def toggleNavigationBar(self, maxWidth, enable): - if enable: - width = self.leftMenuSubContainer.width() - maxExtend = maxWidth - standard = 80 - - if width == 80: - widthExtended = maxExtend - else: - widthExtended = standard + """ + This class handles the UI of the AGL Demo Control Panel application. + """ + + def Hide_Navbar(self, bool_arg): + """ + This method hides the navigation bar of the UI. + + Args: + - bool_arg: A boolean value indicating whether to hide the navigation bar or not. + """ + height = self.BottomMenuSubContainer.height() + heightExtended = 75 if bool_arg else 0 - self.animation = QPropertyAnimation(self.leftMenuSubContainer, b"minimumWidth") - self.animation.setDuration(400) - self.animation.setStartValue(width) - self.animation.setEndValue(widthExtended) - self.animation.setEasingCurve(QtCore.QEasingCurve.InOutQuart) - self.animation.start() + self.animation = QPropertyAnimation(self.BottomMenuSubContainer, b"minimumHeight") + self.animation.setDuration(400) + self.animation.setStartValue(height) + self.animation.setEndValue(heightExtended) + self.animation.setEasingCurve(QtCore.QEasingCurve.InOutQuart) + self.animation.start() - # animate switching pages for QstackedWidget with the animation being a fade in and out def animateSwitch(self, index): + """ + This method animates the switching of pages for QstackedWidget with the animation being a fade in and out. + + Args: + - index: The index of the page to switch to. + """ self.fader_widget = FaderWidget(self.stackedWidget.currentWidget(), self.stackedWidget.widget(index)) self.stackedWidget.setCurrentIndex(index) - # add window resizing to the UI def toggleMaximized(self): + """ + This method toggles the maximized state of the window. + """ if self.isMaximized(): self.showNormal() else: self.showMaximized() - # move the window by dragging the header def moveWindow(self, event): + """ + This method moves the window by dragging the header. + Args: + - event: The event object containing information about the mouse event. + """ if event.buttons() == QtCore.Qt.LeftButton: self.move(self.pos() + event.globalPos() - self.clickPosition) self.clickPosition = event.globalPos() event.accept() - # get the position of the mouse when clicked - def mousePressEvent(self, event): - self.clickPosition = event.globalPos() - event.accept() + def set_instance(self): + """ + This method sets the instance of the Kuksa client. + + Returns: + - True if the client is connected to Kuksa, False otherwise. + """ + self.kuksa = kuksa_instance.KuksaClientSingleton.instance() + self.client = self.kuksa.get_client() + if self.client is not None and self.client.checkConnection(): + return True + else: + print("No connection to Kuksa") + return False + + def subscribe_VSS_Signals(self): + """ + This method subscribes to the VSS signals from Kuksa. + """ + global subscribed + if not subscribed: + self.kuksa = kuksa_instance.KuksaClientSingleton.instance() + self.client = self.kuksa.get_client() + if self.client is not None and self.client.checkConnection(): + signals = [ + "Vehicle.Speed", + "Vehicle.Powertrain.CombustionEngine.Speed", + "Vehicle.Body.Lights.DirectionIndicator.Left.IsSignaling", + "Vehicle.Body.Lights.DirectionIndicator.Right.IsSignaling", + "Vehicle.Body.Lights.Hazard.IsSignaling", + "Vehicle.Powertrain.FuelSystem.Level", + "Vehicle.Powertrain.CombustionEngine.ECT", + "Vehicle.Powertrain.Transmission.SelectedGear", + "Vehicle.Cabin.HVAC.Station.Row1.Left.Temperature", + "Vehicle.Cabin.HVAC.Station.Row1.Left.FanSpeed", + "Vehicle.Cabin.HVAC.Station.Row1.Right.Temperature", + "Vehicle.Cabin.HVAC.Station.Row1.Right.FanSpeed"] + + for signal in signals: + self.client.subscribe(signal, lambda data: UI_Handeler.VSS_callback(self,data), 'value') + subscribed = True + else: + subscribed = False + print("No connection to Kuksa") + + def VSS_callback(self,data): + """ + This method is the callback function for the VSS signals from Kuksa. + + Args: + - data: The data received from the signal. + """ + global should_execute_callback + if should_execute_callback is False: + return + IC_Page = self.stackedWidget.widget(1) + HVAC_Page = self.stackedWidget.widget(2) + + info = json.loads(data) + path = info.get('data', {}).get('path') + value = info.get('data', {}).get('dp', {}).get('value') + + print(f"Received subscription event: {path} {value}") + + if path == "Vehicle.Speed": + IC_Page.Speed_monitor.display(int(IC_Page.Speed_slider.value())) + IC_Page.Speed_slider.setValue(int(value)) + + if path == "Vehicle.Powertrain.CombustionEngine.Speed": + IC_Page.RPM_slider.setValue(int(value)) + IC_Page.RPM_monitor.display(int(IC_Page.RPM_slider.value())) + + if path == "Vehicle.Body.Lights.DirectionIndicator.Left.IsSignaling": + IC_Page.leftIndicatorBtn.setChecked(bool(value)) + + if path == "Vehicle.Body.Lights.DirectionIndicator.Right.IsSignaling": + IC_Page.rightIndicatorBtn.setChecked(bool(value)) + + if path == "Vehicle.Body.Lights.Hazard.IsSignaling": + IC_Page.hazardBtn.setChecked(bool(value)) + + if path == "Vehicle.Powertrain.FuelSystem.Level": + IC_Page.fuelLevel_slider.setValue(int(value)) + + if path == "Vehicle.Powertrain.CombustionEngine.ECT": + IC_Page.coolantTemp_slider.setValue(int(value)) + + if path == "Vehicle.Powertrain.Transmission.SelectedGear": + if int(value) == 127: + IC_Page.driveBtn.setChecked(True) + elif int(value) == 126: + IC_Page.parkBtn.setChecked(True) + elif int(value) == -1: + IC_Page.reverseBtn.setChecked(True) + elif int(value) == 0: + IC_Page.neutralBtn.setChecked(True) + + if path == "Vehicle.Cabin.HVAC.Station.Row1.Left.Temperature": + HVAC_Page.left_temp.setValue(int(value)) + + if path == "Vehicle.Cabin.HVAC.Station.Row1.Left.FanSpeed": + HVAC_Page.left_fan.setValue(int(value)) + + if path == "Vehicle.Cabin.HVAC.Station.Row1.Right.Temperature": + HVAC_Page.right_temp.setValue(int(value)) + + if path == "Vehicle.Cabin.HVAC.Station.Row1.Right.FanSpeed": + HVAC_Page.right_fan.setValue(int(value)) - # get the position of the mouse when released - def mouseReleaseEvent(self, event): - self.clickPosition = None - event.accept() class FaderWidget(QWidget): - def __init__(self, old_widget, new_widget): - QWidget.__init__(self, new_widget) - self.old_pixmap = QPixmap(new_widget.size()) - old_widget.render(self.old_pixmap) - self.pixmap_opacity = 1.0 - self.timeline = QTimeLine() - self.timeline.valueChanged.connect(self.animate) - self.timeline.finished.connect(self.close) - self.timeline.setDuration(250) - self.timeline.start() - self.resize(new_widget.size()) - self.show() - - def paintEvent(self, event): - painter = QPainter() - painter.begin(self) - painter.setOpacity(self.pixmap_opacity) - painter.drawPixmap(0, 0, self.old_pixmap) - painter.end() - - def animate(self, value): - self.pixmap_opacity = 1.0 - value - self.repaint()
\ No newline at end of file + def __init__(self, old_widget, new_widget): + super().__init__(new_widget) + + self.old_widget = old_widget + self.new_widget = new_widget + + self.effect = QGraphicsOpacityEffect() + self.new_widget.setGraphicsEffect(self.effect) + + self.animation = QPropertyAnimation(self.effect, b"opacity") + self.animation.setDuration(300) + self.animation.setStartValue(0) + self.animation.setEndValue(1) + self.animation.setEasingCurve(QEasingCurve.OutCubic) + self.animation.finished.connect(self.close) + + self.animate() + + def animate(self): + self.animation.start() + + def close(self): + self.old_widget.close() + self.new_widget.show() + super().close()
\ No newline at end of file |