152 lines
5.1 KiB
Python
152 lines
5.1 KiB
Python
import csv
|
|
import io
|
|
import uuid
|
|
|
|
from datetime import date, datetime, time, timedelta
|
|
from io import StringIO
|
|
|
|
import brewman.schemas.fingerprint as schemas
|
|
|
|
from fastapi import APIRouter, Depends, File, HTTPException, UploadFile, status
|
|
from sqlalchemy import bindparam, select
|
|
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
from sqlalchemy.orm import Session
|
|
|
|
from ..core.security import get_current_active_user as get_user
|
|
from ..db.session import SessionFuture
|
|
from ..models.employee import Employee
|
|
from ..models.fingerprint import Fingerprint
|
|
from ..schemas.user import UserToken
|
|
|
|
|
|
# Router on which this module's fingerprint endpoints are registered.
router = APIRouter()
|
|
|
|
|
|
@router.post("", response_model=None)
def upload_prints(
    fingerprints: UploadFile = File(None),
    user: UserToken = Depends(get_user),
) -> None:
    """Import fingerprint punches from an uploaded tab-separated export.

    Only punches dated from 90 days before today to 7 days after today are
    kept, and only for employee codes that exist in the database. Rows are
    inserted in pages of 100 using ``ON CONFLICT DO NOTHING`` and committed
    once after every page has been sent.

    Args:
        fingerprints: The uploaded export file.
        user: The authenticated user, resolved by the ``get_user`` dependency;
            it is not otherwise used in this function.

    Raises:
        HTTPException: 422 when no file was uploaded or the file cannot be
            decoded; 500 when the database reports an error.
    """
    # File(None) makes the upload optional, so a request without a file
    # arrives here as None; reject it explicitly instead of failing later
    # with an AttributeError.
    if fingerprints is None:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="No fingerprint file was uploaded",
        )

    # Decode the upload before touching the database so that a bad file
    # never opens a session.
    try:
        file_data = read_file(fingerprints)
    except UnicodeDecodeError as e:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="The encoding of the attendance file is not correct",
        ) from e

    # Evaluate "today" once so both ends of the window share the same day.
    today = date.today()
    start = today - timedelta(days=90)
    finish = today + timedelta(days=7)
    page_size = 100

    try:
        with SessionFuture() as db:
            # Map employee code -> employee id for every known employee.
            employees: dict[int, uuid.UUID] = {
                code: id_ for id_, code in db.execute(select(Employee.id, Employee.code)).all()
            }
            prints = [d for d in fp(file_data, employees) if start <= d.date_.date() <= finish]
            paged_data = [prints[i : i + page_size] for i in range(0, len(prints), page_size)]
            for i, page in enumerate(paged_data):
                print(f"Processing page {i} of {len(paged_data)}")
                db.execute(
                    pg_insert(Fingerprint)
                    .values(
                        {
                            "id": bindparam("id"),
                            "employee_id": bindparam("employee_id"),
                            "date": bindparam("date"),
                        }
                    )
                    .on_conflict_do_nothing(),
                    [p.dict() for p in page],
                )
            db.commit()
    except SQLAlchemyError as e:
        # Chain the original error so the traceback keeps the database cause.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        ) from e
|
|
|
|
|
|
def read_file(input_file: UploadFile) -> io.StringIO:
    """Read an uploaded file completely and return its decoded text.

    The encoding is picked from the byte-order mark: a UTF-8 BOM selects
    UTF-8, a UTF-16 BOM (either byte order) selects UTF-16, and anything else
    is decoded as UTF-8, which also covers plain ASCII. The BOM itself is
    never part of the returned text. The underlying file is closed even when
    reading fails.

    Args:
        input_file: The upload whose ``file`` attribute is a binary stream.

    Returns:
        A ``StringIO`` positioned at the start of the decoded content.

    Raises:
        UnicodeDecodeError: If the bytes are not valid in the chosen encoding.
    """
    try:
        input_file.file.seek(0)
        raw = input_file.file.read()
    finally:
        input_file.file.close()

    if raw.startswith(b"\xef\xbb\xbf"):
        # "utf-8-sig" strips the BOM; plain "utf-8" would leave U+FEFF in
        # front of the first field.
        encoding = "utf-8-sig"
    elif raw.startswith((b"\xfe\xff", b"\xff\xfe")):
        encoding = "utf-16"
    else:
        encoding = "utf-8"
    return StringIO(raw.decode(encoding))
|
|
|
|
|
|
def fp(file_data: StringIO, employees: dict[int, uuid.UUID]) -> list[schemas.Fingerprint]:
    """Parse a tab-separated fingerprint export into fingerprint records.

    The first line is treated as a header. In each following row the employee
    code is read from the third column and the timestamp from the column
    matching the header's last position. Rows that are too short, or whose
    values cannot be parsed, are skipped, as are rows for unknown employee
    codes.

    Args:
        file_data: The decoded export text.
        employees: Mapping of employee code to employee id.

    Returns:
        One record, with a freshly generated id, per accepted row. An empty
        input yields an empty list.
    """
    employee_column = 2  # EnNo
    date_format = "%Y-%m-%d %H:%M:%S"

    reader = csv.reader(file_data, delimiter="\t")
    header = next(reader, None)
    if header is None:
        # Completely empty file: there is nothing to import.
        return []
    date_column = len(header) - 1

    fingerprints: list[schemas.Fingerprint] = []
    for row in reader:
        try:
            employee_code = int(row[employee_column])
            # The export may use "/" as the date separator; normalise to "-".
            date_ = datetime.strptime(row[date_column].replace("/", "-"), date_format)
            if employee_code in employees:
                fingerprints.append(
                    schemas.Fingerprint(
                        id_=uuid.uuid4(),
                        employee_id=employees[employee_code],
                        date_=date_,
                    )
                )
        except (ValueError, IndexError):
            # ValueError: unparsable code or timestamp.
            # IndexError: blank or truncated row.
            continue
    return fingerprints
|
|
|
|
|
|
def get_prints(employee_id: uuid.UUID, date_: date, db: Session) -> tuple[str, str, bool]:
    """Summarise one employee's punches for a single working day.

    The day window runs from 07:00 on ``date_`` up to, but not including,
    07:00 on the following day. Walking from the latest punch backwards, any
    punch less than ten minutes before the most recently retained one is
    dropped.

    Args:
        employee_id: Employee whose punches are looked up.
        date_: The day the window starts on.
        db: Session used to run the query.

    Returns:
        A tuple of: the retained punch times as "HH:MM" joined by ", " with a
        trailing space; the hours-worked text; and the full-day flag. Hours
        are computed only for exactly two or four retained punches. No
        punches gives an empty string, any other count gives "Error".
    """
    window_start = datetime.combine(date_, time(hour=7))
    window_end = datetime.combine(date_ + timedelta(days=1), time(hour=7))
    query = (
        select(Fingerprint)
        .where(
            Fingerprint.employee_id == employee_id,
            Fingerprint.date_ >= window_start,
            Fingerprint.date_ < window_end,
        )
        .order_by(Fingerprint.date_)
    )
    rows = db.execute(query).scalars().all()

    # Collect the survivors newest-first, then restore chronological order.
    min_gap = timedelta(minutes=10)
    kept = []
    anchor = None
    for row in reversed(rows):
        stamp = row.date_
        if anchor is not None and anchor - stamp < min_gap:
            continue
        anchor = stamp
        kept.append(row)
    kept.reverse()

    if not kept:
        hours_worked, full_day = "", False
    elif len(kept) in (2, 4):
        # Punches alternate in/out, so pair them up and add the spans.
        spans = (out.date_ - in_.date_ for in_, out in zip(kept[::2], kept[1::2]))
        hours_worked, full_day = working_hours(sum(spans, timedelta()))
    else:
        hours_worked, full_day = "Error", False

    punch_times = ", ".join(row.date_.strftime("%H:%M") for row in kept) + " "
    return punch_times, hours_worked, full_day
|
|
|
|
|
|
def working_hours(delta: timedelta) -> tuple[str, bool]:
    """Format a worked duration and report whether it is a full day.

    Args:
        delta: Time worked; expected to be non-negative.

    Returns:
        A tuple of the duration as "HH:MM", rounded to the nearest five
        minutes, and a flag that is True when the unrounded duration is at
        least nine hours.
    """
    # total_seconds() includes the days component, which .seconds drops.
    total_seconds = int(delta.total_seconds())
    # Round the whole duration rather than only its minutes part, so that
    # e.g. 1h58m carries into the hour ("02:00") instead of giving "01:60".
    rounded_minutes = 5 * round((total_seconds // 60) / 5)
    hours, minutes = divmod(rounded_minutes, 60)
    hours_worked = f"{hours:02d}:{minutes:02d}"
    return hours_worked, total_seconds >= 9 * 60 * 60  # 9hrs
|