-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
81 lines (63 loc) · 2.03 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import os
import time
from datetime import datetime
from threading import Thread

import cv2
import imutils
import numpy as np
from dotenv import load_dotenv

from bot import bot
# Read settings from the environment; load_dotenv() is called first so the
# lookups below run after it.
load_dotenv()
CAM_URL = os.environ.get('CAM_URL')
CHAT_ID = int(os.environ.get('CHAT_ID'))

# Module-level state read by both worker threads.
xml = './haarcascade_frontalcatface.xml'  # cascade file passed to cv2.CascadeClassifier
filename = 'image.jpg'                    # path the alert image is written to and sent from
dt = datetime.today()                     # evaluated once, when this module is loaded
q_th = list()                             # hand-off list: detect_cat appends, send_alert clears
def detect_cat():
    """Watch the camera stream and queue frames that contain a cat face.

    Opens the stream at ``CAM_URL`` and runs the cascade classifier loaded
    from ``xml`` on every frame that is read successfully. Each detection is
    outlined in green and the frame is shown in a window titled 'Frame'.
    When at least one face is found and ``q_th`` is empty, the annotated
    frame is appended to ``q_th`` for ``send_alert`` to pick up.

    Pressing ``q`` in the window sends a termination message, releases the
    capture, closes the window and ends the whole process with
    ``os._exit(1)``. The function never returns normally.
    """
    # IP Cam
    cap = cv2.VideoCapture(CAM_URL)
    # Get image classifier
    cat_face_cascade = cv2.CascadeClassifier(xml)
    # Send start message, stamped with the time it is actually sent
    bot.send_message(text=f'Cataclysm Started {datetime.today()}', chat_id=CHAT_ID)

    while True:
        _, frame = cap.read()
        if frame is not None:
            frame = imutils.resize(frame, width=680)
            faces = cat_face_cascade.detectMultiScale(frame)
            # One entry per detected face
            if len(faces) == 0:
                print("No cat faces found")
            else:
                print(f"Number of cat faces detected: {len(faces)}")
                # Draw faces
                for (x, y, w, h) in faces:
                    cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0))
                # Push to queue only when the previous alert has been consumed,
                # so at most one frame is pending at a time
                if not q_th:
                    q_th.append(frame)
            cv2.imshow('Frame', frame)

        # Exit
        if cv2.waitKey(1) == ord('q'):
            # Use the current time here: the module-level `dt` still holds the
            # moment the module was loaded, not the moment of termination.
            bot.send_message(text=f'Cataclysm Terminated {datetime.today()}', chat_id=CHAT_ID)
            # Release the stream before the hard exit below, which skips
            # normal interpreter cleanup.
            cap.release()
            cv2.destroyAllWindows()
            # send_alert loops forever, so returning from this function alone
            # would not end the process.
            os._exit(1)
def send_alert():
    """Forward queued cat frames to the chat as a text alert plus a photo.

    Runs forever. Whenever ``q_th`` holds a frame, the frame is written to
    ``filename``, an alert message and the image are sent to ``CHAT_ID``
    through ``bot``, and ``q_th`` is cleared so that ``detect_cat`` can queue
    the next detection.
    """
    while True:
        if not q_th:
            # Nothing pending: yield briefly instead of spinning at full
            # speed while waiting for the detector thread.
            time.sleep(0.05)
            continue

        cv2.imwrite(filename, q_th[0])
        # Stamp the alert with the time of this detection rather than the
        # module-level `dt`, which is fixed at module load.
        bot.send_message(text=f'CAT ALERT {datetime.today()}', chat_id=CHAT_ID)
        # Close the image file after sending so handles do not pile up across
        # alerts. NOTE(review): assumes send_photo has finished reading the
        # file by the time it returns - confirm against the bot library.
        with open(filename, 'rb') as photo:
            bot.send_photo(photo=photo, chat_id=CHAT_ID)
        q_th.clear()
if __name__ == "__main__":
    # One thread per job: the detector first, then the alert sender.
    read_th, write_th = (Thread(target=job) for job in (detect_cat, send_alert))

    # Launch both workers in that order.
    for th in (read_th, write_th):
        th.start()

    # Block the main thread until each worker finishes.
    for th in (read_th, write_th):
        th.join()