package service import ( "context" "errors" "fmt" "math" "sort" "strings" "time" "github.com/sirupsen/logrus" commonRepo "gitlab.com/mbugroup/lti-api.git/internal/common/repository" "gitlab.com/mbugroup/lti-api.git/internal/entities" productWarehouseRepo "gitlab.com/mbugroup/lti-api.git/internal/modules/inventory/product-warehouses/repositories" "gitlab.com/mbugroup/lti-api.git/internal/utils/fifo" "gorm.io/gorm" "gorm.io/gorm/clause" ) type FifoService interface { RegisterStockable(cfg fifo.StockableConfig) error RegisterUsable(cfg fifo.UsableConfig) error Replenish(ctx context.Context, req StockReplenishRequest) (*StockReplenishResult, error) Consume(ctx context.Context, req StockConsumeRequest) (*StockConsumeResult, error) ReleaseUsage(ctx context.Context, req StockReleaseRequest) error } type fifoService struct { db *gorm.DB logger *logrus.Logger allocations commonRepo.StockAllocationRepository productWarehouseRepo productWarehouseRepo.ProductWarehouseRepository defaultOrderBy []string pendingBatchPerUsable int maxLotsPerStockable int defaultAllocationNotes string } func NewFifoService( db *gorm.DB, allocations commonRepo.StockAllocationRepository, productWarehouseRepo productWarehouseRepo.ProductWarehouseRepository, logger *logrus.Logger, ) FifoService { if logger == nil { logger = logrus.StandardLogger() } return &fifoService{ db: db, logger: logger, allocations: allocations, productWarehouseRepo: productWarehouseRepo, defaultOrderBy: []string{"created_at ASC", "id ASC"}, pendingBatchPerUsable: 25, maxLotsPerStockable: 50, } } func (s *fifoService) withTransaction( ctx context.Context, tx *gorm.DB, fn func(*gorm.DB) error, ) error { if tx != nil { return fn(tx.WithContext(ctx)) } return s.db.WithContext(ctx).Transaction(func(inner *gorm.DB) error { return fn(inner) }) } func (s *fifoService) txOrDB(tx, db *gorm.DB) *gorm.DB { if tx != nil { return tx } return db } func (s *fifoService) RegisterStockable(cfg fifo.StockableConfig) error { return fifo.RegisterStockable(cfg) } func (s *fifoService) RegisterUsable(cfg fifo.UsableConfig) error { return fifo.RegisterUsable(cfg) } type StockReplenishRequest struct { StockableKey fifo.StockableKey StockableID uint ProductWarehouseID uint Quantity float64 Note *string Tx *gorm.DB } type PendingResolution struct { UsableKey fifo.UsableKey UsableID uint Quantity float64 } type StockReplenishResult struct { AddedQuantity float64 PendingResolved []PendingResolution RemainingPending float64 } type StockConsumeRequest struct { UsableKey fifo.UsableKey UsableID uint ProductWarehouseID uint Quantity float64 AllowPending bool Note *string Tx *gorm.DB } type AllocationDetail struct { StockableKey fifo.StockableKey StockableID uint Quantity float64 } type StockConsumeResult struct { RequestedQuantity float64 UsageQuantity float64 PendingQuantity float64 AddedAllocations []AllocationDetail ReleasedQuantity float64 } type StockReleaseRequest struct { UsableKey fifo.UsableKey UsableID uint Reason *string Tx *gorm.DB } func (s *fifoService) Replenish(ctx context.Context, req StockReplenishRequest) (*StockReplenishResult, error) { if req.StockableID == 0 || strings.TrimSpace(req.StockableKey.String()) == "" { return nil, errors.New("stockable key and id are required") } if req.ProductWarehouseID == 0 { return nil, errors.New("product warehouse id is required") } if req.Quantity <= 0 { return nil, errors.New("quantity must be greater than zero") } cfg, ok := fifo.Stockable(req.StockableKey) if !ok { return nil, fmt.Errorf("stockable %q is not registered", req.StockableKey) } result := &StockReplenishResult{ AddedQuantity: req.Quantity, } err := s.withTransaction(ctx, req.Tx, func(tx *gorm.DB) error { if err := s.incrementStockableQty(ctx, tx, cfg, req.StockableID, req.Quantity); err != nil { return err } if err := s.productWarehouseRepo.AdjustQuantities(ctx, map[uint]float64{ req.ProductWarehouseID: req.Quantity, }, func(db *gorm.DB) *gorm.DB { return s.txOrDB(tx, db) }); err != nil { return err } resolved, err := s.resolvePendingForWarehouse(ctx, tx, req.ProductWarehouseID) if err != nil { return err } result.PendingResolved = resolved return nil }) if err != nil { return nil, err } return result, nil } func (s *fifoService) Consume(ctx context.Context, req StockConsumeRequest) (*StockConsumeResult, error) { if req.UsableID == 0 || strings.TrimSpace(req.UsableKey.String()) == "" { return nil, errors.New("usable key and id are required") } if req.Quantity < 0 { return nil, errors.New("quantity must be zero or greater") } cfg, ok := fifo.Usable(req.UsableKey) if !ok { return nil, fmt.Errorf("usable %q is not registered", req.UsableKey) } result := &StockConsumeResult{ RequestedQuantity: req.Quantity, } err := s.withTransaction(ctx, req.Tx, func(tx *gorm.DB) error { ctxRow, err := s.loadUsableContext(ctx, tx, cfg, req.UsableID) if err != nil { return err } productWarehouseID := ctxRow.ProductWarehouseID if productWarehouseID == 0 { return fmt.Errorf("usable %q (id: %d) has no product warehouse reference", req.UsableKey, req.UsableID) } if req.ProductWarehouseID != 0 && req.ProductWarehouseID != productWarehouseID { return fmt.Errorf("usable %q (id: %d) references product warehouse %d but %d was provided", req.UsableKey, req.UsableID, productWarehouseID, req.ProductWarehouseID) } currentUsage := ctxRow.UsageQty currentPending := ctxRow.PendingQty currentTotal := currentUsage + currentPending delta := req.Quantity - currentTotal var ( usageDelta float64 pendingDelta float64 addedAlloc []AllocationDetail releasedAmount float64 ) switch { case delta > 0: var excludedStockables []fifo.StockableKey if cfg.ExcludedStockables != nil { excludedStockables = cfg.ExcludedStockables } allocationRes, err := s.allocateFromStock(ctx, tx, productWarehouseID, req.UsableKey, req.UsableID, delta, excludedStockables) if err != nil { return err } if allocationRes.pending > 0 && !req.AllowPending { return fmt.Errorf("insufficient stock: requested %.3f, allocated %.3f", req.Quantity, currentUsage+allocationRes.allocated) } usageDelta += allocationRes.allocated pendingDelta += allocationRes.pending addedAlloc = allocationRes.allocations if allocationRes.allocated > 0 { if err := s.productWarehouseRepo.AdjustQuantities(ctx, map[uint]float64{ productWarehouseID: -allocationRes.allocated, }, func(db *gorm.DB) *gorm.DB { return s.txOrDB(tx, db) }); err != nil { return err } } case delta < 0: reductionTarget := -delta if currentPending > 0 { pendingReduction := math.Min(currentPending, reductionTarget) if pendingReduction > 0 { pendingDelta -= pendingReduction reductionTarget -= pendingReduction } } if reductionTarget > 0 { released, err := s.releaseUsagePortion(ctx, tx, req.UsableKey, req.UsableID, reductionTarget) if err != nil { return err } if released+1e-6 < reductionTarget { return fmt.Errorf("unable to release %.3f from usable %d, only %.3f available", reductionTarget, req.UsableID, released) } usageDelta -= released releasedAmount = released } default: // no change } if err := s.applyUsableDeltas(ctx, tx, cfg, req.UsableID, usageDelta, pendingDelta); err != nil { return err } result.AddedAllocations = addedAlloc result.ReleasedQuantity = releasedAmount result.UsageQuantity = currentUsage + usageDelta result.PendingQuantity = currentPending + pendingDelta return nil }) if err != nil { return nil, err } return result, nil } func (s *fifoService) ReleaseUsage(ctx context.Context, req StockReleaseRequest) error { if req.UsableID == 0 || strings.TrimSpace(req.UsableKey.String()) == "" { return errors.New("usable key and id are required") } return s.withTransaction(ctx, req.Tx, func(tx *gorm.DB) error { cfg, ok := fifo.Usable(req.UsableKey) if !ok { return fmt.Errorf("usable %q is not registered", req.UsableKey) } ctxRow, err := s.loadUsableContext(ctx, tx, cfg, req.UsableID) if err != nil { return err } var usageDelta, pendingDelta float64 if ctxRow.UsageQty > 0 { if _, err := s.releaseUsagePortion(ctx, tx, req.UsableKey, req.UsableID, ctxRow.UsageQty); err != nil { return err } usageDelta -= ctxRow.UsageQty } if ctxRow.PendingQty > 0 { pendingDelta -= ctxRow.PendingQty } if err := s.applyUsableDeltas(ctx, tx, cfg, req.UsableID, usageDelta, pendingDelta); err != nil { return err } return s.allocations.ReleaseByUsable(ctx, req.UsableKey.String(), req.UsableID, req.Reason, func(db *gorm.DB) *gorm.DB { return s.txOrDB(tx, db) }) }) } // --- helpers --- type usableContextRow struct { ProductWarehouseID uint UsageQty float64 PendingQty float64 } func (s *fifoService) loadUsableContext(ctx context.Context, tx *gorm.DB, cfg fifo.UsableConfig, id uint) (*usableContextRow, error) { var row usableContextRow query := tx.Table(cfg.Table). Select(fmt.Sprintf("%s AS product_warehouse_id, COALESCE(%s,0) AS usage_qty, COALESCE(%s,0) AS pending_qty", cfg.Columns.ProductWarehouseID, cfg.Columns.UsageQuantity, cfg.Columns.PendingQuantity)). Where(fmt.Sprintf("%s = ?", cfg.Columns.ID), id). Clauses(clause.Locking{Strength: "UPDATE"}) if cfg.Scope != nil { query = cfg.Scope(query) } if err := query.Take(&row).Error; err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { return nil, fmt.Errorf("usable record %d not found", id) } return nil, err } return &row, nil } func (s *fifoService) incrementStockableQty(ctx context.Context, tx *gorm.DB, cfg fifo.StockableConfig, id uint, qty float64) error { column := cfg.Columns.TotalQuantity query := tx.Table(cfg.Table). Where(fmt.Sprintf("%s = ?", cfg.Columns.ID), id) if cfg.Scope != nil { query = cfg.Scope(query) } updates := map[string]any{ column: gorm.Expr(fmt.Sprintf("COALESCE(%s,0) + ?", column), qty), } if cfg.Columns.TotalUsedQuantity != "" { updates[cfg.Columns.TotalUsedQuantity] = gorm.Expr(fmt.Sprintf("COALESCE(%s,0)", cfg.Columns.TotalUsedQuantity)) } return query.Updates(updates).Error } func (s *fifoService) incrementStockableUsage(ctx context.Context, tx *gorm.DB, cfg fifo.StockableConfig, id uint, qty float64) error { if qty == 0 { return nil } column := cfg.Columns.TotalUsedQuantity query := tx.Table(cfg.Table). Where(fmt.Sprintf("%s = ?", cfg.Columns.ID), id) if cfg.Scope != nil { query = cfg.Scope(query) } return query.Update(column, gorm.Expr(fmt.Sprintf("COALESCE(%s,0) + ?", column), qty)).Error } type allocationOutcome struct { allocated float64 pending float64 allocations []AllocationDetail } type stockLot struct { StockableKey fifo.StockableKey RecordID uint AvailableQty float64 CreatedAt time.Time } func (s *fifoService) allocateFromStock( ctx context.Context, tx *gorm.DB, productWarehouseID uint, usableKey fifo.UsableKey, usableID uint, requestQty float64, excludedStockables []fifo.StockableKey, ) (*allocationOutcome, error) { lots, err := s.fetchStockLots(ctx, tx, productWarehouseID, excludedStockables) if err != nil { return nil, err } if len(lots) == 0 { return &allocationOutcome{pending: requestQty}, nil } var ( remaining = requestQty applied float64 allocations []*entities.StockAllocation allocationSummaries []AllocationDetail usageAdjustments = make(map[fifo.StockableKey]map[uint]float64) ) for _, lot := range lots { if remaining <= 0 { break } if lot.AvailableQty <= 0 { continue } portion := lot.AvailableQty if portion > remaining { portion = remaining } applied += portion remaining -= portion allocationSummaries = append(allocationSummaries, AllocationDetail{ StockableKey: lot.StockableKey, StockableID: lot.RecordID, Quantity: portion, }) allocations = append(allocations, &entities.StockAllocation{ ProductWarehouseId: productWarehouseID, StockableType: lot.StockableKey.String(), StockableId: lot.RecordID, UsableType: usableKey.String(), UsableId: usableID, Qty: portion, Status: entities.StockAllocationStatusActive, }) if _, ok := usageAdjustments[lot.StockableKey]; !ok { usageAdjustments[lot.StockableKey] = make(map[uint]float64) } usageAdjustments[lot.StockableKey][lot.RecordID] += portion } if len(allocations) > 0 { if err := s.allocations.CreateMany(ctx, allocations, func(db *gorm.DB) *gorm.DB { return s.txOrDB(tx, db) }); err != nil { return nil, err } for key, deltas := range usageAdjustments { cfg, ok := fifo.Stockable(key) if !ok { continue } for id, qty := range deltas { if err := s.incrementStockableUsage(ctx, tx, cfg, id, qty); err != nil { return nil, err } } } } return &allocationOutcome{ allocated: applied, pending: remaining, allocations: allocationSummaries, }, nil } func (s *fifoService) fetchStockLots(ctx context.Context, tx *gorm.DB, productWarehouseID uint, excludedStockables []fifo.StockableKey) ([]stockLot, error) { configs := fifo.Stockables() if len(configs) == 0 { return nil, nil } // Create exclusion set for faster lookup excludedSet := make(map[fifo.StockableKey]bool) for _, key := range excludedStockables { excludedSet[key] = true } var lots []stockLot for key, cfg := range configs { // Skip excluded stockables if excludedSet[key] { continue } usesNumericTime := cfg.Columns.CreatedAt == cfg.Columns.ID var selectStmt string if usesNumericTime { selectStmt = fmt.Sprintf( "%s AS id, %s AS available_qty, '1970-01-01 00:00:00 UTC'::timestamp AS created_at", cfg.Columns.ID, fmt.Sprintf("%s - COALESCE(%s,0)", cfg.Columns.TotalQuantity, cfg.Columns.TotalUsedQuantity), ) } else { selectStmt = fmt.Sprintf( "%s AS id, %s AS available_qty, %s AS created_at", cfg.Columns.ID, fmt.Sprintf("%s - COALESCE(%s,0)", cfg.Columns.TotalQuantity, cfg.Columns.TotalUsedQuantity), cfg.Columns.CreatedAt, ) } var rows []struct { ID uint AvailableQty float64 CreatedAt time.Time } query := tx.Table(cfg.Table). Select(selectStmt). Where(fmt.Sprintf("%s = ?", cfg.Columns.ProductWarehouseID), productWarehouseID). Where(fmt.Sprintf("%s > %s", cfg.Columns.TotalQuantity, cfg.Columns.TotalUsedQuantity)) if cfg.Scope != nil { query = cfg.Scope(query) } for _, order := range s.orderClauses(cfg.OrderBy) { query = query.Order(order) } query = query.Limit(s.maxLotsPerStockable) if err := query.Find(&rows).Error; err != nil { return nil, err } for _, row := range rows { if row.AvailableQty <= 0 { continue } lots = append(lots, stockLot{ StockableKey: key, RecordID: row.ID, AvailableQty: row.AvailableQty, CreatedAt: row.CreatedAt, }) } } if len(lots) == 0 { return nil, nil } sort.SliceStable(lots, func(i, j int) bool { if lots[i].CreatedAt.Equal(lots[j].CreatedAt) { return lots[i].RecordID < lots[j].RecordID } return lots[i].CreatedAt.Before(lots[j].CreatedAt) }) return lots, nil } func (s *fifoService) applyUsableDeltas(ctx context.Context, tx *gorm.DB, cfg fifo.UsableConfig, id uint, usageDelta, pendingDelta float64) error { if usageDelta == 0 && pendingDelta == 0 { return nil } updates := map[string]any{} if usageDelta != 0 { updates[cfg.Columns.UsageQuantity] = gorm.Expr(fmt.Sprintf("COALESCE(%s,0) + ?", cfg.Columns.UsageQuantity), usageDelta) } if pendingDelta != 0 { updates[cfg.Columns.PendingQuantity] = gorm.Expr(fmt.Sprintf("COALESCE(%s,0) + ?", cfg.Columns.PendingQuantity), pendingDelta) } query := tx.Table(cfg.Table).Where(fmt.Sprintf("%s = ?", cfg.Columns.ID), id) if cfg.Scope != nil { query = cfg.Scope(query) } return query.Updates(updates).Error } type pendingCandidate struct { UsableKey fifo.UsableKey Config fifo.UsableConfig UsableID uint Pending float64 CreatedAt time.Time } func (s *fifoService) resolvePendingForWarehouse(ctx context.Context, tx *gorm.DB, productWarehouseID uint) ([]PendingResolution, error) { candidates, err := s.fetchPendingCandidates(ctx, tx, productWarehouseID) if err != nil { return nil, err } if len(candidates) == 0 { return nil, nil } var resolutions []PendingResolution for _, candidate := range candidates { if candidate.Pending <= 0 { continue } // Get excluded stockables from candidate usable config var excludedStockables []fifo.StockableKey if candidate.Config.ExcludedStockables != nil { excludedStockables = candidate.Config.ExcludedStockables } outcome, err := s.allocateFromStock(ctx, tx, productWarehouseID, candidate.UsableKey, candidate.UsableID, candidate.Pending, excludedStockables) if err != nil { return nil, err } if outcome.allocated <= 0 { break } if err := s.applyUsableDeltas(ctx, tx, candidate.Config, candidate.UsableID, outcome.allocated, -outcome.allocated); err != nil { return nil, err } if err := s.productWarehouseRepo.AdjustQuantities(ctx, map[uint]float64{ productWarehouseID: -outcome.allocated, }, func(db *gorm.DB) *gorm.DB { return s.txOrDB(tx, db) }); err != nil { return nil, err } resolutions = append(resolutions, PendingResolution{ UsableKey: candidate.UsableKey, UsableID: candidate.UsableID, Quantity: outcome.allocated, }) if outcome.pending > 0 { // No more stock available for this warehouse at the moment. break } } return resolutions, nil } func (s *fifoService) releaseUsagePortion( ctx context.Context, tx *gorm.DB, usableKey fifo.UsableKey, usableID uint, target float64, ) (float64, error) { if target <= 0 { return 0, nil } allocations, err := s.allocations.FindActiveByUsable(ctx, usableKey.String(), usableID, func(db *gorm.DB) *gorm.DB { target := s.txOrDB(tx, db) return target.Clauses(clause.Locking{Strength: "UPDATE"}) }) if err != nil { return 0, err } if len(allocations) == 0 { return 0, nil } var ( remaining = target totalReleased float64 warehouseAdjustments = make(map[uint]float64) stockableAdjustments = make(map[fifo.StockableKey]map[uint]float64) ) now := time.Now() for i := len(allocations) - 1; i >= 0 && remaining > 0; i-- { allocation := allocations[i] releaseAmt := allocation.Qty if releaseAmt > remaining { releaseAmt = remaining } remaining -= releaseAmt totalReleased += releaseAmt warehouseAdjustments[allocation.ProductWarehouseId] += releaseAmt key := fifo.StockableKey(allocation.StockableType) if _, ok := stockableAdjustments[key]; !ok { stockableAdjustments[key] = make(map[uint]float64) } stockableAdjustments[key][allocation.StockableId] += releaseAmt if releaseAmt == allocation.Qty { if err := s.allocations.PatchOne(ctx, allocation.Id, map[string]any{ "status": entities.StockAllocationStatusReleased, "released_at": now, }, func(db *gorm.DB) *gorm.DB { return s.txOrDB(tx, db) }); err != nil { return 0, err } } else { if err := s.allocations.PatchOne(ctx, allocation.Id, map[string]any{ "qty": allocation.Qty - releaseAmt, }, func(db *gorm.DB) *gorm.DB { return s.txOrDB(tx, db) }); err != nil { return 0, err } } } if totalReleased == 0 { return 0, nil } for key, deltas := range stockableAdjustments { cfg, ok := fifo.Stockable(key) if !ok { continue } for id, qty := range deltas { if err := s.incrementStockableUsage(ctx, tx, cfg, id, -qty); err != nil { return 0, err } } } if len(warehouseAdjustments) > 0 { if err := s.productWarehouseRepo.AdjustQuantities(ctx, warehouseAdjustments, func(db *gorm.DB) *gorm.DB { return s.txOrDB(tx, db) }); err != nil { return 0, err } for warehouseID := range warehouseAdjustments { if _, err := s.resolvePendingForWarehouse(ctx, tx, warehouseID); err != nil { return 0, err } } } return totalReleased, nil } func (s *fifoService) fetchPendingCandidates(ctx context.Context, tx *gorm.DB, productWarehouseID uint) ([]pendingCandidate, error) { configs := fifo.Usables() if len(configs) == 0 { return nil, nil } var candidates []pendingCandidate for key, cfg := range configs { selectStmt := fmt.Sprintf( "%s AS id, %s AS pending_qty, %s AS created_at", cfg.Columns.ID, cfg.Columns.PendingQuantity, cfg.Columns.CreatedAt, ) var rows []struct { ID uint Pending float64 CreatedAt time.Time } query := tx.Table(cfg.Table). Select(selectStmt). Where(fmt.Sprintf("%s = ?", cfg.Columns.ProductWarehouseID), productWarehouseID). Where(fmt.Sprintf("%s > 0", cfg.Columns.PendingQuantity)). Limit(s.pendingBatchPerUsable) if cfg.Scope != nil { query = cfg.Scope(query) } for _, order := range s.orderClauses(cfg.OrderBy) { query = query.Order(order) } if err := query.Find(&rows).Error; err != nil { return nil, err } for _, row := range rows { if row.Pending <= 0 { continue } candidates = append(candidates, pendingCandidate{ UsableKey: key, Config: cfg, UsableID: row.ID, Pending: row.Pending, CreatedAt: row.CreatedAt, }) } } if len(candidates) == 0 { return nil, nil } sort.SliceStable(candidates, func(i, j int) bool { if candidates[i].CreatedAt.Equal(candidates[j].CreatedAt) { return candidates[i].UsableID < candidates[j].UsableID } return candidates[i].CreatedAt.Before(candidates[j].CreatedAt) }) return candidates, nil } func (s *fifoService) orderClauses(custom []string) []string { if len(custom) > 0 { return custom } return s.defaultOrderBy }