package fifo_stock_v2

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"math"
	"strings"
	"time"

	"gorm.io/gorm"
)

// allocationRow is the subset of stock_allocations columns selected by
// loadActiveAllocations.
type allocationRow struct {
	ID                 uint      `gorm:"column:id"`
	ProductWarehouseID uint      `gorm:"column:product_warehouse_id"`
	StockableType      string    `gorm:"column:stockable_type"`
	StockableID        uint      `gorm:"column:stockable_id"`
	UsableType         string    `gorm:"column:usable_type"`
	UsableID           uint      `gorm:"column:usable_id"`
	Qty                float64   `gorm:"column:qty"`
	Status             string    `gorm:"column:status"`
	CreatedAt          time.Time `gorm:"column:created_at"`
}

// usableQtySnapshot maps the usage_qty / pending_qty columns of a query result.
// It is not referenced elsewhere in this file.
type usableQtySnapshot struct {
	Usage   float64 `gorm:"column:usage_qty"`
	Pending float64 `gorm:"column:pending_qty"`
}

// Allocate validates req and then, inside a transaction, allocates req.NeedQty
// for req.Usable via allocateInternal.
//
// The operation is registered through beginOperation with a hash of the
// request; when beginOperation reports the operation as reused, the stored
// result payload is decoded and returned instead of allocating again.
// It returns a nil result together with any error.
func (s *fifoStockV2Service) Allocate(ctx context.Context, req AllocateRequest) (*AllocateResult, error) {
	if err := s.validateAllocateRequest(req); err != nil {
		return nil, err
	}

	result := &AllocateResult{}
	err := s.withTransaction(ctx, req.Tx, func(tx *gorm.DB) error {
		if err := s.ensureStockAllocationColumns(tx); err != nil {
			return err
		}
		if err := s.lockShard(tx, req.FlagGroupCode, req.ProductWarehouseID); err != nil {
			return err
		}

		hash := requestHash(map[string]any{
			"flag_group_code":      req.FlagGroupCode,
			"product_warehouse_id": req.ProductWarehouseID,
			"usable_type":          req.Usable.LegacyTypeKey,
			"usable_id":            req.Usable.ID,
			"need_qty":             req.NeedQty,
			"as_of":                req.AsOf,
			"allow_over_consume":   req.AllowOverConsume,
		})
		logRow, reused, err := s.beginOperation(
			tx,
			OperationAllocate,
			req.IdempotencyKey,
			hash,
			req.ProductWarehouseID,
			req.FlagGroupCode,
			req.Usable.LegacyTypeKey,
			req.Usable.ID,
		)
		if err != nil {
			return err
		}
		if reused {
			if len(logRow.ResultPayload) == 0 {
				return fmt.Errorf("idempotent allocate has empty payload")
			}
			if err := json.Unmarshal(logRow.ResultPayload, result); err != nil {
				return fmt.Errorf("decode idempotent allocate payload: %w", err)
			}
			return nil
		}

		if logRow != nil {
			// The deferred hook reads the closure-level err, so every failure
			// path below must assign it before returning.
			defer func() {
				if err != nil {
					s.failOperation(tx, logRow, err)
				}
			}()
		}

		allocated, allocErr := s.allocateInternal(ctx, tx, req)
		if allocErr != nil {
			err = allocErr
			return allocErr
		}
		*result = *allocated

		if finishErr := s.finishOperation(tx, logRow, result); finishErr != nil {
			err = finishErr
			return finishErr
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	return result, nil
}

// allocateInternal performs the allocation for req on tx without any
// idempotency bookkeeping.
//
// It walks the stockable rows returned by gatherRows in the order they are
// returned, takes min(remaining, AvailableQuantity) from each eligible lot,
// inserts one stock_allocations row per lot and bumps the lot's used quantity.
// Any shortfall is returned as PendingQty when over-consume is allowed;
// otherwise an error wrapping ErrInsufficientStock is returned. Finally the
// usable's quantities and the product warehouse quantity are adjusted.
func (s *fifoStockV2Service) allocateInternal(ctx context.Context, tx *gorm.DB, req AllocateRequest) (*AllocateResult, error) {
	usableRule, err := s.loadRouteRuleByLegacyType(ctx, tx, LaneUsable, req.FlagGroupCode, req.Usable.LegacyTypeKey)
	if err != nil {
		return nil, err
	}

	// An explicit request flag wins; otherwise the rule default is passed to
	// resolveOverConsume, which returns the effective value.
	allowOverConsume := usableRule.AllowPendingDefault
	if req.AllowOverConsume != nil {
		allowOverConsume = *req.AllowOverConsume
	} else {
		allowOverConsume, err = s.resolveOverConsume(tx, req.FlagGroupCode, req.Usable.FunctionCode, LaneUsable, allowOverConsume)
		if err != nil {
			return nil, err
		}
	}

	lots, err := s.gatherRows(ctx, tx, GatherRequest{
		FlagGroupCode:      req.FlagGroupCode,
		Lane:               LaneStockable,
		ProductWarehouseID: req.ProductWarehouseID,
		AsOf:               req.AsOf,
		Limit:              s.defaultGatherLimit,
	})
	if err != nil {
		return nil, err
	}

	stockableRuleMap, err := s.loadStockableRuleMap(ctx, tx, req.FlagGroupCode)
	if err != nil {
		return nil, err
	}

	now := time.Now()
	remaining := req.NeedQty
	result := &AllocateResult{Details: make([]AllocationDetail, 0)}

	for _, lot := range lots {
		// Repeated float subtraction can leave a tiny positive residue;
		// treat it as fully satisfied rather than scanning further lots.
		if remaining <= 0 || nearlyZero(remaining) {
			break
		}
		if shouldSkipStockableForUsable(req, lot.Ref.LegacyTypeKey) {
			continue
		}
		if lot.AvailableQuantity <= 0 {
			continue
		}

		portion := math.Min(remaining, lot.AvailableQuantity)
		if nearlyZero(portion) {
			continue
		}

		// Resolve the rule before writing anything for this lot so a missing
		// rule does not leave a freshly inserted allocation row behind.
		rule, ok := stockableRuleMap[lot.Ref.LegacyTypeKey]
		if !ok {
			return nil, fmt.Errorf("missing stockable route rule for type %s", lot.Ref.LegacyTypeKey)
		}

		allocationInsert := map[string]any{
			"product_warehouse_id": req.ProductWarehouseID,
			"stockable_type":       lot.Ref.LegacyTypeKey,
			"stockable_id":         lot.Ref.ID,
			"usable_type":          req.Usable.LegacyTypeKey,
			"usable_id":            req.Usable.ID,
			"qty":                  portion,
			"status":               activeAllocationStatus(),
			"allocation_purpose":   defaultAllocationPurpose(),
			"created_at":           now,
			"updated_at":           now,
			"engine_version":       "v2",
			"flag_group_code":      req.FlagGroupCode,
			"function_code":        req.Usable.FunctionCode,
		}
		if strings.TrimSpace(req.IdempotencyKey) != "" {
			allocationInsert["idempotency_key"] = req.IdempotencyKey
		}
		if err := tx.Table("stock_allocations").Create(allocationInsert).Error; err != nil {
			return nil, err
		}

		if err := s.adjustStockableUsedQuantity(tx, rule, lot.Ref.ID, portion); err != nil {
			return nil, err
		}

		result.Details = append(result.Details, AllocationDetail{
			StockableType: lot.Ref.LegacyTypeKey,
			StockableID:   lot.Ref.ID,
			Qty:           portion,
			SortAt:        lot.SortAt,
		})
		remaining -= portion
		result.AllocatedQty += portion
	}

	// Drop float residue so it neither fails the request nor shows up as a
	// phantom pending quantity.
	if nearlyZero(remaining) {
		remaining = 0
	}
	if remaining > 0 {
		if !allowOverConsume {
			return nil, fmt.Errorf("%w: requested %.3f, allocated %.3f", ErrInsufficientStock, req.NeedQty, result.AllocatedQty)
		}
		result.PendingQty = remaining
	}

	if err := s.applyUsableDeltas(tx, *usableRule, req.Usable.ID, result.AllocatedQty, result.PendingQty); err != nil {
		return nil, err
	}
	if err := s.adjustProductWarehouseQty(tx, req.ProductWarehouseID, -result.AllocatedQty); err != nil {
		return nil, err
	}
	return result, nil
}

// shouldSkipStockableForUsable reports whether a stockable lot of the given
// type must not be consumed by req.Usable. Comparison is done on the trimmed,
// upper-cased values.
func shouldSkipStockableForUsable(req AllocateRequest, stockableType string) bool {
	// Only population lots are ever excluded.
	stockable := strings.ToUpper(strings.TrimSpace(stockableType))
	if stockable != "PROJECT_FLOCK_POPULATION" {
		return false
	}

	usableType := strings.ToUpper(strings.TrimSpace(req.Usable.LegacyTypeKey))
	functionCode := strings.ToUpper(strings.TrimSpace(req.Usable.FunctionCode))

	switch {
	// CHICKIN_OUT must consume physical stock sources, not population lots,
	// otherwise approved chickin can consume its own just-created population.
	case usableType == "PROJECT_CHICKIN", functionCode == "CHICKIN_OUT":
		return true
	case usableType == "STOCK_TRANSFER_OUT", functionCode == "STOCK_TRANSFER_OUT":
		return true
	}
	return false
}

// Rollback validates req and then, inside a transaction, releases allocations
// held by req.Usable via rollbackInternal.
//
// The flag group is resolved with resolveRollbackFlagGroup. As with Allocate,
// the operation is registered through beginOperation and a reused operation
// returns its stored result payload. It returns a nil result together with
// any error.
func (s *fifoStockV2Service) Rollback(ctx context.Context, req RollbackRequest) (*RollbackResult, error) {
	if err := s.validateRollbackRequest(req); err != nil {
		return nil, err
	}

	result := &RollbackResult{}
	err := s.withTransaction(ctx, req.Tx, func(tx *gorm.DB) error {
		if err := s.ensureStockAllocationColumns(tx); err != nil {
			return err
		}
		flagGroupCode, err := s.resolveRollbackFlagGroup(ctx, tx, req)
		if err != nil {
			return err
		}
		if err := s.lockShard(tx, flagGroupCode, req.ProductWarehouseID); err != nil {
			return err
		}

		hash := requestHash(map[string]any{
			"product_warehouse_id": req.ProductWarehouseID,
			"usable_type":          req.Usable.LegacyTypeKey,
			"usable_id":            req.Usable.ID,
			"release_qty":          req.ReleaseQty,
			"reason":               req.Reason,
			"flag_group_code":      flagGroupCode,
		})
		logRow, reused, beginErr := s.beginOperation(
			tx,
			OperationRollback,
			req.IdempotencyKey,
			hash,
			req.ProductWarehouseID,
			flagGroupCode,
			req.Usable.LegacyTypeKey,
			req.Usable.ID,
		)
		if beginErr != nil {
			return beginErr
		}
		if reused {
			if len(logRow.ResultPayload) == 0 {
				return fmt.Errorf("idempotent rollback has empty payload")
			}
			if err := json.Unmarshal(logRow.ResultPayload, result); err != nil {
				return fmt.Errorf("decode idempotent rollback payload: %w", err)
			}
			return nil
		}

		if logRow != nil {
			// The deferred hook reads the closure-level err, so every failure
			// path below must assign it before returning.
			defer func() {
				if err != nil {
					s.failOperation(tx, logRow, err)
				}
			}()
		}

		rolled, rollbackErr := s.rollbackInternal(ctx, tx, req, flagGroupCode)
		if rollbackErr != nil {
			err = rollbackErr
			return rollbackErr
		}
		*result = *rolled

		if finishErr := s.finishOperation(tx, logRow, result); finishErr != nil {
			err = finishErr
			return finishErr
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	return result, nil
}

// rollbackInternal releases active allocations of req.Usable on tx without any
// idempotency bookkeeping.
//
// When req.ReleaseQty is nil every active allocation is released and the
// usable's quantity columns are reset to zero. Otherwise exactly
// *req.ReleaseQty is released, walking allocations in the order returned by
// loadActiveAllocations (newest first), and an error is returned if less than
// that amount is allocated. Each touched allocation is either marked released
// or has its qty reduced, and the stockable's used quantity is decremented.
func (s *fifoStockV2Service) rollbackInternal(
	ctx context.Context,
	tx *gorm.DB,
	req RollbackRequest,
	flagGroupCode string,
) (*RollbackResult, error) {
	usableRule, err := s.loadRouteRuleByLegacyType(ctx, tx, LaneUsable, flagGroupCode, req.Usable.LegacyTypeKey)
	if err != nil {
		return nil, err
	}

	allocations, err := s.loadActiveAllocations(tx, req.Usable.LegacyTypeKey, req.Usable.ID, req.ProductWarehouseID)
	if err != nil {
		return nil, err
	}
	if len(allocations) == 0 {
		// Nothing to release; a full rollback still zeroes the usable's
		// quantity columns.
		if req.ReleaseQty == nil {
			if err := s.resetUsableQuantities(tx, *usableRule, req.Usable.ID); err != nil {
				return nil, err
			}
		}
		return &RollbackResult{}, nil
	}

	stockableRuleMap, err := s.loadStockableRuleMap(ctx, tx, flagGroupCode)
	if err != nil {
		return nil, err
	}

	// Default target is everything currently allocated.
	target := 0.0
	for _, alloc := range allocations {
		target += alloc.Qty
	}
	if req.ReleaseQty != nil {
		if *req.ReleaseQty < 0 {
			return nil, fmt.Errorf("%w: release qty must be >= 0", ErrInvalidRequest)
		}
		target = *req.ReleaseQty
	}
	if nearlyZero(target) {
		return &RollbackResult{}, nil
	}

	result := &RollbackResult{Details: make([]AllocationDetail, 0)}
	now := time.Now()
	remaining := target

	for _, alloc := range allocations {
		if remaining <= 0 {
			break
		}
		portion := math.Min(remaining, alloc.Qty)
		if nearlyZero(portion) {
			continue
		}

		if nearlyZero(alloc.Qty - portion) {
			// The whole allocation is consumed: mark it released.
			updates := map[string]any{
				"status":      releasedAllocationStatus(),
				"released_at": now,
				"updated_at":  now,
			}
			if strings.TrimSpace(req.Reason) != "" {
				updates["note"] = req.Reason
			}
			if err := tx.Table("stock_allocations").Where("id = ?", alloc.ID).Updates(updates).Error; err != nil {
				return nil, err
			}
		} else {
			// Partial release: keep the row active with a smaller qty.
			if err := tx.Table("stock_allocations").
				Where("id = ?", alloc.ID).
				Updates(map[string]any{
					"qty":        alloc.Qty - portion,
					"updated_at": now,
				}).Error; err != nil {
				return nil, err
			}
		}

		stockableRule, ok := stockableRuleMap[alloc.StockableType]
		if !ok {
			return nil, fmt.Errorf("missing stockable route rule for type %s", alloc.StockableType)
		}
		if err := s.adjustStockableUsedQuantity(tx, stockableRule, alloc.StockableID, -portion); err != nil {
			return nil, err
		}

		result.ReleasedQty += portion
		remaining -= portion
		result.Details = append(result.Details, AllocationDetail{
			StockableType: alloc.StockableType,
			StockableID:   alloc.StockableID,
			Qty:           portion,
			SortAt:        alloc.CreatedAt,
		})
	}

	if req.ReleaseQty != nil && remaining > 1e-6 {
		return nil, fmt.Errorf("unable to release %.3f; only %.3f allocation exists", target, result.ReleasedQty)
	}

	if req.ReleaseQty == nil {
		if err := s.resetUsableQuantities(tx, *usableRule, req.Usable.ID); err != nil {
			return nil, err
		}
	} else {
		if err := s.applyUsableDeltas(tx, *usableRule, req.Usable.ID, -result.ReleasedQty, 0); err != nil {
			return nil, err
		}
	}

	if err := s.adjustProductWarehouseQty(tx, req.ProductWarehouseID, result.ReleasedQty); err != nil {
		return nil, err
	}
	return result, nil
}

// Reflow rebuilds the allocations of every usable row of a flag group and
// product warehouse inside one transaction.
//
// For each usable row returned by gatherAllRows it performs a full
// rollbackInternal followed by an allocateInternal for the row's
// Quantity + PendingQuantity as read before the rollback. Afterwards the
// warehouse quantity is corrected by the difference between
// calculateWarehouseAvailableForGroup and loadWarehouseQty when that
// difference is at least 1e-6. A reused operation returns its stored payload.
//
// NOTE(review): req.AsOf contributes to the request hash but is not passed to
// gatherAllRows or allocateInternal here — confirm this is intended.
func (s *fifoStockV2Service) Reflow(ctx context.Context, req ReflowRequest) (*ReflowResult, error) {
	if strings.TrimSpace(req.FlagGroupCode) == "" || req.ProductWarehouseID == 0 {
		return nil, fmt.Errorf("%w: invalid reflow request", ErrInvalidRequest)
	}

	result := &ReflowResult{}
	err := s.withTransaction(ctx, req.Tx, func(tx *gorm.DB) error {
		if err := s.ensureStockAllocationColumns(tx); err != nil {
			return err
		}
		if err := s.lockShard(tx, req.FlagGroupCode, req.ProductWarehouseID); err != nil {
			return err
		}

		hash := requestHash(map[string]any{
			"flag_group_code":      req.FlagGroupCode,
			"product_warehouse_id": req.ProductWarehouseID,
			"as_of":                req.AsOf,
		})
		logRow, reused, err := s.beginOperation(
			tx,
			OperationReflow,
			req.IdempotencyKey,
			hash,
			req.ProductWarehouseID,
			req.FlagGroupCode,
			"",
			0,
		)
		if err != nil {
			return err
		}
		if reused {
			if len(logRow.ResultPayload) == 0 {
				return fmt.Errorf("idempotent reflow has empty payload")
			}
			if err := json.Unmarshal(logRow.ResultPayload, result); err != nil {
				return fmt.Errorf("decode idempotent reflow payload: %w", err)
			}
			return nil
		}

		if logRow != nil {
			// The deferred hook reads the closure-level err, so every failure
			// path below must assign it before returning.
			defer func() {
				if err != nil {
					s.failOperation(tx, logRow, err)
				}
			}()
		}

		usableRows, gatherErr := s.gatherAllRows(ctx, tx, GatherRequest{
			FlagGroupCode:      req.FlagGroupCode,
			Lane:               LaneUsable,
			ProductWarehouseID: req.ProductWarehouseID,
			Limit:              s.defaultGatherLimit,
		})
		if gatherErr != nil {
			err = gatherErr
			return gatherErr
		}
		result.ProcessedUsables = len(usableRows)

		for _, usableRow := range usableRows {
			// Capture the demand before the rollback zeroes the usable's
			// quantity columns.
			desiredQty := usableRow.Quantity + usableRow.PendingQuantity

			rollbackRes, rollbackErr := s.rollbackInternal(ctx, tx, RollbackRequest{
				ProductWarehouseID: req.ProductWarehouseID,
				Usable:             usableRow.Ref,
				ReleaseQty:         nil,
				Reason:             "reflow reset",
			}, req.FlagGroupCode)
			if rollbackErr != nil {
				err = rollbackErr
				return rollbackErr
			}
			result.Rollback.ReleasedQty += rollbackRes.ReleasedQty
			if len(rollbackRes.Details) > 0 {
				result.Rollback.Details = append(result.Rollback.Details, rollbackRes.Details...)
			}

			if desiredQty <= 0 {
				continue
			}

			allocateRes, allocateErr := s.allocateInternal(ctx, tx, AllocateRequest{
				FlagGroupCode:      req.FlagGroupCode,
				ProductWarehouseID: req.ProductWarehouseID,
				Usable:             usableRow.Ref,
				NeedQty:            desiredQty,
				AsOf:               nil,
			})
			if allocateErr != nil {
				err = allocateErr
				return allocateErr
			}
			result.Allocate.AllocatedQty += allocateRes.AllocatedQty
			result.Allocate.PendingQty += allocateRes.PendingQty
			if len(allocateRes.Details) > 0 {
				result.Allocate.Details = append(result.Allocate.Details, allocateRes.Details...)
			}
		}

		expectedQty, calcErr := s.calculateWarehouseAvailableForGroup(ctx, tx, req.ProductWarehouseID, req.FlagGroupCode, nil)
		if calcErr != nil {
			err = calcErr
			return calcErr
		}
		actualQty, loadErr := s.loadWarehouseQty(ctx, tx, req.ProductWarehouseID)
		if loadErr != nil {
			err = loadErr
			return loadErr
		}

		drift := expectedQty - actualQty
		if math.Abs(drift) >= 1e-6 {
			if adjustErr := s.adjustProductWarehouseQty(tx, req.ProductWarehouseID, drift); adjustErr != nil {
				err = adjustErr
				return adjustErr
			}
		}

		if finishErr := s.finishOperation(tx, logRow, result); finishErr != nil {
			err = finishErr
			return finishErr
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	return result, nil
}

// gatherAllRows returns every row matching req by calling gatherRows page by
// page. The page size is req.Limit, falling back to s.defaultGatherLimit and
// then to 1000. After each full page the cursor fields of req are advanced to
// the last row's SortAt, SourceTable and SourceID; paging stops on an empty or
// short page.
func (s *fifoStockV2Service) gatherAllRows(
	ctx context.Context,
	tx *gorm.DB,
	req GatherRequest,
) ([]GatherRow, error) {
	limit := req.Limit
	if limit <= 0 {
		limit = s.defaultGatherLimit
	}
	if limit <= 0 {
		limit = 1000
	}
	req.Limit = limit

	out := make([]GatherRow, 0, limit)
	var cursorSortAt *time.Time
	cursorSourceTable := ""
	var cursorSourceID uint

	for {
		req.AfterSortAt = cursorSortAt
		req.AfterSourceTable = cursorSourceTable
		req.AfterSourceID = cursorSourceID

		rows, err := s.gatherRows(ctx, tx, req)
		if err != nil {
			return nil, err
		}
		if len(rows) == 0 {
			break
		}
		out = append(out, rows...)
		if len(rows) < limit {
			break
		}

		last := rows[len(rows)-1]
		// Copy SortAt so the cursor pointer does not alias the row slice.
		lastSortAt := last.SortAt
		cursorSortAt = &lastSortAt
		cursorSourceTable = last.SourceTable
		cursorSourceID = last.SourceID
	}
	return out, nil
}

// loadActiveAllocations returns the stock_allocations rows for the given
// usable whose status is activeAllocationStatus() and whose purpose is
// defaultAllocationPurpose(), newest first (created_at DESC, id DESC).
// The warehouse filter is applied only when productWarehouseID is non-zero.
func (s *fifoStockV2Service) loadActiveAllocations(
	tx *gorm.DB,
	usableType string,
	usableID uint,
	productWarehouseID uint,
) ([]allocationRow, error) {
	query := tx.Table("stock_allocations").
		Select("id, product_warehouse_id, stockable_type, stockable_id, usable_type, usable_id, qty, status, created_at").
		Where("usable_type = ? AND usable_id = ? AND status = ? AND allocation_purpose = ?",
			usableType, usableID, activeAllocationStatus(), defaultAllocationPurpose())
	if productWarehouseID > 0 {
		query = query.Where("product_warehouse_id = ?", productWarehouseID)
	}
	query = query.Order("created_at DESC, id DESC")

	var rows []allocationRow
	if err := query.Find(&rows).Error; err != nil {
		return nil, err
	}
	return rows, nil
}

// loadStockableRuleMap loads the stockable-lane route rules of flagGroupCode
// and indexes them by LegacyTypeKey. If several rules share a key, the last
// one returned by loadRouteRules wins.
func (s *fifoStockV2Service) loadStockableRuleMap(ctx context.Context, tx *gorm.DB, flagGroupCode string) (map[string]routeRule, error) {
	rules, err := s.loadRouteRules(ctx, tx, flagGroupCode, LaneStockable)
	if err != nil {
		return nil, err
	}
	m := make(map[string]routeRule, len(rules))
	for _, rule := range rules {
		m[rule.LegacyTypeKey] = rule
	}
	return m, nil
}

// adjustStockableUsedQuantity adds delta to the rule's used-quantity column on
// the source row identified by sourceID, clamping the stored value at zero via
// GREATEST. It is a no-op when delta is nearly zero, sourceID is zero, or the
// rule has no used-quantity column.
func (s *fifoStockV2Service) adjustStockableUsedQuantity(tx *gorm.DB, rule routeRule, sourceID uint, delta float64) error {
	if nearlyZero(delta) || sourceID == 0 {
		return nil
	}
	if rule.UsedQuantityCol == nil || strings.TrimSpace(*rule.UsedQuantityCol) == "" {
		return nil
	}

	// NOTE(review): the second return value of mustSafeIdentifier is
	// discarded. These identifiers are interpolated into SQL, so if that value
	// signals a validation failure it should be checked — confirm.
	usedCol, _ := mustSafeIdentifier(*rule.UsedQuantityCol)
	sourceIDCol, _ := mustSafeIdentifier(rule.SourceIDColumn)
	sourceTable, _ := mustSafeIdentifier(rule.SourceTable)

	expr := fmt.Sprintf("GREATEST(0, COALESCE(%s,0) + ?)", usedCol)
	return tx.Table(sourceTable).
		Where(fmt.Sprintf("%s = ?", sourceIDCol), sourceID).
		Update(usedCol, gorm.Expr(expr, delta)).Error
}

// applyUsableDeltas adds usageDelta to the rule's quantity column and
// pendingDelta to its pending-quantity column (when the rule defines one) on
// the source row identified by sourceID, clamping each stored value at zero.
// Deltas that are nearly zero are skipped; if nothing remains to update, or
// sourceID is zero, no query is issued.
func (s *fifoStockV2Service) applyUsableDeltas(tx *gorm.DB, rule routeRule, sourceID uint, usageDelta, pendingDelta float64) error {
	if sourceID == 0 || (nearlyZero(usageDelta) && nearlyZero(pendingDelta)) {
		return nil
	}

	// NOTE(review): the second return value of mustSafeIdentifier is
	// discarded here as well — confirm it does not need to be checked.
	sourceTable, _ := mustSafeIdentifier(rule.SourceTable)
	sourceIDCol, _ := mustSafeIdentifier(rule.SourceIDColumn)
	usageCol, _ := mustSafeIdentifier(rule.QuantityCol)

	updates := map[string]any{}
	if !nearlyZero(usageDelta) {
		expr := fmt.Sprintf("GREATEST(0, COALESCE(%s,0) + ?)", usageCol)
		updates[usageCol] = gorm.Expr(expr, usageDelta)
	}
	if rule.PendingQuantityCol != nil && strings.TrimSpace(*rule.PendingQuantityCol) != "" && !nearlyZero(pendingDelta) {
		pendingCol, _ := mustSafeIdentifier(*rule.PendingQuantityCol)
		expr := fmt.Sprintf("GREATEST(0, COALESCE(%s,0) + ?)", pendingCol)
		updates[pendingCol] = gorm.Expr(expr, pendingDelta)
	}
	if len(updates) == 0 {
		return nil
	}

	return tx.Table(sourceTable).
		Where(fmt.Sprintf("%s = ?", sourceIDCol), sourceID).
		Updates(updates).Error
}

// resetUsableQuantities sets the rule's quantity column, and its
// pending-quantity column when defined, to zero on the source row identified
// by sourceID. It is a no-op when sourceID is zero.
func (s *fifoStockV2Service) resetUsableQuantities(tx *gorm.DB, rule routeRule, sourceID uint) error {
	if sourceID == 0 {
		return nil
	}

	// NOTE(review): the second return value of mustSafeIdentifier is
	// discarded here as well — confirm it does not need to be checked.
	sourceTable, _ := mustSafeIdentifier(rule.SourceTable)
	sourceIDCol, _ := mustSafeIdentifier(rule.SourceIDColumn)
	usageCol, _ := mustSafeIdentifier(rule.QuantityCol)

	updates := map[string]any{usageCol: 0}
	if rule.PendingQuantityCol != nil && strings.TrimSpace(*rule.PendingQuantityCol) != "" {
		pendingCol, _ := mustSafeIdentifier(*rule.PendingQuantityCol)
		updates[pendingCol] = 0
	}

	return tx.Table(sourceTable).
		Where(fmt.Sprintf("%s = ?", sourceIDCol), sourceID).
		Updates(updates).Error
}

// resolveRollbackFlagGroup determines which flag group a rollback of
// req.Usable belongs to. Resolution order:
//
//  1. the flag_group_code of the most recent (highest id) v2 allocation row
//     for the usable, optionally narrowed by function code;
//  2. the active usable-lane route rules for the usable's legacy type; when
//     more than one matches and a product warehouse is given, the candidates
//     are narrowed to flag groups whose active members match a flag on the
//     warehouse's product, and a single remaining candidate is returned;
//  3. the only matching rule's flag group.
//
// It returns an error when no rule matches or the choice stays ambiguous.
func (s *fifoStockV2Service) resolveRollbackFlagGroup(ctx context.Context, tx *gorm.DB, req RollbackRequest) (string, error) {
	type row struct {
		FlagGroupCode string `gorm:"column:flag_group_code"`
	}

	var latest row
	latestQuery := tx.WithContext(ctx).
		Table("stock_allocations").
		Select("flag_group_code").
		Where("usable_type = ? AND usable_id = ?", req.Usable.LegacyTypeKey, req.Usable.ID).
		Where("engine_version = 'v2'").
		Where("allocation_purpose = ?", defaultAllocationPurpose()).
		Where("flag_group_code IS NOT NULL AND flag_group_code <> ''")
	if code := strings.TrimSpace(req.Usable.FunctionCode); code != "" {
		latestQuery = latestQuery.Where("function_code = ?", code)
	}
	err := latestQuery.Order("id DESC").Limit(1).Take(&latest).Error
	if err == nil && strings.TrimSpace(latest.FlagGroupCode) != "" {
		return latest.FlagGroupCode, nil
	}
	// A missing allocation row is expected; fall through to the route rules.
	if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
		return "", err
	}

	rulesQuery := tx.WithContext(ctx).
		Table("fifo_stock_v2_route_rules").
		Where("is_active = TRUE").
		Where("lane = ?", string(LaneUsable)).
		Where("legacy_type_key = ?", req.Usable.LegacyTypeKey)
	if code := strings.TrimSpace(req.Usable.FunctionCode); code != "" {
		rulesQuery = rulesQuery.Where("function_code = ?", code)
	}

	var rules []routeRule
	err = rulesQuery.Find(&rules).Error
	if err != nil {
		return "", err
	}
	if len(rules) == 0 {
		return "", fmt.Errorf("cannot resolve flag group for usable type %s", req.Usable.LegacyTypeKey)
	}

	if len(rules) > 1 && req.ProductWarehouseID != 0 {
		type candidateRow struct {
			FlagGroupCode string `gorm:"column:flag_group_code"`
		}
		var candidates []candidateRow
		byProductQuery := tx.WithContext(ctx).
			Table("fifo_stock_v2_route_rules rr").
			Select("DISTINCT rr.flag_group_code").
			Joins("JOIN fifo_stock_v2_flag_groups fg ON fg.code = rr.flag_group_code AND fg.is_active = TRUE").
			Where("rr.is_active = TRUE").
			Where("rr.lane = ?", string(LaneUsable)).
			Where("rr.legacy_type_key = ?", req.Usable.LegacyTypeKey).
			Where(`
				EXISTS (
					SELECT 1
					FROM product_warehouses pw
					JOIN flags f ON f.flagable_id = pw.product_id
					JOIN fifo_stock_v2_flag_members fm ON fm.flag_name = f.name AND fm.is_active = TRUE
					WHERE pw.id = ?
						AND f.flagable_type = 'products'
						AND fm.flag_group_code = rr.flag_group_code
				)
			`, req.ProductWarehouseID)
		if code := strings.TrimSpace(req.Usable.FunctionCode); code != "" {
			byProductQuery = byProductQuery.Where("rr.function_code = ?", code)
		}
		if err := byProductQuery.Order("rr.flag_group_code ASC").Scan(&candidates).Error; err != nil {
			return "", err
		}
		if len(candidates) == 1 {
			return strings.TrimSpace(candidates[0].FlagGroupCode), nil
		}
	}

	if len(rules) > 1 {
		return "", fmt.Errorf("ambiguous rollback flag group for usable type %s", req.Usable.LegacyTypeKey)
	}
	return rules[0].FlagGroupCode, nil
}

// validateAllocateRequest returns an error wrapping ErrInvalidRequest when the
// flag group, product warehouse, usable id or usable type is missing, or when
// NeedQty is negative.
func (s *fifoStockV2Service) validateAllocateRequest(req AllocateRequest) error {
	if strings.TrimSpace(req.FlagGroupCode) == "" || req.ProductWarehouseID == 0 {
		return fmt.Errorf("%w: missing flag group or product warehouse", ErrInvalidRequest)
	}
	if req.Usable.ID == 0 || strings.TrimSpace(req.Usable.LegacyTypeKey) == "" {
		return fmt.Errorf("%w: usable id and type are required", ErrInvalidRequest)
	}
	if req.NeedQty < 0 {
		return fmt.Errorf("%w: need qty must be >= 0", ErrInvalidRequest)
	}
	return nil
}

// validateRollbackRequest returns an error wrapping ErrInvalidRequest when the
// product warehouse, usable id or usable type is missing, or when ReleaseQty
// is set and negative.
func (s *fifoStockV2Service) validateRollbackRequest(req RollbackRequest) error {
	if req.ProductWarehouseID == 0 {
		return fmt.Errorf("%w: product warehouse is required", ErrInvalidRequest)
	}
	if req.Usable.ID == 0 || strings.TrimSpace(req.Usable.LegacyTypeKey) == "" {
		return fmt.Errorf("%w: usable id and type are required", ErrInvalidRequest)
	}
	if req.ReleaseQty != nil && *req.ReleaseQty < 0 {
		return fmt.Errorf("%w: release qty must be >= 0", ErrInvalidRequest)
	}
	return nil
}