1
0
forked from KPR/KPRSpaceAPI

Merge pull request 'Runing' (#1) from KPR/KPRSpaceAPI:LuzLobo2 into main

Reviewed-on: #1
This commit is contained in:
Cwolf 2025-10-20 05:21:30 +00:00
commit 1dcbf09664
7 changed files with 566 additions and 27 deletions

53
.gitignore vendored Normal file
View File

@ -0,0 +1,53 @@
# Python
__pycache__/
*.py[cod]
*$py.class
# Virtual environments
venv/
.venv/
ENV/
env/
# Packaging
build/
dist/
*.egg-info/
# Logs and runtime files
*.log
logs/
run_web.pid
# Screenshots and generated images
screenshots/
# OS files
.DS_Store
Thumbs.db
# IDE/editor
.vscode/
.idea/
# Environment files
.env
.env.*
# Cache and test
.pytest_cache/
htmlcov/
.coverage
# SQLite DBs
*.sqlite3
*.db
# pip wheel cache
pip-wheel-metadata/
# Ignore local secrets or config
secrets.yaml
# Node
node_modules/

109
api_client.py Normal file
View File

@ -0,0 +1,109 @@
#!/usr/bin/env python3
"""Simple CLI to call the running Space API (no external deps).
Usage:
python api_client.py status [--url URL]
python api_client.py start [--url URL]
python api_client.py stop [--url URL]
python api_client.py screenshot [--url URL] [--out FILE]
Defaults URL: http://localhost:5000
"""
import sys
import argparse
import json
from urllib import request, error, parse
DEFAULT_URL = 'http://localhost:5000'
def get(url):
try:
with request.urlopen(url, timeout=5) as resp:
return resp.read(), resp.getcode(), resp.info().get_content_type()
except error.HTTPError as e:
return e.read(), e.code, None
except Exception as e:
print('Request error:', e)
return None, None, None
def post(url):
req = request.Request(url, method='POST')
try:
with request.urlopen(req, timeout=5) as resp:
return resp.read(), resp.getcode()
except error.HTTPError as e:
return e.read(), e.code
except Exception as e:
print('Request error:', e)
return None, None
def cmd_status(base_url):
data, code, ctype = get(f"{base_url.rstrip('/')}/api/status")
if data is None:
print('No response')
return 2
try:
j = json.loads(data.decode())
print(json.dumps(j, indent=2))
except Exception:
print(data.decode(errors='ignore'))
return 0
def cmd_start(base_url):
data, code = post(f"{base_url.rstrip('/')}/api/start")
print('HTTP', code)
if data:
print(data.decode(errors='ignore'))
return 0
def cmd_stop(base_url):
data, code = post(f"{base_url.rstrip('/')}/api/stop")
print('HTTP', code)
if data:
print(data.decode(errors='ignore'))
return 0
def cmd_screenshot(base_url, out):
data, code, ctype = get(f"{base_url.rstrip('/')}/api/screenshot")
if data is None or code != 200:
print('Failed to get screenshot, HTTP', code)
if data:
print(data.decode(errors='ignore'))
return 2
with open(out, 'wb') as f:
f.write(data)
print('Saved screenshot to', out)
return 0
def main(argv):
p = argparse.ArgumentParser()
p.add_argument('--url', '-u', default=DEFAULT_URL, help='Base URL of the API')
sub = p.add_subparsers(dest='cmd')
sub.add_parser('status')
sub.add_parser('start')
sub.add_parser('stop')
ss = sub.add_parser('screenshot')
ss.add_argument('--out', '-o', default='latest.jpg')
args = p.parse_args(argv)
if not args.cmd:
p.print_help()
return 1
if args.cmd == 'status':
return cmd_status(args.url)
if args.cmd == 'start':
return cmd_start(args.url)
if args.cmd == 'stop':
return cmd_stop(args.url)
if args.cmd == 'screenshot':
return cmd_screenshot(args.url, args.out)
return 0
if __name__ == '__main__':
raise SystemExit(main(sys.argv[1:]))

187
app.py Normal file
View File

@ -0,0 +1,187 @@
import os
import threading
import time
from datetime import datetime, timezone
import logging
from flask import Flask, jsonify, send_file, render_template
import cv2
from spacer import get_latest_screenshot, process_image, get_brightness
# Configuration
BASE_DIR = os.path.dirname(__file__)
SCREENSHOT_DIR = os.environ.get('MOTION_DIR', os.path.join(BASE_DIR, 'screenshots'))
os.makedirs(SCREENSHOT_DIR, exist_ok=True)
MIN_MOTION_CONTOUR_AREA = int(os.environ.get('MIN_MOTION_AREA', 4000))
CAPTURE_DEVICE = int(os.environ.get('CAPTURE_DEVICE', 0))
# default one snapshot every 10 minutes (600 seconds). Override with SNAPSHOT_INTERVAL env var.
SNAPSHOT_INTERVAL_SECONDS = int(os.environ.get('SNAPSHOT_INTERVAL', 600))
# Brightness threshold (0-255) used to decide open/closed state from a photo
BRIGHTNESS_THRESHOLD = int(os.environ.get('BRIGHTNESS_THRESHOLD', 50))
app = Flask(__name__)
# ensure logger prints info messages
app.logger.setLevel(logging.INFO)
state = {
'last_image': None,
'last_status': 'unknown',
'last_brightness': None,
'running': False,
'monitor_thread': None,
}
state_lock = threading.Lock()
def save_frame(frame):
# use timezone-aware UTC datetime to avoid DeprecationWarning
ts = datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')
filename = f'shot_{ts}.jpg'
path = os.path.join(SCREENSHOT_DIR, filename)
cv2.imwrite(path, frame)
app.logger.info('Saved snapshot: %s', path)
return path
def motion_monitor():
cap = cv2.VideoCapture(CAPTURE_DEVICE)
if not cap.isOpened():
app.logger.error('Could not open video device %s', CAPTURE_DEVICE)
with state_lock:
state['running'] = False
return
app.logger.info('Periodic snapshot monitor started on device %s', CAPTURE_DEVICE)
last_snapshot = 0
try:
while True:
with state_lock:
if not state.get('running', False):
break
ret, frame = cap.read()
if not ret:
time.sleep(0.1)
continue
current_time = time.time()
if current_time - last_snapshot >= SNAPSHOT_INTERVAL_SECONDS:
path = save_frame(frame)
brightness = get_brightness(path)
if brightness is not None:
status = 'open' if brightness >= BRIGHTNESS_THRESHOLD else 'closed'
else:
status = process_image(path)
with state_lock:
state['last_image'] = path
state['last_status'] = status
state['last_brightness'] = brightness
app.logger.info(
'Periodic snapshot: %s brightness=%.1f status=%s',
path, brightness if brightness is not None else -1.0, status
)
last_snapshot = current_time
time.sleep(0.2)
finally:
cap.release()
app.logger.info('Periodic snapshot monitor stopped')
with state_lock:
state['running'] = False
state['monitor_thread'] = None
@app.route('/')
def index():
return render_template('index.html')
@app.route('/api/status')
def api_status():
with state_lock:
last_image = state['last_image'] or get_latest_screenshot(SCREENSHOT_DIR)
# prefer a recent brightness-derived status if available, otherwise fallback
last_status = state.get('last_status') if state.get('last_status') != 'unknown' else process_image(last_image)
last_brightness = state.get('last_brightness')
# if we don't have brightness yet attempt to compute it for the latest image
if last_brightness is None and last_image:
last_brightness = get_brightness(last_image)
if last_brightness is not None:
last_status = 'open' if last_brightness >= BRIGHTNESS_THRESHOLD else 'closed'
image_url = '/api/screenshot' if last_image else None
return jsonify({
'status': last_status,
'brightness': last_brightness,
'image': image_url,
'screenshot_dir': SCREENSHOT_DIR,
'brightness_threshold': BRIGHTNESS_THRESHOLD
})
@app.route('/api/screenshot')
def api_screenshot():
path = get_latest_screenshot(SCREENSHOT_DIR)
if not path:
return jsonify({'error': 'no screenshot available'}), 404
return send_file(path, mimetype='image/jpeg')
@app.route('/api/start', methods=['POST'])
def api_start():
with state_lock:
# if a monitor thread exists and is alive, report running
thr = state.get('monitor_thread')
if thr is not None and thr.is_alive():
state['running'] = True
return jsonify({'running': True}), 200
# start a new monitor thread
state['running'] = True
t = threading.Thread(target=motion_monitor, daemon=True)
state['monitor_thread'] = t
t.start()
return jsonify({'running': True}), 200
@app.route('/api/stop', methods=['POST'])
def api_stop():
# request monitor to stop and join the thread briefly
thr = None
with state_lock:
state['running'] = False
thr = state.get('monitor_thread')
if thr is not None:
# give the thread a short time to exit
thr.join(timeout=3.0)
with state_lock:
if state.get('monitor_thread') is thr:
state['monitor_thread'] = None
return jsonify({'running': False}), 200
if __name__ == '__main__':
# Console (no-web) mode: run monitor in foreground and print status
CONSOLE_MODE = os.environ.get('CONSOLE_MODE', 'false').lower() in ('1', 'true', 'yes')
if CONSOLE_MODE:
print('Starting in CONSOLE_MODE — monitor will run in foreground. Press Ctrl+C to stop.')
with state_lock:
state['running'] = True
try:
# run monitor in the main thread (blocking) so logs appear live in console
motion_monitor()
except KeyboardInterrupt:
print('Interrupted, stopping monitor...')
with state_lock:
state['running'] = False
finally:
print('Monitor stopped')
else:
# Start monitor only if AUTO_START is set (web mode)
AUTO_START = os.environ.get('AUTO_START', 'false').lower() in ('1', 'true', 'yes')
if AUTO_START:
with state_lock:
state['running'] = True
t = threading.Thread(target=motion_monitor, daemon=True)
state['monitor_thread'] = t
t.start()
# run the Flask webserver
app.run(host='0.0.0.0', port=5000)

78
run_web.fish Executable file
View File

@ -0,0 +1,78 @@
#!/usr/bin/env fish
# Run the app in web mode with recommended env vars (fish shell)
# Usage: ./run_web.fish
# create venv if missing
if not test -d venv
python3 -m venv venv
end
# activate venv
source venv/bin/activate.fish
# ensure dependencies
pip install -r requirements.txt
# recommended env vars (adjust if needed)
set -x SNAPSHOT_INTERVAL 600 # seconds (600 = 10 minutes)
set -x BRIGHTNESS_THRESHOLD 50 # 0-255
set -x AUTO_START false # true to auto-start monitor inside web server
# ensure CONSOLE_MODE is unset so the Flask server will run
set -e CONSOLE_MODE
# logging
set -x LOG_DIR logs
mkdir -p $LOG_DIR
set -x LOG_FILE $LOG_DIR/run_web.log
set -x PID_FILE $LOG_DIR/run_web.pid
# If a previous PID file exists, try to stop that process first
if test -f $PID_FILE
set OLD_PID (cat $PID_FILE)
if test -n "$OLD_PID"
if ps -p $OLD_PID > /dev/null 2>&1
echo "Stopping previous process pid $OLD_PID"
kill $OLD_PID 2>/dev/null || kill -9 $OLD_PID 2>/dev/null
sleep 1
end
rm -f $PID_FILE
end
end
# Also check for any process listening on port 5000 and kill it to free the port
if type lsof >/dev/null 2>&1
set PORT_PIDS (lsof -ti:5000 2>/dev/null)
if test -n "$PORT_PIDS"
echo "Killing processes on port 5000: $PORT_PIDS"
for p in $PORT_PIDS
kill $p 2>/dev/null || kill -9 $p 2>/dev/null
end
end
else
# fallback using ss and awk if lsof not available
if type ss >/dev/null 2>&1
for line in (ss -ltnp 2>/dev/null | grep ':5000' | awk '{print $NF}' | sed -n 's/.*pid=\([0-9]*\).*/\1/p')
if test -n "$line"
echo "Killing process pid $line listening on port 5000"
kill $line 2>/dev/null || kill -9 $line 2>/dev/null
end
end
end
end
echo "Starting web app in background; logs: $LOG_FILE"
# Use bash to ensure POSIX redirection works reliably
bash -lc "python app.py > '$LOG_FILE' 2>&1 & echo \$! > '$PID_FILE'"
# wait briefly for process to start
sleep 1
if test -f $PID_FILE
set PID (cat $PID_FILE)
echo "Started pid: $PID"
else
echo "Failed to start process; see $LOG_FILE for details"
end
# tail the log so the user can see live output (Ctrl+C to stop tail but process keeps running)
echo "Tailing log (press Ctrl+C to stop tail, process will continue running in background)."
tail -n 200 -f $LOG_FILE

View File

@ -1,35 +1,53 @@
import os import os
from PIL import Image
import glob import glob
from PIL import Image
def get_latest_screenshot(directory, ext='jpg'):
"""Return the latest file path with the given extension in directory or None if none found."""
if not directory:
return None
if not os.path.isdir(directory):
return None
pattern = os.path.join(directory, f'*.{ext}')
files = glob.glob(pattern)
if not files:
return None
return max(files, key=os.path.getctime)
def get_latest_screenshot(directory):
""" Get the latest screenshot file from the given directory. """
list_of_files = glob.glob(os.path.join(directory, '*.jpg')) # You can change the file extension if needed
latest_file = max(list_of_files, key=os.path.getctime)
return latest_file
def process_image(image_path): def process_image(image_path):
""" Process the image to decide if it's dark or light. """ """Return 'open', 'closed' or 'unknown' based on average brightness of an image.
The function is defensive and returns 'unknown' if the path is missing or an error occurs.
"""
if not image_path or not os.path.isfile(image_path):
return 'unknown'
try:
with Image.open(image_path) as img: with Image.open(image_path) as img:
# Resize image to 2x2 img = img.convert('RGB')
img = img.resize((2, 2)) img = img.resize((2, 2))
pixels = list(img.getdata())
if not pixels:
return 'unknown'
avg = [sum(ch) / len(pixels) for ch in zip(*pixels)]
luminance = sum(avg) / 3.0
return 'closed' if luminance < 128 else 'open'
except Exception:
return 'unknown'
# Get average color of the pixels
average_color = sum(img.getdata(), (0, 0, 0))
average_color = [color / 4 for color in average_color] # Since the image is 2x2
# Determine if the image is dark or light def get_brightness(image_path):
if sum(average_color) / 3 < 128: # Average RGB values less than 128 is considered dark """Return average brightness (0-255) as float, or None on error."""
return 'closed' if not image_path or not os.path.isfile(image_path):
else: return None
return 'open' try:
with Image.open(image_path) as img:
# Directory where the screenshots are stored img = img.convert('L')
screenshot_directory = '/var/lib/motion/' img = img.resize((16, 16))
pixels = list(img.getdata())
# Get the latest screenshot if not pixels:
latest_screenshot = get_latest_screenshot(screenshot_directory) return None
return float(sum(pixels) / len(pixels))
# Process the image and determine hackspace status except Exception:
hackspace_status = process_image(latest_screenshot) return None
print(f"The hackspace is {hackspace_status}.")

45
static/style.css Normal file
View File

@ -0,0 +1,45 @@
body {
font-family: Arial, sans-serif;
background: #f4f6f8;
color: #222;
display: flex;
align-items: center;
justify-content: center;
height: 100vh;
margin: 0;
}
.container {
background: white;
padding: 20px;
border-radius: 8px;
box-shadow: 0 6px 18px rgba(0,0,0,0.08);
width: 420px;
text-align: center;
}
.status {
font-size: 1.6rem;
padding: 10px;
margin-bottom: 12px;
border-radius: 6px;
color: white;
}
.status.open { background: #28a745; }
.status.closed { background: #dc3545; }
.status.unknown { background: #6c757d; }
img#latest {
max-width: 100%;
border-radius: 6px;
margin-bottom: 12px;
background: #111;
height: auto;
}
.controls button {
margin: 6px;
padding: 8px 14px;
border: none;
border-radius: 6px;
cursor: pointer;
}
.controls button#start { background: #007bff; color: white; }
.controls button#stop { background: #ffc107; color: #222; }
pre#info { text-align: left; background: #f0f0f0; padding: 8px; border-radius: 6px; font-size: 12px; }

49
templates/index.html Normal file
View File

@ -0,0 +1,49 @@
<!doctype html>
<html>
<head>
<meta charset="utf-8" />
<title>Hackspace Status</title>
<link rel="stylesheet" href="/static/style.css" />
</head>
<body>
<div class="container">
<h1>Hackspace Status</h1>
<div id="status" class="status unknown">Loading...</div>
<img id="latest" src="" alt="Latest screenshot" />
<div class="controls">
<button id="start">Start Monitor</button>
<button id="stop">Stop Monitor</button>
</div>
<pre id="info"></pre>
</div>
<script>
async function refresh() {
try {
const r = await fetch('/api/status');
const j = await r.json();
const s = document.getElementById('status');
s.textContent = (j.status || 'unknown').toUpperCase();
s.className = 'status ' + (j.status || 'unknown');
const img = document.getElementById('latest');
if (j.image) {
img.src = j.image + '?_=' + Date.now();
} else {
img.src = '';
}
document.getElementById('info').textContent = JSON.stringify(j, null, 2);
} catch (e) {
console.error(e);
}
}
document.getElementById('start').addEventListener('click', async () => {
await fetch('/api/start', {method: 'POST'});
});
document.getElementById('stop').addEventListener('click', async () => {
await fetch('/api/stop', {method: 'POST'});
});
setInterval(refresh, 30000);
refresh();
</script>
</body>
</html>