soter/soter/views/auth/user.py

159 lines
6.0 KiB
Python

import re
import uuid
import pkg_resources
from pyramid.response import Response, FileResponse
from pyramid.security import authenticated_userid
from pyramid.view import view_config
import transaction
from soter import rolefinder
from soter.models import DBSession
from soter.models.auth import User, Role
from soter.models.validation_exception import TryCatchFunction, ValidationError
class UserView(object):
    """Pyramid views for creating, listing, showing and updating users.

    HTML routes serve the static single-page shell; the ``api_user*`` routes
    exchange JSON.  Callers holding the ``Users`` role may manage every
    account; any other authenticated caller may only view their own account
    and change their own password.
    """

    # Canonical textual UUID (hex digits only).  ``\Z`` rather than ``$`` so a
    # trailing newline cannot slip through and make ``uuid.UUID`` raise.
    _UUID_RE = re.compile(
        r'^[A-Fa-f0-9]{8}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{12}\Z')

    def __init__(self, request):
        """Resolve the authenticated user and whether they hold ``Users``.

        ``self.user`` ends up as the ``User`` object for the authenticated
        id, or ``None`` when nobody is authenticated or the id no longer
        resolves to a user.  ``self.has_permission`` is True only when
        ``'Users'`` is among the values returned by ``rolefinder``.
        """
        self.request = request
        self.permissions = None
        self.has_permission = False
        userid = authenticated_userid(request)
        self.user = None if userid is None else User.by_id(userid)
        # Guard against an authenticated id that has no matching user row;
        # dereferencing ``self.user.id`` would otherwise raise AttributeError.
        if self.user is not None:
            self.has_permission = 'Users' in rolefinder(self.user.id, request)

    @classmethod
    def _parse_user_ref(cls, raw):
        """Return ``uuid.UUID(raw)`` if *raw* looks like a UUID, else *raw*."""
        if cls._UUID_RE.match(raw):
            return uuid.UUID(raw)
        return raw

    @staticmethod
    def _find_user(ref):
        """Look a user up by id when *ref* is a UUID, otherwise by name."""
        if isinstance(ref, uuid.UUID):
            return User.by_id(ref)
        return User.by_name(ref)

    def _may_access(self, user):
        """Return True if the caller has ``Users`` or *user* is the caller."""
        if self.has_permission:
            return True
        return self.user is not None and self.user.id == user.id

    @view_config(route_name='user_list', permission='Users')
    @view_config(request_method='GET', route_name='user_id', permission='Authenticated')
    @view_config(request_method='GET', route_name='user', permission='Users')
    def html(self):
        """Serve the static ``base.html`` shell for all HTML user routes."""
        package, resource = 'soter:static/base.html'.split(':', 1)
        path = pkg_resources.resource_filename(package, resource)
        return FileResponse(path, request=self.request)

    @view_config(request_method='POST', route_name='api_user', renderer='json', permission='Users')
    def save(self):
        """Create a user from the JSON request body."""
        return self.save_user()

    @TryCatchFunction
    def save_user(self):
        """Create the user, apply its roles, commit and return its info.

        Reads ``Name``, ``Password``, ``LockedOut`` and ``Roles`` from the
        JSON body.
        """
        body = self.request.json_body
        user = User(body['Name'], body['Password'], body['LockedOut'])
        DBSession.add(user)
        self.add_roles(user, body['Roles'])
        transaction.commit()
        return self.user_info(user.id)

    @view_config(request_method='POST', route_name='api_user_id', renderer='json', permission='Authenticated')
    def update(self):
        """Update the user addressed by the ``id`` route segment.

        The segment is treated as a user id when it is a UUID string and as
        a user name otherwise.
        """
        ref = self._parse_user_ref(self.request.matchdict['id'])
        return self.update_user(self._find_user(ref))

    @TryCatchFunction
    def update_user(self, user):
        """Apply the JSON body to *user*, commit and return its info.

        Callers with the ``Users`` role may change name, lock state, roles
        and password.  Other callers may change only their own password.

        Raises:
            ValidationError: if *user* is None, or if a caller without the
                ``Users`` role targets an account other than their own.
        """
        if user is None:
            raise ValidationError('User name / id not found')
        # Authorise BEFORE touching the row: previously the ownership check
        # only ran in user_info(), after the password had been committed.
        if not self._may_access(user):
            raise ValidationError('User can only update his/her password')
        body = self.request.json_body
        if self.has_permission:
            user.name = body['Name']
            user.locked_out = body['LockedOut']
            self.add_roles(user, body['Roles'])
        password = body['Password']
        # An empty password means "leave unchanged".
        if password != '' and password != user.password:
            user.password = password
        transaction.commit()
        return self.user_info(user.id)

    @view_config(request_method='DELETE', route_name='api_user_id', renderer='json', permission='Users')
    def delete(self):
        """Reject the request: user deletion is not implemented.

        Always answers with status 500; only the message differs depending
        on whether the ``id`` route segment is present.
        """
        user_ref = self.request.matchdict.get('id', None)
        message = "User is Null" if user_ref is None else "User deletion not implemented"
        response = Response(message)
        response.status_int = 500
        return response

    @view_config(request_method='GET', route_name='api_user_id', renderer='json', permission='Authenticated')
    def show_id(self):
        """Return info for the user addressed by the ``id`` route segment."""
        return self.user_info(self._parse_user_ref(self.request.matchdict['id']))

    @view_config(request_method='GET', route_name='api_user', renderer='json', permission='Users')
    def show_blank(self):
        """Return an empty account template listing every role as disabled."""
        return self.user_info(None)

    @view_config(request_method='GET', route_name='api_user', renderer='json', request_param='list', permission='Users')
    def show_list(self):
        """Return every user with lock state, role names and detail URL."""
        return [
            {
                'Name': item.name,
                'LockedOut': item.locked_out,
                'Roles': [role.name for role in item.roles],
                'Url': self.request.route_url('user_id', id=item.name),
            }
            for item in User.list()
        ]

    @view_config(request_method='GET', route_name='api_user', renderer='json', request_param='names',
                 permission='Authenticated')
    def show_name(self):
        """Return the names of all users that are not locked out, by name."""
        # ``== False`` is intentional: it builds a SQL expression on the column.
        active = User.query().filter(User.locked_out == False).order_by(User.name).all()  # noqa: E712
        return [{'Name': item.name} for item in active]

    def user_info(self, id):
        """Build the JSON-serialisable description of a user.

        Args:
            id: ``None`` for a blank template, a ``uuid.UUID`` to look the
                user up by id, or any other value to look it up by name.

        Returns:
            A dict describing the account, or a ``Response`` with status 404
            when no such user exists, or status 500 when the caller lacks the
            ``Users`` role and asks about somebody else's account.
        """
        if id is None:
            return {
                'Name': '',
                'LockedOut': False,
                'Roles': [{'RoleID': item.id, 'Name': item.name, 'Enabled': False}
                          for item in Role.list()],
            }

        user = self._find_user(id)
        if user is None:
            # Previously this fell through to ``user.id`` and crashed.
            response = Response("User name / id not found")
            response.status_int = 404
            return response

        if not self._may_access(user):
            response = Response("User can only update his/her password")
            response.status_int = 500
            return response

        account = {'UserID': user.id, 'Name': user.name, 'Password': '',
                   'LockedOut': user.locked_out, 'Roles': []}
        # Only callers with the ``Users`` role get the role matrix.
        if self.has_permission:
            account['Roles'] = [
                {'RoleID': item.id, 'Name': item.name, 'Enabled': item in user.roles}
                for item in Role.list()
            ]
        return account

    def add_roles(self, user, roles):
        """Synchronise ``user.roles`` with the requested role switches.

        Args:
            user: the user whose ``roles`` collection is modified in place.
            roles: iterable of dicts with a ``RoleID`` (UUID string) and an
                ``Enabled`` flag.

        Raises:
            ValidationError: if an enabled ``RoleID`` does not resolve to a
                role.
        """
        for role in roles:
            role_id = uuid.UUID(role['RoleID'])
            current = next((held for held in user.roles if held.id == role_id), None)
            if role['Enabled'] and current is None:
                new_role = Role.by_id(role_id)
                # NOTE(review): assumes Role.by_id returns None for an unknown
                # id (as User.by_id appears to) - confirm against the model.
                if new_role is None:
                    raise ValidationError('Role not found')
                user.roles.append(new_role)
            elif not role['Enabled'] and current is not None:
                user.roles.remove(current)