mirror of
https://gitlab.com/mbugroup/lti-api.git
synced 2026-05-20 13:31:56 +00:00
278 lines
7.3 KiB
Go
278 lines
7.3 KiB
Go
package fifo_stock_v2
|
|
|
|
import (
|
|
"context"
|
|
"crypto/sha256"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"hash/fnv"
|
|
"math"
|
|
"regexp"
|
|
"strings"
|
|
|
|
"github.com/sirupsen/logrus"
|
|
entity "gitlab.com/mbugroup/lti-api.git/internal/entities"
|
|
"gorm.io/gorm"
|
|
)
|
|
|
|
var identifierPattern = regexp.MustCompile(`^[a-zA-Z_][a-zA-Z0-9_]*$`)
|
|
|
|
type fifoStockV2Service struct {
|
|
db *gorm.DB
|
|
logger *logrus.Logger
|
|
defaultGatherLimit int
|
|
}
|
|
|
|
func NewService(db *gorm.DB, logger *logrus.Logger) Service {
|
|
if logger == nil {
|
|
logger = logrus.StandardLogger()
|
|
}
|
|
|
|
return &fifoStockV2Service{
|
|
db: db,
|
|
logger: logger,
|
|
defaultGatherLimit: 1000,
|
|
}
|
|
}
|
|
|
|
func (s *fifoStockV2Service) withTransaction(
|
|
ctx context.Context,
|
|
tx *gorm.DB,
|
|
fn func(*gorm.DB) error,
|
|
) error {
|
|
if tx != nil {
|
|
return fn(tx.WithContext(ctx))
|
|
}
|
|
return s.db.WithContext(ctx).Transaction(func(inner *gorm.DB) error {
|
|
return fn(inner)
|
|
})
|
|
}
|
|
|
|
func isSafeIdentifier(v string) bool {
|
|
return identifierPattern.MatchString(strings.TrimSpace(v))
|
|
}
|
|
|
|
func mustSafeIdentifier(v string) (string, error) {
|
|
v = strings.TrimSpace(v)
|
|
if !isSafeIdentifier(v) {
|
|
return "", fmt.Errorf("unsafe identifier: %s", v)
|
|
}
|
|
return v, nil
|
|
}
|
|
|
|
func requestHash(v any) string {
|
|
payload, _ := json.Marshal(v)
|
|
sum := sha256.Sum256(payload)
|
|
return hex.EncodeToString(sum[:])
|
|
}
|
|
|
|
func shardLockKey(flagGroupCode string, productWarehouseID uint) int64 {
|
|
h := fnv.New64a()
|
|
_, _ = h.Write([]byte(strings.TrimSpace(strings.ToUpper(flagGroupCode))))
|
|
_, _ = h.Write([]byte("|"))
|
|
_, _ = h.Write([]byte(fmt.Sprintf("%d", productWarehouseID)))
|
|
return int64(h.Sum64())
|
|
}
|
|
|
|
func (s *fifoStockV2Service) lockShard(tx *gorm.DB, flagGroupCode string, productWarehouseID uint) error {
|
|
if strings.TrimSpace(flagGroupCode) == "" || productWarehouseID == 0 {
|
|
return fmt.Errorf("lock shard requires flag group and product warehouse")
|
|
}
|
|
return tx.Exec("SELECT pg_advisory_xact_lock(?)", shardLockKey(flagGroupCode, productWarehouseID)).Error
|
|
}
|
|
|
|
type operationLogRow struct {
|
|
ID uint `gorm:"column:id"`
|
|
Status string `gorm:"column:status"`
|
|
RequestHash string `gorm:"column:request_hash"`
|
|
ResultPayload json.RawMessage `gorm:"column:result_payload"`
|
|
}
|
|
|
|
func (s *fifoStockV2Service) beginOperation(
|
|
tx *gorm.DB,
|
|
op Operation,
|
|
idempotencyKey string,
|
|
requestHashValue string,
|
|
productWarehouseID uint,
|
|
flagGroupCode string,
|
|
usableType string,
|
|
usableID uint,
|
|
) (*operationLogRow, bool, error) {
|
|
if strings.TrimSpace(idempotencyKey) == "" {
|
|
return nil, false, nil
|
|
}
|
|
|
|
inserted := operationLogRow{}
|
|
insertSQL := `
|
|
INSERT INTO fifo_stock_v2_operation_log
|
|
(idempotency_key, operation, product_warehouse_id, flag_group_code, usable_type, usable_id, request_hash, status, created_at)
|
|
VALUES (?, ?, ?, ?, NULLIF(?, ''), NULLIF(?, 0), ?, 'RUNNING', NOW())
|
|
ON CONFLICT (idempotency_key, operation) DO NOTHING
|
|
RETURNING id, status, request_hash
|
|
`
|
|
if err := tx.Raw(insertSQL,
|
|
idempotencyKey,
|
|
string(op),
|
|
productWarehouseID,
|
|
flagGroupCode,
|
|
usableType,
|
|
usableID,
|
|
requestHashValue,
|
|
).Scan(&inserted).Error; err != nil {
|
|
return nil, false, err
|
|
}
|
|
if inserted.ID != 0 {
|
|
return &inserted, false, nil
|
|
}
|
|
|
|
existing := operationLogRow{}
|
|
if err := tx.Table("fifo_stock_v2_operation_log").
|
|
Select("id, status, request_hash, result_payload").
|
|
Where("idempotency_key = ? AND operation = ?", idempotencyKey, string(op)).
|
|
Take(&existing).Error; err != nil {
|
|
return nil, false, err
|
|
}
|
|
|
|
if existing.RequestHash != requestHashValue {
|
|
return nil, false, fmt.Errorf("idempotency key %s reused with different payload", idempotencyKey)
|
|
}
|
|
|
|
switch strings.ToUpper(existing.Status) {
|
|
case "DONE":
|
|
return &existing, true, nil
|
|
case "RUNNING":
|
|
return nil, false, fmt.Errorf("operation %s with idempotency key %s is still running", op, idempotencyKey)
|
|
case "FAILED":
|
|
if err := tx.Table("fifo_stock_v2_operation_log").
|
|
Where("id = ?", existing.ID).
|
|
Updates(map[string]any{
|
|
"status": "RUNNING",
|
|
"error_text": nil,
|
|
"finished_at": nil,
|
|
}).Error; err != nil {
|
|
return nil, false, err
|
|
}
|
|
existing.Status = "RUNNING"
|
|
return &existing, false, nil
|
|
default:
|
|
return nil, false, fmt.Errorf("unknown operation status: %s", existing.Status)
|
|
}
|
|
}
|
|
|
|
func (s *fifoStockV2Service) finishOperation(tx *gorm.DB, logRow *operationLogRow, payload any) error {
|
|
if logRow == nil || logRow.ID == 0 {
|
|
return nil
|
|
}
|
|
|
|
encoded, err := json.Marshal(payload)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return tx.Table("fifo_stock_v2_operation_log").
|
|
Where("id = ?", logRow.ID).
|
|
Updates(map[string]any{
|
|
"status": "DONE",
|
|
"result_payload": encoded,
|
|
"finished_at": gorm.Expr("NOW()"),
|
|
}).Error
|
|
}
|
|
|
|
func (s *fifoStockV2Service) failOperation(tx *gorm.DB, logRow *operationLogRow, failure error) {
|
|
if logRow == nil || logRow.ID == 0 || failure == nil {
|
|
return
|
|
}
|
|
_ = tx.Table("fifo_stock_v2_operation_log").
|
|
Where("id = ?", logRow.ID).
|
|
Updates(map[string]any{
|
|
"status": "FAILED",
|
|
"error_text": failure.Error(),
|
|
"finished_at": gorm.Expr("NOW()"),
|
|
}).Error
|
|
}
|
|
|
|
func (s *fifoStockV2Service) resolveOverConsume(
|
|
tx *gorm.DB,
|
|
flagGroupCode string,
|
|
functionCode string,
|
|
lane Lane,
|
|
defaultValue bool,
|
|
) (bool, error) {
|
|
type row struct {
|
|
Allow bool `gorm:"column:allow_overconsume"`
|
|
}
|
|
selected := row{}
|
|
err := tx.Table("fifo_stock_v2_overconsume_rules").
|
|
Select("allow_overconsume").
|
|
Where("is_active = TRUE").
|
|
Where("lane = ?", string(lane)).
|
|
Where("(flag_group_code IS NULL OR flag_group_code = ?)", flagGroupCode).
|
|
Where("(function_code IS NULL OR function_code = ?)", functionCode).
|
|
Order("CASE WHEN flag_group_code IS NULL THEN 1 ELSE 0 END ASC").
|
|
Order("CASE WHEN function_code IS NULL THEN 1 ELSE 0 END ASC").
|
|
Order("priority ASC, id ASC").
|
|
Limit(1).
|
|
Take(&selected).Error
|
|
if err != nil {
|
|
if errors.Is(err, gorm.ErrRecordNotFound) {
|
|
return defaultValue, nil
|
|
}
|
|
return false, err
|
|
}
|
|
return selected.Allow, nil
|
|
}
|
|
|
|
func (s *fifoStockV2Service) adjustProductWarehouseQty(tx *gorm.DB, productWarehouseID uint, delta float64) error {
|
|
if productWarehouseID == 0 || delta == 0 {
|
|
return nil
|
|
}
|
|
return tx.Table("product_warehouses").
|
|
Where("id = ?", productWarehouseID).
|
|
Update("qty", gorm.Expr("COALESCE(qty,0) + ?", delta)).Error
|
|
}
|
|
|
|
func nearlyZero(v float64) bool {
|
|
return math.Abs(v) < 1e-6
|
|
}
|
|
|
|
func (s *fifoStockV2Service) ensureStockAllocationColumns(tx *gorm.DB) error {
|
|
checkCols := []string{"engine_version", "flag_group_code", "function_code", "idempotency_key", "allocation_purpose"}
|
|
for _, col := range checkCols {
|
|
var count int64
|
|
err := tx.Raw(`
|
|
SELECT COUNT(1)
|
|
FROM information_schema.columns
|
|
WHERE table_schema = 'public' AND table_name = 'stock_allocations' AND column_name = ?
|
|
`, col).Scan(&count).Error
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if count == 0 {
|
|
return fmt.Errorf("stock_allocations.%s does not exist, run fifo_stock_v2 migration first", col)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func activeAllocationStatus() string {
|
|
return entity.StockAllocationStatusActive
|
|
}
|
|
|
|
func releasedAllocationStatus() string {
|
|
return entity.StockAllocationStatusReleased
|
|
}
|
|
|
|
func defaultAllocationPurpose() string {
|
|
return entity.StockAllocationPurposeConsume
|
|
}
|
|
|
|
func normalizeAllocationPurpose(purpose string) string {
|
|
purpose = strings.TrimSpace(strings.ToUpper(purpose))
|
|
if purpose == "" {
|
|
return defaultAllocationPurpose()
|
|
}
|
|
return purpose
|
|
}
|