Merge branch 'cmd/consolidate-and-repoint-stocks' into 'development'

cmd: post-consolidation command for duplicate product_warehouses

See merge request mbugroup/lti-api!456
This commit is contained in:
Adnan Zahir
2026-04-24 01:20:39 +07:00
@@ -0,0 +1,384 @@
package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"log"
"os"
"sort"
"strings"
"text/tabwriter"
commonSvc "gitlab.com/mbugroup/lti-api.git/internal/common/service"
"gitlab.com/mbugroup/lti-api.git/internal/config"
"gitlab.com/mbugroup/lti-api.git/internal/database"
"gorm.io/gorm"
)
const (
outputTable = "table"
outputJSON = "json"
)
type options struct {
Apply bool
Output string
DBSSLMode string
AreaName string
}
type duplicateGroup struct {
WarehouseID uint `json:"warehouse_id"`
WarehouseName string `json:"warehouse_name"`
ProductID uint `json:"product_id"`
ProductName string `json:"product_name"`
AreaName string `json:"area_name"`
LocationName string `json:"location_name"`
SurvivorID uint `json:"survivor_id"`
SurvivorQty float64 `json:"survivor_qty"`
AbsorbedCount int `json:"absorbed_count"`
TotalMergedQty float64 `json:"total_merged_qty"`
AbsorbedIDs string `json:"absorbed_ids"`
}
type consolidateSummary struct {
TotalDuplicateGroups int `json:"total_duplicate_groups"`
TotalProductWarehouses int64 `json:"total_product_warehouses"`
UpdatedReferences map[string]int64 `json:"updated_references,omitempty"`
DeletedProductWarehouses int64 `json:"deleted_product_warehouses,omitempty"`
OverallStatus string `json:"overall_status"`
}
func main() {
opts, err := parseFlags()
if err != nil {
log.Fatalf("invalid flags: %v", err)
}
if opts.DBSSLMode != "" {
config.DBSSLMode = opts.DBSSLMode
}
ctx := context.Background()
db := database.Connect(config.DBHost, config.DBName)
// Find duplicate groups
groups, err := findDuplicateProductWarehouses(ctx, db, opts)
if err != nil {
log.Fatalf("failed to find duplicates: %v", err)
}
if len(groups) == 0 {
fmt.Println("No duplicate product_warehouses found")
return
}
summary := summarizeGroups(groups)
if !opts.Apply {
renderConsolidation(opts.Output, groups, summary)
return
}
applied, err := applyConsolidation(ctx, db, groups)
if err != nil {
log.Fatalf("apply failed: %v", err)
}
renderConsolidation(opts.Output, groups, applied)
}
func parseFlags() (*options, error) {
var opts options
flag.BoolVar(&opts.Apply, "apply", false, "Apply consolidation (omit for dry-run)")
flag.StringVar(&opts.Output, "output", outputTable, "Output format: table or json")
flag.StringVar(&opts.DBSSLMode, "db-sslmode", "", "Database sslmode override")
flag.StringVar(&opts.AreaName, "area-name", "", "Optional area filter")
flag.Parse()
opts.Output = strings.ToLower(strings.TrimSpace(opts.Output))
opts.AreaName = strings.TrimSpace(opts.AreaName)
opts.DBSSLMode = strings.TrimSpace(opts.DBSSLMode)
if opts.Output == "" {
opts.Output = outputTable
}
if opts.Output != outputTable && opts.Output != outputJSON {
return nil, fmt.Errorf("unsupported --output=%s", opts.Output)
}
return &opts, nil
}
func findDuplicateProductWarehouses(ctx context.Context, db *gorm.DB, opts *options) ([]duplicateGroup, error) {
filters := ""
args := []any{}
if opts.AreaName != "" {
filters = "WHERE a.name = ?"
args = append(args, opts.AreaName)
}
query := fmt.Sprintf(`
SELECT
pw.warehouse_id,
w.name AS warehouse_name,
pw.product_id,
p.name AS product_name,
COALESCE(a.name, 'N/A') AS area_name,
COALESCE(l.name, 'N/A') AS location_name,
MIN(pw.id) AS survivor_id,
(SELECT qty FROM product_warehouses WHERE warehouse_id = pw.warehouse_id AND product_id = pw.product_id AND id = MIN(pw.id)) AS survivor_qty,
COUNT(*) - 1 AS absorbed_count,
SUM(pw.qty) AS total_merged_qty,
STRING_AGG(pw.id::text, ', ' ORDER BY pw.id::text) FILTER (WHERE pw.id <> MIN(pw.id)) AS absorbed_ids
FROM product_warehouses pw
JOIN warehouses w ON w.id = pw.warehouse_id
JOIN products p ON p.id = pw.product_id
LEFT JOIN locations l ON l.id = w.location_id
LEFT JOIN areas a ON a.id = l.area_id
%s
GROUP BY pw.warehouse_id, w.name, pw.product_id, p.name, a.name, l.name
HAVING COUNT(*) > 1
ORDER BY a.name, l.name, w.name, p.name
`, filters)
rows := make([]duplicateGroup, 0)
if err := db.WithContext(ctx).Raw(query, args...).Scan(&rows).Error; err != nil {
return nil, err
}
return rows, nil
}
func applyConsolidation(ctx context.Context, db *gorm.DB, groups []duplicateGroup) (consolidateSummary, error) {
summary := consolidateSummary{
TotalDuplicateGroups: len(groups),
UpdatedReferences: make(map[string]int64),
OverallStatus: "PASS",
}
fifoSvc := commonSvc.NewFifoStockV2Service(db, nil)
err := db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
for _, group := range groups {
absorbedIDs := []uint{}
if group.AbsorbedIDs != "" {
parts := strings.Split(group.AbsorbedIDs, ", ")
for _, p := range parts {
var id uint
fmt.Sscanf(p, "%d", &id)
absorbedIDs = append(absorbedIDs, id)
}
}
if len(absorbedIDs) == 0 {
continue
}
// Update all references to point to survivor
refTables := []struct {
table string
column string
}{
{"stock_allocations", "product_warehouse_id"},
{"fifo_stock_v2_operation_log", "product_warehouse_id"},
{"fifo_stock_v2_reflow_checkpoints", "product_warehouse_id"},
{"fifo_stock_v2_shadow_allocations", "product_warehouse_id"},
{"recording_depletions", "source_product_warehouse_id"},
{"stock_logs", "product_warehouse_id"},
}
for _, ref := range refTables {
res := tx.WithContext(ctx).
Table(ref.table).
Where(fmt.Sprintf("%s IN ?", ref.column), absorbedIDs).
Update(ref.column, group.SurvivorID)
if res.Error != nil {
return fmt.Errorf("update %s.%s: %w", ref.table, ref.column, res.Error)
}
if res.RowsAffected > 0 {
summary.UpdatedReferences[ref.table+"."+ref.column] += res.RowsAffected
}
}
// Update survivor qty to merged total
res := tx.WithContext(ctx).
Table("product_warehouses").
Where("id = ?", group.SurvivorID).
Update("qty", group.TotalMergedQty)
if res.Error != nil {
return fmt.Errorf("update survivor qty: %w", res.Error)
}
// Delete absorbed product_warehouses
res = tx.WithContext(ctx).
Where("id IN ?", absorbedIDs).
Delete(&struct{}{})
if res.Error != nil {
return fmt.Errorf("delete absorbed: %w", res.Error)
}
summary.DeletedProductWarehouses += res.RowsAffected
// Recalculate stock_logs for survivor
if err := recalculateStockLogs(ctx, tx, []uint{group.SurvivorID}); err != nil {
return fmt.Errorf("recalculate stock_logs: %w", err)
}
// Reflow and recalculate FIFO
if err := reflowProductWarehouse(ctx, fifoSvc, tx, group.SurvivorID); err != nil {
return fmt.Errorf("reflow product_warehouse %d: %w", group.SurvivorID, err)
}
}
return nil
})
if err != nil {
summary.OverallStatus = "FAIL"
return summary, err
}
return summary, nil
}
func recalculateStockLogs(ctx context.Context, tx *gorm.DB, productWarehouseIDs []uint) error {
if len(productWarehouseIDs) == 0 {
return nil
}
query := `
WITH recalculated AS (
SELECT
id,
SUM(COALESCE(increase, 0) - COALESCE(decrease, 0))
OVER (PARTITION BY product_warehouse_id ORDER BY created_at ASC, id ASC) AS running_stock
FROM stock_logs
WHERE product_warehouse_id IN ?
)
UPDATE stock_logs sl
SET stock = recalculated.running_stock
FROM recalculated
WHERE sl.id = recalculated.id
`
return tx.WithContext(ctx).Exec(query, productWarehouseIDs).Error
}
func reflowProductWarehouse(ctx context.Context, fifoSvc commonSvc.FifoStockV2Service, tx *gorm.DB, productWarehouseID uint) error {
type row struct {
FlagGroupCode string `gorm:"column:flag_group_code"`
}
var selected row
err := tx.WithContext(ctx).
Table("fifo_stock_v2_route_rules rr").
Select("rr.flag_group_code").
Joins("JOIN fifo_stock_v2_flag_groups fg ON fg.code = rr.flag_group_code AND fg.is_active = TRUE").
Where("rr.is_active = TRUE").
Where("rr.lane = ?", "STOCKABLE").
Where("rr.function_code = ?", "PURCHASE_IN").
Where("rr.source_table = ?", "purchase_items").
Where(`EXISTS (
SELECT 1
FROM product_warehouses pw
JOIN flags f ON f.flagable_id = pw.product_id
JOIN fifo_stock_v2_flag_members fm ON fm.flag_name = f.name AND fm.is_active = TRUE
WHERE pw.id = ?
AND f.flagable_type = 'products'
AND fm.flag_group_code = rr.flag_group_code
)`, productWarehouseID).
Order("rr.id ASC").
Limit(1).
Take(&selected).Error
if err != nil && err != gorm.ErrRecordNotFound {
return err
}
if err == gorm.ErrRecordNotFound {
return nil
}
flagGroupCode := strings.TrimSpace(selected.FlagGroupCode)
if _, err := fifoSvc.Reflow(ctx, commonSvc.FifoStockV2ReflowRequest{
FlagGroupCode: flagGroupCode,
ProductWarehouseID: productWarehouseID,
Tx: tx,
}); err != nil {
return err
}
if _, err := fifoSvc.Recalculate(ctx, commonSvc.FifoStockV2RecalculateRequest{
ProductWarehouseIDs: []uint{productWarehouseID},
FlagGroupCodes: []string{flagGroupCode},
FixDrift: true,
Tx: tx,
}); err != nil {
return err
}
return nil
}
func summarizeGroups(groups []duplicateGroup) consolidateSummary {
var totalQty int64
for _, g := range groups {
totalQty += int64(g.AbsorbedCount)
}
return consolidateSummary{
TotalDuplicateGroups: len(groups),
TotalProductWarehouses: totalQty,
OverallStatus: "PASS",
}
}
func renderConsolidation(mode string, groups []duplicateGroup, summary consolidateSummary) {
if mode == outputJSON {
payload := map[string]any{
"groups": groups,
"summary": summary,
}
enc := json.NewEncoder(os.Stdout)
enc.SetIndent("", " ")
_ = enc.Encode(payload)
return
}
w := tabwriter.NewWriter(os.Stdout, 2, 8, 2, ' ', 0)
fmt.Fprintln(w, "AREA\tLOCATION\tWAREHOUSE\tPRODUCT\tSURVIVOR_ID\tSURVIVOR_QTY\tABSORBED_COUNT\tTOTAL_MERGED_QTY\tABSORBED_IDS")
for _, g := range groups {
fmt.Fprintf(
w,
"%s\t%s\t%s\t%s\t%d\t%.3f\t%d\t%.3f\t%s\n",
g.AreaName,
g.LocationName,
g.WarehouseName,
g.ProductName,
g.SurvivorID,
g.SurvivorQty,
g.AbsorbedCount,
g.TotalMergedQty,
g.AbsorbedIDs,
)
}
_ = w.Flush()
fmt.Printf("\n=== SUMMARY ===\n")
fmt.Printf("Duplicate groups found: %d\n", summary.TotalDuplicateGroups)
fmt.Printf("Product warehouses to delete: %d\n", summary.TotalProductWarehouses)
fmt.Printf("Overall status: %s\n", summary.OverallStatus)
if len(summary.UpdatedReferences) > 0 {
fmt.Println("\nUpdated references:")
keys := make([]string, 0, len(summary.UpdatedReferences))
for k := range summary.UpdatedReferences {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
fmt.Printf(" %s=%d\n", k, summary.UpdatedReferences[k])
}
}
}