mirror of
https://gitlab.com/mbugroup/lti-api.git
synced 2026-05-20 13:31:56 +00:00
add base fifo-v2
This commit is contained in:
@@ -0,0 +1,41 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"github.com/sirupsen/logrus"
|
||||
fifoStockV2 "gitlab.com/mbugroup/lti-api.git/internal/common/service/fifo_stock_v2"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
type FifoStockV2Service = fifoStockV2.Service
|
||||
|
||||
type FifoStockV2Lane = fifoStockV2.Lane
|
||||
|
||||
type FifoStockV2Ref = fifoStockV2.Ref
|
||||
|
||||
type FifoStockV2GatherRequest = fifoStockV2.GatherRequest
|
||||
|
||||
type FifoStockV2GatherRow = fifoStockV2.GatherRow
|
||||
|
||||
type FifoStockV2AllocateRequest = fifoStockV2.AllocateRequest
|
||||
|
||||
type FifoStockV2AllocateResult = fifoStockV2.AllocateResult
|
||||
|
||||
type FifoStockV2AllocationDetail = fifoStockV2.AllocationDetail
|
||||
|
||||
type FifoStockV2RollbackRequest = fifoStockV2.RollbackRequest
|
||||
|
||||
type FifoStockV2RollbackResult = fifoStockV2.RollbackResult
|
||||
|
||||
type FifoStockV2ReflowRequest = fifoStockV2.ReflowRequest
|
||||
|
||||
type FifoStockV2ReflowResult = fifoStockV2.ReflowResult
|
||||
|
||||
type FifoStockV2RecalculateRequest = fifoStockV2.RecalculateRequest
|
||||
|
||||
type FifoStockV2RecalculateResult = fifoStockV2.RecalculateResult
|
||||
|
||||
type FifoStockV2WarehouseDrift = fifoStockV2.WarehouseDrift
|
||||
|
||||
func NewFifoStockV2Service(db *gorm.DB, logger *logrus.Logger) FifoStockV2Service {
|
||||
return fifoStockV2.NewService(db, logger)
|
||||
}
|
||||
@@ -0,0 +1,58 @@
|
||||
# RFC Ringkas: FIFO Stock V2
|
||||
|
||||
## Tujuan
|
||||
`fifo_stock_v2` adalah engine FIFO baru berbasis konfigurasi `Flag Group + Jalur` yang berjalan paralel dengan v1 tanpa memutus kompatibilitas `stock_allocations`, HPP, dan closing/reporting existing.
|
||||
|
||||
## Prinsip
|
||||
- V1 tidak dihapus, V2 jalan paralel.
|
||||
- Semua operasi transactional.
|
||||
- FIFO sorting deterministic lintas tabel.
|
||||
- Default over-consume `ALLOW` (pending), exception dapat `BLOCK`.
|
||||
- Reflow idempotent.
|
||||
- Recalculate bisa memperbaiki drift `product_warehouses.qty`.
|
||||
|
||||
## Komponen
|
||||
- `fifo_stock_v2_flag_groups`: master grouping flag produk.
|
||||
- `fifo_stock_v2_flag_members`: pemetaan flag -> group.
|
||||
- `fifo_stock_v2_traits`: trait sort per `table:date_column` (+ optional join date source).
|
||||
- `fifo_stock_v2_route_rules`: rule per `flag_group + lane + function + table`.
|
||||
- `fifo_stock_v2_overconsume_rules`: policy pending/over-consume.
|
||||
- `fifo_stock_v2_operation_log`: idempotency + audit operasi.
|
||||
- `fifo_stock_v2_reflow_runs` + checkpoints + shadow allocations: bulk reflow resumable/observable.
|
||||
|
||||
## API Service
|
||||
- `Gather`: union cross-table berdasarkan route rules + trait sorting.
|
||||
- `Allocate`: alokasi lot FIFO ke usable.
|
||||
- `Rollback`: batalkan alokasi aktif.
|
||||
- `Reflow`: rollback penuh lalu allocate ulang (idempotent).
|
||||
- `Recalculate`: rekonsiliasi qty warehouse dari ledger FIFO.
|
||||
|
||||
## Deterministic Sorting
|
||||
Urutan gather:
|
||||
1. `sort_at ASC` (dari trait `date_column`)
|
||||
2. `sort_priority ASC`
|
||||
3. `source_table ASC`
|
||||
4. `source_id ASC`
|
||||
|
||||
Fallback waktu: `1970-01-01 00:00:00+00` bila tanggal null.
|
||||
|
||||
## Compat Strategy
|
||||
- Tetap menulis ke `stock_allocations` dengan tambahan metadata:
|
||||
- `engine_version` (`v1`/`v2`)
|
||||
- `flag_group_code`
|
||||
- `function_code`
|
||||
- `idempotency_key`
|
||||
- Query lama yang bergantung `stockable_type/usable_type` tetap berjalan.
|
||||
|
||||
## Migration Strategy
|
||||
1. Deploy schema + seed v2.
|
||||
2. Aktifkan shadow-run comparator v1 vs v2.
|
||||
3. Canary cutover per flag group.
|
||||
4. Full cutover jika parity aman.
|
||||
5. Jalankan bulk reflow existing data.
|
||||
|
||||
## Acceptance Criteria Singkat
|
||||
- Parity mismatch terkendali pada aggregate + detail alokasi.
|
||||
- Tidak ada regression closing/HPP.
|
||||
- Drift qty warehouse turun signifikan pasca reflow.
|
||||
- Rollback via feature flag memungkinkan kembali ke v1.
|
||||
@@ -0,0 +1,660 @@
|
||||
package fifo_stock_v2
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
type allocationRow struct {
|
||||
ID uint `gorm:"column:id"`
|
||||
ProductWarehouseID uint `gorm:"column:product_warehouse_id"`
|
||||
StockableType string `gorm:"column:stockable_type"`
|
||||
StockableID uint `gorm:"column:stockable_id"`
|
||||
UsableType string `gorm:"column:usable_type"`
|
||||
UsableID uint `gorm:"column:usable_id"`
|
||||
Qty float64 `gorm:"column:qty"`
|
||||
Status string `gorm:"column:status"`
|
||||
CreatedAt time.Time `gorm:"column:created_at"`
|
||||
}
|
||||
|
||||
type usableQtySnapshot struct {
|
||||
Usage float64 `gorm:"column:usage_qty"`
|
||||
Pending float64 `gorm:"column:pending_qty"`
|
||||
}
|
||||
|
||||
func (s *fifoStockV2Service) Allocate(ctx context.Context, req AllocateRequest) (*AllocateResult, error) {
|
||||
if err := s.validateAllocateRequest(req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result := &AllocateResult{}
|
||||
err := s.withTransaction(ctx, req.Tx, func(tx *gorm.DB) error {
|
||||
if err := s.ensureStockAllocationColumns(tx); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.lockShard(tx, req.FlagGroupCode, req.ProductWarehouseID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
hash := requestHash(map[string]any{
|
||||
"flag_group_code": req.FlagGroupCode,
|
||||
"product_warehouse_id": req.ProductWarehouseID,
|
||||
"usable_type": req.Usable.LegacyTypeKey,
|
||||
"usable_id": req.Usable.ID,
|
||||
"need_qty": req.NeedQty,
|
||||
"as_of": req.AsOf,
|
||||
"allow_over_consume": req.AllowOverConsume,
|
||||
})
|
||||
logRow, reused, err := s.beginOperation(
|
||||
tx,
|
||||
OperationAllocate,
|
||||
req.IdempotencyKey,
|
||||
hash,
|
||||
req.ProductWarehouseID,
|
||||
req.FlagGroupCode,
|
||||
req.Usable.LegacyTypeKey,
|
||||
req.Usable.ID,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if reused {
|
||||
if len(logRow.ResultPayload) == 0 {
|
||||
return fmt.Errorf("idempotent allocate has empty payload")
|
||||
}
|
||||
if err := json.Unmarshal(logRow.ResultPayload, result); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
if logRow != nil {
|
||||
defer func() {
|
||||
if err != nil {
|
||||
s.failOperation(tx, logRow, err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
allocated, allocErr := s.allocateInternal(ctx, tx, req)
|
||||
if allocErr != nil {
|
||||
err = allocErr
|
||||
return allocErr
|
||||
}
|
||||
*result = *allocated
|
||||
|
||||
if finishErr := s.finishOperation(tx, logRow, result); finishErr != nil {
|
||||
err = finishErr
|
||||
return finishErr
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (s *fifoStockV2Service) allocateInternal(ctx context.Context, tx *gorm.DB, req AllocateRequest) (*AllocateResult, error) {
|
||||
usableRule, err := s.loadRouteRuleByLegacyType(ctx, tx, LaneUsable, req.FlagGroupCode, req.Usable.LegacyTypeKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
allowOverConsume := usableRule.AllowPendingDefault
|
||||
if req.AllowOverConsume != nil {
|
||||
allowOverConsume = *req.AllowOverConsume
|
||||
} else {
|
||||
allowOverConsume, err = s.resolveOverConsume(tx, req.FlagGroupCode, req.Usable.FunctionCode, LaneUsable, allowOverConsume)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
gatherRows, err := s.gatherRows(ctx, tx, GatherRequest{
|
||||
FlagGroupCode: req.FlagGroupCode,
|
||||
Lane: LaneStockable,
|
||||
ProductWarehouseID: req.ProductWarehouseID,
|
||||
AsOf: req.AsOf,
|
||||
Limit: s.defaultGatherLimit,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
stockableRuleMap, err := s.loadStockableRuleMap(ctx, tx, req.FlagGroupCode)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
remaining := req.NeedQty
|
||||
result := &AllocateResult{Details: make([]AllocationDetail, 0)}
|
||||
|
||||
for _, lot := range gatherRows {
|
||||
if remaining <= 0 {
|
||||
break
|
||||
}
|
||||
if lot.AvailableQuantity <= 0 {
|
||||
continue
|
||||
}
|
||||
portion := math.Min(remaining, lot.AvailableQuantity)
|
||||
if nearlyZero(portion) {
|
||||
continue
|
||||
}
|
||||
|
||||
allocationInsert := map[string]any{
|
||||
"product_warehouse_id": req.ProductWarehouseID,
|
||||
"stockable_type": lot.Ref.LegacyTypeKey,
|
||||
"stockable_id": lot.Ref.ID,
|
||||
"usable_type": req.Usable.LegacyTypeKey,
|
||||
"usable_id": req.Usable.ID,
|
||||
"qty": portion,
|
||||
"status": activeAllocationStatus(),
|
||||
"created_at": now,
|
||||
"updated_at": now,
|
||||
"engine_version": "v2",
|
||||
"flag_group_code": req.FlagGroupCode,
|
||||
"function_code": req.Usable.FunctionCode,
|
||||
}
|
||||
if strings.TrimSpace(req.IdempotencyKey) != "" {
|
||||
allocationInsert["idempotency_key"] = req.IdempotencyKey
|
||||
}
|
||||
if err := tx.Table("stock_allocations").Create(allocationInsert).Error; err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rule, ok := stockableRuleMap[lot.Ref.LegacyTypeKey]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("missing stockable route rule for type %s", lot.Ref.LegacyTypeKey)
|
||||
}
|
||||
if err := s.adjustStockableUsedQuantity(tx, rule, lot.Ref.ID, portion); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result.Details = append(result.Details, AllocationDetail{
|
||||
StockableType: lot.Ref.LegacyTypeKey,
|
||||
StockableID: lot.Ref.ID,
|
||||
Qty: portion,
|
||||
SortAt: lot.SortAt,
|
||||
})
|
||||
|
||||
remaining -= portion
|
||||
result.AllocatedQty += portion
|
||||
}
|
||||
|
||||
if remaining > 0 {
|
||||
if !allowOverConsume {
|
||||
return nil, fmt.Errorf("%w: requested %.3f, allocated %.3f", ErrInsufficientStock, req.NeedQty, result.AllocatedQty)
|
||||
}
|
||||
result.PendingQty = remaining
|
||||
}
|
||||
|
||||
if err := s.applyUsableDeltas(tx, *usableRule, req.Usable.ID, result.AllocatedQty, result.PendingQty); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := s.adjustProductWarehouseQty(tx, req.ProductWarehouseID, -result.AllocatedQty); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (s *fifoStockV2Service) Rollback(ctx context.Context, req RollbackRequest) (*RollbackResult, error) {
|
||||
if err := s.validateRollbackRequest(req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result := &RollbackResult{}
|
||||
err := s.withTransaction(ctx, req.Tx, func(tx *gorm.DB) error {
|
||||
if err := s.ensureStockAllocationColumns(tx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
flagGroupCode, err := s.resolveRollbackFlagGroup(ctx, tx, req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.lockShard(tx, flagGroupCode, req.ProductWarehouseID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
hash := requestHash(map[string]any{
|
||||
"product_warehouse_id": req.ProductWarehouseID,
|
||||
"usable_type": req.Usable.LegacyTypeKey,
|
||||
"usable_id": req.Usable.ID,
|
||||
"release_qty": req.ReleaseQty,
|
||||
"reason": req.Reason,
|
||||
"flag_group_code": flagGroupCode,
|
||||
})
|
||||
logRow, reused, beginErr := s.beginOperation(
|
||||
tx,
|
||||
OperationRollback,
|
||||
req.IdempotencyKey,
|
||||
hash,
|
||||
req.ProductWarehouseID,
|
||||
flagGroupCode,
|
||||
req.Usable.LegacyTypeKey,
|
||||
req.Usable.ID,
|
||||
)
|
||||
if beginErr != nil {
|
||||
return beginErr
|
||||
}
|
||||
if reused {
|
||||
if len(logRow.ResultPayload) == 0 {
|
||||
return fmt.Errorf("idempotent rollback has empty payload")
|
||||
}
|
||||
if err := json.Unmarshal(logRow.ResultPayload, result); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
if logRow != nil {
|
||||
defer func() {
|
||||
if err != nil {
|
||||
s.failOperation(tx, logRow, err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
rolled, rollbackErr := s.rollbackInternal(ctx, tx, req, flagGroupCode)
|
||||
if rollbackErr != nil {
|
||||
err = rollbackErr
|
||||
return rollbackErr
|
||||
}
|
||||
*result = *rolled
|
||||
|
||||
if finishErr := s.finishOperation(tx, logRow, result); finishErr != nil {
|
||||
err = finishErr
|
||||
return finishErr
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (s *fifoStockV2Service) rollbackInternal(
|
||||
ctx context.Context,
|
||||
tx *gorm.DB,
|
||||
req RollbackRequest,
|
||||
flagGroupCode string,
|
||||
) (*RollbackResult, error) {
|
||||
usableRule, err := s.loadRouteRuleByLegacyType(ctx, tx, LaneUsable, flagGroupCode, req.Usable.LegacyTypeKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
allocations, err := s.loadActiveAllocations(tx, req.Usable.LegacyTypeKey, req.Usable.ID, req.ProductWarehouseID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(allocations) == 0 {
|
||||
if req.ReleaseQty == nil {
|
||||
if err := s.resetUsableQuantities(tx, *usableRule, req.Usable.ID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return &RollbackResult{}, nil
|
||||
}
|
||||
|
||||
stockableRuleMap, err := s.loadStockableRuleMap(ctx, tx, flagGroupCode)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
target := 0.0
|
||||
for _, alloc := range allocations {
|
||||
target += alloc.Qty
|
||||
}
|
||||
if req.ReleaseQty != nil {
|
||||
if *req.ReleaseQty < 0 {
|
||||
return nil, fmt.Errorf("%w: release qty must be >= 0", ErrInvalidRequest)
|
||||
}
|
||||
target = *req.ReleaseQty
|
||||
}
|
||||
if nearlyZero(target) {
|
||||
return &RollbackResult{}, nil
|
||||
}
|
||||
|
||||
result := &RollbackResult{Details: make([]AllocationDetail, 0)}
|
||||
now := time.Now()
|
||||
remaining := target
|
||||
|
||||
for _, alloc := range allocations {
|
||||
if remaining <= 0 {
|
||||
break
|
||||
}
|
||||
portion := math.Min(remaining, alloc.Qty)
|
||||
if nearlyZero(portion) {
|
||||
continue
|
||||
}
|
||||
|
||||
if nearlyZero(alloc.Qty - portion) {
|
||||
updates := map[string]any{
|
||||
"status": releasedAllocationStatus(),
|
||||
"released_at": now,
|
||||
"updated_at": now,
|
||||
}
|
||||
if strings.TrimSpace(req.Reason) != "" {
|
||||
updates["note"] = req.Reason
|
||||
}
|
||||
if err := tx.Table("stock_allocations").Where("id = ?", alloc.ID).Updates(updates).Error; err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
if err := tx.Table("stock_allocations").
|
||||
Where("id = ?", alloc.ID).
|
||||
Updates(map[string]any{
|
||||
"qty": alloc.Qty - portion,
|
||||
"updated_at": now,
|
||||
}).Error; err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
stockableRule, ok := stockableRuleMap[alloc.StockableType]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("missing stockable route rule for type %s", alloc.StockableType)
|
||||
}
|
||||
if err := s.adjustStockableUsedQuantity(tx, stockableRule, alloc.StockableID, -portion); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result.ReleasedQty += portion
|
||||
remaining -= portion
|
||||
result.Details = append(result.Details, AllocationDetail{
|
||||
StockableType: alloc.StockableType,
|
||||
StockableID: alloc.StockableID,
|
||||
Qty: portion,
|
||||
SortAt: alloc.CreatedAt,
|
||||
})
|
||||
}
|
||||
|
||||
if req.ReleaseQty != nil && remaining > 1e-6 {
|
||||
return nil, fmt.Errorf("unable to release %.3f; only %.3f allocation exists", target, result.ReleasedQty)
|
||||
}
|
||||
|
||||
if req.ReleaseQty == nil {
|
||||
if err := s.resetUsableQuantities(tx, *usableRule, req.Usable.ID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
if err := s.applyUsableDeltas(tx, *usableRule, req.Usable.ID, -result.ReleasedQty, 0); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if err := s.adjustProductWarehouseQty(tx, req.ProductWarehouseID, result.ReleasedQty); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (s *fifoStockV2Service) Reflow(ctx context.Context, req ReflowRequest) (*ReflowResult, error) {
|
||||
if strings.TrimSpace(req.FlagGroupCode) == "" || req.ProductWarehouseID == 0 || req.Usable.ID == 0 || strings.TrimSpace(req.Usable.LegacyTypeKey) == "" {
|
||||
return nil, fmt.Errorf("%w: invalid reflow request", ErrInvalidRequest)
|
||||
}
|
||||
if req.DesiredQty < 0 {
|
||||
return nil, fmt.Errorf("%w: desired qty must be >= 0", ErrInvalidRequest)
|
||||
}
|
||||
|
||||
result := &ReflowResult{}
|
||||
err := s.withTransaction(ctx, req.Tx, func(tx *gorm.DB) error {
|
||||
if err := s.ensureStockAllocationColumns(tx); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.lockShard(tx, req.FlagGroupCode, req.ProductWarehouseID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
hash := requestHash(map[string]any{
|
||||
"flag_group_code": req.FlagGroupCode,
|
||||
"product_warehouse_id": req.ProductWarehouseID,
|
||||
"usable_type": req.Usable.LegacyTypeKey,
|
||||
"usable_id": req.Usable.ID,
|
||||
"desired_qty": req.DesiredQty,
|
||||
"as_of": req.AsOf,
|
||||
"allow_over_consume": req.AllowOverConsume,
|
||||
})
|
||||
logRow, reused, err := s.beginOperation(
|
||||
tx,
|
||||
OperationReflow,
|
||||
req.IdempotencyKey,
|
||||
hash,
|
||||
req.ProductWarehouseID,
|
||||
req.FlagGroupCode,
|
||||
req.Usable.LegacyTypeKey,
|
||||
req.Usable.ID,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if reused {
|
||||
if len(logRow.ResultPayload) == 0 {
|
||||
return fmt.Errorf("idempotent reflow has empty payload")
|
||||
}
|
||||
if err := json.Unmarshal(logRow.ResultPayload, result); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
if logRow != nil {
|
||||
defer func() {
|
||||
if err != nil {
|
||||
s.failOperation(tx, logRow, err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
rollbackRes, rollbackErr := s.rollbackInternal(ctx, tx, RollbackRequest{
|
||||
ProductWarehouseID: req.ProductWarehouseID,
|
||||
Usable: req.Usable,
|
||||
ReleaseQty: nil,
|
||||
Reason: "reflow reset",
|
||||
}, req.FlagGroupCode)
|
||||
if rollbackErr != nil {
|
||||
err = rollbackErr
|
||||
return rollbackErr
|
||||
}
|
||||
result.Rollback = *rollbackRes
|
||||
|
||||
if req.DesiredQty > 0 {
|
||||
allocateRes, allocateErr := s.allocateInternal(ctx, tx, AllocateRequest{
|
||||
FlagGroupCode: req.FlagGroupCode,
|
||||
ProductWarehouseID: req.ProductWarehouseID,
|
||||
Usable: req.Usable,
|
||||
NeedQty: req.DesiredQty,
|
||||
AllowOverConsume: req.AllowOverConsume,
|
||||
AsOf: req.AsOf,
|
||||
})
|
||||
if allocateErr != nil {
|
||||
err = allocateErr
|
||||
return allocateErr
|
||||
}
|
||||
result.Allocate = *allocateRes
|
||||
}
|
||||
|
||||
if finishErr := s.finishOperation(tx, logRow, result); finishErr != nil {
|
||||
err = finishErr
|
||||
return finishErr
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (s *fifoStockV2Service) loadActiveAllocations(
|
||||
tx *gorm.DB,
|
||||
usableType string,
|
||||
usableID uint,
|
||||
productWarehouseID uint,
|
||||
) ([]allocationRow, error) {
|
||||
query := tx.Table("stock_allocations").
|
||||
Select("id, product_warehouse_id, stockable_type, stockable_id, usable_type, usable_id, qty, status, created_at").
|
||||
Where("usable_type = ? AND usable_id = ? AND status = ?", usableType, usableID, activeAllocationStatus())
|
||||
if productWarehouseID > 0 {
|
||||
query = query.Where("product_warehouse_id = ?", productWarehouseID)
|
||||
}
|
||||
query = query.Order("created_at DESC, id DESC")
|
||||
|
||||
var rows []allocationRow
|
||||
if err := query.Find(&rows).Error; err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return rows, nil
|
||||
}
|
||||
|
||||
func (s *fifoStockV2Service) loadStockableRuleMap(ctx context.Context, tx *gorm.DB, flagGroupCode string) (map[string]routeRule, error) {
|
||||
rules, err := s.loadRouteRules(ctx, tx, flagGroupCode, LaneStockable)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
m := make(map[string]routeRule, len(rules))
|
||||
for _, rule := range rules {
|
||||
m[rule.LegacyTypeKey] = rule
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func (s *fifoStockV2Service) adjustStockableUsedQuantity(tx *gorm.DB, rule routeRule, sourceID uint, delta float64) error {
|
||||
if nearlyZero(delta) || sourceID == 0 {
|
||||
return nil
|
||||
}
|
||||
if rule.UsedQuantityCol == nil || strings.TrimSpace(*rule.UsedQuantityCol) == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
usedCol, _ := mustSafeIdentifier(*rule.UsedQuantityCol)
|
||||
sourceIDCol, _ := mustSafeIdentifier(rule.SourceIDColumn)
|
||||
sourceTable, _ := mustSafeIdentifier(rule.SourceTable)
|
||||
|
||||
expr := fmt.Sprintf("GREATEST(0, COALESCE(%s,0) + ?)", usedCol)
|
||||
return tx.Table(sourceTable).
|
||||
Where(fmt.Sprintf("%s = ?", sourceIDCol), sourceID).
|
||||
Update(usedCol, gorm.Expr(expr, delta)).Error
|
||||
}
|
||||
|
||||
func (s *fifoStockV2Service) applyUsableDeltas(tx *gorm.DB, rule routeRule, sourceID uint, usageDelta, pendingDelta float64) error {
|
||||
if sourceID == 0 || (nearlyZero(usageDelta) && nearlyZero(pendingDelta)) {
|
||||
return nil
|
||||
}
|
||||
sourceTable, _ := mustSafeIdentifier(rule.SourceTable)
|
||||
sourceIDCol, _ := mustSafeIdentifier(rule.SourceIDColumn)
|
||||
usageCol, _ := mustSafeIdentifier(rule.QuantityCol)
|
||||
|
||||
updates := map[string]any{}
|
||||
if !nearlyZero(usageDelta) {
|
||||
expr := fmt.Sprintf("GREATEST(0, COALESCE(%s,0) + ?)", usageCol)
|
||||
updates[usageCol] = gorm.Expr(expr, usageDelta)
|
||||
}
|
||||
if rule.PendingQuantityCol != nil && strings.TrimSpace(*rule.PendingQuantityCol) != "" && !nearlyZero(pendingDelta) {
|
||||
pendingCol, _ := mustSafeIdentifier(*rule.PendingQuantityCol)
|
||||
expr := fmt.Sprintf("GREATEST(0, COALESCE(%s,0) + ?)", pendingCol)
|
||||
updates[pendingCol] = gorm.Expr(expr, pendingDelta)
|
||||
}
|
||||
if len(updates) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
return tx.Table(sourceTable).
|
||||
Where(fmt.Sprintf("%s = ?", sourceIDCol), sourceID).
|
||||
Updates(updates).Error
|
||||
}
|
||||
|
||||
func (s *fifoStockV2Service) resetUsableQuantities(tx *gorm.DB, rule routeRule, sourceID uint) error {
|
||||
if sourceID == 0 {
|
||||
return nil
|
||||
}
|
||||
sourceTable, _ := mustSafeIdentifier(rule.SourceTable)
|
||||
sourceIDCol, _ := mustSafeIdentifier(rule.SourceIDColumn)
|
||||
usageCol, _ := mustSafeIdentifier(rule.QuantityCol)
|
||||
|
||||
updates := map[string]any{usageCol: 0}
|
||||
if rule.PendingQuantityCol != nil && strings.TrimSpace(*rule.PendingQuantityCol) != "" {
|
||||
pendingCol, _ := mustSafeIdentifier(*rule.PendingQuantityCol)
|
||||
updates[pendingCol] = 0
|
||||
}
|
||||
|
||||
return tx.Table(sourceTable).
|
||||
Where(fmt.Sprintf("%s = ?", sourceIDCol), sourceID).
|
||||
Updates(updates).Error
|
||||
}
|
||||
|
||||
func (s *fifoStockV2Service) resolveRollbackFlagGroup(ctx context.Context, tx *gorm.DB, req RollbackRequest) (string, error) {
|
||||
type row struct {
|
||||
FlagGroupCode string `gorm:"column:flag_group_code"`
|
||||
}
|
||||
var latest row
|
||||
err := tx.WithContext(ctx).
|
||||
Table("stock_allocations").
|
||||
Select("flag_group_code").
|
||||
Where("usable_type = ? AND usable_id = ?", req.Usable.LegacyTypeKey, req.Usable.ID).
|
||||
Where("engine_version = 'v2'").
|
||||
Where("flag_group_code IS NOT NULL AND flag_group_code <> ''").
|
||||
Order("id DESC").
|
||||
Limit(1).
|
||||
Take(&latest).Error
|
||||
if err == nil && strings.TrimSpace(latest.FlagGroupCode) != "" {
|
||||
return latest.FlagGroupCode, nil
|
||||
}
|
||||
if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
return "", err
|
||||
}
|
||||
|
||||
var rules []routeRule
|
||||
err = tx.WithContext(ctx).
|
||||
Table("fifo_stock_v2_route_rules").
|
||||
Where("is_active = TRUE").
|
||||
Where("lane = ?", string(LaneUsable)).
|
||||
Where("legacy_type_key = ?", req.Usable.LegacyTypeKey).
|
||||
Find(&rules).Error
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if len(rules) == 0 {
|
||||
return "", fmt.Errorf("cannot resolve flag group for usable type %s", req.Usable.LegacyTypeKey)
|
||||
}
|
||||
if len(rules) > 1 {
|
||||
return "", fmt.Errorf("ambiguous rollback flag group for usable type %s", req.Usable.LegacyTypeKey)
|
||||
}
|
||||
return rules[0].FlagGroupCode, nil
|
||||
}
|
||||
|
||||
func (s *fifoStockV2Service) validateAllocateRequest(req AllocateRequest) error {
|
||||
if strings.TrimSpace(req.FlagGroupCode) == "" || req.ProductWarehouseID == 0 {
|
||||
return fmt.Errorf("%w: missing flag group or product warehouse", ErrInvalidRequest)
|
||||
}
|
||||
if req.Usable.ID == 0 || strings.TrimSpace(req.Usable.LegacyTypeKey) == "" {
|
||||
return fmt.Errorf("%w: usable id and type are required", ErrInvalidRequest)
|
||||
}
|
||||
if req.NeedQty < 0 {
|
||||
return fmt.Errorf("%w: need qty must be >= 0", ErrInvalidRequest)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *fifoStockV2Service) validateRollbackRequest(req RollbackRequest) error {
|
||||
if req.ProductWarehouseID == 0 {
|
||||
return fmt.Errorf("%w: product warehouse is required", ErrInvalidRequest)
|
||||
}
|
||||
if req.Usable.ID == 0 || strings.TrimSpace(req.Usable.LegacyTypeKey) == "" {
|
||||
return fmt.Errorf("%w: usable id and type are required", ErrInvalidRequest)
|
||||
}
|
||||
if req.ReleaseQty != nil && *req.ReleaseQty < 0 {
|
||||
return fmt.Errorf("%w: release qty must be >= 0", ErrInvalidRequest)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -0,0 +1,170 @@
|
||||
package fifo_stock_v2
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
type routeRule struct {
|
||||
ID uint `gorm:"column:id"`
|
||||
FlagGroupCode string `gorm:"column:flag_group_code"`
|
||||
Lane string `gorm:"column:lane"`
|
||||
FunctionCode string `gorm:"column:function_code"`
|
||||
SourceTable string `gorm:"column:source_table"`
|
||||
SourceIDColumn string `gorm:"column:source_id_column"`
|
||||
ProductWarehouseCol string `gorm:"column:product_warehouse_col"`
|
||||
QuantityCol string `gorm:"column:quantity_col"`
|
||||
UsedQuantityCol *string `gorm:"column:used_quantity_col"`
|
||||
PendingQuantityCol *string `gorm:"column:pending_quantity_col"`
|
||||
ScopeSQL *string `gorm:"column:scope_sql"`
|
||||
LegacyTypeKey string `gorm:"column:legacy_type_key"`
|
||||
AllowPendingDefault bool `gorm:"column:allow_pending_default"`
|
||||
}
|
||||
|
||||
type traitRule struct {
|
||||
ID uint `gorm:"column:id"`
|
||||
SourceTable string `gorm:"column:source_table"`
|
||||
Lane string `gorm:"column:lane"`
|
||||
DateTable *string `gorm:"column:date_table"`
|
||||
DateJoinLeftCol *string `gorm:"column:date_join_left_col"`
|
||||
DateJoinRightCol *string `gorm:"column:date_join_right_col"`
|
||||
DateColumn string `gorm:"column:date_column"`
|
||||
FallbackDateColumn *string `gorm:"column:fallback_date_column"`
|
||||
SortPriority int `gorm:"column:sort_priority"`
|
||||
IDColumn string `gorm:"column:id_column"`
|
||||
}
|
||||
|
||||
func (s *fifoStockV2Service) loadRouteRules(ctx context.Context, tx *gorm.DB, flagGroupCode string, lane Lane) ([]routeRule, error) {
|
||||
var rules []routeRule
|
||||
err := tx.WithContext(ctx).
|
||||
Table("fifo_stock_v2_route_rules").
|
||||
Where("is_active = TRUE").
|
||||
Where("flag_group_code = ?", flagGroupCode).
|
||||
Where("lane = ?", string(lane)).
|
||||
Order("id ASC").
|
||||
Find(&rules).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, rule := range rules {
|
||||
if err := validateRouteRule(rule); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return rules, nil
|
||||
}
|
||||
|
||||
func (s *fifoStockV2Service) loadRouteRuleByLegacyType(
|
||||
ctx context.Context,
|
||||
tx *gorm.DB,
|
||||
lane Lane,
|
||||
flagGroupCode string,
|
||||
legacyTypeKey string,
|
||||
) (*routeRule, error) {
|
||||
var rule routeRule
|
||||
err := tx.WithContext(ctx).
|
||||
Table("fifo_stock_v2_route_rules").
|
||||
Where("is_active = TRUE").
|
||||
Where("lane = ?", string(lane)).
|
||||
Where("flag_group_code = ?", flagGroupCode).
|
||||
Where("legacy_type_key = ?", legacyTypeKey).
|
||||
Order("id ASC").
|
||||
Limit(1).
|
||||
Take(&rule).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := validateRouteRule(rule); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &rule, nil
|
||||
}
|
||||
|
||||
func (s *fifoStockV2Service) loadTraitMap(
|
||||
ctx context.Context,
|
||||
tx *gorm.DB,
|
||||
lane Lane,
|
||||
sourceTables []string,
|
||||
) (map[string]traitRule, error) {
|
||||
if len(sourceTables) == 0 {
|
||||
return map[string]traitRule{}, nil
|
||||
}
|
||||
|
||||
var traits []traitRule
|
||||
err := tx.WithContext(ctx).
|
||||
Table("fifo_stock_v2_traits").
|
||||
Where("is_active = TRUE").
|
||||
Where("lane = ?", string(lane)).
|
||||
Where("source_table IN ?", sourceTables).
|
||||
Find(&traits).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
out := make(map[string]traitRule, len(traits))
|
||||
for _, tr := range traits {
|
||||
if err := validateTraitRule(tr); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out[tr.SourceTable] = tr
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func validateRouteRule(rule routeRule) error {
|
||||
fields := []string{rule.SourceTable, rule.SourceIDColumn, rule.ProductWarehouseCol, rule.QuantityCol}
|
||||
for _, value := range fields {
|
||||
if _, err := mustSafeIdentifier(value); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if rule.UsedQuantityCol != nil {
|
||||
if _, err := mustSafeIdentifier(*rule.UsedQuantityCol); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if rule.PendingQuantityCol != nil {
|
||||
if _, err := mustSafeIdentifier(*rule.PendingQuantityCol); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if strings.TrimSpace(rule.LegacyTypeKey) == "" {
|
||||
return fmt.Errorf("route rule has empty legacy type key")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func validateTraitRule(rule traitRule) error {
|
||||
if _, err := mustSafeIdentifier(rule.SourceTable); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := mustSafeIdentifier(rule.DateColumn); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := mustSafeIdentifier(rule.IDColumn); err != nil {
|
||||
return err
|
||||
}
|
||||
if rule.DateTable != nil {
|
||||
if _, err := mustSafeIdentifier(*rule.DateTable); err != nil {
|
||||
return err
|
||||
}
|
||||
if rule.DateJoinLeftCol == nil || rule.DateJoinRightCol == nil {
|
||||
return fmt.Errorf("trait %s requires date join columns", rule.SourceTable)
|
||||
}
|
||||
if _, err := mustSafeIdentifier(*rule.DateJoinLeftCol); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := mustSafeIdentifier(*rule.DateJoinRightCol); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if rule.FallbackDateColumn != nil {
|
||||
if _, err := mustSafeIdentifier(*rule.FallbackDateColumn); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -0,0 +1,8 @@
|
||||
package fifo_stock_v2
|
||||
|
||||
import "errors"
|
||||
|
||||
var (
|
||||
ErrInvalidRequest = errors.New("invalid fifo stock v2 request")
|
||||
ErrInsufficientStock = errors.New("insufficient stock")
|
||||
)
|
||||
@@ -0,0 +1,268 @@
|
||||
package fifo_stock_v2
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
entity "gitlab.com/mbugroup/lti-api.git/internal/entities"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
type gatherSQLRow struct {
|
||||
SourceTable string `gorm:"column:source_table"`
|
||||
LegacyTypeKey string `gorm:"column:legacy_type_key"`
|
||||
FunctionCode string `gorm:"column:function_code"`
|
||||
SourceID uint `gorm:"column:source_id"`
|
||||
ProductWarehouseID uint `gorm:"column:product_warehouse_id"`
|
||||
SortAt time.Time `gorm:"column:sort_at"`
|
||||
SortPriority int `gorm:"column:sort_priority"`
|
||||
Quantity float64 `gorm:"column:quantity"`
|
||||
UsedQuantity float64 `gorm:"column:used_quantity"`
|
||||
PendingQuantity float64 `gorm:"column:pending_quantity"`
|
||||
AvailableQuantity float64 `gorm:"column:available_quantity"`
|
||||
}
|
||||
|
||||
func (s *fifoStockV2Service) Gather(ctx context.Context, req GatherRequest) ([]GatherRow, error) {
|
||||
if strings.TrimSpace(req.FlagGroupCode) == "" || req.ProductWarehouseID == 0 {
|
||||
return nil, fmt.Errorf("%w: flag group and product warehouse are required", ErrInvalidRequest)
|
||||
}
|
||||
if req.Lane != LaneStockable && req.Lane != LaneUsable {
|
||||
return nil, fmt.Errorf("%w: unsupported lane %q", ErrInvalidRequest, req.Lane)
|
||||
}
|
||||
|
||||
var out []GatherRow
|
||||
err := s.withTransaction(ctx, req.Tx, func(tx *gorm.DB) error {
|
||||
rows, err := s.gatherRows(ctx, tx, req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
out = rows
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (s *fifoStockV2Service) gatherRows(ctx context.Context, tx *gorm.DB, req GatherRequest) ([]GatherRow, error) {
|
||||
rules, err := s.loadRouteRules(ctx, tx, req.FlagGroupCode, req.Lane)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(rules) == 0 {
|
||||
return []GatherRow{}, nil
|
||||
}
|
||||
|
||||
tables := make([]string, 0, len(rules))
|
||||
for _, rule := range rules {
|
||||
tables = append(tables, rule.SourceTable)
|
||||
}
|
||||
|
||||
traits, err := s.loadTraitMap(ctx, tx, req.Lane, tables)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
subqueries := make([]string, 0, len(rules))
|
||||
args := make([]any, 0, len(rules)*10)
|
||||
|
||||
for _, rule := range rules {
|
||||
trait, ok := traits[rule.SourceTable]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("missing trait for table %s lane %s", rule.SourceTable, req.Lane)
|
||||
}
|
||||
subSQL, subArgs, err := s.buildGatherSubquery(rule, trait, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
subqueries = append(subqueries, subSQL)
|
||||
args = append(args, subArgs...)
|
||||
}
|
||||
|
||||
if len(subqueries) == 0 {
|
||||
return []GatherRow{}, nil
|
||||
}
|
||||
|
||||
limit := req.Limit
|
||||
if limit <= 0 {
|
||||
limit = s.defaultGatherLimit
|
||||
}
|
||||
if limit <= 0 {
|
||||
limit = 1000
|
||||
}
|
||||
|
||||
query := "SELECT * FROM (" + strings.Join(subqueries, " UNION ALL ") + ") AS g"
|
||||
if req.AfterSortAt != nil {
|
||||
query += `
|
||||
WHERE
|
||||
(g.sort_at > ?)
|
||||
OR (g.sort_at = ? AND g.source_table > ?)
|
||||
OR (g.sort_at = ? AND g.source_table = ? AND g.source_id > ?)
|
||||
`
|
||||
args = append(args,
|
||||
*req.AfterSortAt,
|
||||
*req.AfterSortAt, req.AfterSourceTable,
|
||||
*req.AfterSortAt, req.AfterSourceTable, req.AfterSourceID,
|
||||
)
|
||||
}
|
||||
query += " ORDER BY g.sort_at ASC, g.sort_priority ASC, g.source_table ASC, g.source_id ASC LIMIT ?"
|
||||
args = append(args, limit)
|
||||
|
||||
var rows []gatherSQLRow
|
||||
if err := tx.WithContext(ctx).Raw(query, args...).Scan(&rows).Error; err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
out := make([]GatherRow, 0, len(rows))
|
||||
for _, row := range rows {
|
||||
out = append(out, GatherRow{
|
||||
Ref: Ref{
|
||||
Table: row.SourceTable,
|
||||
ID: row.SourceID,
|
||||
LegacyTypeKey: row.LegacyTypeKey,
|
||||
FunctionCode: row.FunctionCode,
|
||||
},
|
||||
FlagGroupCode: req.FlagGroupCode,
|
||||
ProductWarehouseID: row.ProductWarehouseID,
|
||||
SortAt: row.SortAt,
|
||||
SortPriority: row.SortPriority,
|
||||
Quantity: row.Quantity,
|
||||
UsedQuantity: row.UsedQuantity,
|
||||
PendingQuantity: row.PendingQuantity,
|
||||
AvailableQuantity: row.AvailableQuantity,
|
||||
SourceTable: row.SourceTable,
|
||||
SourceID: row.SourceID,
|
||||
})
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (s *fifoStockV2Service) buildGatherSubquery(rule routeRule, trait traitRule, req GatherRequest) (string, []any, error) {
|
||||
sourceTable, _ := mustSafeIdentifier(rule.SourceTable)
|
||||
sourceIDCol, _ := mustSafeIdentifier(rule.SourceIDColumn)
|
||||
productWarehouseCol, _ := mustSafeIdentifier(rule.ProductWarehouseCol)
|
||||
quantityCol, _ := mustSafeIdentifier(rule.QuantityCol)
|
||||
|
||||
baseQtyExpr := fmt.Sprintf("COALESCE(src.%s,0)::numeric", quantityCol)
|
||||
usedExpr := "0::numeric"
|
||||
pendingExpr := "0::numeric"
|
||||
availableExpr := baseQtyExpr
|
||||
extraArgs := make([]any, 0, 1)
|
||||
|
||||
if req.Lane == LaneStockable {
|
||||
if rule.UsedQuantityCol != nil && strings.TrimSpace(*rule.UsedQuantityCol) != "" {
|
||||
usedCol, _ := mustSafeIdentifier(*rule.UsedQuantityCol)
|
||||
usedExpr = fmt.Sprintf("COALESCE(src.%s,0)::numeric", usedCol)
|
||||
} else {
|
||||
usedExpr = fmt.Sprintf(
|
||||
"(SELECT COALESCE(SUM(sa.qty),0)::numeric FROM stock_allocations sa WHERE sa.stockable_type = ? AND sa.stockable_id = src.%s AND sa.status = '%s')",
|
||||
sourceIDCol,
|
||||
activeAllocationStatus(),
|
||||
)
|
||||
extraArgs = append(extraArgs, rule.LegacyTypeKey)
|
||||
}
|
||||
availableExpr = fmt.Sprintf("(%s - %s)", baseQtyExpr, usedExpr)
|
||||
} else {
|
||||
if rule.PendingQuantityCol != nil && strings.TrimSpace(*rule.PendingQuantityCol) != "" {
|
||||
pendingCol, _ := mustSafeIdentifier(*rule.PendingQuantityCol)
|
||||
pendingExpr = fmt.Sprintf("COALESCE(src.%s,0)::numeric", pendingCol)
|
||||
}
|
||||
availableExpr = baseQtyExpr
|
||||
}
|
||||
|
||||
sortExpr, joinClause, err := buildSortExpr(trait)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
|
||||
whereParts := []string{
|
||||
fmt.Sprintf("src.%s = ?", productWarehouseCol),
|
||||
fmt.Sprintf(`EXISTS (
|
||||
SELECT 1
|
||||
FROM product_warehouses pw
|
||||
JOIN flags f ON f.flagable_type = ? AND f.flagable_id = pw.product_id
|
||||
JOIN fifo_stock_v2_flag_members fm ON fm.flag_name = f.name AND fm.is_active = TRUE
|
||||
WHERE pw.id = src.%s AND fm.flag_group_code = ?
|
||||
)`, productWarehouseCol),
|
||||
}
|
||||
|
||||
if req.Lane == LaneStockable {
|
||||
whereParts = append(whereParts, fmt.Sprintf("%s > 0", availableExpr))
|
||||
}
|
||||
|
||||
if req.AsOf != nil {
|
||||
whereParts = append(whereParts, fmt.Sprintf("%s <= ?", sortExpr))
|
||||
}
|
||||
|
||||
if rule.ScopeSQL != nil && strings.TrimSpace(*rule.ScopeSQL) != "" {
|
||||
whereParts = append(whereParts, fmt.Sprintf("(%s)", strings.TrimSpace(*rule.ScopeSQL)))
|
||||
}
|
||||
|
||||
subquery := fmt.Sprintf(`
|
||||
SELECT
|
||||
? AS source_table,
|
||||
? AS legacy_type_key,
|
||||
? AS function_code,
|
||||
src.%s AS source_id,
|
||||
src.%s AS product_warehouse_id,
|
||||
%s AS sort_at,
|
||||
? AS sort_priority,
|
||||
%s AS quantity,
|
||||
%s AS used_quantity,
|
||||
%s AS pending_quantity,
|
||||
%s AS available_quantity
|
||||
FROM %s src
|
||||
%s
|
||||
WHERE %s
|
||||
`, sourceIDCol, productWarehouseCol, sortExpr, baseQtyExpr, usedExpr, pendingExpr, availableExpr, sourceTable, joinClause, strings.Join(whereParts, " AND "))
|
||||
|
||||
args := []any{
|
||||
rule.SourceTable,
|
||||
rule.LegacyTypeKey,
|
||||
rule.FunctionCode,
|
||||
trait.SortPriority,
|
||||
}
|
||||
args = append(args, extraArgs...)
|
||||
args = append(args,
|
||||
req.ProductWarehouseID,
|
||||
entity.FlagableTypeProduct,
|
||||
req.FlagGroupCode,
|
||||
)
|
||||
|
||||
if req.AsOf != nil {
|
||||
args = append(args, *req.AsOf)
|
||||
}
|
||||
|
||||
return subquery, args, nil
|
||||
}
|
||||
|
||||
func buildSortExpr(trait traitRule) (string, string, error) {
|
||||
dateCol, _ := mustSafeIdentifier(trait.DateColumn)
|
||||
idCol, _ := mustSafeIdentifier(trait.IDColumn)
|
||||
_ = idCol
|
||||
|
||||
joinClause := ""
|
||||
sortBase := fmt.Sprintf("src.%s", dateCol)
|
||||
if trait.DateTable != nil && strings.TrimSpace(*trait.DateTable) != "" {
|
||||
dateTable, _ := mustSafeIdentifier(*trait.DateTable)
|
||||
if trait.DateJoinLeftCol == nil || trait.DateJoinRightCol == nil {
|
||||
return "", "", fmt.Errorf("trait %s requires date join columns", trait.SourceTable)
|
||||
}
|
||||
leftCol, _ := mustSafeIdentifier(*trait.DateJoinLeftCol)
|
||||
rightCol, _ := mustSafeIdentifier(*trait.DateJoinRightCol)
|
||||
joinClause = fmt.Sprintf("LEFT JOIN %s dt ON src.%s = dt.%s", dateTable, leftCol, rightCol)
|
||||
sortBase = fmt.Sprintf("dt.%s", dateCol)
|
||||
}
|
||||
|
||||
if trait.FallbackDateColumn != nil && strings.TrimSpace(*trait.FallbackDateColumn) != "" {
|
||||
fallbackCol, _ := mustSafeIdentifier(*trait.FallbackDateColumn)
|
||||
sortBase = fmt.Sprintf("COALESCE(%s, src.%s)", sortBase, fallbackCol)
|
||||
}
|
||||
|
||||
sortExpr := fmt.Sprintf("COALESCE(%s, '1970-01-01 00:00:00+00'::timestamptz)", sortBase)
|
||||
return sortExpr, joinClause, nil
|
||||
}
|
||||
@@ -0,0 +1,177 @@
|
||||
package fifo_stock_v2
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
func (s *fifoStockV2Service) Recalculate(ctx context.Context, req RecalculateRequest) (*RecalculateResult, error) {
|
||||
result := &RecalculateResult{Drifts: make([]WarehouseDrift, 0)}
|
||||
|
||||
err := s.withTransaction(ctx, req.Tx, func(tx *gorm.DB) error {
|
||||
hash := requestHash(map[string]any{
|
||||
"product_warehouse_ids": req.ProductWarehouseIDs,
|
||||
"flag_group_codes": req.FlagGroupCodes,
|
||||
"as_of": req.AsOf,
|
||||
"fix_drift": req.FixDrift,
|
||||
})
|
||||
logRow, reused, err := s.beginOperation(
|
||||
tx,
|
||||
OperationRecalculate,
|
||||
req.IdempotencyKey,
|
||||
hash,
|
||||
0,
|
||||
"RECALCULATE",
|
||||
"",
|
||||
0,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if reused {
|
||||
if len(logRow.ResultPayload) == 0 {
|
||||
return fmt.Errorf("idempotent recalculate has empty payload")
|
||||
}
|
||||
if err := json.Unmarshal(logRow.ResultPayload, result); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
if logRow != nil {
|
||||
defer func() {
|
||||
if err != nil {
|
||||
s.failOperation(tx, logRow, err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
warehouseIDs, err := s.resolveRecalculateWarehouseIDs(ctx, tx, req.ProductWarehouseIDs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
groupCodes, err := s.resolveRecalculateGroupCodes(ctx, tx, req.FlagGroupCodes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, warehouseID := range warehouseIDs {
|
||||
expected := 0.0
|
||||
for _, flagGroup := range groupCodes {
|
||||
available, calcErr := s.calculateWarehouseAvailableForGroup(ctx, tx, warehouseID, flagGroup, req.AsOf)
|
||||
if calcErr != nil {
|
||||
return calcErr
|
||||
}
|
||||
expected += available
|
||||
}
|
||||
|
||||
actual, actualErr := s.loadWarehouseQty(ctx, tx, warehouseID)
|
||||
if actualErr != nil {
|
||||
return actualErr
|
||||
}
|
||||
|
||||
delta := expected - actual
|
||||
result.Checked++
|
||||
if math.Abs(delta) < 1e-6 {
|
||||
continue
|
||||
}
|
||||
|
||||
drift := WarehouseDrift{
|
||||
ProductWarehouseID: warehouseID,
|
||||
ExpectedQty: expected,
|
||||
ActualQty: actual,
|
||||
Delta: delta,
|
||||
}
|
||||
result.Drifts = append(result.Drifts, drift)
|
||||
|
||||
if req.FixDrift {
|
||||
if err := s.adjustProductWarehouseQty(tx, warehouseID, delta); err != nil {
|
||||
return err
|
||||
}
|
||||
result.Fixed++
|
||||
}
|
||||
}
|
||||
|
||||
if err := s.finishOperation(tx, logRow, result); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (s *fifoStockV2Service) resolveRecalculateWarehouseIDs(ctx context.Context, tx *gorm.DB, provided []uint) ([]uint, error) {
|
||||
if len(provided) > 0 {
|
||||
return provided, nil
|
||||
}
|
||||
var ids []uint
|
||||
err := tx.WithContext(ctx).Table("product_warehouses").Select("id").Order("id ASC").Scan(&ids).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ids, nil
|
||||
}
|
||||
|
||||
func (s *fifoStockV2Service) resolveRecalculateGroupCodes(ctx context.Context, tx *gorm.DB, provided []string) ([]string, error) {
|
||||
if len(provided) > 0 {
|
||||
return provided, nil
|
||||
}
|
||||
var groups []string
|
||||
err := tx.WithContext(ctx).
|
||||
Table("fifo_stock_v2_flag_groups").
|
||||
Select("code").
|
||||
Where("is_active = TRUE").
|
||||
Order("priority ASC, code ASC").
|
||||
Scan(&groups).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return groups, nil
|
||||
}
|
||||
|
||||
func (s *fifoStockV2Service) calculateWarehouseAvailableForGroup(
|
||||
ctx context.Context,
|
||||
tx *gorm.DB,
|
||||
warehouseID uint,
|
||||
flagGroupCode string,
|
||||
asOf *time.Time,
|
||||
) (float64, error) {
|
||||
rows, err := s.gatherRows(ctx, tx, GatherRequest{
|
||||
FlagGroupCode: flagGroupCode,
|
||||
Lane: LaneStockable,
|
||||
ProductWarehouseID: warehouseID,
|
||||
AsOf: asOf,
|
||||
Limit: 50000,
|
||||
})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
total := 0.0
|
||||
for _, row := range rows {
|
||||
total += row.AvailableQuantity
|
||||
}
|
||||
return total, nil
|
||||
}
|
||||
|
||||
func (s *fifoStockV2Service) loadWarehouseQty(ctx context.Context, tx *gorm.DB, warehouseID uint) (float64, error) {
|
||||
type row struct {
|
||||
Qty float64 `gorm:"column:qty"`
|
||||
}
|
||||
var out row
|
||||
err := tx.WithContext(ctx).
|
||||
Table("product_warehouses").
|
||||
Select("COALESCE(qty,0) AS qty").
|
||||
Where("id = ?", warehouseID).
|
||||
Take(&out).Error
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return out.Qty, nil
|
||||
}
|
||||
@@ -0,0 +1,265 @@
|
||||
package fifo_stock_v2
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"hash/fnv"
|
||||
"math"
|
||||
"regexp"
|
||||
"strings"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
entity "gitlab.com/mbugroup/lti-api.git/internal/entities"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
var identifierPattern = regexp.MustCompile(`^[a-zA-Z_][a-zA-Z0-9_]*$`)
|
||||
|
||||
type fifoStockV2Service struct {
|
||||
db *gorm.DB
|
||||
logger *logrus.Logger
|
||||
defaultGatherLimit int
|
||||
}
|
||||
|
||||
func NewService(db *gorm.DB, logger *logrus.Logger) Service {
|
||||
if logger == nil {
|
||||
logger = logrus.StandardLogger()
|
||||
}
|
||||
|
||||
return &fifoStockV2Service{
|
||||
db: db,
|
||||
logger: logger,
|
||||
defaultGatherLimit: 1000,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *fifoStockV2Service) withTransaction(
|
||||
ctx context.Context,
|
||||
tx *gorm.DB,
|
||||
fn func(*gorm.DB) error,
|
||||
) error {
|
||||
if tx != nil {
|
||||
return fn(tx.WithContext(ctx))
|
||||
}
|
||||
return s.db.WithContext(ctx).Transaction(func(inner *gorm.DB) error {
|
||||
return fn(inner)
|
||||
})
|
||||
}
|
||||
|
||||
func isSafeIdentifier(v string) bool {
|
||||
return identifierPattern.MatchString(strings.TrimSpace(v))
|
||||
}
|
||||
|
||||
func mustSafeIdentifier(v string) (string, error) {
|
||||
v = strings.TrimSpace(v)
|
||||
if !isSafeIdentifier(v) {
|
||||
return "", fmt.Errorf("unsafe identifier: %s", v)
|
||||
}
|
||||
return v, nil
|
||||
}
|
||||
|
||||
func requestHash(v any) string {
|
||||
payload, _ := json.Marshal(v)
|
||||
sum := sha256.Sum256(payload)
|
||||
return hex.EncodeToString(sum[:])
|
||||
}
|
||||
|
||||
func shardLockKey(flagGroupCode string, productWarehouseID uint) int64 {
|
||||
h := fnv.New64a()
|
||||
_, _ = h.Write([]byte(strings.TrimSpace(strings.ToUpper(flagGroupCode))))
|
||||
_, _ = h.Write([]byte("|"))
|
||||
_, _ = h.Write([]byte(fmt.Sprintf("%d", productWarehouseID)))
|
||||
return int64(h.Sum64())
|
||||
}
|
||||
|
||||
func (s *fifoStockV2Service) lockShard(tx *gorm.DB, flagGroupCode string, productWarehouseID uint) error {
|
||||
if strings.TrimSpace(flagGroupCode) == "" || productWarehouseID == 0 {
|
||||
return fmt.Errorf("lock shard requires flag group and product warehouse")
|
||||
}
|
||||
return tx.Exec("SELECT pg_advisory_xact_lock(?)", shardLockKey(flagGroupCode, productWarehouseID)).Error
|
||||
}
|
||||
|
||||
type operationLogRow struct {
|
||||
ID uint `gorm:"column:id"`
|
||||
Status string `gorm:"column:status"`
|
||||
RequestHash string `gorm:"column:request_hash"`
|
||||
ResultPayload json.RawMessage `gorm:"column:result_payload"`
|
||||
}
|
||||
|
||||
func (s *fifoStockV2Service) beginOperation(
|
||||
tx *gorm.DB,
|
||||
op Operation,
|
||||
idempotencyKey string,
|
||||
requestHashValue string,
|
||||
productWarehouseID uint,
|
||||
flagGroupCode string,
|
||||
usableType string,
|
||||
usableID uint,
|
||||
) (*operationLogRow, bool, error) {
|
||||
if strings.TrimSpace(idempotencyKey) == "" {
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
inserted := operationLogRow{}
|
||||
insertSQL := `
|
||||
INSERT INTO fifo_stock_v2_operation_log
|
||||
(idempotency_key, operation, product_warehouse_id, flag_group_code, usable_type, usable_id, request_hash, status, created_at)
|
||||
VALUES (?, ?, ?, ?, NULLIF(?, ''), NULLIF(?, 0), ?, 'RUNNING', NOW())
|
||||
ON CONFLICT (idempotency_key, operation) DO NOTHING
|
||||
RETURNING id, status, request_hash
|
||||
`
|
||||
if err := tx.Raw(insertSQL,
|
||||
idempotencyKey,
|
||||
string(op),
|
||||
productWarehouseID,
|
||||
flagGroupCode,
|
||||
usableType,
|
||||
usableID,
|
||||
requestHashValue,
|
||||
).Scan(&inserted).Error; err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
if inserted.ID != 0 {
|
||||
return &inserted, false, nil
|
||||
}
|
||||
|
||||
existing := operationLogRow{}
|
||||
if err := tx.Table("fifo_stock_v2_operation_log").
|
||||
Select("id, status, request_hash, result_payload").
|
||||
Where("idempotency_key = ? AND operation = ?", idempotencyKey, string(op)).
|
||||
Take(&existing).Error; err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
if existing.RequestHash != requestHashValue {
|
||||
return nil, false, fmt.Errorf("idempotency key %s reused with different payload", idempotencyKey)
|
||||
}
|
||||
|
||||
switch strings.ToUpper(existing.Status) {
|
||||
case "DONE":
|
||||
return &existing, true, nil
|
||||
case "RUNNING":
|
||||
return nil, false, fmt.Errorf("operation %s with idempotency key %s is still running", op, idempotencyKey)
|
||||
case "FAILED":
|
||||
if err := tx.Table("fifo_stock_v2_operation_log").
|
||||
Where("id = ?", existing.ID).
|
||||
Updates(map[string]any{
|
||||
"status": "RUNNING",
|
||||
"error_text": nil,
|
||||
"finished_at": nil,
|
||||
}).Error; err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
existing.Status = "RUNNING"
|
||||
return &existing, false, nil
|
||||
default:
|
||||
return nil, false, fmt.Errorf("unknown operation status: %s", existing.Status)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *fifoStockV2Service) finishOperation(tx *gorm.DB, logRow *operationLogRow, payload any) error {
|
||||
if logRow == nil || logRow.ID == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
encoded, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return tx.Table("fifo_stock_v2_operation_log").
|
||||
Where("id = ?", logRow.ID).
|
||||
Updates(map[string]any{
|
||||
"status": "DONE",
|
||||
"result_payload": encoded,
|
||||
"finished_at": gorm.Expr("NOW()"),
|
||||
}).Error
|
||||
}
|
||||
|
||||
func (s *fifoStockV2Service) failOperation(tx *gorm.DB, logRow *operationLogRow, failure error) {
|
||||
if logRow == nil || logRow.ID == 0 || failure == nil {
|
||||
return
|
||||
}
|
||||
_ = tx.Table("fifo_stock_v2_operation_log").
|
||||
Where("id = ?", logRow.ID).
|
||||
Updates(map[string]any{
|
||||
"status": "FAILED",
|
||||
"error_text": failure.Error(),
|
||||
"finished_at": gorm.Expr("NOW()"),
|
||||
}).Error
|
||||
}
|
||||
|
||||
func (s *fifoStockV2Service) resolveOverConsume(
|
||||
tx *gorm.DB,
|
||||
flagGroupCode string,
|
||||
functionCode string,
|
||||
lane Lane,
|
||||
defaultValue bool,
|
||||
) (bool, error) {
|
||||
type row struct {
|
||||
Allow bool `gorm:"column:allow_overconsume"`
|
||||
}
|
||||
selected := row{}
|
||||
err := tx.Table("fifo_stock_v2_overconsume_rules").
|
||||
Select("allow_overconsume").
|
||||
Where("is_active = TRUE").
|
||||
Where("lane = ?", string(lane)).
|
||||
Where("(flag_group_code IS NULL OR flag_group_code = ?)", flagGroupCode).
|
||||
Where("(function_code IS NULL OR function_code = ?)", functionCode).
|
||||
Order("CASE WHEN flag_group_code IS NULL THEN 1 ELSE 0 END ASC").
|
||||
Order("CASE WHEN function_code IS NULL THEN 1 ELSE 0 END ASC").
|
||||
Order("priority ASC, id ASC").
|
||||
Limit(1).
|
||||
Take(&selected).Error
|
||||
if err != nil {
|
||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
return defaultValue, nil
|
||||
}
|
||||
return false, err
|
||||
}
|
||||
return selected.Allow, nil
|
||||
}
|
||||
|
||||
func (s *fifoStockV2Service) adjustProductWarehouseQty(tx *gorm.DB, productWarehouseID uint, delta float64) error {
|
||||
if productWarehouseID == 0 || delta == 0 {
|
||||
return nil
|
||||
}
|
||||
return tx.Table("product_warehouses").
|
||||
Where("id = ?", productWarehouseID).
|
||||
Update("qty", gorm.Expr("COALESCE(qty,0) + ?", delta)).Error
|
||||
}
|
||||
|
||||
func nearlyZero(v float64) bool {
|
||||
return math.Abs(v) < 1e-6
|
||||
}
|
||||
|
||||
func (s *fifoStockV2Service) ensureStockAllocationColumns(tx *gorm.DB) error {
|
||||
checkCols := []string{"engine_version", "flag_group_code", "function_code", "idempotency_key"}
|
||||
for _, col := range checkCols {
|
||||
var count int64
|
||||
err := tx.Raw(`
|
||||
SELECT COUNT(1)
|
||||
FROM information_schema.columns
|
||||
WHERE table_schema = 'public' AND table_name = 'stock_allocations' AND column_name = ?
|
||||
`, col).Scan(&count).Error
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if count == 0 {
|
||||
return fmt.Errorf("stock_allocations.%s does not exist, run fifo_stock_v2 migration first", col)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func activeAllocationStatus() string {
|
||||
return entity.StockAllocationStatusActive
|
||||
}
|
||||
|
||||
func releasedAllocationStatus() string {
|
||||
return entity.StockAllocationStatusReleased
|
||||
}
|
||||
@@ -0,0 +1,142 @@
|
||||
package fifo_stock_v2
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
type Lane string
|
||||
|
||||
const (
|
||||
LaneStockable Lane = "STOCKABLE"
|
||||
LaneUsable Lane = "USABLE"
|
||||
)
|
||||
|
||||
type Operation string
|
||||
|
||||
const (
|
||||
OperationAllocate Operation = "ALLOCATE"
|
||||
OperationRollback Operation = "ROLLBACK"
|
||||
OperationReflow Operation = "REFLOW"
|
||||
OperationRecalculate Operation = "RECALCULATE"
|
||||
)
|
||||
|
||||
type Ref struct {
|
||||
Table string
|
||||
ID uint
|
||||
LegacyTypeKey string
|
||||
FunctionCode string
|
||||
}
|
||||
|
||||
type GatherRequest struct {
|
||||
FlagGroupCode string
|
||||
Lane Lane
|
||||
ProductWarehouseID uint
|
||||
AsOf *time.Time
|
||||
Limit int
|
||||
AfterSortAt *time.Time
|
||||
AfterSourceTable string
|
||||
AfterSourceID uint
|
||||
ForUpdate bool
|
||||
Tx *gorm.DB
|
||||
}
|
||||
|
||||
type GatherRow struct {
|
||||
Ref Ref
|
||||
FlagGroupCode string
|
||||
ProductWarehouseID uint
|
||||
SortAt time.Time
|
||||
SortPriority int
|
||||
Quantity float64
|
||||
UsedQuantity float64
|
||||
PendingQuantity float64
|
||||
AvailableQuantity float64
|
||||
SourceTable string
|
||||
SourceID uint
|
||||
}
|
||||
|
||||
type AllocateRequest struct {
|
||||
FlagGroupCode string
|
||||
ProductWarehouseID uint
|
||||
Usable Ref
|
||||
NeedQty float64
|
||||
AllowOverConsume *bool
|
||||
IdempotencyKey string
|
||||
AsOf *time.Time
|
||||
Tx *gorm.DB
|
||||
}
|
||||
|
||||
type AllocationDetail struct {
|
||||
StockableType string
|
||||
StockableID uint
|
||||
Qty float64
|
||||
SortAt time.Time
|
||||
}
|
||||
|
||||
type AllocateResult struct {
|
||||
AllocatedQty float64
|
||||
PendingQty float64
|
||||
Details []AllocationDetail
|
||||
}
|
||||
|
||||
type RollbackRequest struct {
|
||||
ProductWarehouseID uint
|
||||
Usable Ref
|
||||
ReleaseQty *float64
|
||||
Reason string
|
||||
IdempotencyKey string
|
||||
Tx *gorm.DB
|
||||
}
|
||||
|
||||
type RollbackResult struct {
|
||||
ReleasedQty float64
|
||||
Details []AllocationDetail
|
||||
}
|
||||
|
||||
type ReflowRequest struct {
|
||||
FlagGroupCode string
|
||||
ProductWarehouseID uint
|
||||
Usable Ref
|
||||
DesiredQty float64
|
||||
AllowOverConsume *bool
|
||||
IdempotencyKey string
|
||||
AsOf *time.Time
|
||||
Tx *gorm.DB
|
||||
}
|
||||
|
||||
type ReflowResult struct {
|
||||
Rollback RollbackResult
|
||||
Allocate AllocateResult
|
||||
}
|
||||
|
||||
type RecalculateRequest struct {
|
||||
ProductWarehouseIDs []uint
|
||||
FlagGroupCodes []string
|
||||
AsOf *time.Time
|
||||
FixDrift bool
|
||||
IdempotencyKey string
|
||||
Tx *gorm.DB
|
||||
}
|
||||
|
||||
type WarehouseDrift struct {
|
||||
ProductWarehouseID uint
|
||||
ExpectedQty float64
|
||||
ActualQty float64
|
||||
Delta float64
|
||||
}
|
||||
|
||||
type RecalculateResult struct {
|
||||
Checked int
|
||||
Fixed int
|
||||
Drifts []WarehouseDrift
|
||||
}
|
||||
|
||||
type Service interface {
|
||||
Gather(ctx context.Context, req GatherRequest) ([]GatherRow, error)
|
||||
Allocate(ctx context.Context, req AllocateRequest) (*AllocateResult, error)
|
||||
Rollback(ctx context.Context, req RollbackRequest) (*RollbackResult, error)
|
||||
Reflow(ctx context.Context, req ReflowRequest) (*ReflowResult, error)
|
||||
Recalculate(ctx context.Context, req RecalculateRequest) (*RecalculateResult, error)
|
||||
}
|
||||
Reference in New Issue
Block a user