-
Notifications
You must be signed in to change notification settings - Fork 10
Official yolo testing #31
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a good start, but I'd like to see this be a ROS node w/ the relevant parameters that rather than writing to a CSV file, it will publish messages on a topic that has the information that you would write to the CSV. Most of your logic would stay the same. @Rishav-N thoughts? ^
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed. @AbuWabu3697 rewrite this as a ros node and write to the vision_msgs/Detection2DArray topic The format of the message should be vision_msgs/msg/Detection2DArray. Check: Each Detection2D in the array should be formatted like this: Check the types for everything Detection2D expects using the links. You have all this from your cv results in the code below you just have to put it in this format and publish to the topic. Your current code has to be executed as a script. We don't want this instead we want it to be launched as a ros node here: src/subsystems/navigation/athena_planner/launch/nav2_nodes.launch.py. Then we can utilize this in the behavior tree. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,158 @@ | ||
| import cv2 | ||
| import time | ||
| import csv | ||
| import os | ||
| from datetime import datetime | ||
| from ultralytics import YOLO | ||
| import signal | ||
|
|
||
| # ========= CONFIG ========= | ||
| MODEL_PATH = "best.pt" # trained model | ||
| CAMERA_INDEX = 0 # change this after running find_cameras.py | ||
|
|
||
| SAVE_VIDEO = False # True = save annotated video, False = no video file | ||
| OUTPUT_DIR = "yolo_output" | ||
| VIDEO_FPS = 20.0 # output video fps (approx) | ||
| STOP = False | ||
| # ========================== | ||
|
|
||
| def handle_stop(sig, frame): | ||
| global STOP | ||
| STOP = True | ||
|
|
||
| signal.signal(signal.SIGINT, handle_stop) | ||
| signal.signal(signal.SIGTERM, handle_stop) | ||
|
|
||
| def main(): | ||
| # Make output directory | ||
| os.makedirs(OUTPUT_DIR, exist_ok=True) | ||
|
|
||
| # Load YOLO model | ||
| print(f"[INFO] Loading model from {MODEL_PATH}") | ||
| BASE_DIR = os.path.dirname(os.path.abspath(__file__)) | ||
| WEIGHTS_PATH = os.path.join(BASE_DIR, MODEL_PATH) # if best.pt is in same folder as yolo_live_logger.py | ||
| model = YOLO(WEIGHTS_PATH) | ||
| print("[INFO] Classes:", model.names) | ||
|
|
||
| # Open camera | ||
| print(f"[INFO] Opening camera index {CAMERA_INDEX}") | ||
| cap = cv2.VideoCapture(CAMERA_INDEX) | ||
| if not cap.isOpened(): | ||
| print("[ERROR] Could not open camera. Try changing CAMERA_INDEX.") | ||
| return | ||
|
|
||
| # Get frame size for video writer | ||
| ret, frame = cap.read() | ||
| if not ret: | ||
| print("[ERROR] Could not read initial frame from camera.") | ||
| cap.release() | ||
| return | ||
| height, width = frame.shape[:2] | ||
|
|
||
| # Setup video writer if needed | ||
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | ||
| video_path = os.path.join(OUTPUT_DIR, f"yolo_run_{timestamp}.mp4") | ||
| csv_path = os.path.join(OUTPUT_DIR, f"detections_log_{timestamp}.csv") | ||
|
|
||
| if SAVE_VIDEO: | ||
| fourcc = cv2.VideoWriter_fourcc(*"mp4v") | ||
| writer = cv2.VideoWriter(video_path, fourcc, VIDEO_FPS, (width, height)) | ||
| print(f"[INFO] Saving video to {video_path}") | ||
| else: | ||
| writer = None | ||
|
|
||
| # Setup CSV logging | ||
| csv_file = open(csv_path, mode="w", newline="", encoding="utf-8") | ||
| csv_writer = csv.writer(csv_file) | ||
| csv_writer.writerow([ | ||
| "frame_id", "timestamp", "class_id", "class_name", | ||
| "confidence", "x1", "y1", "x2", "y2", "width_px", "height_px" | ||
| ]) | ||
| print(f"[INFO] Logging detections to {csv_path}") | ||
|
|
||
| frame_id = 0 | ||
| fps = 0.0 | ||
| t_prev = time.time() | ||
|
|
||
| print("[INFO] Press 'q' to quit.") | ||
| while True: | ||
|
|
||
| if STOP: | ||
| break | ||
|
|
||
| ret, frame = cap.read() | ||
| if not ret: | ||
| print("[WARN] Failed to grab frame, stopping.") | ||
| break | ||
|
|
||
| t_start = time.time() | ||
|
|
||
| # Run YOLO on the BGR frame | ||
| results = model(frame, verbose=False) | ||
|
|
||
| # Draw detections | ||
| for r in results: | ||
| for box in r.boxes: | ||
| x1, y1, x2, y2 = box.xyxy[0].cpu().numpy() | ||
| x1, y1, x2, y2 = map(int, [x1, y1, x2, y2]) | ||
|
|
||
| cls_id = int(box.cls[0]) | ||
| conf = float(box.conf[0]) | ||
| class_name = model.names.get(cls_id, str(cls_id)) | ||
|
|
||
| # Draw box | ||
| cv2.rectangle(frame, (x1, y1), (x2, y2), | ||
| (0, 255, 0), 2) | ||
|
|
||
| # Label text | ||
| label = f"{class_name} {conf:.2f}" | ||
| cv2.putText(frame, label, (x1, max(y1 - 10, 0)), | ||
| cv2.FONT_HERSHEY_SIMPLEX, 0.6, | ||
| (0, 255, 0), 2) | ||
|
|
||
| # Log to CSV | ||
| csv_writer.writerow([ | ||
| frame_id, | ||
| datetime.now().isoformat(), | ||
| cls_id, | ||
| class_name, | ||
| f"{conf:.4f}", | ||
| x1, y1, x2, y2, | ||
| x2 - x1, | ||
| y2 - y1 | ||
| ]) | ||
|
|
||
| # FPS calculation | ||
| t_end = time.time() | ||
| fps = 1.0 / max(t_end - t_start, 1e-6) | ||
|
|
||
| # Draw FPS on frame | ||
| cv2.putText(frame, f"FPS: {fps:.1f}", (10, 30), | ||
| cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 255), 2) | ||
|
|
||
| # Show frame | ||
| #cv2.imshow("YOLO Live Logger", frame) | ||
|
|
||
| # Write frame to video if enabled | ||
| if writer is not None: | ||
| writer.write(frame) | ||
|
|
||
| frame_id += 1 | ||
|
|
||
| # Quit on 'q' | ||
| #key = cv2.waitKey(1) & 0xFF | ||
| #if key == ord('q'): | ||
| # break | ||
|
|
||
| # Cleanup | ||
| cap.release() | ||
| if writer is not None: | ||
| writer.release() | ||
| csv_file.close() | ||
| cv2.destroyAllWindows() | ||
|
|
||
| print("[INFO] Done.") | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| main() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Store with git lfs.
https://dev.to/mishmanners/how-to-use-git-lfs-to-store-big-files-on-github-2b2e