From ba6c9f61d23a4135158eb54592afdb5f60d16fe5 Mon Sep 17 00:00:00 2001 From: MacBook Air M1 Date: Wed, 18 Feb 2026 14:32:41 +0700 Subject: [PATCH] add base fifo-v2 --- .../service/common.fifo_stock_v2.service.go | 41 ++ internal/common/service/fifo_stock_v2/RFC.md | 58 ++ .../common/service/fifo_stock_v2/allocate.go | 660 ++++++++++++++++++ .../common/service/fifo_stock_v2/config.go | 170 +++++ .../common/service/fifo_stock_v2/errors.go | 8 + .../common/service/fifo_stock_v2/gather.go | 268 +++++++ .../service/fifo_stock_v2/recalculate.go | 177 +++++ .../common/service/fifo_stock_v2/service.go | 265 +++++++ .../common/service/fifo_stock_v2/types.go | 142 ++++ ...8090000_create_fifo_stock_v2_core.down.sql | 24 + ...218090000_create_fifo_stock_v2_core.up.sql | 151 ++++ ...8090010_seed_fifo_stock_v2_config.down.sql | 36 + ...218090010_seed_fifo_stock_v2_config.up.sql | 248 +++++++ 13 files changed, 2248 insertions(+) create mode 100644 internal/common/service/common.fifo_stock_v2.service.go create mode 100644 internal/common/service/fifo_stock_v2/RFC.md create mode 100644 internal/common/service/fifo_stock_v2/allocate.go create mode 100644 internal/common/service/fifo_stock_v2/config.go create mode 100644 internal/common/service/fifo_stock_v2/errors.go create mode 100644 internal/common/service/fifo_stock_v2/gather.go create mode 100644 internal/common/service/fifo_stock_v2/recalculate.go create mode 100644 internal/common/service/fifo_stock_v2/service.go create mode 100644 internal/common/service/fifo_stock_v2/types.go create mode 100644 internal/database/migrations/20260218090000_create_fifo_stock_v2_core.down.sql create mode 100644 internal/database/migrations/20260218090000_create_fifo_stock_v2_core.up.sql create mode 100644 internal/database/migrations/20260218090010_seed_fifo_stock_v2_config.down.sql create mode 100644 internal/database/migrations/20260218090010_seed_fifo_stock_v2_config.up.sql diff --git a/internal/common/service/common.fifo_stock_v2.service.go b/internal/common/service/common.fifo_stock_v2.service.go new file mode 100644 index 00000000..a1b51e9e --- /dev/null +++ b/internal/common/service/common.fifo_stock_v2.service.go @@ -0,0 +1,41 @@ +package service + +import ( + "github.com/sirupsen/logrus" + fifoStockV2 "gitlab.com/mbugroup/lti-api.git/internal/common/service/fifo_stock_v2" + "gorm.io/gorm" +) + +type FifoStockV2Service = fifoStockV2.Service + +type FifoStockV2Lane = fifoStockV2.Lane + +type FifoStockV2Ref = fifoStockV2.Ref + +type FifoStockV2GatherRequest = fifoStockV2.GatherRequest + +type FifoStockV2GatherRow = fifoStockV2.GatherRow + +type FifoStockV2AllocateRequest = fifoStockV2.AllocateRequest + +type FifoStockV2AllocateResult = fifoStockV2.AllocateResult + +type FifoStockV2AllocationDetail = fifoStockV2.AllocationDetail + +type FifoStockV2RollbackRequest = fifoStockV2.RollbackRequest + +type FifoStockV2RollbackResult = fifoStockV2.RollbackResult + +type FifoStockV2ReflowRequest = fifoStockV2.ReflowRequest + +type FifoStockV2ReflowResult = fifoStockV2.ReflowResult + +type FifoStockV2RecalculateRequest = fifoStockV2.RecalculateRequest + +type FifoStockV2RecalculateResult = fifoStockV2.RecalculateResult + +type FifoStockV2WarehouseDrift = fifoStockV2.WarehouseDrift + +func NewFifoStockV2Service(db *gorm.DB, logger *logrus.Logger) FifoStockV2Service { + return fifoStockV2.NewService(db, logger) +} diff --git a/internal/common/service/fifo_stock_v2/RFC.md b/internal/common/service/fifo_stock_v2/RFC.md new file mode 100644 index 00000000..3c10ffd7 --- /dev/null +++ b/internal/common/service/fifo_stock_v2/RFC.md @@ -0,0 +1,58 @@ +# RFC Ringkas: FIFO Stock V2 + +## Tujuan +`fifo_stock_v2` adalah engine FIFO baru berbasis konfigurasi `Flag Group + Jalur` yang berjalan paralel dengan v1 tanpa memutus kompatibilitas `stock_allocations`, HPP, dan closing/reporting existing. + +## Prinsip +- V1 tidak dihapus, V2 jalan paralel. +- Semua operasi transactional. +- FIFO sorting deterministic lintas tabel. +- Default over-consume `ALLOW` (pending), exception dapat `BLOCK`. +- Reflow idempotent. +- Recalculate bisa memperbaiki drift `product_warehouses.qty`. + +## Komponen +- `fifo_stock_v2_flag_groups`: master grouping flag produk. +- `fifo_stock_v2_flag_members`: pemetaan flag -> group. +- `fifo_stock_v2_traits`: trait sort per `table:date_column` (+ optional join date source). +- `fifo_stock_v2_route_rules`: rule per `flag_group + lane + function + table`. +- `fifo_stock_v2_overconsume_rules`: policy pending/over-consume. +- `fifo_stock_v2_operation_log`: idempotency + audit operasi. +- `fifo_stock_v2_reflow_runs` + checkpoints + shadow allocations: bulk reflow resumable/observable. + +## API Service +- `Gather`: union cross-table berdasarkan route rules + trait sorting. +- `Allocate`: alokasi lot FIFO ke usable. +- `Rollback`: batalkan alokasi aktif. +- `Reflow`: rollback penuh lalu allocate ulang (idempotent). +- `Recalculate`: rekonsiliasi qty warehouse dari ledger FIFO. + +## Deterministic Sorting +Urutan gather: +1. `sort_at ASC` (dari trait `date_column`) +2. `sort_priority ASC` +3. `source_table ASC` +4. `source_id ASC` + +Fallback waktu: `1970-01-01 00:00:00+00` bila tanggal null. + +## Compat Strategy +- Tetap menulis ke `stock_allocations` dengan tambahan metadata: + - `engine_version` (`v1`/`v2`) + - `flag_group_code` + - `function_code` + - `idempotency_key` +- Query lama yang bergantung `stockable_type/usable_type` tetap berjalan. + +## Migration Strategy +1. Deploy schema + seed v2. +2. Aktifkan shadow-run comparator v1 vs v2. +3. Canary cutover per flag group. +4. Full cutover jika parity aman. +5. Jalankan bulk reflow existing data. + +## Acceptance Criteria Singkat +- Parity mismatch terkendali pada aggregate + detail alokasi. +- Tidak ada regression closing/HPP. +- Drift qty warehouse turun signifikan pasca reflow. +- Rollback via feature flag memungkinkan kembali ke v1. diff --git a/internal/common/service/fifo_stock_v2/allocate.go b/internal/common/service/fifo_stock_v2/allocate.go new file mode 100644 index 00000000..2fb45090 --- /dev/null +++ b/internal/common/service/fifo_stock_v2/allocate.go @@ -0,0 +1,660 @@ +package fifo_stock_v2 + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "math" + "strings" + "time" + + "gorm.io/gorm" +) + +type allocationRow struct { + ID uint `gorm:"column:id"` + ProductWarehouseID uint `gorm:"column:product_warehouse_id"` + StockableType string `gorm:"column:stockable_type"` + StockableID uint `gorm:"column:stockable_id"` + UsableType string `gorm:"column:usable_type"` + UsableID uint `gorm:"column:usable_id"` + Qty float64 `gorm:"column:qty"` + Status string `gorm:"column:status"` + CreatedAt time.Time `gorm:"column:created_at"` +} + +type usableQtySnapshot struct { + Usage float64 `gorm:"column:usage_qty"` + Pending float64 `gorm:"column:pending_qty"` +} + +func (s *fifoStockV2Service) Allocate(ctx context.Context, req AllocateRequest) (*AllocateResult, error) { + if err := s.validateAllocateRequest(req); err != nil { + return nil, err + } + + result := &AllocateResult{} + err := s.withTransaction(ctx, req.Tx, func(tx *gorm.DB) error { + if err := s.ensureStockAllocationColumns(tx); err != nil { + return err + } + if err := s.lockShard(tx, req.FlagGroupCode, req.ProductWarehouseID); err != nil { + return err + } + + hash := requestHash(map[string]any{ + "flag_group_code": req.FlagGroupCode, + "product_warehouse_id": req.ProductWarehouseID, + "usable_type": req.Usable.LegacyTypeKey, + "usable_id": req.Usable.ID, + "need_qty": req.NeedQty, + "as_of": req.AsOf, + "allow_over_consume": req.AllowOverConsume, + }) + logRow, reused, err := s.beginOperation( + tx, + OperationAllocate, + req.IdempotencyKey, + hash, + req.ProductWarehouseID, + req.FlagGroupCode, + req.Usable.LegacyTypeKey, + req.Usable.ID, + ) + if err != nil { + return err + } + if reused { + if len(logRow.ResultPayload) == 0 { + return fmt.Errorf("idempotent allocate has empty payload") + } + if err := json.Unmarshal(logRow.ResultPayload, result); err != nil { + return err + } + return nil + } + if logRow != nil { + defer func() { + if err != nil { + s.failOperation(tx, logRow, err) + } + }() + } + + allocated, allocErr := s.allocateInternal(ctx, tx, req) + if allocErr != nil { + err = allocErr + return allocErr + } + *result = *allocated + + if finishErr := s.finishOperation(tx, logRow, result); finishErr != nil { + err = finishErr + return finishErr + } + return nil + }) + if err != nil { + return nil, err + } + return result, nil +} + +func (s *fifoStockV2Service) allocateInternal(ctx context.Context, tx *gorm.DB, req AllocateRequest) (*AllocateResult, error) { + usableRule, err := s.loadRouteRuleByLegacyType(ctx, tx, LaneUsable, req.FlagGroupCode, req.Usable.LegacyTypeKey) + if err != nil { + return nil, err + } + + allowOverConsume := usableRule.AllowPendingDefault + if req.AllowOverConsume != nil { + allowOverConsume = *req.AllowOverConsume + } else { + allowOverConsume, err = s.resolveOverConsume(tx, req.FlagGroupCode, req.Usable.FunctionCode, LaneUsable, allowOverConsume) + if err != nil { + return nil, err + } + } + + gatherRows, err := s.gatherRows(ctx, tx, GatherRequest{ + FlagGroupCode: req.FlagGroupCode, + Lane: LaneStockable, + ProductWarehouseID: req.ProductWarehouseID, + AsOf: req.AsOf, + Limit: s.defaultGatherLimit, + }) + if err != nil { + return nil, err + } + + stockableRuleMap, err := s.loadStockableRuleMap(ctx, tx, req.FlagGroupCode) + if err != nil { + return nil, err + } + + now := time.Now() + remaining := req.NeedQty + result := &AllocateResult{Details: make([]AllocationDetail, 0)} + + for _, lot := range gatherRows { + if remaining <= 0 { + break + } + if lot.AvailableQuantity <= 0 { + continue + } + portion := math.Min(remaining, lot.AvailableQuantity) + if nearlyZero(portion) { + continue + } + + allocationInsert := map[string]any{ + "product_warehouse_id": req.ProductWarehouseID, + "stockable_type": lot.Ref.LegacyTypeKey, + "stockable_id": lot.Ref.ID, + "usable_type": req.Usable.LegacyTypeKey, + "usable_id": req.Usable.ID, + "qty": portion, + "status": activeAllocationStatus(), + "created_at": now, + "updated_at": now, + "engine_version": "v2", + "flag_group_code": req.FlagGroupCode, + "function_code": req.Usable.FunctionCode, + } + if strings.TrimSpace(req.IdempotencyKey) != "" { + allocationInsert["idempotency_key"] = req.IdempotencyKey + } + if err := tx.Table("stock_allocations").Create(allocationInsert).Error; err != nil { + return nil, err + } + + rule, ok := stockableRuleMap[lot.Ref.LegacyTypeKey] + if !ok { + return nil, fmt.Errorf("missing stockable route rule for type %s", lot.Ref.LegacyTypeKey) + } + if err := s.adjustStockableUsedQuantity(tx, rule, lot.Ref.ID, portion); err != nil { + return nil, err + } + + result.Details = append(result.Details, AllocationDetail{ + StockableType: lot.Ref.LegacyTypeKey, + StockableID: lot.Ref.ID, + Qty: portion, + SortAt: lot.SortAt, + }) + + remaining -= portion + result.AllocatedQty += portion + } + + if remaining > 0 { + if !allowOverConsume { + return nil, fmt.Errorf("%w: requested %.3f, allocated %.3f", ErrInsufficientStock, req.NeedQty, result.AllocatedQty) + } + result.PendingQty = remaining + } + + if err := s.applyUsableDeltas(tx, *usableRule, req.Usable.ID, result.AllocatedQty, result.PendingQty); err != nil { + return nil, err + } + if err := s.adjustProductWarehouseQty(tx, req.ProductWarehouseID, -result.AllocatedQty); err != nil { + return nil, err + } + + return result, nil +} + +func (s *fifoStockV2Service) Rollback(ctx context.Context, req RollbackRequest) (*RollbackResult, error) { + if err := s.validateRollbackRequest(req); err != nil { + return nil, err + } + + result := &RollbackResult{} + err := s.withTransaction(ctx, req.Tx, func(tx *gorm.DB) error { + if err := s.ensureStockAllocationColumns(tx); err != nil { + return err + } + + flagGroupCode, err := s.resolveRollbackFlagGroup(ctx, tx, req) + if err != nil { + return err + } + if err := s.lockShard(tx, flagGroupCode, req.ProductWarehouseID); err != nil { + return err + } + + hash := requestHash(map[string]any{ + "product_warehouse_id": req.ProductWarehouseID, + "usable_type": req.Usable.LegacyTypeKey, + "usable_id": req.Usable.ID, + "release_qty": req.ReleaseQty, + "reason": req.Reason, + "flag_group_code": flagGroupCode, + }) + logRow, reused, beginErr := s.beginOperation( + tx, + OperationRollback, + req.IdempotencyKey, + hash, + req.ProductWarehouseID, + flagGroupCode, + req.Usable.LegacyTypeKey, + req.Usable.ID, + ) + if beginErr != nil { + return beginErr + } + if reused { + if len(logRow.ResultPayload) == 0 { + return fmt.Errorf("idempotent rollback has empty payload") + } + if err := json.Unmarshal(logRow.ResultPayload, result); err != nil { + return err + } + return nil + } + if logRow != nil { + defer func() { + if err != nil { + s.failOperation(tx, logRow, err) + } + }() + } + + rolled, rollbackErr := s.rollbackInternal(ctx, tx, req, flagGroupCode) + if rollbackErr != nil { + err = rollbackErr + return rollbackErr + } + *result = *rolled + + if finishErr := s.finishOperation(tx, logRow, result); finishErr != nil { + err = finishErr + return finishErr + } + return nil + }) + if err != nil { + return nil, err + } + return result, nil +} + +func (s *fifoStockV2Service) rollbackInternal( + ctx context.Context, + tx *gorm.DB, + req RollbackRequest, + flagGroupCode string, +) (*RollbackResult, error) { + usableRule, err := s.loadRouteRuleByLegacyType(ctx, tx, LaneUsable, flagGroupCode, req.Usable.LegacyTypeKey) + if err != nil { + return nil, err + } + + allocations, err := s.loadActiveAllocations(tx, req.Usable.LegacyTypeKey, req.Usable.ID, req.ProductWarehouseID) + if err != nil { + return nil, err + } + if len(allocations) == 0 { + if req.ReleaseQty == nil { + if err := s.resetUsableQuantities(tx, *usableRule, req.Usable.ID); err != nil { + return nil, err + } + } + return &RollbackResult{}, nil + } + + stockableRuleMap, err := s.loadStockableRuleMap(ctx, tx, flagGroupCode) + if err != nil { + return nil, err + } + + target := 0.0 + for _, alloc := range allocations { + target += alloc.Qty + } + if req.ReleaseQty != nil { + if *req.ReleaseQty < 0 { + return nil, fmt.Errorf("%w: release qty must be >= 0", ErrInvalidRequest) + } + target = *req.ReleaseQty + } + if nearlyZero(target) { + return &RollbackResult{}, nil + } + + result := &RollbackResult{Details: make([]AllocationDetail, 0)} + now := time.Now() + remaining := target + + for _, alloc := range allocations { + if remaining <= 0 { + break + } + portion := math.Min(remaining, alloc.Qty) + if nearlyZero(portion) { + continue + } + + if nearlyZero(alloc.Qty - portion) { + updates := map[string]any{ + "status": releasedAllocationStatus(), + "released_at": now, + "updated_at": now, + } + if strings.TrimSpace(req.Reason) != "" { + updates["note"] = req.Reason + } + if err := tx.Table("stock_allocations").Where("id = ?", alloc.ID).Updates(updates).Error; err != nil { + return nil, err + } + } else { + if err := tx.Table("stock_allocations"). + Where("id = ?", alloc.ID). + Updates(map[string]any{ + "qty": alloc.Qty - portion, + "updated_at": now, + }).Error; err != nil { + return nil, err + } + } + + stockableRule, ok := stockableRuleMap[alloc.StockableType] + if !ok { + return nil, fmt.Errorf("missing stockable route rule for type %s", alloc.StockableType) + } + if err := s.adjustStockableUsedQuantity(tx, stockableRule, alloc.StockableID, -portion); err != nil { + return nil, err + } + + result.ReleasedQty += portion + remaining -= portion + result.Details = append(result.Details, AllocationDetail{ + StockableType: alloc.StockableType, + StockableID: alloc.StockableID, + Qty: portion, + SortAt: alloc.CreatedAt, + }) + } + + if req.ReleaseQty != nil && remaining > 1e-6 { + return nil, fmt.Errorf("unable to release %.3f; only %.3f allocation exists", target, result.ReleasedQty) + } + + if req.ReleaseQty == nil { + if err := s.resetUsableQuantities(tx, *usableRule, req.Usable.ID); err != nil { + return nil, err + } + } else { + if err := s.applyUsableDeltas(tx, *usableRule, req.Usable.ID, -result.ReleasedQty, 0); err != nil { + return nil, err + } + } + + if err := s.adjustProductWarehouseQty(tx, req.ProductWarehouseID, result.ReleasedQty); err != nil { + return nil, err + } + + return result, nil +} + +func (s *fifoStockV2Service) Reflow(ctx context.Context, req ReflowRequest) (*ReflowResult, error) { + if strings.TrimSpace(req.FlagGroupCode) == "" || req.ProductWarehouseID == 0 || req.Usable.ID == 0 || strings.TrimSpace(req.Usable.LegacyTypeKey) == "" { + return nil, fmt.Errorf("%w: invalid reflow request", ErrInvalidRequest) + } + if req.DesiredQty < 0 { + return nil, fmt.Errorf("%w: desired qty must be >= 0", ErrInvalidRequest) + } + + result := &ReflowResult{} + err := s.withTransaction(ctx, req.Tx, func(tx *gorm.DB) error { + if err := s.ensureStockAllocationColumns(tx); err != nil { + return err + } + if err := s.lockShard(tx, req.FlagGroupCode, req.ProductWarehouseID); err != nil { + return err + } + + hash := requestHash(map[string]any{ + "flag_group_code": req.FlagGroupCode, + "product_warehouse_id": req.ProductWarehouseID, + "usable_type": req.Usable.LegacyTypeKey, + "usable_id": req.Usable.ID, + "desired_qty": req.DesiredQty, + "as_of": req.AsOf, + "allow_over_consume": req.AllowOverConsume, + }) + logRow, reused, err := s.beginOperation( + tx, + OperationReflow, + req.IdempotencyKey, + hash, + req.ProductWarehouseID, + req.FlagGroupCode, + req.Usable.LegacyTypeKey, + req.Usable.ID, + ) + if err != nil { + return err + } + if reused { + if len(logRow.ResultPayload) == 0 { + return fmt.Errorf("idempotent reflow has empty payload") + } + if err := json.Unmarshal(logRow.ResultPayload, result); err != nil { + return err + } + return nil + } + if logRow != nil { + defer func() { + if err != nil { + s.failOperation(tx, logRow, err) + } + }() + } + + rollbackRes, rollbackErr := s.rollbackInternal(ctx, tx, RollbackRequest{ + ProductWarehouseID: req.ProductWarehouseID, + Usable: req.Usable, + ReleaseQty: nil, + Reason: "reflow reset", + }, req.FlagGroupCode) + if rollbackErr != nil { + err = rollbackErr + return rollbackErr + } + result.Rollback = *rollbackRes + + if req.DesiredQty > 0 { + allocateRes, allocateErr := s.allocateInternal(ctx, tx, AllocateRequest{ + FlagGroupCode: req.FlagGroupCode, + ProductWarehouseID: req.ProductWarehouseID, + Usable: req.Usable, + NeedQty: req.DesiredQty, + AllowOverConsume: req.AllowOverConsume, + AsOf: req.AsOf, + }) + if allocateErr != nil { + err = allocateErr + return allocateErr + } + result.Allocate = *allocateRes + } + + if finishErr := s.finishOperation(tx, logRow, result); finishErr != nil { + err = finishErr + return finishErr + } + return nil + }) + if err != nil { + return nil, err + } + return result, nil +} + +func (s *fifoStockV2Service) loadActiveAllocations( + tx *gorm.DB, + usableType string, + usableID uint, + productWarehouseID uint, +) ([]allocationRow, error) { + query := tx.Table("stock_allocations"). + Select("id, product_warehouse_id, stockable_type, stockable_id, usable_type, usable_id, qty, status, created_at"). + Where("usable_type = ? AND usable_id = ? AND status = ?", usableType, usableID, activeAllocationStatus()) + if productWarehouseID > 0 { + query = query.Where("product_warehouse_id = ?", productWarehouseID) + } + query = query.Order("created_at DESC, id DESC") + + var rows []allocationRow + if err := query.Find(&rows).Error; err != nil { + return nil, err + } + return rows, nil +} + +func (s *fifoStockV2Service) loadStockableRuleMap(ctx context.Context, tx *gorm.DB, flagGroupCode string) (map[string]routeRule, error) { + rules, err := s.loadRouteRules(ctx, tx, flagGroupCode, LaneStockable) + if err != nil { + return nil, err + } + m := make(map[string]routeRule, len(rules)) + for _, rule := range rules { + m[rule.LegacyTypeKey] = rule + } + return m, nil +} + +func (s *fifoStockV2Service) adjustStockableUsedQuantity(tx *gorm.DB, rule routeRule, sourceID uint, delta float64) error { + if nearlyZero(delta) || sourceID == 0 { + return nil + } + if rule.UsedQuantityCol == nil || strings.TrimSpace(*rule.UsedQuantityCol) == "" { + return nil + } + + usedCol, _ := mustSafeIdentifier(*rule.UsedQuantityCol) + sourceIDCol, _ := mustSafeIdentifier(rule.SourceIDColumn) + sourceTable, _ := mustSafeIdentifier(rule.SourceTable) + + expr := fmt.Sprintf("GREATEST(0, COALESCE(%s,0) + ?)", usedCol) + return tx.Table(sourceTable). + Where(fmt.Sprintf("%s = ?", sourceIDCol), sourceID). + Update(usedCol, gorm.Expr(expr, delta)).Error +} + +func (s *fifoStockV2Service) applyUsableDeltas(tx *gorm.DB, rule routeRule, sourceID uint, usageDelta, pendingDelta float64) error { + if sourceID == 0 || (nearlyZero(usageDelta) && nearlyZero(pendingDelta)) { + return nil + } + sourceTable, _ := mustSafeIdentifier(rule.SourceTable) + sourceIDCol, _ := mustSafeIdentifier(rule.SourceIDColumn) + usageCol, _ := mustSafeIdentifier(rule.QuantityCol) + + updates := map[string]any{} + if !nearlyZero(usageDelta) { + expr := fmt.Sprintf("GREATEST(0, COALESCE(%s,0) + ?)", usageCol) + updates[usageCol] = gorm.Expr(expr, usageDelta) + } + if rule.PendingQuantityCol != nil && strings.TrimSpace(*rule.PendingQuantityCol) != "" && !nearlyZero(pendingDelta) { + pendingCol, _ := mustSafeIdentifier(*rule.PendingQuantityCol) + expr := fmt.Sprintf("GREATEST(0, COALESCE(%s,0) + ?)", pendingCol) + updates[pendingCol] = gorm.Expr(expr, pendingDelta) + } + if len(updates) == 0 { + return nil + } + + return tx.Table(sourceTable). + Where(fmt.Sprintf("%s = ?", sourceIDCol), sourceID). + Updates(updates).Error +} + +func (s *fifoStockV2Service) resetUsableQuantities(tx *gorm.DB, rule routeRule, sourceID uint) error { + if sourceID == 0 { + return nil + } + sourceTable, _ := mustSafeIdentifier(rule.SourceTable) + sourceIDCol, _ := mustSafeIdentifier(rule.SourceIDColumn) + usageCol, _ := mustSafeIdentifier(rule.QuantityCol) + + updates := map[string]any{usageCol: 0} + if rule.PendingQuantityCol != nil && strings.TrimSpace(*rule.PendingQuantityCol) != "" { + pendingCol, _ := mustSafeIdentifier(*rule.PendingQuantityCol) + updates[pendingCol] = 0 + } + + return tx.Table(sourceTable). + Where(fmt.Sprintf("%s = ?", sourceIDCol), sourceID). + Updates(updates).Error +} + +func (s *fifoStockV2Service) resolveRollbackFlagGroup(ctx context.Context, tx *gorm.DB, req RollbackRequest) (string, error) { + type row struct { + FlagGroupCode string `gorm:"column:flag_group_code"` + } + var latest row + err := tx.WithContext(ctx). + Table("stock_allocations"). + Select("flag_group_code"). + Where("usable_type = ? AND usable_id = ?", req.Usable.LegacyTypeKey, req.Usable.ID). + Where("engine_version = 'v2'"). + Where("flag_group_code IS NOT NULL AND flag_group_code <> ''"). + Order("id DESC"). + Limit(1). + Take(&latest).Error + if err == nil && strings.TrimSpace(latest.FlagGroupCode) != "" { + return latest.FlagGroupCode, nil + } + if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { + return "", err + } + + var rules []routeRule + err = tx.WithContext(ctx). + Table("fifo_stock_v2_route_rules"). + Where("is_active = TRUE"). + Where("lane = ?", string(LaneUsable)). + Where("legacy_type_key = ?", req.Usable.LegacyTypeKey). + Find(&rules).Error + if err != nil { + return "", err + } + if len(rules) == 0 { + return "", fmt.Errorf("cannot resolve flag group for usable type %s", req.Usable.LegacyTypeKey) + } + if len(rules) > 1 { + return "", fmt.Errorf("ambiguous rollback flag group for usable type %s", req.Usable.LegacyTypeKey) + } + return rules[0].FlagGroupCode, nil +} + +func (s *fifoStockV2Service) validateAllocateRequest(req AllocateRequest) error { + if strings.TrimSpace(req.FlagGroupCode) == "" || req.ProductWarehouseID == 0 { + return fmt.Errorf("%w: missing flag group or product warehouse", ErrInvalidRequest) + } + if req.Usable.ID == 0 || strings.TrimSpace(req.Usable.LegacyTypeKey) == "" { + return fmt.Errorf("%w: usable id and type are required", ErrInvalidRequest) + } + if req.NeedQty < 0 { + return fmt.Errorf("%w: need qty must be >= 0", ErrInvalidRequest) + } + return nil +} + +func (s *fifoStockV2Service) validateRollbackRequest(req RollbackRequest) error { + if req.ProductWarehouseID == 0 { + return fmt.Errorf("%w: product warehouse is required", ErrInvalidRequest) + } + if req.Usable.ID == 0 || strings.TrimSpace(req.Usable.LegacyTypeKey) == "" { + return fmt.Errorf("%w: usable id and type are required", ErrInvalidRequest) + } + if req.ReleaseQty != nil && *req.ReleaseQty < 0 { + return fmt.Errorf("%w: release qty must be >= 0", ErrInvalidRequest) + } + return nil +} diff --git a/internal/common/service/fifo_stock_v2/config.go b/internal/common/service/fifo_stock_v2/config.go new file mode 100644 index 00000000..b8ce3526 --- /dev/null +++ b/internal/common/service/fifo_stock_v2/config.go @@ -0,0 +1,170 @@ +package fifo_stock_v2 + +import ( + "context" + "fmt" + "strings" + + "gorm.io/gorm" +) + +type routeRule struct { + ID uint `gorm:"column:id"` + FlagGroupCode string `gorm:"column:flag_group_code"` + Lane string `gorm:"column:lane"` + FunctionCode string `gorm:"column:function_code"` + SourceTable string `gorm:"column:source_table"` + SourceIDColumn string `gorm:"column:source_id_column"` + ProductWarehouseCol string `gorm:"column:product_warehouse_col"` + QuantityCol string `gorm:"column:quantity_col"` + UsedQuantityCol *string `gorm:"column:used_quantity_col"` + PendingQuantityCol *string `gorm:"column:pending_quantity_col"` + ScopeSQL *string `gorm:"column:scope_sql"` + LegacyTypeKey string `gorm:"column:legacy_type_key"` + AllowPendingDefault bool `gorm:"column:allow_pending_default"` +} + +type traitRule struct { + ID uint `gorm:"column:id"` + SourceTable string `gorm:"column:source_table"` + Lane string `gorm:"column:lane"` + DateTable *string `gorm:"column:date_table"` + DateJoinLeftCol *string `gorm:"column:date_join_left_col"` + DateJoinRightCol *string `gorm:"column:date_join_right_col"` + DateColumn string `gorm:"column:date_column"` + FallbackDateColumn *string `gorm:"column:fallback_date_column"` + SortPriority int `gorm:"column:sort_priority"` + IDColumn string `gorm:"column:id_column"` +} + +func (s *fifoStockV2Service) loadRouteRules(ctx context.Context, tx *gorm.DB, flagGroupCode string, lane Lane) ([]routeRule, error) { + var rules []routeRule + err := tx.WithContext(ctx). + Table("fifo_stock_v2_route_rules"). + Where("is_active = TRUE"). + Where("flag_group_code = ?", flagGroupCode). + Where("lane = ?", string(lane)). + Order("id ASC"). + Find(&rules).Error + if err != nil { + return nil, err + } + for _, rule := range rules { + if err := validateRouteRule(rule); err != nil { + return nil, err + } + } + return rules, nil +} + +func (s *fifoStockV2Service) loadRouteRuleByLegacyType( + ctx context.Context, + tx *gorm.DB, + lane Lane, + flagGroupCode string, + legacyTypeKey string, +) (*routeRule, error) { + var rule routeRule + err := tx.WithContext(ctx). + Table("fifo_stock_v2_route_rules"). + Where("is_active = TRUE"). + Where("lane = ?", string(lane)). + Where("flag_group_code = ?", flagGroupCode). + Where("legacy_type_key = ?", legacyTypeKey). + Order("id ASC"). + Limit(1). + Take(&rule).Error + if err != nil { + return nil, err + } + if err := validateRouteRule(rule); err != nil { + return nil, err + } + return &rule, nil +} + +func (s *fifoStockV2Service) loadTraitMap( + ctx context.Context, + tx *gorm.DB, + lane Lane, + sourceTables []string, +) (map[string]traitRule, error) { + if len(sourceTables) == 0 { + return map[string]traitRule{}, nil + } + + var traits []traitRule + err := tx.WithContext(ctx). + Table("fifo_stock_v2_traits"). + Where("is_active = TRUE"). + Where("lane = ?", string(lane)). + Where("source_table IN ?", sourceTables). + Find(&traits).Error + if err != nil { + return nil, err + } + + out := make(map[string]traitRule, len(traits)) + for _, tr := range traits { + if err := validateTraitRule(tr); err != nil { + return nil, err + } + out[tr.SourceTable] = tr + } + return out, nil +} + +func validateRouteRule(rule routeRule) error { + fields := []string{rule.SourceTable, rule.SourceIDColumn, rule.ProductWarehouseCol, rule.QuantityCol} + for _, value := range fields { + if _, err := mustSafeIdentifier(value); err != nil { + return err + } + } + if rule.UsedQuantityCol != nil { + if _, err := mustSafeIdentifier(*rule.UsedQuantityCol); err != nil { + return err + } + } + if rule.PendingQuantityCol != nil { + if _, err := mustSafeIdentifier(*rule.PendingQuantityCol); err != nil { + return err + } + } + if strings.TrimSpace(rule.LegacyTypeKey) == "" { + return fmt.Errorf("route rule has empty legacy type key") + } + return nil +} + +func validateTraitRule(rule traitRule) error { + if _, err := mustSafeIdentifier(rule.SourceTable); err != nil { + return err + } + if _, err := mustSafeIdentifier(rule.DateColumn); err != nil { + return err + } + if _, err := mustSafeIdentifier(rule.IDColumn); err != nil { + return err + } + if rule.DateTable != nil { + if _, err := mustSafeIdentifier(*rule.DateTable); err != nil { + return err + } + if rule.DateJoinLeftCol == nil || rule.DateJoinRightCol == nil { + return fmt.Errorf("trait %s requires date join columns", rule.SourceTable) + } + if _, err := mustSafeIdentifier(*rule.DateJoinLeftCol); err != nil { + return err + } + if _, err := mustSafeIdentifier(*rule.DateJoinRightCol); err != nil { + return err + } + } + if rule.FallbackDateColumn != nil { + if _, err := mustSafeIdentifier(*rule.FallbackDateColumn); err != nil { + return err + } + } + return nil +} diff --git a/internal/common/service/fifo_stock_v2/errors.go b/internal/common/service/fifo_stock_v2/errors.go new file mode 100644 index 00000000..6c18495b --- /dev/null +++ b/internal/common/service/fifo_stock_v2/errors.go @@ -0,0 +1,8 @@ +package fifo_stock_v2 + +import "errors" + +var ( + ErrInvalidRequest = errors.New("invalid fifo stock v2 request") + ErrInsufficientStock = errors.New("insufficient stock") +) diff --git a/internal/common/service/fifo_stock_v2/gather.go b/internal/common/service/fifo_stock_v2/gather.go new file mode 100644 index 00000000..dd794d12 --- /dev/null +++ b/internal/common/service/fifo_stock_v2/gather.go @@ -0,0 +1,268 @@ +package fifo_stock_v2 + +import ( + "context" + "fmt" + "strings" + "time" + + entity "gitlab.com/mbugroup/lti-api.git/internal/entities" + "gorm.io/gorm" +) + +type gatherSQLRow struct { + SourceTable string `gorm:"column:source_table"` + LegacyTypeKey string `gorm:"column:legacy_type_key"` + FunctionCode string `gorm:"column:function_code"` + SourceID uint `gorm:"column:source_id"` + ProductWarehouseID uint `gorm:"column:product_warehouse_id"` + SortAt time.Time `gorm:"column:sort_at"` + SortPriority int `gorm:"column:sort_priority"` + Quantity float64 `gorm:"column:quantity"` + UsedQuantity float64 `gorm:"column:used_quantity"` + PendingQuantity float64 `gorm:"column:pending_quantity"` + AvailableQuantity float64 `gorm:"column:available_quantity"` +} + +func (s *fifoStockV2Service) Gather(ctx context.Context, req GatherRequest) ([]GatherRow, error) { + if strings.TrimSpace(req.FlagGroupCode) == "" || req.ProductWarehouseID == 0 { + return nil, fmt.Errorf("%w: flag group and product warehouse are required", ErrInvalidRequest) + } + if req.Lane != LaneStockable && req.Lane != LaneUsable { + return nil, fmt.Errorf("%w: unsupported lane %q", ErrInvalidRequest, req.Lane) + } + + var out []GatherRow + err := s.withTransaction(ctx, req.Tx, func(tx *gorm.DB) error { + rows, err := s.gatherRows(ctx, tx, req) + if err != nil { + return err + } + out = rows + return nil + }) + if err != nil { + return nil, err + } + return out, nil +} + +func (s *fifoStockV2Service) gatherRows(ctx context.Context, tx *gorm.DB, req GatherRequest) ([]GatherRow, error) { + rules, err := s.loadRouteRules(ctx, tx, req.FlagGroupCode, req.Lane) + if err != nil { + return nil, err + } + if len(rules) == 0 { + return []GatherRow{}, nil + } + + tables := make([]string, 0, len(rules)) + for _, rule := range rules { + tables = append(tables, rule.SourceTable) + } + + traits, err := s.loadTraitMap(ctx, tx, req.Lane, tables) + if err != nil { + return nil, err + } + + subqueries := make([]string, 0, len(rules)) + args := make([]any, 0, len(rules)*10) + + for _, rule := range rules { + trait, ok := traits[rule.SourceTable] + if !ok { + return nil, fmt.Errorf("missing trait for table %s lane %s", rule.SourceTable, req.Lane) + } + subSQL, subArgs, err := s.buildGatherSubquery(rule, trait, req) + if err != nil { + return nil, err + } + subqueries = append(subqueries, subSQL) + args = append(args, subArgs...) + } + + if len(subqueries) == 0 { + return []GatherRow{}, nil + } + + limit := req.Limit + if limit <= 0 { + limit = s.defaultGatherLimit + } + if limit <= 0 { + limit = 1000 + } + + query := "SELECT * FROM (" + strings.Join(subqueries, " UNION ALL ") + ") AS g" + if req.AfterSortAt != nil { + query += ` + WHERE + (g.sort_at > ?) + OR (g.sort_at = ? AND g.source_table > ?) + OR (g.sort_at = ? AND g.source_table = ? AND g.source_id > ?) + ` + args = append(args, + *req.AfterSortAt, + *req.AfterSortAt, req.AfterSourceTable, + *req.AfterSortAt, req.AfterSourceTable, req.AfterSourceID, + ) + } + query += " ORDER BY g.sort_at ASC, g.sort_priority ASC, g.source_table ASC, g.source_id ASC LIMIT ?" + args = append(args, limit) + + var rows []gatherSQLRow + if err := tx.WithContext(ctx).Raw(query, args...).Scan(&rows).Error; err != nil { + return nil, err + } + + out := make([]GatherRow, 0, len(rows)) + for _, row := range rows { + out = append(out, GatherRow{ + Ref: Ref{ + Table: row.SourceTable, + ID: row.SourceID, + LegacyTypeKey: row.LegacyTypeKey, + FunctionCode: row.FunctionCode, + }, + FlagGroupCode: req.FlagGroupCode, + ProductWarehouseID: row.ProductWarehouseID, + SortAt: row.SortAt, + SortPriority: row.SortPriority, + Quantity: row.Quantity, + UsedQuantity: row.UsedQuantity, + PendingQuantity: row.PendingQuantity, + AvailableQuantity: row.AvailableQuantity, + SourceTable: row.SourceTable, + SourceID: row.SourceID, + }) + } + + return out, nil +} + +func (s *fifoStockV2Service) buildGatherSubquery(rule routeRule, trait traitRule, req GatherRequest) (string, []any, error) { + sourceTable, _ := mustSafeIdentifier(rule.SourceTable) + sourceIDCol, _ := mustSafeIdentifier(rule.SourceIDColumn) + productWarehouseCol, _ := mustSafeIdentifier(rule.ProductWarehouseCol) + quantityCol, _ := mustSafeIdentifier(rule.QuantityCol) + + baseQtyExpr := fmt.Sprintf("COALESCE(src.%s,0)::numeric", quantityCol) + usedExpr := "0::numeric" + pendingExpr := "0::numeric" + availableExpr := baseQtyExpr + extraArgs := make([]any, 0, 1) + + if req.Lane == LaneStockable { + if rule.UsedQuantityCol != nil && strings.TrimSpace(*rule.UsedQuantityCol) != "" { + usedCol, _ := mustSafeIdentifier(*rule.UsedQuantityCol) + usedExpr = fmt.Sprintf("COALESCE(src.%s,0)::numeric", usedCol) + } else { + usedExpr = fmt.Sprintf( + "(SELECT COALESCE(SUM(sa.qty),0)::numeric FROM stock_allocations sa WHERE sa.stockable_type = ? AND sa.stockable_id = src.%s AND sa.status = '%s')", + sourceIDCol, + activeAllocationStatus(), + ) + extraArgs = append(extraArgs, rule.LegacyTypeKey) + } + availableExpr = fmt.Sprintf("(%s - %s)", baseQtyExpr, usedExpr) + } else { + if rule.PendingQuantityCol != nil && strings.TrimSpace(*rule.PendingQuantityCol) != "" { + pendingCol, _ := mustSafeIdentifier(*rule.PendingQuantityCol) + pendingExpr = fmt.Sprintf("COALESCE(src.%s,0)::numeric", pendingCol) + } + availableExpr = baseQtyExpr + } + + sortExpr, joinClause, err := buildSortExpr(trait) + if err != nil { + return "", nil, err + } + + whereParts := []string{ + fmt.Sprintf("src.%s = ?", productWarehouseCol), + fmt.Sprintf(`EXISTS ( + SELECT 1 + FROM product_warehouses pw + JOIN flags f ON f.flagable_type = ? AND f.flagable_id = pw.product_id + JOIN fifo_stock_v2_flag_members fm ON fm.flag_name = f.name AND fm.is_active = TRUE + WHERE pw.id = src.%s AND fm.flag_group_code = ? + )`, productWarehouseCol), + } + + if req.Lane == LaneStockable { + whereParts = append(whereParts, fmt.Sprintf("%s > 0", availableExpr)) + } + + if req.AsOf != nil { + whereParts = append(whereParts, fmt.Sprintf("%s <= ?", sortExpr)) + } + + if rule.ScopeSQL != nil && strings.TrimSpace(*rule.ScopeSQL) != "" { + whereParts = append(whereParts, fmt.Sprintf("(%s)", strings.TrimSpace(*rule.ScopeSQL))) + } + + subquery := fmt.Sprintf(` + SELECT + ? AS source_table, + ? AS legacy_type_key, + ? AS function_code, + src.%s AS source_id, + src.%s AS product_warehouse_id, + %s AS sort_at, + ? AS sort_priority, + %s AS quantity, + %s AS used_quantity, + %s AS pending_quantity, + %s AS available_quantity + FROM %s src + %s + WHERE %s + `, sourceIDCol, productWarehouseCol, sortExpr, baseQtyExpr, usedExpr, pendingExpr, availableExpr, sourceTable, joinClause, strings.Join(whereParts, " AND ")) + + args := []any{ + rule.SourceTable, + rule.LegacyTypeKey, + rule.FunctionCode, + trait.SortPriority, + } + args = append(args, extraArgs...) + args = append(args, + req.ProductWarehouseID, + entity.FlagableTypeProduct, + req.FlagGroupCode, + ) + + if req.AsOf != nil { + args = append(args, *req.AsOf) + } + + return subquery, args, nil +} + +func buildSortExpr(trait traitRule) (string, string, error) { + dateCol, _ := mustSafeIdentifier(trait.DateColumn) + idCol, _ := mustSafeIdentifier(trait.IDColumn) + _ = idCol + + joinClause := "" + sortBase := fmt.Sprintf("src.%s", dateCol) + if trait.DateTable != nil && strings.TrimSpace(*trait.DateTable) != "" { + dateTable, _ := mustSafeIdentifier(*trait.DateTable) + if trait.DateJoinLeftCol == nil || trait.DateJoinRightCol == nil { + return "", "", fmt.Errorf("trait %s requires date join columns", trait.SourceTable) + } + leftCol, _ := mustSafeIdentifier(*trait.DateJoinLeftCol) + rightCol, _ := mustSafeIdentifier(*trait.DateJoinRightCol) + joinClause = fmt.Sprintf("LEFT JOIN %s dt ON src.%s = dt.%s", dateTable, leftCol, rightCol) + sortBase = fmt.Sprintf("dt.%s", dateCol) + } + + if trait.FallbackDateColumn != nil && strings.TrimSpace(*trait.FallbackDateColumn) != "" { + fallbackCol, _ := mustSafeIdentifier(*trait.FallbackDateColumn) + sortBase = fmt.Sprintf("COALESCE(%s, src.%s)", sortBase, fallbackCol) + } + + sortExpr := fmt.Sprintf("COALESCE(%s, '1970-01-01 00:00:00+00'::timestamptz)", sortBase) + return sortExpr, joinClause, nil +} diff --git a/internal/common/service/fifo_stock_v2/recalculate.go b/internal/common/service/fifo_stock_v2/recalculate.go new file mode 100644 index 00000000..a94407e4 --- /dev/null +++ b/internal/common/service/fifo_stock_v2/recalculate.go @@ -0,0 +1,177 @@ +package fifo_stock_v2 + +import ( + "context" + "encoding/json" + "fmt" + "math" + "time" + + "gorm.io/gorm" +) + +func (s *fifoStockV2Service) Recalculate(ctx context.Context, req RecalculateRequest) (*RecalculateResult, error) { + result := &RecalculateResult{Drifts: make([]WarehouseDrift, 0)} + + err := s.withTransaction(ctx, req.Tx, func(tx *gorm.DB) error { + hash := requestHash(map[string]any{ + "product_warehouse_ids": req.ProductWarehouseIDs, + "flag_group_codes": req.FlagGroupCodes, + "as_of": req.AsOf, + "fix_drift": req.FixDrift, + }) + logRow, reused, err := s.beginOperation( + tx, + OperationRecalculate, + req.IdempotencyKey, + hash, + 0, + "RECALCULATE", + "", + 0, + ) + if err != nil { + return err + } + if reused { + if len(logRow.ResultPayload) == 0 { + return fmt.Errorf("idempotent recalculate has empty payload") + } + if err := json.Unmarshal(logRow.ResultPayload, result); err != nil { + return err + } + return nil + } + if logRow != nil { + defer func() { + if err != nil { + s.failOperation(tx, logRow, err) + } + }() + } + + warehouseIDs, err := s.resolveRecalculateWarehouseIDs(ctx, tx, req.ProductWarehouseIDs) + if err != nil { + return err + } + groupCodes, err := s.resolveRecalculateGroupCodes(ctx, tx, req.FlagGroupCodes) + if err != nil { + return err + } + + for _, warehouseID := range warehouseIDs { + expected := 0.0 + for _, flagGroup := range groupCodes { + available, calcErr := s.calculateWarehouseAvailableForGroup(ctx, tx, warehouseID, flagGroup, req.AsOf) + if calcErr != nil { + return calcErr + } + expected += available + } + + actual, actualErr := s.loadWarehouseQty(ctx, tx, warehouseID) + if actualErr != nil { + return actualErr + } + + delta := expected - actual + result.Checked++ + if math.Abs(delta) < 1e-6 { + continue + } + + drift := WarehouseDrift{ + ProductWarehouseID: warehouseID, + ExpectedQty: expected, + ActualQty: actual, + Delta: delta, + } + result.Drifts = append(result.Drifts, drift) + + if req.FixDrift { + if err := s.adjustProductWarehouseQty(tx, warehouseID, delta); err != nil { + return err + } + result.Fixed++ + } + } + + if err := s.finishOperation(tx, logRow, result); err != nil { + return err + } + return nil + }) + if err != nil { + return nil, err + } + return result, nil +} + +func (s *fifoStockV2Service) resolveRecalculateWarehouseIDs(ctx context.Context, tx *gorm.DB, provided []uint) ([]uint, error) { + if len(provided) > 0 { + return provided, nil + } + var ids []uint + err := tx.WithContext(ctx).Table("product_warehouses").Select("id").Order("id ASC").Scan(&ids).Error + if err != nil { + return nil, err + } + return ids, nil +} + +func (s *fifoStockV2Service) resolveRecalculateGroupCodes(ctx context.Context, tx *gorm.DB, provided []string) ([]string, error) { + if len(provided) > 0 { + return provided, nil + } + var groups []string + err := tx.WithContext(ctx). + Table("fifo_stock_v2_flag_groups"). + Select("code"). + Where("is_active = TRUE"). + Order("priority ASC, code ASC"). + Scan(&groups).Error + if err != nil { + return nil, err + } + return groups, nil +} + +func (s *fifoStockV2Service) calculateWarehouseAvailableForGroup( + ctx context.Context, + tx *gorm.DB, + warehouseID uint, + flagGroupCode string, + asOf *time.Time, +) (float64, error) { + rows, err := s.gatherRows(ctx, tx, GatherRequest{ + FlagGroupCode: flagGroupCode, + Lane: LaneStockable, + ProductWarehouseID: warehouseID, + AsOf: asOf, + Limit: 50000, + }) + if err != nil { + return 0, err + } + total := 0.0 + for _, row := range rows { + total += row.AvailableQuantity + } + return total, nil +} + +func (s *fifoStockV2Service) loadWarehouseQty(ctx context.Context, tx *gorm.DB, warehouseID uint) (float64, error) { + type row struct { + Qty float64 `gorm:"column:qty"` + } + var out row + err := tx.WithContext(ctx). + Table("product_warehouses"). + Select("COALESCE(qty,0) AS qty"). + Where("id = ?", warehouseID). + Take(&out).Error + if err != nil { + return 0, err + } + return out.Qty, nil +} diff --git a/internal/common/service/fifo_stock_v2/service.go b/internal/common/service/fifo_stock_v2/service.go new file mode 100644 index 00000000..25578d30 --- /dev/null +++ b/internal/common/service/fifo_stock_v2/service.go @@ -0,0 +1,265 @@ +package fifo_stock_v2 + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "hash/fnv" + "math" + "regexp" + "strings" + + "github.com/sirupsen/logrus" + entity "gitlab.com/mbugroup/lti-api.git/internal/entities" + "gorm.io/gorm" +) + +var identifierPattern = regexp.MustCompile(`^[a-zA-Z_][a-zA-Z0-9_]*$`) + +type fifoStockV2Service struct { + db *gorm.DB + logger *logrus.Logger + defaultGatherLimit int +} + +func NewService(db *gorm.DB, logger *logrus.Logger) Service { + if logger == nil { + logger = logrus.StandardLogger() + } + + return &fifoStockV2Service{ + db: db, + logger: logger, + defaultGatherLimit: 1000, + } +} + +func (s *fifoStockV2Service) withTransaction( + ctx context.Context, + tx *gorm.DB, + fn func(*gorm.DB) error, +) error { + if tx != nil { + return fn(tx.WithContext(ctx)) + } + return s.db.WithContext(ctx).Transaction(func(inner *gorm.DB) error { + return fn(inner) + }) +} + +func isSafeIdentifier(v string) bool { + return identifierPattern.MatchString(strings.TrimSpace(v)) +} + +func mustSafeIdentifier(v string) (string, error) { + v = strings.TrimSpace(v) + if !isSafeIdentifier(v) { + return "", fmt.Errorf("unsafe identifier: %s", v) + } + return v, nil +} + +func requestHash(v any) string { + payload, _ := json.Marshal(v) + sum := sha256.Sum256(payload) + return hex.EncodeToString(sum[:]) +} + +func shardLockKey(flagGroupCode string, productWarehouseID uint) int64 { + h := fnv.New64a() + _, _ = h.Write([]byte(strings.TrimSpace(strings.ToUpper(flagGroupCode)))) + _, _ = h.Write([]byte("|")) + _, _ = h.Write([]byte(fmt.Sprintf("%d", productWarehouseID))) + return int64(h.Sum64()) +} + +func (s *fifoStockV2Service) lockShard(tx *gorm.DB, flagGroupCode string, productWarehouseID uint) error { + if strings.TrimSpace(flagGroupCode) == "" || productWarehouseID == 0 { + return fmt.Errorf("lock shard requires flag group and product warehouse") + } + return tx.Exec("SELECT pg_advisory_xact_lock(?)", shardLockKey(flagGroupCode, productWarehouseID)).Error +} + +type operationLogRow struct { + ID uint `gorm:"column:id"` + Status string `gorm:"column:status"` + RequestHash string `gorm:"column:request_hash"` + ResultPayload json.RawMessage `gorm:"column:result_payload"` +} + +func (s *fifoStockV2Service) beginOperation( + tx *gorm.DB, + op Operation, + idempotencyKey string, + requestHashValue string, + productWarehouseID uint, + flagGroupCode string, + usableType string, + usableID uint, +) (*operationLogRow, bool, error) { + if strings.TrimSpace(idempotencyKey) == "" { + return nil, false, nil + } + + inserted := operationLogRow{} + insertSQL := ` + INSERT INTO fifo_stock_v2_operation_log + (idempotency_key, operation, product_warehouse_id, flag_group_code, usable_type, usable_id, request_hash, status, created_at) + VALUES (?, ?, ?, ?, NULLIF(?, ''), NULLIF(?, 0), ?, 'RUNNING', NOW()) + ON CONFLICT (idempotency_key, operation) DO NOTHING + RETURNING id, status, request_hash + ` + if err := tx.Raw(insertSQL, + idempotencyKey, + string(op), + productWarehouseID, + flagGroupCode, + usableType, + usableID, + requestHashValue, + ).Scan(&inserted).Error; err != nil { + return nil, false, err + } + if inserted.ID != 0 { + return &inserted, false, nil + } + + existing := operationLogRow{} + if err := tx.Table("fifo_stock_v2_operation_log"). + Select("id, status, request_hash, result_payload"). + Where("idempotency_key = ? AND operation = ?", idempotencyKey, string(op)). + Take(&existing).Error; err != nil { + return nil, false, err + } + + if existing.RequestHash != requestHashValue { + return nil, false, fmt.Errorf("idempotency key %s reused with different payload", idempotencyKey) + } + + switch strings.ToUpper(existing.Status) { + case "DONE": + return &existing, true, nil + case "RUNNING": + return nil, false, fmt.Errorf("operation %s with idempotency key %s is still running", op, idempotencyKey) + case "FAILED": + if err := tx.Table("fifo_stock_v2_operation_log"). + Where("id = ?", existing.ID). + Updates(map[string]any{ + "status": "RUNNING", + "error_text": nil, + "finished_at": nil, + }).Error; err != nil { + return nil, false, err + } + existing.Status = "RUNNING" + return &existing, false, nil + default: + return nil, false, fmt.Errorf("unknown operation status: %s", existing.Status) + } +} + +func (s *fifoStockV2Service) finishOperation(tx *gorm.DB, logRow *operationLogRow, payload any) error { + if logRow == nil || logRow.ID == 0 { + return nil + } + + encoded, err := json.Marshal(payload) + if err != nil { + return err + } + + return tx.Table("fifo_stock_v2_operation_log"). + Where("id = ?", logRow.ID). + Updates(map[string]any{ + "status": "DONE", + "result_payload": encoded, + "finished_at": gorm.Expr("NOW()"), + }).Error +} + +func (s *fifoStockV2Service) failOperation(tx *gorm.DB, logRow *operationLogRow, failure error) { + if logRow == nil || logRow.ID == 0 || failure == nil { + return + } + _ = tx.Table("fifo_stock_v2_operation_log"). + Where("id = ?", logRow.ID). + Updates(map[string]any{ + "status": "FAILED", + "error_text": failure.Error(), + "finished_at": gorm.Expr("NOW()"), + }).Error +} + +func (s *fifoStockV2Service) resolveOverConsume( + tx *gorm.DB, + flagGroupCode string, + functionCode string, + lane Lane, + defaultValue bool, +) (bool, error) { + type row struct { + Allow bool `gorm:"column:allow_overconsume"` + } + selected := row{} + err := tx.Table("fifo_stock_v2_overconsume_rules"). + Select("allow_overconsume"). + Where("is_active = TRUE"). + Where("lane = ?", string(lane)). + Where("(flag_group_code IS NULL OR flag_group_code = ?)", flagGroupCode). + Where("(function_code IS NULL OR function_code = ?)", functionCode). + Order("CASE WHEN flag_group_code IS NULL THEN 1 ELSE 0 END ASC"). + Order("CASE WHEN function_code IS NULL THEN 1 ELSE 0 END ASC"). + Order("priority ASC, id ASC"). + Limit(1). + Take(&selected).Error + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return defaultValue, nil + } + return false, err + } + return selected.Allow, nil +} + +func (s *fifoStockV2Service) adjustProductWarehouseQty(tx *gorm.DB, productWarehouseID uint, delta float64) error { + if productWarehouseID == 0 || delta == 0 { + return nil + } + return tx.Table("product_warehouses"). + Where("id = ?", productWarehouseID). + Update("qty", gorm.Expr("COALESCE(qty,0) + ?", delta)).Error +} + +func nearlyZero(v float64) bool { + return math.Abs(v) < 1e-6 +} + +func (s *fifoStockV2Service) ensureStockAllocationColumns(tx *gorm.DB) error { + checkCols := []string{"engine_version", "flag_group_code", "function_code", "idempotency_key"} + for _, col := range checkCols { + var count int64 + err := tx.Raw(` + SELECT COUNT(1) + FROM information_schema.columns + WHERE table_schema = 'public' AND table_name = 'stock_allocations' AND column_name = ? + `, col).Scan(&count).Error + if err != nil { + return err + } + if count == 0 { + return fmt.Errorf("stock_allocations.%s does not exist, run fifo_stock_v2 migration first", col) + } + } + return nil +} + +func activeAllocationStatus() string { + return entity.StockAllocationStatusActive +} + +func releasedAllocationStatus() string { + return entity.StockAllocationStatusReleased +} diff --git a/internal/common/service/fifo_stock_v2/types.go b/internal/common/service/fifo_stock_v2/types.go new file mode 100644 index 00000000..3879201e --- /dev/null +++ b/internal/common/service/fifo_stock_v2/types.go @@ -0,0 +1,142 @@ +package fifo_stock_v2 + +import ( + "context" + "time" + + "gorm.io/gorm" +) + +type Lane string + +const ( + LaneStockable Lane = "STOCKABLE" + LaneUsable Lane = "USABLE" +) + +type Operation string + +const ( + OperationAllocate Operation = "ALLOCATE" + OperationRollback Operation = "ROLLBACK" + OperationReflow Operation = "REFLOW" + OperationRecalculate Operation = "RECALCULATE" +) + +type Ref struct { + Table string + ID uint + LegacyTypeKey string + FunctionCode string +} + +type GatherRequest struct { + FlagGroupCode string + Lane Lane + ProductWarehouseID uint + AsOf *time.Time + Limit int + AfterSortAt *time.Time + AfterSourceTable string + AfterSourceID uint + ForUpdate bool + Tx *gorm.DB +} + +type GatherRow struct { + Ref Ref + FlagGroupCode string + ProductWarehouseID uint + SortAt time.Time + SortPriority int + Quantity float64 + UsedQuantity float64 + PendingQuantity float64 + AvailableQuantity float64 + SourceTable string + SourceID uint +} + +type AllocateRequest struct { + FlagGroupCode string + ProductWarehouseID uint + Usable Ref + NeedQty float64 + AllowOverConsume *bool + IdempotencyKey string + AsOf *time.Time + Tx *gorm.DB +} + +type AllocationDetail struct { + StockableType string + StockableID uint + Qty float64 + SortAt time.Time +} + +type AllocateResult struct { + AllocatedQty float64 + PendingQty float64 + Details []AllocationDetail +} + +type RollbackRequest struct { + ProductWarehouseID uint + Usable Ref + ReleaseQty *float64 + Reason string + IdempotencyKey string + Tx *gorm.DB +} + +type RollbackResult struct { + ReleasedQty float64 + Details []AllocationDetail +} + +type ReflowRequest struct { + FlagGroupCode string + ProductWarehouseID uint + Usable Ref + DesiredQty float64 + AllowOverConsume *bool + IdempotencyKey string + AsOf *time.Time + Tx *gorm.DB +} + +type ReflowResult struct { + Rollback RollbackResult + Allocate AllocateResult +} + +type RecalculateRequest struct { + ProductWarehouseIDs []uint + FlagGroupCodes []string + AsOf *time.Time + FixDrift bool + IdempotencyKey string + Tx *gorm.DB +} + +type WarehouseDrift struct { + ProductWarehouseID uint + ExpectedQty float64 + ActualQty float64 + Delta float64 +} + +type RecalculateResult struct { + Checked int + Fixed int + Drifts []WarehouseDrift +} + +type Service interface { + Gather(ctx context.Context, req GatherRequest) ([]GatherRow, error) + Allocate(ctx context.Context, req AllocateRequest) (*AllocateResult, error) + Rollback(ctx context.Context, req RollbackRequest) (*RollbackResult, error) + Reflow(ctx context.Context, req ReflowRequest) (*ReflowResult, error) + Recalculate(ctx context.Context, req RecalculateRequest) (*RecalculateResult, error) +} diff --git a/internal/database/migrations/20260218090000_create_fifo_stock_v2_core.down.sql b/internal/database/migrations/20260218090000_create_fifo_stock_v2_core.down.sql new file mode 100644 index 00000000..0aee55e2 --- /dev/null +++ b/internal/database/migrations/20260218090000_create_fifo_stock_v2_core.down.sql @@ -0,0 +1,24 @@ +BEGIN; + +DROP INDEX IF EXISTS idx_stock_allocations_idempotency; +DROP INDEX IF EXISTS idx_stock_allocations_flag_group; +DROP INDEX IF EXISTS idx_stock_allocations_engine_version; + +ALTER TABLE stock_allocations + DROP COLUMN IF EXISTS idempotency_key, + DROP COLUMN IF EXISTS reflow_run_id, + DROP COLUMN IF EXISTS function_code, + DROP COLUMN IF EXISTS flag_group_code, + DROP COLUMN IF EXISTS engine_version; + +DROP TABLE IF EXISTS fifo_stock_v2_shadow_allocations; +DROP TABLE IF EXISTS fifo_stock_v2_reflow_checkpoints; +DROP TABLE IF EXISTS fifo_stock_v2_reflow_runs; +DROP TABLE IF EXISTS fifo_stock_v2_operation_log; +DROP TABLE IF EXISTS fifo_stock_v2_overconsume_rules; +DROP TABLE IF EXISTS fifo_stock_v2_route_rules; +DROP TABLE IF EXISTS fifo_stock_v2_traits; +DROP TABLE IF EXISTS fifo_stock_v2_flag_members; +DROP TABLE IF EXISTS fifo_stock_v2_flag_groups; + +COMMIT; diff --git a/internal/database/migrations/20260218090000_create_fifo_stock_v2_core.up.sql b/internal/database/migrations/20260218090000_create_fifo_stock_v2_core.up.sql new file mode 100644 index 00000000..24ae2412 --- /dev/null +++ b/internal/database/migrations/20260218090000_create_fifo_stock_v2_core.up.sql @@ -0,0 +1,151 @@ +BEGIN; + +CREATE TABLE IF NOT EXISTS fifo_stock_v2_flag_groups ( + code VARCHAR(64) PRIMARY KEY, + name VARCHAR(128) NOT NULL, + priority INT NOT NULL DEFAULT 100, + is_active BOOLEAN NOT NULL DEFAULT TRUE, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS fifo_stock_v2_flag_members ( + flag_name VARCHAR(64) PRIMARY KEY, + flag_group_code VARCHAR(64) NOT NULL REFERENCES fifo_stock_v2_flag_groups(code), + priority INT NOT NULL DEFAULT 100, + is_active BOOLEAN NOT NULL DEFAULT TRUE, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS fifo_stock_v2_traits ( + id BIGSERIAL PRIMARY KEY, + source_table VARCHAR(64) NOT NULL, + lane VARCHAR(16) NOT NULL CHECK (lane IN ('STOCKABLE', 'USABLE')), + date_table VARCHAR(64) NULL, + date_join_left_col VARCHAR(64) NULL, + date_join_right_col VARCHAR(64) NULL, + date_column VARCHAR(64) NOT NULL, + fallback_date_column VARCHAR(64) NULL, + sort_priority INT NOT NULL DEFAULT 100, + id_column VARCHAR(64) NOT NULL DEFAULT 'id', + is_active BOOLEAN NOT NULL DEFAULT TRUE, + UNIQUE (source_table, lane) +); + +CREATE TABLE IF NOT EXISTS fifo_stock_v2_route_rules ( + id BIGSERIAL PRIMARY KEY, + flag_group_code VARCHAR(64) NOT NULL REFERENCES fifo_stock_v2_flag_groups(code), + lane VARCHAR(16) NOT NULL CHECK (lane IN ('STOCKABLE', 'USABLE')), + function_code VARCHAR(64) NOT NULL, + source_table VARCHAR(64) NOT NULL, + source_id_column VARCHAR(64) NOT NULL DEFAULT 'id', + product_warehouse_col VARCHAR(64) NOT NULL, + quantity_col VARCHAR(64) NOT NULL, + used_quantity_col VARCHAR(64) NULL, + pending_quantity_col VARCHAR(64) NULL, + scope_sql TEXT NULL, + legacy_type_key VARCHAR(100) NOT NULL, + allow_pending_default BOOLEAN NOT NULL DEFAULT TRUE, + is_active BOOLEAN NOT NULL DEFAULT TRUE, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + UNIQUE (flag_group_code, lane, function_code, source_table) +); + +CREATE TABLE IF NOT EXISTS fifo_stock_v2_overconsume_rules ( + id BIGSERIAL PRIMARY KEY, + flag_group_code VARCHAR(64) NULL REFERENCES fifo_stock_v2_flag_groups(code), + function_code VARCHAR(64) NULL, + lane VARCHAR(16) NOT NULL DEFAULT 'USABLE' CHECK (lane IN ('STOCKABLE', 'USABLE')), + allow_overconsume BOOLEAN NOT NULL, + priority INT NOT NULL DEFAULT 100, + reason TEXT NULL, + is_active BOOLEAN NOT NULL DEFAULT TRUE +); + +CREATE TABLE IF NOT EXISTS fifo_stock_v2_operation_log ( + id BIGSERIAL PRIMARY KEY, + idempotency_key VARCHAR(128) NOT NULL, + operation VARCHAR(16) NOT NULL CHECK (operation IN ('ALLOCATE', 'ROLLBACK', 'REFLOW', 'RECALCULATE')), + product_warehouse_id BIGINT NOT NULL, + flag_group_code VARCHAR(64) NOT NULL, + usable_type VARCHAR(100) NULL, + usable_id BIGINT NULL, + request_hash VARCHAR(64) NOT NULL, + status VARCHAR(16) NOT NULL CHECK (status IN ('RUNNING', 'DONE', 'FAILED')), + result_payload JSONB NULL, + error_text TEXT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + finished_at TIMESTAMPTZ NULL, + UNIQUE (idempotency_key, operation) +); + +CREATE TABLE IF NOT EXISTS fifo_stock_v2_reflow_runs ( + id BIGSERIAL PRIMARY KEY, + mode VARCHAR(16) NOT NULL CHECK (mode IN ('DRY_RUN', 'APPLY')), + status VARCHAR(16) NOT NULL CHECK (status IN ('RUNNING', 'PAUSED', 'DONE', 'FAILED', 'CANCELLED')), + as_of TIMESTAMPTZ NULL, + started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + finished_at TIMESTAMPTZ NULL, + total_shards INT NOT NULL DEFAULT 0, + processed_shards INT NOT NULL DEFAULT 0, + processed_rows BIGINT NOT NULL DEFAULT 0, + mismatch_rows BIGINT NOT NULL DEFAULT 0, + created_by BIGINT NULL, + note TEXT NULL +); + +CREATE TABLE IF NOT EXISTS fifo_stock_v2_reflow_checkpoints ( + id BIGSERIAL PRIMARY KEY, + run_id BIGINT NOT NULL REFERENCES fifo_stock_v2_reflow_runs(id) ON DELETE CASCADE, + flag_group_code VARCHAR(64) NOT NULL, + product_warehouse_id BIGINT NOT NULL, + last_sort_at TIMESTAMPTZ NULL, + last_source_table VARCHAR(64) NULL, + last_source_id BIGINT NULL, + status VARCHAR(16) NOT NULL CHECK (status IN ('PENDING', 'RUNNING', 'DONE', 'FAILED')) DEFAULT 'PENDING', + retry_count INT NOT NULL DEFAULT 0, + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + UNIQUE (run_id, flag_group_code, product_warehouse_id) +); + +CREATE TABLE IF NOT EXISTS fifo_stock_v2_shadow_allocations ( + id BIGSERIAL PRIMARY KEY, + run_id BIGINT NOT NULL REFERENCES fifo_stock_v2_reflow_runs(id) ON DELETE CASCADE, + product_warehouse_id BIGINT NOT NULL, + stockable_type VARCHAR(100) NOT NULL, + stockable_id BIGINT NOT NULL, + usable_type VARCHAR(100) NOT NULL, + usable_id BIGINT NOT NULL, + qty NUMERIC(15,3) NOT NULL, + status VARCHAR(20) NOT NULL DEFAULT 'ACTIVE', + sort_at TIMESTAMPTZ NULL, + source_table VARCHAR(64) NULL, + source_id BIGINT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_fifo_v2_shadow_run_usable + ON fifo_stock_v2_shadow_allocations(run_id, usable_type, usable_id); + +CREATE INDEX IF NOT EXISTS idx_fifo_v2_shadow_run_stockable + ON fifo_stock_v2_shadow_allocations(run_id, stockable_type, stockable_id); + +ALTER TABLE stock_allocations + ADD COLUMN IF NOT EXISTS engine_version VARCHAR(8) NOT NULL DEFAULT 'v1', + ADD COLUMN IF NOT EXISTS flag_group_code VARCHAR(64) NULL, + ADD COLUMN IF NOT EXISTS function_code VARCHAR(64) NULL, + ADD COLUMN IF NOT EXISTS reflow_run_id BIGINT NULL, + ADD COLUMN IF NOT EXISTS idempotency_key VARCHAR(128) NULL; + +CREATE INDEX IF NOT EXISTS idx_stock_allocations_engine_version + ON stock_allocations(engine_version); + +CREATE INDEX IF NOT EXISTS idx_stock_allocations_flag_group + ON stock_allocations(flag_group_code); + +CREATE INDEX IF NOT EXISTS idx_stock_allocations_idempotency + ON stock_allocations(idempotency_key); + +COMMIT; diff --git a/internal/database/migrations/20260218090010_seed_fifo_stock_v2_config.down.sql b/internal/database/migrations/20260218090010_seed_fifo_stock_v2_config.down.sql new file mode 100644 index 00000000..a9fb727b --- /dev/null +++ b/internal/database/migrations/20260218090010_seed_fifo_stock_v2_config.down.sql @@ -0,0 +1,36 @@ +BEGIN; + +DELETE FROM fifo_stock_v2_overconsume_rules +WHERE reason IN ( + 'fifo_v2_default_allow', + 'fifo_v2_exception_ayam_depletion_block', + 'fifo_v2_exception_marketing_block', + 'fifo_v2_exception_transfer_block', + 'fifo_v2_exception_adjustment_block', + 'fifo_v2_exception_transfer_laying_block' +); + +DELETE FROM fifo_stock_v2_route_rules +WHERE flag_group_code IN ('AYAM', 'AFKIR_CULLING_MATI', 'PAKAN', 'OVK', 'TELUR', 'TELUR_GRADE'); + +DELETE FROM fifo_stock_v2_traits +WHERE source_table IN ( + 'purchase_items', + 'stock_transfer_details', + 'laying_transfer_targets', + 'laying_transfer_sources', + 'adjustment_stocks', + 'recording_stocks', + 'recording_depletions', + 'recording_eggs', + 'marketing_delivery_products', + 'project_chickins' +); + +DELETE FROM fifo_stock_v2_flag_members +WHERE flag_group_code IN ('AYAM', 'AFKIR_CULLING_MATI', 'PAKAN', 'OVK', 'TELUR', 'TELUR_GRADE'); + +DELETE FROM fifo_stock_v2_flag_groups +WHERE code IN ('AYAM', 'AFKIR_CULLING_MATI', 'PAKAN', 'OVK', 'TELUR', 'TELUR_GRADE'); + +COMMIT; diff --git a/internal/database/migrations/20260218090010_seed_fifo_stock_v2_config.up.sql b/internal/database/migrations/20260218090010_seed_fifo_stock_v2_config.up.sql new file mode 100644 index 00000000..e6914e94 --- /dev/null +++ b/internal/database/migrations/20260218090010_seed_fifo_stock_v2_config.up.sql @@ -0,0 +1,248 @@ +BEGIN; + +INSERT INTO fifo_stock_v2_flag_groups(code, name, priority) +VALUES + ('AYAM', 'AYAM', 10), + ('AFKIR_CULLING_MATI', 'AFKIR/CULLING/MATI', 20), + ('PAKAN', 'PAKAN', 30), + ('OVK', 'OVK', 40), + ('TELUR', 'TELUR', 50), + ('TELUR_GRADE', 'UTUH/PUTIH/RETAK/PECAH/PAPACAL/JUMBO', 60) +ON CONFLICT (code) DO UPDATE +SET + name = EXCLUDED.name, + priority = EXCLUDED.priority, + updated_at = NOW(); + +INSERT INTO fifo_stock_v2_flag_members(flag_name, flag_group_code, priority) +VALUES + ('DOC', 'AYAM', 10), + ('PULLET', 'AYAM', 20), + ('LAYER', 'AYAM', 30), + + ('AYAM-AFKIR', 'AFKIR_CULLING_MATI', 10), + ('AYAM-CULLING', 'AFKIR_CULLING_MATI', 20), + ('AYAM-MATI', 'AFKIR_CULLING_MATI', 30), + + ('PAKAN', 'PAKAN', 10), + ('PRE-STARTER', 'PAKAN', 20), + ('STARTER', 'PAKAN', 30), + ('FINISHER', 'PAKAN', 40), + + ('OVK', 'OVK', 10), + ('OBAT', 'OVK', 20), + ('VITAMIN', 'OVK', 30), + ('KIMIA', 'OVK', 40), + + ('TELUR', 'TELUR', 10), + + ('TELUR-UTUH', 'TELUR_GRADE', 10), + ('TELUR-PUTIH', 'TELUR_GRADE', 20), + ('TELUR-RETAK', 'TELUR_GRADE', 30), + ('TELUR-PECAH', 'TELUR_GRADE', 40), + ('TELUR-PAPACAL', 'TELUR_GRADE', 50), + ('TELUR-JUMBO', 'TELUR_GRADE', 60) +ON CONFLICT (flag_name) DO UPDATE +SET + flag_group_code = EXCLUDED.flag_group_code, + priority = EXCLUDED.priority, + updated_at = NOW(); + +INSERT INTO fifo_stock_v2_traits( + source_table, + lane, + date_table, + date_join_left_col, + date_join_right_col, + date_column, + fallback_date_column, + sort_priority, + id_column +) +VALUES + ('purchase_items', 'STOCKABLE', NULL, NULL, NULL, 'received_date', NULL, 10, 'id'), + + ('stock_transfer_details', 'STOCKABLE', 'stock_transfers', 'stock_transfer_id', 'id', 'transfer_date', NULL, 20, 'id'), + ('stock_transfer_details', 'USABLE', 'stock_transfers', 'stock_transfer_id', 'id', 'transfer_date', NULL, 20, 'id'), + + ('laying_transfer_targets', 'STOCKABLE', 'laying_transfers', 'laying_transfer_id', 'id', 'transfer_date', NULL, 25, 'id'), + ('laying_transfer_sources', 'USABLE', 'laying_transfers', 'laying_transfer_id', 'id', 'transfer_date', NULL, 25, 'id'), + + ('adjustment_stocks', 'STOCKABLE', NULL, NULL, NULL, 'created_at', NULL, 30, 'id'), + ('adjustment_stocks', 'USABLE', NULL, NULL, NULL, 'created_at', NULL, 30, 'id'), + + ('recording_stocks', 'USABLE', 'recordings', 'recording_id', 'id', 'record_datetime', NULL, 35, 'id'), + ('recording_depletions', 'USABLE', 'recordings', 'recording_id', 'id', 'record_datetime', NULL, 35, 'id'), + ('recording_depletions', 'STOCKABLE', 'recordings', 'recording_id', 'id', 'record_datetime', NULL, 35, 'id'), + + ('recording_eggs', 'STOCKABLE', 'recordings', 'recording_id', 'id', 'record_datetime', 'created_at', 40, 'id'), + + ('marketing_delivery_products', 'USABLE', NULL, NULL, NULL, 'delivery_date', 'created_at', 45, 'id'), + + ('project_chickins', 'USABLE', NULL, NULL, NULL, 'chick_in_date', 'created_at', 50, 'id') +ON CONFLICT (source_table, lane) DO UPDATE +SET + date_table = EXCLUDED.date_table, + date_join_left_col = EXCLUDED.date_join_left_col, + date_join_right_col = EXCLUDED.date_join_right_col, + date_column = EXCLUDED.date_column, + fallback_date_column = EXCLUDED.fallback_date_column, + sort_priority = EXCLUDED.sort_priority, + id_column = EXCLUDED.id_column, + is_active = TRUE; + +INSERT INTO fifo_stock_v2_route_rules( + flag_group_code, + lane, + function_code, + source_table, + source_id_column, + product_warehouse_col, + quantity_col, + used_quantity_col, + pending_quantity_col, + scope_sql, + legacy_type_key, + allow_pending_default, + is_active +) +VALUES + -- AYAM STOCKABLE + ('AYAM', 'STOCKABLE', 'ADJUSTMENT_IN', 'adjustment_stocks', 'id', 'product_warehouse_id', 'total_qty', 'total_used', NULL, NULL, 'ADJUSTMENT_IN', TRUE, TRUE), + ('AYAM', 'STOCKABLE', 'STOCK_TRANSFER_IN', 'stock_transfer_details', 'id', 'dest_product_warehouse_id', 'total_qty', 'total_used', NULL, 'deleted_at IS NULL', 'STOCK_TRANSFER_IN', TRUE, TRUE), + ('AYAM', 'STOCKABLE', 'PURCHASE_IN', 'purchase_items', 'id', 'product_warehouse_id', 'total_qty', 'total_used', NULL, NULL, 'PURCHASE_ITEMS', TRUE, TRUE), + ('AYAM', 'STOCKABLE', 'TRANSFER_TO_LAYING_IN', 'laying_transfer_targets', 'id', 'product_warehouse_id', 'total_qty', 'total_used', NULL, 'deleted_at IS NULL', 'TRANSFERTOLAYING_IN', TRUE, TRUE), + + -- AYAM USABLE + ('AYAM', 'USABLE', 'ADJUSTMENT_OUT', 'adjustment_stocks', 'id', 'product_warehouse_id', 'usage_qty', NULL, 'pending_qty', NULL, 'ADJUSTMENT_OUT', TRUE, TRUE), + ('AYAM', 'USABLE', 'STOCK_TRANSFER_OUT', 'stock_transfer_details', 'id', 'source_product_warehouse_id', 'usage_qty', NULL, 'pending_qty', 'deleted_at IS NULL', 'STOCK_TRANSFER_OUT', TRUE, TRUE), + ('AYAM', 'USABLE', 'CHICKIN_OUT', 'project_chickins', 'id', 'product_warehouse_id', 'usage_qty', NULL, 'pending_usage_qty', 'deleted_at IS NULL', 'PROJECT_CHICKIN', TRUE, TRUE), + ('AYAM', 'USABLE', 'RECORDING_DEPLETION_OUT', 'recording_depletions', 'id', 'source_product_warehouse_id', 'qty', NULL, 'pending_qty', NULL, 'RECORDING_DEPLETION', TRUE, TRUE), + ('AYAM', 'USABLE', 'MARKETING_OUT', 'marketing_delivery_products', 'id', 'product_warehouse_id', 'usage_qty', NULL, 'pending_qty', NULL, 'MARKETING_DELIVERY', TRUE, TRUE), + ('AYAM', 'USABLE', 'TRANSFER_TO_LAYING_OUT', 'laying_transfer_sources', 'id', 'product_warehouse_id', 'usage_qty', NULL, 'pending_usage_qty', 'deleted_at IS NULL', 'TRANSFERTOLAYING_OUT', TRUE, TRUE), + + -- AFKIR/CULLING/MATI STOCKABLE + ('AFKIR_CULLING_MATI', 'STOCKABLE', 'ADJUSTMENT_IN', 'adjustment_stocks', 'id', 'product_warehouse_id', 'total_qty', 'total_used', NULL, NULL, 'ADJUSTMENT_IN', TRUE, TRUE), + ('AFKIR_CULLING_MATI', 'STOCKABLE', 'STOCK_TRANSFER_IN', 'stock_transfer_details', 'id', 'dest_product_warehouse_id', 'total_qty', 'total_used', NULL, 'deleted_at IS NULL', 'STOCK_TRANSFER_IN', TRUE, TRUE), + ('AFKIR_CULLING_MATI', 'STOCKABLE', 'RECORDING_DEPLETION_IN', 'recording_depletions', 'id', 'product_warehouse_id', 'qty', NULL, NULL, NULL, 'RECORDING_DEPLETION', TRUE, TRUE), + + -- AFKIR/CULLING/MATI USABLE + ('AFKIR_CULLING_MATI', 'USABLE', 'ADJUSTMENT_OUT', 'adjustment_stocks', 'id', 'product_warehouse_id', 'usage_qty', NULL, 'pending_qty', NULL, 'ADJUSTMENT_OUT', TRUE, TRUE), + ('AFKIR_CULLING_MATI', 'USABLE', 'STOCK_TRANSFER_OUT', 'stock_transfer_details', 'id', 'source_product_warehouse_id', 'usage_qty', NULL, 'pending_qty', 'deleted_at IS NULL', 'STOCK_TRANSFER_OUT', TRUE, TRUE), + ('AFKIR_CULLING_MATI', 'USABLE', 'MARKETING_OUT', 'marketing_delivery_products', 'id', 'product_warehouse_id', 'usage_qty', NULL, 'pending_qty', NULL, 'MARKETING_DELIVERY', TRUE, TRUE), + + -- PAKAN STOCKABLE + ('PAKAN', 'STOCKABLE', 'ADJUSTMENT_IN', 'adjustment_stocks', 'id', 'product_warehouse_id', 'total_qty', 'total_used', NULL, NULL, 'ADJUSTMENT_IN', TRUE, TRUE), + ('PAKAN', 'STOCKABLE', 'STOCK_TRANSFER_IN', 'stock_transfer_details', 'id', 'dest_product_warehouse_id', 'total_qty', 'total_used', NULL, 'deleted_at IS NULL', 'STOCK_TRANSFER_IN', TRUE, TRUE), + ('PAKAN', 'STOCKABLE', 'PURCHASE_IN', 'purchase_items', 'id', 'product_warehouse_id', 'total_qty', 'total_used', NULL, NULL, 'PURCHASE_ITEMS', TRUE, TRUE), + + -- PAKAN USABLE + ('PAKAN', 'USABLE', 'ADJUSTMENT_OUT', 'adjustment_stocks', 'id', 'product_warehouse_id', 'usage_qty', NULL, 'pending_qty', NULL, 'ADJUSTMENT_OUT', TRUE, TRUE), + ('PAKAN', 'USABLE', 'STOCK_TRANSFER_OUT', 'stock_transfer_details', 'id', 'source_product_warehouse_id', 'usage_qty', NULL, 'pending_qty', 'deleted_at IS NULL', 'STOCK_TRANSFER_OUT', TRUE, TRUE), + ('PAKAN', 'USABLE', 'RECORDING_STOCK_OUT', 'recording_stocks', 'id', 'product_warehouse_id', 'usage_qty', NULL, 'pending_qty', NULL, 'RECORDING_STOCK', TRUE, TRUE), + ('PAKAN', 'USABLE', 'MARKETING_OUT', 'marketing_delivery_products', 'id', 'product_warehouse_id', 'usage_qty', NULL, 'pending_qty', NULL, 'MARKETING_DELIVERY', TRUE, TRUE), + + -- OVK STOCKABLE + ('OVK', 'STOCKABLE', 'ADJUSTMENT_IN', 'adjustment_stocks', 'id', 'product_warehouse_id', 'total_qty', 'total_used', NULL, NULL, 'ADJUSTMENT_IN', TRUE, TRUE), + ('OVK', 'STOCKABLE', 'STOCK_TRANSFER_IN', 'stock_transfer_details', 'id', 'dest_product_warehouse_id', 'total_qty', 'total_used', NULL, 'deleted_at IS NULL', 'STOCK_TRANSFER_IN', TRUE, TRUE), + ('OVK', 'STOCKABLE', 'PURCHASE_IN', 'purchase_items', 'id', 'product_warehouse_id', 'total_qty', 'total_used', NULL, NULL, 'PURCHASE_ITEMS', TRUE, TRUE), + + -- OVK USABLE + ('OVK', 'USABLE', 'ADJUSTMENT_OUT', 'adjustment_stocks', 'id', 'product_warehouse_id', 'usage_qty', NULL, 'pending_qty', NULL, 'ADJUSTMENT_OUT', TRUE, TRUE), + ('OVK', 'USABLE', 'STOCK_TRANSFER_OUT', 'stock_transfer_details', 'id', 'source_product_warehouse_id', 'usage_qty', NULL, 'pending_qty', 'deleted_at IS NULL', 'STOCK_TRANSFER_OUT', TRUE, TRUE), + ('OVK', 'USABLE', 'RECORDING_STOCK_OUT', 'recording_stocks', 'id', 'product_warehouse_id', 'usage_qty', NULL, 'pending_qty', NULL, 'RECORDING_STOCK', TRUE, TRUE), + ('OVK', 'USABLE', 'MARKETING_OUT', 'marketing_delivery_products', 'id', 'product_warehouse_id', 'usage_qty', NULL, 'pending_qty', NULL, 'MARKETING_DELIVERY', TRUE, TRUE), + + -- TELUR STOCKABLE + ('TELUR', 'STOCKABLE', 'ADJUSTMENT_IN', 'adjustment_stocks', 'id', 'product_warehouse_id', 'total_qty', 'total_used', NULL, NULL, 'ADJUSTMENT_IN', TRUE, TRUE), + ('TELUR', 'STOCKABLE', 'STOCK_TRANSFER_IN', 'stock_transfer_details', 'id', 'dest_product_warehouse_id', 'total_qty', 'total_used', NULL, 'deleted_at IS NULL', 'STOCK_TRANSFER_IN', TRUE, TRUE), + ('TELUR', 'STOCKABLE', 'RECORDING_EGG_IN', 'recording_eggs', 'id', 'product_warehouse_id', 'total_qty', 'total_used', NULL, NULL, 'RECORDING_EGG', TRUE, TRUE), + + -- TELUR USABLE + ('TELUR', 'USABLE', 'ADJUSTMENT_OUT', 'adjustment_stocks', 'id', 'product_warehouse_id', 'usage_qty', NULL, 'pending_qty', NULL, 'ADJUSTMENT_OUT', TRUE, TRUE), + ('TELUR', 'USABLE', 'STOCK_TRANSFER_OUT', 'stock_transfer_details', 'id', 'source_product_warehouse_id', 'usage_qty', NULL, 'pending_qty', 'deleted_at IS NULL', 'STOCK_TRANSFER_OUT', TRUE, TRUE), + ('TELUR', 'USABLE', 'MARKETING_OUT', 'marketing_delivery_products', 'id', 'product_warehouse_id', 'usage_qty', NULL, 'pending_qty', NULL, 'MARKETING_DELIVERY', TRUE, TRUE), + + -- TELUR_GRADE STOCKABLE + ('TELUR_GRADE', 'STOCKABLE', 'ADJUSTMENT_IN', 'adjustment_stocks', 'id', 'product_warehouse_id', 'total_qty', 'total_used', NULL, NULL, 'ADJUSTMENT_IN', TRUE, TRUE), + ('TELUR_GRADE', 'STOCKABLE', 'STOCK_TRANSFER_IN', 'stock_transfer_details', 'id', 'dest_product_warehouse_id', 'total_qty', 'total_used', NULL, 'deleted_at IS NULL', 'STOCK_TRANSFER_IN', TRUE, TRUE), + ('TELUR_GRADE', 'STOCKABLE', 'RECORDING_EGG_IN', 'recording_eggs', 'id', 'product_warehouse_id', 'total_qty', 'total_used', NULL, NULL, 'RECORDING_EGG', TRUE, TRUE), + + -- TELUR_GRADE USABLE + ('TELUR_GRADE', 'USABLE', 'ADJUSTMENT_OUT', 'adjustment_stocks', 'id', 'product_warehouse_id', 'usage_qty', NULL, 'pending_qty', NULL, 'ADJUSTMENT_OUT', TRUE, TRUE), + ('TELUR_GRADE', 'USABLE', 'STOCK_TRANSFER_OUT', 'stock_transfer_details', 'id', 'source_product_warehouse_id', 'usage_qty', NULL, 'pending_qty', 'deleted_at IS NULL', 'STOCK_TRANSFER_OUT', TRUE, TRUE), + ('TELUR_GRADE', 'USABLE', 'MARKETING_OUT', 'marketing_delivery_products', 'id', 'product_warehouse_id', 'usage_qty', NULL, 'pending_qty', NULL, 'MARKETING_DELIVERY', TRUE, TRUE) +ON CONFLICT (flag_group_code, lane, function_code, source_table) DO UPDATE +SET + source_id_column = EXCLUDED.source_id_column, + product_warehouse_col = EXCLUDED.product_warehouse_col, + quantity_col = EXCLUDED.quantity_col, + used_quantity_col = EXCLUDED.used_quantity_col, + pending_quantity_col = EXCLUDED.pending_quantity_col, + scope_sql = EXCLUDED.scope_sql, + legacy_type_key = EXCLUDED.legacy_type_key, + allow_pending_default = EXCLUDED.allow_pending_default, + is_active = EXCLUDED.is_active, + updated_at = NOW(); + +INSERT INTO fifo_stock_v2_overconsume_rules(flag_group_code, function_code, lane, allow_overconsume, priority, reason, is_active) +SELECT NULL, NULL, 'USABLE', TRUE, 999, 'fifo_v2_default_allow', TRUE +WHERE NOT EXISTS ( + SELECT 1 FROM fifo_stock_v2_overconsume_rules + WHERE flag_group_code IS NULL + AND function_code IS NULL + AND lane = 'USABLE' + AND reason = 'fifo_v2_default_allow' +); + +INSERT INTO fifo_stock_v2_overconsume_rules(flag_group_code, function_code, lane, allow_overconsume, priority, reason, is_active) +SELECT 'AYAM', 'RECORDING_DEPLETION_OUT', 'USABLE', FALSE, 10, 'fifo_v2_exception_ayam_depletion_block', TRUE +WHERE NOT EXISTS ( + SELECT 1 FROM fifo_stock_v2_overconsume_rules + WHERE flag_group_code = 'AYAM' + AND function_code = 'RECORDING_DEPLETION_OUT' + AND lane = 'USABLE' + AND reason = 'fifo_v2_exception_ayam_depletion_block' +); + +INSERT INTO fifo_stock_v2_overconsume_rules(flag_group_code, function_code, lane, allow_overconsume, priority, reason, is_active) +SELECT NULL, 'MARKETING_OUT', 'USABLE', FALSE, 20, 'fifo_v2_exception_marketing_block', TRUE +WHERE NOT EXISTS ( + SELECT 1 FROM fifo_stock_v2_overconsume_rules + WHERE flag_group_code IS NULL + AND function_code = 'MARKETING_OUT' + AND lane = 'USABLE' + AND reason = 'fifo_v2_exception_marketing_block' +); + +INSERT INTO fifo_stock_v2_overconsume_rules(flag_group_code, function_code, lane, allow_overconsume, priority, reason, is_active) +SELECT NULL, 'STOCK_TRANSFER_OUT', 'USABLE', FALSE, 30, 'fifo_v2_exception_transfer_block', TRUE +WHERE NOT EXISTS ( + SELECT 1 FROM fifo_stock_v2_overconsume_rules + WHERE flag_group_code IS NULL + AND function_code = 'STOCK_TRANSFER_OUT' + AND lane = 'USABLE' + AND reason = 'fifo_v2_exception_transfer_block' +); + +INSERT INTO fifo_stock_v2_overconsume_rules(flag_group_code, function_code, lane, allow_overconsume, priority, reason, is_active) +SELECT NULL, 'ADJUSTMENT_OUT', 'USABLE', FALSE, 40, 'fifo_v2_exception_adjustment_block', TRUE +WHERE NOT EXISTS ( + SELECT 1 FROM fifo_stock_v2_overconsume_rules + WHERE flag_group_code IS NULL + AND function_code = 'ADJUSTMENT_OUT' + AND lane = 'USABLE' + AND reason = 'fifo_v2_exception_adjustment_block' +); + +INSERT INTO fifo_stock_v2_overconsume_rules(flag_group_code, function_code, lane, allow_overconsume, priority, reason, is_active) +SELECT NULL, 'TRANSFER_TO_LAYING_OUT', 'USABLE', FALSE, 50, 'fifo_v2_exception_transfer_laying_block', TRUE +WHERE NOT EXISTS ( + SELECT 1 FROM fifo_stock_v2_overconsume_rules + WHERE flag_group_code IS NULL + AND function_code = 'TRANSFER_TO_LAYING_OUT' + AND lane = 'USABLE' + AND reason = 'fifo_v2_exception_transfer_laying_block' +); + +COMMIT;