brewman/brewman/brewman/routers/fingerprint.py

143 lines
4.8 KiB
Python

import csv
import io
import uuid
from datetime import date, datetime, time, timedelta
from io import StringIO
import brewman.schemas.fingerprint as schemas
from fastapi import APIRouter, Depends, File, HTTPException, UploadFile, status
from sqlalchemy import select
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session
from ..core.security import get_current_active_user as get_user
from ..db.session import SessionFuture
from ..models.employee import Employee
from ..models.fingerprint import Fingerprint
from ..schemas.user import UserToken
# Router instance that the endpoint decorators in this module register on.
router = APIRouter()
@router.post("", response_model=None)
def upload_prints(
    fingerprints: UploadFile = File(None),
    user: UserToken = Depends(get_user),
) -> None:
    """Import an uploaded attendance export into the fingerprint table.

    The upload is decoded with ``read_file`` and parsed with ``fp``. Only
    punches dated from 90 days before today up to 7 days after today are kept.
    They are inserted in pages of 100 rows with ``ON CONFLICT DO NOTHING`` and
    committed once at the end.

    Args:
        fingerprints: The uploaded tab-separated attendance file.
        user: Resolved through the ``get_user`` dependency; not otherwise used
            in the body.

    Raises:
        HTTPException: 400 when no file was supplied, 500 when the database
            layer raises ``SQLAlchemyError``.
    """
    # The File(None) default lets a request arrive without the upload; fail
    # with a clear client error instead of an AttributeError in read_file.
    if fingerprints is None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="No fingerprint file was uploaded",
        )
    try:
        with SessionFuture() as db:
            # Read the clock once so both window bounds share the same "today".
            today = date.today()
            start = today - timedelta(days=90)
            finish = today + timedelta(days=7)
            # Map employee code -> employee id for resolving rows of the file.
            employees: dict[int, uuid.UUID] = {
                code: id_ for id_, code in db.execute(select(Employee.id, Employee.code)).all()
            }
            file_data = read_file(fingerprints)
            prints = [d.model_dump() for d in fp(file_data, employees) if start <= d.date_.date() <= finish]
            for p in prints:
                # The schema dumps the key as "id_"; the insert is given "id".
                p["id"] = p.pop("id_")
            paged_data = [prints[i : i + 100] for i in range(0, len(prints), 100)]
            for i, page in enumerate(paged_data):
                print(f"Processing page {i} of {len(paged_data)}")
                db.execute(pg_insert(Fingerprint).on_conflict_do_nothing(), page)
            db.commit()
    except SQLAlchemyError as e:
        # Chain the original error so the traceback keeps the database cause.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        ) from e
def read_file(input_file: UploadFile) -> io.StringIO:
    """Read an uploaded file completely and return its text as a ``StringIO``.

    The encoding is chosen from the byte-order mark: a UTF-8 BOM selects
    ``utf-8-sig`` (so the BOM is stripped from the text), a UTF-16 BOM selects
    ``utf-16``, and anything else is decoded as UTF-8, which decodes plain
    ASCII input unchanged.

    Args:
        input_file: Object exposing the uploaded binary stream as ``.file``.

    Returns:
        A ``StringIO`` holding the decoded contents.

    Raises:
        UnicodeDecodeError: If the bytes are not valid in the chosen encoding.
    """
    stream = input_file.file
    output = bytearray()
    try:
        stream.seek(0)
        while chunk := stream.read(2 << 16):
            output.extend(chunk)
    finally:
        # Release the upload even when seeking or reading fails.
        stream.close()

    if output.startswith(b"\xef\xbb\xbf"):
        # "utf-8-sig" removes the BOM instead of leaving U+FEFF in the text.
        encoding = "utf-8-sig"
    elif output.startswith((b"\xfe\xff", b"\xff\xfe")):
        encoding = "utf-16"
    else:
        encoding = "utf-8"
    return StringIO(output.decode(encoding))
def fp(file_data: StringIO, employees: dict[int, uuid.UUID]) -> list[schemas.Fingerprint]:
    """Parse a tab-separated attendance export into fingerprint records.

    The first row is treated as a header and skipped. The employee code is
    read from the third column (marked ``EnNo`` in the original code) and the
    timestamp from the column at the header's last index, in
    ``YYYY-MM-DD HH:MM:SS`` form with ``/`` accepted in place of ``-``.

    Rows that are blank, too short, carry a non-numeric code or an unparsable
    date, or belong to an unknown employee are skipped.

    Args:
        file_data: Text stream positioned at the start of the export.
        employees: Mapping of employee code to employee id.

    Returns:
        One ``schemas.Fingerprint`` with a fresh UUID per accepted row, in
        file order. An empty file yields an empty list.
    """
    fingerprints: list[schemas.Fingerprint] = []
    reader = csv.reader(file_data, delimiter="\t")
    header = next(reader, None)
    if header is None:
        # Completely empty upload: nothing to parse.
        return fingerprints

    employee_column = 2
    date_column = len(header) - 1
    date_format = "%Y-%m-%d %H:%M:%S"
    for row in reader:
        try:
            employee_code = int(row[employee_column])  # EnNo
            employee_id = employees.get(employee_code)
            if employee_id is None:
                continue
            date_ = datetime.strptime(row[date_column].replace("/", "-"), date_format)
        except (ValueError, IndexError):
            # ValueError: bad number or date; IndexError: blank or short row.
            continue
        fingerprints.append(
            schemas.Fingerprint(
                id_=uuid.uuid4(),
                employee_id=employee_id,
                date_=date_,
            )
        )
    return fingerprints
def get_prints(employee_id: uuid.UUID, date_: date, db: Session) -> tuple[str, str, bool]:
    """Summarise one employee's punches for the day that starts on ``date_``.

    The day runs from 07:00 on ``date_`` up to, but not including, 07:00 on
    the following date. A punch made less than 10 minutes before a later kept
    punch is discarded.

    Args:
        employee_id: Employee whose punches are fetched.
        date_: Calendar date on which the 07:00 window opens.
        db: Session used to run the query.

    Returns:
        A tuple of (punch times as ``HH:MM`` joined by ", " plus a trailing
        space, hours worked as computed by ``working_hours`` or ""/"Error",
        full-day flag). Hours are computed only for exactly 2 or 4 punches;
        no punches gives "" and any other count gives "Error".
    """
    window_start = datetime.combine(date_, time(hour=7))
    window_end = datetime.combine(date_ + timedelta(days=1), time(hour=7))
    query = (
        select(Fingerprint)
        .where(
            Fingerprint.employee_id == employee_id,
            Fingerprint.date_ >= window_start,
            Fingerprint.date_ < window_end,
        )
        .order_by(Fingerprint.date_)
    )
    rows = db.execute(query).scalars().all()

    # Walk from the latest punch backwards, keeping a punch only when it is at
    # least 10 minutes before the most recently kept (later) one.
    min_gap = timedelta(minutes=10)
    kept = []
    later = None
    for row in reversed(rows):
        stamp = row.date_
        if later is None or later - stamp >= min_gap:
            kept.append(row)
            later = stamp
    kept.reverse()  # back to chronological order

    punch_count = len(kept)
    if punch_count in (2, 4):
        # Pair the punches as (in, out) and add up the spans.
        time_worked = sum(
            (out.date_ - in_.date_ for in_, out in zip(kept[::2], kept[1::2])),
            timedelta(),
        )
        hours_worked, full_day = working_hours(time_worked)
    elif punch_count == 0:
        hours_worked, full_day = "", False
    else:
        hours_worked, full_day = "Error", False

    times = ", ".join(entry.date_.strftime("%H:%M") for entry in kept) + " "
    return times, hours_worked, full_day
def working_hours(delta: timedelta) -> tuple[str, bool]:
    """Format a worked duration as ``HH:MM`` and flag whether it is a full day.

    The duration is rounded to the nearest 5 minutes as a whole, so the
    rounding carries into the hour (8h58m becomes "09:00", never "08:60").
    Whole days in ``delta`` are counted as hours. The full-day flag uses the
    unrounded duration.

    Args:
        delta: Time worked; expected to be non-negative.

    Returns:
        A tuple of (zero-padded ``HH:MM`` string, True when at least 9 hours
        were worked).
    """
    total_seconds = int(delta.total_seconds())
    total_minutes = total_seconds // 60
    # Integer round-to-nearest-5; a remainder of 0-2 goes down, 3-4 goes up.
    rounded_minutes = (total_minutes + 2) // 5 * 5
    hours, minutes = divmod(rounded_minutes, 60)
    hours_worked = f"{hours:02d}:{minutes:02d}"
    return hours_worked, total_seconds >= 60 * 60 * 9  # 9hrs