forked from orson/bachemap
305 lines
13 KiB
Python
305 lines
13 KiB
Python
from flask import Flask, render_template, request, redirect, url_for, flash, send_from_directory
|
|
from flask_pymongo import PyMongo, ObjectId
|
|
from flask_login import LoginManager, UserMixin, login_user, login_required, current_user, logout_user
|
|
from werkzeug.utils import secure_filename
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
from datetime import datetime
|
|
from flask_pymongo import ObjectId
|
|
import os
|
|
from uuid import uuid4
|
|
from flask_wtf import FlaskForm
|
|
from wtforms import StringField, FileField, SubmitField, DateTimeField, SelectField, PasswordField
|
|
from wtforms.validators import DataRequired, Length
|
|
import requests
|
|
from config import Config
|
|
from geopy.geocoders import Nominatim
|
|
geolocator = Nominatim(user_agent="Bachemapa @ baches.qro.mx")
|
|
|
|
def create_app(config=Config):
|
|
app = Flask(__name__)
|
|
app.config.from_object(config)
|
|
|
|
mongo = PyMongo(app)
|
|
login_manager = LoginManager(app)
|
|
login_manager.login_view = 'thelogin'
|
|
login_manager.session_protection = "strong"
|
|
|
|
class User(UserMixin):
|
|
def __init__(self, user_data):
|
|
self.id = str(user_data['_id'])
|
|
self.username = user_data['username']
|
|
self.referral_code = user_data['referral_code']
|
|
self.invited_by = user_data.get('invited_by')
|
|
self.is_admin = user_data.get('is_admin', False)
|
|
self.pwd = user_data.get('pwd')
|
|
|
|
@staticmethod
|
|
def get(user_id):
|
|
user_data = mongo.db.users.find_one({"_id": ObjectId(user_id)})
|
|
if user_data:
|
|
return User(user_data)
|
|
else:
|
|
return None
|
|
|
|
class PinForm(FlaskForm):
|
|
description = StringField('¿Qué estamos viendo?', validators=[DataRequired()])
|
|
photo = FileField('Evidencia fotogénica', validators=[DataRequired()])
|
|
timedate = DateTimeField(default=datetime.now())
|
|
typeofpin = SelectField('Tipo de cosa', choices=['bache', 'coladera', 'obra sin terminar', 'escombro', 'robo-asalto', 'biciestacionamiento', 'mala iluminación', 'bici blanca', 'zapato blanco'])
|
|
|
|
submit = SubmitField('Agregar')
|
|
|
|
class LoginForm(FlaskForm):
|
|
username = StringField('Usuario', validators=[DataRequired()])
|
|
pwd = PasswordField('Tu clave', validators=[DataRequired()])
|
|
submit = SubmitField('Entrar')
|
|
|
|
def Unique(model, field, message=None):
|
|
def _unique(form, field_data):
|
|
if mongo.db[model.__name__.lower()].find_one({field.name: field_data.data}):
|
|
raise ValidationError(message or f"{field.name} must be unique.")
|
|
return _unique
|
|
|
|
class RegistrationForm(FlaskForm):
|
|
username = StringField('Nombre de usuarix', validators=[DataRequired(), Unique('users', StringField('username', message="Este usuario ya existe"))])
|
|
pwd = PasswordField('Clave', validators=[DataRequired(), Length(min=10), Unique('users', StringField('pwd', message="Esta clave no es muy buena, escoge otra"))])
|
|
referral = StringField('ID de quien te invito', [DataRequired()])
|
|
submit = SubmitField('Registrar')
|
|
|
|
def allowed_file(filename):
|
|
return '.' in filename and filename.rsplit('.', 1)[1].lower() in app.config['ALLOWED_EXTENSIONS']
|
|
|
|
@app.route('/', methods=['GET', 'POST'])
|
|
def index():
|
|
if request.method == 'GET':
|
|
form = PinForm()
|
|
pins = mongo.db.pins.find()
|
|
return render_template('index.html', pins=pins, form=form)
|
|
else:
|
|
form = request.form
|
|
try:
|
|
photo = request.files["photo"]
|
|
except Exception as e:
|
|
print(e)
|
|
if photo and allowed_file(photo.filename):
|
|
#try:
|
|
filename = secure_filename(photo.filename)
|
|
filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
|
|
photo.save(filepath)
|
|
|
|
pin = {
|
|
'time': datetime.now(),
|
|
'photo': filepath,
|
|
'lat': request.form['lat'],
|
|
'lng': request.form['lng'],
|
|
'typeofpin': request.form['typeofpin'],
|
|
'added_by': current_user.id,
|
|
'description': request.form['description'],
|
|
'address': str(geolocator.reverse(str(request.form['lat'])+ ", "+request.form['lng']))
|
|
}
|
|
print(geolocator.reverse(str(request.form['lat'])+ ", "+request.form['lng']))
|
|
mongo.db.pins.insert_one(pin)
|
|
flash('¡Gracias por tu aportación!')
|
|
return redirect(url_for('index'))
|
|
else:
|
|
return allowed_file(photo.filename), 404
|
|
@app.route('/quienes')
|
|
def quienes():
|
|
return render_template('quienes.html')
|
|
|
|
@app.route('/uploads/<name>')
|
|
def download_file(name):
|
|
return send_from_directory(app.config["UPLOAD_FOLDER"], name)
|
|
|
|
@login_manager.user_loader
|
|
def load_user(user_id):
|
|
return User.get(user_id)
|
|
|
|
@app.route('/registrame/<referral_code>', methods=['GET', 'POST'])
|
|
def registrame(referral_code):
|
|
inviter = mongo.db.users.find_one({"referral_code": referral_code})
|
|
if not inviter:
|
|
message = "Ooops, parece que nadie te invitó aquí"
|
|
return render_template('error.html', message = message), 406
|
|
|
|
if request.method == 'POST':
|
|
username = request.form['username']
|
|
pwd = request.form['pwd']
|
|
new_user_data = {
|
|
"username": username,
|
|
"pwd": generate_password_hash(pwd),
|
|
"referral_code": str(uuid4()),
|
|
"invited_by": str(inviter['_id']),
|
|
"is_admin": False
|
|
}
|
|
new_user_id = mongo.db.users.insert_one(new_user_data).inserted_id
|
|
str_id = str(new_user_id)
|
|
mongo.db.users.update_one(
|
|
{'_id': new_user_id},
|
|
{'$set': {'str_id': str_id}}
|
|
)
|
|
invite_link = url_for('registrame', referral_code=new_user_data['referral_code'], _external=True)
|
|
login_user(load_user(new_user_id))
|
|
return redirect(url_for('index'))
|
|
#return render_template('dashboard.html', invite_link=invite_link)
|
|
|
|
if request.method == 'GET':
|
|
return render_template('register.html', form=RegistrationForm())
|
|
|
|
@app.route('/thelogin', methods=['GET', 'POST'])
|
|
def thelogin():
|
|
form2 = LoginForm()
|
|
if request.method == 'POST':
|
|
username = request.form['username']
|
|
pwd = request.form['pwd']
|
|
user_data = mongo.db.users.find_one({"username": username})
|
|
if user_data and check_password_hash(user_data['pwd'], pwd):
|
|
user = User(user_data)
|
|
login_user(user)
|
|
return redirect(url_for('index'))
|
|
else:
|
|
return render_template('login.html', messages = 'Ooops, no hay tal usuario', form=form2)
|
|
return render_template('login.html',form=form2)
|
|
|
|
@app.route('/logout')
|
|
@login_required
|
|
def logout():
|
|
logout_user()
|
|
flash('Cerraste sesión exitosamente')
|
|
return redirect('/')
|
|
|
|
@app.route('/desheredame/<user_id>')
|
|
@login_required
|
|
def desheredame(user_id):
|
|
if not current_user.is_admin:
|
|
return redirect(url_for('index'))
|
|
else:
|
|
user_to_remove = mongo.db.users.find_one({"_id": ObjectId(user_id)})
|
|
if not user_to_remove:
|
|
message = "A este ya me lo desheredaron"
|
|
return render_template('error.html', message=message), 404
|
|
else:
|
|
# invited_by es un string con el user id, pero para eliminar a un usuario por id, ocupamos el ObjectId(string del user id)
|
|
# encontrando todos los usuarios invitados por el traidor para eliminar todos los pines que pusieron
|
|
for user in mongo.db.users.find({'invited_by': user_to_remove.id}):
|
|
mongo.db.pins.delete_many({"added_by": user.id})
|
|
# luego eliminamos a todos los usuarios cuyos pines acabamos de eliminar
|
|
mongo.db.users.delete_many({"invited_by": user_id})
|
|
#eliminamos los pines del usuario infractor
|
|
mongo.db.pins.delete_many({"added_by": user_to_remove.id})
|
|
# finalmente, eliminamos al usuario infractor
|
|
mongo.db.users.delete_one({"_id": ObjectId(user_id)})
|
|
return redirect(url_for('dashboard'))
|
|
|
|
@app.route("/remove_pin/<pin_id>")
|
|
@login_required
|
|
def remove_pin(pin_id):
|
|
actual_pin = mongo.db.pins.find_one({"_id": ObjectId(pin_id)})
|
|
print(actual_pin)
|
|
added_by = actual_pin.get("added_by")
|
|
print(added_by)
|
|
print(current_user.id)
|
|
if current_user.is_admin or current_user.id == added_by:
|
|
mongo.db.pins.delete_one({"_id": ObjectId(pin_id)})
|
|
return redirect(url_for('dashboard'))
|
|
else:
|
|
return redirect(url_for('index'))
|
|
|
|
|
|
|
|
@app.route('/dashboard')
|
|
@login_required
|
|
def dashboard():
|
|
#if request.method == 'GET':
|
|
#if current_user.is_authenticated == False:
|
|
# return redirect(url_for('thelogin'))
|
|
#else:
|
|
old_qr = mongo.db.users.find_one({'_id': ObjectId(current_user.id)})
|
|
print(old_qr)
|
|
invite_code = str(uuid4())
|
|
qr_update = mongo.db.users.update_one({'_id': ObjectId(current_user.id)}, {'$set': {'referral_code': invite_code}})
|
|
print(invite_code)
|
|
if not current_user.is_admin:
|
|
pins = list(mongo.db.pins.find({"added_by": current_user.id}))
|
|
return render_template('dashboard.html', pins=pins, invite_code=invite_code)
|
|
if current_user.is_admin:
|
|
users = list(mongo.db.users.find())
|
|
pins = list(mongo.db.pins.find())
|
|
return render_template('dashboard.html', users=users, pins=pins, invite_code=invite_code)
|
|
|
|
@app.cli.command('add_user')
|
|
def add_user():
|
|
username = input("Enter Username:\n")
|
|
pwd = input("Add pass:\n")
|
|
|
|
admin_user = {
|
|
"username": username,
|
|
"pwd": generate_password_hash(pwd),
|
|
"referral_code": str(uuid4()),
|
|
"invited_by": None,
|
|
"is_admin": True
|
|
}
|
|
mongo.db.users.insert_one(admin_user)
|
|
print(f"Admin {username} added! Referral code is {admin_user['referral_code']}")
|
|
|
|
@app.route('/leaderboard')
|
|
def leaderboard():
|
|
pipeline = [
|
|
{"$group": {"_id": "$added_by", "count": {"$sum": 1}}},
|
|
{"$sort": {"count": -1}},
|
|
{"$limit": 10},
|
|
]
|
|
|
|
# Convert the aggregation result to a list
|
|
leaders_by_id = list(mongo.db.pins.aggregate(pipeline))
|
|
|
|
# Create a list to store the final results
|
|
cleaned_leaders = []
|
|
|
|
# Process each leader
|
|
for leader in leaders_by_id:
|
|
user_id = leader["_id"]
|
|
count = leader["count"]
|
|
|
|
# Find the corresponding user
|
|
try:
|
|
user = mongo.db.users.find_one({"_id": ObjectId(user_id)})
|
|
if user and "username" in user:
|
|
username = user["username"]
|
|
if len(username) >= 2:
|
|
cleaned_username = username[0] + "***" + username[-1]
|
|
else:
|
|
cleaned_username = username
|
|
cleaned_leaders.append({"username": cleaned_username, "count": count})
|
|
except Exception as e:
|
|
print(f"Error processing user_id {user_id}: {e}")
|
|
continue
|
|
|
|
print('THIS IS THE LEADERBOARD', cleaned_leaders)
|
|
# Get stats using aggregation pipeline
|
|
# Count total users
|
|
total_users_result = list(mongo.db.users.aggregate([
|
|
{"$count": "total"}
|
|
]))
|
|
total_users = total_users_result[0]["total"] if total_users_result else 0
|
|
|
|
# Count unique users who have added pins
|
|
active_users_result = list(mongo.db.pins.aggregate([
|
|
{"$group": {"_id": None, "active_users": {"$addToSet": "$added_by"}}},
|
|
{"$project": {"active_count": {"$size": "$active_users"}}}
|
|
]))
|
|
active_users = active_users_result[0]["active_count"] if active_users_result else 0
|
|
|
|
# Calculate percentage (handle case where total_users might be 0)
|
|
active_percentage = round((active_users / total_users) * 100, 1) if total_users > 0 else 0
|
|
return render_template('leaderboard.html', leaders=cleaned_leaders, percentage = active_percentage)
|
|
|
|
|
|
return app
|
|
|
|
|
|
|
|
app=create_app()
|
|
if __name__ == '__main__':
|
|
app.run(debug=True)
|