// This program finds product_warehouses rows for products flagged PAKAN or OVK
// that live in a kandang warehouse whose location differs from the kandang's
// own location, and repoints them (and the rows referencing them) to the first
// LOKASI warehouse of the kandang's location.
//
// Without --apply it only prints the plan (dry run). With --apply the whole
// rewrite runs inside a single database transaction.
//
// Terminology used throughout the file:
//   - "wrong warehouse": the warehouse the stock currently sits in.
//   - "correct warehouse": the LOKASI warehouse the stock is moved to.
//   - "survivor": the product_warehouses row in the wrong warehouse; it is kept
//     and its warehouse_id is rewritten.
//   - "absorbed": an existing product_warehouses row in the correct warehouse
//     for the same product / project_flock_kandang; its references are moved
//     onto the survivor and the row is deleted.
package main

import (
	"context"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"log"
	"os"
	"regexp"
	"sort"
	"strings"
	"text/tabwriter"
	"time"

	commonSvc "gitlab.com/mbugroup/lti-api.git/internal/common/service"
	"gitlab.com/mbugroup/lti-api.git/internal/config"
	"gitlab.com/mbugroup/lti-api.git/internal/database"
	entity "gitlab.com/mbugroup/lti-api.git/internal/entities"
	"gorm.io/gorm"
)

// Supported values of the --output flag.
const (
	repointOutputTable = "table"
	repointOutputJSON  = "json"
)

// options holds the parsed command-line flags.
type options struct {
	Apply                      bool
	Output                     string
	AreaName                   string
	KandangLocationName        string
	DBSSLMode                  string
	DeleteWrongWarehouses      bool
	AllowMovingAllocatedStocks bool
}

// planRow is one row of the migration plan as returned by the plan query: a
// survivor product_warehouse in a wrong warehouse, the correct warehouse it
// should move to and, when present, the product_warehouse it absorbs there.
type planRow struct {
	AreaName              string   `gorm:"column:area_name" json:"area_name"`
	KandangLocationName   string   `gorm:"column:kandang_location_name" json:"kandang_location_name"`
	KandangID             uint     `gorm:"column:kandang_id" json:"kandang_id"`
	KandangName           string   `gorm:"column:kandang_name" json:"kandang_name"`
	WrongWarehouseID      uint     `gorm:"column:wrong_warehouse_id" json:"wrong_warehouse_id"`
	WrongWarehouseName    string   `gorm:"column:wrong_warehouse_name" json:"wrong_warehouse_name"`
	CorrectWarehouseID    uint     `gorm:"column:correct_warehouse_id" json:"correct_warehouse_id"`
	CorrectWarehouseName  string   `gorm:"column:correct_warehouse_name" json:"correct_warehouse_name"`
	ProductID             uint     `gorm:"column:product_id" json:"product_id"`
	ProductName           string   `gorm:"column:product_name" json:"product_name"`
	ProjectFlockKandangID *uint    `gorm:"column:project_flock_kandang_id" json:"project_flock_kandang_id,omitempty"`
	SurvivorPWID          uint     `gorm:"column:survivor_pw_id" json:"survivor_pw_id"`
	SurvivorCurrentQty    float64  `gorm:"column:survivor_current_qty" json:"survivor_current_qty"`
	AbsorbedPWID          *uint    `gorm:"column:absorbed_pw_id" json:"absorbed_pw_id,omitempty"`
	AbsorbedCurrentQty    *float64 `gorm:"column:absorbed_current_qty" json:"absorbed_current_qty,omitempty"`
}

// pwMove rewrites references from product_warehouse FromID (absorbed) to ToID
// (survivor).
type pwMove struct {
	FromID uint
	ToID   uint
}

// warehouseMove rewrites references from a wrong warehouse to its correct one.
type warehouseMove struct {
	WrongWarehouseID   uint
	CorrectWarehouseID uint
}

// applySummary carries the plan counters and, after an apply, the number of
// rows touched per reference column.
type applySummary struct {
	TargetScope                  string           `json:"target_scope,omitempty"`
	PlanRows                     int              `json:"plan_rows"`
	WrongWarehouses              int              `json:"wrong_warehouses"`
	SurvivorProductWarehouses    int              `json:"survivor_product_warehouses"`
	AbsorbedProductWarehouses    int              `json:"absorbed_product_warehouses"`
	NeedsReflowProductWarehouses int              `json:"needs_reflow_product_warehouses"`
	UpdatedPWRefs                map[string]int64 `json:"updated_pw_refs,omitempty"`
	UpdatedWarehouseRefs         map[string]int64 `json:"updated_warehouse_refs,omitempty"`
	DeletedProductWarehouses     int64            `json:"deleted_product_warehouses,omitempty"`
	SoftDeletedWarehouses        int64            `json:"soft_deleted_warehouses,omitempty"`
}

// reference identifies a table column that points at another table's id.
type reference struct {
	Table  string
	Column string
}

// referencePlan lists the columns that get rewritten during apply and the
// warehouse columns that must not reference a wrong warehouse at all.
type referencePlan struct {
	ProductWarehouseRefs []reference
	WarehouseRefs        []reference
	BlockedWarehouseRefs []reference
}

// sqlIdentifierPattern is the whitelist for table/column names that are
// interpolated into SQL text.
var sqlIdentifierPattern = regexp.MustCompile(`^[a-z_][a-z0-9_]*$`)

// extraProductWarehouseRefs are product_warehouse columns rewritten in
// addition to the ones discovered through foreign-key constraints.
var extraProductWarehouseRefs = []reference{
	{Table: "fifo_stock_v2_operation_log", Column: "product_warehouse_id"},
	{Table: "fifo_stock_v2_reflow_checkpoints", Column: "product_warehouse_id"},
	{Table: "fifo_stock_v2_shadow_allocations", Column: "product_warehouse_id"},
	{Table: "recording_depletions", Column: "source_product_warehouse_id"},
}

// allowedWarehouseRefs are the warehouse foreign keys this command is allowed
// to rewrite from the wrong to the correct warehouse.
var allowedWarehouseRefs = []reference{
	{Table: "product_warehouses", Column: "warehouse_id"},
	{Table: "purchase_items", Column: "warehouse_id"},
}

// blockedWarehouseRefs are warehouse columns that are never rewritten; rows
// referencing a wrong warehouse through them abort the run when the wrong
// warehouses are going to be soft deleted.
var blockedWarehouseRefs = []reference{
	{Table: "stock_transfers", Column: "from_warehouse_id"},
	{Table: "stock_transfers", Column: "to_warehouse_id"},
}

// main parses the flags, loads and prechecks the plan, and either prints it
// (dry run) or applies it and prints the result.
func main() {
	opts, err := parseFlags()
	if err != nil {
		log.Fatalf("invalid flags: %v", err)
	}
	if opts.DBSSLMode != "" {
		config.DBSSLMode = opts.DBSSLMode
	}

	ctx := context.Background()
	db := database.Connect(config.DBHost, config.DBName)

	rows, err := loadPlanRows(ctx, db, opts)
	if err != nil {
		log.Fatalf("failed to load plan rows: %v", err)
	}
	if opts.AllowMovingAllocatedStocks {
		allocatedRows, err := loadPlanRowsWithAllocations(ctx, db, opts)
		if err != nil {
			log.Fatalf("failed to load allocated plan rows: %v", err)
		}
		rows = append(rows, allocatedRows...)
		// Remove duplicates: the second query also returns every row of the
		// first one. Rows that only the second query returned end up after
		// the rows of the first query.
		rows = deduplicatePlanRows(rows)
	}
	if len(rows) == 0 {
		fmt.Println("No misplaced PAKAN/OVK stocks found in wrong-location warehouses")
		return
	}

	refs, err := buildReferencePlan(ctx, db)
	if err != nil {
		log.Fatalf("failed to inspect warehouse references: %v", err)
	}
	if err := runPrechecks(ctx, db, rows, refs, opts); err != nil {
		log.Fatalf("precheck failed: %v", err)
	}

	summary := summarizePlan(rows)
	if !opts.Apply {
		renderPlan(opts.Output, rows, summary)
		return
	}

	applied, err := applyPlan(ctx, db, rows, opts, refs)
	if err != nil {
		log.Fatalf("apply failed: %v", err)
	}
	renderPlan(opts.Output, rows, applied)
}

// parseFlags registers and parses the command-line flags, normalizes the
// string values (trimmed, output lower-cased) and rejects unknown output
// formats.
func parseFlags() (*options, error) {
	var opts options
	flag.BoolVar(&opts.Apply, "apply", false, "Apply the migration. If false, run as dry-run")
	flag.StringVar(&opts.Output, "output", repointOutputTable, "Output format: table or json")
	flag.StringVar(&opts.AreaName, "area-name", "", "Optional exact area name filter")
	flag.StringVar(&opts.KandangLocationName, "kandang-location-name", "", "Optional exact canonical kandang location filter")
	flag.StringVar(&opts.DBSSLMode, "db-sslmode", "", "Optional database sslmode override, for example: require")
	flag.BoolVar(&opts.DeleteWrongWarehouses, "delete-wrong-warehouses", true, "Soft delete wrong warehouse rows after all references have been moved")
	flag.BoolVar(&opts.AllowMovingAllocatedStocks, "allow-moving-allocated-stocks", false, "Allow moving stocks that have active allocations (use with caution - for old recordings with completed allocations)")
	flag.Parse()

	opts.Output = strings.ToLower(strings.TrimSpace(opts.Output))
	opts.AreaName = strings.TrimSpace(opts.AreaName)
	opts.KandangLocationName = strings.TrimSpace(opts.KandangLocationName)
	opts.DBSSLMode = strings.TrimSpace(opts.DBSSLMode)

	if opts.Output == "" {
		opts.Output = repointOutputTable
	}
	if opts.Output != repointOutputTable && opts.Output != repointOutputJSON {
		return nil, fmt.Errorf("unsupported --output=%s", opts.Output)
	}
	return &opts, nil
}

// deduplicatePlanRows keeps the first row seen for every survivor
// product_warehouse id, preserving input order.
func deduplicatePlanRows(rows []planRow) []planRow {
	seen := make(map[uint]struct{}, len(rows))
	result := make([]planRow, 0, len(rows))
	for _, row := range rows {
		if _, ok := seen[row.SurvivorPWID]; ok {
			continue
		}
		seen[row.SurvivorPWID] = struct{}{}
		result = append(result, row)
	}
	return result
}

// planRowsQueryTemplate selects every PAKAN/OVK product_warehouse stored in a
// kandang warehouse whose location differs from the kandang's location. The
// first %s receives an optional extra predicate, the second one the optional
// area / location filters produced by andClause.
const planRowsQueryTemplate = `
	SELECT
		a.name AS area_name,
		kl.name AS kandang_location_name,
		k.id AS kandang_id,
		k.name AS kandang_name,
		w.id AS wrong_warehouse_id,
		w.name AS wrong_warehouse_name,
		correct_w.id AS correct_warehouse_id,
		correct_w.name AS correct_warehouse_name,
		p.id AS product_id,
		p.name AS product_name,
		wp.project_flock_kandang_id,
		wp.id AS survivor_pw_id,
		COALESCE(wp.qty, 0) AS survivor_current_qty,
		cpw.id AS absorbed_pw_id,
		cpw.qty AS absorbed_current_qty
	FROM warehouses w
	JOIN kandangs k ON k.id = w.kandang_id AND k.deleted_at IS NULL
	JOIN locations kl ON kl.id = k.location_id
	JOIN areas a ON a.id = kl.area_id
	JOIN LATERAL (
		SELECT w2.id, w2.name
		FROM warehouses w2
		WHERE w2.location_id = k.location_id
			AND UPPER(COALESCE(w2.type, '')) = 'LOKASI'
			AND w2.deleted_at IS NULL
		ORDER BY w2.id ASC
		LIMIT 1
	) AS correct_w ON TRUE
	JOIN product_warehouses wp ON wp.warehouse_id = w.id
	JOIN products p ON p.id = wp.product_id
	JOIN flags f ON f.flagable_id = p.id
		AND f.flagable_type = 'products'
		AND UPPER(f.name) IN ('PAKAN', 'OVK')
	LEFT JOIN product_warehouses cpw ON cpw.product_id = wp.product_id
		AND cpw.warehouse_id = correct_w.id
		AND cpw.project_flock_kandang_id IS NOT DISTINCT FROM wp.project_flock_kandang_id
	WHERE w.deleted_at IS NULL
		AND w.kandang_id IS NOT NULL
		AND w.location_id IS DISTINCT FROM k.location_id
		%s
		%s
	ORDER BY a.name ASC, kl.name ASC, k.name ASC, wp.id ASC
`

// noActiveConsumeAllocationFilter excludes stock whose purchase items in the
// wrong warehouse still have an active CONSUME stock allocation.
const noActiveConsumeAllocationFilter = `AND NOT EXISTS (
			SELECT 1
			FROM stock_allocations sa
			WHERE sa.stockable_type = 'PURCHASE_ITEMS'
				AND sa.stockable_id IN (
					SELECT pi.id
					FROM purchase_items pi
					WHERE pi.warehouse_id = w.id
						AND pi.product_id = p.id
				)
				AND sa.status = 'ACTIVE'
				AND sa.allocation_purpose = 'CONSUME'
				AND sa.deleted_at IS NULL
		)`

// queryPlanRows runs the plan query with the optional area / kandang location
// filters from opts. When excludeAllocated is true, stock with active CONSUME
// allocations is left out of the result.
func queryPlanRows(ctx context.Context, db *gorm.DB, opts *options, excludeAllocated bool) ([]planRow, error) {
	filters := make([]string, 0, 2)
	args := make([]any, 0, 2)
	if opts.AreaName != "" {
		filters = append(filters, "a.name = ?")
		args = append(args, opts.AreaName)
	}
	if opts.KandangLocationName != "" {
		filters = append(filters, "kl.name = ?")
		args = append(args, opts.KandangLocationName)
	}

	// The allocation predicate has no placeholders, so args only ever carries
	// the values for the name filters.
	allocationFilter := ""
	if excludeAllocated {
		allocationFilter = noActiveConsumeAllocationFilter
	}
	query := fmt.Sprintf(planRowsQueryTemplate, allocationFilter, andClause(filters))

	rows := make([]planRow, 0)
	if err := db.WithContext(ctx).Raw(query, args...).Scan(&rows).Error; err != nil {
		return nil, err
	}
	return rows, nil
}

// loadPlanRowsWithAllocations loads the plan rows without filtering out stock
// that has active CONSUME allocations.
func loadPlanRowsWithAllocations(ctx context.Context, db *gorm.DB, opts *options) ([]planRow, error) {
	return queryPlanRows(ctx, db, opts, false)
}

// loadPlanRows loads the plan rows, skipping stock whose purchase items still
// have active CONSUME allocations.
func loadPlanRows(ctx context.Context, db *gorm.DB, opts *options) ([]planRow, error) {
	return queryPlanRows(ctx, db, opts, true)
}

// buildReferencePlan inspects the database schema and decides which columns
// are rewritten. It fails when it finds a warehouse foreign key that is
// neither allowed nor blocked, or a *warehouse_id* / *product_warehouse_id*
// column that no list covers, so schema additions cannot be missed silently.
func buildReferencePlan(ctx context.Context, db *gorm.DB) (*referencePlan, error) {
	productWarehouseRefs, err := discoverSingleColumnFKReferences(ctx, db, "product_warehouses")
	if err != nil {
		return nil, err
	}
	productWarehouseRefs = mergeReferences(productWarehouseRefs, extraProductWarehouseRefs)
	if err := ensureHandledColumns(ctx, db, "%product_warehouse_id%", productWarehouseRefs, nil); err != nil {
		return nil, err
	}

	discoveredWarehouseRefs, err := discoverSingleColumnFKReferences(ctx, db, "warehouses")
	if err != nil {
		return nil, err
	}
	allowedWarehouseSet := referenceSet(allowedWarehouseRefs)
	blockedWarehouseSet := referenceSet(blockedWarehouseRefs)

	warehouseRefs := make([]reference, 0, len(discoveredWarehouseRefs))
	for _, ref := range discoveredWarehouseRefs {
		key := ref.key()
		if _, ok := allowedWarehouseSet[key]; ok {
			warehouseRefs = append(warehouseRefs, ref)
			continue
		}
		if _, ok := blockedWarehouseSet[key]; ok {
			continue
		}
		return nil, fmt.Errorf("unsupported warehouse foreign-key reference discovered: %s", key)
	}

	// product_warehouse_id columns also match %warehouse_id%, so they are
	// excluded here; they were already checked above.
	handledWarehouseRefs := mergeReferences(warehouseRefs, blockedWarehouseRefs)
	if err := ensureHandledColumns(ctx, db, "%warehouse_id%", handledWarehouseRefs, []string{"%product_warehouse_id%"}); err != nil {
		return nil, err
	}

	return &referencePlan{
		ProductWarehouseRefs: productWarehouseRefs,
		WarehouseRefs:        warehouseRefs,
		BlockedWarehouseRefs: append([]reference(nil), blockedWarehouseRefs...),
	}, nil
}

// runPrechecks runs the read-only safety checks; any failure aborts the run
// before the plan is rendered or applied.
func runPrechecks(ctx context.Context, db *gorm.DB, rows []planRow, refs *referencePlan, opts *options) error {
	// Blocked references only matter when the wrong warehouses get deleted.
	if opts.DeleteWrongWarehouses {
		if err := ensureNoBlockedWarehouseRefs(ctx, db, rows, refs.BlockedWarehouseRefs); err != nil {
			return err
		}
	}
	if err := ensureNoPurchaseItemWarehouseConflicts(ctx, db, rows); err != nil {
		return err
	}
	return ensureNoInFlightFifoArtifacts(ctx, db, rows)
}

// discoverSingleColumnFKReferences returns every (table, column) in the public
// schema that has a single-column foreign key to referencedTable. It returns
// an error when referencedTable is the target of a multi-column foreign key or
// when a discovered name is not a safe SQL identifier.
func discoverSingleColumnFKReferences(ctx context.Context, db *gorm.DB, referencedTable string) ([]reference, error) {
	type countRow struct {
		Count int64 `gorm:"column:cnt"`
	}
	var multiColumn countRow
	if err := db.WithContext(ctx).Raw(`
		SELECT COUNT(*) AS cnt
		FROM pg_constraint c
		JOIN pg_class ref ON ref.oid = c.confrelid
		JOIN pg_namespace ref_ns ON ref_ns.oid = ref.relnamespace
		WHERE c.contype = 'f'
			AND ref_ns.nspname = 'public'
			AND ref.relname = ?
			AND COALESCE(array_length(c.conkey, 1), 0) <> 1
	`, referencedTable).Scan(&multiColumn).Error; err != nil {
		return nil, err
	}
	if multiColumn.Count > 0 {
		return nil, fmt.Errorf("table %s has %d multi-column foreign keys; command only supports single-column rewrites", referencedTable, multiColumn.Count)
	}

	type row struct {
		Table  string `gorm:"column:table_name"`
		Column string `gorm:"column:column_name"`
	}
	selected := make([]row, 0)
	if err := db.WithContext(ctx).Raw(`
		SELECT
			src.relname AS table_name,
			src_att.attname AS column_name
		FROM pg_constraint c
		JOIN pg_class src ON src.oid = c.conrelid
		JOIN pg_namespace src_ns ON src_ns.oid = src.relnamespace
		JOIN pg_class ref ON ref.oid = c.confrelid
		JOIN pg_namespace ref_ns ON ref_ns.oid = ref.relnamespace
		JOIN pg_attribute src_att ON src_att.attrelid = src.oid
			AND src_att.attnum = c.conkey[1]
		WHERE c.contype = 'f'
			AND src_ns.nspname = 'public'
			AND ref_ns.nspname = 'public'
			AND ref.relname = ?
			AND COALESCE(array_length(c.conkey, 1), 0) = 1
		ORDER BY src.relname ASC, src_att.attname ASC
	`, referencedTable).Scan(&selected).Error; err != nil {
		return nil, err
	}

	refs := make([]reference, 0, len(selected))
	for _, item := range selected {
		ref := reference{
			Table:  strings.TrimSpace(item.Table),
			Column: strings.TrimSpace(item.Column),
		}
		// These names are later interpolated into SQL, so refuse anything
		// outside the identifier whitelist.
		if !isSafeIdentifier(ref.Table) || !isSafeIdentifier(ref.Column) {
			return nil, fmt.Errorf("unsafe identifier discovered while inspecting %s references: %s.%s", referencedTable, ref.Table, ref.Column)
		}
		refs = append(refs, ref)
	}
	return refs, nil
}

// ensureHandledColumns lists every column in the public schema whose name
// matches columnPattern (and none of excludePatterns) and returns an error for
// the first one that is not part of handled.
func ensureHandledColumns(
	ctx context.Context,
	db *gorm.DB,
	columnPattern string,
	handled []reference,
	excludePatterns []string,
) error {
	type row struct {
		Table  string `gorm:"column:table_name"`
		Column string `gorm:"column:column_name"`
	}

	query := `
		SELECT table_name, column_name
		FROM information_schema.columns
		WHERE table_schema = 'public'
			AND column_name LIKE ?
	`
	args := []any{columnPattern}
	for _, pattern := range excludePatterns {
		query += " AND column_name NOT LIKE ?\n"
		args = append(args, pattern)
	}
	query += "ORDER BY table_name ASC, column_name ASC"

	rows := make([]row, 0)
	if err := db.WithContext(ctx).Raw(query, args...).Scan(&rows).Error; err != nil {
		return err
	}

	handledSet := referenceSet(handled)
	for _, item := range rows {
		ref := reference{
			Table:  strings.TrimSpace(item.Table),
			Column: strings.TrimSpace(item.Column),
		}
		if _, ok := handledSet[ref.key()]; ok {
			continue
		}
		return fmt.Errorf("unhandled warehouse-related column discovered: %s", ref.key())
	}
	return nil
}

// countReferencingRows counts the rows of ref.Table whose ref.Column is one of
// ids. db may be a plain connection or an open transaction.
func countReferencingRows(ctx context.Context, db *gorm.DB, ref reference, ids []uint) (int64, error) {
	var count int64
	err := db.WithContext(ctx).
		Table(ref.Table).
		Where(fmt.Sprintf("%s IN ?", quotedIdentifier(ref.Column)), ids).
		Count(&count).Error
	if err != nil {
		return 0, fmt.Errorf("count rows referencing %s: %w", ref.key(), err)
	}
	return count, nil
}

// ensureNoBlockedWarehouseRefs fails when any blocked column still references
// one of the wrong warehouses of the plan.
func ensureNoBlockedWarehouseRefs(ctx context.Context, db *gorm.DB, rows []planRow, blocked []reference) error {
	wrongWarehouseIDs := uniqueWrongWarehouseIDs(rows)
	if len(wrongWarehouseIDs) == 0 {
		return nil
	}
	for _, ref := range blocked {
		count, err := countReferencingRows(ctx, db, ref, wrongWarehouseIDs)
		if err != nil {
			return err
		}
		if count > 0 {
			return fmt.Errorf("found %d rows in %s.%s referencing the wrong warehouse ids; aborting", count, ref.Table, ref.Column)
		}
	}
	return nil
}

// ensureNoPurchaseItemWarehouseConflicts fails when moving a purchase item
// from a wrong warehouse to its correct warehouse would collide with an
// existing purchase item that has the same purchase, product and warehouse.
func ensureNoPurchaseItemWarehouseConflicts(ctx context.Context, db *gorm.DB, rows []planRow) error {
	wrongWarehouseIDs := uniqueWrongWarehouseIDs(rows)
	if len(wrongWarehouseIDs) == 0 {
		return nil
	}

	type countRow struct {
		Count int64 `gorm:"column:cnt"`
	}
	var row countRow
	if err := db.WithContext(ctx).Raw(`
		WITH wrong_pairs AS (
			SELECT
				pi.id AS wrong_purchase_item_id,
				pi.purchase_id,
				pi.product_id,
				pi.warehouse_id AS wrong_warehouse_id,
				w2.id AS correct_warehouse_id
			FROM purchase_items pi
			JOIN warehouses w ON w.id = pi.warehouse_id AND w.deleted_at IS NULL
			JOIN kandangs k ON k.id = w.kandang_id AND k.deleted_at IS NULL
			JOIN LATERAL (
				SELECT w2.id, w2.name
				FROM warehouses w2
				WHERE w2.location_id = k.location_id
					AND UPPER(COALESCE(w2.type, '')) = 'LOKASI'
					AND w2.deleted_at IS NULL
				ORDER BY w2.id ASC
				LIMIT 1
			) AS w2 ON TRUE
			WHERE pi.warehouse_id IN ?
				AND w.kandang_id IS NOT NULL
				AND w.location_id IS DISTINCT FROM k.location_id
		)
		SELECT COUNT(*) AS cnt
		FROM wrong_pairs wp
		JOIN purchase_items target ON target.purchase_id = wp.purchase_id
			AND target.product_id = wp.product_id
			AND target.warehouse_id = wp.correct_warehouse_id
	`, wrongWarehouseIDs).Scan(&row).Error; err != nil {
		return err
	}
	if row.Count > 0 {
		return fmt.Errorf("found %d purchase_item uniqueness collisions on (purchase_id, product_id, warehouse_id); aborting", row.Count)
	}
	return nil
}

// ensureNoInFlightFifoArtifacts fails when reflow checkpoints or shadow
// allocations exist for any survivor or absorbed product_warehouse.
func ensureNoInFlightFifoArtifacts(ctx context.Context, db *gorm.DB, rows []planRow) error {
	affectedPWIDs := mergeUintSlices(uniqueSurvivorIDs(rows), uniqueAbsorbedIDs(rows))
	if len(affectedPWIDs) == 0 {
		return nil
	}
	inFlightRefs := []reference{
		{Table: "fifo_stock_v2_reflow_checkpoints", Column: "product_warehouse_id"},
		{Table: "fifo_stock_v2_shadow_allocations", Column: "product_warehouse_id"},
	}
	for _, ref := range inFlightRefs {
		count, err := countReferencingRows(ctx, db, ref, affectedPWIDs)
		if err != nil {
			return err
		}
		if count > 0 {
			return fmt.Errorf("found %d in-flight FIFO rows in %s for affected product warehouses; aborting", count, ref.Table)
		}
	}
	return nil
}

// pwMoveFromIDs returns the FromID of every move, in order.
func pwMoveFromIDs(moves []pwMove) []uint {
	ids := make([]uint, 0, len(moves))
	for _, move := range moves {
		ids = append(ids, move.FromID)
	}
	return ids
}

// applyPlan executes the plan inside one transaction and returns the summary
// with the affected-row counters filled in. On error the transaction is rolled
// back and a zero summary is returned.
//
// Steps, in order:
//  1. move every product_warehouse reference from absorbed to survivor rows;
//  2. delete the absorbed product_warehouses;
//  3. point each survivor at its correct warehouse;
//  4. rewrite the remaining warehouse references;
//  5. recompute stock logs and reflow/recalculate FIFO for survivors that
//     absorbed another row;
//  6. optionally soft delete the wrong warehouses;
//  7. verify that no rewritten column still points at the old ids.
func applyPlan(
	ctx context.Context,
	db *gorm.DB,
	rows []planRow,
	opts *options,
	refs *referencePlan,
) (applySummary, error) {
	summary := summarizePlan(rows)
	summary.UpdatedPWRefs = make(map[string]int64)
	summary.UpdatedWarehouseRefs = make(map[string]int64)

	pwMoves := uniquePWMoves(rows)
	warehouseMoves := uniqueWarehouseMoves(rows)
	survivorWarehouseMap := buildSurvivorWarehouseMap(rows)
	duplicateSurvivors := uniqueDuplicateSurvivorIDs(rows)
	fifoSvc := commonSvc.NewFifoStockV2Service(db, nil)
	now := time.Now().UTC()

	err := db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
		for _, move := range pwMoves {
			for _, ref := range refs.ProductWarehouseRefs {
				res := tx.WithContext(ctx).
					Table(ref.Table).
					Where(fmt.Sprintf("%s = ?", quotedIdentifier(ref.Column)), move.FromID).
					Update(ref.Column, move.ToID)
				if res.Error != nil {
					return fmt.Errorf("move %s from pw %d to %d: %w", ref.key(), move.FromID, move.ToID, res.Error)
				}
				if res.RowsAffected > 0 {
					summary.UpdatedPWRefs[ref.key()] += res.RowsAffected
				}
			}
		}

		// The absorbed rows must be gone before the survivors take over
		// their warehouse in the next step.
		if len(pwMoves) > 0 {
			res := tx.WithContext(ctx).
				Where("id IN ?", pwMoveFromIDs(pwMoves)).
				Delete(&entity.ProductWarehouse{})
			if res.Error != nil {
				return fmt.Errorf("delete absorbed product warehouses: %w", res.Error)
			}
			summary.DeletedProductWarehouses = res.RowsAffected
		}

		for survivorID, correctWarehouseID := range survivorWarehouseMap {
			res := tx.WithContext(ctx).
				Table("product_warehouses").
				Where("id = ?", survivorID).
				Update("warehouse_id", correctWarehouseID)
			if res.Error != nil {
				return fmt.Errorf("update survivor product_warehouse %d -> warehouse %d: %w", survivorID, correctWarehouseID, res.Error)
			}
			if res.RowsAffected > 0 {
				summary.UpdatedWarehouseRefs["product_warehouses.warehouse_id"] += res.RowsAffected
			}
		}

		for _, move := range warehouseMoves {
			for _, ref := range refs.WarehouseRefs {
				res := tx.WithContext(ctx).
					Table(ref.Table).
					Where(fmt.Sprintf("%s = ?", quotedIdentifier(ref.Column)), move.WrongWarehouseID).
					Update(ref.Column, move.CorrectWarehouseID)
				if res.Error != nil {
					return fmt.Errorf("update %s from warehouse %d to %d: %w", ref.key(), move.WrongWarehouseID, move.CorrectWarehouseID, res.Error)
				}
				if res.RowsAffected > 0 {
					summary.UpdatedWarehouseRefs[ref.key()] += res.RowsAffected
				}
			}
		}

		// Only survivors that absorbed another row now carry merged history
		// that has to be recomputed.
		if len(duplicateSurvivors) > 0 {
			if err := recomputeStockLogs(ctx, tx, duplicateSurvivors); err != nil {
				return err
			}
		}
		for _, survivorID := range duplicateSurvivors {
			if err := reflowAndRecalculateProductWarehouse(ctx, fifoSvc, tx, survivorID); err != nil {
				return err
			}
		}

		if opts.DeleteWrongWarehouses {
			wrongWarehouseIDs := uniqueWrongWarehouseIDs(rows)
			if len(wrongWarehouseIDs) > 0 {
				res := tx.WithContext(ctx).
					Table("warehouses").
					Where("id IN ? AND deleted_at IS NULL", wrongWarehouseIDs).
					Updates(map[string]any{
						"deleted_at": now,
						"updated_at": now,
					})
				if res.Error != nil {
					return fmt.Errorf("soft delete wrong warehouses: %w", res.Error)
				}
				summary.SoftDeletedWarehouses = res.RowsAffected
			}
		}

		return verifyNoWrongWarehouseRefsRemain(ctx, tx, rows, pwMoves, refs)
	})
	if err != nil {
		return applySummary{}, err
	}
	return summary, nil
}

// recomputeStockLogs rewrites stock_logs.stock for the given product
// warehouses as the running sum of (increase - decrease), ordered by
// created_at and id.
func recomputeStockLogs(ctx context.Context, tx *gorm.DB, productWarehouseIDs []uint) error {
	if len(productWarehouseIDs) == 0 {
		return nil
	}
	query := `
		WITH recalculated AS (
			SELECT
				id,
				SUM(COALESCE(increase, 0) - COALESCE(decrease, 0)) OVER (
					PARTITION BY product_warehouse_id
					ORDER BY created_at ASC, id ASC
				) AS running_stock
			FROM stock_logs
			WHERE product_warehouse_id IN ?
		)
		UPDATE stock_logs sl
		SET stock = recalculated.running_stock
		FROM recalculated
		WHERE sl.id = recalculated.id
	`
	return tx.WithContext(ctx).Exec(query, productWarehouseIDs).Error
}

// verifyNoWrongWarehouseRefsRemain is the post-apply check run inside the
// transaction: no rewritten warehouse column may still point at a wrong
// warehouse and no product_warehouse column at an absorbed row.
func verifyNoWrongWarehouseRefsRemain(
	ctx context.Context,
	tx *gorm.DB,
	rows []planRow,
	pwMoves []pwMove,
	refs *referencePlan,
) error {
	if wrongWarehouseIDs := uniqueWrongWarehouseIDs(rows); len(wrongWarehouseIDs) > 0 {
		for _, ref := range refs.WarehouseRefs {
			remaining, err := countReferencingRows(ctx, tx, ref, wrongWarehouseIDs)
			if err != nil {
				return err
			}
			if remaining > 0 {
				return fmt.Errorf("verification failed: %d rows still point to wrong warehouses via %s.%s", remaining, ref.Table, ref.Column)
			}
		}
	}

	if len(pwMoves) > 0 {
		fromIDs := pwMoveFromIDs(pwMoves)
		for _, ref := range refs.ProductWarehouseRefs {
			remaining, err := countReferencingRows(ctx, tx, ref, fromIDs)
			if err != nil {
				return err
			}
			if remaining > 0 {
				return fmt.Errorf("verification failed: %d rows still point to absorbed product_warehouses via %s.%s", remaining, ref.Table, ref.Column)
			}
		}
	}
	return nil
}

// reflowAndRecalculateProductWarehouse runs the FIFO reflow followed by a
// drift-fixing recalculate for one product warehouse, both on tx. It does
// nothing when no flag group resolves for the product warehouse.
func reflowAndRecalculateProductWarehouse(
	ctx context.Context,
	fifoSvc commonSvc.FifoStockV2Service,
	tx *gorm.DB,
	productWarehouseID uint,
) error {
	flagGroupCode, err := resolveFlagGroupByProductWarehouse(ctx, tx, productWarehouseID)
	if err != nil {
		return fmt.Errorf("resolve flag group for product_warehouse %d: %w", productWarehouseID, err)
	}
	if flagGroupCode == "" {
		return nil
	}

	if _, err := fifoSvc.Reflow(ctx, commonSvc.FifoStockV2ReflowRequest{
		FlagGroupCode:      flagGroupCode,
		ProductWarehouseID: productWarehouseID,
		Tx:                 tx,
	}); err != nil {
		return fmt.Errorf("reflow product_warehouse %d: %w", productWarehouseID, err)
	}

	if _, err := fifoSvc.Recalculate(ctx, commonSvc.FifoStockV2RecalculateRequest{
		ProductWarehouseIDs: []uint{productWarehouseID},
		FlagGroupCodes:      []string{flagGroupCode},
		FixDrift:            true,
		Tx:                  tx,
	}); err != nil {
		return fmt.Errorf("recalculate product_warehouse %d: %w", productWarehouseID, err)
	}
	return nil
}

// resolveFlagGroupByProductWarehouse returns the flag group code of the first
// active STOCKABLE / PURCHASE_IN / purchase_items route rule whose active flag
// group has an active member matching one of the product's flags. It returns
// an empty string and a nil error when no such rule exists.
func resolveFlagGroupByProductWarehouse(ctx context.Context, tx *gorm.DB, productWarehouseID uint) (string, error) {
	type row struct {
		FlagGroupCode string `gorm:"column:flag_group_code"`
	}
	var selected row
	err := tx.WithContext(ctx).
		Table("fifo_stock_v2_route_rules rr").
		Select("rr.flag_group_code").
		Joins("JOIN fifo_stock_v2_flag_groups fg ON fg.code = rr.flag_group_code AND fg.is_active = TRUE").
		Where("rr.is_active = TRUE").
		Where("rr.lane = ?", "STOCKABLE").
		Where("rr.function_code = ?", "PURCHASE_IN").
		Where("rr.source_table = ?", "purchase_items").
		Where(`
			EXISTS (
				SELECT 1
				FROM product_warehouses pw
				JOIN flags f ON f.flagable_id = pw.product_id
				JOIN fifo_stock_v2_flag_members fm ON fm.flag_name = f.name AND fm.is_active = TRUE
				WHERE pw.id = ?
					AND f.flagable_type = ?
					AND fm.flag_group_code = rr.flag_group_code
			)
		`, productWarehouseID, entity.FlagableTypeProduct).
		Order("rr.id ASC").
		Limit(1).
		Take(&selected).Error
	// errors.Is also matches a wrapped ErrRecordNotFound, which a plain ==
	// comparison would treat as a hard failure.
	if errors.Is(err, gorm.ErrRecordNotFound) {
		return "", nil
	}
	if err != nil {
		return "", err
	}
	return strings.TrimSpace(selected.FlagGroupCode), nil
}

// summarizePlan computes the plan counters from the rows; the apply-only
// fields are left at their zero values.
func summarizePlan(rows []planRow) applySummary {
	return applySummary{
		TargetScope:                  "farm",
		PlanRows:                     len(rows),
		WrongWarehouses:              len(uniqueWrongWarehouseIDs(rows)),
		SurvivorProductWarehouses:    len(uniqueSurvivorIDs(rows)),
		AbsorbedProductWarehouses:    len(uniqueAbsorbedIDs(rows)),
		NeedsReflowProductWarehouses: len(uniqueDuplicateSurvivorIDs(rows)),
	}
}

// renderPlan writes the rows and the summary to stdout, either as indented
// JSON or as an aligned text table followed by the summary counters. Output
// failures are logged instead of being dropped silently.
func renderPlan(mode string, rows []planRow, summary applySummary) {
	if mode == repointOutputJSON {
		payload := map[string]any{
			"rows":    rows,
			"summary": summary,
		}
		enc := json.NewEncoder(os.Stdout)
		enc.SetIndent("", " ")
		if err := enc.Encode(payload); err != nil {
			log.Printf("encode plan as json: %v", err)
		}
		return
	}

	w := tabwriter.NewWriter(os.Stdout, 2, 8, 2, ' ', 0)
	fmt.Fprintln(w, "AREA\tLOKASI\tKANDANG\tWRONG_WAREHOUSE\tTARGET_WAREHOUSE\tPRODUCT\tPFK_ID\tSURVIVOR_PW\tSURVIVOR_QTY\tABSORBED_PW\tABSORBED_QTY\tNEEDS_REFLOW")
	for _, row := range rows {
		// Same rule as the needs_reflow counter: a survivor needs a reflow
		// only when it absorbs an existing product_warehouse.
		_, needsReflow := absorbedID(row)
		fmt.Fprintf(
			w,
			"%s\t%s\t%s\t%s\t%s\t%s\t%s\t%d\t%.3f\t%s\t%s\t%t\n",
			row.AreaName,
			row.KandangLocationName,
			row.KandangName,
			row.WrongWarehouseName,
			row.CorrectWarehouseName,
			row.ProductName,
			displayOptionalUint(row.ProjectFlockKandangID),
			row.SurvivorPWID,
			row.SurvivorCurrentQty,
			displayOptionalUint(row.AbsorbedPWID),
			displayOptionalFloat(row.AbsorbedCurrentQty),
			needsReflow,
		)
	}
	// The tabwriter buffers everything, so write errors surface here.
	if err := w.Flush(); err != nil {
		log.Printf("write plan table: %v", err)
	}

	fmt.Printf(
		"\nSummary: target_scope=%s plan_rows=%d wrong_warehouses=%d survivor_pws=%d absorbed_pws=%d needs_reflow_pws=%d deleted_product_warehouses=%d soft_deleted_warehouses=%d\n",
		summary.TargetScope,
		summary.PlanRows,
		summary.WrongWarehouses,
		summary.SurvivorProductWarehouses,
		summary.AbsorbedProductWarehouses,
		summary.NeedsReflowProductWarehouses,
		summary.DeletedProductWarehouses,
		summary.SoftDeletedWarehouses,
	)
	if len(summary.UpdatedPWRefs) > 0 {
		fmt.Println("Updated product_warehouse refs:")
		printSortedCounts(summary.UpdatedPWRefs)
	}
	if len(summary.UpdatedWarehouseRefs) > 0 {
		fmt.Println("Updated warehouse refs:")
		printSortedCounts(summary.UpdatedWarehouseRefs)
	}
}

// printSortedCounts prints one key=value line per entry, sorted by key so the
// output is stable across runs.
func printSortedCounts(values map[string]int64) {
	keys := make([]string, 0, len(values))
	for key := range values {
		keys = append(keys, key)
	}
	sort.Strings(keys)
	for _, key := range keys {
		fmt.Printf(" %s=%d\n", key, values[key])
	}
}

// key returns the "table.column" form used to compare references.
func (r reference) key() string {
	return r.Table + "." + r.Column
}

// referenceSet builds a lookup set keyed by reference.key.
func referenceSet(values []reference) map[string]struct{} {
	out := make(map[string]struct{}, len(values))
	for _, value := range values {
		out[value.key()] = struct{}{}
	}
	return out
}

// mergeReferences concatenates the groups, drops duplicates and references
// with unsafe identifiers, and returns the result sorted by table then column.
func mergeReferences(groups ...[]reference) []reference {
	seen := make(map[string]struct{})
	out := make([]reference, 0)
	for _, group := range groups {
		for _, ref := range group {
			if !isSafeIdentifier(ref.Table) || !isSafeIdentifier(ref.Column) {
				continue
			}
			if _, ok := seen[ref.key()]; ok {
				continue
			}
			seen[ref.key()] = struct{}{}
			out = append(out, ref)
		}
	}
	sort.Slice(out, func(i, j int) bool {
		if out[i].Table == out[j].Table {
			return out[i].Column < out[j].Column
		}
		return out[i].Table < out[j].Table
	})
	return out
}

// mergeUintSlices returns the sorted union of the groups without zero values.
func mergeUintSlices(groups ...[]uint) []uint {
	seen := make(map[uint]struct{})
	out := make([]uint, 0)
	for _, group := range groups {
		for _, value := range group {
			if value == 0 {
				continue
			}
			if _, ok := seen[value]; ok {
				continue
			}
			seen[value] = struct{}{}
			out = append(out, value)
		}
	}
	sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
	return out
}

// isSafeIdentifier reports whether value, ignoring surrounding whitespace, is
// a lower-case SQL identifier matching sqlIdentifierPattern.
func isSafeIdentifier(value string) bool {
	return sqlIdentifierPattern.MatchString(strings.TrimSpace(value))
}

// quotedIdentifier returns value as a double-quoted SQL identifier. It panics
// on anything outside the identifier whitelist, because the result is
// interpolated into SQL text. Surrounding whitespace is removed so the quoted
// name is exactly the name that was validated.
func quotedIdentifier(value string) string {
	trimmed := strings.TrimSpace(value)
	if !sqlIdentifierPattern.MatchString(trimmed) {
		panic(fmt.Sprintf("unsafe SQL identifier: %s", value))
	}
	return `"` + trimmed + `"`
}

// absorbedID returns the id of the product_warehouse absorbed by the row and
// whether there is one; a nil or zero id counts as "nothing absorbed".
func absorbedID(row planRow) (uint, bool) {
	if row.AbsorbedPWID == nil || *row.AbsorbedPWID == 0 {
		return 0, false
	}
	return *row.AbsorbedPWID, true
}

// collectUniqueSorted applies pick to every row and returns the picked ids
// without duplicates, in ascending order. Rows for which pick reports false
// are skipped.
func collectUniqueSorted(rows []planRow, pick func(planRow) (uint, bool)) []uint {
	seen := make(map[uint]struct{}, len(rows))
	out := make([]uint, 0, len(rows))
	for _, row := range rows {
		id, ok := pick(row)
		if !ok {
			continue
		}
		if _, dup := seen[id]; dup {
			continue
		}
		seen[id] = struct{}{}
		out = append(out, id)
	}
	sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
	return out
}

// uniquePWMoves returns one absorbed -> survivor move per distinct pair, in
// row order. Rows without an absorbed row, or whose absorbed row is the
// survivor itself, produce no move.
func uniquePWMoves(rows []planRow) []pwMove {
	seen := make(map[pwMove]struct{})
	out := make([]pwMove, 0)
	for _, row := range rows {
		fromID, ok := absorbedID(row)
		if !ok || fromID == row.SurvivorPWID {
			continue
		}
		move := pwMove{FromID: fromID, ToID: row.SurvivorPWID}
		if _, dup := seen[move]; dup {
			continue
		}
		seen[move] = struct{}{}
		out = append(out, move)
	}
	return out
}

// uniqueWarehouseMoves returns one wrong -> correct move per wrong warehouse,
// in row order; the first correct warehouse seen for a wrong warehouse wins.
func uniqueWarehouseMoves(rows []planRow) []warehouseMove {
	seen := make(map[uint]struct{})
	out := make([]warehouseMove, 0)
	for _, row := range rows {
		if _, dup := seen[row.WrongWarehouseID]; dup {
			continue
		}
		seen[row.WrongWarehouseID] = struct{}{}
		out = append(out, warehouseMove{
			WrongWarehouseID:   row.WrongWarehouseID,
			CorrectWarehouseID: row.CorrectWarehouseID,
		})
	}
	return out
}

// buildSurvivorWarehouseMap maps every survivor product_warehouse id to its
// correct warehouse id; when a survivor appears in several rows the last row
// wins.
func buildSurvivorWarehouseMap(rows []planRow) map[uint]uint {
	out := make(map[uint]uint, len(rows))
	for _, row := range rows {
		out[row.SurvivorPWID] = row.CorrectWarehouseID
	}
	return out
}

// uniqueWrongWarehouseIDs returns the distinct wrong warehouse ids, ascending.
func uniqueWrongWarehouseIDs(rows []planRow) []uint {
	return collectUniqueSorted(rows, func(row planRow) (uint, bool) {
		return row.WrongWarehouseID, true
	})
}

// uniqueSurvivorIDs returns the distinct survivor product_warehouse ids,
// ascending.
func uniqueSurvivorIDs(rows []planRow) []uint {
	return collectUniqueSorted(rows, func(row planRow) (uint, bool) {
		return row.SurvivorPWID, true
	})
}

// uniqueAbsorbedIDs returns the distinct absorbed product_warehouse ids,
// ascending.
func uniqueAbsorbedIDs(rows []planRow) []uint {
	return collectUniqueSorted(rows, absorbedID)
}

// uniqueDuplicateSurvivorIDs returns the distinct ids of survivors that absorb
// another product_warehouse, ascending. These are the rows that need their
// stock logs recomputed and a FIFO reflow after the merge.
func uniqueDuplicateSurvivorIDs(rows []planRow) []uint {
	return collectUniqueSorted(rows, func(row planRow) (uint, bool) {
		_, ok := absorbedID(row)
		return row.SurvivorPWID, ok
	})
}

// andClause renders the filters as " AND f1 AND f2 ..." so they can be
// appended to an existing WHERE clause; it returns "" when there are none.
func andClause(filters []string) string {
	if len(filters) == 0 {
		return ""
	}
	return " AND " + strings.Join(filters, " AND ")
}

// displayOptionalUint formats value for the table output, using "-" for nil
// and zero.
func displayOptionalUint(value *uint) string {
	if value == nil || *value == 0 {
		return "-"
	}
	return fmt.Sprintf("%d", *value)
}

// displayOptionalFloat formats value with three decimals for the table
// output, using "-" for nil.
func displayOptionalFloat(value *float64) string {
	if value == nil {
		return "-"
	}
	return fmt.Sprintf("%.3f", *value)
}