"""Compare two videos through co-occurrence statistics of quantized residuals.

Pipeline (entry point: ``final_main``):

1. ``read_file`` decodes a video into grayscale frames.
2. ``process_video`` applies a horizontal residual filter to every frame and
   quantizes/truncates the result into the symbols ``0 .. 2*t``.
3. ``extract_video`` counts, per frame, every 4-symbol pattern found
   horizontally, vertically and across the next three frames.
4. ``final`` folds the counts into mirror-symmetric histograms and computes
   their mean and second-moment statistics.
5. ``final_main`` scores both videos against the second video's statistics
   and renders a side-by-side report video.
"""
import itertools
import math
import multiprocessing
import os

import cv2
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image
from PIL import ImageDraw
from PIL import ImageFont

# Frame rate reported for the most recently opened video; set by read_file().
fps = 0


def read_file(name):
    """Decode the video at ``name`` into grayscale frames.

    Also stores the frame rate reported by OpenCV in the module-level ``fps``.

    Args:
        name: Path (or anything ``cv2.VideoCapture`` accepts) of the video.

    Returns:
        A list of ``(gray_frame, index)`` tuples in decoding order.

    Raises:
        IOError: If the video cannot be opened.
    """
    global fps
    cap = cv2.VideoCapture(name)
    try:
        if not cap.isOpened():
            # Raise instead of exit(): a bare exit() terminates the whole
            # process with status 0 and hides the failure from the caller.
            raise IOError(f"Cannot open video: {name}")
        fps = cap.get(cv2.CAP_PROP_FPS)
        frames = []
        while True:
            ret, frame = cap.read()
            if not ret:
                # End of stream (or a read error): stop decoding.
                break
            frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY))
    finally:
        cap.release()
    return [(frame, i) for i, frame in enumerate(frames)]


# Symbol alphabet of the quantized residual after shifting by ``t``.
st = [0, 1, 2, 3, 4]
# Maps every 4-symbol pattern to a histogram bin; a pattern and its mirror
# image share one bin.
dt = {}
idx = 0
# One canonical representative per {pattern, reversed pattern} pair.
l = [
    tuple(i)
    for i in itertools.product(st, repeat=4)
    if tuple(reversed(i)) >= tuple(i)
]
cnt = 0
for i, lt in enumerate(l):
    mirror = tuple(reversed(lt))
    dt[mirror] = i
    dt[lt] = i

# Number of histogram bins (325 for five symbols and patterns of length 4).
DESCRIPTOR_SIZE = len(l)

# Frame rate of the rendered report video.
OUTPUT_FPS = 30


def calc_filtered_img(img):
    """Apply the horizontal residual filter ``[1, -3, 3, -1]`` to ``img``.

    For each pixel the result is
    ``img[i, j-1] - 3*img[i, j] + 3*img[i, j+1] - img[i, j+2]``,
    where neighbours outside the image contribute zero.

    Args:
        img: 2-D array of pixel values.

    Returns:
        A float64 array with the same shape as ``img``.
    """
    # Convert once to float so the arithmetic cannot overflow or be rejected
    # when the input holds unsigned 8-bit pixels.
    src = np.asarray(img, dtype=np.float64)
    residual_img = -3.0 * src
    residual_img[:, 1:] += src[:, :-1]
    residual_img[:, :-1] += 3.0 * src[:, 1:]
    residual_img[:, :-2] -= src[:, 2:]
    return residual_img


def calc_q_t_img(img, q, t):
    """Quantize ``img`` by step ``q`` and truncate the result to ``[-t, t]``.

    Args:
        img: Array of residual values.
        q: Quantization step.
        t: Truncation threshold.

    Returns:
        A float64 array of ``clip(round(img / q), -t, t)``.
    """
    scaled = np.asarray(img, dtype=np.float64) / q
    return np.clip(np.round(scaled), -t, t)


# Quantization step and truncation threshold used by process_frame().
q = 3
t = 2


def process_frame(frame_and_index):
    """Turn one grayscale frame into its quantized residual symbols.

    Args:
        frame_and_index: ``(frame, index)`` tuple; only the frame is used.

    Returns:
        A uint8 array of symbols in ``0 .. 2*t``.
    """
    frame, _index = frame_and_index
    filtered_image = calc_filtered_img(frame)
    output_image = calc_q_t_img(filtered_image, q, t)
    # Shift [-t, t] to [0, 2t] so the symbols fit an unsigned type.
    return (output_image + t).astype(np.uint8)


def process_video(frames_with_index):
    """Run ``process_frame`` over all frames using one worker per CPU.

    Args:
        frames_with_index: List of ``(frame, index)`` tuples.

    Returns:
        A list of ``(processed_frame, index)`` tuples in input order.
    """
    with multiprocessing.Pool(multiprocessing.cpu_count()) as pool:
        processed_frames = pool.map(process_frame, frames_with_index)
    return [(frame, i) for i, frame in enumerate(processed_frames)]


co_occurrence_matrix_size = 5
co_occurrence_matrix_distance = 4


def _horizontal_windows(frame, span):
    """Return every run of ``span`` horizontally adjacent values as a row."""
    n_starts = frame.shape[1] - span + 1
    if n_starts <= 0 or frame.shape[0] == 0:
        return np.empty((0, span), dtype=frame.dtype)
    shifted = [frame[:, k:k + n_starts] for k in range(span)]
    return np.stack(shifted, axis=-1).reshape(-1, span)


def _count_windows(windows):
    """Count identical rows of an ``(N, span)`` array as ``{tuple: count}``."""
    if windows.shape[0] == 0:
        return {}
    patterns, counts = np.unique(windows, axis=0, return_counts=True)
    return {
        tuple(pattern.tolist()): int(count)
        for pattern, count in zip(patterns, counts)
    }


def each_frame(frame_and_index, processed_frames):
    """Count 4-symbol patterns around one processed frame.

    Args:
        frame_and_index: ``(frame, index)`` tuple; ``index`` is the position
            of ``frame`` inside ``processed_frames``.
        processed_frames: Sequence of processed frames; the three entries
            following ``index`` are read for the temporal patterns.

    Returns:
        A tuple ``(horizontal, vertical, temporal)`` of dictionaries mapping
        a 4-tuple of symbols to its number of occurrences. ``temporal`` is
        empty when fewer than three frames follow ``index``.
    """
    frame, index = frame_and_index
    span = co_occurrence_matrix_distance

    # Every window of ``span`` values is counted, including the one that
    # ends on the last column / row.
    freq_dict = _count_windows(_horizontal_windows(frame, span))
    freq_dict2 = _count_windows(_horizontal_windows(frame.T, span))

    freq_dict3 = {}
    if index < len(processed_frames) - (span - 1):
        through_time = [frame]
        through_time.extend(
            processed_frames[index + k] for k in range(1, span)
        )
        stacked = np.stack(through_time, axis=-1).reshape(-1, span)
        freq_dict3 = _count_windows(stacked)

    return (freq_dict, freq_dict2, freq_dict3)


def extract_video(processed_frame_with_index):
    """Compute the pattern counts of every processed frame in parallel.

    Args:
        processed_frame_with_index: List of ``(processed_frame, index)``
            tuples as returned by ``process_video``.

    Returns:
        A list with one ``each_frame`` result per input frame.
    """
    processed_frames = [frame for frame, _ in processed_frame_with_index]
    span = co_occurrence_matrix_distance
    # Ship only the frames a task actually reads (itself plus the next
    # three) instead of the whole video, and address them from position 0.
    tasks = [
        ((frame, 0), processed_frames[index:index + span])
        for frame, index in processed_frame_with_index
    ]
    with multiprocessing.Pool(multiprocessing.cpu_count()) as pool:
        freq_dict_list = pool.starmap(each_frame, tasks)
    return freq_dict_list


def _histogram(freq_dict):
    """Fold pattern counts into the mirror-symmetric histogram bins."""
    hist = np.zeros(DESCRIPTOR_SIZE)
    for pattern, count in freq_dict.items():
        hist[dt[pattern]] += count
    return hist


def final(freq_dict_list):
    """Build per-frame descriptors and their summary statistics.

    Args:
        freq_dict_list: List with one tuple of pattern-count dictionaries
            per frame, as returned by ``extract_video``.

    Returns:
        A tuple ``(mean, co_variance, descriptors, mean_1d, co_variance_1d,
        desc_1d)`` where, for ``n`` frames with ``k`` dictionaries each:

        * ``descriptors`` has shape ``(n, k, DESCRIPTOR_SIZE)``;
        * ``mean`` is the average descriptor, shape ``(k, DESCRIPTOR_SIZE)``;
        * ``co_variance`` is the ``(k, k)`` average of
          ``(d - mean) @ (d - mean).T`` over all descriptors ``d``;
        * ``desc_1d`` is ``descriptors`` flattened per frame;
        * ``mean_1d`` is the average of ``desc_1d``;
        * ``co_variance_1d`` is the ``(1, 1)`` average squared distance of
          the flattened descriptors from ``mean_1d``.

    Raises:
        ValueError: If ``freq_dict_list`` is empty.
    """
    if len(freq_dict_list) == 0:
        raise ValueError("freq_dict_list must contain at least one frame")

    descriptors = np.array([
        [_histogram(freq_dict) for freq_dict in freq_dicts]
        for freq_dicts in freq_dict_list
    ])
    n_frames = descriptors.shape[0]
    desc_1d = descriptors.reshape(n_frames, -1)

    mean_1d = desc_1d.mean(axis=0)
    centered_1d = desc_1d - mean_1d
    co_variance_1d = np.array([[np.sum(centered_1d ** 2) / n_frames]])

    mean = descriptors.mean(axis=0)
    centered = descriptors - mean
    co_variance = (
        np.matmul(centered, centered.transpose(0, 2, 1)).sum(axis=0)
        / n_frames
    )

    return (mean, co_variance, descriptors, mean_1d, co_variance_1d, desc_1d)


def _distance_map(descriptor, mean, inv_cov):
    """Return ``sqrt(|g.T @ inv_cov @ g|)`` for ``g = descriptor - mean``."""
    gm = descriptor - mean
    dm = np.matmul(np.matmul(gm.T, inv_cov), gm)
    return np.sqrt(np.abs(dm))


def _text_size(draw, text):
    """Measure ``text`` with whichever API this Pillow version offers."""
    if hasattr(draw, "textbbox"):
        left, top, right, bottom = draw.textbbox((0, 0), text)
        return right - left, bottom - top
    return draw.textsize(text)


def _seconds(frame_index, rate):
    """Convert a frame index to seconds; NaN when the rate is unknown."""
    return frame_index / rate if rate and rate > 0 else float("nan")


def final_main(input1, input2):
    """Compare two videos and render a visual report to ``video.mp4``.

    Each output frame shows the frame of ``input1`` (top left), the frame of
    ``input2`` with the same index (bottom left, blank when ``input2`` is
    shorter), a colour map of the distance matrix of the ``input1`` frame
    (right) and a caption telling whether the two distance matrices differ.
    Both videos are scored against the statistics of ``input2``.

    Args:
        input1: Path of the first video.
        input2: Path of the second video, used as the statistical reference.

    Returns:
        A tuple ``("video.mp4", message)``. ``message`` names the first and
        last differing frame with their timestamps, or is empty when no
        frame differs.

    Raises:
        IOError: If either video cannot be opened.
        ValueError: If either video contains no frames.
    """
    f1 = read_file(input1)
    # read_file() stores the rate in a global that the next call overwrites;
    # the reported frame indices belong to the first video.
    fps1 = fps
    of1 = read_file(input2)
    if not f1 or not of1:
        raise ValueError("Both videos must contain at least one frame")

    pf1 = process_video(f1)
    pof1 = process_video(of1)
    fd1 = extract_video(pf1)
    ofd1 = extract_video(pof1)
    disc1 = final(fd1)[2]
    mean2, co_variance2, disc2 = final(ofd1)[:3]

    # Invert once; the pseudo-inverse also copes with a singular matrix
    # (for instance when every reference frame has the same descriptor).
    inv_cov2 = np.linalg.pinv(co_variance2)

    height = f1[0][0].shape[0] + of1[0][0].shape[0]
    width = DESCRIPTOR_SIZE + f1[0][0].shape[1]
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    video = cv2.VideoWriter('video.mp4', fourcc, OUTPUT_FPS, (width, height))

    first_diff = None
    last_diff = None
    try:
        for (frame, index), disc in zip(f1, disc1):
            heatmap = _distance_map(disc, mean2, inv_cov2)

            new_im = Image.new('RGB', (width, height))
            new_im.paste(Image.fromarray(frame), (0, 0))

            different = False
            if index < len(of1):
                other = _distance_map(disc2[index], mean2, inv_cov2)
                if not np.allclose(heatmap - other, 0.0):
                    different = True
                    if first_diff is None:
                        first_diff = index
                    last_diff = index
                new_im.paste(
                    Image.fromarray(of1[index][0]), (0, frame.shape[0])
                )

            heatmapshow = cv2.normalize(
                heatmap, None, alpha=0, beta=255,
                norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_8U,
            )
            # applyColorMap yields BGR data. It is pasted unchanged and the
            # canvas is handed back to OpenCV unchanged, so the colours
            # come out right without any channel swap.
            heatmapshow = cv2.applyColorMap(heatmapshow, cv2.COLORMAP_JET)
            new_im.paste(Image.fromarray(heatmapshow), (frame.shape[1], 0))

            draw = ImageDraw.Draw(new_im)
            text = "The images are same."
            if different:
                text = "The images are different."
            text_width, text_height = _text_size(draw, text)
            x = (new_im.width - text_width) / 2
            y = (new_im.height - text_height) / 2
            draw.text((x, y), text, fill=(255, 255, 255))

            video.write(np.array(new_im))
    finally:
        video.release()

    outputString = ""
    if first_diff is not None:
        outputString = "\n".join([
            f"Initial difference at frame {first_diff} at time "
            f"{_seconds(first_diff, fps1)} seconds",
            f"Final difference at frame {last_diff} at time "
            f"{_seconds(last_diff, fps1)} seconds",
        ])
    return ("video.mp4", outputString)