package fifo_stock_v2

import (
	"context"
	"fmt"
	"strings"
	"time"

	entity "gitlab.com/mbugroup/lti-api.git/internal/entities"
	"gorm.io/gorm"
)

// gatherSQLRow is the scan target for a single row of the gather query.
// Each column tag matches an alias emitted by buildGatherSubquery.
type gatherSQLRow struct {
	SourceTable        string    `gorm:"column:source_table"`
	LegacyTypeKey      string    `gorm:"column:legacy_type_key"`
	FunctionCode       string    `gorm:"column:function_code"`
	SourceID           uint      `gorm:"column:source_id"`
	ProductWarehouseID uint      `gorm:"column:product_warehouse_id"`
	SortAt             time.Time `gorm:"column:sort_at"`
	SortPriority       int       `gorm:"column:sort_priority"`
	Quantity           float64   `gorm:"column:quantity"`
	UsedQuantity       float64   `gorm:"column:used_quantity"`
	PendingQuantity    float64   `gorm:"column:pending_quantity"`
	AvailableQuantity  float64   `gorm:"column:available_quantity"`
}

// Gather validates req and returns the gathered rows for the requested flag
// group, product warehouse and lane.
//
// It returns an error wrapping ErrInvalidRequest when the flag group code is
// blank, the product warehouse ID is zero, or the lane is neither
// LaneStockable nor LaneUsable. The query itself runs through
// s.withTransaction using req.Tx.
func (s *fifoStockV2Service) Gather(ctx context.Context, req GatherRequest) ([]GatherRow, error) {
	if strings.TrimSpace(req.FlagGroupCode) == "" || req.ProductWarehouseID == 0 {
		return nil, fmt.Errorf("%w: flag group and product warehouse are required", ErrInvalidRequest)
	}
	if req.Lane != LaneStockable && req.Lane != LaneUsable {
		return nil, fmt.Errorf("%w: unsupported lane %q", ErrInvalidRequest, req.Lane)
	}

	var out []GatherRow
	err := s.withTransaction(ctx, req.Tx, func(tx *gorm.DB) error {
		rows, err := s.gatherRows(ctx, tx, req)
		if err != nil {
			return err
		}
		out = rows
		return nil
	})
	if err != nil {
		return nil, err
	}
	return out, nil
}

// gatherRows builds one subquery per route rule, combines them with UNION ALL,
// applies the optional cursor and the row limit, and maps the scanned rows to
// GatherRow values.
//
// An empty (non-nil) slice is returned when no route rules exist for the flag
// group and lane. A rule whose source table has no trait is an error.
func (s *fifoStockV2Service) gatherRows(ctx context.Context, tx *gorm.DB, req GatherRequest) ([]GatherRow, error) {
	req.AllocationPurpose = normalizeAllocationPurpose(req.AllocationPurpose)

	rules, err := s.loadRouteRules(ctx, tx, req.FlagGroupCode, req.Lane)
	if err != nil {
		return nil, err
	}
	if len(rules) == 0 {
		return []GatherRow{}, nil
	}

	tables := make([]string, 0, len(rules))
	for _, rule := range rules {
		tables = append(tables, rule.SourceTable)
	}

	traits, err := s.loadTraitMap(ctx, tx, req.Lane, tables)
	if err != nil {
		return nil, err
	}

	subqueries := make([]string, 0, len(rules))
	args := make([]any, 0, len(rules)*10)
	for _, rule := range rules {
		trait, ok := traits[rule.SourceTable]
		if !ok {
			return nil, fmt.Errorf("missing trait for table %s lane %s", rule.SourceTable, req.Lane)
		}
		subSQL, subArgs, err := s.buildGatherSubquery(rule, trait, req)
		if err != nil {
			return nil, err
		}
		subqueries = append(subqueries, subSQL)
		args = append(args, subArgs...)
	}
	if len(subqueries) == 0 {
		return []GatherRow{}, nil
	}

	// Limit precedence: request value, then the service default, then 1000.
	limit := req.Limit
	if limit <= 0 {
		limit = s.defaultGatherLimit
	}
	if limit <= 0 {
		limit = 1000
	}

	query := "SELECT * FROM (" + strings.Join(subqueries, " UNION ALL ") + ") AS g"
	if req.AfterSortAt != nil {
		after := *req.AfterSortAt
		if cursorTrait, known := traits[req.AfterSourceTable]; known {
			// The result is ordered by (sort_at, sort_priority, source_table,
			// source_id), so the cursor has to compare the same tuple or pages
			// can skip/repeat rows that share a sort_at. Every row's
			// sort_priority is the SortPriority of the trait of its source
			// table, so the cursor row's priority is recovered from the trait
			// of AfterSourceTable.
			query += `
				WHERE (g.sort_at > ?)
				   OR (g.sort_at = ? AND g.sort_priority > ?)
				   OR (g.sort_at = ? AND g.sort_priority = ? AND g.source_table > ?)
				   OR (g.sort_at = ? AND g.sort_priority = ? AND g.source_table = ? AND g.source_id > ?)
			`
			args = append(args,
				after,
				after, cursorTrait.SortPriority,
				after, cursorTrait.SortPriority, req.AfterSourceTable,
				after, cursorTrait.SortPriority, req.AfterSourceTable, req.AfterSourceID,
			)
		} else {
			// No trait is loaded for the cursor's table, so its priority is
			// unknown; keep the previous predicate that ignores sort_priority.
			query += `
				WHERE (g.sort_at > ?)
				   OR (g.sort_at = ? AND g.source_table > ?)
				   OR (g.sort_at = ? AND g.source_table = ? AND g.source_id > ?)
			`
			args = append(args,
				after,
				after, req.AfterSourceTable,
				after, req.AfterSourceTable, req.AfterSourceID,
			)
		}
	}
	query += " ORDER BY g.sort_at ASC, g.sort_priority ASC, g.source_table ASC, g.source_id ASC LIMIT ?"
	args = append(args, limit)

	var rows []gatherSQLRow
	if err := tx.WithContext(ctx).Raw(query, args...).Scan(&rows).Error; err != nil {
		return nil, err
	}

	out := make([]GatherRow, 0, len(rows))
	for _, row := range rows {
		out = append(out, GatherRow{
			Ref: Ref{
				Table:         row.SourceTable,
				ID:            row.SourceID,
				LegacyTypeKey: row.LegacyTypeKey,
				FunctionCode:  row.FunctionCode,
			},
			FlagGroupCode:      req.FlagGroupCode,
			ProductWarehouseID: row.ProductWarehouseID,
			SortAt:             row.SortAt,
			SortPriority:       row.SortPriority,
			Quantity:           row.Quantity,
			UsedQuantity:       row.UsedQuantity,
			PendingQuantity:    row.PendingQuantity,
			AvailableQuantity:  row.AvailableQuantity,
			SourceTable:        row.SourceTable,
			SourceID:           row.SourceID,
		})
	}
	return out, nil
}

// buildGatherSubquery renders the SELECT for one route rule and returns it
// together with its positional arguments, in the order the placeholders
// appear in the generated SQL.
//
// For LaneStockable the used quantity comes from the rule's used-quantity
// column, or (when that column is unset or req.IgnoreSourceUsed is true) from
// a correlated SUM over stock_allocations; only rows with a positive
// available quantity are selected. For any other lane the available quantity
// equals the base quantity and the pending quantity is read from the rule's
// pending-quantity column when one is configured.
//
// NOTE(review): the second return value of mustSafeIdentifier is discarded
// throughout, as in the original code; its type is not visible here — confirm
// whether it signals an unsafe identifier that should abort the build.
func (s *fifoStockV2Service) buildGatherSubquery(rule routeRule, trait traitRule, req GatherRequest) (string, []any, error) {
	sourceTable, _ := mustSafeIdentifier(rule.SourceTable)
	sourceIDCol, _ := mustSafeIdentifier(rule.SourceIDColumn)
	productWarehouseCol, _ := mustSafeIdentifier(rule.ProductWarehouseCol)
	quantityCol, _ := mustSafeIdentifier(rule.QuantityCol)

	baseQtyExpr := fmt.Sprintf("COALESCE(src.%s,0)::numeric", quantityCol)
	usedExpr := "0::numeric"
	pendingExpr := "0::numeric"
	availableExpr := baseQtyExpr

	// Capacities match the maximum number of values appended below.
	extraArgs := make([]any, 0, 4)
	whereExtraArgs := make([]any, 0, 2)

	if req.Lane == LaneStockable {
		if !req.IgnoreSourceUsed && rule.UsedQuantityCol != nil && strings.TrimSpace(*rule.UsedQuantityCol) != "" {
			usedCol, _ := mustSafeIdentifier(*rule.UsedQuantityCol)
			usedExpr = fmt.Sprintf("COALESCE(src.%s,0)::numeric", usedCol)
		} else {
			// NOTE:
			// usedExpr is referenced twice in the generated SELECT:
			//  1) as used_quantity
			//  2) inside available_quantity = base - usedExpr
			// plus once in the stockable WHERE clause via availableExpr > 0.
			// The args are split because the WHERE placeholders come after
			// the product/flag filter placeholders in the final SQL.
			usedExpr = fmt.Sprintf(
				"(SELECT COALESCE(SUM(sa.qty),0)::numeric FROM stock_allocations sa WHERE sa.stockable_type = ? AND sa.stockable_id = src.%s AND sa.status = '%s' AND sa.allocation_purpose = ?)",
				sourceIDCol,
				activeAllocationStatus(),
			)
			extraArgs = append(extraArgs, rule.LegacyTypeKey, req.AllocationPurpose)
			extraArgs = append(extraArgs, rule.LegacyTypeKey, req.AllocationPurpose)
			whereExtraArgs = append(whereExtraArgs, rule.LegacyTypeKey, req.AllocationPurpose)
		}
		availableExpr = fmt.Sprintf("(%s - %s)", baseQtyExpr, usedExpr)
	} else {
		if rule.PendingQuantityCol != nil && strings.TrimSpace(*rule.PendingQuantityCol) != "" {
			pendingCol, _ := mustSafeIdentifier(*rule.PendingQuantityCol)
			pendingExpr = fmt.Sprintf("COALESCE(src.%s,0)::numeric", pendingCol)
		}
		availableExpr = baseQtyExpr
	}

	sortExpr, joinClause, err := buildSortExpr(trait)
	if err != nil {
		return "", nil, err
	}

	// Both forms of the function-code expression carry exactly one
	// placeholder, so functionCodeArgs is the same for either.
	functionCodeExpr := "?::text"
	functionCodeArgs := []any{rule.FunctionCode}
	if rule.SourceTable == "adjustment_stocks" {
		functionCodeExpr = "COALESCE(NULLIF(src.function_code,''), ?::text)"
	}

	whereParts := []string{
		fmt.Sprintf("src.%s = ?", productWarehouseCol),
		fmt.Sprintf(`EXISTS (
			SELECT 1
			FROM product_warehouses pw
			JOIN flags f ON f.flagable_type = ? AND f.flagable_id = pw.product_id
			JOIN fifo_stock_v2_flag_members fm ON fm.flag_name = f.name AND fm.is_active = TRUE
			WHERE pw.id = src.%s
			  AND fm.flag_group_code = ?
		)`, productWarehouseCol),
	}
	if req.Lane == LaneStockable {
		whereParts = append(whereParts, fmt.Sprintf("%s > 0", availableExpr))
	}
	if req.AsOf != nil {
		whereParts = append(whereParts, fmt.Sprintf("%s <= ?", sortExpr))
	}
	if req.From != nil {
		whereParts = append(whereParts, fmt.Sprintf("%s >= ?", sortExpr))
	}
	if rule.ScopeSQL != nil && strings.TrimSpace(*rule.ScopeSQL) != "" {
		whereParts = append(whereParts, fmt.Sprintf("(%s)", normalizeScopeSQL(*rule.ScopeSQL)))
	}

	subquery := fmt.Sprintf(`
		SELECT
			?::text AS source_table,
			?::text AS legacy_type_key,
			%s AS function_code,
			src.%s AS source_id,
			src.%s AS product_warehouse_id,
			%s AS sort_at,
			?::int AS sort_priority,
			%s AS quantity,
			%s AS used_quantity,
			%s AS pending_quantity,
			%s AS available_quantity
		FROM %s src
		%s
		WHERE %s
	`,
		functionCodeExpr,
		sourceIDCol,
		productWarehouseCol,
		sortExpr,
		baseQtyExpr,
		usedExpr,
		pendingExpr,
		availableExpr,
		sourceTable,
		joinClause,
		strings.Join(whereParts, " AND "),
	)

	// Argument order mirrors placeholder order: SELECT list first, then the
	// product/flag filters, then the stockable availability filter, then the
	// optional time bounds.
	args := []any{
		rule.SourceTable,
		rule.LegacyTypeKey,
	}
	args = append(args, functionCodeArgs...)
	args = append(args, trait.SortPriority)
	args = append(args, extraArgs...)
	args = append(args,
		req.ProductWarehouseID,
		entity.FlagableTypeProduct,
		req.FlagGroupCode,
	)
	args = append(args, whereExtraArgs...)
	if req.AsOf != nil {
		args = append(args, *req.AsOf)
	}
	if req.From != nil {
		args = append(args, *req.From)
	}

	return subquery, args, nil
}

// buildSortExpr returns the SQL expression used as sort_at and the optional
// JOIN clause it depends on.
//
// When the trait names a date table, the date column is read from that table
// (aliased dt) through a LEFT JOIN on the trait's join columns; an error is
// returned if either join column is nil. A configured fallback date column on
// src is applied next, and the final expression falls back to the Unix epoch
// so sort_at is never NULL.
func buildSortExpr(trait traitRule) (string, string, error) {
	dateCol, _ := mustSafeIdentifier(trait.DateColumn)
	// The ID column is passed through mustSafeIdentifier but its result is not
	// used in the expression; the call is kept from the original code.
	idCol, _ := mustSafeIdentifier(trait.IDColumn)
	_ = idCol

	joinClause := ""
	sortBase := fmt.Sprintf("src.%s", dateCol)
	if trait.DateTable != nil && strings.TrimSpace(*trait.DateTable) != "" {
		dateTable, _ := mustSafeIdentifier(*trait.DateTable)
		if trait.DateJoinLeftCol == nil || trait.DateJoinRightCol == nil {
			return "", "", fmt.Errorf("trait %s requires date join columns", trait.SourceTable)
		}
		leftCol, _ := mustSafeIdentifier(*trait.DateJoinLeftCol)
		rightCol, _ := mustSafeIdentifier(*trait.DateJoinRightCol)
		joinClause = fmt.Sprintf("LEFT JOIN %s dt ON src.%s = dt.%s", dateTable, leftCol, rightCol)
		sortBase = fmt.Sprintf("dt.%s", dateCol)
	}

	if trait.FallbackDateColumn != nil && strings.TrimSpace(*trait.FallbackDateColumn) != "" {
		fallbackCol, _ := mustSafeIdentifier(*trait.FallbackDateColumn)
		sortBase = fmt.Sprintf("COALESCE(%s, src.%s)", sortBase, fallbackCol)
	}

	sortExpr := fmt.Sprintf("COALESCE(%s, '1970-01-01 00:00:00+00'::timestamptz)", sortBase)
	return sortExpr, joinClause, nil
}