Validation: Making Product name unique ignoring the fraction_units.

Making the sku_version.units unique for a product across stock_keeping_units.

Both respecting the validity
This commit is contained in:
2026-02-17 04:13:11 +00:00
parent 9b0da9cb65
commit 338d9d63d5
4 changed files with 183 additions and 3 deletions

View File

@ -34,7 +34,7 @@ def upgrade():
) )
op.create_exclude_constraint( op.create_exclude_constraint(
"uq_product_versions_product_id", op.f("uq_product_versions_product_id"),
"product_versions", "product_versions",
(prod.c.product_id, "="), (prod.c.product_id, "="),
(func.daterange(prod.c.valid_from, prod.c.valid_till, text("'[]'")), "&&"), (func.daterange(prod.c.valid_from, prod.c.valid_till, text("'[]'")), "&&"),

View File

@ -0,0 +1,156 @@
"""index
Revision ID: 8260414066d6
Revises: 5cb65066be86
Create Date: 2026-02-13 06:22:39.926120
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "8260414066d6"
down_revision = "5cb65066be86"
branch_labels = None
depends_on = None
def upgrade():
# 1) Add the column (nullable for backfill)
op.add_column("sku_versions", sa.Column("product_id", sa.UUID(), nullable=True))
# 2) Define lightweight table objects (no autoload; explicit columns only)
sku_versions = sa.Table(
"sku_versions",
sa.MetaData(),
sa.Column("id", sa.UUID(), primary_key=True),
sa.Column("sku_id", sa.UUID(), nullable=False),
sa.Column("product_id", sa.UUID(), nullable=True),
sa.Column("valid_from", sa.Date(), nullable=True),
sa.Column("valid_till", sa.Date(), nullable=True),
sa.Column("units", sa.Unicode(), nullable=False),
)
stock_keeping_units = sa.Table(
"stock_keeping_units",
sa.MetaData(),
sa.Column("id", sa.UUID(), primary_key=True),
sa.Column("product_id", sa.UUID(), nullable=False),
)
# 3) Backfill via SQLAlchemy Core UPDATE..SET..(SELECT ...)
product_id_subq = (
sa.select(stock_keeping_units.c.product_id)
.where(stock_keeping_units.c.id == sku_versions.c.sku_id)
.scalar_subquery()
)
backfill_stmt = (
sa.update(sku_versions).values(product_id=product_id_subq).where(sku_versions.c.product_id.is_(None))
)
op.execute(backfill_stmt)
# 5) Enforce NOT NULL at schema level
op.alter_column("sku_versions", "product_id", nullable=False)
# 6) Trigger keeps product_id in sync when sku_id changes
op.execute(
sa.text(
"""
CREATE OR REPLACE FUNCTION sku_versions_set_product_id()
RETURNS trigger LANGUAGE plpgsql AS $$
BEGIN
SELECT s.product_id INTO NEW.product_id
FROM stock_keeping_units s
WHERE s.id = NEW.sku_id;
IF NEW.product_id IS NULL THEN
RAISE EXCEPTION 'Invalid sku_id %, no product found', NEW.sku_id;
END IF;
RETURN NEW;
END;
$$;
"""
)
)
op.execute(sa.text("DROP TRIGGER IF EXISTS trg_sku_versions_set_product_id ON sku_versions;"))
op.execute(
sa.text(
"""
CREATE TRIGGER trg_sku_versions_set_product_id
BEFORE INSERT OR UPDATE OF sku_id
ON sku_versions
FOR EACH ROW
EXECUTE FUNCTION sku_versions_set_product_id();
"""
)
)
# 7) Exclusion constraint: product_id + units must not overlap in time
# daterange(valid_from, valid_till, '[]') overlap operator &&.
sv = sa.table(
"sku_versions",
sa.column("product_id", sa.UUID()),
sa.column("units", sa.UUID()),
sa.column("valid_from", sa.Date()),
sa.column("valid_till", sa.Date()),
)
op.create_exclude_constraint(
op.f("uq_sku_versions_product_id_units"),
"sku_versions",
(sv.c.product_id, "="),
(sv.c.units, "="),
(sa.func.daterange(sv.c.valid_from, sv.c.valid_till, sa.text("'[]'")), "&&"),
)
prod = sa.table(
"product_versions",
sa.column("id", sa.UUID()),
sa.column("name", sa.Unicode(length=255)),
sa.column("fraction_units", sa.Unicode(length=255)),
sa.column("valid_from", sa.Date()),
sa.column("valid_till", sa.Date()),
)
# Update the exclude constraint on product_versions to only be on the name and drop fraction_units from it
op.drop_constraint("uq_product_versions_name", "product_versions", type_="unique")
op.create_exclude_constraint(
op.f("uq_product_versions_name"),
"product_versions",
(prod.c.name, "="),
(sa.func.daterange(prod.c.valid_from, prod.c.valid_till, sa.text("'[]'")), "&&"),
)
# ### end Alembic commands ###
def downgrade():
op.drop_constraint("uq_sku_versions_product_id_units", "sku_versions", type_="exclude")
# Drop trigger + function
op.execute(sa.text("DROP TRIGGER IF EXISTS trg_sku_versions_set_product_id ON sku_versions;"))
op.execute(sa.text("DROP FUNCTION IF EXISTS sku_versions_set_product_id();"))
# Drop column
op.drop_column("sku_versions", "product_id")
prod = sa.table(
"product_versions",
sa.column("id", sa.UUID()),
sa.column("name", sa.Unicode(length=255)),
sa.column("fraction_units", sa.Unicode(length=255)),
sa.column("valid_from", sa.Date()),
sa.column("valid_till", sa.Date()),
)
op.drop_constraint("uq_product_versions_name", "product_versions", type_="unique")
op.create_exclude_constraint(
"uq_product_versions_name",
"product_versions",
(prod.c.name, "="),
(prod.c.units, "="),
(sa.func.daterange(prod.c.valid_from, prod.c.valid_till, sa.text("'[]'")), "&&"),
)

View File

@ -8,7 +8,7 @@ from typing import TYPE_CHECKING
from sqlalchemy import Boolean, Date, ForeignKey, Numeric, Unicode, Uuid, func, text from sqlalchemy import Boolean, Date, ForeignKey, Numeric, Unicode, Uuid, func, text
from sqlalchemy.dialects import postgresql from sqlalchemy.dialects import postgresql
from sqlalchemy.orm import Mapped, mapped_column, relationship from sqlalchemy.orm import Mapped, mapped_column, relationship, validates
from ..db.base_class import reg from ..db.base_class import reg
@ -26,6 +26,14 @@ class SkuVersion:
Uuid, primary_key=True, insert_default=uuid.uuid4, server_default=text("gen_random_uuid()") Uuid, primary_key=True, insert_default=uuid.uuid4, server_default=text("gen_random_uuid()")
) )
sku_id: Mapped[uuid.UUID] = mapped_column(Uuid, ForeignKey("stock_keeping_units.id"), nullable=False) sku_id: Mapped[uuid.UUID] = mapped_column(Uuid, ForeignKey("stock_keeping_units.id"), nullable=False)
# DB column is "product_id", but ORM attribute is private: "_product_id"
_product_id: Mapped[uuid.UUID] = mapped_column(
"product_id",
Uuid,
nullable=False,
repr=False, # hides in dataclass repr
init=False, # not part of __init__ signature (dataclass)
)
units: Mapped[str] = mapped_column( units: Mapped[str] = mapped_column(
Unicode, nullable=False Unicode, nullable=False
) # Need to have logic in the application to handle unit uniqueness since we don't have product_id here ) # Need to have logic in the application to handle unit uniqueness since we don't have product_id here
@ -56,8 +64,24 @@ class SkuVersion:
(sku_id, "="), (sku_id, "="),
(func.daterange(valid_from, valid_till, text("'[]'")), "&&"), (func.daterange(valid_from, valid_till, text("'[]'")), "&&"),
), ),
# product-level uniqueness per time range
postgresql.ExcludeConstraint(
(_product_id, "="),
(units, "="),
(func.daterange(valid_from, valid_till, text("'[]'")), "&&"),
name="uq_sku_versions_product_units_time",
using="gist",
),
) )
@validates("_product_id")
def _prevent_writes_to_product_id(self, key, value):
"""
This column is integrity-only. We never accept app-side writes.
DB trigger sets it.
"""
raise ValueError("product_id is managed by the database and must not be set in application code.")
def __init__( def __init__(
self, self,
units: str = "", units: str = "",

View File

@ -60,7 +60,7 @@ def get_sale_report(
def get_sale(start_date: date, finish_date: date, id_: uuid.UUID | None, db: Session) -> list[SaleReportItem]: def get_sale(start_date: date, finish_date: date, id_: uuid.UUID | None, db: Session) -> list[SaleReportItem]:
day = func.cast( day = func.cast(
Voucher.date + timedelta(minutes=settings.TIMEZONE_OFFSET_MINUTES - settings.NEW_DAY_OFFSET_MINUTES), Date Kot.date + timedelta(minutes=settings.TIMEZONE_OFFSET_MINUTES - settings.NEW_DAY_OFFSET_MINUTES), Date
).label("day") ).label("day")
product_version_onclause = _pv_onclause(day) product_version_onclause = _pv_onclause(day)
query = ( query = (