diff options
author | 2024-09-29 23:38:41 +0530 | |
---|---|---|
committer | 2024-10-10 18:31:58 +0530 | |
commit | 554ec4cd07d68f4bcb569277881e368c450d993a (patch) | |
tree | 0f17a3498c3e7881157fee69a3e306ce100c7119 /Scripts/carla_to_CAN.py | |
parent | 65d4619371979c8921ff155a6fe1d7de0e1d3598 (diff) |
Update Carla Playback Mode
- Now file playback changes values on control panel
- Added Config file path to ini file
- Fixed signal mapping for Indicator and Hazard lights
- Fixed crash of carla_to_CAN due to `Break` signal.
- can_messages.txt is now stored in assets dir
- Script Toggle shows error when can interface is not available
- Added cantools as new dependency
- Fixed default paths for can_messages playback file
Bug-AGL: SPEC-5161
Change-Id: I7b51ff3db1238e0c8addc19152d24d4ce2c8574e
Signed-off-by: Suchinton Chakravarty <suchinton.2001@gmail.com>
Diffstat (limited to 'Scripts/carla_to_CAN.py')
-rw-r--r-- | Scripts/carla_to_CAN.py | 90 |
1 files changed, 63 insertions, 27 deletions
diff --git a/Scripts/carla_to_CAN.py b/Scripts/carla_to_CAN.py index 951d29b..4ca6e32 100644 --- a/Scripts/carla_to_CAN.py +++ b/Scripts/carla_to_CAN.py @@ -7,12 +7,13 @@ import math import can import cantools import argparse +import subprocess +import sys # ============================================================================== # -- CAN ----------------------------------------------------------------------- # ============================================================================== - class CAN(object): """ Represents a Controller Area Network (CAN) interface for sending messages to a vehicle. @@ -32,9 +33,9 @@ class CAN(object): lights_cache (str): The cached indicator lights state. """ - def __init__(self): + def __init__(self, can_bus): self.db = cantools.database.load_file('agl-vcar.dbc') - self.can_bus = can.interface.Bus('vcan0', interface='socketcan') + self.can_bus = can_bus self.speed_message = self.db.get_message_by_name('Vehicle_Status_1') self.gear_message = self.db.get_message_by_name('Transmission') @@ -51,6 +52,12 @@ class CAN(object): self.engine_speed_cache = 0 self.gear_cache = "P" self.lights_cache = None + self.indicator_signals = { + 'LeftBlinker': {'PT_LeftTurnOn': 1, 'PT_RightTurnOn': 0, 'PT_HazardOn': 0}, + 'RightBlinker': {'PT_LeftTurnOn': 0, 'PT_RightTurnOn': 1, 'PT_HazardOn': 0}, + 'carla.libcarla.VehicleLightState(48)': {'PT_LeftTurnOn': 0, 'PT_RightTurnOn': 0, 'PT_HazardOn': 1}, + 'NONE': {'PT_LeftTurnOn': 0, 'PT_RightTurnOn': 0, 'PT_HazardOn': 0} + } def send_car_speed(self, speed): """ @@ -122,21 +129,10 @@ class CAN(object): Args: indicator (str): The indicator lights state ('LeftBlinker', 'RightBlinker', 'HazardLights'). """ - # Mapping indicator names to signal values - indicators_mapping = { - 'LeftBlinker': {'PT_LeftTurnOn': 1}, - 'RightBlinker': {'PT_RightTurnOn': 1}, - 'HazardLights': {'PT_HazardOn': 1} - } - - # Default signal values - signals = {'PT_HazardOn': 0, 'PT_LeftTurnOn': 0, 'PT_RightTurnOn': 0} - - # Update signals based on the indicator argument - signals.update(indicators_mapping.get(indicator, {})) + signal = self.indicator_signals[str(indicator)] # Encode and send the CAN message - data = self.Vehicle_Status_3_message.encode(signals) + data = self.Vehicle_Status_3_message.encode(signal) message = can.Message( arbitration_id=self.Vehicle_Status_3_message.frame_id, data=data) self.can_bus.send(message) @@ -173,14 +169,45 @@ class CAN(object): self.engine_speed_cache = rpm if lights is not None and lights != self.lights_cache: - self.send_indicator(lights) - self.lights_cache = lights + try: + self.send_indicator(lights) + self.lights_cache = lights + except Exception as e: + print(f"Error sending indicator lights: {e}") +def check_interface(interface): + """Check if the given interface is up.""" + try: + # Use ip link show to check if the interface is up + subprocess.check_output(['ip', 'link', 'show', interface]) + return True + except subprocess.CalledProcessError: + return False + +def get_default_interface(): + """Get the default CAN interface from the config file.""" + try: + # Import config from the parent directory + sys.path.append('../') + from extras import config + return config.get_can_interface() + except ImportError: + # If extras module is not found, return None + return None + +def create_can_bus(can_interface): + """Create and return a CAN bus object for the given can_interface.""" + try: + return can.interface.Bus(channel=can_interface, interface='socketcan') + except OSError as e: + raise RuntimeError(f'Failed to open CAN interface "{can_interface}": {e}') + def main(host='127.0.0.1', port=2000): parser = argparse.ArgumentParser(description='Carla to CAN Converter') parser.add_argument('--host', default='127.0.0.1', help='IP of the host server') parser.add_argument('--port', default=2000, type=int, help='TCP port to listen to') + parser.add_argument('--interface', required=False, help='CAN interface to use') args = parser.parse_args() client = carla.Client(args.host, args.port) @@ -188,7 +215,18 @@ def main(host='127.0.0.1', port=2000): world = client.get_world() - can = CAN() + # Determine the CAN interfaces + can_interface = args.interface or get_default_interface() or 'vcan0' + + print(f"CAN interface: {can_interface}") + + # Check interfaces + if check_interface(can_interface): + can_bus = create_can_bus(can_interface) + else: + raise RuntimeError("No available CAN interface found. To setup vcan0, run `sudo ./vcan.sh`.") + + can = CAN(can_bus) player_vehicle = None @@ -202,7 +240,6 @@ def main(host='127.0.0.1', port=2000): return try: - speed_kmh_cache = None engine_rpm_cache = None throttle_cache = None @@ -228,7 +265,6 @@ def main(host='127.0.0.1', port=2000): lights = player_vehicle.get_light_state() - # if any values have changed, try to send the CAN message if (speed_kmh != speed_kmh_cache or engine_rpm != engine_rpm_cache or control.throttle != throttle_cache or @@ -240,18 +276,18 @@ def main(host='127.0.0.1', port=2000): throttle_cache = control.throttle gear_cache = gear lights_cache = lights - - can.send_can_message(speed_kmh, engine_rpm, + try: + can.send_can_message(speed_kmh, engine_rpm, control.throttle, gear, lights) + except Exception as e: + print(f"New error: {e}") except Exception as e: - print( - f"An error occurred: {e}. The CARLA simulation might have stopped.") + print(f"An error occurred: {e}. The CARLA simulation might have stopped.") finally: - if can.can_bus is not None: + if hasattr(can, 'can_bus') and can.can_bus is not None: can.can_bus.shutdown() print("CAN bus properly shut down.") - if __name__ == "__main__": main() |