// This program finds product_warehouses rows that share the same
// (warehouse_id, product_id) pair and, when run with -apply, merges each group
// of duplicates into the row with the lowest id (the "survivor").
//
// Without -apply it only reports what would be merged (dry-run).
package main

import (
	"context"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"log"
	"os"
	"sort"
	"strconv"
	"strings"
	"text/tabwriter"

	commonSvc "gitlab.com/mbugroup/lti-api.git/internal/common/service"
	"gitlab.com/mbugroup/lti-api.git/internal/config"
	"gitlab.com/mbugroup/lti-api.git/internal/database"
	"gorm.io/gorm"
)

// Supported values for the -output flag.
const (
	outputTable = "table"
	outputJSON  = "json"
)

// options holds the parsed command-line flags.
type options struct {
	Apply     bool   // perform the consolidation instead of a dry-run
	Output    string // outputTable or outputJSON
	DBSSLMode string // overrides config.DBSSLMode when non-empty
	AreaName  string // restricts the search to one area name when non-empty
}

// duplicateGroup is one (warehouse_id, product_id) pair that has more than one
// product_warehouses row, as returned by findDuplicateProductWarehouses.
type duplicateGroup struct {
	WarehouseID           uint    `json:"warehouse_id"`
	WarehouseName         string  `json:"warehouse_name"`
	ProductID             uint    `json:"product_id"`
	ProductName           string  `json:"product_name"`
	AreaName              string  `json:"area_name"`
	LocationName          string  `json:"location_name"`
	ProjectFlockKandangID *uint   `json:"project_flock_kandang_id,omitempty"` // value on the survivor row
	SurvivorID            uint    `json:"survivor_id"`                        // lowest id in the group
	SurvivorQty           float64 `json:"survivor_qty"`                       // qty of the survivor row before merging
	AbsorbedCount         int     `json:"absorbed_count"`                     // rows in the group other than the survivor
	TotalMergedQty        float64 `json:"total_merged_qty"`                   // sum of qty over every row in the group
	AbsorbedIDs           string  `json:"absorbed_ids"`                       // comma-separated ids of the non-survivor rows
}

// consolidateSummary aggregates the outcome of a dry-run or an applied run.
type consolidateSummary struct {
	TotalDuplicateGroups     int              `json:"total_duplicate_groups"`
	TotalProductWarehouses   int64            `json:"total_product_warehouses"` // sum of AbsorbedCount over all groups
	UpdatedReferences        map[string]int64 `json:"updated_references,omitempty"`
	DeletedProductWarehouses int64            `json:"deleted_product_warehouses,omitempty"`
	OverallStatus            string           `json:"overall_status"`
}

// referenceColumn names a column in another table that stores a
// product_warehouses id.
type referenceColumn struct {
	table  string
	column string
}

// productWarehouseReferences lists every column that is re-pointed from an
// absorbed row to the survivor before the absorbed rows are deleted.
var productWarehouseReferences = []referenceColumn{
	{"stock_allocations", "product_warehouse_id"},
	{"stock_logs", "product_warehouse_id"},
	{"purchase_items", "product_warehouse_id"},
	{"recording_stocks", "product_warehouse_id"},
	{"recording_eggs", "product_warehouse_id"},
	{"recording_depletions", "product_warehouse_id"},
	{"recording_depletions", "source_product_warehouse_id"},
	{"marketing_delivery_products", "product_warehouse_id"},
	{"marketing_products", "product_warehouse_id"},
	{"stock_transfer_details", "source_product_warehouse_id"},
	{"stock_transfer_details", "dest_product_warehouse_id"},
	{"adjustment_stocks", "product_warehouse_id"},
	{"laying_transfer_sources", "product_warehouse_id"},
	{"laying_transfer_targets", "product_warehouse_id"},
	{"laying_transfers", "source_product_warehouse_id"},
	{"project_chickin_details", "product_warehouse_id"},
	{"project_chickins", "product_warehouse_id"},
	{"project_flock_populations", "product_warehouse_id"},
	{"fifo_stock_v2_operation_log", "product_warehouse_id"},
	{"fifo_stock_v2_reflow_checkpoints", "product_warehouse_id"},
	{"fifo_stock_v2_shadow_allocations", "product_warehouse_id"},
}

func main() {
	opts, err := parseFlags()
	if err != nil {
		log.Fatalf("invalid flags: %v", err)
	}

	if opts.DBSSLMode != "" {
		config.DBSSLMode = opts.DBSSLMode
	}

	ctx := context.Background()
	db := database.Connect(config.DBHost, config.DBName)

	// Find duplicate groups.
	groups, err := findDuplicateProductWarehouses(ctx, db, opts)
	if err != nil {
		log.Fatalf("failed to find duplicates: %v", err)
	}
	if len(groups) == 0 {
		fmt.Println("No duplicate product_warehouses found")
		return
	}

	// Dry-run: report only.
	if !opts.Apply {
		renderConsolidation(opts.Output, groups, summarizeGroups(groups))
		return
	}

	applied, err := applyConsolidation(ctx, db, groups)
	if err != nil {
		log.Fatalf("apply failed: %v", err)
	}
	renderConsolidation(opts.Output, groups, applied)
}

// parseFlags reads the command-line flags, normalizes them (trimmed,
// lower-cased output mode) and rejects output modes other than table or json.
func parseFlags() (*options, error) {
	var opts options
	flag.BoolVar(&opts.Apply, "apply", false, "Apply consolidation (omit for dry-run)")
	flag.StringVar(&opts.Output, "output", outputTable, "Output format: table or json")
	flag.StringVar(&opts.DBSSLMode, "db-sslmode", "", "Database sslmode override")
	flag.StringVar(&opts.AreaName, "area-name", "", "Optional area filter")
	flag.Parse()

	opts.Output = strings.ToLower(strings.TrimSpace(opts.Output))
	opts.AreaName = strings.TrimSpace(opts.AreaName)
	opts.DBSSLMode = strings.TrimSpace(opts.DBSSLMode)

	if opts.Output == "" {
		opts.Output = outputTable
	}
	if opts.Output != outputTable && opts.Output != outputJSON {
		return nil, fmt.Errorf("unsupported --output=%s", opts.Output)
	}
	return &opts, nil
}

// findDuplicateProductWarehouses returns one duplicateGroup per
// (warehouse_id, product_id) pair that has more than one product_warehouses
// row. When opts.AreaName is set, only rows whose warehouse location belongs to
// that area are considered.
func findDuplicateProductWarehouses(ctx context.Context, db *gorm.DB, opts *options) ([]duplicateGroup, error) {
	filters := ""
	args := []any{}
	if opts.AreaName != "" {
		filters = "WHERE a.name = ?"
		args = append(args, opts.AreaName)
	}

	// The survivor's own qty and project_flock_kandang_id are picked with
	// FILTER (WHERE id = survivor_id). A scalar subquery over the CTE with an
	// unqualified survivor_id would bind to the subquery's own row and return
	// an arbitrary survivor instead of this group's.
	query := fmt.Sprintf(`
		WITH duplicates AS (
			SELECT
				pw.warehouse_id,
				w.name AS warehouse_name,
				pw.product_id,
				p.name AS product_name,
				COALESCE(a.name, 'N/A') AS area_name,
				COALESCE(l.name, 'N/A') AS location_name,
				pw.project_flock_kandang_id,
				pw.id,
				pw.qty,
				MIN(pw.id) OVER (PARTITION BY pw.warehouse_id, pw.product_id) AS survivor_id,
				COUNT(*) OVER (PARTITION BY pw.warehouse_id, pw.product_id) AS duplicate_count,
				SUM(pw.qty) OVER (PARTITION BY pw.warehouse_id, pw.product_id) AS total_qty
			FROM product_warehouses pw
			JOIN warehouses w ON w.id = pw.warehouse_id
			JOIN products p ON p.id = pw.product_id
			LEFT JOIN locations l ON l.id = w.location_id
			LEFT JOIN areas a ON a.id = l.area_id
			%s
		)
		SELECT
			warehouse_id,
			warehouse_name,
			product_id,
			product_name,
			area_name,
			location_name,
			MAX(project_flock_kandang_id) FILTER (WHERE id = survivor_id) AS project_flock_kandang_id,
			survivor_id,
			MAX(qty) FILTER (WHERE id = survivor_id) AS survivor_qty,
			duplicate_count - 1 AS absorbed_count,
			total_qty AS total_merged_qty,
			STRING_AGG(id::text, ', ' ORDER BY id) FILTER (WHERE id <> survivor_id) AS absorbed_ids
		FROM duplicates
		WHERE duplicate_count > 1
		GROUP BY warehouse_id, warehouse_name, product_id, product_name, area_name, location_name, survivor_id, total_qty, duplicate_count
		ORDER BY area_name, location_name, warehouse_name, product_name
	`, filters)

	rows := make([]duplicateGroup, 0)
	if err := db.WithContext(ctx).Raw(query, args...).Scan(&rows).Error; err != nil {
		return nil, err
	}
	return rows, nil
}

// applyConsolidation merges every group inside a single database transaction.
// Any error aborts the transaction; the returned summary then has
// OverallStatus "FAIL" and its counters reflect work that was not committed.
func applyConsolidation(ctx context.Context, db *gorm.DB, groups []duplicateGroup) (consolidateSummary, error) {
	// Start from the dry-run summary so the planned row count is reported
	// alongside the applied counters.
	summary := summarizeGroups(groups)
	summary.UpdatedReferences = make(map[string]int64)

	fifoSvc := commonSvc.NewFifoStockV2Service(db, nil)

	err := db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
		for _, group := range groups {
			absorbedIDs, err := parseAbsorbedIDs(group.AbsorbedIDs)
			if err != nil {
				return fmt.Errorf("survivor %d: %w", group.SurvivorID, err)
			}
			if len(absorbedIDs) == 0 {
				continue
			}
			if err := consolidateGroup(ctx, tx, fifoSvc, group, absorbedIDs, &summary); err != nil {
				return err
			}
		}
		return nil
	})
	if err != nil {
		summary.OverallStatus = "FAIL"
		return summary, err
	}
	return summary, nil
}

// parseAbsorbedIDs converts the comma-separated id list produced by
// findDuplicateProductWarehouses into a slice. It returns an error for any
// token that is not an unsigned integer, so a malformed token can never turn
// into id 0 and end up in an UPDATE or DELETE.
func parseAbsorbedIDs(raw string) ([]uint, error) {
	if strings.TrimSpace(raw) == "" {
		return nil, nil
	}
	parts := strings.Split(raw, ",")
	ids := make([]uint, 0, len(parts))
	for _, part := range parts {
		part = strings.TrimSpace(part)
		if part == "" {
			continue
		}
		id, err := strconv.ParseUint(part, 10, strconv.IntSize)
		if err != nil {
			return nil, fmt.Errorf("parse absorbed id %q: %w", part, err)
		}
		ids = append(ids, uint(id))
	}
	return ids, nil
}

// consolidateGroup merges one duplicate group into its survivor using tx and
// adds the affected-row counts to summary.
func consolidateGroup(
	ctx context.Context,
	tx *gorm.DB,
	fifoSvc commonSvc.FifoStockV2Service,
	group duplicateGroup,
	absorbedIDs []uint,
	summary *consolidateSummary,
) error {
	// Update all references to point to the survivor.
	for _, ref := range productWarehouseReferences {
		res := tx.WithContext(ctx).
			Table(ref.table).
			Where(fmt.Sprintf("%s IN ?", ref.column), absorbedIDs).
			Update(ref.column, group.SurvivorID)
		if res.Error != nil {
			return fmt.Errorf("update %s.%s: %w", ref.table, ref.column, res.Error)
		}
		if res.RowsAffected > 0 {
			summary.UpdatedReferences[ref.table+"."+ref.column] += res.RowsAffected
		}
	}

	// Update survivor qty to the merged total computed when the duplicates
	// were queried.
	res := tx.WithContext(ctx).
		Table("product_warehouses").
		Where("id = ?", group.SurvivorID).
		Update("qty", group.TotalMergedQty)
	if res.Error != nil {
		return fmt.Errorf("update survivor qty: %w", res.Error)
	}

	// Clear project_flock_kandang_id for LOKASI warehouse survivors.
	if err := tx.WithContext(ctx).Exec(`
		UPDATE product_warehouses pw
		SET project_flock_kandang_id = NULL
		FROM warehouses w
		WHERE pw.warehouse_id = w.id
		  AND pw.id = ?
		  AND UPPER(w.type) = 'LOKASI'
		  AND pw.project_flock_kandang_id IS NOT NULL
	`, group.SurvivorID).Error; err != nil {
		return fmt.Errorf("clear project_flock_kandang_id survivor %d: %w", group.SurvivorID, err)
	}

	// Delete absorbed product_warehouses.
	res = tx.WithContext(ctx).
		Table("product_warehouses").
		Where("id IN ?", absorbedIDs).
		Delete(nil)
	if res.Error != nil {
		return fmt.Errorf("delete absorbed: %w", res.Error)
	}
	summary.DeletedProductWarehouses += res.RowsAffected

	// Recalculate stock_logs for the survivor, which now owns the absorbed
	// rows' log entries.
	if err := recalculateStockLogs(ctx, tx, []uint{group.SurvivorID}); err != nil {
		return fmt.Errorf("recalculate stock_logs: %w", err)
	}

	// Reflow and recalculate FIFO.
	if err := reflowProductWarehouse(ctx, fifoSvc, tx, group.SurvivorID); err != nil {
		return fmt.Errorf("reflow product_warehouse %d: %w", group.SurvivorID, err)
	}
	return nil
}

// recalculateStockLogs rewrites stock_logs.stock for the given product
// warehouses as the running sum of (increase - decrease), ordered by
// created_at and then id. It does nothing when productWarehouseIDs is empty.
func recalculateStockLogs(ctx context.Context, tx *gorm.DB, productWarehouseIDs []uint) error {
	if len(productWarehouseIDs) == 0 {
		return nil
	}

	query := `
		WITH recalculated AS (
			SELECT
				id,
				SUM(COALESCE(increase, 0) - COALESCE(decrease, 0)) OVER (PARTITION BY product_warehouse_id ORDER BY created_at ASC, id ASC) AS running_stock
			FROM stock_logs
			WHERE product_warehouse_id IN ?
		)
		UPDATE stock_logs sl
		SET stock = recalculated.running_stock
		FROM recalculated
		WHERE sl.id = recalculated.id
	`
	return tx.WithContext(ctx).Exec(query, productWarehouseIDs).Error
}

// reflowProductWarehouse looks up the first active STOCKABLE / PURCHASE_IN /
// purchase_items route rule whose flag group contains a flag of the product
// behind productWarehouseID, then runs the FIFO service's Reflow and
// Recalculate for that flag group inside tx. When no such rule exists the
// function returns nil without calling the service.
func reflowProductWarehouse(ctx context.Context, fifoSvc commonSvc.FifoStockV2Service, tx *gorm.DB, productWarehouseID uint) error {
	type row struct {
		FlagGroupCode string `gorm:"column:flag_group_code"`
	}

	var selected row
	err := tx.WithContext(ctx).
		Table("fifo_stock_v2_route_rules rr").
		Select("rr.flag_group_code").
		Joins("JOIN fifo_stock_v2_flag_groups fg ON fg.code = rr.flag_group_code AND fg.is_active = TRUE").
		Where("rr.is_active = TRUE").
		Where("rr.lane = ?", "STOCKABLE").
		Where("rr.function_code = ?", "PURCHASE_IN").
		Where("rr.source_table = ?", "purchase_items").
		Where(`EXISTS (
			SELECT 1
			FROM product_warehouses pw
			JOIN flags f ON f.flagable_id = pw.product_id
			JOIN fifo_stock_v2_flag_members fm ON fm.flag_name = f.name AND fm.is_active = TRUE
			WHERE pw.id = ?
			  AND f.flagable_type = 'products'
			  AND fm.flag_group_code = rr.flag_group_code
		)`, productWarehouseID).
		Order("rr.id ASC").
		Limit(1).
		Take(&selected).Error
	if err != nil {
		// No matching route rule means there is nothing to reflow.
		if errors.Is(err, gorm.ErrRecordNotFound) {
			return nil
		}
		return err
	}

	flagGroupCode := strings.TrimSpace(selected.FlagGroupCode)

	if _, err := fifoSvc.Reflow(ctx, commonSvc.FifoStockV2ReflowRequest{
		FlagGroupCode:      flagGroupCode,
		ProductWarehouseID: productWarehouseID,
		Tx:                 tx,
	}); err != nil {
		return fmt.Errorf("fifo reflow (flag group %q): %w", flagGroupCode, err)
	}

	if _, err := fifoSvc.Recalculate(ctx, commonSvc.FifoStockV2RecalculateRequest{
		ProductWarehouseIDs: []uint{productWarehouseID},
		FlagGroupCodes:      []string{flagGroupCode},
		FixDrift:            true,
		Tx:                  tx,
	}); err != nil {
		return fmt.Errorf("fifo recalculate (flag group %q): %w", flagGroupCode, err)
	}
	return nil
}

// summarizeGroups builds the dry-run summary: the number of groups and the
// total number of rows that would be absorbed (the sum of AbsorbedCount).
func summarizeGroups(groups []duplicateGroup) consolidateSummary {
	var absorbedRows int64
	for _, g := range groups {
		absorbedRows += int64(g.AbsorbedCount)
	}
	return consolidateSummary{
		TotalDuplicateGroups:   len(groups),
		TotalProductWarehouses: absorbedRows,
		OverallStatus:          "PASS",
	}
}

// renderConsolidation writes the groups and the summary to stdout, either as
// indented JSON (mode == outputJSON) or as an aligned table followed by a
// summary section. Write failures are logged, not returned.
func renderConsolidation(mode string, groups []duplicateGroup, summary consolidateSummary) {
	if mode == outputJSON {
		payload := map[string]any{
			"groups":  groups,
			"summary": summary,
		}
		enc := json.NewEncoder(os.Stdout)
		enc.SetIndent("", " ")
		if err := enc.Encode(payload); err != nil {
			log.Printf("write json output: %v", err)
		}
		return
	}

	w := tabwriter.NewWriter(os.Stdout, 2, 8, 2, ' ', 0)
	fmt.Fprintln(w, "AREA\tLOCATION\tWAREHOUSE\tPRODUCT\tPFK_ID\tSURVIVOR_ID\tSURVIVOR_QTY\tABSORBED_COUNT\tTOTAL_MERGED_QTY\tABSORBED_IDS")
	for _, g := range groups {
		pfkID := "-"
		if g.ProjectFlockKandangID != nil {
			pfkID = strconv.FormatUint(uint64(*g.ProjectFlockKandangID), 10)
		}
		fmt.Fprintf(
			w,
			"%s\t%s\t%s\t%s\t%s\t%d\t%.3f\t%d\t%.3f\t%s\n",
			g.AreaName,
			g.LocationName,
			g.WarehouseName,
			g.ProductName,
			pfkID,
			g.SurvivorID,
			g.SurvivorQty,
			g.AbsorbedCount,
			g.TotalMergedQty,
			g.AbsorbedIDs,
		)
	}
	// tabwriter buffers every row; Flush is where a write error surfaces.
	if err := w.Flush(); err != nil {
		log.Printf("write table output: %v", err)
	}

	fmt.Printf("\n=== SUMMARY ===\n")
	fmt.Printf("Duplicate groups found: %d\n", summary.TotalDuplicateGroups)
	fmt.Printf("Product warehouses to delete: %d\n", summary.TotalProductWarehouses)
	if summary.DeletedProductWarehouses > 0 {
		fmt.Printf("Product warehouses deleted: %d\n", summary.DeletedProductWarehouses)
	}
	fmt.Printf("Overall status: %s\n", summary.OverallStatus)

	if len(summary.UpdatedReferences) > 0 {
		fmt.Println("\nUpdated references:")
		// Map iteration order is random; sort for stable output.
		keys := make([]string, 0, len(summary.UpdatedReferences))
		for k := range summary.UpdatedReferences {
			keys = append(keys, k)
		}
		sort.Strings(keys)
		for _, k := range keys {
			fmt.Printf(" %s=%d\n", k, summary.UpdatedReferences[k])
		}
	}
}