KPRSpaceAPI/app.py
2025-10-19 23:14:50 -06:00

188 lines
6.5 KiB
Python

import os
import threading
import time
from datetime import datetime, timezone
import logging
from flask import Flask, jsonify, send_file, render_template
import cv2
from spacer import get_latest_screenshot, process_image, get_brightness
# Configuration -- every setting below can be overridden with an environment variable.
BASE_DIR = os.path.dirname(__file__)

# Snapshot directory: MOTION_DIR, or a "screenshots" folder next to this file.
# The directory is created at import time if it does not exist yet.
SCREENSHOT_DIR = os.getenv('MOTION_DIR', os.path.join(BASE_DIR, 'screenshots'))
os.makedirs(SCREENSHOT_DIR, exist_ok=True)

# MIN_MOTION_AREA: not referenced anywhere else in the visible part of this file.
MIN_MOTION_CONTOUR_AREA = int(os.getenv('MIN_MOTION_AREA', 4000))

# CAPTURE_DEVICE: integer index handed to cv2.VideoCapture.
CAPTURE_DEVICE = int(os.getenv('CAPTURE_DEVICE', 0))

# SNAPSHOT_INTERVAL: seconds between snapshots; the default of 600 is one every 10 minutes.
SNAPSHOT_INTERVAL_SECONDS = int(os.getenv('SNAPSHOT_INTERVAL', 600))

# BRIGHTNESS_THRESHOLD (0-255): a photo at or above this brightness counts as
# 'open', anything below it as 'closed'.
BRIGHTNESS_THRESHOLD = int(os.getenv('BRIGHTNESS_THRESHOLD', 50))

app = Flask(__name__)
# make sure INFO-level messages from the app logger are emitted
app.logger.setLevel(logging.INFO)

# Shared monitor state; read and written under state_lock by the monitor and
# by the HTTP handlers.
state = {
    'last_image': None,        # path of the most recent snapshot
    'last_status': 'unknown',  # status derived from the most recent snapshot
    'last_brightness': None,   # brightness measured for the most recent snapshot
    'running': False,          # flag the monitor loop polls to decide whether to continue
    'monitor_thread': None,    # background threading.Thread running the monitor, if any
}
state_lock = threading.Lock()
def save_frame(frame):
    """Write *frame* to SCREENSHOT_DIR as a timestamped JPEG and return its path.

    The file name is ``shot_<UTC timestamp>.jpg`` with one-second resolution,
    so two frames saved within the same second map to the same path.

    Args:
        frame: Image handed unchanged to ``cv2.imwrite`` (in this file, a
            frame obtained from ``cv2.VideoCapture.read``).

    Returns:
        The path the frame was written to. The path is returned even when the
        write fails, so existing callers keep working; the failure is logged
        as an error instead of being reported as a successful save.
    """
    # use timezone-aware UTC datetime to avoid DeprecationWarning
    ts = datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')
    path = os.path.join(SCREENSHOT_DIR, f'shot_{ts}.jpg')
    # cv2.imwrite signals failure through its boolean return value
    if cv2.imwrite(path, frame):
        app.logger.info('Saved snapshot: %s', path)
    else:
        app.logger.error('Failed to save snapshot: %s', path)
    return path
def motion_monitor():
    """Take a snapshot from CAPTURE_DEVICE every SNAPSHOT_INTERVAL_SECONDS.

    Runs until ``state['running']`` becomes false. For each snapshot the frame
    is saved with ``save_frame``, its brightness is measured with
    ``get_brightness`` and compared against BRIGHTNESS_THRESHOLD to decide
    'open' or 'closed'; when no brightness is available the status comes from
    ``process_image`` instead. The results are published in ``state`` under
    ``state_lock``.

    If the capture device cannot be opened, the function logs an error, clears
    ``state['running']`` and returns. On exit from the loop (normal or via an
    exception) the device is released and ``state['running']`` and
    ``state['monitor_thread']`` are reset.
    """
    cap = cv2.VideoCapture(CAPTURE_DEVICE)
    if not cap.isOpened():
        app.logger.error('Could not open video device %s', CAPTURE_DEVICE)
        # release the handle even though opening failed
        cap.release()
        with state_lock:
            state['running'] = False
        return

    app.logger.info('Periodic snapshot monitor started on device %s', CAPTURE_DEVICE)
    # None means "no snapshot taken yet", so the first good frame is saved at once
    last_snapshot = None
    try:
        while True:
            with state_lock:
                if not state.get('running', False):
                    break

            ret, frame = cap.read()
            if not ret:
                # no frame available; back off briefly and try again
                time.sleep(0.1)
                continue

            # a monotonic clock keeps the interval immune to wall-clock adjustments
            now = time.monotonic()
            if last_snapshot is None or now - last_snapshot >= SNAPSHOT_INTERVAL_SECONDS:
                path = save_frame(frame)
                brightness = get_brightness(path)
                if brightness is not None:
                    status = 'open' if brightness >= BRIGHTNESS_THRESHOLD else 'closed'
                else:
                    # no brightness value: fall back to the image-based classifier
                    status = process_image(path)
                with state_lock:
                    state['last_image'] = path
                    state['last_status'] = status
                    state['last_brightness'] = brightness
                # -1.0 is logged in place of a missing brightness value
                app.logger.info(
                    'Periodic snapshot: %s brightness=%.1f status=%s',
                    path, brightness if brightness is not None else -1.0, status
                )
                last_snapshot = now

            time.sleep(0.2)
    finally:
        cap.release()
        app.logger.info('Periodic snapshot monitor stopped')
        with state_lock:
            state['running'] = False
            state['monitor_thread'] = None
@app.route('/')
def index():
    """Serve the root URL by rendering the ``index.html`` template."""
    page = render_template('index.html')
    return page
@app.route('/api/status')
def api_status():
    """Return the current status, brightness and image URL as JSON.

    Values published by the monitor in ``state`` are preferred. Anything
    missing is derived from the newest file in SCREENSHOT_DIR: brightness via
    ``get_brightness`` (compared against BRIGHTNESS_THRESHOLD to decide
    'open' or 'closed'), and, if the status is still 'unknown', via
    ``process_image``.

    When no image exists at all, the status stays 'unknown' and
    ``process_image`` is not called (NOTE(review): how ``process_image``
    handles a missing path is not visible here, so it is never given one).

    Returns:
        A JSON response with the keys ``status``, ``brightness``, ``image``,
        ``screenshot_dir`` and ``brightness_threshold``.
    """
    # Copy the shared values and release the lock before any file I/O or image
    # analysis, so a slow request cannot stall the monitor thread.
    with state_lock:
        last_image = state['last_image']
        last_status = state.get('last_status')
        last_brightness = state.get('last_brightness')

    if not last_image:
        last_image = get_latest_screenshot(SCREENSHOT_DIR)

    # if we don't have brightness yet attempt to compute it for the latest image
    if last_brightness is None and last_image:
        last_brightness = get_brightness(last_image)
        if last_brightness is not None:
            last_status = 'open' if last_brightness >= BRIGHTNESS_THRESHOLD else 'closed'

    # last resort: classify the image itself, but only when there is one
    if last_status == 'unknown' and last_image:
        last_status = process_image(last_image)

    image_url = '/api/screenshot' if last_image else None
    return jsonify({
        'status': last_status,
        'brightness': last_brightness,
        'image': image_url,
        'screenshot_dir': SCREENSHOT_DIR,
        'brightness_threshold': BRIGHTNESS_THRESHOLD,
    })
@app.route('/api/screenshot')
def api_screenshot():
    """Send the file reported by ``get_latest_screenshot`` as a JPEG.

    Responds with a JSON error body and HTTP 404 when no screenshot is found
    in SCREENSHOT_DIR.
    """
    latest = get_latest_screenshot(SCREENSHOT_DIR)
    if latest:
        return send_file(latest, mimetype='image/jpeg')
    return jsonify({'error': 'no screenshot available'}), 404
@app.route('/api/start', methods=['POST'])
def api_start():
    """Ensure the monitor is running and report ``{'running': True}``.

    If a registered monitor thread is still alive it is reused; otherwise a
    new daemon thread running ``motion_monitor`` is created, recorded in
    ``state['monitor_thread']`` and started. In both cases
    ``state['running']`` is set to True.
    """
    with state_lock:
        existing = state.get('monitor_thread')
        already_alive = existing is not None and existing.is_alive()
        # the run flag is raised whether or not a new thread is needed
        state['running'] = True
        if not already_alive:
            worker = threading.Thread(target=motion_monitor, daemon=True)
            state['monitor_thread'] = worker
            worker.start()
    return jsonify({'running': True}), 200
@app.route('/api/stop', methods=['POST'])
def api_stop():
    """Ask the monitor to stop and wait briefly for its thread to exit.

    Clears ``state['running']`` and joins the registered monitor thread for up
    to three seconds. The thread reference is dropped only once the thread has
    actually exited: if it is still alive after the timeout it stays
    registered, so a later ``/api/start`` reuses it instead of starting a
    second monitor next to it.

    Returns:
        ``{'running': False}`` with HTTP 200.
    """
    with state_lock:
        state['running'] = False
        thr = state.get('monitor_thread')

    if thr is not None:
        # give the thread a short time to exit
        thr.join(timeout=3.0)
        still_alive = thr.is_alive()
        with state_lock:
            # forget the thread only if it is finished and still the registered one
            if not still_alive and state.get('monitor_thread') is thr:
                state['monitor_thread'] = None
        if still_alive:
            app.logger.warning('Monitor thread did not stop within the join timeout')

    return jsonify({'running': False}), 200
if __name__ == '__main__':
    # CONSOLE_MODE selects a no-web run: the monitor occupies the foreground
    # and its log output appears live in the console.
    CONSOLE_MODE = os.environ.get('CONSOLE_MODE', 'false').lower() in ('1', 'true', 'yes')
    if not CONSOLE_MODE:
        # Web mode: the monitor is started up front only when AUTO_START is set.
        AUTO_START = os.environ.get('AUTO_START', 'false').lower() in ('1', 'true', 'yes')
        if AUTO_START:
            with state_lock:
                state['running'] = True
                monitor = threading.Thread(target=motion_monitor, daemon=True)
                state['monitor_thread'] = monitor
                monitor.start()
        # serve the Flask app on all interfaces, port 5000
        app.run(host='0.0.0.0', port=5000)
    else:
        print('Starting in CONSOLE_MODE — monitor will run in foreground. Press Ctrl+C to stop.')
        with state_lock:
            state['running'] = True
        try:
            # blocking call: the monitor loop runs in the main thread
            motion_monitor()
        except KeyboardInterrupt:
            print('Interrupted, stopping monitor...')
            with state_lock:
                state['running'] = False
        finally:
            print('Monitor stopped')