diff options
Diffstat (limited to 'Scripts/record_playback.py')
-rw-r--r-- | Scripts/record_playback.py | 164 |
1 files changed, 114 insertions, 50 deletions
diff --git a/Scripts/record_playback.py b/Scripts/record_playback.py index 5d3956e..0e631ad 100644 --- a/Scripts/record_playback.py +++ b/Scripts/record_playback.py @@ -7,79 +7,147 @@ import time from rich.console import Console import os import argparse +from PyQt6.QtCore import QThread, pyqtSignal +import cantools +import sys + +current_dir = os.path.dirname(os.path.abspath(__file__)) +sys.path.append(os.path.dirname(current_dir)) + +from extras import config + +class CAN_playback(QThread): + finished = pyqtSignal() + + speedUpdate = pyqtSignal(int) + gearUpdate = pyqtSignal(str) + engineSpeedUpdate = pyqtSignal(int) + throttleUpdate = pyqtSignal(int) + indicatorUpdate = pyqtSignal(str) + + try: + playbackFile = config.get_playback_file() + except Exception as e: + print(e) + # playbackFile = os.path.join(current_dir, "can_messages.txt") -class CAN_playback: def __init__(self, interface='vcan0'): - """ - Initialize the CAN Tool with the specified interface. + super(CAN_playback, self).__init__() + self._running = False - Args: - interface (str): The CAN interface name (default: 'vcan0') - """ self.console_mode = False self.interface = interface self.bus = can.interface.Bus(interface='socketcan', channel=self.interface, bitrate=500000) - self.output_file = os.path.join(os.path.dirname(__file__), 'can_messages.txt') + self.output_file = self.playbackFile + + # Load the DBC file + self.db = cantools.database.load_file(os.path.join(os.path.dirname(__file__), 'agl-vcar.dbc')) + + # Create a mapping of relevant messages + self.message_map = { + 'Vehicle_Status_1': self.decode_speed, + 'Transmission': self.decode_gear, + 'Vehicle_Status_2': self.decode_engine_speed, + 'Engine': self.decode_throttle, + 'Vehicle_Status_3': self.decode_indicators + } def write_to_file(self, messages): - """ - Write captured CAN messages to a file. - - Args: - messages (list): List of can.Message objects to write - """ with open(self.output_file, 'w') as file: for msg in messages: file.write(f"{msg.timestamp},{msg.arbitration_id},{msg.dlc},{','.join(map(lambda b: f'0x{b:02x}', msg.data))}\n") def playback_messages(self): if os.path.exists(self.output_file): - #console.print("Replaying captured messages...") - messages = [] - with open(self.output_file, 'r') as file: - for line in file: - parts = line.strip().split(',', 3) # Split into at most 4 parts - timestamp, arbitration_id, dlc, data_str = parts - # Extract the data bytes, removing '0x' and splitting by ',' - data_bytes = [int(byte, 16) for byte in data_str.split(',') if byte] - - msg = can.Message( - timestamp=float(timestamp), - arbitration_id=int(arbitration_id, 0), - dlc=int(dlc), - data=data_bytes - ) - messages.append(msg) - self.replay_messages(messages) - + messages = [] + with open(self.output_file, 'r') as file: + for line in file: + parts = line.strip().split(',', 3) + timestamp, arbitration_id, dlc, data_str = parts + data_bytes = [int(byte, 16) for byte in data_str.split(',') if byte] + + msg = can.Message( + timestamp=float(timestamp), + arbitration_id=int(arbitration_id, 0), + dlc=int(dlc), + data=data_bytes + ) + messages.append(msg) + self.replay_messages(messages) + def stop(self): self._running = False if self.bus is not None: self.bus.shutdown() + self.finished.emit() def replay_messages(self, messages): - """ - Replay CAN messages on the specified bus. - - Args: - messages (list): List of can.Message objects to replay - """ self._running = True start_time = messages[0].timestamp for msg in messages: delay = msg.timestamp - start_time self.bus.send(msg) + + # Decode and emit signals for relevant messages + if msg.arbitration_id == self.db.get_message_by_frame_id(msg.arbitration_id).frame_id: + message_name = self.db.get_message_by_frame_id(msg.arbitration_id).name + if message_name in self.message_map: + decoded_data = self.decode_message(message_name, msg.data) + self.message_map[message_name](decoded_data) + time.sleep(delay) start_time = msg.timestamp - if self._running == False: return + if not self._running: + return + + def decode_message(self, message_name, data): + message = self.db.get_message_by_name(message_name) + return message.decode(data) + + def decode_speed(self, decoded_data): + speed = int(decoded_data['PT_VehicleAvgSpeed']) + self.speedUpdate.emit(speed) + + def decode_gear(self, decoded_data): + gear_value = int(decoded_data['Gear']) + if gear_value == 126: + gear = "P" + elif gear_value == 127: + gear = "D" + elif gear_value == -1 or gear_value == -2: + gear = "R" + elif gear_value == 0: + gear = "N" + else: + gear = f"{gear_value}" + self.gearUpdate.emit(gear) + + def decode_engine_speed(self, decoded_data): + engine_speed = int(decoded_data['PT_EngineSpeed']) + self.engineSpeedUpdate.emit(engine_speed) + + def decode_throttle(self, decoded_data): + throttle_position = int(decoded_data['ThrottlePosition']) + self.throttleUpdate.emit(throttle_position) + + def decode_indicators(self, decoded_data): + if decoded_data['PT_HazardOn'] == 1: + self.indicatorUpdate.emit('HazardOn') + if decoded_data['PT_HazardOn'] == 0: + self.indicatorUpdate.emit('HazardOff') + + if decoded_data['PT_LeftTurnOn'] == 1: + self.indicatorUpdate.emit('LeftBlinkerOn') + print("Left Blinker On") + if decoded_data['PT_LeftTurnOn'] == 0: + self.indicatorUpdate.emit('LeftBlinkerOff') + + if decoded_data['PT_RightTurnOn'] == 1: + self.indicatorUpdate.emit('RightBlinkerOn') + if decoded_data['PT_RightTurnOn'] == 0: + self.indicatorUpdate.emit('RightBlinkerOff') def capture_can_messages(self): - """ - Capture CAN messages from the specified bus. - - Returns: - list: List of captured can.Message objects - """ messages = [] if self.console_mode: @@ -101,17 +169,16 @@ def main(): from rich.console import Console from rich.prompt import Prompt - parser = argparse.ArgumentParser(description='CAN Message Capture and Replay Tool') + parser = argparse.ArgumentParser(description='CAN Message Capture and Playback Tool') parser.add_argument('--interface', '-i', type=str, default='vcan0', help='Specify the CAN interface (default: vcan0)') args = parser.parse_args() - # Initialize the CAN Tool with the specified interface can_tool = CAN_playback(interface=args.interface) can_tool.console_mode = True console = Console() while True: - console.print("\n[bold]CAN Message Capture and Replay[/bold]") + console.print("\n[bold]CAN Message Capture and Playback[/bold]") console.print("1. Capture CAN messages") console.print("2. Replay captured messages") console.print("3. Exit") @@ -131,12 +198,9 @@ def main(): else: console.print(f"No captured messages found in {can_tool.output_file}") - - else: console.print("Exiting...") break if __name__ == "__main__": main() - |