diff options
author | Suchinton <suchinton.2001@gmail.com> | 2024-07-01 00:10:54 +0530 |
---|---|---|
committer | Suchinton <suchinton.2001@gmail.com> | 2024-07-15 07:52:42 +0530 |
commit | b0844193f37f477c9e7e509e0b4eaf221886192b (patch) | |
tree | a125cc5d3d90db067eb65399b1c4d812a0f0a4b1 /Scripts/record_playback.py | |
parent | 25d451d87046a1cfbf7ac3cd47c2303fd29a22c5 (diff) |
Add Python Script to Convert CARLA data into CAN messagessalmon_18.90.0salmon/18.90.018.90.0
V1:
- Add carla_to_CAN.py script to convert CARLA data into CAN messages
- Add README and requirements.txt
V2:
- Add script to record and playback messages from can interface
- Fix mappings to agl-vcar.dbc file
V3:
- Fix playback feature for record_playback.py
- Update requirements.txt
- Update README to explain setup and usage of Scripts with CARLA
V4:
- Add file playback feature to Demo Control Panel
- Remove dependency on numpy to calculate vehicle speed, use math lib instead
- record_playback.py can now be imported and also be used in standalone mode
- Fix: Now data is sent to CAN interface only when it is updated
- Fix: Delay is now based on previous timestamp and not the starting timestamp
- Fix: Send correct Gear messages, compatible with the agl-vcar signals
Bug-AGL: SPEC-5161
Change-Id: I18a14e8e6ac4d24e6ed8774402fb93a36dec274e
Signed-off-by: Suchinton <suchinton.2001@gmail.com>
Diffstat (limited to 'Scripts/record_playback.py')
-rw-r--r-- | Scripts/record_playback.py | 141 |
1 files changed, 141 insertions, 0 deletions
diff --git a/Scripts/record_playback.py b/Scripts/record_playback.py new file mode 100644 index 0000000..e518356 --- /dev/null +++ b/Scripts/record_playback.py @@ -0,0 +1,141 @@ +# Copyright (C) 2024 Suchinton Chakravarty +# +# SPDX-License-Identifier: Apache-2.0 + +import can +import time +from rich.console import Console +import os +import argparse + +class CAN_playback: + def __init__(self, interface='vcan0'): + """ + Initialize the CAN Tool with the specified interface. + + Args: + interface (str): The CAN interface name (default: 'vcan0') + """ + self.console_mode = False + self.interface = interface + self.bus = can.interface.Bus(interface='socketcan', channel=self.interface, bitrate=500000) + self.output_file = os.path.join(os.path.dirname(__file__), 'can_messages.txt') + + def write_to_file(self, messages): + """ + Write captured CAN messages to a file. + + Args: + messages (list): List of can.Message objects to write + """ + with open(self.output_file, 'w') as file: + for msg in messages: + file.write(f"{msg.timestamp},{msg.arbitration_id},{msg.dlc},{','.join(map(lambda b: f'0x{b:02x}', msg.data))}\n") + + def playback_messages(self): + if os.path.exists(self.output_file): + #console.print("Replaying captured messages...") + messages = [] + with open(self.output_file, 'r') as file: + for line in file: + parts = line.strip().split(',', 3) # Split into at most 4 parts + timestamp, arbitration_id, dlc, data_str = parts + # Extract the data bytes, removing '0x' and splitting by ',' + data_bytes = [int(byte, 16) for byte in data_str.split(',') if byte] + + msg = can.Message( + timestamp=float(timestamp), + arbitration_id=int(arbitration_id, 0), + dlc=int(dlc), + data=data_bytes + ) + messages.append(msg) + self.replay_messages(messages) + + def stop(self): + self._running = False + if self.bus is not None: + self.bus.shutdown() + + def replay_messages(self, messages): + """ + Replay CAN messages on the specified bus. + + Args: + messages (list): List of can.Message objects to replay + """ + self._running = True + start_time = messages[0].timestamp + for msg in messages: + delay = msg.timestamp - start_time + self.bus.send(msg) + time.sleep(delay) + start_time = msg.timestamp + if self._running == False: return + + def capture_can_messages(self): + """ + Capture CAN messages from the specified bus. + + Returns: + list: List of captured can.Message objects + """ + messages = [] + + if self.console_mode: + console = Console() + console.print(f"Capturing CAN messages on {self.interface}. Press Ctrl+C to stop.") + + + try: + while True: + message = self.bus.recv() + if message is not None: + messages.append(message) + except KeyboardInterrupt: + console.print("Capture stopped.") + + return messages + +def main(): + from rich.console import Console + from rich.prompt import Prompt + + parser = argparse.ArgumentParser(description='CAN Message Capture and Replay Tool') + parser.add_argument('--interface', '-i', type=str, default='vcan0', help='Specify the CAN interface (default: vcan0)') + args = parser.parse_args() + + # Initialize the CAN Tool with the specified interface + can_tool = CAN_playback(interface=args.interface) + can_tool.console_mode = True + + console = Console() + while True: + console.print("\n[bold]CAN Message Capture and Replay[/bold]") + console.print("1. Capture CAN messages") + console.print("2. Replay captured messages") + console.print("3. Exit") + + choice = Prompt.ask("Enter your choice", choices=['1', '2', '3']) + + if choice == '1': + messages = can_tool.capture_can_messages() + console.print(f"Captured {len(messages)} messages.") + can_tool.write_to_file(messages) + console.print(f"CAN messages written to {can_tool.output_file}") + elif choice == '2': + if os.path.exists(can_tool.output_file): + console.print("Replaying captured messages...") + can_tool.playback_messages() + console.print("Replay completed.") + else: + console.print(f"No captured messages found in {can_tool.output_file}") + + + + else: + console.print("Exiting...") + break + +if __name__ == "__main__": + main() |