Files
brewman/brewman/views/auth/user.py
2016-10-14 15:51:06 +05:30

156 lines
6.0 KiB
Python

import re
import uuid
import pkg_resources
from pyramid.response import Response, FileResponse
from pyramid.view import view_config
import transaction
from brewman import groupfinder
from brewman.models import DBSession
from brewman.models.auth import User, Group
from brewman.models.validation_exception import TryCatchFunction, ValidationError
class UserView(object):
    """Pyramid views for listing, showing, creating and updating users.

    A caller whose groups (as reported by ``groupfinder``) include ``'Users'``
    may manage every account.  Any other authenticated caller may only read
    a reduced view of, and change the password of, their own account.
    """

    # Textual UUID made of hex digits only, so that every match is guaranteed
    # to be accepted by ``uuid.UUID``.  Compiled once and shared by all views.
    _UUID_RE = re.compile('^[A-Fa-f0-9]{8}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{12}$')

    def __init__(self, request):
        """Bind the request and resolve the calling user and their rights."""
        self.request = request
        self.user = request.authenticated_userid
        self.permissions = None
        self.has_permission = False
        if self.user is not None:
            self.user = User.by_id(self.user)
            # The id may no longer resolve to a user; in that case the caller
            # simply keeps has_permission == False instead of crashing here.
            if self.user is not None:
                self.has_permission = 'Users' in groupfinder(self.user.id, request)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @classmethod
    def _parse_id(cls, value):
        """Return ``uuid.UUID(value)`` if value looks like a UUID, else value unchanged."""
        if cls._UUID_RE.match(value):
            return uuid.UUID(value)
        return value

    @staticmethod
    def _find_user(key):
        """Look a user up by id when key is a ``uuid.UUID``, otherwise by name."""
        if isinstance(key, uuid.UUID):
            return User.by_id(key)
        return User.by_name(key)

    @staticmethod
    def _error(message, status=500):
        """Build a plain-text error ``Response`` carrying the given status code."""
        response = Response(message)
        response.status_int = status
        return response

    def _is_self(self, user):
        """Return True when ``user`` is the account of the calling user."""
        return self.user is not None and self.user.id == user.id

    # ------------------------------------------------------------------
    # HTML shell
    # ------------------------------------------------------------------
    @view_config(route_name='user_list', permission='Users')
    @view_config(request_method='GET', route_name='user_id', permission='Authenticated')
    @view_config(request_method='GET', route_name='user', permission='Users')
    def html(self):
        """Serve the static ``base.html`` page for all user-related HTML routes."""
        package, resource = 'brewman:static/base.html'.split(':', 1)
        path = pkg_resources.resource_filename(package, resource)
        return FileResponse(path, request=self.request)

    # ------------------------------------------------------------------
    # Create
    # ------------------------------------------------------------------
    @view_config(request_method='POST', route_name='api_user', renderer='json', permission='Users')
    def save(self):
        """Create a user from the JSON request body and return its details."""
        return self.save_user()

    @TryCatchFunction
    def save_user(self):
        """Insert a new user with the posted name, password, lock state and groups.

        NOTE(review): error handling is delegated to ``TryCatchFunction``,
        which is defined outside this file.
        """
        # json_body is decoded on access, so read it once.
        body = self.request.json_body
        user = User(body['Name'], body['Password'], body['LockedOut'])
        DBSession.add(user)
        self.add_groups(user, body['Groups'])
        transaction.commit()
        return self.user_info(user.id)

    # ------------------------------------------------------------------
    # Update
    # ------------------------------------------------------------------
    @view_config(request_method='POST', route_name='api_user_id', renderer='json', permission='Authenticated')
    def update(self):
        """Update the user addressed by the ``id`` route segment (UUID or name)."""
        key = self._parse_id(self.request.matchdict['id'])
        return self.update_user(self._find_user(key))

    @TryCatchFunction
    def update_user(self, user):
        """Apply the posted changes to ``user`` and return its details.

        Callers with the 'Users' right may change name, lock state, groups and
        password.  Other callers may change only their own password.

        Raises:
            ValidationError: if ``user`` is None.
        """
        if user is None:
            raise ValidationError('User name / id not found')
        # This view only requires 'Authenticated', so the ownership check has
        # to happen BEFORE anything is modified; otherwise any logged-in user
        # could overwrite somebody else's password.
        if not (self.has_permission or self._is_self(user)):
            return self._error("User can only update his/her password")

        body = self.request.json_body
        if self.has_permission:
            user.name = body['Name']
            user.locked_out = body['LockedOut']
            self.add_groups(user, body['Groups'])
        password = body['Password']
        # An empty password means "leave unchanged".
        if password != '' and password != user.password:
            user.password = password
        transaction.commit()
        return self.user_info(user.id)

    # ------------------------------------------------------------------
    # Delete
    # ------------------------------------------------------------------
    @view_config(request_method='DELETE', route_name='api_user_id', renderer='json', permission='Users')
    def delete(self):
        """Reject the request: user deletion is not implemented."""
        user_id = self.request.matchdict.get('id', None)
        if user_id is None:
            return self._error("User is Null")
        return self._error("User deletion not implemented")

    # ------------------------------------------------------------------
    # Read
    # ------------------------------------------------------------------
    @view_config(request_method='GET', route_name='api_user_id', renderer='json', permission='Authenticated')
    def show_id(self):
        """Return the details of the user addressed by the ``id`` route segment."""
        return self.user_info(self._parse_id(self.request.matchdict['id']))

    @view_config(request_method='GET', route_name='api_user', renderer='json', permission='Users')
    def show_blank(self):
        """Return an empty user template listing every group as disabled."""
        return self.user_info(None)

    @view_config(request_method='GET', route_name='api_user', renderer='json', request_param='list', permission='Users')
    def show_list(self):
        """Return every user with lock state, group names and its page URL."""
        return [
            {'Name': item.name, 'LockedOut': item.locked_out,
             'Groups': [group.name for group in item.groups],
             'Url': self.request.route_url('user_id', id=item.name)}
            for item in User.list()
        ]

    @view_config(request_method='GET', route_name='api_user', renderer='json', request_param='names',
                 permission='Authenticated')
    def show_name(self):
        """Return the names of all users that are not locked out, sorted by name."""
        # "== False" is intentional: it builds a SQL expression, not a Python bool.
        active = User.query().filter(User.locked_out == False).order_by(User.name).all()  # noqa: E712
        return [{'Name': item.name} for item in active]

    def user_info(self, id):
        """Return the JSON-serialisable description of a user.

        Args:
            id: ``None`` for a blank template, a ``uuid.UUID`` to look up by
                id, or any other value to look up by name.  (The name shadows
                the builtin but is kept for interface compatibility.)

        Returns:
            A dict describing the user, or an error ``Response`` when the
            user does not exist or the caller may not see it.
        """
        if id is None:
            return {
                'Name': '', 'LockedOut': False,
                'Groups': [{'GroupID': group.id, 'Name': group.name, 'Enabled': False}
                           for group in DBSession.query(Group).order_by(Group.name).all()],
            }

        user = self._find_user(id)
        if user is None:
            # Previously this fell through to an AttributeError on user.id.
            return self._error('User name / id not found')

        if not (self.has_permission or self._is_self(user)):
            return self._error("User can only update his/her password")

        account = {'UserID': user.id, 'Name': user.name, 'Password': '', 'LockedOut': user.locked_out, 'Groups': []}
        if self.has_permission:
            # Only user administrators get the full group matrix.
            account['Groups'] = [
                {'GroupID': group.id, 'Name': group.name, 'Enabled': group in user.groups}
                for group in DBSession.query(Group).order_by(Group.name).all()
            ]
        return account

    def add_groups(self, user, groups):
        """Synchronise ``user.groups`` with the posted group flags.

        Args:
            user: the user whose ``groups`` collection is modified in place.
            groups: iterable of dicts with a ``'GroupID'`` (UUID string) and
                an ``'Enabled'`` flag.
        """
        for group in groups:
            group_id = uuid.UUID(group['GroupID'])
            current = next((g for g in user.groups if g.id == group_id), None)
            if group['Enabled'] and current is None:
                user.groups.append(Group.by_id(group_id))
            elif not group['Enabled'] and current is not None:
                user.groups.remove(current)