271 lines
8.7 KiB
Python
271 lines
8.7 KiB
Python
from flask import Blueprint, make_response, session, request, redirect, url_for
|
|
from flask import current_app as app
|
|
from voluptuous import Schema, Length, Required
|
|
|
|
from models import db, LoginTokens, Users
|
|
from decorators import api_wrapper, WebException
|
|
from schemas import verify_to_schema, check
|
|
|
|
import datetime
|
|
import logger
|
|
import re
|
|
import requests
|
|
import team
|
|
import utils
|
|
|
|
###############
|
|
# USER ROUTES #
|
|
###############
|
|
|
|
blueprint = Blueprint("user", __name__)
|
|
|
|
@blueprint.route("/forgot", methods=["POST"])
|
|
@blueprint.route("/forgot/<token>", methods=["GET", "POST"])
|
|
@api_wrapper
|
|
def user_forgot_password(token=None):
|
|
params = utils.flat_multi(request.form)
|
|
if token is not None:
|
|
user = get_user(reset_token=token).first()
|
|
if user is None:
|
|
raise WebException("Invalid reset token.")
|
|
|
|
# We are viewing the actual reset form
|
|
if request.method == "GET":
|
|
return { "success": 1, "message": ""}
|
|
|
|
# Submission of actual reset form
|
|
if request.method == "POST":
|
|
password = params.get("password")
|
|
confirm_password = params.get("confirm_password")
|
|
if password != confirm_password:
|
|
raise WebException("Passwords do not match.")
|
|
else:
|
|
user.password = utils.hash_password(password)
|
|
user.reset_token = None
|
|
current_session = db.session.object_session(user)
|
|
current_session.add(user)
|
|
current_session.commit()
|
|
return { "success": 1, "message": "Success!" }
|
|
else:
|
|
email = params.get("email").lower()
|
|
|
|
user = get_user(email=email).first()
|
|
if user is None:
|
|
raise WebException("User with that email does not exist.")
|
|
|
|
token = utils.generate_string(length=64)
|
|
user.reset_token = token
|
|
current_session = db.session.object_session(user)
|
|
current_session.add(user)
|
|
current_session.commit()
|
|
|
|
reset_link = "%s/forgot/%s" % ("127.0.0.1:8000", token)
|
|
subject = "EasyCTF password reset"
|
|
body = """%s,\n\nA request to reset your EasyCTF password has been made. If you did not request this password reset, you may safely ignore this email and delete it.\n\nYou may reset your password by clicking this link or pasting it to your browser.\n\n%s\n\nThis link can only be used once, and will lead you to a page where you can reset your password.\n\nGood luck!\n\n- The EasyCTF Team""" % (user.username, reset_link)
|
|
response = utils.send_email(email, subject, body).json()
|
|
if "Queued" in response["message"]:
|
|
return { "success": 1, "message": "Email sent to %s" % email }
|
|
else:
|
|
raise WebException(response["message"])
|
|
|
|
@blueprint.route("/register", methods=["POST"])
|
|
@api_wrapper
|
|
def user_register():
|
|
params = utils.flat_multi(request.form)
|
|
|
|
name = params.get("name")
|
|
email = params.get("email")
|
|
username = params.get("username")
|
|
password = params.get("password")
|
|
password_confirm = params.get("password_confirm")
|
|
utype = int(params.get("type"))
|
|
|
|
if password != password_confirm:
|
|
raise WebException("Passwords do not match.")
|
|
verify_to_schema(UserSchema, params)
|
|
|
|
user = Users(name, username, email, password, utype)
|
|
with app.app_context():
|
|
db.session.add(user)
|
|
db.session.commit()
|
|
|
|
logger.log(__name__, logger.INFO, "%s registered with %s" % (name.encode("utf-8"), email.encode("utf-8")))
|
|
login_user(username, password)
|
|
|
|
return { "success": 1, "message": "Success!" }
|
|
|
|
@blueprint.route("/logout", methods=["GET"])
|
|
@api_wrapper
|
|
def user_logout():
|
|
sid = session["sid"]
|
|
username = session["username"]
|
|
with app.app_context():
|
|
expired = LoginTokens.query.filter_by(username=username).all()
|
|
for expired_token in expired: db.session.delete(expired_token)
|
|
db.session.commit()
|
|
session.clear()
|
|
|
|
@blueprint.route("/login", methods=["POST"])
|
|
@api_wrapper
|
|
def user_login():
|
|
params = utils.flat_multi(request.form)
|
|
|
|
username = params.get("username")
|
|
password = params.get("password")
|
|
|
|
result = login_user(username, password)
|
|
if result != True:
|
|
raise WebException("Please check if your username/password are correct.")
|
|
|
|
return { "success": 1, "message": "Success!" }
|
|
|
|
@blueprint.route("/status", methods=["GET"])
|
|
@api_wrapper
|
|
def user_status():
|
|
logged_in = is_logged_in()
|
|
result = {
|
|
"success": 1,
|
|
"logged_in": logged_in,
|
|
"admin": is_admin(),
|
|
"username": session["username"] if logged_in else "",
|
|
}
|
|
if logged_in:
|
|
result["has_team"] = in_team(get_user().first())
|
|
|
|
return result
|
|
|
|
@blueprint.route("/info", methods=["GET"])
|
|
@api_wrapper
|
|
def user_info():
|
|
logged_in = is_logged_in()
|
|
username = utils.flat_multi(request.args).get("username")
|
|
if username is None:
|
|
if logged_in:
|
|
username = session["username"]
|
|
if username is None:
|
|
raise WebException("No user specified.")
|
|
me = False if not("username" in session) else username.lower() == session["username"].lower()
|
|
user = get_user(username_lower=username.lower()).first()
|
|
if user is None:
|
|
raise WebException("User not found.")
|
|
|
|
show_email = me if logged_in else False
|
|
user_in_team = in_team(user)
|
|
userdata = {
|
|
"user_found": True,
|
|
"name": user.name,
|
|
"username": user.username,
|
|
"type": ["Student", "Instructor", "Observer"][user.utype - 1],
|
|
"admin": user.admin,
|
|
"registertime": datetime.datetime.fromtimestamp(user.registertime).isoformat() + "Z",
|
|
"me": me,
|
|
"show_email": show_email,
|
|
"in_team": user_in_team
|
|
}
|
|
if show_email:
|
|
userdata["email"] = user.email
|
|
if user_in_team:
|
|
userdata["team"] = team.get_team_info(tid=user.tid)
|
|
return { "success": 1, "user": userdata }
|
|
|
|
##################
|
|
# USER FUNCTIONS #
|
|
##################
|
|
|
|
__check_username = lambda username: get_user(username_lower=username.lower()).first() is None
|
|
__check_email = lambda email: get_user(email=email.lower()).first() is None
|
|
|
|
UserSchema = Schema({
|
|
Required("email"): check(
|
|
([str, Length(min=4, max=128)], "Your email should be between 4 and 128 characters long."),
|
|
([__check_email], "Someone already registered this email."),
|
|
([utils.__check_email_format], "Please enter a legit email.")
|
|
),
|
|
Required("name"): check(
|
|
([str, Length(min=4, max=128)], "Your name should be between 4 and 128 characters long.")
|
|
),
|
|
Required("username"): check(
|
|
([str, Length(min=4, max=32)], "Your username should be between 4 and 32 characters long."),
|
|
([utils.__check_alphanumeric], "Please only use alphanumeric characters in your username."),
|
|
([__check_username], "This username is taken, did you forget your password?")
|
|
),
|
|
Required("password"): check(
|
|
([str, Length(min=4, max=64)], "Your password should be between 4 and 64 characters long."),
|
|
([utils.__check_ascii], "Please only use ASCII characters in your password."),
|
|
),
|
|
Required("type"): check(
|
|
([str, lambda x: x.isdigit()], "Please use the online form.")
|
|
),
|
|
"notify": str
|
|
}, extra=True)
|
|
|
|
def get_user(username=None, username_lower=None, email=None, uid=None, reset_token=None):
|
|
match = {}
|
|
if username != None:
|
|
match.update({ "username": username })
|
|
elif username_lower != None:
|
|
match.update({ "username_lower": username_lower })
|
|
elif uid != None:
|
|
match.update({ "uid": uid })
|
|
elif email != None:
|
|
match.update({ "email": email })
|
|
elif is_logged_in():
|
|
match.update({ "username": session["username"] })
|
|
elif reset_token != None:
|
|
match.update({ "reset_token": reset_token })
|
|
with app.app_context():
|
|
result = Users.query.filter_by(**match)
|
|
return result
|
|
|
|
def login_user(username, password):
|
|
user = get_user(username_lower=username.lower()).first()
|
|
if user is None: return False
|
|
correct = utils.check_password(user.password, password)
|
|
if not correct: return False
|
|
|
|
useragent = request.headers.get("User-Agent")
|
|
ip = request.remote_addr
|
|
|
|
with app.app_context():
|
|
expired = LoginTokens.query.filter_by(username=username).all()
|
|
for expired_token in expired: db.session.delete(expired_token)
|
|
|
|
token = LoginTokens(user.uid, user.username, ua=useragent, ip=ip)
|
|
db.session.add(token)
|
|
db.session.commit()
|
|
|
|
session["sid"] = token.sid
|
|
session["username"] = token.username
|
|
session["admin"] = user.admin == True
|
|
if user.tid is not None and user.tid >= 0:
|
|
session["tid"] = user.tid
|
|
|
|
return True
|
|
|
|
def in_team(user):
|
|
return user.tid is not None and user.tid >= 0
|
|
|
|
def is_logged_in():
|
|
if not("sid" in session and "username" in session): return False
|
|
sid = session["sid"]
|
|
username = session["username"]
|
|
token = LoginTokens.query.filter_by(sid=sid).first()
|
|
if token is None: return False
|
|
|
|
useragent = request.headers.get("User-Agent")
|
|
ip = request.remote_addr
|
|
|
|
if token.username != username: return False
|
|
if token.ua != useragent: return False
|
|
return True
|
|
|
|
def is_admin():
|
|
return is_logged_in() and "admin" in session and session["admin"]
|
|
|
|
def validate_captcha(form):
|
|
if "captcha_response" not in form:
|
|
return False
|
|
captcha_response = form["captcha_response"]
|
|
data = {"secret": "6Lc4xhMTAAAAACFaG2NyuKoMdZQtSa_1LI76BCEu", "response": captcha_response}
|
|
response = requests.post("https://www.google.com/recaptcha/api/siteverify", data=data)
|
|
return response.json()["success"]
|