diff options
author | 2024-09-29 23:38:41 +0530 | |
---|---|---|
committer | 2024-10-10 18:31:58 +0530 | |
commit | 554ec4cd07d68f4bcb569277881e368c450d993a (patch) | |
tree | 0f17a3498c3e7881157fee69a3e306ce100c7119 | |
parent | 65d4619371979c8921ff155a6fe1d7de0e1d3598 (diff) |
Update Carla Playback Mode
- Now file playback changes values on control panel
- Added Config file path to ini file
- Fixed signal mapping for Indicator and Hazard lights
- Fixed crash of carla_to_CAN due to `Break` signal.
- can_messages.txt is now stored in assets dir
- Script Toggle shows error when can interface is not available
- Added cantools as new dependency
- Fixed default paths for can_messages playback file
Bug-AGL: SPEC-5161
Change-Id: I7b51ff3db1238e0c8addc19152d24d4ce2c8574e
Signed-off-by: Suchinton Chakravarty <suchinton.2001@gmail.com>
-rw-r--r-- | Scripts/carla_to_CAN.py | 90 | ||||
-rw-r--r-- | Scripts/record_playback.py | 164 | ||||
-rw-r--r-- | Widgets/ICPage.py | 72 | ||||
-rw-r--r-- | Widgets/animatedToggle.py | 20 | ||||
-rw-r--r-- | extras/config.ini | 4 | ||||
-rw-r--r-- | extras/config.py | 41 | ||||
-rw-r--r-- | requirements.txt | 1 |
7 files changed, 298 insertions, 94 deletions
diff --git a/Scripts/carla_to_CAN.py b/Scripts/carla_to_CAN.py index 951d29b..4ca6e32 100644 --- a/Scripts/carla_to_CAN.py +++ b/Scripts/carla_to_CAN.py @@ -7,12 +7,13 @@ import math import can import cantools import argparse +import subprocess +import sys # ============================================================================== # -- CAN ----------------------------------------------------------------------- # ============================================================================== - class CAN(object): """ Represents a Controller Area Network (CAN) interface for sending messages to a vehicle. @@ -32,9 +33,9 @@ class CAN(object): lights_cache (str): The cached indicator lights state. """ - def __init__(self): + def __init__(self, can_bus): self.db = cantools.database.load_file('agl-vcar.dbc') - self.can_bus = can.interface.Bus('vcan0', interface='socketcan') + self.can_bus = can_bus self.speed_message = self.db.get_message_by_name('Vehicle_Status_1') self.gear_message = self.db.get_message_by_name('Transmission') @@ -51,6 +52,12 @@ class CAN(object): self.engine_speed_cache = 0 self.gear_cache = "P" self.lights_cache = None + self.indicator_signals = { + 'LeftBlinker': {'PT_LeftTurnOn': 1, 'PT_RightTurnOn': 0, 'PT_HazardOn': 0}, + 'RightBlinker': {'PT_LeftTurnOn': 0, 'PT_RightTurnOn': 1, 'PT_HazardOn': 0}, + 'carla.libcarla.VehicleLightState(48)': {'PT_LeftTurnOn': 0, 'PT_RightTurnOn': 0, 'PT_HazardOn': 1}, + 'NONE': {'PT_LeftTurnOn': 0, 'PT_RightTurnOn': 0, 'PT_HazardOn': 0} + } def send_car_speed(self, speed): """ @@ -122,21 +129,10 @@ class CAN(object): Args: indicator (str): The indicator lights state ('LeftBlinker', 'RightBlinker', 'HazardLights'). """ - # Mapping indicator names to signal values - indicators_mapping = { - 'LeftBlinker': {'PT_LeftTurnOn': 1}, - 'RightBlinker': {'PT_RightTurnOn': 1}, - 'HazardLights': {'PT_HazardOn': 1} - } - - # Default signal values - signals = {'PT_HazardOn': 0, 'PT_LeftTurnOn': 0, 'PT_RightTurnOn': 0} - - # Update signals based on the indicator argument - signals.update(indicators_mapping.get(indicator, {})) + signal = self.indicator_signals[str(indicator)] # Encode and send the CAN message - data = self.Vehicle_Status_3_message.encode(signals) + data = self.Vehicle_Status_3_message.encode(signal) message = can.Message( arbitration_id=self.Vehicle_Status_3_message.frame_id, data=data) self.can_bus.send(message) @@ -173,14 +169,45 @@ class CAN(object): self.engine_speed_cache = rpm if lights is not None and lights != self.lights_cache: - self.send_indicator(lights) - self.lights_cache = lights + try: + self.send_indicator(lights) + self.lights_cache = lights + except Exception as e: + print(f"Error sending indicator lights: {e}") +def check_interface(interface): + """Check if the given interface is up.""" + try: + # Use ip link show to check if the interface is up + subprocess.check_output(['ip', 'link', 'show', interface]) + return True + except subprocess.CalledProcessError: + return False + +def get_default_interface(): + """Get the default CAN interface from the config file.""" + try: + # Import config from the parent directory + sys.path.append('../') + from extras import config + return config.get_can_interface() + except ImportError: + # If extras module is not found, return None + return None + +def create_can_bus(can_interface): + """Create and return a CAN bus object for the given can_interface.""" + try: + return can.interface.Bus(channel=can_interface, interface='socketcan') + except OSError as e: + raise RuntimeError(f'Failed to open CAN interface "{can_interface}": {e}') + def main(host='127.0.0.1', port=2000): parser = argparse.ArgumentParser(description='Carla to CAN Converter') parser.add_argument('--host', default='127.0.0.1', help='IP of the host server') parser.add_argument('--port', default=2000, type=int, help='TCP port to listen to') + parser.add_argument('--interface', required=False, help='CAN interface to use') args = parser.parse_args() client = carla.Client(args.host, args.port) @@ -188,7 +215,18 @@ def main(host='127.0.0.1', port=2000): world = client.get_world() - can = CAN() + # Determine the CAN interfaces + can_interface = args.interface or get_default_interface() or 'vcan0' + + print(f"CAN interface: {can_interface}") + + # Check interfaces + if check_interface(can_interface): + can_bus = create_can_bus(can_interface) + else: + raise RuntimeError("No available CAN interface found. To setup vcan0, run `sudo ./vcan.sh`.") + + can = CAN(can_bus) player_vehicle = None @@ -202,7 +240,6 @@ def main(host='127.0.0.1', port=2000): return try: - speed_kmh_cache = None engine_rpm_cache = None throttle_cache = None @@ -228,7 +265,6 @@ def main(host='127.0.0.1', port=2000): lights = player_vehicle.get_light_state() - # if any values have changed, try to send the CAN message if (speed_kmh != speed_kmh_cache or engine_rpm != engine_rpm_cache or control.throttle != throttle_cache or @@ -240,18 +276,18 @@ def main(host='127.0.0.1', port=2000): throttle_cache = control.throttle gear_cache = gear lights_cache = lights - - can.send_can_message(speed_kmh, engine_rpm, + try: + can.send_can_message(speed_kmh, engine_rpm, control.throttle, gear, lights) + except Exception as e: + print(f"New error: {e}") except Exception as e: - print( - f"An error occurred: {e}. The CARLA simulation might have stopped.") + print(f"An error occurred: {e}. The CARLA simulation might have stopped.") finally: - if can.can_bus is not None: + if hasattr(can, 'can_bus') and can.can_bus is not None: can.can_bus.shutdown() print("CAN bus properly shut down.") - if __name__ == "__main__": main() diff --git a/Scripts/record_playback.py b/Scripts/record_playback.py index 5d3956e..0e631ad 100644 --- a/Scripts/record_playback.py +++ b/Scripts/record_playback.py @@ -7,79 +7,147 @@ import time from rich.console import Console import os import argparse +from PyQt6.QtCore import QThread, pyqtSignal +import cantools +import sys + +current_dir = os.path.dirname(os.path.abspath(__file__)) +sys.path.append(os.path.dirname(current_dir)) + +from extras import config + +class CAN_playback(QThread): + finished = pyqtSignal() + + speedUpdate = pyqtSignal(int) + gearUpdate = pyqtSignal(str) + engineSpeedUpdate = pyqtSignal(int) + throttleUpdate = pyqtSignal(int) + indicatorUpdate = pyqtSignal(str) + + try: + playbackFile = config.get_playback_file() + except Exception as e: + print(e) + # playbackFile = os.path.join(current_dir, "can_messages.txt") -class CAN_playback: def __init__(self, interface='vcan0'): - """ - Initialize the CAN Tool with the specified interface. + super(CAN_playback, self).__init__() + self._running = False - Args: - interface (str): The CAN interface name (default: 'vcan0') - """ self.console_mode = False self.interface = interface self.bus = can.interface.Bus(interface='socketcan', channel=self.interface, bitrate=500000) - self.output_file = os.path.join(os.path.dirname(__file__), 'can_messages.txt') + self.output_file = self.playbackFile + + # Load the DBC file + self.db = cantools.database.load_file(os.path.join(os.path.dirname(__file__), 'agl-vcar.dbc')) + + # Create a mapping of relevant messages + self.message_map = { + 'Vehicle_Status_1': self.decode_speed, + 'Transmission': self.decode_gear, + 'Vehicle_Status_2': self.decode_engine_speed, + 'Engine': self.decode_throttle, + 'Vehicle_Status_3': self.decode_indicators + } def write_to_file(self, messages): - """ - Write captured CAN messages to a file. - - Args: - messages (list): List of can.Message objects to write - """ with open(self.output_file, 'w') as file: for msg in messages: file.write(f"{msg.timestamp},{msg.arbitration_id},{msg.dlc},{','.join(map(lambda b: f'0x{b:02x}', msg.data))}\n") def playback_messages(self): if os.path.exists(self.output_file): - #console.print("Replaying captured messages...") - messages = [] - with open(self.output_file, 'r') as file: - for line in file: - parts = line.strip().split(',', 3) # Split into at most 4 parts - timestamp, arbitration_id, dlc, data_str = parts - # Extract the data bytes, removing '0x' and splitting by ',' - data_bytes = [int(byte, 16) for byte in data_str.split(',') if byte] - - msg = can.Message( - timestamp=float(timestamp), - arbitration_id=int(arbitration_id, 0), - dlc=int(dlc), - data=data_bytes - ) - messages.append(msg) - self.replay_messages(messages) - + messages = [] + with open(self.output_file, 'r') as file: + for line in file: + parts = line.strip().split(',', 3) + timestamp, arbitration_id, dlc, data_str = parts + data_bytes = [int(byte, 16) for byte in data_str.split(',') if byte] + + msg = can.Message( + timestamp=float(timestamp), + arbitration_id=int(arbitration_id, 0), + dlc=int(dlc), + data=data_bytes + ) + messages.append(msg) + self.replay_messages(messages) + def stop(self): self._running = False if self.bus is not None: self.bus.shutdown() + self.finished.emit() def replay_messages(self, messages): - """ - Replay CAN messages on the specified bus. - - Args: - messages (list): List of can.Message objects to replay - """ self._running = True start_time = messages[0].timestamp for msg in messages: delay = msg.timestamp - start_time self.bus.send(msg) + + # Decode and emit signals for relevant messages + if msg.arbitration_id == self.db.get_message_by_frame_id(msg.arbitration_id).frame_id: + message_name = self.db.get_message_by_frame_id(msg.arbitration_id).name + if message_name in self.message_map: + decoded_data = self.decode_message(message_name, msg.data) + self.message_map[message_name](decoded_data) + time.sleep(delay) start_time = msg.timestamp - if self._running == False: return + if not self._running: + return + + def decode_message(self, message_name, data): + message = self.db.get_message_by_name(message_name) + return message.decode(data) + + def decode_speed(self, decoded_data): + speed = int(decoded_data['PT_VehicleAvgSpeed']) + self.speedUpdate.emit(speed) + + def decode_gear(self, decoded_data): + gear_value = int(decoded_data['Gear']) + if gear_value == 126: + gear = "P" + elif gear_value == 127: + gear = "D" + elif gear_value == -1 or gear_value == -2: + gear = "R" + elif gear_value == 0: + gear = "N" + else: + gear = f"{gear_value}" + self.gearUpdate.emit(gear) + + def decode_engine_speed(self, decoded_data): + engine_speed = int(decoded_data['PT_EngineSpeed']) + self.engineSpeedUpdate.emit(engine_speed) + + def decode_throttle(self, decoded_data): + throttle_position = int(decoded_data['ThrottlePosition']) + self.throttleUpdate.emit(throttle_position) + + def decode_indicators(self, decoded_data): + if decoded_data['PT_HazardOn'] == 1: + self.indicatorUpdate.emit('HazardOn') + if decoded_data['PT_HazardOn'] == 0: + self.indicatorUpdate.emit('HazardOff') + + if decoded_data['PT_LeftTurnOn'] == 1: + self.indicatorUpdate.emit('LeftBlinkerOn') + print("Left Blinker On") + if decoded_data['PT_LeftTurnOn'] == 0: + self.indicatorUpdate.emit('LeftBlinkerOff') + + if decoded_data['PT_RightTurnOn'] == 1: + self.indicatorUpdate.emit('RightBlinkerOn') + if decoded_data['PT_RightTurnOn'] == 0: + self.indicatorUpdate.emit('RightBlinkerOff') def capture_can_messages(self): - """ - Capture CAN messages from the specified bus. - - Returns: - list: List of captured can.Message objects - """ messages = [] if self.console_mode: @@ -101,17 +169,16 @@ def main(): from rich.console import Console from rich.prompt import Prompt - parser = argparse.ArgumentParser(description='CAN Message Capture and Replay Tool') + parser = argparse.ArgumentParser(description='CAN Message Capture and Playback Tool') parser.add_argument('--interface', '-i', type=str, default='vcan0', help='Specify the CAN interface (default: vcan0)') args = parser.parse_args() - # Initialize the CAN Tool with the specified interface can_tool = CAN_playback(interface=args.interface) can_tool.console_mode = True console = Console() while True: - console.print("\n[bold]CAN Message Capture and Replay[/bold]") + console.print("\n[bold]CAN Message Capture and Playback[/bold]") console.print("1. Capture CAN messages") console.print("2. Replay captured messages") console.print("3. Exit") @@ -131,12 +198,9 @@ def main(): else: console.print(f"No captured messages found in {can_tool.output_file}") - - else: console.print("Exiting...") break if __name__ == "__main__": main() - diff --git a/Widgets/ICPage.py b/Widgets/ICPage.py index e3d4ddf..b40177c 100644 --- a/Widgets/ICPage.py +++ b/Widgets/ICPage.py @@ -9,7 +9,7 @@ import sys from PyQt6 import uic, QtCore, QtWidgets from PyQt6.QtWidgets import QApplication from PyQt6.QtGui import QIcon, QPixmap, QPainter -from PyQt6.QtCore import QObject, pyqtSignal +from PyQt6.QtCore import QObject, pyqtSignal, QThread from PyQt6.QtWidgets import QWidget, QFrame from PyQt6.QtQuickWidgets import QQuickWidget import threading @@ -34,15 +34,10 @@ from Widgets.animatedToggle import AnimatedToggle def Gauge(gaugeType): """QWidget - This function creates a full gauge widget with the specified maximum value, current value, and unit. - - Args: - - maxValue: The maximum value of the gauge. - - value: The current value of the gauge. - - unit: The unit of the gauge. + This function creates gauge widgest of types RPM, Speed, Fuel and Coolant. Returns: - - A QQuickWidget object representing the full gauge widget. + - A QQuickWidget object representing the gauge widget. """ RPM_GaugeQML = os.path.join(current_dir, "../QMLWidgets/Full_Gauge/RPMGauge.qml") @@ -174,6 +169,39 @@ class ICWidget(Base, Form): self.rightIndicatorBtn.toggled.connect(self.rightIndicatorBtnClicked) self.hazardBtn.toggled.connect(self.hazardBtnClicked) + self.Playback = CAN_playback() + self.Playback_connections() + + def Playback_connections(self): + self.Playback.speedUpdate.connect(self.set_Vehicle_Speed) + self.Playback.gearUpdate.connect(self.playback_set_Vehicle_Gear) + self.Playback.engineSpeedUpdate.connect(self.set_Vehicle_RPM) + self.Playback.indicatorUpdate.connect(self.playback_set_Vehicle_Indicators) + + def playback_set_Vehicle_Gear(self, gear): + if gear == "P": + self.parkBtn.setChecked(True) + if gear == "D": + self.driveBtn.setChecked(True) + if gear == "R": + self.reverseBtn.setChecked(True) + if gear == "N": + self.neutralBtn.setChecked(True) + + def playback_set_Vehicle_Indicators(self, indicator): + if indicator == "HazardOn": + self.hazardBtn.setChecked(True) + elif indicator == "HazardOff": + self.hazardBtn.setChecked(False) + elif indicator == "LeftBlinkerOn": + self.leftIndicatorBtn.setChecked(True) + elif indicator == "LeftBlinkerOff": + self.leftIndicatorBtn.setChecked(False) + elif indicator == "RightBlinkerOn": + self.rightIndicatorBtn.setChecked(True) + elif indicator == "RightBlinkerOff": + self.rightIndicatorBtn.setChecked(False) + def set_Vehicle_Speed(self, speed): self.Speed_slider.setValue(speed) @@ -223,7 +251,8 @@ class ICWidget(Base, Form): Updates the fuel level monitor with the current fuel level value. """ fuelLevel = int(self.fuelLevel_slider.value()) - self.Fuel_Gauge.rootObject().setProperty('value', fuelLevel/100) + FuelGaugeLevel = fuelLevel / 100 + self.Fuel_Gauge.rootObject().setProperty('value', FuelGaugeLevel) print(self.Fuel_Gauge.rootObject().property('value')) try: self.kuksa_client.set(self.IC.fuelLevel, str(fuelLevel)) @@ -327,13 +356,26 @@ class ICWidget(Base, Form): def handle_Script_toggle(self): if config.file_playback_enabled(): - can_tool = CAN_playback() if self.Script_toggle.isChecked(): - can_tool_thread = threading.Thread( - target=can_tool.playback_messages) - can_tool_thread.start() - else: - can_tool.stop() + # start the playback thread + self.thread = QThread() + self.Playback.moveToThread(self.thread) + self.thread.started.connect(self.Playback.playback_messages) + self.thread.start() + # hide sliders + self.Speed_slider.hide() + self.RPM_slider.hide() + # set default values for coolent and fuel + self.coolantTemp_slider.setValue(90) + self.fuelLevel_slider.setValue(50) + + elif not self.Script_toggle.isChecked(): + self.Playback.stop() + self.thread.quit() + self.thread.wait() + + self.Speed_slider.show() + self.RPM_slider.show() else: if self.Script_toggle.isChecked(): diff --git a/Widgets/animatedToggle.py b/Widgets/animatedToggle.py index 7cfe254..f636ca0 100644 --- a/Widgets/animatedToggle.py +++ b/Widgets/animatedToggle.py @@ -6,6 +6,7 @@ import sys from PyQt6.QtGui import QColor, QPainter, QPainterPath, QBrush from PyQt6.QtCore import pyqtProperty, QPropertyAnimation, QPoint, QEasingCurve from PyQt6.QtWidgets import QApplication, QCheckBox +from PyQt6.QtCore import QTimer class AnimatedToggle(QCheckBox): @@ -117,6 +118,24 @@ class AnimatedToggle(QCheckBox): def hitButton(self, pos: QPoint): return self.contentsRect().contains(pos) + + # write a function to show an error, take color as input and change the color of the toggle switch for a few seconds while wiggling the switch + def showError(self): + # change the color of the toggle switch + self._bg_color = QColor("#FF0000") + # keep the switch in the off position + self.setChecked(False) + # wiggle the switch + for anim in [self.create_animation, self.create_bg_color_animation]: + animation = anim(False) + animation.start() + # reset the color after a few seconds + QTimer.singleShot(3000, self.resetColor) + + def resetColor(self): + self._bg_color = QColor("#965D62") + self.update_pos_color(self.isChecked()) + def paintEvent(self, event): """ @@ -138,7 +157,6 @@ class AnimatedToggle(QCheckBox): border_radius = self.height() / 2 toggle_width = self.height() * 2 - toggle_margin = self.height() * 0.3 circle_size = self.height() * 0.8 if self.circle_pos is None: diff --git a/extras/config.ini b/extras/config.ini index a533578..9d443da 100644 --- a/extras/config.ini +++ b/extras/config.ini @@ -1,8 +1,10 @@ [default] -fullscreen-mode = true +fullscreen-mode = false hvac-enabled = true steering-wheel-enabled = true file-playback-enabled = true +file-playback-path = +can-interface = [vss-server] ip = localhost diff --git a/extras/config.py b/extras/config.py index a0c60fd..7540e61 100644 --- a/extras/config.py +++ b/extras/config.py @@ -16,7 +16,9 @@ import os import platform +import can from configparser import ConfigParser +import logging python_version = f"python{'.'.join(platform.python_version_tuple()[:2])}" @@ -58,9 +60,17 @@ config_path = next((path for path, exists in CONFIG_PATHS.items() if exists), No if config_path: config.read(config_path) +PLAYBACK_FILE_PATHS = check_paths( + config.get('default', 'file-playback-path', fallback=None), + "/home/agl-driver/can_messages.txt", + "/etc/agl-demo-control-panel/can_messages.txt", + os.path.abspath(os.path.join(os.path.dirname(__file__), "../assets/can_messages.txt")) +) + CA_PATH = next((path for path, exists in CA_PATHS.items() if exists), None) WS_TOKEN = next((path for path, exists in WS_TOKEN_PATHS.items() if exists), None) GRPC_TOKEN = next((path for path, exists in GRPC_TOKEN_PATHS.items() if exists), None) +PLAYBACK_FILE = next((path for path, exists in PLAYBACK_FILE_PATHS.items() if exists), None) KUKSA_CONFIG = {} KUKSA_TOKEN = None @@ -154,6 +164,37 @@ def steering_wheel_enabled(): def file_playback_enabled(): return config.getboolean('default', 'file-playback-enabled', fallback=True) +def get_playback_file(): + if PLAYBACK_FILE is not None: + return PLAYBACK_FILE + else: + # save file in project baseDir/Scripts/can_messages.txt + with open(os.path.join(os.path.dirname(__file__), "can_messages.txt"), "w") as file: + file.write("") + # update config.ini + config.set('default', 'file-playback-path', os.path.join(os.path.dirname(__file__), "can_messages.txt")) + with open(USER_CONFIG_PATH, 'w') as configfile: + config.write(configfile) + + return os.path.join(os.path.dirname(__file__), "can_messages.txt") + +# Function to get the can interface name from config.ini +def get_can_interface(): + print(config.get('default', 'can-interface', fallback=None)) + return config.get('default', 'can-interface', fallback=None) + +# function to check if can interface is enabled or not +def can_interface_enabled(): + can_interface = get_can_interface() + if can_interface is not None: + try: + # check if an interface by the name exists + can.interface.Bus(interface='socketcan', channel=can_interface) + return True + except Exception as e: + logging.error(f"Error: {e}") + return False + return False if not config.has_section('vss-server'): config.add_section('vss-server') diff --git a/requirements.txt b/requirements.txt index ab78645..1fb014b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,5 +6,6 @@ PySide6_Addons==6.7.1 PySide6_Essentials==6.7.1 kuksa-client==0.4.0 python-can>=4.2.2 +cantools requests rich
\ No newline at end of file |