diff options
author | Suchinton Chakravarty <suchinton.2001@gmail.com> | 2024-09-29 23:38:41 +0530 |
---|---|---|
committer | Suchinton Chakravarty <suchinton.2001@gmail.com> | 2024-10-10 18:31:58 +0530 |
commit | 554ec4cd07d68f4bcb569277881e368c450d993a (patch) | |
tree | 0f17a3498c3e7881157fee69a3e306ce100c7119 /Scripts | |
parent | 65d4619371979c8921ff155a6fe1d7de0e1d3598 (diff) |
Update Carla Playback Mode
- Now file playback changes values on control panel
- Added Config file path to ini file
- Fixed signal mapping for Indicator and Hazard lights
- Fixed crash of carla_to_CAN due to `Break` signal.
- can_messages.txt is now stored in assets dir
- Script Toggle shows error when can interface is not available
- Added cantools as new dependency
- Fixed default paths for can_messages playback file
Bug-AGL: SPEC-5161
Change-Id: I7b51ff3db1238e0c8addc19152d24d4ce2c8574e
Signed-off-by: Suchinton Chakravarty <suchinton.2001@gmail.com>
Diffstat (limited to 'Scripts')
-rw-r--r-- | Scripts/carla_to_CAN.py | 90 | ||||
-rw-r--r-- | Scripts/record_playback.py | 164 |
2 files changed, 177 insertions, 77 deletions
diff --git a/Scripts/carla_to_CAN.py b/Scripts/carla_to_CAN.py index 951d29b..4ca6e32 100644 --- a/Scripts/carla_to_CAN.py +++ b/Scripts/carla_to_CAN.py @@ -7,12 +7,13 @@ import math import can import cantools import argparse +import subprocess +import sys # ============================================================================== # -- CAN ----------------------------------------------------------------------- # ============================================================================== - class CAN(object): """ Represents a Controller Area Network (CAN) interface for sending messages to a vehicle. @@ -32,9 +33,9 @@ class CAN(object): lights_cache (str): The cached indicator lights state. """ - def __init__(self): + def __init__(self, can_bus): self.db = cantools.database.load_file('agl-vcar.dbc') - self.can_bus = can.interface.Bus('vcan0', interface='socketcan') + self.can_bus = can_bus self.speed_message = self.db.get_message_by_name('Vehicle_Status_1') self.gear_message = self.db.get_message_by_name('Transmission') @@ -51,6 +52,12 @@ class CAN(object): self.engine_speed_cache = 0 self.gear_cache = "P" self.lights_cache = None + self.indicator_signals = { + 'LeftBlinker': {'PT_LeftTurnOn': 1, 'PT_RightTurnOn': 0, 'PT_HazardOn': 0}, + 'RightBlinker': {'PT_LeftTurnOn': 0, 'PT_RightTurnOn': 1, 'PT_HazardOn': 0}, + 'carla.libcarla.VehicleLightState(48)': {'PT_LeftTurnOn': 0, 'PT_RightTurnOn': 0, 'PT_HazardOn': 1}, + 'NONE': {'PT_LeftTurnOn': 0, 'PT_RightTurnOn': 0, 'PT_HazardOn': 0} + } def send_car_speed(self, speed): """ @@ -122,21 +129,10 @@ class CAN(object): Args: indicator (str): The indicator lights state ('LeftBlinker', 'RightBlinker', 'HazardLights'). """ - # Mapping indicator names to signal values - indicators_mapping = { - 'LeftBlinker': {'PT_LeftTurnOn': 1}, - 'RightBlinker': {'PT_RightTurnOn': 1}, - 'HazardLights': {'PT_HazardOn': 1} - } - - # Default signal values - signals = {'PT_HazardOn': 0, 'PT_LeftTurnOn': 0, 'PT_RightTurnOn': 0} - - # Update signals based on the indicator argument - signals.update(indicators_mapping.get(indicator, {})) + signal = self.indicator_signals[str(indicator)] # Encode and send the CAN message - data = self.Vehicle_Status_3_message.encode(signals) + data = self.Vehicle_Status_3_message.encode(signal) message = can.Message( arbitration_id=self.Vehicle_Status_3_message.frame_id, data=data) self.can_bus.send(message) @@ -173,14 +169,45 @@ class CAN(object): self.engine_speed_cache = rpm if lights is not None and lights != self.lights_cache: - self.send_indicator(lights) - self.lights_cache = lights + try: + self.send_indicator(lights) + self.lights_cache = lights + except Exception as e: + print(f"Error sending indicator lights: {e}") +def check_interface(interface): + """Check if the given interface is up.""" + try: + # Use ip link show to check if the interface is up + subprocess.check_output(['ip', 'link', 'show', interface]) + return True + except subprocess.CalledProcessError: + return False + +def get_default_interface(): + """Get the default CAN interface from the config file.""" + try: + # Import config from the parent directory + sys.path.append('../') + from extras import config + return config.get_can_interface() + except ImportError: + # If extras module is not found, return None + return None + +def create_can_bus(can_interface): + """Create and return a CAN bus object for the given can_interface.""" + try: + return can.interface.Bus(channel=can_interface, interface='socketcan') + except OSError as e: + raise RuntimeError(f'Failed to open CAN interface "{can_interface}": {e}') + def main(host='127.0.0.1', port=2000): parser = argparse.ArgumentParser(description='Carla to CAN Converter') parser.add_argument('--host', default='127.0.0.1', help='IP of the host server') parser.add_argument('--port', default=2000, type=int, help='TCP port to listen to') + parser.add_argument('--interface', required=False, help='CAN interface to use') args = parser.parse_args() client = carla.Client(args.host, args.port) @@ -188,7 +215,18 @@ def main(host='127.0.0.1', port=2000): world = client.get_world() - can = CAN() + # Determine the CAN interfaces + can_interface = args.interface or get_default_interface() or 'vcan0' + + print(f"CAN interface: {can_interface}") + + # Check interfaces + if check_interface(can_interface): + can_bus = create_can_bus(can_interface) + else: + raise RuntimeError("No available CAN interface found. To setup vcan0, run `sudo ./vcan.sh`.") + + can = CAN(can_bus) player_vehicle = None @@ -202,7 +240,6 @@ def main(host='127.0.0.1', port=2000): return try: - speed_kmh_cache = None engine_rpm_cache = None throttle_cache = None @@ -228,7 +265,6 @@ def main(host='127.0.0.1', port=2000): lights = player_vehicle.get_light_state() - # if any values have changed, try to send the CAN message if (speed_kmh != speed_kmh_cache or engine_rpm != engine_rpm_cache or control.throttle != throttle_cache or @@ -240,18 +276,18 @@ def main(host='127.0.0.1', port=2000): throttle_cache = control.throttle gear_cache = gear lights_cache = lights - - can.send_can_message(speed_kmh, engine_rpm, + try: + can.send_can_message(speed_kmh, engine_rpm, control.throttle, gear, lights) + except Exception as e: + print(f"New error: {e}") except Exception as e: - print( - f"An error occurred: {e}. The CARLA simulation might have stopped.") + print(f"An error occurred: {e}. The CARLA simulation might have stopped.") finally: - if can.can_bus is not None: + if hasattr(can, 'can_bus') and can.can_bus is not None: can.can_bus.shutdown() print("CAN bus properly shut down.") - if __name__ == "__main__": main() diff --git a/Scripts/record_playback.py b/Scripts/record_playback.py index 5d3956e..0e631ad 100644 --- a/Scripts/record_playback.py +++ b/Scripts/record_playback.py @@ -7,79 +7,147 @@ import time from rich.console import Console import os import argparse +from PyQt6.QtCore import QThread, pyqtSignal +import cantools +import sys + +current_dir = os.path.dirname(os.path.abspath(__file__)) +sys.path.append(os.path.dirname(current_dir)) + +from extras import config + +class CAN_playback(QThread): + finished = pyqtSignal() + + speedUpdate = pyqtSignal(int) + gearUpdate = pyqtSignal(str) + engineSpeedUpdate = pyqtSignal(int) + throttleUpdate = pyqtSignal(int) + indicatorUpdate = pyqtSignal(str) + + try: + playbackFile = config.get_playback_file() + except Exception as e: + print(e) + # playbackFile = os.path.join(current_dir, "can_messages.txt") -class CAN_playback: def __init__(self, interface='vcan0'): - """ - Initialize the CAN Tool with the specified interface. + super(CAN_playback, self).__init__() + self._running = False - Args: - interface (str): The CAN interface name (default: 'vcan0') - """ self.console_mode = False self.interface = interface self.bus = can.interface.Bus(interface='socketcan', channel=self.interface, bitrate=500000) - self.output_file = os.path.join(os.path.dirname(__file__), 'can_messages.txt') + self.output_file = self.playbackFile + + # Load the DBC file + self.db = cantools.database.load_file(os.path.join(os.path.dirname(__file__), 'agl-vcar.dbc')) + + # Create a mapping of relevant messages + self.message_map = { + 'Vehicle_Status_1': self.decode_speed, + 'Transmission': self.decode_gear, + 'Vehicle_Status_2': self.decode_engine_speed, + 'Engine': self.decode_throttle, + 'Vehicle_Status_3': self.decode_indicators + } def write_to_file(self, messages): - """ - Write captured CAN messages to a file. - - Args: - messages (list): List of can.Message objects to write - """ with open(self.output_file, 'w') as file: for msg in messages: file.write(f"{msg.timestamp},{msg.arbitration_id},{msg.dlc},{','.join(map(lambda b: f'0x{b:02x}', msg.data))}\n") def playback_messages(self): if os.path.exists(self.output_file): - #console.print("Replaying captured messages...") - messages = [] - with open(self.output_file, 'r') as file: - for line in file: - parts = line.strip().split(',', 3) # Split into at most 4 parts - timestamp, arbitration_id, dlc, data_str = parts - # Extract the data bytes, removing '0x' and splitting by ',' - data_bytes = [int(byte, 16) for byte in data_str.split(',') if byte] - - msg = can.Message( - timestamp=float(timestamp), - arbitration_id=int(arbitration_id, 0), - dlc=int(dlc), - data=data_bytes - ) - messages.append(msg) - self.replay_messages(messages) - + messages = [] + with open(self.output_file, 'r') as file: + for line in file: + parts = line.strip().split(',', 3) + timestamp, arbitration_id, dlc, data_str = parts + data_bytes = [int(byte, 16) for byte in data_str.split(',') if byte] + + msg = can.Message( + timestamp=float(timestamp), + arbitration_id=int(arbitration_id, 0), + dlc=int(dlc), + data=data_bytes + ) + messages.append(msg) + self.replay_messages(messages) + def stop(self): self._running = False if self.bus is not None: self.bus.shutdown() + self.finished.emit() def replay_messages(self, messages): - """ - Replay CAN messages on the specified bus. - - Args: - messages (list): List of can.Message objects to replay - """ self._running = True start_time = messages[0].timestamp for msg in messages: delay = msg.timestamp - start_time self.bus.send(msg) + + # Decode and emit signals for relevant messages + if msg.arbitration_id == self.db.get_message_by_frame_id(msg.arbitration_id).frame_id: + message_name = self.db.get_message_by_frame_id(msg.arbitration_id).name + if message_name in self.message_map: + decoded_data = self.decode_message(message_name, msg.data) + self.message_map[message_name](decoded_data) + time.sleep(delay) start_time = msg.timestamp - if self._running == False: return + if not self._running: + return + + def decode_message(self, message_name, data): + message = self.db.get_message_by_name(message_name) + return message.decode(data) + + def decode_speed(self, decoded_data): + speed = int(decoded_data['PT_VehicleAvgSpeed']) + self.speedUpdate.emit(speed) + + def decode_gear(self, decoded_data): + gear_value = int(decoded_data['Gear']) + if gear_value == 126: + gear = "P" + elif gear_value == 127: + gear = "D" + elif gear_value == -1 or gear_value == -2: + gear = "R" + elif gear_value == 0: + gear = "N" + else: + gear = f"{gear_value}" + self.gearUpdate.emit(gear) + + def decode_engine_speed(self, decoded_data): + engine_speed = int(decoded_data['PT_EngineSpeed']) + self.engineSpeedUpdate.emit(engine_speed) + + def decode_throttle(self, decoded_data): + throttle_position = int(decoded_data['ThrottlePosition']) + self.throttleUpdate.emit(throttle_position) + + def decode_indicators(self, decoded_data): + if decoded_data['PT_HazardOn'] == 1: + self.indicatorUpdate.emit('HazardOn') + if decoded_data['PT_HazardOn'] == 0: + self.indicatorUpdate.emit('HazardOff') + + if decoded_data['PT_LeftTurnOn'] == 1: + self.indicatorUpdate.emit('LeftBlinkerOn') + print("Left Blinker On") + if decoded_data['PT_LeftTurnOn'] == 0: + self.indicatorUpdate.emit('LeftBlinkerOff') + + if decoded_data['PT_RightTurnOn'] == 1: + self.indicatorUpdate.emit('RightBlinkerOn') + if decoded_data['PT_RightTurnOn'] == 0: + self.indicatorUpdate.emit('RightBlinkerOff') def capture_can_messages(self): - """ - Capture CAN messages from the specified bus. - - Returns: - list: List of captured can.Message objects - """ messages = [] if self.console_mode: @@ -101,17 +169,16 @@ def main(): from rich.console import Console from rich.prompt import Prompt - parser = argparse.ArgumentParser(description='CAN Message Capture and Replay Tool') + parser = argparse.ArgumentParser(description='CAN Message Capture and Playback Tool') parser.add_argument('--interface', '-i', type=str, default='vcan0', help='Specify the CAN interface (default: vcan0)') args = parser.parse_args() - # Initialize the CAN Tool with the specified interface can_tool = CAN_playback(interface=args.interface) can_tool.console_mode = True console = Console() while True: - console.print("\n[bold]CAN Message Capture and Replay[/bold]") + console.print("\n[bold]CAN Message Capture and Playback[/bold]") console.print("1. Capture CAN messages") console.print("2. Replay captured messages") console.print("3. Exit") @@ -131,12 +198,9 @@ def main(): else: console.print(f"No captured messages found in {can_tool.output_file}") - - else: console.print("Exiting...") break if __name__ == "__main__": main() - |