package fifo_stock_v2 import ( "context" "crypto/sha256" "encoding/hex" "encoding/json" "errors" "fmt" "hash/fnv" "math" "regexp" "strings" "github.com/sirupsen/logrus" entity "gitlab.com/mbugroup/lti-api.git/internal/entities" "gorm.io/gorm" ) var identifierPattern = regexp.MustCompile(`^[a-zA-Z_][a-zA-Z0-9_]*$`) type fifoStockV2Service struct { db *gorm.DB logger *logrus.Logger defaultGatherLimit int } func NewService(db *gorm.DB, logger *logrus.Logger) Service { if logger == nil { logger = logrus.StandardLogger() } return &fifoStockV2Service{ db: db, logger: logger, defaultGatherLimit: 1000, } } func (s *fifoStockV2Service) withTransaction( ctx context.Context, tx *gorm.DB, fn func(*gorm.DB) error, ) error { if tx != nil { return fn(tx.WithContext(ctx)) } return s.db.WithContext(ctx).Transaction(func(inner *gorm.DB) error { return fn(inner) }) } func isSafeIdentifier(v string) bool { return identifierPattern.MatchString(strings.TrimSpace(v)) } func mustSafeIdentifier(v string) (string, error) { v = strings.TrimSpace(v) if !isSafeIdentifier(v) { return "", fmt.Errorf("unsafe identifier: %s", v) } return v, nil } func requestHash(v any) string { payload, _ := json.Marshal(v) sum := sha256.Sum256(payload) return hex.EncodeToString(sum[:]) } func shardLockKey(flagGroupCode string, productWarehouseID uint) int64 { h := fnv.New64a() _, _ = h.Write([]byte(strings.TrimSpace(strings.ToUpper(flagGroupCode)))) _, _ = h.Write([]byte("|")) _, _ = h.Write([]byte(fmt.Sprintf("%d", productWarehouseID))) return int64(h.Sum64()) } func (s *fifoStockV2Service) lockShard(tx *gorm.DB, flagGroupCode string, productWarehouseID uint) error { if strings.TrimSpace(flagGroupCode) == "" || productWarehouseID == 0 { return fmt.Errorf("lock shard requires flag group and product warehouse") } return tx.Exec("SELECT pg_advisory_xact_lock(?)", shardLockKey(flagGroupCode, productWarehouseID)).Error } type operationLogRow struct { ID uint `gorm:"column:id"` Status string `gorm:"column:status"` RequestHash string `gorm:"column:request_hash"` ResultPayload json.RawMessage `gorm:"column:result_payload"` } func (s *fifoStockV2Service) beginOperation( tx *gorm.DB, op Operation, idempotencyKey string, requestHashValue string, productWarehouseID uint, flagGroupCode string, usableType string, usableID uint, ) (*operationLogRow, bool, error) { if strings.TrimSpace(idempotencyKey) == "" { return nil, false, nil } inserted := operationLogRow{} insertSQL := ` INSERT INTO fifo_stock_v2_operation_log (idempotency_key, operation, product_warehouse_id, flag_group_code, usable_type, usable_id, request_hash, status, created_at) VALUES (?, ?, ?, ?, NULLIF(?, ''), NULLIF(?, 0), ?, 'RUNNING', NOW()) ON CONFLICT (idempotency_key, operation) DO NOTHING RETURNING id, status, request_hash ` if err := tx.Raw(insertSQL, idempotencyKey, string(op), productWarehouseID, flagGroupCode, usableType, usableID, requestHashValue, ).Scan(&inserted).Error; err != nil { return nil, false, err } if inserted.ID != 0 { return &inserted, false, nil } existing := operationLogRow{} if err := tx.Table("fifo_stock_v2_operation_log"). Select("id, status, request_hash, result_payload"). Where("idempotency_key = ? AND operation = ?", idempotencyKey, string(op)). Take(&existing).Error; err != nil { return nil, false, err } if existing.RequestHash != requestHashValue { return nil, false, fmt.Errorf("idempotency key %s reused with different payload", idempotencyKey) } switch strings.ToUpper(existing.Status) { case "DONE": return &existing, true, nil case "RUNNING": return nil, false, fmt.Errorf("operation %s with idempotency key %s is still running", op, idempotencyKey) case "FAILED": if err := tx.Table("fifo_stock_v2_operation_log"). Where("id = ?", existing.ID). Updates(map[string]any{ "status": "RUNNING", "error_text": nil, "finished_at": nil, }).Error; err != nil { return nil, false, err } existing.Status = "RUNNING" return &existing, false, nil default: return nil, false, fmt.Errorf("unknown operation status: %s", existing.Status) } } func (s *fifoStockV2Service) finishOperation(tx *gorm.DB, logRow *operationLogRow, payload any) error { if logRow == nil || logRow.ID == 0 { return nil } encoded, err := json.Marshal(payload) if err != nil { return err } return tx.Table("fifo_stock_v2_operation_log"). Where("id = ?", logRow.ID). Updates(map[string]any{ "status": "DONE", "result_payload": encoded, "finished_at": gorm.Expr("NOW()"), }).Error } func (s *fifoStockV2Service) failOperation(tx *gorm.DB, logRow *operationLogRow, failure error) { if logRow == nil || logRow.ID == 0 || failure == nil { return } _ = tx.Table("fifo_stock_v2_operation_log"). Where("id = ?", logRow.ID). Updates(map[string]any{ "status": "FAILED", "error_text": failure.Error(), "finished_at": gorm.Expr("NOW()"), }).Error } func (s *fifoStockV2Service) resolveOverConsume( tx *gorm.DB, flagGroupCode string, functionCode string, lane Lane, defaultValue bool, ) (bool, error) { type row struct { Allow bool `gorm:"column:allow_overconsume"` } selected := row{} err := tx.Table("fifo_stock_v2_overconsume_rules"). Select("allow_overconsume"). Where("is_active = TRUE"). Where("lane = ?", string(lane)). Where("(flag_group_code IS NULL OR flag_group_code = ?)", flagGroupCode). Where("(function_code IS NULL OR function_code = ?)", functionCode). Order("CASE WHEN flag_group_code IS NULL THEN 1 ELSE 0 END ASC"). Order("CASE WHEN function_code IS NULL THEN 1 ELSE 0 END ASC"). Order("priority ASC, id ASC"). Limit(1). Take(&selected).Error if err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { return defaultValue, nil } return false, err } return selected.Allow, nil } func (s *fifoStockV2Service) adjustProductWarehouseQty(tx *gorm.DB, productWarehouseID uint, delta float64) error { if productWarehouseID == 0 || delta == 0 { return nil } return tx.Table("product_warehouses"). Where("id = ?", productWarehouseID). Update("qty", gorm.Expr("COALESCE(qty,0) + ?", delta)).Error } func nearlyZero(v float64) bool { return math.Abs(v) < 1e-6 } func (s *fifoStockV2Service) ensureStockAllocationColumns(tx *gorm.DB) error { checkCols := []string{"engine_version", "flag_group_code", "function_code", "idempotency_key", "allocation_purpose"} for _, col := range checkCols { var count int64 err := tx.Raw(` SELECT COUNT(1) FROM information_schema.columns WHERE table_schema = 'public' AND table_name = 'stock_allocations' AND column_name = ? `, col).Scan(&count).Error if err != nil { return err } if count == 0 { return fmt.Errorf("stock_allocations.%s does not exist, run fifo_stock_v2 migration first", col) } } return nil } func activeAllocationStatus() string { return entity.StockAllocationStatusActive } func releasedAllocationStatus() string { return entity.StockAllocationStatusReleased } func defaultAllocationPurpose() string { return entity.StockAllocationPurposeConsume } func normalizeAllocationPurpose(purpose string) string { purpose = strings.TrimSpace(strings.ToUpper(purpose)) if purpose == "" { return defaultAllocationPurpose() } return purpose }