import os import threading import time from datetime import datetime, timezone import logging from flask import Flask, jsonify, send_file, render_template import cv2 from spacer import get_latest_screenshot, process_image, get_brightness # Configuration BASE_DIR = os.path.dirname(__file__) SCREENSHOT_DIR = os.environ.get('MOTION_DIR', os.path.join(BASE_DIR, 'screenshots')) os.makedirs(SCREENSHOT_DIR, exist_ok=True) MIN_MOTION_CONTOUR_AREA = int(os.environ.get('MIN_MOTION_AREA', 4000)) CAPTURE_DEVICE = int(os.environ.get('CAPTURE_DEVICE', 0)) # default one snapshot every 10 minutes (600 seconds). Override with SNAPSHOT_INTERVAL env var. SNAPSHOT_INTERVAL_SECONDS = int(os.environ.get('SNAPSHOT_INTERVAL', 600)) # Brightness threshold (0-255) used to decide open/closed state from a photo BRIGHTNESS_THRESHOLD = int(os.environ.get('BRIGHTNESS_THRESHOLD', 50)) app = Flask(__name__) # ensure logger prints info messages app.logger.setLevel(logging.INFO) state = { 'last_image': None, 'last_status': 'unknown', 'last_brightness': None, 'running': False, 'monitor_thread': None, } state_lock = threading.Lock() def save_frame(frame): # use timezone-aware UTC datetime to avoid DeprecationWarning ts = datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ') filename = f'shot_{ts}.jpg' path = os.path.join(SCREENSHOT_DIR, filename) cv2.imwrite(path, frame) app.logger.info('Saved snapshot: %s', path) return path def motion_monitor(): cap = cv2.VideoCapture(CAPTURE_DEVICE) if not cap.isOpened(): app.logger.error('Could not open video device %s', CAPTURE_DEVICE) with state_lock: state['running'] = False return app.logger.info('Periodic snapshot monitor started on device %s', CAPTURE_DEVICE) last_snapshot = 0 try: while True: with state_lock: if not state.get('running', False): break ret, frame = cap.read() if not ret: time.sleep(0.1) continue current_time = time.time() if current_time - last_snapshot >= SNAPSHOT_INTERVAL_SECONDS: path = save_frame(frame) brightness = get_brightness(path) if brightness is not None: status = 'open' if brightness >= BRIGHTNESS_THRESHOLD else 'closed' else: status = process_image(path) with state_lock: state['last_image'] = path state['last_status'] = status state['last_brightness'] = brightness app.logger.info( 'Periodic snapshot: %s brightness=%.1f status=%s', path, brightness if brightness is not None else -1.0, status ) last_snapshot = current_time time.sleep(0.2) finally: cap.release() app.logger.info('Periodic snapshot monitor stopped') with state_lock: state['running'] = False state['monitor_thread'] = None @app.route('/') def index(): return render_template('index.html') @app.route('/api/status') def api_status(): with state_lock: last_image = state['last_image'] or get_latest_screenshot(SCREENSHOT_DIR) # prefer a recent brightness-derived status if available, otherwise fallback last_status = state.get('last_status') if state.get('last_status') != 'unknown' else process_image(last_image) last_brightness = state.get('last_brightness') # if we don't have brightness yet attempt to compute it for the latest image if last_brightness is None and last_image: last_brightness = get_brightness(last_image) if last_brightness is not None: last_status = 'open' if last_brightness >= BRIGHTNESS_THRESHOLD else 'closed' image_url = '/api/screenshot' if last_image else None return jsonify({ 'status': last_status, 'brightness': last_brightness, 'image': image_url, 'screenshot_dir': SCREENSHOT_DIR, 'brightness_threshold': BRIGHTNESS_THRESHOLD }) @app.route('/api/screenshot') def api_screenshot(): path = get_latest_screenshot(SCREENSHOT_DIR) if not path: return jsonify({'error': 'no screenshot available'}), 404 return send_file(path, mimetype='image/jpeg') @app.route('/api/start', methods=['POST']) def api_start(): with state_lock: # if a monitor thread exists and is alive, report running thr = state.get('monitor_thread') if thr is not None and thr.is_alive(): state['running'] = True return jsonify({'running': True}), 200 # start a new monitor thread state['running'] = True t = threading.Thread(target=motion_monitor, daemon=True) state['monitor_thread'] = t t.start() return jsonify({'running': True}), 200 @app.route('/api/stop', methods=['POST']) def api_stop(): # request monitor to stop and join the thread briefly thr = None with state_lock: state['running'] = False thr = state.get('monitor_thread') if thr is not None: # give the thread a short time to exit thr.join(timeout=3.0) with state_lock: if state.get('monitor_thread') is thr: state['monitor_thread'] = None return jsonify({'running': False}), 200 if __name__ == '__main__': # Console (no-web) mode: run monitor in foreground and print status CONSOLE_MODE = os.environ.get('CONSOLE_MODE', 'false').lower() in ('1', 'true', 'yes') if CONSOLE_MODE: print('Starting in CONSOLE_MODE — monitor will run in foreground. Press Ctrl+C to stop.') with state_lock: state['running'] = True try: # run monitor in the main thread (blocking) so logs appear live in console motion_monitor() except KeyboardInterrupt: print('Interrupted, stopping monitor...') with state_lock: state['running'] = False finally: print('Monitor stopped') else: # Start monitor only if AUTO_START is set (web mode) AUTO_START = os.environ.get('AUTO_START', 'false').lower() in ('1', 'true', 'yes') if AUTO_START: with state_lock: state['running'] = True t = threading.Thread(target=motion_monitor, daemon=True) state['monitor_thread'] = t t.start() # run the Flask webserver app.run(host='0.0.0.0', port=5000)