|
import typer |
|
from typing import Optional |
|
from pathlib import Path |
|
from loguru import logger |
|
import cv2 |
|
from tqdm import tqdm |
|
import numpy as np |
|
import pandas as pd |
|
import shutil |
|
from datetime import datetime |
|
import matplotlib |
|
import os |
|
|
|
# Select the non-interactive "Agg" backend before pyplot is imported, so
# figures are rendered straight to image files (see plt.savefig in `eval`).
matplotlib.use("Agg")

import matplotlib.pyplot as plt
|
|
|
from predict import Model |
|
|
|
# Typer application object; the @app.command functions below register on it.
app = typer.Typer()
|
|
|
|
|
def _extent_path_for(video_path: Path) -> Path:
    """Return the extents CSV expected next to *video_path*.

    The file name is the video stem with ``video_`` replaced by ``extents_``
    and a ``.csv`` suffix, in the same directory as the video.
    """
    extent_filename = video_path.stem.replace("video_", "extents_")
    return video_path.parent / f"{extent_filename}.csv"


@app.command(help="Export videos to images (to a dir per video)")
def export_videos_to_images(
    input_dir: Path = typer.Argument(..., help="Input directory"),
    output_dir: Path = typer.Argument(..., help="Output directory"),
    ext: str = typer.Option("avi", help="Video Extension"),
    path_filter: Optional[str] = typer.Option(None, help="input path filter"),
    patient_prefix: Optional[bool] = typer.Option(
        True, help="use patient info as output dir prefix"
    ),
    copy_extent: Optional[bool] = typer.Option(
        True, help="copy extent files to output dir"
    ),
):
    """Dump every frame of each video under *input_dir* as numbered JPEGs.

    Videos are discovered recursively by extension (lower- and upper-case),
    optionally filtered by a substring of their path, and processed in file
    name order. Each video gets its own sub-directory of *output_dir*, named
    ``<patient_id>-<video stem>`` when *patient_prefix* is set and
    ``<video stem>`` otherwise. Frames are written as ``000.jpg``,
    ``001.jpg``, ...

    When *copy_extent* is set, every video must have a matching extents CSV
    next to it; if any is missing, the errors are logged and nothing is
    exported. Otherwise each extents CSV is copied into *output_dir*.
    """
    logger.info(f"Function called with arguments: {locals()}")

    input_dir = Path(input_dir)
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    # Collect into a set: on case-insensitive filesystems (or when the lower
    # and upper forms of the extension are equal) both globs return the same
    # files, which would otherwise be exported twice.
    found = set(input_dir.glob(f"**/*.{ext.lower()}"))
    found.update(input_dir.glob(f"**/*.{ext.upper()}"))
    logger.info(f"# of {ext} videos found: {len(found)}")

    if path_filter is not None:
        logger.info(f"Filtering videos with {path_filter}")
        found = {p for p in found if path_filter in str(p)}
        logger.info(f"# of {ext} videos found after filtering: {len(found)}")

    # Sort by file name; the full path breaks ties so the order is stable.
    video_files = sorted(found, key=lambda p: (p.name, str(p)))
    logger.info(f"{os.linesep}" + f"{os.linesep}".join(str(p) for p in video_files))

    if copy_extent:
        # Validate all extents up front so a partial export is never produced.
        missing = [
            extent_path
            for extent_path in map(_extent_path_for, video_files)
            if not extent_path.exists()
        ]
        for extent_path in missing:
            logger.error(f"Extent file {extent_path} does not exist")
        if missing:
            logger.error("Extent files do not exist for all videos")
            return

    for video_path in video_files:
        if copy_extent:
            shutil.copy(_extent_path_for(video_path), output_dir)

        # NOTE(review): the patient id is taken from the directory three
        # levels above the video file — confirm this matches the data layout.
        patient_id = video_path.parent.parent.parent.name
        video_name = video_path.stem
        logger.info(f"Processing video {video_name} of patient {patient_id}")

        sub_dir = output_dir / (
            f"{patient_id}-{video_name}" if patient_prefix else video_name
        )
        sub_dir.mkdir(parents=True, exist_ok=True)

        cap = cv2.VideoCapture(str(video_path))
        frame_count = 0
        try:
            if not cap.isOpened():
                logger.warning(f"Could not open video {video_path}")
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                cv2.imwrite(str(sub_dir / f"{frame_count:03}.jpg"), frame)
                frame_count += 1
        finally:
            # Always free the decoder/file handle, even if a write fails.
            cap.release()
        logger.info(f"\tExported {frame_count} frames to {sub_dir}")
|
|
|
|
|
@app.command(help="Evaluate model on a directory of images")
def eval(
    input_dir: Path = typer.Argument(..., help="Input directory"),
    input_model: Path = typer.Argument(..., help="Input model"),
    imgsz: int = typer.Option(640, help="Image size"),
    class_id: int = typer.Option(0, help="Class id to filter"),
    conf_thresh: float = typer.Option(0.5, help="Confidence threshold"),
    video_ext: str = typer.Option("avi", help="Video Extension"),
    out_dir: Path = typer.Option("runs", help="Output directory"),
    gt_csv_path: Path = typer.Option(
        "results_20230822_aorta_identified_added_by_Ray.csv",
        help="Ground truth csv path",
    ),
    no_extent: Optional[bool] = typer.Option(True, help="no extent file"),
    write_viz: Optional[bool] = typer.Option(False, help="write viz images"),
    gt_column_name: str = typer.Option("aorta_identified", help="Ground truth column"),
):
    """Run the model on every image sub-directory and record the largest aorta.

    Each immediate sub-directory of *input_dir* is treated as one video whose
    ``*.jpg`` frames are processed in file name order. Outputs go to a fresh
    ``max_aorta_result-<timestamp>`` directory under *out_dir*:

    * ``trace.csv``  - one row per frame (size in pixels / mm, confidence).
    * ``results.csv`` - one row per video with the largest measurement.
    * ``raw_*.jpg`` / ``viz_*.jpg`` - the source frame and the rendered
      visualisation of the frame holding the largest measurement.
    * ``<sub_dir>/`` - per-frame visualisations and copies of source frames
      for frames with a detection (all frames' viz when *write_viz* is set).

    Unless *no_extent* is set, an ``extents_*.csv`` next to each sub-directory
    must exist; its per-frame rows provide the millimetre extents.

    *gt_csv_path* is only checked for existence and *gt_column_name* is not
    used by this function.

    Raises:
        AssertionError: if an input path or a required extents file is
            missing, or an extents row has an unexpected sign.
    """
    assert input_dir.exists(), f"Input directory {input_dir} does not exist"
    assert input_model.exists(), f"Input model {input_model} does not exist"
    assert gt_csv_path.exists(), f"Ground truth csv {gt_csv_path} does not exist"

    model = Model(
        model_path=str(input_model),
        imgsz=imgsz,
        classes=[class_id],
        device="CPU",
        plot_mask=True,
        conf_thres=conf_thresh,
        is_async=False,
        n_jobs=1,
    )

    # Every run writes into its own timestamped directory.
    start_timestamp = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
    out_dir = Path(out_dir) / f"max_aorta_result-{start_timestamp}"
    out_dir.mkdir(parents=True, exist_ok=True)

    # "{time}" is left for loguru to expand in the log file name.
    logger.add(str(out_dir.absolute() / "eval_{time}.log"))

    out_csv_p = out_dir / "results.csv"
    out_trace_csv_p = out_dir / "trace.csv"
    logger.info(f"Output directory: {out_dir}")

    input_dir = Path(input_dir)
    sub_dirs = sorted(
        (p for p in input_dir.iterdir() if p.is_dir()), key=lambda p: p.name
    )
    num_sub_dirs = len(sub_dirs)
    logger.info(f"# of subdirectories found: {num_sub_dirs}")
    if not sub_dirs:
        # Nothing to evaluate; previously this crashed on sub_dirs[0].
        logger.error(f"No subdirectories found in {input_dir}")
        return

    # Directories not starting with "video" are taken to be named
    # "<patient>-<video>"; the first directory decides for the whole run.
    has_patient_prefix = not sub_dirs[0].name.startswith("video")

    trace_headers = ["video", "image_idx", "aorta_pixels", "aorta_mm", "conf"]
    headers = ["video", "max_aorta_pixels", "max_aorta_mm", "max_image_idx", "conf"]
    if has_patient_prefix:
        headers.insert(0, "patient_info")

    for idx, sub_dir in enumerate(sub_dirs):
        # Running maximum for this video.
        max_aorta_w = -1
        max_aorta_w_mm = -1
        max_aorta_viz = None
        max_aorta_im_path = None
        max_im_idx = None
        max_im_n = ""
        max_conf = None

        video_filename = (
            "-".join(sub_dir.name.split("-")[1:])
            if has_patient_prefix
            else sub_dir.name
        )
        extent_filename = video_filename.replace("video_", "extents_")
        extent_file = sub_dir.parent / f"{extent_filename}.csv"
        extents = None
        if not no_extent:
            assert extent_file.exists(), f"Extent file {extent_file} does not exist"
            extents = pd.read_csv(extent_file).to_dict("records")

        logger.info(f"Processing subdir {sub_dir.name} ({idx+1}/{num_sub_dirs})")

        images = sorted(sub_dir.glob("*.jpg"), key=lambda img: img.name)
        logger.info(f"\t# of images found: {len(images)}")

        out_sub_viz_dir = out_dir / sub_dir.name
        out_sub_viz_dir.mkdir(parents=True, exist_ok=True)

        for im_idx, image_path in enumerate(tqdm(images)):
            cv_frame = cv2.imread(str(image_path))
            if cv_frame is None:
                # cv2.imread returns None for unreadable files.
                logger.warning(f"\tCould not read image {image_path}, skipping")
                continue
            cv_width = cv_frame.shape[1]

            viz_frame, results = model.predict(cv_frame)
            bbox_xyxy = results[0]
            conf = results[1]

            viz_path = out_sub_viz_dir / f"{image_path.stem}_viz.jpg"
            if write_viz:
                cv2.imwrite(str(viz_path), viz_frame)

            # -1 marks "no measurement" for frames without a detection.
            trace_row = [
                f"{sub_dir.name}.{video_ext}",
                image_path.stem,
                -1,
                -1,
                conf,
            ]

            # The measurement needs the box, so only the box decides whether
            # there is a detection (a mask without a box used to crash here).
            if bbox_xyxy is not None:
                # NOTE(review): for an (x1, y1, x2, y2) box this is the
                # vertical extent, although it is named "width" — confirm.
                aorta_width = bbox_xyxy[3] - bbox_xyxy[1]

                w_mm_per_pixel = None
                if not no_extent:
                    w_mm_left = extents[im_idx]["Width-Left(mm)"]
                    w_mm_right = extents[im_idx]["Width-Right(mm)"]
                    assert w_mm_right > 0 and w_mm_left < 0
                    # NOTE(review): scale comes from the horizontal extent;
                    # assumes the same mm/pixel applies vertically — confirm.
                    w_mm_per_pixel = (w_mm_right - w_mm_left) / cv_width

                trace_row[2] = aorta_width
                trace_row[3] = aorta_width * w_mm_per_pixel if not no_extent else None

                # Frames with a detection always get a viz image and a copy of
                # the source; skip the write if write_viz already produced it.
                if not write_viz:
                    cv2.imwrite(str(viz_path), viz_frame)
                shutil.copy(image_path, out_sub_viz_dir)

                if aorta_width > max_aorta_w:
                    max_aorta_w = aorta_width
                    max_aorta_viz = viz_frame.copy()
                    max_aorta_im_path = image_path
                    max_im_idx = im_idx
                    max_im_n = image_path.stem
                    max_conf = conf
                    logger.info(
                        f"\tNew max aorta (pixels): {max_aorta_w:.2f}, conf: {max_conf:.2f}"
                    )
                    max_aorta_w_mm = (
                        max_aorta_w * w_mm_per_pixel if not no_extent else None
                    )

            # Append one trace row per frame; header only on first write.
            pd.DataFrame([trace_row], columns=trace_headers).to_csv(
                out_trace_csv_p,
                mode="a",
                header=not out_trace_csv_p.exists(),
                index=False,
                float_format="%.3f",
            )

        if max_aorta_w > 0:
            logger.info(f"\tMax aorta (pixels): {max_aorta_w:.2f}")

            out_raw_p = out_dir / f"raw_{sub_dir.name}_{max_im_n}.jpg"
            shutil.copy(max_aorta_im_path, out_raw_p)

            out_viz_p = out_dir / f"viz_{sub_dir.name}_{max_im_n}.jpg"
            max_aorta_viz_rgb = cv2.cvtColor(max_aorta_viz, cv2.COLOR_BGR2RGB)

            # Size the figure from the image's pixel dimensions and the
            # configured DPI.
            dpi = plt.rcParams["figure.dpi"]
            figsize = (
                max_aorta_viz_rgb.shape[1] / dpi,
                max_aorta_viz_rgb.shape[0] / dpi,
            )

            fig = plt.figure(figsize=figsize)
            if not no_extent:
                # Use the extents of the max-aorta frame itself, not of the
                # last frame processed by the loop above.
                max_extent_row = extents[max_im_idx]
                extent = [
                    max_extent_row["Width-Left(mm)"],
                    max_extent_row["Width-Right(mm)"],
                    max_extent_row["Depth-Bottom(mm)"],
                    max_extent_row["Depth-Top(mm)"],
                ]
                plt.imshow(max_aorta_viz_rgb, extent=extent)
                plt.xlabel("Width [mm]")
                plt.ylabel("Depth [mm]")
            else:
                plt.imshow(max_aorta_viz_rgb)
            plt.savefig(str(out_viz_p))
            plt.close(fig)
        else:
            logger.warning(f"\tNo aorta found in {sub_dir.name}")

        # video_filename already has the patient prefix stripped when one is
        # present, so it is the right "video" cell in both layouts.
        row = [
            f"{video_filename}.{video_ext}",
            max_aorta_w,
            max_aorta_w_mm,
            max_im_n,
            max_conf,
        ]
        if has_patient_prefix:
            row.insert(0, sub_dir.name.split("-")[0])

        # Append one summary row per video; header only on first write.
        pd.DataFrame([row], columns=headers).to_csv(
            out_csv_p,
            mode="a",
            header=not out_csv_p.exists(),
            index=False,
            float_format="%.3f",
        )

    logger.info(f"Done! Results written to {out_csv_p}")
|
|
|
|
|
@app.command(help="Copy source images to viz result folder")
def copy_srcimg_to_vizdir(
    src_img_dir: Path = typer.Argument(..., help="Source Images root directory"),
    out_viz_dir: Path = typer.Argument(..., help="Target viz directory"),
):
    """Copy the source frame of each result image into *out_viz_dir*.

    Result images are the ``*.jpg`` files found recursively under
    *out_viz_dir* whose stem has the form ``<tag>_<dir name>_<image stem>``.
    For each one, ``<src_img_dir>/<dir name>/<image stem>.jpg`` is copied to
    ``<out_viz_dir>/<result stem>_src.jpg``.

    The stem is split at the first and the last underscore, so directory
    names that contain underscores themselves are handled. Files that do not
    match the pattern, and ``*_src.jpg`` files from an earlier run, are
    skipped; a missing source image is logged and skipped.
    """
    src_root = Path(src_img_dir)
    viz_root = Path(out_viz_dir)

    # Materialise the listing first so copies made below are not revisited.
    for viz in sorted(viz_root.glob("**/*.jpg")):
        stem = viz.stem
        if stem.endswith("_src"):
            continue

        _tag, first_sep, remainder = stem.partition("_")
        dir_name, last_sep, image_stem = remainder.rpartition("_")
        if not (first_sep and last_sep and dir_name and image_stem):
            # Not a "<tag>_<dir>_<image>" name (e.g. "000_viz" or "000").
            continue

        ori_img = src_root / dir_name / f"{image_stem}.jpg"
        if not ori_img.is_file():
            logger.warning(f"Source image {ori_img} not found for {viz}")
            continue
        shutil.copy(ori_img, viz_root / f"{stem}_src.jpg")
|
|
|
|
|
# Run the Typer CLI when this file is executed as a script.
if __name__ == "__main__":
    app()
|
|