150 lines
4.2 KiB
Python
150 lines
4.2 KiB
Python
from hashlib import sha256
|
|
import uuid
|
|
from sqlalchemy import Column, Unicode, Boolean, Table, ForeignKey, UniqueConstraint
|
|
from sqlalchemy.orm import relationship, synonym
|
|
from soter.models import Base, DBSession
|
|
from soter.models.guidtype import GUID
|
|
|
|
|
|
def encrypt(val):
|
|
return sha256(val.encode('utf-8') + "Salt".encode('utf-8')).hexdigest()
|
|
|
|
|
|
role_user = Table(
|
|
'role_users', Base.metadata,
|
|
Column('id', GUID(), primary_key=True, default=uuid.uuid4),
|
|
Column('user_id', GUID(), ForeignKey('users.id')),
|
|
Column('role_id', GUID(), ForeignKey('roles.id')),
|
|
UniqueConstraint('user_id', 'role_id')
|
|
)
|
|
|
|
role_permission = Table(
|
|
'role_permissions', Base.metadata,
|
|
Column('id', GUID(), primary_key=True, default=uuid.uuid4),
|
|
Column('permission_id', GUID(), ForeignKey('permissions.id')),
|
|
Column('role_id', GUID(), ForeignKey('roles.id')),
|
|
UniqueConstraint('permission_id', 'role_id')
|
|
)
|
|
|
|
|
|
class User(Base):
|
|
__tablename__ = 'users'
|
|
|
|
id = Column('id', GUID(), primary_key=True, default=uuid.uuid4)
|
|
name = Column('name', Unicode(255), unique=True)
|
|
display_name = Column('display_name', Unicode(255))
|
|
email = Column('email', Unicode(255), unique=True)
|
|
_password = Column('password', Unicode(64))
|
|
locked_out = Column('locked_out', Boolean)
|
|
|
|
roles = relationship("Role", secondary=role_user, backref='users')
|
|
|
|
def _get_password(self):
|
|
return self._password
|
|
|
|
def _set_password(self, password):
|
|
self._password = encrypt(password)
|
|
|
|
password = property(_get_password, _set_password)
|
|
password = synonym('_password', descriptor=password)
|
|
|
|
|
|
@property
|
|
def __name__(self):
|
|
return self.name
|
|
|
|
def __init__(self, name=None, display_name=None, email=None, password=None, locked_out=None, id=None):
|
|
self.name = name
|
|
self.display_name = display_name
|
|
self.email = email
|
|
self.password = password
|
|
self.locked_out = locked_out
|
|
self.id = id
|
|
|
|
@classmethod
|
|
def by_name(cls, name):
|
|
if not name:
|
|
return None
|
|
return DBSession.query(cls).filter(cls.name.ilike(name)).first()
|
|
|
|
@classmethod
|
|
def by_email(cls, email):
|
|
if not email:
|
|
return None
|
|
return DBSession.query(cls).filter(cls.email.ilike(email)).first()
|
|
|
|
@classmethod
|
|
def by_id(cls, id):
|
|
if not isinstance(id, uuid.UUID):
|
|
id = uuid.UUID(id)
|
|
return DBSession.query(cls).filter(cls.id == id).one()
|
|
|
|
@classmethod
|
|
def auth(cls, name, password):
|
|
if password is None:
|
|
return False, None
|
|
user = cls.by_name(name)
|
|
if not user:
|
|
return False, None
|
|
if user.password != encrypt(password) or user.locked_out:
|
|
return False, None
|
|
else:
|
|
return True, user
|
|
|
|
@classmethod
|
|
def list(cls):
|
|
return DBSession.query(cls).order_by(cls.name).all()
|
|
|
|
@classmethod
|
|
def query(cls):
|
|
return DBSession.query(cls)
|
|
|
|
@classmethod
|
|
def filtered_list(cls, name):
|
|
query = DBSession.query(cls)
|
|
for item in name.split():
|
|
query = query.filter(cls.name.ilike('%' + item + '%'))
|
|
return query.order_by(cls.name)
|
|
|
|
|
|
class Role(Base):
|
|
__tablename__ = 'roles'
|
|
|
|
id = Column('id', GUID(), primary_key=True, default=uuid.uuid4)
|
|
name = Column('name', Unicode(255), unique=True)
|
|
|
|
|
|
def __init__(self, name=None, id=None):
|
|
self.name = name
|
|
self.id = id
|
|
|
|
@classmethod
|
|
def by_id(cls, id):
|
|
return DBSession.query(cls).filter(cls.id == id).one()
|
|
|
|
@classmethod
|
|
def list(cls):
|
|
return DBSession.query(cls).order_by(cls.name).all()
|
|
|
|
|
|
class Permission(Base):
|
|
__tablename__ = 'permissions'
|
|
|
|
id = Column('id', GUID(), primary_key=True, default=uuid.uuid4)
|
|
name = Column('name', Unicode(255), unique=True)
|
|
|
|
roles = relationship("Role", secondary=role_permission, backref="permissions")
|
|
|
|
def __init__(self, name=None, id=None):
|
|
self.name = name
|
|
self.id = id
|
|
|
|
@classmethod
|
|
def list(cls):
|
|
return DBSession.query(cls).order_by(cls.name).all()
|
|
|
|
@classmethod
|
|
def by_id(cls, id):
|
|
return DBSession.query(cls).filter(cls.id == id).one()
|
|
|