-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfoscam_control.py
97 lines (80 loc) · 3.35 KB
/
foscam_control.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
#!/usr/bin/env python
import configparser
import logging
import sys
import time
import pika
import requests
logging.basicConfig(format='%(asctime)s %(message)s', stream=sys.stdout)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# Camera = namedtuple('Camera', ['ip', 'port', 'username', 'password'])
class Camera(object):
def __init__(self, name, ip, port, username, password, **kwargs):
self.name = name
self.password = password
self.username = username
self.port = port
self.ip = ip
def __str__(self):
return self.name
def api_call(self, cmd, **extra):
params = {'usr': self.username,
'pwd': self.password,
'cmd': cmd}
params = {**params, **extra}
url = f'https://{self.ip}:{self.port}/cgi-bin/CGIProxy.fcgi'
logger.debug('Performing request to url %s with params %r', url, params)
requests.get(url, params=params, verify=False)
def perform_move(self, direction: str, duration):
cmd = f'ptzMove{direction.title()}'
self.api_call(cmd)
time.sleep(duration)
self.api_call('ptzStopRun')
def goto_scene(self, scene):
self.api_call('ptzGotoPresetPoint', name=scene)
class FoscamControl(object):
def __init__(self, config_file):
self.config_file = config_file
self.channel = None
self.config = configparser.ConfigParser()
def run(self):
self.config.read(self.config_file)
self.connect()
self.channel.basic_consume(queue='foscammove', auto_ack=True, on_message_callback=self.callback)
logger.info('Waiting for messages. To exit press CTRL+C')
self.channel.start_consuming()
def connect(self):
connection = pika.BlockingConnection(pika.ConnectionParameters(self.config['default']['broker_address']))
self.channel: pika.adapters.blocking_connection.BlockingChannel = connection.channel()
self.channel.queue_declare('foscammove')
self.channel.queue_bind('foscammove', 'amq.topic', 'foscammove.#')
def callback(self, ch: pika.adapters.blocking_connection.BlockingChannel, method: pika.spec.Basic.Deliver,
properties: pika.spec.BasicProperties, body: bytes):
camera = self.get_camera(method.routing_key)
if camera is None:
logger.warning('Could not locate camera config for [%s]', method.routing_key)
return
action = body.decode('utf-8')
logger.info('Got action [%s] for camera [%s]', action, camera)
try:
if action.startswith('move_'):
direction = action[5:]
if direction in ('up', 'down'):
duration = 0.5
else:
duration = 1.5
camera.perform_move(direction, duration)
if action.startswith('scene_'):
scene = action[6:]
camera.goto_scene(scene)
except Exception as e:
logger.exception('Error performing action', exc_info=e)
def get_camera(self, routing_key: str):
camera_name = routing_key.rsplit('.', 1)[-1]
if not self.config.has_section(camera_name):
return None
return Camera(camera_name, **self.config[camera_name])
if __name__ == '__main__':
control = FoscamControl(sys.argv[1])
control.run()