Files
lti-api/internal/common/service/fifo_stock_v2/service.go
T

278 lines
7.3 KiB
Go

package fifo_stock_v2
import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"hash/fnv"
"math"
"regexp"
"strings"
"github.com/sirupsen/logrus"
entity "gitlab.com/mbugroup/lti-api.git/internal/entities"
"gorm.io/gorm"
)
// identifierPattern accepts strings made only of ASCII letters, digits and
// underscores that do not start with a digit. It is compiled once at package
// scope and used by isSafeIdentifier.
var identifierPattern = regexp.MustCompile(`^[a-zA-Z_][a-zA-Z0-9_]*$`)
// fifoStockV2Service is the concrete value returned by NewService. It bundles
// the database handle and logger shared by the helper methods in this file.
type fifoStockV2Service struct {
// db is the base gorm handle; withTransaction opens a new transaction on it
// when the caller does not supply one.
db *gorm.DB
// logger is never nil when the value is built through NewService.
logger *logrus.Logger
// defaultGatherLimit is set to 1000 by NewService; it is not read anywhere in
// this section of the file.
defaultGatherLimit int
}
// NewService builds the FIFO stock v2 service around the given gorm handle.
//
// A nil logger is replaced with logrus.StandardLogger(), and the default
// gather limit is fixed at 1000. db is stored as-is without validation.
func NewService(db *gorm.DB, logger *logrus.Logger) Service {
	svc := &fifoStockV2Service{
		db:                 db,
		logger:             logger,
		defaultGatherLimit: 1000,
	}
	if svc.logger == nil {
		svc.logger = logrus.StandardLogger()
	}
	return svc
}
// withTransaction runs fn against a context-bound gorm handle.
//
// When tx is non-nil the caller already owns a transaction: fn is invoked on
// tx.WithContext(ctx) and no commit or rollback is performed here. When tx is
// nil a new transaction is opened on s.db via gorm's Transaction helper, and
// fn runs inside it. The error returned by fn (or by gorm) is returned as-is.
func (s *fifoStockV2Service) withTransaction(
	ctx context.Context,
	tx *gorm.DB,
	fn func(*gorm.DB) error,
) error {
	if tx == nil {
		return s.db.WithContext(ctx).Transaction(fn)
	}
	return fn(tx.WithContext(ctx))
}
// isSafeIdentifier reports whether v, after trimming surrounding whitespace,
// matches identifierPattern (ASCII letters, digits and underscores, not
// starting with a digit). An empty or all-whitespace string is not safe.
func isSafeIdentifier(v string) bool {
	candidate := strings.TrimSpace(v)
	return identifierPattern.MatchString(candidate)
}
// mustSafeIdentifier trims v and returns it when it passes isSafeIdentifier.
// Otherwise it returns an empty string and an error quoting the trimmed
// value. Despite the "must" prefix it never panics.
func mustSafeIdentifier(v string) (string, error) {
	trimmed := strings.TrimSpace(v)
	if isSafeIdentifier(trimmed) {
		return trimmed, nil
	}
	return "", fmt.Errorf("unsafe identifier: %s", trimmed)
}
// requestHash returns the lowercase hex SHA-256 digest of v's JSON encoding.
//
// NOTE(review): the json.Marshal error is discarded. A value that cannot be
// marshalled yields a nil payload, so every such value produces the digest of
// the empty input and they are indistinguishable from one another.
func requestHash(v any) string {
	encoded, _ := json.Marshal(v)
	digest := sha256.Sum256(encoded)
	hexDigest := hex.EncodeToString(digest[:])
	return hexDigest
}
// shardLockKey derives the 64-bit key used for the per-shard advisory lock.
//
// The flag group code is upper-cased and trimmed, joined to the decimal
// product warehouse ID with a "|" separator, and hashed with FNV-1a (64 bit).
// The unsigned sum is reinterpreted as int64, so the result may be negative.
func shardLockKey(flagGroupCode string, productWarehouseID uint) int64 {
	normalized := strings.TrimSpace(strings.ToUpper(flagGroupCode))
	material := fmt.Sprintf("%s|%d", normalized, productWarehouseID)

	hasher := fnv.New64a()
	// The write result is ignored, as in the three-write form this replaces.
	_, _ = hasher.Write([]byte(material))
	return int64(hasher.Sum64())
}
// lockShard takes a PostgreSQL transaction-scoped advisory lock
// (pg_advisory_xact_lock) keyed by shardLockKey(flagGroupCode,
// productWarehouseID) on tx.
//
// It returns an error without touching the database when the flag group code
// is blank or the product warehouse ID is zero; otherwise it returns the
// error, if any, from executing the lock statement.
func (s *fifoStockV2Service) lockShard(tx *gorm.DB, flagGroupCode string, productWarehouseID uint) error {
	missingGroup := strings.TrimSpace(flagGroupCode) == ""
	if missingGroup || productWarehouseID == 0 {
		return fmt.Errorf("lock shard requires flag group and product warehouse")
	}
	key := shardLockKey(flagGroupCode, productWarehouseID)
	result := tx.Exec("SELECT pg_advisory_xact_lock(?)", key)
	return result.Error
}
// operationLogRow is the subset of fifo_stock_v2_operation_log columns that
// the idempotency helpers in this file scan into.
type operationLogRow struct {
// ID is the row's id column; zero means no row was scanned.
ID uint `gorm:"column:id"`
// Status holds the status column; this file writes RUNNING, DONE and FAILED.
Status string `gorm:"column:status"`
// RequestHash is the stored request_hash, compared against the caller's hash
// in beginOperation.
RequestHash string `gorm:"column:request_hash"`
// ResultPayload is the raw result_payload value; finishOperation writes the
// JSON encoding of the operation result into that column.
ResultPayload json.RawMessage `gorm:"column:result_payload"`
}
// beginOperation claims or looks up the operation-log row identified by
// (idempotencyKey, op) in fifo_stock_v2_operation_log.
//
// Return values are (row, alreadyDone, err):
//   - blank idempotencyKey: (nil, false, nil); no row is read or written.
//   - a new row was inserted with status RUNNING: (row, false, nil).
//   - a row already exists with a different request_hash: error.
//   - existing row is DONE: (row, true, nil), with result_payload loaded.
//   - existing row is RUNNING: error.
//   - existing row is FAILED: the row is reset to RUNNING with error_text and
//     finished_at cleared, and (row, false, nil) is returned.
//   - any other status: error.
//
// Database errors are returned unchanged.
func (s *fifoStockV2Service) beginOperation(
tx *gorm.DB,
op Operation,
idempotencyKey string,
requestHashValue string,
productWarehouseID uint,
flagGroupCode string,
usableType string,
usableID uint,
) (*operationLogRow, bool, error) {
// Without an idempotency key there is nothing to track.
if strings.TrimSpace(idempotencyKey) == "" {
return nil, false, nil
}
inserted := operationLogRow{}
// Try to claim the key. NULLIF stores NULL for an empty usable_type or a zero
// usable_id. On a conflict nothing is inserted and RETURNING yields no row,
// which leaves inserted.ID at zero.
insertSQL := `
INSERT INTO fifo_stock_v2_operation_log
(idempotency_key, operation, product_warehouse_id, flag_group_code, usable_type, usable_id, request_hash, status, created_at)
VALUES (?, ?, ?, ?, NULLIF(?, ''), NULLIF(?, 0), ?, 'RUNNING', NOW())
ON CONFLICT (idempotency_key, operation) DO NOTHING
RETURNING id, status, request_hash
`
if err := tx.Raw(insertSQL,
idempotencyKey,
string(op),
productWarehouseID,
flagGroupCode,
usableType,
usableID,
requestHashValue,
).Scan(&inserted).Error; err != nil {
return nil, false, err
}
// A non-zero ID means this call created the row and owns the operation.
if inserted.ID != 0 {
return &inserted, false, nil
}
// The key was already used: load the existing row to decide what to do.
existing := operationLogRow{}
if err := tx.Table("fifo_stock_v2_operation_log").
Select("id, status, request_hash, result_payload").
Where("idempotency_key = ? AND operation = ?", idempotencyKey, string(op)).
Take(&existing).Error; err != nil {
return nil, false, err
}
// The same key must always carry the same request payload.
if existing.RequestHash != requestHashValue {
return nil, false, fmt.Errorf("idempotency key %s reused with different payload", idempotencyKey)
}
// Status comparison is case-insensitive.
switch strings.ToUpper(existing.Status) {
case "DONE":
// Already completed: hand back the stored row, including result_payload.
return &existing, true, nil
case "RUNNING":
return nil, false, fmt.Errorf("operation %s with idempotency key %s is still running", op, idempotencyKey)
case "FAILED":
// A previous attempt failed: reopen the same row for a retry.
if err := tx.Table("fifo_stock_v2_operation_log").
Where("id = ?", existing.ID).
Updates(map[string]any{
"status": "RUNNING",
"error_text": nil,
"finished_at": nil,
}).Error; err != nil {
return nil, false, err
}
existing.Status = "RUNNING"
return &existing, false, nil
default:
return nil, false, fmt.Errorf("unknown operation status: %s", existing.Status)
}
}
// finishOperation marks the operation-log row referenced by logRow as DONE,
// storing the JSON encoding of payload in result_payload and stamping
// finished_at with the database's NOW().
//
// It is a no-op returning nil when logRow is nil or has a zero ID. A JSON
// encoding error or the database error from the update is returned as-is.
func (s *fifoStockV2Service) finishOperation(tx *gorm.DB, logRow *operationLogRow, payload any) error {
	if logRow == nil {
		return nil
	}
	if logRow.ID == 0 {
		return nil
	}

	encoded, err := json.Marshal(payload)
	if err != nil {
		return err
	}

	changes := map[string]any{
		"status":         "DONE",
		"result_payload": encoded,
		"finished_at":    gorm.Expr("NOW()"),
	}
	result := tx.Table("fifo_stock_v2_operation_log").
		Where("id = ?", logRow.ID).
		Updates(changes)
	return result.Error
}
// failOperation records a failed attempt on the operation-log row referenced
// by logRow: status becomes FAILED, error_text is set to failure.Error() and
// finished_at is stamped with the database's NOW().
//
// It does nothing when logRow is nil, has a zero ID, or failure is nil.
//
// The update is best-effort and the function has no return value. If the
// UPDATE itself fails, the error is logged at warn level through s.logger
// (when set) instead of being silently discarded, so a row left in RUNNING
// can be traced.
func (s *fifoStockV2Service) failOperation(tx *gorm.DB, logRow *operationLogRow, failure error) {
	if logRow == nil || logRow.ID == 0 || failure == nil {
		return
	}

	err := tx.Table("fifo_stock_v2_operation_log").
		Where("id = ?", logRow.ID).
		Updates(map[string]any{
			"status":      "FAILED",
			"error_text":  failure.Error(),
			"finished_at": gorm.Expr("NOW()"),
		}).Error
	if err == nil {
		return
	}

	// Still best-effort: report the problem but never escalate it.
	if s.logger != nil {
		s.logger.
			WithError(err).
			WithField("operation_log_id", logRow.ID).
			Warn("fifo_stock_v2: could not mark operation log as FAILED")
	}
}
// resolveOverConsume looks up the allow_overconsume flag of the best-matching
// active row in fifo_stock_v2_overconsume_rules for the given lane.
//
// A rule matches when its flag_group_code is NULL or equals flagGroupCode and
// its function_code is NULL or equals functionCode. Rules with a non-NULL
// flag_group_code sort first, then rules with a non-NULL function_code, then
// by ascending priority and id; only the first row is read.
//
// When no rule matches, defaultValue is returned with a nil error. Any other
// database error is returned together with false.
func (s *fifoStockV2Service) resolveOverConsume(
	tx *gorm.DB,
	flagGroupCode string,
	functionCode string,
	lane Lane,
	defaultValue bool,
) (bool, error) {
	type overConsumeRule struct {
		Allow bool `gorm:"column:allow_overconsume"`
	}

	var rule overConsumeRule
	query := tx.Table("fifo_stock_v2_overconsume_rules").
		Select("allow_overconsume").
		Where("is_active = TRUE").
		Where("lane = ?", string(lane)).
		Where("(flag_group_code IS NULL OR flag_group_code = ?)", flagGroupCode).
		Where("(function_code IS NULL OR function_code = ?)", functionCode).
		Order("CASE WHEN flag_group_code IS NULL THEN 1 ELSE 0 END ASC").
		Order("CASE WHEN function_code IS NULL THEN 1 ELSE 0 END ASC").
		Order("priority ASC, id ASC").
		Limit(1)

	switch err := query.Take(&rule).Error; {
	case err == nil:
		return rule.Allow, nil
	case errors.Is(err, gorm.ErrRecordNotFound):
		// No rule configured: fall back to the caller's default.
		return defaultValue, nil
	default:
		return false, err
	}
}
// adjustProductWarehouseQty adds delta to product_warehouses.qty for the row
// with the given ID, treating a NULL qty as zero.
//
// It is a no-op returning nil when productWarehouseID is zero or delta is
// exactly zero. A NaN or infinite delta is rejected with an error before any
// SQL runs: such a value is not equal to zero, so it would otherwise be added
// to the stored quantity. Database errors from the update are returned as-is.
// The number of affected rows is not checked.
func (s *fifoStockV2Service) adjustProductWarehouseQty(tx *gorm.DB, productWarehouseID uint, delta float64) error {
	if productWarehouseID == 0 || delta == 0 {
		return nil
	}
	if math.IsNaN(delta) || math.IsInf(delta, 0) {
		return fmt.Errorf("adjust product warehouse %d qty: non-finite delta %v", productWarehouseID, delta)
	}

	return tx.Table("product_warehouses").
		Where("id = ?", productWarehouseID).
		Update("qty", gorm.Expr("COALESCE(qty,0) + ?", delta)).Error
}
// nearlyZero reports whether the magnitude of v is strictly below 1e-6.
// NaN is never nearly zero.
func nearlyZero(v float64) bool {
	const epsilon = 1e-6
	magnitude := math.Abs(v)
	return magnitude < epsilon
}
// ensureStockAllocationColumns verifies that public.stock_allocations has the
// columns engine_version, flag_group_code, function_code, idempotency_key and
// allocation_purpose.
//
// Each column is looked up in information_schema.columns with its own query,
// in the order listed. The first query error is returned as-is; the first
// missing column produces an error asking for the fifo_stock_v2 migration.
func (s *fifoStockV2Service) ensureStockAllocationColumns(tx *gorm.DB) error {
	const columnExistsSQL = `
SELECT COUNT(1)
FROM information_schema.columns
WHERE table_schema = 'public' AND table_name = 'stock_allocations' AND column_name = ?
`
	required := []string{"engine_version", "flag_group_code", "function_code", "idempotency_key", "allocation_purpose"}

	for _, column := range required {
		var matches int64
		if err := tx.Raw(columnExistsSQL, column).Scan(&matches).Error; err != nil {
			return err
		}
		if matches == 0 {
			return fmt.Errorf("stock_allocations.%s does not exist, run fifo_stock_v2 migration first", column)
		}
	}
	return nil
}
// activeAllocationStatus returns entity.StockAllocationStatusActive.
func activeAllocationStatus() string {
return entity.StockAllocationStatusActive
}
// releasedAllocationStatus returns entity.StockAllocationStatusReleased.
func releasedAllocationStatus() string {
return entity.StockAllocationStatusReleased
}
// defaultAllocationPurpose returns entity.StockAllocationPurposeConsume, the
// value normalizeAllocationPurpose falls back to for a blank purpose.
func defaultAllocationPurpose() string {
return entity.StockAllocationPurposeConsume
}
// normalizeAllocationPurpose upper-cases purpose and trims surrounding
// whitespace. A blank result is replaced with defaultAllocationPurpose();
// any other value is returned without checking it against known purposes.
func normalizeAllocationPurpose(purpose string) string {
	if normalized := strings.TrimSpace(strings.ToUpper(purpose)); normalized != "" {
		return normalized
	}
	return defaultAllocationPurpose()
}