mirror of
https://gitlab.com/mbugroup/lti-api.git
synced 2026-05-20 13:31:56 +00:00
fix: first push need support testing, and implemented fifo v2 to all modules
This commit is contained in:
@@ -401,12 +401,9 @@ func (s *fifoStockV2Service) rollbackInternal(
|
||||
}
|
||||
|
||||
func (s *fifoStockV2Service) Reflow(ctx context.Context, req ReflowRequest) (*ReflowResult, error) {
|
||||
if strings.TrimSpace(req.FlagGroupCode) == "" || req.ProductWarehouseID == 0 || req.Usable.ID == 0 || strings.TrimSpace(req.Usable.LegacyTypeKey) == "" {
|
||||
if strings.TrimSpace(req.FlagGroupCode) == "" || req.ProductWarehouseID == 0 {
|
||||
return nil, fmt.Errorf("%w: invalid reflow request", ErrInvalidRequest)
|
||||
}
|
||||
if req.DesiredQty < 0 {
|
||||
return nil, fmt.Errorf("%w: desired qty must be >= 0", ErrInvalidRequest)
|
||||
}
|
||||
|
||||
result := &ReflowResult{}
|
||||
err := s.withTransaction(ctx, req.Tx, func(tx *gorm.DB) error {
|
||||
@@ -420,11 +417,7 @@ func (s *fifoStockV2Service) Reflow(ctx context.Context, req ReflowRequest) (*Re
|
||||
hash := requestHash(map[string]any{
|
||||
"flag_group_code": req.FlagGroupCode,
|
||||
"product_warehouse_id": req.ProductWarehouseID,
|
||||
"usable_type": req.Usable.LegacyTypeKey,
|
||||
"usable_id": req.Usable.ID,
|
||||
"desired_qty": req.DesiredQty,
|
||||
"as_of": req.AsOf,
|
||||
"allow_over_consume": req.AllowOverConsume,
|
||||
})
|
||||
logRow, reused, err := s.beginOperation(
|
||||
tx,
|
||||
@@ -433,8 +426,8 @@ func (s *fifoStockV2Service) Reflow(ctx context.Context, req ReflowRequest) (*Re
|
||||
hash,
|
||||
req.ProductWarehouseID,
|
||||
req.FlagGroupCode,
|
||||
req.Usable.LegacyTypeKey,
|
||||
req.Usable.ID,
|
||||
"",
|
||||
0,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -456,32 +449,82 @@ func (s *fifoStockV2Service) Reflow(ctx context.Context, req ReflowRequest) (*Re
|
||||
}()
|
||||
}
|
||||
|
||||
rollbackRes, rollbackErr := s.rollbackInternal(ctx, tx, RollbackRequest{
|
||||
usableRows, gatherErr := s.gatherAllRows(ctx, tx, GatherRequest{
|
||||
FlagGroupCode: req.FlagGroupCode,
|
||||
Lane: LaneUsable,
|
||||
ProductWarehouseID: req.ProductWarehouseID,
|
||||
Usable: req.Usable,
|
||||
ReleaseQty: nil,
|
||||
Reason: "reflow reset",
|
||||
}, req.FlagGroupCode)
|
||||
if rollbackErr != nil {
|
||||
err = rollbackErr
|
||||
return rollbackErr
|
||||
Limit: s.defaultGatherLimit,
|
||||
})
|
||||
if gatherErr != nil {
|
||||
err = gatherErr
|
||||
return gatherErr
|
||||
}
|
||||
result.Rollback = *rollbackRes
|
||||
result.ProcessedUsables = len(usableRows)
|
||||
|
||||
if req.DesiredQty > 0 {
|
||||
for _, usableRow := range usableRows {
|
||||
desiredQty := usableRow.Quantity + usableRow.PendingQuantity
|
||||
|
||||
rollbackRes, rollbackErr := s.rollbackInternal(ctx, tx, RollbackRequest{
|
||||
ProductWarehouseID: req.ProductWarehouseID,
|
||||
Usable: usableRow.Ref,
|
||||
ReleaseQty: nil,
|
||||
Reason: "reflow reset",
|
||||
}, req.FlagGroupCode)
|
||||
if rollbackErr != nil {
|
||||
err = rollbackErr
|
||||
return rollbackErr
|
||||
}
|
||||
result.Rollback.ReleasedQty += rollbackRes.ReleasedQty
|
||||
if len(rollbackRes.Details) > 0 {
|
||||
result.Rollback.Details = append(result.Rollback.Details, rollbackRes.Details...)
|
||||
}
|
||||
minDesired := rollbackRes.ReleasedQty + usableRow.PendingQuantity
|
||||
if desiredQty < minDesired {
|
||||
desiredQty = minDesired
|
||||
}
|
||||
|
||||
if desiredQty <= 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
asOf := usableRow.SortAt
|
||||
if req.AsOf != nil && asOf.Before(*req.AsOf) {
|
||||
asOf = *req.AsOf
|
||||
}
|
||||
allocateRes, allocateErr := s.allocateInternal(ctx, tx, AllocateRequest{
|
||||
FlagGroupCode: req.FlagGroupCode,
|
||||
ProductWarehouseID: req.ProductWarehouseID,
|
||||
Usable: req.Usable,
|
||||
NeedQty: req.DesiredQty,
|
||||
AllowOverConsume: req.AllowOverConsume,
|
||||
AsOf: req.AsOf,
|
||||
Usable: usableRow.Ref,
|
||||
NeedQty: desiredQty,
|
||||
AsOf: &asOf,
|
||||
})
|
||||
if allocateErr != nil {
|
||||
err = allocateErr
|
||||
return allocateErr
|
||||
}
|
||||
result.Allocate = *allocateRes
|
||||
result.Allocate.AllocatedQty += allocateRes.AllocatedQty
|
||||
result.Allocate.PendingQty += allocateRes.PendingQty
|
||||
if len(allocateRes.Details) > 0 {
|
||||
result.Allocate.Details = append(result.Allocate.Details, allocateRes.Details...)
|
||||
}
|
||||
}
|
||||
|
||||
expectedQty, calcErr := s.calculateWarehouseAvailableForGroup(ctx, tx, req.ProductWarehouseID, req.FlagGroupCode, nil)
|
||||
if calcErr != nil {
|
||||
err = calcErr
|
||||
return calcErr
|
||||
}
|
||||
actualQty, loadErr := s.loadWarehouseQty(ctx, tx, req.ProductWarehouseID)
|
||||
if loadErr != nil {
|
||||
err = loadErr
|
||||
return loadErr
|
||||
}
|
||||
drift := expectedQty - actualQty
|
||||
if math.Abs(drift) >= 1e-6 {
|
||||
if adjustErr := s.adjustProductWarehouseQty(tx, req.ProductWarehouseID, drift); adjustErr != nil {
|
||||
err = adjustErr
|
||||
return adjustErr
|
||||
}
|
||||
}
|
||||
|
||||
if finishErr := s.finishOperation(tx, logRow, result); finishErr != nil {
|
||||
@@ -496,6 +539,54 @@ func (s *fifoStockV2Service) Reflow(ctx context.Context, req ReflowRequest) (*Re
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (s *fifoStockV2Service) gatherAllRows(
|
||||
ctx context.Context,
|
||||
tx *gorm.DB,
|
||||
req GatherRequest,
|
||||
) ([]GatherRow, error) {
|
||||
limit := req.Limit
|
||||
if limit <= 0 {
|
||||
limit = s.defaultGatherLimit
|
||||
}
|
||||
if limit <= 0 {
|
||||
limit = 1000
|
||||
}
|
||||
|
||||
req.Limit = limit
|
||||
out := make([]GatherRow, 0, limit)
|
||||
|
||||
var cursorSortAt *time.Time
|
||||
cursorSourceTable := ""
|
||||
var cursorSourceID uint
|
||||
|
||||
for {
|
||||
req.AfterSortAt = cursorSortAt
|
||||
req.AfterSourceTable = cursorSourceTable
|
||||
req.AfterSourceID = cursorSourceID
|
||||
|
||||
rows, err := s.gatherRows(ctx, tx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(rows) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
out = append(out, rows...)
|
||||
if len(rows) < limit {
|
||||
break
|
||||
}
|
||||
|
||||
last := rows[len(rows)-1]
|
||||
lastSortAt := last.SortAt
|
||||
cursorSortAt = &lastSortAt
|
||||
cursorSourceTable = last.SourceTable
|
||||
cursorSourceID = last.SourceID
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (s *fifoStockV2Service) loadActiveAllocations(
|
||||
tx *gorm.DB,
|
||||
usableType string,
|
||||
|
||||
@@ -197,6 +197,9 @@ func (s *fifoStockV2Service) buildGatherSubquery(rule routeRule, trait traitRule
|
||||
if req.AsOf != nil {
|
||||
whereParts = append(whereParts, fmt.Sprintf("%s <= ?", sortExpr))
|
||||
}
|
||||
if req.From != nil {
|
||||
whereParts = append(whereParts, fmt.Sprintf("%s >= ?", sortExpr))
|
||||
}
|
||||
|
||||
if rule.ScopeSQL != nil && strings.TrimSpace(*rule.ScopeSQL) != "" {
|
||||
whereParts = append(whereParts, fmt.Sprintf("(%s)", normalizeScopeSQL(*rule.ScopeSQL)))
|
||||
@@ -236,6 +239,9 @@ func (s *fifoStockV2Service) buildGatherSubquery(rule routeRule, trait traitRule
|
||||
if req.AsOf != nil {
|
||||
args = append(args, *req.AsOf)
|
||||
}
|
||||
if req.From != nil {
|
||||
args = append(args, *req.From)
|
||||
}
|
||||
|
||||
return subquery, args, nil
|
||||
}
|
||||
|
||||
@@ -34,6 +34,7 @@ type GatherRequest struct {
|
||||
FlagGroupCode string
|
||||
Lane Lane
|
||||
ProductWarehouseID uint
|
||||
From *time.Time
|
||||
AsOf *time.Time
|
||||
Limit int
|
||||
AfterSortAt *time.Time
|
||||
@@ -98,17 +99,15 @@ type RollbackResult struct {
|
||||
type ReflowRequest struct {
|
||||
FlagGroupCode string
|
||||
ProductWarehouseID uint
|
||||
Usable Ref
|
||||
DesiredQty float64
|
||||
AllowOverConsume *bool
|
||||
IdempotencyKey string
|
||||
AsOf *time.Time
|
||||
IdempotencyKey string
|
||||
Tx *gorm.DB
|
||||
}
|
||||
|
||||
type ReflowResult struct {
|
||||
Rollback RollbackResult
|
||||
Allocate AllocateResult
|
||||
ProcessedUsables int
|
||||
Rollback RollbackResult
|
||||
Allocate AllocateResult
|
||||
}
|
||||
|
||||
type RecalculateRequest struct {
|
||||
|
||||
Reference in New Issue
Block a user