1
0
forked from orson/bachemap
bachemap/app.py

331 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Flask, render_template, request, redirect, url_for, flash, send_from_directory
from flask_pymongo import PyMongo, ObjectId
from flask_login import LoginManager, UserMixin, login_user, login_required, current_user, logout_user
from werkzeug.utils import secure_filename
from werkzeug.security import generate_password_hash, check_password_hash
from datetime import datetime
from flask_pymongo import ObjectId
import os
from uuid import uuid4
from flask_wtf import FlaskForm
from wtforms import StringField, FileField, SubmitField, DateTimeField, SelectField, PasswordField
from wtforms.validators import DataRequired, Length
import requests
from config import Config
from geopy.geocoders import Nominatim
geolocator = Nominatim(user_agent="Bachemapa @ baches.qro.mx")
def create_app(config=Config):
    """Build and configure the Flask application.

    Loads settings from ``config`` and attaches the MongoDB and Flask-Login
    extensions.  The forms, routes and CLI command that follow in this
    function use ``app``, ``mongo`` and ``login_manager`` as closures.
    """
    app = Flask(__name__)
    app.config.from_object(config)
    mongo = PyMongo(app)
    login_manager = LoginManager(app)
    # Endpoint name of the login view (the ``thelogin`` route).
    login_manager.login_view = 'thelogin'
    login_manager.session_protection = "strong"
class User(UserMixin):
    """Flask-Login user object built from a document of the ``users`` collection."""

    def __init__(self, user_data):
        """Copy the fields the app uses out of a raw user document.

        ``_id``, ``username`` and ``referral_code`` are required keys;
        ``invited_by``, ``is_admin`` and ``pwd`` are optional.
        """
        # The id is kept as a string; pins and invitations reference it that way.
        self.id = str(user_data['_id'])
        self.username = user_data['username']
        self.referral_code = user_data['referral_code']
        self.invited_by = user_data.get('invited_by')
        self.is_admin = user_data.get('is_admin', False)
        self.pwd = user_data.get('pwd')

    @staticmethod
    def get(user_id):
        """Return the ``User`` stored under ``user_id``, or ``None``.

        ``None`` is returned both when no such user exists and when
        ``user_id`` is not a valid ObjectId (for example a stale or tampered
        session value), instead of letting ``ObjectId()`` raise.
        """
        if not ObjectId.is_valid(user_id):
            return None
        user_data = mongo.db.users.find_one({"_id": ObjectId(user_id)})
        return User(user_data) if user_data else None
class PinForm(FlaskForm):
    """Form used to report a new pin: description, photo, timestamp and type."""

    description = StringField('¿Qué estamos viendo?', validators=[DataRequired()])
    photo = FileField('Evidencia fotogénica', validators=[DataRequired()])
    # Pass the callable, not its result: ``datetime.now()`` would be evaluated
    # once when the class is defined, freezing the default for every form
    # created afterwards.
    timedate = DateTimeField(default=datetime.now)
    typeofpin = SelectField('Tipo de cosa', choices=['bache', 'coladera', 'obra sin terminar', 'escombro', 'robo-asalto', 'biciestacionamiento', 'mala iluminación', 'bici blanca', 'zapato blanco'])
    submit = SubmitField('Agregar')
class LoginForm(FlaskForm):
    """Login form: user name, password and a submit button."""

    username = StringField('Usuario', [DataRequired()])
    pwd = PasswordField('Tu clave', [DataRequired()])
    submit = SubmitField('Entrar')
def Unique(model, field, message=None):
    """Build a WTForms validator that rejects values already stored in MongoDB.

    Args:
        model: Collection name as a string, or an object whose lower-cased
            ``__name__`` is the collection name.
        field: Document key to look up, as a string or as an object with a
            ``name`` attribute.  If no name can be derived, the name of the
            form field being validated is used.
        message: Error text; defaults to ``"<key> must be unique."``.

    Returns:
        A ``(form, field)`` callable that raises ``ValidationError`` when a
        document with the same value already exists.
    """
    # Imported here because the module-level imports do not include it.
    from wtforms.validators import ValidationError

    collection = model if isinstance(model, str) else model.__name__.lower()
    column = field if isinstance(field, str) else getattr(field, 'name', None)

    def _unique(form, field_data):
        key = column or field_data.name
        if mongo.db[collection].find_one({key: field_data.data}):
            raise ValidationError(message or f"{key} must be unique.")

    return _unique


class RegistrationForm(FlaskForm):
    """Sign-up form: user name, password and the inviter's referral id."""

    username = StringField('Nombre de usuarix', validators=[DataRequired(), Unique('users', 'username', message="Este usuario ya existe")])
    # No uniqueness check on the password: stored values are hashes, so a
    # plaintext lookup can never match, and rejecting a password because
    # someone else uses it would leak information.
    pwd = PasswordField('Clave', validators=[DataRequired(), Length(min=10)])
    referral = StringField('ID de quien te invito', [DataRequired()])
    submit = SubmitField('Registrar')
def allowed_file(filename):
    """Tell whether ``filename`` ends in an extension from ``ALLOWED_EXTENSIONS``.

    The extension is the text after the last dot, compared in lower case.
    Names without a dot are never allowed.
    """
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1].lower()
    return extension in app.config['ALLOWED_EXTENSIONS']
@app.route('/', methods=['GET', 'POST'])
def index():
    """Render the map on GET/HEAD, or store a newly reported pin on POST.

    A POST must come from a logged-in user and carry a ``photo`` upload with
    an allowed extension plus the ``lat``, ``lng``, ``typeofpin`` and
    ``description`` form fields.  The photo is saved under ``UPLOAD_FOLDER``,
    the coordinates are reverse-geocoded into an address (best effort) and
    the pin is inserted into the ``pins`` collection.

    Returns:
        The rendered map, a redirect (to the login page for anonymous
        posters, to the map after a successful insert), or the error page
        with status 400 when the photo is missing or not allowed.
    """
    # Same package the module already imports ``Nominatim`` from.
    from geopy.exc import GeopyError

    if request.method != 'POST':
        # GET, and the HEAD requests Flask routes to GET views.
        form = PinForm()
        pins = mongo.db.pins.find()
        return render_template('index.html', pins=pins, form=form)

    if not current_user.is_authenticated:
        # The pin records ``current_user.id``, which anonymous users lack.
        return redirect(url_for('thelogin'))

    photo = request.files.get("photo")
    if not photo or not allowed_file(photo.filename):
        message = "Necesitamos una foto en un formato permitido."
        return render_template('error.html', message=message), 400

    # Read every required field before touching the disk so that a missing
    # field (HTTP 400) does not leave an orphaned upload behind.
    lat = request.form['lat']
    lng = request.form['lng']
    typeofpin = request.form['typeofpin']
    description = request.form['description']

    # A random prefix keeps two uploads with the same name from overwriting
    # each other.
    filename = f"{uuid4().hex}_{secure_filename(photo.filename)}"
    filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    photo.save(filepath)

    # One geocoding request per pin; a geocoder failure must not lose the pin.
    try:
        location = geolocator.reverse(f"{lat}, {lng}")
    except (GeopyError, ValueError) as exc:
        app.logger.warning("Reverse geocoding failed for %s, %s: %s", lat, lng, exc)
        location = None

    pin = {
        'time': datetime.now(),
        'photo': filepath,
        'lat': lat,
        'lng': lng,
        'typeofpin': typeofpin,
        'added_by': current_user.id,
        'description': description,
        'address': str(location) if location else '',
    }
    mongo.db.pins.insert_one(pin)
    flash('¡Gracias por tu aportación!')
    return redirect(url_for('index'))
@app.route('/quienes')
def quienes():
    """Render the ``quienes.html`` template."""
    template_name = 'quienes.html'
    return render_template(template_name)
@app.route('/uploads/<name>')
def download_file(name):
    """Serve the file called ``name`` from the configured ``UPLOAD_FOLDER``."""
    upload_dir = app.config["UPLOAD_FOLDER"]
    return send_from_directory(upload_dir, name)
@login_manager.user_loader
def load_user(user_id):
    """Flask-Login user loader: resolve ``user_id`` through ``User.get``."""
    loaded = User.get(user_id)
    return loaded
@app.route('/registrame/<referral_code>', methods=['GET', 'POST'])
def registrame(referral_code):
    """Register a new account through an invitation link.

    ``referral_code`` must belong to an existing user (the inviter);
    otherwise the error page is returned with status 406.  GET/HEAD render
    the registration form.  POST creates the user from the ``username`` and
    ``pwd`` form fields, logs them in and redirects to the map.  A user name
    that is already taken re-renders the form with status 409.
    """
    inviter = mongo.db.users.find_one({"referral_code": referral_code})
    if not inviter:
        message = "Ooops, parece que nadie te invitó aquí... Puedes saber más sobre esta decisión aquí https://baches.qro.mx/quienes#preguntas"
        return render_template('error.html', message = message), 406

    if request.method != 'POST':
        # Covers GET and the HEAD requests Flask routes to GET views, so the
        # view never falls through and returns ``None``.
        return render_template('register.html', form=RegistrationForm())

    username = request.form['username']
    pwd = request.form['pwd']

    # Login looks users up by name, so names must stay unique.
    if mongo.db.users.find_one({"username": username}):
        flash("Este usuario ya existe")
        return render_template('register.html', form=RegistrationForm()), 409

    # Generate the id up front so ``str_id`` is stored by the same insert
    # instead of a follow-up update.
    new_user_id = ObjectId()
    new_user_data = {
        "_id": new_user_id,
        "username": username,
        "pwd": generate_password_hash(pwd),
        "referral_code": str(uuid4()),
        "invited_by": str(inviter['_id']),
        "is_admin": False,
        "str_id": str(new_user_id),
    }
    mongo.db.users.insert_one(new_user_data)
    login_user(load_user(new_user_id))
    return redirect(url_for('index'))
@app.route('/thelogin', methods=['GET', 'POST'])
def thelogin():
    """Show the login form and, on POST, check the submitted credentials.

    Valid credentials log the user in and redirect to the map; anything else
    re-renders the login page with an error message.
    """
    login_form = LoginForm()
    if request.method != 'POST':
        return render_template('login.html',form=login_form)

    submitted_name = request.form['username']
    submitted_pwd = request.form['pwd']
    record = mongo.db.users.find_one({"username": submitted_name})

    if not (record and check_password_hash(record['pwd'], submitted_pwd)):
        return render_template('login.html', messages = 'Ooops, no hay tal usuario', form=login_form)

    login_user(User(record))
    return redirect(url_for('index'))
@app.route('/logout')
@login_required
def logout():
    """End the current session, flash a confirmation and go back to ``/``."""
    logout_user()
    flash('Cerraste sesión exitosamente')
    home = redirect('/')
    return home
@app.route('/desheredame/<user_id>')
@login_required
def desheredame(user_id):
    """Admin-only: delete a user, the users they invited, and all their pins.

    Non-admins are redirected to the map.  An unknown or malformed
    ``user_id`` renders the error page with status 404.  On success the
    admin is redirected to the dashboard.
    """
    # NOTE(review): this destructive action is reachable with a plain GET.
    if not current_user.is_admin:
        return redirect(url_for('index'))

    user_to_remove = None
    if ObjectId.is_valid(user_id):
        user_to_remove = mongo.db.users.find_one({"_id": ObjectId(user_id)})
    if not user_to_remove:
        message = "A este ya me lo desheredaron"
        return render_template('error.html', message=message), 404

    # ``invited_by`` and ``added_by`` hold the user id as a string, while
    # users themselves are keyed by ObjectId.  ``find_one`` returns a plain
    # dict, so the id is read with ``['_id']``.
    removed_id = str(user_to_remove['_id'])

    # Collect everyone invited by the removed user so their pins can go too.
    invited_ids = [
        str(invited['_id'])
        for invited in mongo.db.users.find({'invited_by': removed_id}, {'_id': 1})
    ]

    # Delete the pins of the invited users and of the removed user.
    mongo.db.pins.delete_many({"added_by": {"$in": invited_ids + [removed_id]}})
    # Then delete the invited users whose pins were just removed.
    mongo.db.users.delete_many({"invited_by": removed_id})
    # Finally, delete the offending user.
    mongo.db.users.delete_one({"_id": user_to_remove['_id']})
    return redirect(url_for('dashboard'))
@app.route("/remove_pin/<pin_id>")
@login_required
def remove_pin(pin_id):
    """Delete a pin if the current user is an admin or the pin's author.

    An unknown or malformed ``pin_id`` renders the error page with status
    404.  Authorised deletions redirect to the dashboard; everyone else is
    redirected to the map and the pin is left alone.
    """
    actual_pin = None
    if ObjectId.is_valid(pin_id):
        actual_pin = mongo.db.pins.find_one({"_id": ObjectId(pin_id)})
    if actual_pin is None:
        message = "Ese pin ya no existe"
        return render_template('error.html', message=message), 404

    added_by = actual_pin.get("added_by")
    if current_user.is_admin or current_user.id == added_by:
        mongo.db.pins.delete_one({"_id": actual_pin["_id"]})
        return redirect(url_for('dashboard'))
    return redirect(url_for('index'))
@app.route('/manifest.json')
def manifest():
    """Serve ``manifest.json`` out of the ``static`` directory."""
    static_dir, manifest_name = 'static', 'manifest.json'
    return send_from_directory(static_dir, manifest_name)
@app.route('/dashboard')
@login_required
def dashboard():
    """Render the dashboard with a freshly generated invite code.

    Every visit replaces the current user's ``referral_code`` with a new
    UUID.  Admins get every user and every pin; other users get only the
    pins they added.
    """
    # The user document is deliberately not fetched or printed here: it
    # contains the password hash and was only used for debug output.
    invite_code = str(uuid4())
    mongo.db.users.update_one(
        {'_id': ObjectId(current_user.id)},
        {'$set': {'referral_code': invite_code}},
    )

    if current_user.is_admin:
        users = list(mongo.db.users.find())
        pins = list(mongo.db.pins.find())
        return render_template('dashboard.html', users=users, pins=pins, invite_code=invite_code)

    pins = list(mongo.db.pins.find({"added_by": current_user.id}))
    return render_template('dashboard.html', pins=pins, invite_code=invite_code)
@app.cli.command('add_user')
def add_user():
    """CLI command: prompt for a name and password and insert an admin user.

    The new user gets a random referral code, which is printed at the end.
    """
    username = input("Enter Username:\n")
    pwd = input("Add pass:\n")
    hashed_pwd = generate_password_hash(pwd)
    referral_code = str(uuid4())
    mongo.db.users.insert_one({
        "username": username,
        "pwd": hashed_pwd,
        "referral_code": referral_code,
        "invited_by": None,
        "is_admin": True,
    })
    print(f"Admin {username} added! Referral code is {referral_code}")
@app.route("/camera", methods=["GET", "POST"])
@login_required
def camera():
    """Render the camera page, or accept a photo upload with coordinates.

    On POST the optional ``photo`` file is saved under ``UPLOAD_FOLDER`` and
    a short confirmation containing the submitted ``latitude`` and
    ``longitude`` is returned.  A photo whose name is empty after
    sanitising, or whose extension is not allowed, is rejected with status
    400.
    """
    from html import escape

    if request.method != "POST":
        return render_template("camera.html")

    latitude = request.form.get("latitude")
    longitude = request.form.get("longitude")
    photo = request.files.get("photo")

    if photo:
        # The client controls the file name: sanitise it so it cannot escape
        # the upload folder, and apply the same extension check as ``index``.
        safe_name = secure_filename(photo.filename)
        if not safe_name or not allowed_file(photo.filename):
            return "File type not allowed", 400
        photo_path = os.path.join(app.config['UPLOAD_FOLDER'], safe_name)
        photo.save(photo_path)
        # TODO: Optionally embed latitude/longitude into the photo's EXIF
        # here using piexif or Pillow.

    # The response is served as HTML, so the echoed form values are escaped.
    return f"Photo uploaded! Lat: {escape(str(latitude))}, Lng: {escape(str(longitude))}"
@app.route('/leaderboard')
def leaderboard():
    """Render the top-10 pin contributors and the share of active users.

    User names are masked to first character + ``***`` + last character
    (names shorter than two characters are shown as they are).  The
    percentage is the number of distinct ``added_by`` values among pins
    divided by the number of users, rounded to one decimal.
    """
    # Ten ``added_by`` values with the most pins, highest count first.
    ranking_stages = [
        {"$group": {"_id": "$added_by", "count": {"$sum": 1}}},
        {"$sort": {"count": -1}},
        {"$limit": 10},
    ]
    ranking = list(mongo.db.pins.aggregate(ranking_stages))

    cleaned_leaders = []
    for entry in ranking:
        user_id = entry["_id"]
        count = entry["count"]
        try:
            user = mongo.db.users.find_one({"_id": ObjectId(user_id)})
            if not user or "username" not in user:
                # Pins whose author no longer exists are left off the board.
                continue
            username = user["username"]
            if len(username) < 2:
                masked = username
            else:
                masked = username[0] + "***" + username[-1]
            cleaned_leaders.append({"username": masked, "count": count})
        except Exception as e:
            print(f"Error processing user_id {user_id}: {e}")
            continue

    print('THIS IS THE LEADERBOARD', cleaned_leaders)

    # Total number of users.
    user_totals = list(mongo.db.users.aggregate([
        {"$count": "total"}
    ]))
    total_users = user_totals[0]["total"] if user_totals else 0

    # Number of distinct ``added_by`` values across all pins.
    contributor_totals = list(mongo.db.pins.aggregate([
        {"$group": {"_id": None, "active_users": {"$addToSet": "$added_by"}}},
        {"$project": {"active_count": {"$size": "$active_users"}}}
    ]))
    active_users = contributor_totals[0]["active_count"] if contributor_totals else 0

    # Guard against dividing by zero when there are no users at all.
    if total_users > 0:
        active_percentage = round((active_users / total_users) * 100, 1)
    else:
        active_percentage = 0

    return render_template('leaderboard.html', leaders=cleaned_leaders, percentage = active_percentage)
return app
# Module-level application instance, built with the default ``Config``.
app=create_app()
if __name__ == '__main__':
    # Running this file directly starts Flask's built-in server with debug
    # mode enabled.
    app.run(debug=True)