From 0e168b914b8e06c36a50b8a513bacf3a53482e99 Mon Sep 17 00:00:00 2001 From: Steve Herrell Date: Mon, 23 Mar 2020 14:57:46 -0400 Subject: [PATCH] tighten up pemissions --- google-app-engine/app.yaml.in | 2 +- google-app-engine/main.py | 193 +++++++++++++++++++--------------- 2 files changed, 107 insertions(+), 88 deletions(-) diff --git a/google-app-engine/app.yaml.in b/google-app-engine/app.yaml.in index 80f3250..468c39d 100644 --- a/google-app-engine/app.yaml.in +++ b/google-app-engine/app.yaml.in @@ -1,7 +1,7 @@ runtime: python37 env_variables: - AUTH_KEY: 'XXXXXX' + AUTH_TOKEN: 'XXXXXX' SMTP_SERVER: 'xxx.xx:2020' SMTP_USERNAME: 'testing' SMTP_PASSWORD: '123' diff --git a/google-app-engine/main.py b/google-app-engine/main.py index 8c1f1ae..fb304e3 100644 --- a/google-app-engine/main.py +++ b/google-app-engine/main.py @@ -14,95 +14,101 @@ def fixup_email(email): - email = email.replace('@','.') - email = email.replace('+','.') + if email is not None: + email = email.replace('@','.') + email = email.replace('+','.') return email def check_admin_token(token): - - # is token sane - if not token: + if token is None: return False - # get token from env - saved_token = os.environ['AUTH_TOKEN'] - if not saved_token: + # get token from env and check for match + saved_token = os.environ.get('AUTH_TOKEN',None) + if saved_token is None: return False - - # check for match return token == saved_token def get_user_token(fmail): - - # get existing token - query = datastore_client.query(kind='tokens') - query.add_filter('fmail','=',fmail) - tokens = list(query.fetch()) - if tokens: - return tokens[0]['token'] - else: - return None + if fmail is not None: + query = datastore_client.query(kind='tokens') + query.add_filter('fmail','=',fmail) + tokens = list(query.fetch()) + if tokens: + return tokens[0]['token'] + return None def create_user_token(fmail): - - # create a new one! - token = secrets.token_hex(24) - entity = datastore.Entity(key=datastore_client.key('tokens')) - entity.update({ - 'fmail': fmail, - 'token': token, - 'timestamp': int(time.time()) - }) - datastore_client.put(entity) - return token + if fmail is not None: + token = secrets.token_hex(24) + entity = datastore.Entity(key=datastore_client.key('tokens')) + entity.update({ + 'fmail': fmail, + 'token': token, + 'timestamp': int(time.time()) + }) + datastore_client.put(entity) + return token + return None def check_user_token(fmail,token): - if token is None: + if fmail is None or token is None: return False return token == get_user_token(fmail) -def get_user_code(fmail): +def is_valid_user(fmail): + if fmail is None: + return False + return get_user_token(fmail) is not None + - query = datastore_client.query(kind='codes') - query.add_filter('fmail','=',fmail) - codes = list(query.fetch()) - if codes: - #if codes[0]['timestamp'] > int(time.time() - 300 ): - return codes[0] +def get_user_code(fmail): + if fmail is not None: + query = datastore_client.query(kind='codes') + query.add_filter('fmail','=',fmail) + codes = list(query.fetch()) + if codes: + return codes[0] return None + def clear_user_code(fmail): - query = datastore_client.query(kind='codes') - query.add_filter('fmail','=',fmail) - codes = query.fetch() - for old_code in codes: - datastore_client.delete(datastore_client.key('codes',old_code.id)) + if fmail is not None: + query = datastore_client.query(kind='codes') + query.add_filter('fmail','=',fmail) + codes = query.fetch() + for old_code in codes: + datastore_client.delete(datastore_client.key('codes',old_code.id)) def set_user_code(fmail,code): + if fmail is not None and code is not None: + # wipe out old first + clear_user_code(fmail) - # wipe out old first - clear_user_code(fmail) + # add in new + entity = datastore.Entity(key=datastore_client.key('codes')) + entity.update({ + 'fmail': fmail, + 'code': code, + 'timestamp': int(time.time()) + }) + datastore_client.put(entity) - # add in new - entity = datastore.Entity(key=datastore_client.key('codes')) - entity.update({ - 'fmail': fmail, - 'code': code, - 'timestamp': int(time.time()) - }) - datastore_client.put(entity) + +def has_permission(fmail,token): + return check_admin_token(token) or check_user_token(fmail,token) def parse_mail(mail): # Search for bits we are interested in - email= None + email = None code = None for line in mail: line = line.decode().rstrip() @@ -129,13 +135,15 @@ def register(): def register_done(): email = request.args.get('email',None) + if email is None: + return jsonify({ 'success': False, 'error': 'no email supplied' }) + fmail = fixup_email(email) - token = get_user_token(fmail) - if token is None: + if not is_valid_user(fmail): token = create_user_token(fmail) return jsonify({ 'success': True, 'email': email, - 'fwd-to': "pyaarlo+{}@thewardrobe.ca".format(fmail), + 'fwd-to': "pyaarlo@thewardrobe.ca", 'token': token }) else: return jsonify({ 'success': False, 'error': 'email already registered' }) @@ -144,24 +152,24 @@ def register_done(): @app.route('/get') def get(): - # validate args + # get args email = request.args.get('email',None) + fmail = fixup_email(email) token = request.args.get('token',None) - if not email or not token: - return jsonify({ 'meta': { 'code': 400 }, - 'data': { 'success': False, 'error': 'please provide email and token', 'code': None }}) # validate email/token - fmail = fixup_email(email) - if not check_user_token(fmail,token): + if not has_permission(fmail,token): return jsonify({ 'meta': { 'code': 400 }, - 'data': { 'success': False, 'error': 'incorrect email or token', 'code': None }}) + 'data': { 'success': False, 'error': 'permission denied', 'code': None }}) + if not is_valid_user(fmail): + return jsonify({ 'meta': { 'code': 400 }, + 'data': { 'success': False, 'error': 'no valid email found', 'code': None }}) # should be 0 or 1 entries code = get_user_code(fmail) if code is None: return jsonify({ 'meta': { 'code': 400 }, - 'data': { 'success': False, 'error': 'no valid code for email address', 'code': None }}) + 'data': { 'success': False, 'error': 'no valid code found', 'code': None }}) return jsonify({ 'meta': { 'code': 200 }, 'data': { 'success': True, 'email': email, 'code': code['code'], 'timestamp': code['timestamp'] }}) @@ -170,15 +178,20 @@ def get(): @app.route('/clear') def clear(): - # validate args + # get args email = request.args.get('email',None) + fmail = fixup_email(email) token = request.args.get('token',None) - if not email or not token: - return jsonify({ 'meta': { 'code': 400 }, - 'data': { 'success': False, 'error': 'please provide email and token', 'code': None }}) # validate email/token - fmail = fixup_email(email) + if not has_permission(fmail,token): + return jsonify({ 'meta': { 'code': 400 }, + 'data': { 'success': False, 'error': 'permission denied' }}) + if not is_valid_user(fmail): + return jsonify({ 'meta': { 'code': 400 }, + 'data': { 'success': False, 'error': 'no valid email found', 'code': None }}) + + # clear code clear_user_code(fmail) return jsonify({ 'meta': { 'code': 200 }, 'data': { 'success': True, 'email': email }}) @@ -187,22 +200,24 @@ def clear(): @app.route('/add') def add(): - # check token to start - if not check_admin_token(request.args.get('token',None)): - return jsonify({ 'meta': { 'code': 400 }, - 'data': { 'success': False, 'error': 'invalid admin token' }}) - - # get email/code info + # get args email = request.args.get('email',None) - code = request.args.get('code',None) - if not email or not code: + fmail = fixup_email(email) + token = request.args.get('token',None) + + # validate email/token + if not has_permission(fmail,token): + return jsonify({ 'meta': { 'code': 400 }, + 'data': { 'success': False, 'error': 'permission denied' }}) + if not is_valid_user(fmail): return jsonify({ 'meta': { 'code': 400 }, - 'data': { 'success': False, 'error': 'please provide email and token', 'code': None }}) + 'data': { 'success': False, 'error': 'no valid email found', 'code': None }}) - fmail = fixup_email(email) - if get_user_token(fmail) is None: + # set email/code info + code = request.args.get('code',None) + if code is None: return jsonify({ 'meta': { 'code': 400 }, - 'data': { 'success': False, 'error': 'unknown email', 'code': None }}) + 'data': { 'success': False, 'error': 'please provide code', 'code': None }}) set_user_code(fmail,code) return jsonify({ 'meta': { 'code': 200 }, @@ -213,9 +228,10 @@ def add(): def mail(): # check token to start - if not check_admin_token(request.args.get('token',None)): + token = request.args.get('token',None) + if token is None: return jsonify({ 'meta': { 'code': 400 }, - 'data': { 'success': False, 'error': 'invalid admin token' }}) + 'data': { 'success': False, 'error': 'no token supplied' }}) # check file is there if 'file' not in request.files: @@ -228,13 +244,16 @@ def mail(): # Search for bits we are interested in email, code = parse_mail(mail) + fmail = fixup_email(email) if email is None or code is None: return jsonify({ 'meta': { 'code': 400 }, 'data': { 'success': False, 'error': 'unable to parse email' }}) - # is valid user? - fmail = fixup_email(email) - if get_user_token(fmail) is None: + # permission? can be admin or user level + if not has_permission(fmail,token): + return jsonify({ 'meta': { 'code': 400 }, + 'data': { 'success': False, 'error': 'permission denied' }}) + if not is_valid_user(fmail): return jsonify({ 'meta': { 'code': 400 }, 'data': { 'success': False, 'error': 'unknown email', 'code': None }})