CARLA ? C-Shenron based Simualtor for Sensor data generation.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

289 lines
11 KiB

"""
dashboard/app.py
----------------
Flask backend for the BATL CARLA Orchestrator Dashboard.
Serves the HTML UI, exposes REST endpoints, and streams
real-time simulation output via Server-Sent Events (SSE).
NOTE: This file lives inside the /dashboard sub-folder.
PROJECT_ROOT is therefore TWO levels up: dashboard/app.py → dashboard/ → Fox/
"""
import os
import sys
from flask import Flask, render_template, request, jsonify, Response
import subprocess
import time
import psutil
# ------------------------------------------------------------------
# Path bootstrapping — make the project root importable regardless
# of which directory the user launches from.
# ------------------------------------------------------------------
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if PROJECT_ROOT not in sys.path:
sys.path.insert(0, PROJECT_ROOT)
import config
from src.scenario_loader import list_scenarios
# ------------------------------------------------------------------
# Flask app — templates and static assets are siblings of this file.
# ------------------------------------------------------------------
DASHBOARD_DIR = os.path.dirname(os.path.abspath(__file__))
app = Flask(
__name__,
template_folder=os.path.join(DASHBOARD_DIR, "templates"),
static_folder=os.path.join(DASHBOARD_DIR, "static"),
)
active_simulation_process = None
# ── Routes ────────────────────────────────────────────────────────
@app.route("/")
def index():
return render_template("index.html")
@app.route("/api/config", methods=["GET"])
def get_config():
scenarios = list_scenarios()
return jsonify({
"scenarios": scenarios,
"default_scenario": getattr(config, "DEFAULT_SCENARIO", "braking"),
"default_frames": getattr(config, "MAX_FRAMES", 200),
"default_weather": getattr(config, "DEFAULT_WEATHER", "ClearNoon"),
"weather_options": [
"ClearNoon", "CloudyNoon", "WetNoon",
"SoftRainNoon", "HardRainNoon", "ClearSunset", "Night"
],
})
@app.route("/api/scenario_params/<scenario_name>", methods=["GET"])
def get_scenario_params(scenario_name):
from src.scenario_loader import load_scenario
try:
scenario = load_scenario(scenario_name)
tunable_params = {}
# Convention: tunable constants are UPPERCASE class-level attributes
for attr in dir(scenario):
if attr.isupper() and not attr.startswith("_"):
val = getattr(scenario, attr)
if isinstance(val, (int, float, str, bool)):
tunable_params[attr] = val
return jsonify({"success": True, "params": tunable_params})
except Exception as e:
return jsonify({"success": False, "error": str(e)})
@app.route("/api/simulator/status", methods=["GET"])
def simulator_status():
status = "offline"
is_paused = False
is_running = False
try:
for proc in psutil.process_iter(['name']):
if proc.info['name'] and 'CarlaUE4' in proc.info['name']:
is_running = True
break
except Exception:
pass
if is_running:
status = "loading"
try:
import carla
client = carla.Client('localhost', 2000)
client.set_timeout(10.0)
client.get_server_version() # Will raise if not ready
status = "ready"
is_paused = client.get_world().get_settings().synchronous_mode
except Exception:
pass
return jsonify({"status": status, "is_paused": is_paused})
@app.route("/api/simulator/idle", methods=["POST"])
def simulator_idle():
try:
import carla
client = carla.Client('localhost', 2000)
client.set_timeout(5.0)
world = client.get_world()
settings = world.get_settings()
settings.synchronous_mode = True
world.apply_settings(settings)
return jsonify({"success": True})
except Exception as e:
return jsonify({"success": False, "error": str(e)})
@app.route("/api/simulator/wake", methods=["POST"])
def simulator_wake():
try:
import carla
client = carla.Client('localhost', 2000)
client.set_timeout(5.0)
world = client.get_world()
settings = world.get_settings()
settings.synchronous_mode = False
settings.fixed_delta_seconds = None
world.apply_settings(settings)
return jsonify({"success": True})
except Exception as e:
return jsonify({"success": False, "error": str(e)})
@app.route("/api/simulator/launch", methods=["POST"])
def simulator_launch():
try:
exe_path = getattr(config, "CARLA_EXECUTABLE_PATH", r"D:\CARLA\CARLA_0.9.16\CarlaUE4.exe")
if not os.path.exists(exe_path):
return jsonify({"success": False, "error": f"Executable not found at {exe_path}"})
# Launch detached
subprocess.Popen([exe_path], creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
return jsonify({"success": True})
except Exception as e:
return jsonify({"success": False, "error": str(e)})
@app.route("/api/stop", methods=["POST"])
def stop_simulation():
global active_simulation_process
flag_path = os.path.join(PROJECT_ROOT, "tmp", "stop.flag")
os.makedirs(os.path.dirname(flag_path), exist_ok=True)
with open(flag_path, "w") as f:
f.write("stop")
if active_simulation_process is not None:
try:
active_simulation_process.wait(timeout=5)
except subprocess.TimeoutExpired:
active_simulation_process.terminate()
active_simulation_process = None
return jsonify({"success": True, "message": "Simulation stopped."})
@app.route("/api/run", methods=["POST"])
def run_simulation():
data = request.json or {}
scenario = data.get("scenario") or "braking"
frames = data.get("frames") or ""
weather = data.get("weather") or ""
no_record = bool(data.get("no_record"))
params = data.get("params") or ""
# Build the command — run.bat lives in PROJECT_ROOT
cmd = ["cmd.exe", "/c", "run.bat", scenario]
if frames:
cmd.extend(["--frames", str(frames)])
if weather:
cmd.extend(["--weather", weather])
if no_record:
cmd.append("--no-record")
if params:
cmd.extend(["--params", f'"{params}"'])
def generate():
global active_simulation_process
try:
# cwd = project root so that run.bat is reachable
env = os.environ.copy()
env["PYTHONUNBUFFERED"] = "1"
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
cwd=PROJECT_ROOT,
env=env,
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP,
)
active_simulation_process = process
import re
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
full_log = []
log_file_path = None
yield f"data: [INFO] Starting command: {' '.join(cmd)}\n\n"
def write_to_log(text):
if log_file_path:
try:
with open(log_file_path, "a", encoding="utf-8") as f:
f.write(text + "\n")
except Exception:
pass
frame_show_count = 0
for raw_line in iter(process.stdout.readline, ""):
if raw_line:
# Strip ANSI and clean up trailing newlines
clean_line = ansi_escape.sub('', raw_line).rstrip('\r\n')
if not clean_line.strip():
continue
# ------------------------------------------------------
# SSE Filtering: Sample high-frequency logs for the UI
# ------------------------------------------------------
should_yield = True
if "[FRAME " in clean_line:
frame_show_count += 1
if frame_show_count % 10 != 1: # Show 1st, 11th, 21st, etc.
should_yield = False
if should_yield:
yield f"data: {clean_line}\n\n"
# ------------------------------------------------------
# Log Mirroring: Always write every cleaned line to disk
# ------------------------------------------------------
if not log_file_path:
full_log.append(clean_line)
if "Saving data to: " in clean_line:
try:
# Extract path: "Saving data to: data\showcase_XXXX ---"
# Note: the log says "--- Recorder initialized. Saving data to: data\showcase_2026... ---"
path_part = clean_line.split("Saving data to: ")[1]
rel_path = path_part.split("---")[0].strip()
log_file_path = os.path.join(PROJECT_ROOT, rel_path, "console.log")
os.makedirs(os.path.dirname(log_file_path), exist_ok=True)
# Flush backlog
for prev in full_log:
write_to_log(prev)
except Exception:
pass
else:
# Once file path is known, append all new lines
write_to_log(clean_line)
process.stdout.close()
return_code = process.wait()
end_msg = f"[PROCESS_COMPLETED] Exit Code: {return_code}"
yield f"data: {end_msg}\n\n"
write_to_log(end_msg)
except Exception as e:
yield f"data: [ERROR] Failed to start process: {str(e)}\n\n"
return Response(generate(), mimetype="text/event-stream")
# ── Entry point ───────────────────────────────────────────────────
if __name__ == "__main__":
print("Starting BATL CARLA Dashboard orchestrator...")
print(f" Project root : {PROJECT_ROOT}")
print(f" Dashboard dir: {DASHBOARD_DIR}")
app.run(host="0.0.0.0", port=5000, debug=True)