aboutsummaryrefslogtreecommitdiffstats
path: root/Scripts/carla_to_CAN.py
diff options
context:
space:
mode:
authorSuchinton Chakravarty <suchinton.2001@gmail.com>2024-09-29 23:38:41 +0530
committerSuchinton Chakravarty <suchinton.2001@gmail.com>2024-10-10 18:31:58 +0530
commit554ec4cd07d68f4bcb569277881e368c450d993a (patch)
tree0f17a3498c3e7881157fee69a3e306ce100c7119 /Scripts/carla_to_CAN.py
parent65d4619371979c8921ff155a6fe1d7de0e1d3598 (diff)
Update Carla Playback Mode
- Now file playback changes values on control panel - Added Config file path to ini file - Fixed signal mapping for Indicator and Hazard lights - Fixed crash of carla_to_CAN due to `Break` signal. - can_messages.txt is now stored in assets dir - Script Toggle shows error when can interface is not available - Added cantools as new dependency - Fixed default paths for can_messages playback file Bug-AGL: SPEC-5161 Change-Id: I7b51ff3db1238e0c8addc19152d24d4ce2c8574e Signed-off-by: Suchinton Chakravarty <suchinton.2001@gmail.com>
Diffstat (limited to 'Scripts/carla_to_CAN.py')
-rw-r--r--Scripts/carla_to_CAN.py90
1 files changed, 63 insertions, 27 deletions
diff --git a/Scripts/carla_to_CAN.py b/Scripts/carla_to_CAN.py
index 951d29b..4ca6e32 100644
--- a/Scripts/carla_to_CAN.py
+++ b/Scripts/carla_to_CAN.py
@@ -7,12 +7,13 @@ import math
import can
import cantools
import argparse
+import subprocess
+import sys
# ==============================================================================
# -- CAN -----------------------------------------------------------------------
# ==============================================================================
-
class CAN(object):
"""
Represents a Controller Area Network (CAN) interface for sending messages to a vehicle.
@@ -32,9 +33,9 @@ class CAN(object):
lights_cache (str): The cached indicator lights state.
"""
- def __init__(self):
+ def __init__(self, can_bus):
self.db = cantools.database.load_file('agl-vcar.dbc')
- self.can_bus = can.interface.Bus('vcan0', interface='socketcan')
+ self.can_bus = can_bus
self.speed_message = self.db.get_message_by_name('Vehicle_Status_1')
self.gear_message = self.db.get_message_by_name('Transmission')
@@ -51,6 +52,12 @@ class CAN(object):
self.engine_speed_cache = 0
self.gear_cache = "P"
self.lights_cache = None
+ self.indicator_signals = {
+ 'LeftBlinker': {'PT_LeftTurnOn': 1, 'PT_RightTurnOn': 0, 'PT_HazardOn': 0},
+ 'RightBlinker': {'PT_LeftTurnOn': 0, 'PT_RightTurnOn': 1, 'PT_HazardOn': 0},
+ 'carla.libcarla.VehicleLightState(48)': {'PT_LeftTurnOn': 0, 'PT_RightTurnOn': 0, 'PT_HazardOn': 1},
+ 'NONE': {'PT_LeftTurnOn': 0, 'PT_RightTurnOn': 0, 'PT_HazardOn': 0}
+ }
def send_car_speed(self, speed):
"""
@@ -122,21 +129,10 @@ class CAN(object):
Args:
indicator (str): The indicator lights state ('LeftBlinker', 'RightBlinker', 'HazardLights').
"""
- # Mapping indicator names to signal values
- indicators_mapping = {
- 'LeftBlinker': {'PT_LeftTurnOn': 1},
- 'RightBlinker': {'PT_RightTurnOn': 1},
- 'HazardLights': {'PT_HazardOn': 1}
- }
-
- # Default signal values
- signals = {'PT_HazardOn': 0, 'PT_LeftTurnOn': 0, 'PT_RightTurnOn': 0}
-
- # Update signals based on the indicator argument
- signals.update(indicators_mapping.get(indicator, {}))
+ signal = self.indicator_signals[str(indicator)]
# Encode and send the CAN message
- data = self.Vehicle_Status_3_message.encode(signals)
+ data = self.Vehicle_Status_3_message.encode(signal)
message = can.Message(
arbitration_id=self.Vehicle_Status_3_message.frame_id, data=data)
self.can_bus.send(message)
@@ -173,14 +169,45 @@ class CAN(object):
self.engine_speed_cache = rpm
if lights is not None and lights != self.lights_cache:
- self.send_indicator(lights)
- self.lights_cache = lights
+ try:
+ self.send_indicator(lights)
+ self.lights_cache = lights
+ except Exception as e:
+ print(f"Error sending indicator lights: {e}")
+def check_interface(interface):
+ """Check if the given interface is up."""
+ try:
+ # Use ip link show to check if the interface is up
+ subprocess.check_output(['ip', 'link', 'show', interface])
+ return True
+ except subprocess.CalledProcessError:
+ return False
+
+def get_default_interface():
+ """Get the default CAN interface from the config file."""
+ try:
+ # Import config from the parent directory
+ sys.path.append('../')
+ from extras import config
+ return config.get_can_interface()
+ except ImportError:
+ # If extras module is not found, return None
+ return None
+
+def create_can_bus(can_interface):
+ """Create and return a CAN bus object for the given can_interface."""
+ try:
+ return can.interface.Bus(channel=can_interface, interface='socketcan')
+ except OSError as e:
+ raise RuntimeError(f'Failed to open CAN interface "{can_interface}": {e}')
+
def main(host='127.0.0.1', port=2000):
parser = argparse.ArgumentParser(description='Carla to CAN Converter')
parser.add_argument('--host', default='127.0.0.1', help='IP of the host server')
parser.add_argument('--port', default=2000, type=int, help='TCP port to listen to')
+ parser.add_argument('--interface', required=False, help='CAN interface to use')
args = parser.parse_args()
client = carla.Client(args.host, args.port)
@@ -188,7 +215,18 @@ def main(host='127.0.0.1', port=2000):
world = client.get_world()
- can = CAN()
+ # Determine the CAN interfaces
+ can_interface = args.interface or get_default_interface() or 'vcan0'
+
+ print(f"CAN interface: {can_interface}")
+
+ # Check interfaces
+ if check_interface(can_interface):
+ can_bus = create_can_bus(can_interface)
+ else:
+ raise RuntimeError("No available CAN interface found. To setup vcan0, run `sudo ./vcan.sh`.")
+
+ can = CAN(can_bus)
player_vehicle = None
@@ -202,7 +240,6 @@ def main(host='127.0.0.1', port=2000):
return
try:
-
speed_kmh_cache = None
engine_rpm_cache = None
throttle_cache = None
@@ -228,7 +265,6 @@ def main(host='127.0.0.1', port=2000):
lights = player_vehicle.get_light_state()
- # if any values have changed, try to send the CAN message
if (speed_kmh != speed_kmh_cache or
engine_rpm != engine_rpm_cache or
control.throttle != throttle_cache or
@@ -240,18 +276,18 @@ def main(host='127.0.0.1', port=2000):
throttle_cache = control.throttle
gear_cache = gear
lights_cache = lights
-
- can.send_can_message(speed_kmh, engine_rpm,
+ try:
+ can.send_can_message(speed_kmh, engine_rpm,
control.throttle, gear, lights)
+ except Exception as e:
+ print(f"New error: {e}")
except Exception as e:
- print(
- f"An error occurred: {e}. The CARLA simulation might have stopped.")
+ print(f"An error occurred: {e}. The CARLA simulation might have stopped.")
finally:
- if can.can_bus is not None:
+ if hasattr(can, 'can_bus') and can.can_bus is not None:
can.can_bus.shutdown()
print("CAN bus properly shut down.")
-
if __name__ == "__main__":
main()