""" dashboard/app.py ---------------- Flask backend for the BATL CARLA Orchestrator Dashboard. Serves the HTML UI, exposes REST endpoints, and streams real-time simulation output via Server-Sent Events (SSE). NOTE: This file lives inside the /dashboard sub-folder. PROJECT_ROOT is therefore TWO levels up: dashboard/app.py → dashboard/ → Fox/ """ import os import sys from flask import Flask, render_template, request, jsonify, Response import subprocess import time import psutil import logging # Suppress status polling logs to keep terminal clean class PollingFilter(logging.Filter): def filter(self, record): return "/api/simulator/status" not in record.getMessage() logging.getLogger('werkzeug').addFilter(PollingFilter()) # ------------------------------------------------------------------ # Path bootstrapping — make the project root importable regardless # of which directory the user launches from. # ------------------------------------------------------------------ PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if PROJECT_ROOT not in sys.path: sys.path.insert(0, PROJECT_ROOT) import config from src.scenario_loader import list_scenarios # ------------------------------------------------------------------ # Flask app — templates and static assets are siblings of this file. 
# ------------------------------------------------------------------
DASHBOARD_DIR = os.path.dirname(os.path.abspath(__file__))

app = Flask(
    __name__,
    template_folder=os.path.join(DASHBOARD_DIR, "templates"),
    static_folder=os.path.join(DASHBOARD_DIR, "static"),
)

# Popen handle of the scenario started by /api/run; None until a run is
# launched and again after /api/stop has been called.
active_simulation_process = None


def is_simulation_running():
    """Returns True if a scenario process is currently alive."""
    if active_simulation_process is None:
        return False
    # poll() returns None while the child has not exited yet.
    return active_simulation_process.poll() is None


# ── Routes ────────────────────────────────────────────────────────

@app.route("/")
def index():
    """Serve the dashboard page (templates/index.html)."""
    return render_template("index.html")


@app.route("/api/config", methods=["GET"])
def get_config():
    """Return the available scenarios plus default run settings as JSON.

    Defaults are read from the ``config`` module; the literal fallbacks
    below are used when an attribute is missing there.
    """
    scenarios = list_scenarios()
    return jsonify({
        "scenarios": scenarios,
        "default_scenario": getattr(config, "DEFAULT_SCENARIO", "braking"),
        "default_frames": getattr(config, "MAX_FRAMES", 200),
        "default_weather": getattr(config, "DEFAULT_WEATHER", "ClearNoon"),
        "weather_options": [
            "ClearNoon", "CloudyNoon", "WetNoon", "SoftRainNoon",
            "HardRainNoon", "ClearSunset", "Night"
        ],
    })


# The URL variable is required: the view takes ``scenario_name`` as an
# argument, so a route without it fails on every request.
@app.route("/api/scenario_params/<scenario_name>", methods=["GET"])
def get_scenario_params(scenario_name):
    """Return the tunable parameters of one scenario.

    Args:
        scenario_name: Scenario identifier taken from the URL and passed to
            ``load_scenario``.

    Returns:
        JSON ``{"success": True, "params": {...}}`` on success, or
        ``{"success": False, "error": "..."}`` if loading or inspection
        raised.
    """
    from src.scenario_loader import load_scenario
    try:
        scenario = load_scenario(scenario_name)
        tunable_params = {}
        # Convention: tunable constants are UPPERCASE class-level attributes
        for attr in dir(scenario):
            if attr.isupper() and not attr.startswith("_"):
                val = getattr(scenario, attr)
                # Only JSON-friendly scalar values are exposed.
                if isinstance(val, (int, float, str, bool)):
                    tunable_params[attr] = val
        return jsonify({"success": True, "params": tunable_params})
    except Exception as e:
        return jsonify({"success": False, "error": str(e)})


@app.route("/api/simulator/status", methods=["GET"])
def simulator_status():
    """Report the simulator state as JSON ``{"status", "is_paused"}``.

    ``status`` is:
      * ``"offline"`` – no process whose name contains ``CarlaUE4`` was found;
      * ``"loading"`` – such a process exists but the CARLA client call failed;
      * ``"ready"``   – ``get_server_version()`` succeeded.

    ``is_paused`` mirrors the world's ``synchronous_mode`` setting.
    """
    status = "offline"
    is_paused = False
    is_running = False
    try:
        for proc in psutil.process_iter(['name']):
            if proc.info['name'] and 'CarlaUE4' in proc.info['name']:
                is_running = True
                break
    except Exception:
        # Best effort: a failed process scan is reported as "offline".
        pass

    if is_running:
        status = "loading"
        try:
            import carla
            client = carla.Client('localhost', 2000)
            client.set_timeout(5.0)
            client.get_server_version()  # Will raise if not ready
            status = "ready"
            world = client.get_world()
            is_paused = world.get_settings().synchronous_mode
            # HEARTBEAT: If idle (sync mode) and no scenario is running, tick once
            # to keep the UE4 window responsive.
            if is_paused and not is_simulation_running():
                try:
                    world.tick()
                except Exception:
                    pass
        except Exception:
            # Best effort: keep whatever status was reached before the failure.
            pass

    return jsonify({"status": status, "is_paused": is_paused})


@app.route("/api/simulator/kill", methods=["POST"])
def simulator_kill():
    """Forces all CarlaUE4 processes to terminate.

    Returns JSON with ``killed`` set to True if at least one process was
    killed; processes that vanished or denied access are skipped.
    """
    try:
        killed_any = False
        for proc in psutil.process_iter(['name']):
            if proc.info['name'] and 'CarlaUE4' in proc.info['name']:
                try:
                    proc.kill()
                    killed_any = True
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    pass
        return jsonify({"success": True, "killed": killed_any})
    except Exception as e:
        return jsonify({"success": False, "error": str(e)})


@app.route("/api/shutdown", methods=["POST"])
def shutdown():
    """Stops the Flask server itself."""
    import threading

    def delay_exit():
        # Exit from a background thread after a short delay — presumably so
        # the JSON response below can be delivered first.
        time.sleep(1.0)
        print("[INFO] Shutdown triggered. Exiting...")
        os._exit(0)

    threading.Thread(target=delay_exit).start()
    return jsonify({"success": True, "message": "Server shutting down..."})


@app.route("/api/simulator/idle", methods=["POST"])
def simulator_idle():
    """Switch the CARLA world to synchronous mode.

    Returns JSON ``{"success": True}`` or ``{"success": False, "error"}``.
    """
    try:
        import carla
        client = carla.Client('localhost', 2000)
        client.set_timeout(5.0)
        world = client.get_world()
        settings = world.get_settings()
        settings.synchronous_mode = True
        world.apply_settings(settings)
        return jsonify({"success": True})
    except Exception as e:
        return jsonify({"success": False, "error": str(e)})


@app.route("/api/simulator/wake", methods=["POST"])
def simulator_wake():
    """Switch the CARLA world back to asynchronous mode.

    Also resets ``fixed_delta_seconds`` to None. Returns JSON
    ``{"success": True}`` or ``{"success": False, "error"}``.
    """
    try:
        import carla
        client = carla.Client('localhost', 2000)
        client.set_timeout(5.0)
        world = client.get_world()
        settings = world.get_settings()
        settings.synchronous_mode = False
        settings.fixed_delta_seconds = None
        world.apply_settings(settings)
        return jsonify({"success": True})
    except Exception as e:
        return jsonify({"success": False, "error": str(e)})


@app.route("/api/simulator/launch", methods=["POST"])
def simulator_launch():
    """Start the CARLA executable as a separate process.

    The path comes from ``config.CARLA_EXECUTABLE_PATH`` with a hard-coded
    fallback. Returns a JSON error if the file does not exist or the launch
    raised.
    """
    try:
        exe_path = getattr(config, "CARLA_EXECUTABLE_PATH", r"D:\CARLA\CARLA_0.9.16\CarlaUE4.exe")
        if not os.path.exists(exe_path):
            return jsonify({"success": False, "error": f"Executable not found at {exe_path}"})
        # Launch detached
        subprocess.Popen([exe_path], creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        return jsonify({"success": True})
    except Exception as e:
        return jsonify({"success": False, "error": str(e)})


@app.route("/api/stop", methods=["POST"])
def stop_simulation():
    """Ask the running scenario to stop, then clear the process handle.

    Writes ``tmp/stop.flag`` under PROJECT_ROOT, waits up to 5 seconds for
    the tracked process to exit, and calls ``terminate()`` on it if the
    wait times out.
    """
    global active_simulation_process
    flag_path = os.path.join(PROJECT_ROOT, "tmp", "stop.flag")
    os.makedirs(os.path.dirname(flag_path), exist_ok=True)
    # NOTE(review): presumably the scenario runner polls for this file —
    # confirm against the runner script.
    with open(flag_path, "w") as f:
        f.write("stop")

    if active_simulation_process is not None:
        try:
            active_simulation_process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            active_simulation_process.terminate()
        active_simulation_process = None

    return jsonify({"success": True, "message": "Simulation stopped."})


@app.route("/api/run", methods=["POST"])
def run_simulation():
    """Launch a scenario via ``run.bat`` and stream its output as SSE.

    Reads the optional JSON fields ``scenario``, ``frames``, ``weather``,
    ``no_record`` and ``params`` from the request body. A missing or
    non-JSON body falls back to the defaults instead of aborting.

    Returns:
        A ``text/event-stream`` response. Each non-blank output line is sent
        as a ``data:`` event (``[FRAME `` lines are sampled), followed by a
        final ``[PROCESS_COMPLETED]`` event carrying the exit code.
    """
    # silent=True yields None instead of an HTTP error on a bad/missing body.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    scenario = data.get("scenario") or "braking"
    frames = data.get("frames") or ""
    weather = data.get("weather") or ""
    no_record = bool(data.get("no_record"))
    params = data.get("params") or ""

    # Build the command — run.bat lives in PROJECT_ROOT
    # NOTE(review): these values come straight from the request and are
    # passed through cmd.exe; consider validating them (e.g. the scenario
    # against list_scenarios()) before use.
    cmd = ["cmd.exe", "/c", "run.bat", scenario]
    if frames:
        cmd.extend(["--frames", str(frames)])
    if weather:
        cmd.extend(["--weather", weather])
    if no_record:
        cmd.append("--no-record")
    if params:
        # NOTE(review): the literal quotes become part of the argument;
        # confirm run.bat expects them.
        cmd.extend(["--params", f'"{params}"'])

    def generate():
        """Yield SSE events for the child's output and mirror it to disk."""
        global active_simulation_process
        try:
            # cwd = project root so that run.bat is reachable
            env = os.environ.copy()
            env["PYTHONUNBUFFERED"] = "1"
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                bufsize=1,
                cwd=PROJECT_ROOT,
                env=env,
                creationflags=subprocess.CREATE_NEW_PROCESS_GROUP,
            )
            active_simulation_process = process

            import re
            ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')

            # Lines seen before the log directory is known are buffered here.
            full_log = []
            log_file_path = None

            yield f"data: [INFO] Starting command: {' '.join(cmd)}\n\n"

            def write_to_log(text):
                # Best-effort append; a failed write must not break the stream.
                if log_file_path:
                    try:
                        with open(log_file_path, "a", encoding="utf-8") as f:
                            f.write(text + "\n")
                    except Exception:
                        pass

            frame_show_count = 0
            # readline() returns "" at EOF, which ends the iteration.
            for raw_line in iter(process.stdout.readline, ""):
                # Strip ANSI and clean up trailing newlines
                clean_line = ansi_escape.sub('', raw_line).rstrip('\r\n')
                if not clean_line.strip():
                    continue

                # ------------------------------------------------------
                # SSE Filtering: Sample high-frequency logs for the UI
                # ------------------------------------------------------
                should_yield = True
                if "[FRAME " in clean_line:
                    frame_show_count += 1
                    if frame_show_count % 10 != 1:  # Show 1st, 11th, 21st, etc.
                        should_yield = False

                if should_yield:
                    yield f"data: {clean_line}\n\n"

                # ------------------------------------------------------
                # Log Mirroring: Always write every cleaned line to disk
                # ------------------------------------------------------
                if not log_file_path:
                    full_log.append(clean_line)
                    if "Saving data to: " in clean_line:
                        try:
                            # Extract path: "Saving data to: data\showcase_XXXX ---"
                            # Note: the log says "--- Recorder initialized. Saving data to: data\showcase_2026... ---"
                            path_part = clean_line.split("Saving data to: ")[1]
                            rel_path = path_part.split("---")[0].strip()
                            log_file_path = os.path.join(PROJECT_ROOT, rel_path, "console.log")
                            os.makedirs(os.path.dirname(log_file_path), exist_ok=True)
                            # Flush backlog
                            for prev in full_log:
                                write_to_log(prev)
                        except Exception:
                            pass
                else:
                    # Once file path is known, append all new lines
                    write_to_log(clean_line)

            process.stdout.close()
            return_code = process.wait()
            end_msg = f"[PROCESS_COMPLETED] Exit Code: {return_code}"
            yield f"data: {end_msg}\n\n"
            write_to_log(end_msg)

        except Exception as e:
            yield f"data: [ERROR] Failed to start process: {str(e)}\n\n"

    return Response(generate(), mimetype="text/event-stream")


# ── Entry point ───────────────────────────────────────────────────

if __name__ == "__main__":
    print("Starting BATL CARLA Dashboard orchestrator...")
    print(f"  Project root : {PROJECT_ROOT}")
    print(f"  Dashboard dir: {DASHBOARD_DIR}")
    # WARNING(review): debug=True on 0.0.0.0 exposes the Werkzeug debugger
    # and the unauthenticated control endpoints to the whole network.
    # Prefer 127.0.0.1 and/or debug=False outside a trusted lab network.
    app.run(host="0.0.0.0", port=5000, debug=True)