diff --git a/cmd/consolidate-duplicate-product-warehouses/main.go b/cmd/consolidate-duplicate-product-warehouses/main.go new file mode 100644 index 00000000..f77979b7 --- /dev/null +++ b/cmd/consolidate-duplicate-product-warehouses/main.go @@ -0,0 +1,384 @@ +package main + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "log" + "os" + "sort" + "strings" + "text/tabwriter" + + commonSvc "gitlab.com/mbugroup/lti-api.git/internal/common/service" + "gitlab.com/mbugroup/lti-api.git/internal/config" + "gitlab.com/mbugroup/lti-api.git/internal/database" + "gorm.io/gorm" +) + +const ( + outputTable = "table" + outputJSON = "json" +) + +type options struct { + Apply bool + Output string + DBSSLMode string + AreaName string +} + +type duplicateGroup struct { + WarehouseID uint `json:"warehouse_id"` + WarehouseName string `json:"warehouse_name"` + ProductID uint `json:"product_id"` + ProductName string `json:"product_name"` + AreaName string `json:"area_name"` + LocationName string `json:"location_name"` + + SurvivorID uint `json:"survivor_id"` + SurvivorQty float64 `json:"survivor_qty"` + AbsorbedCount int `json:"absorbed_count"` + TotalMergedQty float64 `json:"total_merged_qty"` + AbsorbedIDs string `json:"absorbed_ids"` +} + +type consolidateSummary struct { + TotalDuplicateGroups int `json:"total_duplicate_groups"` + TotalProductWarehouses int64 `json:"total_product_warehouses"` + UpdatedReferences map[string]int64 `json:"updated_references,omitempty"` + DeletedProductWarehouses int64 `json:"deleted_product_warehouses,omitempty"` + OverallStatus string `json:"overall_status"` +} + +func main() { + opts, err := parseFlags() + if err != nil { + log.Fatalf("invalid flags: %v", err) + } + + if opts.DBSSLMode != "" { + config.DBSSLMode = opts.DBSSLMode + } + + ctx := context.Background() + db := database.Connect(config.DBHost, config.DBName) + + // Find duplicate groups + groups, err := findDuplicateProductWarehouses(ctx, db, opts) + if err != nil { + log.Fatalf("failed to find duplicates: %v", err) + } + + if len(groups) == 0 { + fmt.Println("No duplicate product_warehouses found") + return + } + + summary := summarizeGroups(groups) + if !opts.Apply { + renderConsolidation(opts.Output, groups, summary) + return + } + + applied, err := applyConsolidation(ctx, db, groups) + if err != nil { + log.Fatalf("apply failed: %v", err) + } + renderConsolidation(opts.Output, groups, applied) +} + +func parseFlags() (*options, error) { + var opts options + flag.BoolVar(&opts.Apply, "apply", false, "Apply consolidation (omit for dry-run)") + flag.StringVar(&opts.Output, "output", outputTable, "Output format: table or json") + flag.StringVar(&opts.DBSSLMode, "db-sslmode", "", "Database sslmode override") + flag.StringVar(&opts.AreaName, "area-name", "", "Optional area filter") + flag.Parse() + + opts.Output = strings.ToLower(strings.TrimSpace(opts.Output)) + opts.AreaName = strings.TrimSpace(opts.AreaName) + opts.DBSSLMode = strings.TrimSpace(opts.DBSSLMode) + + if opts.Output == "" { + opts.Output = outputTable + } + if opts.Output != outputTable && opts.Output != outputJSON { + return nil, fmt.Errorf("unsupported --output=%s", opts.Output) + } + + return &opts, nil +} + +func findDuplicateProductWarehouses(ctx context.Context, db *gorm.DB, opts *options) ([]duplicateGroup, error) { + filters := "" + args := []any{} + if opts.AreaName != "" { + filters = "WHERE a.name = ?" + args = append(args, opts.AreaName) + } + + query := fmt.Sprintf(` +SELECT + pw.warehouse_id, + w.name AS warehouse_name, + pw.product_id, + p.name AS product_name, + COALESCE(a.name, 'N/A') AS area_name, + COALESCE(l.name, 'N/A') AS location_name, + MIN(pw.id) AS survivor_id, + (SELECT qty FROM product_warehouses WHERE warehouse_id = pw.warehouse_id AND product_id = pw.product_id AND id = MIN(pw.id)) AS survivor_qty, + COUNT(*) - 1 AS absorbed_count, + SUM(pw.qty) AS total_merged_qty, + STRING_AGG(pw.id::text, ', ' ORDER BY pw.id::text) FILTER (WHERE pw.id <> MIN(pw.id)) AS absorbed_ids +FROM product_warehouses pw +JOIN warehouses w ON w.id = pw.warehouse_id +JOIN products p ON p.id = pw.product_id +LEFT JOIN locations l ON l.id = w.location_id +LEFT JOIN areas a ON a.id = l.area_id +%s +GROUP BY pw.warehouse_id, w.name, pw.product_id, p.name, a.name, l.name +HAVING COUNT(*) > 1 +ORDER BY a.name, l.name, w.name, p.name +`, filters) + + rows := make([]duplicateGroup, 0) + if err := db.WithContext(ctx).Raw(query, args...).Scan(&rows).Error; err != nil { + return nil, err + } + + return rows, nil +} + +func applyConsolidation(ctx context.Context, db *gorm.DB, groups []duplicateGroup) (consolidateSummary, error) { + summary := consolidateSummary{ + TotalDuplicateGroups: len(groups), + UpdatedReferences: make(map[string]int64), + OverallStatus: "PASS", + } + + fifoSvc := commonSvc.NewFifoStockV2Service(db, nil) + + err := db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + for _, group := range groups { + absorbedIDs := []uint{} + if group.AbsorbedIDs != "" { + parts := strings.Split(group.AbsorbedIDs, ", ") + for _, p := range parts { + var id uint + fmt.Sscanf(p, "%d", &id) + absorbedIDs = append(absorbedIDs, id) + } + } + + if len(absorbedIDs) == 0 { + continue + } + + // Update all references to point to survivor + refTables := []struct { + table string + column string + }{ + {"stock_allocations", "product_warehouse_id"}, + {"fifo_stock_v2_operation_log", "product_warehouse_id"}, + {"fifo_stock_v2_reflow_checkpoints", "product_warehouse_id"}, + {"fifo_stock_v2_shadow_allocations", "product_warehouse_id"}, + {"recording_depletions", "source_product_warehouse_id"}, + {"stock_logs", "product_warehouse_id"}, + } + + for _, ref := range refTables { + res := tx.WithContext(ctx). + Table(ref.table). + Where(fmt.Sprintf("%s IN ?", ref.column), absorbedIDs). + Update(ref.column, group.SurvivorID) + if res.Error != nil { + return fmt.Errorf("update %s.%s: %w", ref.table, ref.column, res.Error) + } + if res.RowsAffected > 0 { + summary.UpdatedReferences[ref.table+"."+ref.column] += res.RowsAffected + } + } + + // Update survivor qty to merged total + res := tx.WithContext(ctx). + Table("product_warehouses"). + Where("id = ?", group.SurvivorID). + Update("qty", group.TotalMergedQty) + if res.Error != nil { + return fmt.Errorf("update survivor qty: %w", res.Error) + } + + // Delete absorbed product_warehouses + res = tx.WithContext(ctx). + Where("id IN ?", absorbedIDs). + Delete(&struct{}{}) + if res.Error != nil { + return fmt.Errorf("delete absorbed: %w", res.Error) + } + summary.DeletedProductWarehouses += res.RowsAffected + + // Recalculate stock_logs for survivor + if err := recalculateStockLogs(ctx, tx, []uint{group.SurvivorID}); err != nil { + return fmt.Errorf("recalculate stock_logs: %w", err) + } + + // Reflow and recalculate FIFO + if err := reflowProductWarehouse(ctx, fifoSvc, tx, group.SurvivorID); err != nil { + return fmt.Errorf("reflow product_warehouse %d: %w", group.SurvivorID, err) + } + } + + return nil + }) + + if err != nil { + summary.OverallStatus = "FAIL" + return summary, err + } + + return summary, nil +} + +func recalculateStockLogs(ctx context.Context, tx *gorm.DB, productWarehouseIDs []uint) error { + if len(productWarehouseIDs) == 0 { + return nil + } + + query := ` +WITH recalculated AS ( + SELECT + id, + SUM(COALESCE(increase, 0) - COALESCE(decrease, 0)) + OVER (PARTITION BY product_warehouse_id ORDER BY created_at ASC, id ASC) AS running_stock + FROM stock_logs + WHERE product_warehouse_id IN ? +) +UPDATE stock_logs sl +SET stock = recalculated.running_stock +FROM recalculated +WHERE sl.id = recalculated.id +` + return tx.WithContext(ctx).Exec(query, productWarehouseIDs).Error +} + +func reflowProductWarehouse(ctx context.Context, fifoSvc commonSvc.FifoStockV2Service, tx *gorm.DB, productWarehouseID uint) error { + type row struct { + FlagGroupCode string `gorm:"column:flag_group_code"` + } + + var selected row + err := tx.WithContext(ctx). + Table("fifo_stock_v2_route_rules rr"). + Select("rr.flag_group_code"). + Joins("JOIN fifo_stock_v2_flag_groups fg ON fg.code = rr.flag_group_code AND fg.is_active = TRUE"). + Where("rr.is_active = TRUE"). + Where("rr.lane = ?", "STOCKABLE"). + Where("rr.function_code = ?", "PURCHASE_IN"). + Where("rr.source_table = ?", "purchase_items"). + Where(`EXISTS ( + SELECT 1 + FROM product_warehouses pw + JOIN flags f ON f.flagable_id = pw.product_id + JOIN fifo_stock_v2_flag_members fm ON fm.flag_name = f.name AND fm.is_active = TRUE + WHERE pw.id = ? + AND f.flagable_type = 'products' + AND fm.flag_group_code = rr.flag_group_code + )`, productWarehouseID). + Order("rr.id ASC"). + Limit(1). + Take(&selected).Error + + if err != nil && err != gorm.ErrRecordNotFound { + return err + } + + if err == gorm.ErrRecordNotFound { + return nil + } + + flagGroupCode := strings.TrimSpace(selected.FlagGroupCode) + + if _, err := fifoSvc.Reflow(ctx, commonSvc.FifoStockV2ReflowRequest{ + FlagGroupCode: flagGroupCode, + ProductWarehouseID: productWarehouseID, + Tx: tx, + }); err != nil { + return err + } + + if _, err := fifoSvc.Recalculate(ctx, commonSvc.FifoStockV2RecalculateRequest{ + ProductWarehouseIDs: []uint{productWarehouseID}, + FlagGroupCodes: []string{flagGroupCode}, + FixDrift: true, + Tx: tx, + }); err != nil { + return err + } + + return nil +} + +func summarizeGroups(groups []duplicateGroup) consolidateSummary { + var totalQty int64 + for _, g := range groups { + totalQty += int64(g.AbsorbedCount) + } + + return consolidateSummary{ + TotalDuplicateGroups: len(groups), + TotalProductWarehouses: totalQty, + OverallStatus: "PASS", + } +} + +func renderConsolidation(mode string, groups []duplicateGroup, summary consolidateSummary) { + if mode == outputJSON { + payload := map[string]any{ + "groups": groups, + "summary": summary, + } + enc := json.NewEncoder(os.Stdout) + enc.SetIndent("", " ") + _ = enc.Encode(payload) + return + } + + w := tabwriter.NewWriter(os.Stdout, 2, 8, 2, ' ', 0) + fmt.Fprintln(w, "AREA\tLOCATION\tWAREHOUSE\tPRODUCT\tSURVIVOR_ID\tSURVIVOR_QTY\tABSORBED_COUNT\tTOTAL_MERGED_QTY\tABSORBED_IDS") + for _, g := range groups { + fmt.Fprintf( + w, + "%s\t%s\t%s\t%s\t%d\t%.3f\t%d\t%.3f\t%s\n", + g.AreaName, + g.LocationName, + g.WarehouseName, + g.ProductName, + g.SurvivorID, + g.SurvivorQty, + g.AbsorbedCount, + g.TotalMergedQty, + g.AbsorbedIDs, + ) + } + _ = w.Flush() + + fmt.Printf("\n=== SUMMARY ===\n") + fmt.Printf("Duplicate groups found: %d\n", summary.TotalDuplicateGroups) + fmt.Printf("Product warehouses to delete: %d\n", summary.TotalProductWarehouses) + fmt.Printf("Overall status: %s\n", summary.OverallStatus) + + if len(summary.UpdatedReferences) > 0 { + fmt.Println("\nUpdated references:") + keys := make([]string, 0, len(summary.UpdatedReferences)) + for k := range summary.UpdatedReferences { + keys = append(keys, k) + } + sort.Strings(keys) + for _, k := range keys { + fmt.Printf(" %s=%d\n", k, summary.UpdatedReferences[k]) + } + } +}