mirror of
https://gitlab.com/mbugroup/lti-api.git
synced 2026-05-20 13:31:56 +00:00
292 lines
9.0 KiB
Go
292 lines
9.0 KiB
Go
package fifo_stock_v2
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
entity "gitlab.com/mbugroup/lti-api.git/internal/entities"
|
|
"gorm.io/gorm"
|
|
)
|
|
|
|
type gatherSQLRow struct {
|
|
SourceTable string `gorm:"column:source_table"`
|
|
LegacyTypeKey string `gorm:"column:legacy_type_key"`
|
|
FunctionCode string `gorm:"column:function_code"`
|
|
SourceID uint `gorm:"column:source_id"`
|
|
ProductWarehouseID uint `gorm:"column:product_warehouse_id"`
|
|
SortAt time.Time `gorm:"column:sort_at"`
|
|
SortPriority int `gorm:"column:sort_priority"`
|
|
Quantity float64 `gorm:"column:quantity"`
|
|
UsedQuantity float64 `gorm:"column:used_quantity"`
|
|
PendingQuantity float64 `gorm:"column:pending_quantity"`
|
|
AvailableQuantity float64 `gorm:"column:available_quantity"`
|
|
}
|
|
|
|
func (s *fifoStockV2Service) Gather(ctx context.Context, req GatherRequest) ([]GatherRow, error) {
|
|
if strings.TrimSpace(req.FlagGroupCode) == "" || req.ProductWarehouseID == 0 {
|
|
return nil, fmt.Errorf("%w: flag group and product warehouse are required", ErrInvalidRequest)
|
|
}
|
|
if req.Lane != LaneStockable && req.Lane != LaneUsable {
|
|
return nil, fmt.Errorf("%w: unsupported lane %q", ErrInvalidRequest, req.Lane)
|
|
}
|
|
|
|
var out []GatherRow
|
|
err := s.withTransaction(ctx, req.Tx, func(tx *gorm.DB) error {
|
|
rows, err := s.gatherRows(ctx, tx, req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
out = rows
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
func (s *fifoStockV2Service) gatherRows(ctx context.Context, tx *gorm.DB, req GatherRequest) ([]GatherRow, error) {
|
|
rules, err := s.loadRouteRules(ctx, tx, req.FlagGroupCode, req.Lane)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(rules) == 0 {
|
|
return []GatherRow{}, nil
|
|
}
|
|
|
|
tables := make([]string, 0, len(rules))
|
|
for _, rule := range rules {
|
|
tables = append(tables, rule.SourceTable)
|
|
}
|
|
|
|
traits, err := s.loadTraitMap(ctx, tx, req.Lane, tables)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
subqueries := make([]string, 0, len(rules))
|
|
args := make([]any, 0, len(rules)*10)
|
|
|
|
for _, rule := range rules {
|
|
trait, ok := traits[rule.SourceTable]
|
|
if !ok {
|
|
return nil, fmt.Errorf("missing trait for table %s lane %s", rule.SourceTable, req.Lane)
|
|
}
|
|
subSQL, subArgs, err := s.buildGatherSubquery(rule, trait, req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
subqueries = append(subqueries, subSQL)
|
|
args = append(args, subArgs...)
|
|
}
|
|
|
|
if len(subqueries) == 0 {
|
|
return []GatherRow{}, nil
|
|
}
|
|
|
|
limit := req.Limit
|
|
if limit <= 0 {
|
|
limit = s.defaultGatherLimit
|
|
}
|
|
if limit <= 0 {
|
|
limit = 1000
|
|
}
|
|
|
|
query := "SELECT * FROM (" + strings.Join(subqueries, " UNION ALL ") + ") AS g"
|
|
if req.AfterSortAt != nil {
|
|
query += `
|
|
WHERE
|
|
(g.sort_at > ?)
|
|
OR (g.sort_at = ? AND g.source_table > ?)
|
|
OR (g.sort_at = ? AND g.source_table = ? AND g.source_id > ?)
|
|
`
|
|
args = append(args,
|
|
*req.AfterSortAt,
|
|
*req.AfterSortAt, req.AfterSourceTable,
|
|
*req.AfterSortAt, req.AfterSourceTable, req.AfterSourceID,
|
|
)
|
|
}
|
|
query += " ORDER BY g.sort_at ASC, g.sort_priority ASC, g.source_table ASC, g.source_id ASC LIMIT ?"
|
|
args = append(args, limit)
|
|
|
|
var rows []gatherSQLRow
|
|
if err := tx.WithContext(ctx).Raw(query, args...).Scan(&rows).Error; err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
out := make([]GatherRow, 0, len(rows))
|
|
for _, row := range rows {
|
|
out = append(out, GatherRow{
|
|
Ref: Ref{
|
|
Table: row.SourceTable,
|
|
ID: row.SourceID,
|
|
LegacyTypeKey: row.LegacyTypeKey,
|
|
FunctionCode: row.FunctionCode,
|
|
},
|
|
FlagGroupCode: req.FlagGroupCode,
|
|
ProductWarehouseID: row.ProductWarehouseID,
|
|
SortAt: row.SortAt,
|
|
SortPriority: row.SortPriority,
|
|
Quantity: row.Quantity,
|
|
UsedQuantity: row.UsedQuantity,
|
|
PendingQuantity: row.PendingQuantity,
|
|
AvailableQuantity: row.AvailableQuantity,
|
|
SourceTable: row.SourceTable,
|
|
SourceID: row.SourceID,
|
|
})
|
|
}
|
|
|
|
return out, nil
|
|
}
|
|
|
|
func (s *fifoStockV2Service) buildGatherSubquery(rule routeRule, trait traitRule, req GatherRequest) (string, []any, error) {
|
|
sourceTable, _ := mustSafeIdentifier(rule.SourceTable)
|
|
sourceIDCol, _ := mustSafeIdentifier(rule.SourceIDColumn)
|
|
productWarehouseCol, _ := mustSafeIdentifier(rule.ProductWarehouseCol)
|
|
quantityCol, _ := mustSafeIdentifier(rule.QuantityCol)
|
|
|
|
baseQtyExpr := fmt.Sprintf("COALESCE(src.%s,0)::numeric", quantityCol)
|
|
usedExpr := "0::numeric"
|
|
pendingExpr := "0::numeric"
|
|
availableExpr := baseQtyExpr
|
|
extraArgs := make([]any, 0, 2)
|
|
whereExtraArgs := make([]any, 0, 1)
|
|
|
|
if req.Lane == LaneStockable {
|
|
if rule.UsedQuantityCol != nil && strings.TrimSpace(*rule.UsedQuantityCol) != "" {
|
|
usedCol, _ := mustSafeIdentifier(*rule.UsedQuantityCol)
|
|
usedExpr = fmt.Sprintf("COALESCE(src.%s,0)::numeric", usedCol)
|
|
} else {
|
|
// NOTE:
|
|
// usedExpr is referenced twice in the generated SELECT:
|
|
// 1) as used_quantity
|
|
// 2) inside available_quantity = base - usedExpr
|
|
// plus once in stockable WHERE clause via availableExpr > 0.
|
|
// We split the args because the WHERE placeholder order appears
|
|
// after product/flag filter placeholders in the final SQL.
|
|
usedExpr = fmt.Sprintf(
|
|
"(SELECT COALESCE(SUM(sa.qty),0)::numeric FROM stock_allocations sa WHERE sa.stockable_type = ? AND sa.stockable_id = src.%s AND sa.status = '%s')",
|
|
sourceIDCol,
|
|
activeAllocationStatus(),
|
|
)
|
|
extraArgs = append(extraArgs, rule.LegacyTypeKey)
|
|
extraArgs = append(extraArgs, rule.LegacyTypeKey)
|
|
whereExtraArgs = append(whereExtraArgs, rule.LegacyTypeKey)
|
|
}
|
|
availableExpr = fmt.Sprintf("(%s - %s)", baseQtyExpr, usedExpr)
|
|
} else {
|
|
if rule.PendingQuantityCol != nil && strings.TrimSpace(*rule.PendingQuantityCol) != "" {
|
|
pendingCol, _ := mustSafeIdentifier(*rule.PendingQuantityCol)
|
|
pendingExpr = fmt.Sprintf("COALESCE(src.%s,0)::numeric", pendingCol)
|
|
}
|
|
availableExpr = baseQtyExpr
|
|
}
|
|
|
|
sortExpr, joinClause, err := buildSortExpr(trait)
|
|
if err != nil {
|
|
return "", nil, err
|
|
}
|
|
|
|
functionCodeExpr := "?::text"
|
|
functionCodeArgs := []any{rule.FunctionCode}
|
|
if rule.SourceTable == "adjustment_stocks" {
|
|
functionCodeExpr = "COALESCE(NULLIF(src.function_code,''), ?::text)"
|
|
}
|
|
|
|
whereParts := []string{
|
|
fmt.Sprintf("src.%s = ?", productWarehouseCol),
|
|
fmt.Sprintf(`EXISTS (
|
|
SELECT 1
|
|
FROM product_warehouses pw
|
|
JOIN flags f ON f.flagable_type = ? AND f.flagable_id = pw.product_id
|
|
JOIN fifo_stock_v2_flag_members fm ON fm.flag_name = f.name AND fm.is_active = TRUE
|
|
WHERE pw.id = src.%s AND fm.flag_group_code = ?
|
|
)`, productWarehouseCol),
|
|
}
|
|
|
|
if req.Lane == LaneStockable {
|
|
whereParts = append(whereParts, fmt.Sprintf("%s > 0", availableExpr))
|
|
}
|
|
|
|
if req.AsOf != nil {
|
|
whereParts = append(whereParts, fmt.Sprintf("%s <= ?", sortExpr))
|
|
}
|
|
if req.From != nil {
|
|
whereParts = append(whereParts, fmt.Sprintf("%s >= ?", sortExpr))
|
|
}
|
|
|
|
if rule.ScopeSQL != nil && strings.TrimSpace(*rule.ScopeSQL) != "" {
|
|
whereParts = append(whereParts, fmt.Sprintf("(%s)", normalizeScopeSQL(*rule.ScopeSQL)))
|
|
}
|
|
|
|
subquery := fmt.Sprintf(`
|
|
SELECT
|
|
?::text AS source_table,
|
|
?::text AS legacy_type_key,
|
|
%s AS function_code,
|
|
src.%s AS source_id,
|
|
src.%s AS product_warehouse_id,
|
|
%s AS sort_at,
|
|
?::int AS sort_priority,
|
|
%s AS quantity,
|
|
%s AS used_quantity,
|
|
%s AS pending_quantity,
|
|
%s AS available_quantity
|
|
FROM %s src
|
|
%s
|
|
WHERE %s
|
|
`, functionCodeExpr, sourceIDCol, productWarehouseCol, sortExpr, baseQtyExpr, usedExpr, pendingExpr, availableExpr, sourceTable, joinClause, strings.Join(whereParts, " AND "))
|
|
|
|
args := []any{
|
|
rule.SourceTable,
|
|
rule.LegacyTypeKey,
|
|
}
|
|
args = append(args, functionCodeArgs...)
|
|
args = append(args, trait.SortPriority)
|
|
args = append(args, extraArgs...)
|
|
args = append(args,
|
|
req.ProductWarehouseID,
|
|
entity.FlagableTypeProduct,
|
|
req.FlagGroupCode,
|
|
)
|
|
args = append(args, whereExtraArgs...)
|
|
|
|
if req.AsOf != nil {
|
|
args = append(args, *req.AsOf)
|
|
}
|
|
if req.From != nil {
|
|
args = append(args, *req.From)
|
|
}
|
|
|
|
return subquery, args, nil
|
|
}
|
|
|
|
func buildSortExpr(trait traitRule) (string, string, error) {
|
|
dateCol, _ := mustSafeIdentifier(trait.DateColumn)
|
|
idCol, _ := mustSafeIdentifier(trait.IDColumn)
|
|
_ = idCol
|
|
|
|
joinClause := ""
|
|
sortBase := fmt.Sprintf("src.%s", dateCol)
|
|
if trait.DateTable != nil && strings.TrimSpace(*trait.DateTable) != "" {
|
|
dateTable, _ := mustSafeIdentifier(*trait.DateTable)
|
|
if trait.DateJoinLeftCol == nil || trait.DateJoinRightCol == nil {
|
|
return "", "", fmt.Errorf("trait %s requires date join columns", trait.SourceTable)
|
|
}
|
|
leftCol, _ := mustSafeIdentifier(*trait.DateJoinLeftCol)
|
|
rightCol, _ := mustSafeIdentifier(*trait.DateJoinRightCol)
|
|
joinClause = fmt.Sprintf("LEFT JOIN %s dt ON src.%s = dt.%s", dateTable, leftCol, rightCol)
|
|
sortBase = fmt.Sprintf("dt.%s", dateCol)
|
|
}
|
|
|
|
if trait.FallbackDateColumn != nil && strings.TrimSpace(*trait.FallbackDateColumn) != "" {
|
|
fallbackCol, _ := mustSafeIdentifier(*trait.FallbackDateColumn)
|
|
sortBase = fmt.Sprintf("COALESCE(%s, src.%s)", sortBase, fallbackCol)
|
|
}
|
|
|
|
sortExpr := fmt.Sprintf("COALESCE(%s, '1970-01-01 00:00:00+00'::timestamptz)", sortBase)
|
|
return sortExpr, joinClause, nil
|
|
}
|