[ Foro de Python ]
video = cv2.VideoCapture("./Videoplestimografia/02_02_2021/1080P/VID_20210202_090214.mp4")
shape_predictor = dlib.shape_predictor("./shape_predictor/shape_predictor_68_face_landmarks.dat")
prototxtPath = "./Face-Mask-Detection-master/face_detector/deploy.prototxt"
weightsPath = "./Face-Mask-Detection-master/face_detector/res10_300x300_ssd_iter_140000.caffemodel"
face_detector = cv2.dnn.readNet(prototxtPath, weightsPath)
model = load_model("./Face-Mask-Detection-master/mask_detector.model")
(major_ver, minor_ver, subminor_ver) = (cv2.__version__).split('.')
if int(major_ver) < 3 :
fps_original = round(video.get(cv2.cv.CV_CAP_PROP_FPS))
print("Frames per second using video.get(cv2.cv.CV_CAP_PROP_FPS): {0}".format(fps_original))
else :
fps_original = round(video.get(cv2.CAP_PROP_FPS))
print("Frames per second using video.get(cv2.CAP_PROP_FPS) : {0}".format(fps_original))
frame_count = 0
timestamps = []
settings = Settings()
fftlength = 70
# used to record the time when we processed last frame
prev_frame_time = 0
# used to record the time at which we processed current frame
new_frame_time = 0
rPPG_roi1 = []
rPPG_roi2 = []
prev_face = {}
prev_roi1 = {}
prev_roi2 = {}
R, G, B = [0, 1, 2]
band_hr = [40, 240]
band_rr = [10, 40]
state_mask = None
while(video.isOpened()):
# Capture frame-by-frame
ret, frame = video.read()
if not ret: break
frame = cv2.resize(frame, (256, 256))
# Conseguir señales rbg del tamaño original del frame
(h, w) = frame.shape[:2]
#//////////////////////////////// Revisar posible sobrante
bpm = 0
new_frame_time = time.time()
dt = new_frame_time - prev_frame_time
fps_real_time = 1/(dt)
prev_frame_time = new_frame_time
if len(timestamps) == 0:
timestamps.append(0)
else:
timestamps.append(timestamps[-1] + dt)
real_fps.value = 'FPS : ' + str(int(fps_real_time))
#//////////////////////////////////////
rPPG1_roi1 = []
rPPG1_roi2 = []
blob = cv2.dnn.blobFromImage(frame, 1.0, (300, 300), (104.0, 177.0, 123.0))
face_detector.setInput(blob)
detections = face_detector.forward()
for i in range(0, detections.shape[2]): #Caras encontradas iteración
confidence = detections[0, 0, i, 2]
if confidence > 0.5:
box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
(startX, startY, endX, endY) = np.round(box).astype('int')
face = dlib.rectangle(startX, startY, endX, endY)
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
shape = shape_predictor(gray, face)
prev_face = {"x1": face.left(), "y1": face.top(), "x2": face.right(), "y2": face.bottom()}
if state_mask == None:
face_frame = frame[startY:endY, startX:endX]
face_frame = cv2.cvtColor(face_frame, cv2.COLOR_BGR2RGB)
face_frame = cv2.resize(face_frame, (224, 224))
face_frame = img_to_array(face_frame)
face_frame = preprocess_input(face_frame)
face_frame = np.expand_dims(face_frame, axis=0)
preds = model.predict(face_frame)
state_mask = "with_mask" if preds[0].argmax(axis=-1) == 0 else "without_mask"
if state_mask == "with_mask":
#with mask
prev_roi1 = {
"x1": shape.part(20-1).x, "y1": shape.part(20-1).y - abs(shape.part(29-1).y - shape.part(31-1).y),
"x2": shape.part(22-1).x, "y2": shape.part(20-1).y
}
prev_roi2 = {
"x1": shape.part(23-1).x, "y1": shape.part(25-1).y - abs(shape.part(29-1).y - shape.part(31-1).y),
"x2": shape.part(25-1).x, "y2": shape.part(25-1).y
}
avg_width = round((abs(prev_roi1["x1"] - prev_roi1["x2"]) + abs(prev_roi2["x1"] - prev_roi2["x2"])) / 2)
avg_height = round((abs(prev_roi1["y1"] - prev_roi1["y2"]) + abs(prev_roi2["y1"] - prev_roi2["y2"])) / 2)
prev_roi1 = {
"x1": shape.part(20-1).x, "y1": prev_roi1["y1"],
"x2": prev_roi1["x1"] + avg_width, "y2": prev_roi1["y1"] + avg_height
}
prev_roi2 = {
"x1": shape.part(23-1).x, "y1": prev_roi2["y1"],
"x2": prev_roi2["x1"] + avg_width, "y2": prev_roi2["y1"] + avg_height
}
else:
#without mask
# Que tome las 5 regiones de interes
prev_roi1 = {
"x1": shape.part(37-1).x, "y1": shape.part(29-1).y,
"x2": shape.part(40-1).x, "y2": shape.part(31-1).y
}
prev_roi2 = {
"x1": shape.part(43-1).x, "y1": shape.part(29-1).y,
"x2": shape.part(46-1).x, "y2": shape.part(31-1).y
}
avg_width = round((abs(prev_roi1["x1"] - prev_roi1["x2"]) + abs(prev_roi2["x1"] - prev_roi2["x2"])) / 2)
avg_height = round((abs(prev_roi1["y1"] - prev_roi1["y2"]) + abs(prev_roi2["y1"] - prev_roi2["y2"])) / 2)
prev_roi1 = {
"x1": shape.part(37-1).x, "y1": shape.part(29-1).y,
"x2": prev_roi1["x1"] + avg_width, "y2": prev_roi1["y1"] + avg_height
}
prev_roi2 = {
"x1": shape.part(43-1).x, "y1": shape.part(29-1).y,
"x2": prev_roi2["x1"] + avg_width, "y2": prev_roi2["y1"] + avg_height
}
### Skin
###
#print(prev_roi1)
roi1 = frame[prev_roi1["y1"]:prev_roi1["y2"], prev_roi1["x1"]:prev_roi1["x2"]]
roi2 = frame[prev_roi2["y1"]:prev_roi2["y2"], prev_roi2["x1"]:prev_roi2["x2"]]
b_roi1, g_roi1, r_roi1 = cv2.split(roi1)
r_mean_roi1 = np.mean(r_roi1)
g_mean_roi1 = np.mean(g_roi1)
b_mean_roi1 = np.mean(b_roi1)
ppg_roi1 = [r_mean_roi1, g_mean_roi1, b_mean_roi1]
###
for i, col in enumerate(ppg_roi1):
if math.isnan(col):
ppg_roi1[i] = 0
rPPG_roi1.append(ppg_roi1)
b_roi2, g_roi2, r_roi2 = cv2.split(roi2)
r_mean_roi2 = np.mean(r_roi2)
g_mean_roi2 = np.mean(g_roi2)
b_mean_roi2 = np.mean(b_roi2)
ppg_roi2 = [r_mean_roi2, g_mean_roi2, b_mean_roi2]
###
#Eliminado
#//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
for i, col in enumerate(ppg_roi2):
if math.isnan(col):
ppg_roi2[i] = 0
rPPG_roi2.append(ppg_roi2)
rPPG1_roi1 = np.transpose(rPPG_roi1)
rPPG1_roi2 = np.transpose(rPPG_roi2)
if rPPG1_roi1.shape[1] > 10 and rPPG1_roi2.shape[1] > 10:
if rPPG1_roi1.shape[1] < 70 and rPPG1_roi2.shape[1] < 70:
#heart_rate = np.zeros(int(fftlength/2) + 1)
#respiratory_rate = np.zeros(int(fftlength/2) + 1)
heart_rate = 0
respiratory_rate = 0
spo2 = 0
else:
if rPPG1_roi1.shape[1] == rPPG1_roi2.shape[1] and rPPG1_roi1.shape[1] % 7 == 0 and rPPG1_roi1.shape[1] % 10 == 0:
fftlength = rPPG1_roi1.shape[1]
#f = np.linspace(0, fps_original/2, int(fftlength/2) + 1) * 60
fft_roi = range(int(fftlength/2) + 1) # We only care about this part of the fft because it is symmetric anyway
bpf_div= 60 * fps_original / 2
#skin_vec = [1, 0.66667, 0.5]
skin_vec = [0.7682, 0.5121, 0.3841]
values_by_roi = {"X_chrom": [], "Y_chrom": [], "hr": [], "snr": [], "psd": []}
for rPPG_roi in [rPPG1_roi1, rPPG1_roi2]:
if settings.use_resampling :
t = np.arange(0, timestamps[-1], 1/fps_original)
rPPG_resampled= np.zeros((3, t.shape[0]))
for col in [0, 1, 2]:
rPPG_resampled[col] = np.interp(t, timestamps, rPPG_roi[col])
rPPG_roi = rPPG_resampled
col_c = np.zeros((3, fftlength))
for col in [R, G, B]:
col_stride = rPPG_roi[col, -fftlength:]# select last samples
y_ACDC = signal.detrend(col_stride / np.mean(col_stride))
col_c[col] = y_ACDC * skin_vec[col]
X_chrom = col_c[R] - col_c[G]
Y_chrom = col_c[R] + col_c[G] - 2 * col_c[B]
coef_vector_hr = signal.butter(5, ([band_hr[0]/bpf_div, band_hr[1]/bpf_div]), 'bandpass')
Xf_hr = signal.filtfilt(*coef_vector_hr, X_chrom) # Applies band pass filter
Yf_hr = signal.filtfilt(*coef_vector_hr, Y_chrom)
alpha_CHROM_hr = np.std(Xf_hr) / np.std(Yf_hr)
x_stride_method_hr = Xf_hr - alpha_CHROM_hr * Yf_hr
f_hr, psd_hr = signal.welch(x_stride_method_hr, fs=fps_original, nperseg=(fftlength / 2 + 1))
heart_rate = int(f_hr[np.argmax(psd_hr)] * 60)
#STFT_hr = np.fft.fft(x_stride_method_hr, fftlength)[fft_roi]
#heart_rate = np.abs(STFT_hr)/np.max(np.abs(STFT_hr))
#snr = signaltonoise(heart_rate)
snr = signaltonoise(x_stride_method_hr)
values_by_roi["X_chrom"].append(X_chrom)
values_by_roi["Y_chrom"].append(Y_chrom)
values_by_roi["hr"].append(heart_rate)
values_by_roi["snr"].append(snr)
values_by_roi["psd"].append(np.mean(psd_hr))
X_chrom = values_by_roi["X_chrom"][np.argmax(values_by_roi["snr"])]
Y_chrom = values_by_roi["Y_chrom"][np.argmax(values_by_roi["snr"])]
heart_rate = values_by_roi["hr"][np.argmax(values_by_roi["snr"])]
coef_vector_rr = signal.butter(5, ([band_rr[0]/bpf_div, band_rr[1]/bpf_div]), 'bandpass')
Xf_rr = signal.filtfilt(*coef_vector_rr, X_chrom) # Applies band pass filter
Yf_rr = signal.filtfilt(*coef_vector_rr, Y_chrom)
alpha_CHROM_rr = np.std(Xf_rr) / np.std(Yf_rr)
x_stride_method_rr = Xf_rr - alpha_CHROM_rr * Yf_rr
f_rr, psd_rr = signal.welch(x_stride_method_rr, fs=fps_original, nperseg=(fftlength / 2 + 1))
respiratory_rate = int(f_rr[np.argmax(psd_rr)] * 60)
#STFT_rr = np.fft.fft(x_stride_method_rr, fftlength)[fft_roi]
#respiratory_rate = np.abs(STFT_rr)/np.max(np.abs(STFT_rr))
if np.argmax(values_by_roi["psd"]) == 0:
red_values = rPPG1_roi1[0, -fftlength:]
blue_values = rPPG1_roi1[2, -fftlength:]
else:
red_values = rPPG1_roi2[0, -fftlength:]
blue_values = rPPG1_roi2[2, -fftlength:]
ac_red = np.std(red_values)
ac_blue = np.std(blue_values)
dc_red = np.mean(red_values)
dc_blue = np.mean(blue_values)
spo2 = 125 - 26 * ((ac_red / dc_red) / (ac_blue / dc_blue))
#bpm = f[np.argmax(heart_rate)]
bpm = min(max(heart_rate, band_hr[0]), band_hr[1])
frequency_pr.value = 'Heart rate = ' + str(round(bpm)) + ' BPM'
#rpm = f[np.argmax(respiratory_rate)]
rpm = min(max(respiratory_rate, band_rr[0]), band_rr[1])
frequency_rr.value = 'Respiration rate = ' + str(round(rpm)) + ' RPM'
spo2 = min(max(spo2, 0), 100)
spo2_label.value = 'SPO2 = ' + str(int(round(spo2))) + ' %'
'''if rPPG1_roi1.shape[1] == fftlength + 1:
print('values_by_roi["snr"] =', values_by_roi["snr"])
print('np.argmax(values_by_roi["snr"]) =', np.argmax(values_by_roi["snr"]))
print('values_by_roi["psd"] =', values_by_roi["psd"])
print('np.argmax(values_by_roi["psd"]) =', np.argmax(values_by_roi["psd"]))
#print('values_by_roi["hr"] =', values_by_roi["hr"])
#print('len(heart_rate) =', len(heart_rate))
#print('len(respiratory_rate) =', len(respiratory_rate))
#print('np.argmax(heart_rate) =', np.argmax(heart_rate))
#print('np.argmax(respiratory_rate) =', np.argmax(respiratory_rate))
#print('time in seconds =', str(timestamps[-1]))
#print('bpm =', bpm)
#print('rpm =', rpm)'''
if rPPG1_roi1.shape[1] == rPPG1_roi2.shape[1] and rPPG1_roi1.shape[1] % (fps_original * 60) == 0:
print("//---------------------")
print('minute = ', str(rPPG1_roi1.shape[1] / fps_original / 60))
print('time in seconds =', str(round(timestamps[-1])))
print('bpm =', bpm)
print('rpm =', rpm)
print('spo2 =', spo2)
print("//---------------------")
break
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
cv2.rectangle(frame, (prev_face["x1"], prev_face["y1"]), (prev_face["x2"], prev_face["y2"]), (0, 255, 0), 2)
cv2.rectangle(frame, (prev_roi1["x1"], prev_roi1["y1"]), (prev_roi1["x2"], prev_roi1["y2"]), (0, 0, 255), 2)
cv2.rectangle(frame, (prev_roi2["x1"], prev_roi2["y1"]), (prev_roi2["x2"], prev_roi2["y2"]), (0, 0, 255), 2)
im_out = PIL.Image.fromarray(frame)
#im_out = im_out.resize((256, 256))
output_frame = io.BytesIO()
im_out.save(output_frame, format='png')
output_video.value = output_frame.getvalue()
if cv2.waitKey(1) & 0xFF == ord('q'):
break
(No se puede continuar esta discusión porque tiene más de dos meses de antigüedad. Si tienes dudas parecidas, abre un nuevo hilo.)