diff --git a/cmd/consolidate-kandang-to-farm-stocks/main.go b/cmd/consolidate-kandang-to-farm-stocks/main.go new file mode 100644 index 00000000..8b86f29e --- /dev/null +++ b/cmd/consolidate-kandang-to-farm-stocks/main.go @@ -0,0 +1,1034 @@ +package main + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "log" + "os" + "regexp" + "sort" + "strings" + "text/tabwriter" + "time" + + commonSvc "gitlab.com/mbugroup/lti-api.git/internal/common/service" + "gitlab.com/mbugroup/lti-api.git/internal/config" + "gitlab.com/mbugroup/lti-api.git/internal/database" + entity "gitlab.com/mbugroup/lti-api.git/internal/entities" + "gorm.io/gorm" +) + +const ( + outputTable = "table" + outputJSON = "json" +) + +type options struct { + Apply bool + Output string + AreaName string + KandangLocationName string + DBSSLMode string + DeleteKandangWarehouses bool +} + +type consolidateRow struct { + AreaName string `gorm:"column:area_name" json:"area_name"` + KandangLocationName string `gorm:"column:kandang_location_name" json:"kandang_location_name"` + KandangID uint `gorm:"column:kandang_id" json:"kandang_id"` + KandangName string `gorm:"column:kandang_name" json:"kandang_name"` + KandangWarehouseID uint `gorm:"column:kandang_warehouse_id" json:"kandang_warehouse_id"` + KandangWarehouseName string `gorm:"column:kandang_warehouse_name" json:"kandang_warehouse_name"` + FarmWarehouseID uint `gorm:"column:farm_warehouse_id" json:"farm_warehouse_id"` + FarmWarehouseName string `gorm:"column:farm_warehouse_name" json:"farm_warehouse_name"` + ProductID uint `gorm:"column:product_id" json:"product_id"` + ProductName string `gorm:"column:product_name" json:"product_name"` + ProjectFlockKandangID *uint `gorm:"column:project_flock_kandang_id" json:"project_flock_kandang_id,omitempty"` + SurvivorPWID uint `gorm:"column:survivor_pw_id" json:"survivor_pw_id"` + SurvivorCurrentQty float64 `gorm:"column:survivor_current_qty" json:"survivor_current_qty"` + AbsorbedPWID *uint `gorm:"column:absorbed_pw_id" json:"absorbed_pw_id,omitempty"` + AbsorbedCurrentQty *float64 `gorm:"column:absorbed_current_qty" json:"absorbed_current_qty,omitempty"` +} + +type pwMove struct { + FromID uint + ToID uint +} + +type warehouseMove struct { + KandangWarehouseID uint + FarmWarehouseID uint +} + +type consolidateSummary struct { + PlanRows int `json:"plan_rows"` + KandangWarehouses int `json:"kandang_warehouses"` + SurvivorProductWarehouses int `json:"survivor_product_warehouses"` + AbsorbedProductWarehouses int `json:"absorbed_product_warehouses"` + NeedsReflowProductWarehouses int `json:"needs_reflow_product_warehouses"` + UpdatedPWRefs map[string]int64 `json:"updated_pw_refs,omitempty"` + UpdatedWarehouseRefs map[string]int64 `json:"updated_warehouse_refs,omitempty"` + DeletedProductWarehouses int64 `json:"deleted_product_warehouses,omitempty"` + SoftDeletedWarehouses int64 `json:"soft_deleted_warehouses,omitempty"` +} + +type reference struct { + Table string + Column string +} + +type referencePlan struct { + ProductWarehouseRefs []reference + WarehouseRefs []reference + BlockedWarehouseRefs []reference +} + +var sqlIdentifierPattern = regexp.MustCompile(`^[a-z_][a-z0-9_]*$`) + +var extraProductWarehouseRefs = []reference{ + {Table: "fifo_stock_v2_operation_log", Column: "product_warehouse_id"}, + {Table: "fifo_stock_v2_reflow_checkpoints", Column: "product_warehouse_id"}, + {Table: "fifo_stock_v2_shadow_allocations", Column: "product_warehouse_id"}, + {Table: "recording_depletions", Column: "source_product_warehouse_id"}, +} + +var allowedWarehouseRefs = []reference{ + {Table: "product_warehouses", Column: "warehouse_id"}, + {Table: "purchase_items", Column: "warehouse_id"}, +} + +var blockedWarehouseRefs = []reference{ + {Table: "stock_transfers", Column: "from_warehouse_id"}, + {Table: "stock_transfers", Column: "to_warehouse_id"}, +} + +func main() { + opts, err := parseFlags() + if err != nil { + log.Fatalf("invalid flags: %v", err) + } + + if opts.DBSSLMode != "" { + config.DBSSLMode = opts.DBSSLMode + } + + ctx := context.Background() + db := database.Connect(config.DBHost, config.DBName) + + rows, err := loadConsolidateRows(ctx, db, opts) + if err != nil { + log.Fatalf("failed to load consolidate rows: %v", err) + } + + if len(rows) == 0 { + fmt.Println("No kandang-level PAKAN/OVK stocks found to consolidate") + return + } + + refs, err := buildReferencePlan(ctx, db) + if err != nil { + log.Fatalf("failed to inspect warehouse references: %v", err) + } + + if err := runPrechecks(ctx, db, rows, refs); err != nil { + log.Fatalf("precheck failed: %v", err) + } + + summary := summarizeConsolidate(rows) + if !opts.Apply { + renderConsolidate(opts.Output, rows, summary) + return + } + + applied, err := applyConsolidate(ctx, db, rows, opts, refs) + if err != nil { + log.Fatalf("apply failed: %v", err) + } + renderConsolidate(opts.Output, rows, applied) +} + +func parseFlags() (*options, error) { + var opts options + flag.BoolVar(&opts.Apply, "apply", false, "Apply the consolidation. If false, run as dry-run") + flag.StringVar(&opts.Output, "output", outputTable, "Output format: table or json") + flag.StringVar(&opts.AreaName, "area-name", "", "Optional exact area name filter") + flag.StringVar(&opts.KandangLocationName, "kandang-location-name", "", "Optional exact canonical kandang location filter") + flag.StringVar(&opts.DBSSLMode, "db-sslmode", "", "Optional database sslmode override, for example: require") + flag.BoolVar(&opts.DeleteKandangWarehouses, "delete-kandang-warehouses", true, "Soft delete kandang warehouse rows after all stocks have been moved") + flag.Parse() + + opts.Output = strings.ToLower(strings.TrimSpace(opts.Output)) + opts.AreaName = strings.TrimSpace(opts.AreaName) + opts.KandangLocationName = strings.TrimSpace(opts.KandangLocationName) + opts.DBSSLMode = strings.TrimSpace(opts.DBSSLMode) + + if opts.Output == "" { + opts.Output = outputTable + } + if opts.Output != outputTable && opts.Output != outputJSON { + return nil, fmt.Errorf("unsupported --output=%s", opts.Output) + } + + return &opts, nil +} + +func loadConsolidateRows(ctx context.Context, db *gorm.DB, opts *options) ([]consolidateRow, error) { + filters := make([]string, 0, 2) + args := make([]any, 0, 2) + if opts.AreaName != "" { + filters = append(filters, "a.name = ?") + args = append(args, opts.AreaName) + } + if opts.KandangLocationName != "" { + filters = append(filters, "kl.name = ?") + args = append(args, opts.KandangLocationName) + } + + query := fmt.Sprintf(` +SELECT + a.name AS area_name, + kl.name AS kandang_location_name, + k.id AS kandang_id, + k.name AS kandang_name, + w.id AS kandang_warehouse_id, + w.name AS kandang_warehouse_name, + fw.id AS farm_warehouse_id, + fw.name AS farm_warehouse_name, + p.id AS product_id, + p.name AS product_name, + wp.project_flock_kandang_id, + wp.id AS survivor_pw_id, + COALESCE(wp.qty, 0) AS survivor_current_qty, + fpw.id AS absorbed_pw_id, + fpw.qty AS absorbed_current_qty +FROM warehouses w +JOIN kandangs k + ON k.id = w.kandang_id + AND k.deleted_at IS NULL +JOIN locations kl + ON kl.id = k.location_id +JOIN areas a + ON a.id = kl.area_id +JOIN LATERAL ( + SELECT w2.id, w2.name + FROM warehouses w2 + WHERE w2.location_id = k.location_id + AND UPPER(COALESCE(w2.type, '')) = 'LOKASI' + AND w2.deleted_at IS NULL + ORDER BY w2.id ASC + LIMIT 1 +) AS fw ON TRUE +JOIN product_warehouses wp + ON wp.warehouse_id = w.id +JOIN products p + ON p.id = wp.product_id + AND UPPER(COALESCE(p.type, '')) IN ('PAKAN', 'OVK') +LEFT JOIN product_warehouses fpw + ON fpw.product_id = wp.product_id + AND fpw.warehouse_id = fw.id + AND fpw.project_flock_kandang_id IS NOT DISTINCT FROM wp.project_flock_kandang_id +WHERE w.deleted_at IS NULL + AND w.kandang_id IS NOT NULL + AND NOT EXISTS ( + SELECT 1 + FROM stock_allocations sa + WHERE sa.stockable_type = 'PURCHASE_ITEMS' + AND sa.stockable_id IN ( + SELECT pi.id + FROM purchase_items pi + WHERE pi.warehouse_id = w.id + AND pi.product_id = p.id + ) + AND sa.status = 'ACTIVE' + AND sa.allocation_purpose = 'CONSUME' + AND sa.deleted_at IS NULL + ) + %s +ORDER BY a.name ASC, kl.name ASC, k.name ASC, wp.id ASC +`, andClause(filters)) + + rows := make([]consolidateRow, 0) + if err := db.WithContext(ctx).Raw(query, args...).Scan(&rows).Error; err != nil { + return nil, err + } + return rows, nil +} + +func buildReferencePlan(ctx context.Context, db *gorm.DB) (*referencePlan, error) { + productWarehouseRefs, err := discoverSingleColumnFKReferences(ctx, db, "product_warehouses") + if err != nil { + return nil, err + } + productWarehouseRefs = mergeReferences(productWarehouseRefs, extraProductWarehouseRefs) + if err := ensureHandledColumns(ctx, db, "%product_warehouse_id%", productWarehouseRefs, nil); err != nil { + return nil, err + } + + discoveredWarehouseRefs, err := discoverSingleColumnFKReferences(ctx, db, "warehouses") + if err != nil { + return nil, err + } + + allowedWarehouseSet := referenceSet(allowedWarehouseRefs) + blockedWarehouseSet := referenceSet(blockedWarehouseRefs) + + warehouseRefs := make([]reference, 0, len(discoveredWarehouseRefs)) + for _, ref := range discoveredWarehouseRefs { + key := ref.key() + if _, ok := allowedWarehouseSet[key]; ok { + warehouseRefs = append(warehouseRefs, ref) + continue + } + if _, ok := blockedWarehouseSet[key]; ok { + continue + } + return nil, fmt.Errorf("unsupported warehouse foreign-key reference discovered: %s", key) + } + if err := ensureHandledColumns(ctx, db, "%warehouse_id%", mergeReferences(warehouseRefs, blockedWarehouseRefs), []string{"%product_warehouse_id%"}); err != nil { + return nil, err + } + + return &referencePlan{ + ProductWarehouseRefs: productWarehouseRefs, + WarehouseRefs: warehouseRefs, + BlockedWarehouseRefs: append([]reference(nil), blockedWarehouseRefs...), + }, nil +} + +func runPrechecks(ctx context.Context, db *gorm.DB, rows []consolidateRow, refs *referencePlan) error { + if err := ensureNoBlockedWarehouseRefsConsolidate(ctx, db, rows, refs.BlockedWarehouseRefs); err != nil { + return err + } + if err := ensureNoPurchaseItemWarehouseConflictsConsolidate(ctx, db, rows); err != nil { + return err + } + if err := ensureNoInFlightFifoArtifactsConsolidate(ctx, db, rows); err != nil { + return err + } + return nil +} + +func discoverSingleColumnFKReferences(ctx context.Context, db *gorm.DB, referencedTable string) ([]reference, error) { + type countRow struct { + Count int64 `gorm:"column:cnt"` + } + + var multiColumn countRow + if err := db.WithContext(ctx).Raw(` +SELECT COUNT(*) AS cnt +FROM pg_constraint c +JOIN pg_class ref ON ref.oid = c.confrelid +JOIN pg_namespace ref_ns ON ref_ns.oid = ref.relnamespace +WHERE c.contype = 'f' + AND ref_ns.nspname = 'public' + AND ref.relname = ? + AND COALESCE(array_length(c.conkey, 1), 0) <> 1 +`, referencedTable).Scan(&multiColumn).Error; err != nil { + return nil, err + } + if multiColumn.Count > 0 { + return nil, fmt.Errorf("table %s has %d multi-column foreign keys; command only supports single-column rewrites", referencedTable, multiColumn.Count) + } + + type row struct { + Table string `gorm:"column:table_name"` + Column string `gorm:"column:column_name"` + } + selected := make([]row, 0) + if err := db.WithContext(ctx).Raw(` +SELECT + src.relname AS table_name, + src_att.attname AS column_name +FROM pg_constraint c +JOIN pg_class src ON src.oid = c.conrelid +JOIN pg_namespace src_ns ON src_ns.oid = src.relnamespace +JOIN pg_class ref ON ref.oid = c.confrelid +JOIN pg_namespace ref_ns ON ref_ns.oid = ref.relnamespace +JOIN pg_attribute src_att ON src_att.attrelid = src.oid AND src_att.attnum = c.conkey[1] +WHERE c.contype = 'f' + AND src_ns.nspname = 'public' + AND ref_ns.nspname = 'public' + AND ref.relname = ? + AND COALESCE(array_length(c.conkey, 1), 0) = 1 +ORDER BY src.relname ASC, src_att.attname ASC +`, referencedTable).Scan(&selected).Error; err != nil { + return nil, err + } + + refs := make([]reference, 0, len(selected)) + for _, item := range selected { + ref := reference{ + Table: strings.TrimSpace(item.Table), + Column: strings.TrimSpace(item.Column), + } + if !isSafeIdentifier(ref.Table) || !isSafeIdentifier(ref.Column) { + return nil, fmt.Errorf("unsafe identifier discovered while inspecting %s references: %s.%s", referencedTable, ref.Table, ref.Column) + } + refs = append(refs, ref) + } + return refs, nil +} + +func ensureHandledColumns( + ctx context.Context, + db *gorm.DB, + columnPattern string, + handled []reference, + excludePatterns []string, +) error { + type row struct { + Table string `gorm:"column:table_name"` + Column string `gorm:"column:column_name"` + } + + query := ` +SELECT table_name, column_name +FROM information_schema.columns +WHERE table_schema = 'public' + AND column_name LIKE ? +` + args := []any{columnPattern} + for _, pattern := range excludePatterns { + query += " AND column_name NOT LIKE ?\n" + args = append(args, pattern) + } + query += "ORDER BY table_name ASC, column_name ASC" + + rows := make([]row, 0) + if err := db.WithContext(ctx).Raw(query, args...).Scan(&rows).Error; err != nil { + return err + } + + handledSet := referenceSet(handled) + for _, item := range rows { + ref := reference{ + Table: strings.TrimSpace(item.Table), + Column: strings.TrimSpace(item.Column), + } + if _, ok := handledSet[ref.key()]; ok { + continue + } + return fmt.Errorf("unhandled warehouse-related column discovered: %s", ref.key()) + } + return nil +} + +func ensureNoBlockedWarehouseRefsConsolidate(ctx context.Context, db *gorm.DB, rows []consolidateRow, blocked []reference) error { + kandangWarehouseIDs := uniqueKandangWarehouseIDs(rows) + if len(kandangWarehouseIDs) == 0 { + return nil + } + + for _, ref := range blocked { + var count int64 + if err := db.WithContext(ctx). + Table(ref.Table). + Where(fmt.Sprintf("%s IN ?", quotedIdentifier(ref.Column)), kandangWarehouseIDs). + Count(&count).Error; err != nil { + return err + } + if count > 0 { + return fmt.Errorf("found %d rows in %s.%s referencing the kandang warehouse ids; aborting", count, ref.Table, ref.Column) + } + } + return nil +} + +func ensureNoPurchaseItemWarehouseConflictsConsolidate(ctx context.Context, db *gorm.DB, rows []consolidateRow) error { + kandangWarehouseIDs := uniqueKandangWarehouseIDs(rows) + if len(kandangWarehouseIDs) == 0 { + return nil + } + + type countRow struct { + Count int64 `gorm:"column:cnt"` + } + + var row countRow + if err := db.WithContext(ctx).Raw(` +WITH kandang_pairs AS ( + SELECT + pi.id AS kandang_purchase_item_id, + pi.purchase_id, + pi.product_id, + pi.warehouse_id AS kandang_warehouse_id, + fw.id AS farm_warehouse_id + FROM purchase_items pi + JOIN warehouses w + ON w.id = pi.warehouse_id + AND w.deleted_at IS NULL + JOIN kandangs k + ON k.id = w.kandang_id + AND k.deleted_at IS NULL + JOIN LATERAL ( + SELECT w2.id, w2.name + FROM warehouses w2 + WHERE w2.location_id = k.location_id + AND UPPER(COALESCE(w2.type, '')) = 'LOKASI' + AND w2.deleted_at IS NULL + ORDER BY w2.id ASC + LIMIT 1 + ) AS fw ON TRUE + WHERE pi.warehouse_id IN ? + AND w.kandang_id IS NOT NULL +) +SELECT COUNT(*) AS cnt +FROM kandang_pairs kp +JOIN purchase_items target + ON target.purchase_id = kp.purchase_id + AND target.product_id = kp.product_id + AND target.warehouse_id = kp.farm_warehouse_id +`, kandangWarehouseIDs).Scan(&row).Error; err != nil { + return err + } + if row.Count > 0 { + return fmt.Errorf("found %d purchase_item uniqueness collisions on (purchase_id, product_id, warehouse_id); aborting", row.Count) + } + return nil +} + +func ensureNoInFlightFifoArtifactsConsolidate(ctx context.Context, db *gorm.DB, rows []consolidateRow) error { + affectedPWIDs := mergeUintSlices(uniqueSurvivorIDsConsolidate(rows), uniqueAbsorbedIDsConsolidate(rows)) + if len(affectedPWIDs) == 0 { + return nil + } + + for _, ref := range []reference{ + {Table: "fifo_stock_v2_reflow_checkpoints", Column: "product_warehouse_id"}, + {Table: "fifo_stock_v2_shadow_allocations", Column: "product_warehouse_id"}, + } { + var count int64 + if err := db.WithContext(ctx). + Table(ref.Table). + Where(fmt.Sprintf("%s IN ?", quotedIdentifier(ref.Column)), affectedPWIDs). + Count(&count).Error; err != nil { + return err + } + if count > 0 { + return fmt.Errorf("found %d in-flight FIFO rows in %s for affected product warehouses; aborting", count, ref.Table) + } + } + return nil +} + +func applyConsolidate( + ctx context.Context, + db *gorm.DB, + rows []consolidateRow, + opts *options, + refs *referencePlan, +) (consolidateSummary, error) { + summary := summarizeConsolidate(rows) + summary.UpdatedPWRefs = make(map[string]int64) + summary.UpdatedWarehouseRefs = make(map[string]int64) + + pwMoves := uniquePWMovesConsolidate(rows) + warehouseMoves := uniqueWarehouseMovesConsolidate(rows) + survivorWarehouseMap := buildSurvivorWarehouseMapConsolidate(rows) + duplicateSurvivors := uniqueDuplicateSurvivorIDsConsolidate(rows) + + fifoSvc := commonSvc.NewFifoStockV2Service(db, nil) + now := time.Now().UTC() + + err := db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + for _, move := range pwMoves { + for _, ref := range refs.ProductWarehouseRefs { + res := tx.WithContext(ctx). + Table(ref.Table). + Where(fmt.Sprintf("%s = ?", quotedIdentifier(ref.Column)), move.FromID). + Update(ref.Column, move.ToID) + if res.Error != nil { + return fmt.Errorf("move %s from pw %d to %d: %w", ref.Table+"."+ref.Column, move.FromID, move.ToID, res.Error) + } + if res.RowsAffected > 0 { + summary.UpdatedPWRefs[ref.Table+"."+ref.Column] += res.RowsAffected + } + } + } + + if len(pwMoves) > 0 { + fromIDs := make([]uint, 0, len(pwMoves)) + for _, move := range pwMoves { + fromIDs = append(fromIDs, move.FromID) + } + res := tx.WithContext(ctx). + Where("id IN ?", fromIDs). + Delete(&entity.ProductWarehouse{}) + if res.Error != nil { + return fmt.Errorf("delete absorbed product warehouses: %w", res.Error) + } + summary.DeletedProductWarehouses = res.RowsAffected + } + + for survivorID, farmWarehouseID := range survivorWarehouseMap { + res := tx.WithContext(ctx). + Table("product_warehouses"). + Where("id = ?", survivorID). + Update("warehouse_id", farmWarehouseID) + if res.Error != nil { + return fmt.Errorf("update survivor product_warehouse %d -> warehouse %d: %w", survivorID, farmWarehouseID, res.Error) + } + if res.RowsAffected > 0 { + summary.UpdatedWarehouseRefs["product_warehouses.warehouse_id"] += res.RowsAffected + } + } + + for _, move := range warehouseMoves { + for _, ref := range refs.WarehouseRefs { + res := tx.WithContext(ctx). + Table(ref.Table). + Where(fmt.Sprintf("%s = ?", quotedIdentifier(ref.Column)), move.KandangWarehouseID). + Update(ref.Column, move.FarmWarehouseID) + if res.Error != nil { + return fmt.Errorf("update %s from warehouse %d to %d: %w", ref.Table+"."+ref.Column, move.KandangWarehouseID, move.FarmWarehouseID, res.Error) + } + if res.RowsAffected > 0 { + summary.UpdatedWarehouseRefs[ref.Table+"."+ref.Column] += res.RowsAffected + } + } + } + + if len(duplicateSurvivors) > 0 { + if err := recomputeStockLogsConsolidate(ctx, tx, duplicateSurvivors); err != nil { + return err + } + } + + for _, survivorID := range duplicateSurvivors { + if err := reflowAndRecalculateProductWarehouse(ctx, fifoSvc, tx, survivorID); err != nil { + return err + } + } + + if opts.DeleteKandangWarehouses { + kandangWarehouseIDs := uniqueKandangWarehouseIDs(rows) + if len(kandangWarehouseIDs) > 0 { + res := tx.WithContext(ctx). + Table("warehouses"). + Where("id IN ? AND deleted_at IS NULL", kandangWarehouseIDs). + Updates(map[string]any{ + "deleted_at": now, + "updated_at": now, + }) + if res.Error != nil { + return fmt.Errorf("soft delete kandang warehouses: %w", res.Error) + } + summary.SoftDeletedWarehouses = res.RowsAffected + } + } + + if err := verifyNoKandangWarehouseRefsRemain(ctx, tx, rows, pwMoves, refs); err != nil { + return err + } + + return nil + }) + if err != nil { + return consolidateSummary{}, err + } + + return summary, nil +} + +func recomputeStockLogsConsolidate(ctx context.Context, tx *gorm.DB, productWarehouseIDs []uint) error { + if len(productWarehouseIDs) == 0 { + return nil + } + + query := ` +WITH recalculated AS ( + SELECT + id, + SUM(COALESCE(increase, 0) - COALESCE(decrease, 0)) + OVER (PARTITION BY product_warehouse_id ORDER BY created_at ASC, id ASC) AS running_stock + FROM stock_logs + WHERE product_warehouse_id IN ? +) +UPDATE stock_logs sl +SET stock = recalculated.running_stock +FROM recalculated +WHERE sl.id = recalculated.id +` + return tx.WithContext(ctx).Exec(query, productWarehouseIDs).Error +} + +func verifyNoKandangWarehouseRefsRemain( + ctx context.Context, + tx *gorm.DB, + rows []consolidateRow, + pwMoves []pwMove, + refs *referencePlan, +) error { + kandangWarehouseIDs := uniqueKandangWarehouseIDs(rows) + if len(kandangWarehouseIDs) > 0 { + for _, ref := range refs.WarehouseRefs { + var remaining int64 + if err := tx.WithContext(ctx). + Table(ref.Table). + Where(fmt.Sprintf("%s IN ?", quotedIdentifier(ref.Column)), kandangWarehouseIDs). + Count(&remaining).Error; err != nil { + return err + } + if remaining > 0 { + return fmt.Errorf("verification failed: %d rows still point to kandang warehouses via %s.%s", remaining, ref.Table, ref.Column) + } + } + } + + if len(pwMoves) > 0 { + fromIDs := make([]uint, 0, len(pwMoves)) + for _, move := range pwMoves { + fromIDs = append(fromIDs, move.FromID) + } + + var remaining int64 + for _, ref := range refs.ProductWarehouseRefs { + if err := tx.WithContext(ctx). + Table(ref.Table). + Where(fmt.Sprintf("%s IN ?", quotedIdentifier(ref.Column)), fromIDs). + Count(&remaining).Error; err != nil { + return err + } + if remaining > 0 { + return fmt.Errorf("verification failed: %d rows still point to absorbed product_warehouses via %s.%s", remaining, ref.Table, ref.Column) + } + } + } + + return nil +} + +func reflowAndRecalculateProductWarehouse( + ctx context.Context, + fifoSvc commonSvc.FifoStockV2Service, + tx *gorm.DB, + productWarehouseID uint, +) error { + flagGroupCode, err := resolveFlagGroupByProductWarehouse(ctx, tx, productWarehouseID) + if err != nil { + return fmt.Errorf("resolve flag group for product_warehouse %d: %w", productWarehouseID, err) + } + + if _, err := fifoSvc.Reflow(ctx, commonSvc.FifoStockV2ReflowRequest{ + FlagGroupCode: flagGroupCode, + ProductWarehouseID: productWarehouseID, + Tx: tx, + }); err != nil { + return fmt.Errorf("reflow product_warehouse %d: %w", productWarehouseID, err) + } + + if _, err := fifoSvc.Recalculate(ctx, commonSvc.FifoStockV2RecalculateRequest{ + ProductWarehouseIDs: []uint{productWarehouseID}, + FlagGroupCodes: []string{flagGroupCode}, + FixDrift: true, + Tx: tx, + }); err != nil { + return fmt.Errorf("recalculate product_warehouse %d: %w", productWarehouseID, err) + } + + return nil +} + +func resolveFlagGroupByProductWarehouse(ctx context.Context, tx *gorm.DB, productWarehouseID uint) (string, error) { + type row struct { + FlagGroupCode string `gorm:"column:flag_group_code"` + } + + var selected row + err := tx.WithContext(ctx). + Table("fifo_stock_v2_route_rules rr"). + Select("rr.flag_group_code"). + Joins("JOIN fifo_stock_v2_flag_groups fg ON fg.code = rr.flag_group_code AND fg.is_active = TRUE"). + Where("rr.is_active = TRUE"). + Where("rr.lane = ?", "STOCKABLE"). + Where("rr.function_code = ?", "PURCHASE_IN"). + Where("rr.source_table = ?", "purchase_items"). + Where(` + EXISTS ( + SELECT 1 + FROM product_warehouses pw + JOIN flags f ON f.flagable_id = pw.product_id + JOIN fifo_stock_v2_flag_members fm ON fm.flag_name = f.name AND fm.is_active = TRUE + WHERE pw.id = ? + AND f.flagable_type = ? + AND fm.flag_group_code = rr.flag_group_code + ) + `, productWarehouseID, entity.FlagableTypeProduct). + Order("rr.id ASC"). + Limit(1). + Take(&selected).Error + if err != nil { + return "", err + } + + return strings.TrimSpace(selected.FlagGroupCode), nil +} + +func summarizeConsolidate(rows []consolidateRow) consolidateSummary { + summary := consolidateSummary{ + PlanRows: len(rows), + KandangWarehouses: len(uniqueKandangWarehouseIDs(rows)), + SurvivorProductWarehouses: len(uniqueSurvivorIDsConsolidate(rows)), + AbsorbedProductWarehouses: len(uniqueAbsorbedIDsConsolidate(rows)), + NeedsReflowProductWarehouses: len(uniqueDuplicateSurvivorIDsConsolidate(rows)), + } + return summary +} + +func renderConsolidate(mode string, rows []consolidateRow, summary consolidateSummary) { + if mode == outputJSON { + payload := map[string]any{ + "rows": rows, + "summary": summary, + } + enc := json.NewEncoder(os.Stdout) + enc.SetIndent("", " ") + _ = enc.Encode(payload) + return + } + + w := tabwriter.NewWriter(os.Stdout, 2, 8, 2, ' ', 0) + fmt.Fprintln(w, "AREA\tLOKASI\tKANDANG\tKANDANG_WAREHOUSE\tFARM_WAREHOUSE\tPRODUCT\tPFK_ID\tSURVIVOR_PW\tSURVIVOR_QTY\tABSORBED_PW\tABSORBED_QTY\tNEEDS_REFLOW") + for _, row := range rows { + fmt.Fprintf( + w, + "%s\t%s\t%s\t%s\t%s\t%s\t%s\t%d\t%.3f\t%s\t%s\t%t\n", + row.AreaName, + row.KandangLocationName, + row.KandangName, + row.KandangWarehouseName, + row.FarmWarehouseName, + row.ProductName, + displayOptionalUint(row.ProjectFlockKandangID), + row.SurvivorPWID, + row.SurvivorCurrentQty, + displayOptionalUint(row.AbsorbedPWID), + displayOptionalFloat(row.AbsorbedCurrentQty), + row.AbsorbedPWID != nil, + ) + } + _ = w.Flush() + + fmt.Printf( + "\nSummary: plan_rows=%d kandang_whs=%d survivor_pws=%d absorbed_pws=%d needs_reflow_pws=%d deleted_product_warehouses=%d soft_deleted_warehouses=%d\n", + summary.PlanRows, + summary.KandangWarehouses, + summary.SurvivorProductWarehouses, + summary.AbsorbedProductWarehouses, + summary.NeedsReflowProductWarehouses, + summary.DeletedProductWarehouses, + summary.SoftDeletedWarehouses, + ) + + if len(summary.UpdatedPWRefs) > 0 { + fmt.Println("Updated product_warehouse refs:") + printSortedCounts(summary.UpdatedPWRefs) + } + if len(summary.UpdatedWarehouseRefs) > 0 { + fmt.Println("Updated warehouse refs:") + printSortedCounts(summary.UpdatedWarehouseRefs) + } +} + +func printSortedCounts(values map[string]int64) { + keys := make([]string, 0, len(values)) + for key := range values { + keys = append(keys, key) + } + sort.Strings(keys) + for _, key := range keys { + fmt.Printf(" %s=%d\n", key, values[key]) + } +} + +func (r reference) key() string { + return r.Table + "." + r.Column +} + +func referenceSet(values []reference) map[string]struct{} { + out := make(map[string]struct{}, len(values)) + for _, value := range values { + out[value.key()] = struct{}{} + } + return out +} + +func mergeReferences(groups ...[]reference) []reference { + seen := make(map[string]struct{}) + out := make([]reference, 0) + for _, group := range groups { + for _, ref := range group { + if !isSafeIdentifier(ref.Table) || !isSafeIdentifier(ref.Column) { + continue + } + if _, ok := seen[ref.key()]; ok { + continue + } + seen[ref.key()] = struct{}{} + out = append(out, ref) + } + } + sort.Slice(out, func(i, j int) bool { + if out[i].Table == out[j].Table { + return out[i].Column < out[j].Column + } + return out[i].Table < out[j].Table + }) + return out +} + +func mergeUintSlices(groups ...[]uint) []uint { + seen := make(map[uint]struct{}) + out := make([]uint, 0) + for _, group := range groups { + for _, value := range group { + if value == 0 { + continue + } + if _, ok := seen[value]; ok { + continue + } + seen[value] = struct{}{} + out = append(out, value) + } + } + sort.Slice(out, func(i, j int) bool { return out[i] < out[j] }) + return out +} + +func isSafeIdentifier(value string) bool { + return sqlIdentifierPattern.MatchString(strings.TrimSpace(value)) +} + +func quotedIdentifier(value string) string { + if !isSafeIdentifier(value) { + panic(fmt.Sprintf("unsafe SQL identifier: %s", value)) + } + return `"` + value + `"` +} + +func uniquePWMovesConsolidate(rows []consolidateRow) []pwMove { + seen := make(map[string]struct{}) + out := make([]pwMove, 0) + for _, row := range rows { + if row.AbsorbedPWID == nil || *row.AbsorbedPWID == 0 || *row.AbsorbedPWID == row.SurvivorPWID { + continue + } + key := fmt.Sprintf("%d:%d", *row.AbsorbedPWID, row.SurvivorPWID) + if _, ok := seen[key]; ok { + continue + } + seen[key] = struct{}{} + out = append(out, pwMove{FromID: *row.AbsorbedPWID, ToID: row.SurvivorPWID}) + } + return out +} + +func uniqueWarehouseMovesConsolidate(rows []consolidateRow) []warehouseMove { + seen := make(map[uint]uint) + out := make([]warehouseMove, 0) + for _, row := range rows { + if _, ok := seen[row.KandangWarehouseID]; ok { + continue + } + seen[row.KandangWarehouseID] = row.FarmWarehouseID + out = append(out, warehouseMove{ + KandangWarehouseID: row.KandangWarehouseID, + FarmWarehouseID: row.FarmWarehouseID, + }) + } + return out +} + +func buildSurvivorWarehouseMapConsolidate(rows []consolidateRow) map[uint]uint { + out := make(map[uint]uint) + for _, row := range rows { + out[row.SurvivorPWID] = row.FarmWarehouseID + } + return out +} + +func uniqueKandangWarehouseIDs(rows []consolidateRow) []uint { + seen := make(map[uint]struct{}) + out := make([]uint, 0) + for _, row := range rows { + if _, ok := seen[row.KandangWarehouseID]; ok { + continue + } + seen[row.KandangWarehouseID] = struct{}{} + out = append(out, row.KandangWarehouseID) + } + sort.Slice(out, func(i, j int) bool { return out[i] < out[j] }) + return out +} + +func uniqueSurvivorIDsConsolidate(rows []consolidateRow) []uint { + seen := make(map[uint]struct{}) + out := make([]uint, 0) + for _, row := range rows { + if _, ok := seen[row.SurvivorPWID]; ok { + continue + } + seen[row.SurvivorPWID] = struct{}{} + out = append(out, row.SurvivorPWID) + } + sort.Slice(out, func(i, j int) bool { return out[i] < out[j] }) + return out +} + +func uniqueAbsorbedIDsConsolidate(rows []consolidateRow) []uint { + seen := make(map[uint]struct{}) + out := make([]uint, 0) + for _, row := range rows { + if row.AbsorbedPWID == nil || *row.AbsorbedPWID == 0 { + continue + } + if _, ok := seen[*row.AbsorbedPWID]; ok { + continue + } + seen[*row.AbsorbedPWID] = struct{}{} + out = append(out, *row.AbsorbedPWID) + } + sort.Slice(out, func(i, j int) bool { return out[i] < out[j] }) + return out +} + +func uniqueDuplicateSurvivorIDsConsolidate(rows []consolidateRow) []uint { + seen := make(map[uint]struct{}) + out := make([]uint, 0) + for _, row := range rows { + if row.AbsorbedPWID == nil || *row.AbsorbedPWID == 0 { + continue + } + if _, ok := seen[row.SurvivorPWID]; ok { + continue + } + seen[row.SurvivorPWID] = struct{}{} + out = append(out, row.SurvivorPWID) + } + sort.Slice(out, func(i, j int) bool { return out[i] < out[j] }) + return out +} + +func andClause(filters []string) string { + if len(filters) == 0 { + return "" + } + return " AND " + strings.Join(filters, " AND ") +} + +func displayOptionalUint(value *uint) string { + if value == nil || *value == 0 { + return "-" + } + return fmt.Sprintf("%d", *value) +} + +func displayOptionalFloat(value *float64) string { + if value == nil { + return "-" + } + return fmt.Sprintf("%.3f", *value) +} diff --git a/cmd/find-wrong-warehouse-records/main.go b/cmd/find-wrong-warehouse-records/main.go new file mode 100644 index 00000000..311e8778 --- /dev/null +++ b/cmd/find-wrong-warehouse-records/main.go @@ -0,0 +1,466 @@ +package main + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "log" + "os" + "strings" + "text/tabwriter" + + "gitlab.com/mbugroup/lti-api.git/internal/config" + "gitlab.com/mbugroup/lti-api.git/internal/database" + "gorm.io/gorm" +) + +const ( + outputModeTable = "table" + outputModeJSON = "json" + + reportUsage = "usage" + reportWarehouses = "warehouses" +) + +type options struct { + Output string + Report string + AreaName string + KandangLocationName string + WrongWarehouseName string + CorrectWarehouseName string + UsableType string + DBSSLMode string +} + +type usageRow struct { + UsableType string `gorm:"column:usable_type" json:"usable_type"` + UsableID uint `gorm:"column:usable_id" json:"usable_id"` + AreaName string `gorm:"column:area_name" json:"area_name"` + LokasiName string `gorm:"column:lokasi_name" json:"lokasi_name"` + KandangName string `gorm:"column:kandang_name" json:"kandang_name"` + WrongWarehouseID uint `gorm:"column:wrong_warehouse_id" json:"wrong_warehouse_id"` + WrongWarehouseName string `gorm:"column:wrong_warehouse_name" json:"wrong_warehouse_name"` + CorrectWarehouseID uint `gorm:"column:correct_warehouse_id" json:"correct_warehouse_id"` + CorrectWarehouseName string `gorm:"column:correct_warehouse_name" json:"correct_warehouse_name"` + ProductNames string `gorm:"column:product_names" json:"product_names"` + SourcePurchaseNumbers string `gorm:"column:source_purchase_numbers" json:"source_purchase_numbers"` + SourcePurchaseItemIDs string `gorm:"column:source_purchase_item_ids" json:"source_purchase_item_ids"` + QtyFromWrongStock float64 `gorm:"column:qty_from_wrong_stock" json:"qty_from_wrong_stock"` + RecordingID *uint `gorm:"column:recording_id" json:"recording_id,omitempty"` + RecordingDate *string `gorm:"column:recording_date" json:"recording_date,omitempty"` + SoNumber *string `gorm:"column:so_number" json:"so_number,omitempty"` + SoDate *string `gorm:"column:so_date" json:"so_date,omitempty"` +} + +type warehouseMismatchRow struct { + AreaName string `gorm:"column:area_name" json:"area_name"` + WrongLocationName string `gorm:"column:wrong_location_name" json:"wrong_location_name"` + KandangLocationName string `gorm:"column:kandang_location_name" json:"kandang_location_name"` + KandangID uint `gorm:"column:kandang_id" json:"kandang_id"` + KandangName string `gorm:"column:kandang_name" json:"kandang_name"` + WrongWarehouseID uint `gorm:"column:wrong_warehouse_id" json:"wrong_warehouse_id"` + WrongWarehouseName string `gorm:"column:wrong_warehouse_name" json:"wrong_warehouse_name"` + WrongWarehouseType string `gorm:"column:wrong_warehouse_type" json:"wrong_warehouse_type"` + CorrectWarehouseID uint `gorm:"column:correct_warehouse_id" json:"correct_warehouse_id"` + CorrectWarehouseName string `gorm:"column:correct_warehouse_name" json:"correct_warehouse_name"` +} + +type summary struct { + Rows int `json:"rows"` + TotalQty float64 `json:"total_qty,omitempty"` +} + +func main() { + opts, err := parseFlags() + if err != nil { + log.Fatalf("invalid flags: %v", err) + } + + ctx := context.Background() + if opts.DBSSLMode != "" { + config.DBSSLMode = opts.DBSSLMode + } + db := database.Connect(config.DBHost, config.DBName) + + switch opts.Report { + case reportUsage: + rows, err := loadUsageRows(ctx, db, opts) + if err != nil { + log.Fatalf("failed loading usage rows: %v", err) + } + renderUsageReport(opts.Output, rows) + case reportWarehouses: + rows, err := loadWarehouseMismatchRows(ctx, db, opts) + if err != nil { + log.Fatalf("failed loading warehouse mismatch rows: %v", err) + } + renderWarehouseReport(opts.Output, rows) + default: + log.Fatalf("unsupported --report=%s", opts.Report) + } +} + +func parseFlags() (*options, error) { + var opts options + flag.StringVar(&opts.Output, "output", outputModeTable, "Output format: table or json") + flag.StringVar(&opts.Report, "report", reportUsage, "Report type: usage or warehouses") + flag.StringVar(&opts.AreaName, "area-name", "", "Optional exact area name filter") + flag.StringVar(&opts.KandangLocationName, "kandang-location-name", "", "Optional exact canonical kandang location filter") + flag.StringVar(&opts.WrongWarehouseName, "wrong-warehouse-name", "", "Optional exact wrong warehouse name filter") + flag.StringVar(&opts.CorrectWarehouseName, "correct-warehouse-name", "", "Optional exact correct warehouse name filter") + flag.StringVar(&opts.UsableType, "usable-type", "", "Optional usage type filter: RECORDING_STOCK or MARKETING_DELIVERY") + flag.StringVar(&opts.DBSSLMode, "db-sslmode", "", "Optional database sslmode override, for example: require") + flag.Parse() + + opts.Output = strings.ToLower(strings.TrimSpace(opts.Output)) + opts.Report = strings.ToLower(strings.TrimSpace(opts.Report)) + opts.AreaName = strings.TrimSpace(opts.AreaName) + opts.KandangLocationName = strings.TrimSpace(opts.KandangLocationName) + opts.WrongWarehouseName = strings.TrimSpace(opts.WrongWarehouseName) + opts.CorrectWarehouseName = strings.TrimSpace(opts.CorrectWarehouseName) + opts.UsableType = strings.ToUpper(strings.TrimSpace(opts.UsableType)) + opts.DBSSLMode = strings.TrimSpace(opts.DBSSLMode) + + if opts.Output == "" { + opts.Output = outputModeTable + } + if opts.Output != outputModeTable && opts.Output != outputModeJSON { + return nil, fmt.Errorf("unsupported --output=%s", opts.Output) + } + if opts.Report == "" { + opts.Report = reportUsage + } + if opts.Report != reportUsage && opts.Report != reportWarehouses { + return nil, fmt.Errorf("unsupported --report=%s", opts.Report) + } + if opts.UsableType != "" && opts.UsableType != "RECORDING_STOCK" && opts.UsableType != "MARKETING_DELIVERY" { + return nil, fmt.Errorf("unsupported --usable-type=%s", opts.UsableType) + } + + return &opts, nil +} + +func loadUsageRows(ctx context.Context, db *gorm.DB, opts *options) ([]usageRow, error) { + warehouseFilters, warehouseArgs := buildWarehouseFilters(opts) + + usageFilters := make([]string, 0, 1) + usageArgs := make([]any, 0, 1) + if opts.UsableType != "" { + usageFilters = append(usageFilters, "sa.usable_type = ?") + usageArgs = append(usageArgs, opts.UsableType) + } + + args := append([]any{}, warehouseArgs...) + args = append(args, usageArgs...) + + query := fmt.Sprintf(` +WITH wrong_warehouses AS ( + SELECT + w.id AS wrong_warehouse_id, + w.name AS wrong_warehouse_name, + k.id AS kandang_id, + k.name AS kandang_name, + a.name AS area_name, + kl.name AS kandang_location_name, + correct_w.id AS correct_warehouse_id, + correct_w.name AS correct_warehouse_name + FROM warehouses w + JOIN kandangs k + ON k.id = w.kandang_id + AND k.deleted_at IS NULL + JOIN locations kl + ON kl.id = k.location_id + JOIN areas a + ON a.id = kl.area_id + JOIN LATERAL ( + SELECT w2.id, w2.name + FROM warehouses w2 + WHERE w2.kandang_id = w.kandang_id + AND w2.location_id = k.location_id + AND w2.deleted_at IS NULL + AND w2.id <> w.id + ORDER BY w2.id ASC + LIMIT 1 + ) AS correct_w ON TRUE + WHERE w.deleted_at IS NULL + AND w.kandang_id IS NOT NULL + AND w.location_id IS DISTINCT FROM k.location_id + %s +), +wrong_allocs AS ( + SELECT + sa.usable_type, + sa.usable_id, + sa.qty, + pi.id AS purchase_item_id, + COALESCE(p.po_number, p.pr_number) AS purchase_number, + pr.name AS product_name, + ww.area_name, + ww.kandang_location_name, + ww.kandang_name, + ww.wrong_warehouse_id, + ww.wrong_warehouse_name, + ww.correct_warehouse_id, + ww.correct_warehouse_name + FROM stock_allocations sa + JOIN purchase_items pi + ON pi.id = sa.stockable_id + JOIN purchases p + ON p.id = pi.purchase_id + AND p.deleted_at IS NULL + JOIN products pr + ON pr.id = pi.product_id + JOIN wrong_warehouses ww + ON ww.wrong_warehouse_id = pi.warehouse_id + WHERE sa.stockable_type = 'PURCHASE_ITEMS' + AND sa.status = 'ACTIVE' + AND sa.allocation_purpose = 'CONSUME' + AND sa.deleted_at IS NULL + %s +) +SELECT + wa.usable_type, + wa.usable_id, + wa.area_name, + wa.kandang_location_name AS lokasi_name, + wa.kandang_name, + wa.wrong_warehouse_id, + wa.wrong_warehouse_name, + wa.correct_warehouse_id, + wa.correct_warehouse_name, + STRING_AGG(DISTINCT wa.product_name, ' | ') AS product_names, + STRING_AGG(DISTINCT wa.purchase_number, ', ') AS source_purchase_numbers, + STRING_AGG(DISTINCT wa.purchase_item_id::text, ', ') AS source_purchase_item_ids, + SUM(wa.qty) AS qty_from_wrong_stock, + rs.recording_id, + TO_CHAR(r.record_datetime::date, 'YYYY-MM-DD') AS recording_date, + m.so_number, + TO_CHAR(m.so_date::date, 'YYYY-MM-DD') AS so_date +FROM wrong_allocs wa +LEFT JOIN recording_stocks rs + ON wa.usable_type = 'RECORDING_STOCK' + AND rs.id = wa.usable_id +LEFT JOIN recordings r + ON r.id = rs.recording_id +LEFT JOIN marketing_delivery_products mdp + ON wa.usable_type = 'MARKETING_DELIVERY' + AND mdp.id = wa.usable_id +LEFT JOIN marketing_products mp + ON mp.id = mdp.marketing_product_id +LEFT JOIN marketings m + ON m.id = mp.marketing_id +GROUP BY + wa.usable_type, + wa.usable_id, + wa.area_name, + wa.kandang_location_name, + wa.kandang_name, + wa.wrong_warehouse_id, + wa.wrong_warehouse_name, + wa.correct_warehouse_id, + wa.correct_warehouse_name, + rs.recording_id, + r.record_datetime, + m.so_number, + m.so_date +ORDER BY + wa.area_name ASC, + wa.kandang_location_name ASC, + wa.wrong_warehouse_name ASC, + wa.usable_type ASC, + wa.usable_id ASC +`, andClause(warehouseFilters), andClause(usageFilters)) + + rows := make([]usageRow, 0) + if err := db.WithContext(ctx).Raw(query, args...).Scan(&rows).Error; err != nil { + return nil, err + } + + return rows, nil +} + +func loadWarehouseMismatchRows(ctx context.Context, db *gorm.DB, opts *options) ([]warehouseMismatchRow, error) { + warehouseFilters, args := buildWarehouseFilters(opts) + + query := fmt.Sprintf(` +SELECT + a.name AS area_name, + wl.name AS wrong_location_name, + kl.name AS kandang_location_name, + k.id AS kandang_id, + k.name AS kandang_name, + w.id AS wrong_warehouse_id, + w.name AS wrong_warehouse_name, + w.type AS wrong_warehouse_type, + correct_w.id AS correct_warehouse_id, + correct_w.name AS correct_warehouse_name +FROM warehouses w +JOIN kandangs k + ON k.id = w.kandang_id + AND k.deleted_at IS NULL +JOIN locations kl + ON kl.id = k.location_id +JOIN areas a + ON a.id = kl.area_id +LEFT JOIN locations wl + ON wl.id = w.location_id +JOIN LATERAL ( + SELECT w2.id, w2.name + FROM warehouses w2 + WHERE w2.kandang_id = w.kandang_id + AND w2.location_id = k.location_id + AND w2.deleted_at IS NULL + AND w2.id <> w.id + ORDER BY w2.id ASC + LIMIT 1 +) AS correct_w ON TRUE +WHERE w.deleted_at IS NULL + AND w.kandang_id IS NOT NULL + AND w.location_id IS DISTINCT FROM k.location_id + %s +ORDER BY a.name ASC, kl.name ASC, k.name ASC, w.id ASC +`, andClause(warehouseFilters)) + + rows := make([]warehouseMismatchRow, 0) + if err := db.WithContext(ctx).Raw(query, args...).Scan(&rows).Error; err != nil { + return nil, err + } + + return rows, nil +} + +func buildWarehouseFilters(opts *options) ([]string, []any) { + filters := make([]string, 0, 4) + args := make([]any, 0, 4) + + if opts == nil { + return filters, args + } + if opts.AreaName != "" { + filters = append(filters, "a.name = ?") + args = append(args, opts.AreaName) + } + if opts.KandangLocationName != "" { + filters = append(filters, "kl.name = ?") + args = append(args, opts.KandangLocationName) + } + if opts.WrongWarehouseName != "" { + filters = append(filters, "w.name = ?") + args = append(args, opts.WrongWarehouseName) + } + if opts.CorrectWarehouseName != "" { + filters = append(filters, "correct_w.name = ?") + args = append(args, opts.CorrectWarehouseName) + } + + return filters, args +} + +func andClause(filters []string) string { + if len(filters) == 0 { + return "" + } + return " AND " + strings.Join(filters, " AND ") +} + +func renderUsageReport(mode string, rows []usageRow) { + if mode == outputModeJSON { + payload := map[string]any{ + "report": reportUsage, + "rows": rows, + "summary": summarizeUsage(rows), + } + enc := json.NewEncoder(os.Stdout) + enc.SetIndent("", " ") + _ = enc.Encode(payload) + return + } + + w := tabwriter.NewWriter(os.Stdout, 2, 8, 2, ' ', 0) + fmt.Fprintln(w, "USABLE_TYPE\tUSABLE_ID\tAREA\tLOKASI\tKANDANG\tWRONG_WAREHOUSE\tCORRECT_WAREHOUSE\tPRODUCTS\tQTY_FROM_WRONG_STOCK\tRECORDING_ID\tRECORDING_DATE\tSO_NUMBER\tSO_DATE\tSOURCE_PURCHASES\tSOURCE_PURCHASE_ITEM_IDS") + for _, row := range rows { + fmt.Fprintf( + w, + "%s\t%d\t%s\t%s\t%s\t%s\t%s\t%s\t%.3f\t%s\t%s\t%s\t%s\t%s\t%s\n", + row.UsableType, + row.UsableID, + row.AreaName, + row.LokasiName, + row.KandangName, + row.WrongWarehouseName, + row.CorrectWarehouseName, + row.ProductNames, + row.QtyFromWrongStock, + displayOptionalUint(row.RecordingID), + displayOptionalString(row.RecordingDate), + displayOptionalString(row.SoNumber), + displayOptionalString(row.SoDate), + row.SourcePurchaseNumbers, + row.SourcePurchaseItemIDs, + ) + } + _ = w.Flush() + + s := summarizeUsage(rows) + fmt.Printf("\nSummary: rows=%d total_qty=%.3f\n", s.Rows, s.TotalQty) +} + +func renderWarehouseReport(mode string, rows []warehouseMismatchRow) { + if mode == outputModeJSON { + payload := map[string]any{ + "report": reportWarehouses, + "rows": rows, + "summary": summary{Rows: len(rows)}, + } + enc := json.NewEncoder(os.Stdout) + enc.SetIndent("", " ") + _ = enc.Encode(payload) + return + } + + w := tabwriter.NewWriter(os.Stdout, 2, 8, 2, ' ', 0) + fmt.Fprintln(w, "AREA\tKANDANG_LOCATION\tKANDANG_ID\tKANDANG\tWRONG_LOCATION\tWRONG_WAREHOUSE_ID\tWRONG_WAREHOUSE\tWRONG_WAREHOUSE_TYPE\tCORRECT_WAREHOUSE_ID\tCORRECT_WAREHOUSE") + for _, row := range rows { + fmt.Fprintf( + w, + "%s\t%s\t%d\t%s\t%s\t%d\t%s\t%s\t%d\t%s\n", + row.AreaName, + row.KandangLocationName, + row.KandangID, + row.KandangName, + row.WrongLocationName, + row.WrongWarehouseID, + row.WrongWarehouseName, + row.WrongWarehouseType, + row.CorrectWarehouseID, + row.CorrectWarehouseName, + ) + } + _ = w.Flush() + + fmt.Printf("\nSummary: rows=%d\n", len(rows)) +} + +func summarizeUsage(rows []usageRow) summary { + out := summary{Rows: len(rows)} + for _, row := range rows { + out.TotalQty += row.QtyFromWrongStock + } + return out +} + +func displayOptionalUint(value *uint) string { + if value == nil || *value == 0 { + return "-" + } + return fmt.Sprintf("%d", *value) +} + +func displayOptionalString(value *string) string { + if value == nil || strings.TrimSpace(*value) == "" { + return "-" + } + return *value +} diff --git a/cmd/repoint-wrong-warehouse-relations/main.go b/cmd/repoint-wrong-warehouse-relations/main.go new file mode 100644 index 00000000..92bca49b --- /dev/null +++ b/cmd/repoint-wrong-warehouse-relations/main.go @@ -0,0 +1,1040 @@ +package main + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "log" + "os" + "regexp" + "sort" + "strings" + "text/tabwriter" + "time" + + commonSvc "gitlab.com/mbugroup/lti-api.git/internal/common/service" + "gitlab.com/mbugroup/lti-api.git/internal/config" + "gitlab.com/mbugroup/lti-api.git/internal/database" + entity "gitlab.com/mbugroup/lti-api.git/internal/entities" + "gorm.io/gorm" +) + +const ( + repointOutputTable = "table" + repointOutputJSON = "json" +) + +type options struct { + Apply bool + Output string + AreaName string + KandangLocationName string + DBSSLMode string + DeleteWrongWarehouses bool +} + +type planRow struct { + AreaName string `gorm:"column:area_name" json:"area_name"` + KandangLocationName string `gorm:"column:kandang_location_name" json:"kandang_location_name"` + KandangID uint `gorm:"column:kandang_id" json:"kandang_id"` + KandangName string `gorm:"column:kandang_name" json:"kandang_name"` + WrongWarehouseID uint `gorm:"column:wrong_warehouse_id" json:"wrong_warehouse_id"` + WrongWarehouseName string `gorm:"column:wrong_warehouse_name" json:"wrong_warehouse_name"` + CorrectWarehouseID uint `gorm:"column:correct_warehouse_id" json:"correct_warehouse_id"` + CorrectWarehouseName string `gorm:"column:correct_warehouse_name" json:"correct_warehouse_name"` + ProductID uint `gorm:"column:product_id" json:"product_id"` + ProductName string `gorm:"column:product_name" json:"product_name"` + ProjectFlockKandangID *uint `gorm:"column:project_flock_kandang_id" json:"project_flock_kandang_id,omitempty"` + SurvivorPWID uint `gorm:"column:survivor_pw_id" json:"survivor_pw_id"` + SurvivorCurrentQty float64 `gorm:"column:survivor_current_qty" json:"survivor_current_qty"` + AbsorbedPWID *uint `gorm:"column:absorbed_pw_id" json:"absorbed_pw_id,omitempty"` + AbsorbedCurrentQty *float64 `gorm:"column:absorbed_current_qty" json:"absorbed_current_qty,omitempty"` +} + +type pwMove struct { + FromID uint + ToID uint +} + +type warehouseMove struct { + WrongWarehouseID uint + CorrectWarehouseID uint +} + +type applySummary struct { + TargetScope string `json:"target_scope,omitempty"` + PlanRows int `json:"plan_rows"` + WrongWarehouses int `json:"wrong_warehouses"` + SurvivorProductWarehouses int `json:"survivor_product_warehouses"` + AbsorbedProductWarehouses int `json:"absorbed_product_warehouses"` + NeedsReflowProductWarehouses int `json:"needs_reflow_product_warehouses"` + UpdatedPWRefs map[string]int64 `json:"updated_pw_refs,omitempty"` + UpdatedWarehouseRefs map[string]int64 `json:"updated_warehouse_refs,omitempty"` + DeletedProductWarehouses int64 `json:"deleted_product_warehouses,omitempty"` + SoftDeletedWarehouses int64 `json:"soft_deleted_warehouses,omitempty"` +} + +type reference struct { + Table string + Column string +} + +type referencePlan struct { + ProductWarehouseRefs []reference + WarehouseRefs []reference + BlockedWarehouseRefs []reference +} + +var sqlIdentifierPattern = regexp.MustCompile(`^[a-z_][a-z0-9_]*$`) + +var extraProductWarehouseRefs = []reference{ + {Table: "fifo_stock_v2_operation_log", Column: "product_warehouse_id"}, + {Table: "fifo_stock_v2_reflow_checkpoints", Column: "product_warehouse_id"}, + {Table: "fifo_stock_v2_shadow_allocations", Column: "product_warehouse_id"}, + {Table: "recording_depletions", Column: "source_product_warehouse_id"}, +} + +var allowedWarehouseRefs = []reference{ + {Table: "product_warehouses", Column: "warehouse_id"}, + {Table: "purchase_items", Column: "warehouse_id"}, +} + +var blockedWarehouseRefs = []reference{ + {Table: "stock_transfers", Column: "from_warehouse_id"}, + {Table: "stock_transfers", Column: "to_warehouse_id"}, +} + +func main() { + opts, err := parseFlags() + if err != nil { + log.Fatalf("invalid flags: %v", err) + } + + if opts.DBSSLMode != "" { + config.DBSSLMode = opts.DBSSLMode + } + + ctx := context.Background() + db := database.Connect(config.DBHost, config.DBName) + + rows, err := loadPlanRows(ctx, db, opts) + if err != nil { + log.Fatalf("failed to load plan rows: %v", err) + } + + if len(rows) == 0 { + fmt.Println("No misplaced PAKAN/OVK stocks found in wrong-location warehouses") + return + } + + refs, err := buildReferencePlan(ctx, db) + if err != nil { + log.Fatalf("failed to inspect warehouse references: %v", err) + } + + if err := runPrechecks(ctx, db, rows, refs, opts); err != nil { + log.Fatalf("precheck failed: %v", err) + } + + summary := summarizePlan(rows) + if !opts.Apply { + renderPlan(opts.Output, rows, summary) + return + } + + applied, err := applyPlan(ctx, db, rows, opts, refs) + if err != nil { + log.Fatalf("apply failed: %v", err) + } + renderPlan(opts.Output, rows, applied) +} + +func parseFlags() (*options, error) { + var opts options + flag.BoolVar(&opts.Apply, "apply", false, "Apply the migration. If false, run as dry-run") + flag.StringVar(&opts.Output, "output", repointOutputTable, "Output format: table or json") + flag.StringVar(&opts.AreaName, "area-name", "", "Optional exact area name filter") + flag.StringVar(&opts.KandangLocationName, "kandang-location-name", "", "Optional exact canonical kandang location filter") + flag.StringVar(&opts.DBSSLMode, "db-sslmode", "", "Optional database sslmode override, for example: require") + flag.BoolVar(&opts.DeleteWrongWarehouses, "delete-wrong-warehouses", true, "Soft delete wrong warehouse rows after all references have been moved") + flag.Parse() + + opts.Output = strings.ToLower(strings.TrimSpace(opts.Output)) + opts.AreaName = strings.TrimSpace(opts.AreaName) + opts.KandangLocationName = strings.TrimSpace(opts.KandangLocationName) + opts.DBSSLMode = strings.TrimSpace(opts.DBSSLMode) + + if opts.Output == "" { + opts.Output = repointOutputTable + } + if opts.Output != repointOutputTable && opts.Output != repointOutputJSON { + return nil, fmt.Errorf("unsupported --output=%s", opts.Output) + } + + return &opts, nil +} + +func loadPlanRows(ctx context.Context, db *gorm.DB, opts *options) ([]planRow, error) { + filters := make([]string, 0, 2) + args := make([]any, 0, 2) + if opts.AreaName != "" { + filters = append(filters, "a.name = ?") + args = append(args, opts.AreaName) + } + if opts.KandangLocationName != "" { + filters = append(filters, "kl.name = ?") + args = append(args, opts.KandangLocationName) + } + + query := fmt.Sprintf(` +SELECT + a.name AS area_name, + kl.name AS kandang_location_name, + k.id AS kandang_id, + k.name AS kandang_name, + w.id AS wrong_warehouse_id, + w.name AS wrong_warehouse_name, + correct_w.id AS correct_warehouse_id, + correct_w.name AS correct_warehouse_name, + p.id AS product_id, + p.name AS product_name, + wp.project_flock_kandang_id, + wp.id AS survivor_pw_id, + COALESCE(wp.qty, 0) AS survivor_current_qty, + cpw.id AS absorbed_pw_id, + cpw.qty AS absorbed_current_qty +FROM warehouses w +JOIN kandangs k + ON k.id = w.kandang_id + AND k.deleted_at IS NULL +JOIN locations kl + ON kl.id = k.location_id +JOIN areas a + ON a.id = kl.area_id +JOIN LATERAL ( + SELECT w2.id, w2.name + FROM warehouses w2 + WHERE w2.location_id = k.location_id + AND UPPER(COALESCE(w2.type, '')) = 'LOKASI' + AND w2.deleted_at IS NULL + ORDER BY w2.id ASC + LIMIT 1 +) AS correct_w ON TRUE +JOIN product_warehouses wp + ON wp.warehouse_id = w.id +JOIN products p + ON p.id = wp.product_id + AND UPPER(COALESCE(p.type, '')) IN ('PAKAN', 'OVK') +LEFT JOIN product_warehouses cpw + ON cpw.product_id = wp.product_id + AND cpw.warehouse_id = correct_w.id + AND cpw.project_flock_kandang_id IS NOT DISTINCT FROM wp.project_flock_kandang_id +WHERE w.deleted_at IS NULL + AND w.kandang_id IS NOT NULL + AND w.location_id IS DISTINCT FROM k.location_id + AND NOT EXISTS ( + SELECT 1 + FROM stock_allocations sa + WHERE sa.stockable_type = 'PURCHASE_ITEMS' + AND sa.stockable_id IN ( + SELECT pi.id + FROM purchase_items pi + WHERE pi.warehouse_id = w.id + AND pi.product_id = p.id + ) + AND sa.status = 'ACTIVE' + AND sa.allocation_purpose = 'CONSUME' + AND sa.deleted_at IS NULL + ) + %s +ORDER BY a.name ASC, kl.name ASC, k.name ASC, wp.id ASC +`, andClause(filters)) + + rows := make([]planRow, 0) + if err := db.WithContext(ctx).Raw(query, args...).Scan(&rows).Error; err != nil { + return nil, err + } + return rows, nil +} + + +func buildReferencePlan(ctx context.Context, db *gorm.DB) (*referencePlan, error) { + productWarehouseRefs, err := discoverSingleColumnFKReferences(ctx, db, "product_warehouses") + if err != nil { + return nil, err + } + productWarehouseRefs = mergeReferences(productWarehouseRefs, extraProductWarehouseRefs) + if err := ensureHandledColumns(ctx, db, "%product_warehouse_id%", productWarehouseRefs, nil); err != nil { + return nil, err + } + + discoveredWarehouseRefs, err := discoverSingleColumnFKReferences(ctx, db, "warehouses") + if err != nil { + return nil, err + } + + allowedWarehouseSet := referenceSet(allowedWarehouseRefs) + blockedWarehouseSet := referenceSet(blockedWarehouseRefs) + + warehouseRefs := make([]reference, 0, len(discoveredWarehouseRefs)) + for _, ref := range discoveredWarehouseRefs { + key := ref.key() + if _, ok := allowedWarehouseSet[key]; ok { + warehouseRefs = append(warehouseRefs, ref) + continue + } + if _, ok := blockedWarehouseSet[key]; ok { + continue + } + return nil, fmt.Errorf("unsupported warehouse foreign-key reference discovered: %s", key) + } + if err := ensureHandledColumns(ctx, db, "%warehouse_id%", mergeReferences(warehouseRefs, blockedWarehouseRefs), []string{"%product_warehouse_id%"}); err != nil { + return nil, err + } + + return &referencePlan{ + ProductWarehouseRefs: productWarehouseRefs, + WarehouseRefs: warehouseRefs, + BlockedWarehouseRefs: append([]reference(nil), blockedWarehouseRefs...), + }, nil +} + +func runPrechecks(ctx context.Context, db *gorm.DB, rows []planRow, refs *referencePlan, opts *options) error { + if err := ensureNoBlockedWarehouseRefs(ctx, db, rows, refs.BlockedWarehouseRefs); err != nil { + return err + } + if err := ensureNoPurchaseItemWarehouseConflicts(ctx, db, rows); err != nil { + return err + } + if err := ensureNoInFlightFifoArtifacts(ctx, db, rows); err != nil { + return err + } + return nil +} + +func discoverSingleColumnFKReferences(ctx context.Context, db *gorm.DB, referencedTable string) ([]reference, error) { + type countRow struct { + Count int64 `gorm:"column:cnt"` + } + + var multiColumn countRow + if err := db.WithContext(ctx).Raw(` +SELECT COUNT(*) AS cnt +FROM pg_constraint c +JOIN pg_class ref ON ref.oid = c.confrelid +JOIN pg_namespace ref_ns ON ref_ns.oid = ref.relnamespace +WHERE c.contype = 'f' + AND ref_ns.nspname = 'public' + AND ref.relname = ? + AND COALESCE(array_length(c.conkey, 1), 0) <> 1 +`, referencedTable).Scan(&multiColumn).Error; err != nil { + return nil, err + } + if multiColumn.Count > 0 { + return nil, fmt.Errorf("table %s has %d multi-column foreign keys; command only supports single-column rewrites", referencedTable, multiColumn.Count) + } + + type row struct { + Table string `gorm:"column:table_name"` + Column string `gorm:"column:column_name"` + } + selected := make([]row, 0) + if err := db.WithContext(ctx).Raw(` +SELECT + src.relname AS table_name, + src_att.attname AS column_name +FROM pg_constraint c +JOIN pg_class src ON src.oid = c.conrelid +JOIN pg_namespace src_ns ON src_ns.oid = src.relnamespace +JOIN pg_class ref ON ref.oid = c.confrelid +JOIN pg_namespace ref_ns ON ref_ns.oid = ref.relnamespace +JOIN pg_attribute src_att ON src_att.attrelid = src.oid AND src_att.attnum = c.conkey[1] +WHERE c.contype = 'f' + AND src_ns.nspname = 'public' + AND ref_ns.nspname = 'public' + AND ref.relname = ? + AND COALESCE(array_length(c.conkey, 1), 0) = 1 +ORDER BY src.relname ASC, src_att.attname ASC +`, referencedTable).Scan(&selected).Error; err != nil { + return nil, err + } + + refs := make([]reference, 0, len(selected)) + for _, item := range selected { + ref := reference{ + Table: strings.TrimSpace(item.Table), + Column: strings.TrimSpace(item.Column), + } + if !isSafeIdentifier(ref.Table) || !isSafeIdentifier(ref.Column) { + return nil, fmt.Errorf("unsafe identifier discovered while inspecting %s references: %s.%s", referencedTable, ref.Table, ref.Column) + } + refs = append(refs, ref) + } + return refs, nil +} + +func ensureHandledColumns( + ctx context.Context, + db *gorm.DB, + columnPattern string, + handled []reference, + excludePatterns []string, +) error { + type row struct { + Table string `gorm:"column:table_name"` + Column string `gorm:"column:column_name"` + } + + query := ` +SELECT table_name, column_name +FROM information_schema.columns +WHERE table_schema = 'public' + AND column_name LIKE ? +` + args := []any{columnPattern} + for _, pattern := range excludePatterns { + query += " AND column_name NOT LIKE ?\n" + args = append(args, pattern) + } + query += "ORDER BY table_name ASC, column_name ASC" + + rows := make([]row, 0) + if err := db.WithContext(ctx).Raw(query, args...).Scan(&rows).Error; err != nil { + return err + } + + handledSet := referenceSet(handled) + for _, item := range rows { + ref := reference{ + Table: strings.TrimSpace(item.Table), + Column: strings.TrimSpace(item.Column), + } + if _, ok := handledSet[ref.key()]; ok { + continue + } + return fmt.Errorf("unhandled warehouse-related column discovered: %s", ref.key()) + } + return nil +} + +func ensureNoBlockedWarehouseRefs(ctx context.Context, db *gorm.DB, rows []planRow, blocked []reference) error { + wrongWarehouseIDs := uniqueWrongWarehouseIDs(rows) + if len(wrongWarehouseIDs) == 0 { + return nil + } + + for _, ref := range blocked { + var count int64 + if err := db.WithContext(ctx). + Table(ref.Table). + Where(fmt.Sprintf("%s IN ?", quotedIdentifier(ref.Column)), wrongWarehouseIDs). + Count(&count).Error; err != nil { + return err + } + if count > 0 { + return fmt.Errorf("found %d rows in %s.%s referencing the wrong warehouse ids; aborting", count, ref.Table, ref.Column) + } + } + return nil +} + +func ensureNoPurchaseItemWarehouseConflicts(ctx context.Context, db *gorm.DB, rows []planRow) error { + wrongWarehouseIDs := uniqueWrongWarehouseIDs(rows) + if len(wrongWarehouseIDs) == 0 { + return nil + } + + type countRow struct { + Count int64 `gorm:"column:cnt"` + } + + var row countRow + if err := db.WithContext(ctx).Raw(` +WITH wrong_pairs AS ( + SELECT + pi.id AS wrong_purchase_item_id, + pi.purchase_id, + pi.product_id, + pi.warehouse_id AS wrong_warehouse_id, + w2.id AS correct_warehouse_id + FROM purchase_items pi + JOIN warehouses w + ON w.id = pi.warehouse_id + AND w.deleted_at IS NULL + JOIN kandangs k + ON k.id = w.kandang_id + AND k.deleted_at IS NULL + JOIN LATERAL ( + SELECT w2.id, w2.name + FROM warehouses w2 + WHERE w2.location_id = k.location_id + AND UPPER(COALESCE(w2.type, '')) = 'LOKASI' + AND w2.deleted_at IS NULL + ORDER BY w2.id ASC + LIMIT 1 + ) AS w2 ON TRUE + WHERE pi.warehouse_id IN ? + AND w.kandang_id IS NOT NULL + AND w.location_id IS DISTINCT FROM k.location_id +) +SELECT COUNT(*) AS cnt +FROM wrong_pairs wp +JOIN purchase_items target + ON target.purchase_id = wp.purchase_id + AND target.product_id = wp.product_id + AND target.warehouse_id = wp.correct_warehouse_id +`, wrongWarehouseIDs).Scan(&row).Error; err != nil { + return err + } + if row.Count > 0 { + return fmt.Errorf("found %d purchase_item uniqueness collisions on (purchase_id, product_id, warehouse_id); aborting", row.Count) + } + return nil +} + +func ensureNoInFlightFifoArtifacts(ctx context.Context, db *gorm.DB, rows []planRow) error { + affectedPWIDs := mergeUintSlices(uniqueSurvivorIDs(rows), uniqueAbsorbedIDs(rows)) + if len(affectedPWIDs) == 0 { + return nil + } + + for _, ref := range []reference{ + {Table: "fifo_stock_v2_reflow_checkpoints", Column: "product_warehouse_id"}, + {Table: "fifo_stock_v2_shadow_allocations", Column: "product_warehouse_id"}, + } { + var count int64 + if err := db.WithContext(ctx). + Table(ref.Table). + Where(fmt.Sprintf("%s IN ?", quotedIdentifier(ref.Column)), affectedPWIDs). + Count(&count).Error; err != nil { + return err + } + if count > 0 { + return fmt.Errorf("found %d in-flight FIFO rows in %s for affected product warehouses; aborting", count, ref.Table) + } + } + return nil +} + +func applyPlan( + ctx context.Context, + db *gorm.DB, + rows []planRow, + opts *options, + refs *referencePlan, +) (applySummary, error) { + summary := summarizePlan(rows) + summary.UpdatedPWRefs = make(map[string]int64) + summary.UpdatedWarehouseRefs = make(map[string]int64) + + pwMoves := uniquePWMoves(rows) + warehouseMoves := uniqueWarehouseMoves(rows) + survivorWarehouseMap := buildSurvivorWarehouseMap(rows) + duplicateSurvivors := uniqueDuplicateSurvivorIDs(rows) + + fifoSvc := commonSvc.NewFifoStockV2Service(db, nil) + now := time.Now().UTC() + + err := db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + for _, move := range pwMoves { + for _, ref := range refs.ProductWarehouseRefs { + res := tx.WithContext(ctx). + Table(ref.Table). + Where(fmt.Sprintf("%s = ?", quotedIdentifier(ref.Column)), move.FromID). + Update(ref.Column, move.ToID) + if res.Error != nil { + return fmt.Errorf("move %s from pw %d to %d: %w", ref.Table+"."+ref.Column, move.FromID, move.ToID, res.Error) + } + if res.RowsAffected > 0 { + summary.UpdatedPWRefs[ref.Table+"."+ref.Column] += res.RowsAffected + } + } + } + + if len(pwMoves) > 0 { + fromIDs := make([]uint, 0, len(pwMoves)) + for _, move := range pwMoves { + fromIDs = append(fromIDs, move.FromID) + } + res := tx.WithContext(ctx). + Where("id IN ?", fromIDs). + Delete(&entity.ProductWarehouse{}) + if res.Error != nil { + return fmt.Errorf("delete absorbed product warehouses: %w", res.Error) + } + summary.DeletedProductWarehouses = res.RowsAffected + } + + for survivorID, correctWarehouseID := range survivorWarehouseMap { + res := tx.WithContext(ctx). + Table("product_warehouses"). + Where("id = ?", survivorID). + Update("warehouse_id", correctWarehouseID) + if res.Error != nil { + return fmt.Errorf("update survivor product_warehouse %d -> warehouse %d: %w", survivorID, correctWarehouseID, res.Error) + } + if res.RowsAffected > 0 { + summary.UpdatedWarehouseRefs["product_warehouses.warehouse_id"] += res.RowsAffected + } + } + + for _, move := range warehouseMoves { + for _, ref := range refs.WarehouseRefs { + res := tx.WithContext(ctx). + Table(ref.Table). + Where(fmt.Sprintf("%s = ?", quotedIdentifier(ref.Column)), move.WrongWarehouseID). + Update(ref.Column, move.CorrectWarehouseID) + if res.Error != nil { + return fmt.Errorf("update %s from warehouse %d to %d: %w", ref.Table+"."+ref.Column, move.WrongWarehouseID, move.CorrectWarehouseID, res.Error) + } + if res.RowsAffected > 0 { + summary.UpdatedWarehouseRefs[ref.Table+"."+ref.Column] += res.RowsAffected + } + } + } + + if len(duplicateSurvivors) > 0 { + if err := recomputeStockLogs(ctx, tx, duplicateSurvivors); err != nil { + return err + } + } + + for _, survivorID := range duplicateSurvivors { + if err := reflowAndRecalculateProductWarehouse(ctx, fifoSvc, tx, survivorID); err != nil { + return err + } + } + + if opts.DeleteWrongWarehouses { + wrongWarehouseIDs := uniqueWrongWarehouseIDs(rows) + if len(wrongWarehouseIDs) > 0 { + res := tx.WithContext(ctx). + Table("warehouses"). + Where("id IN ? AND deleted_at IS NULL", wrongWarehouseIDs). + Updates(map[string]any{ + "deleted_at": now, + "updated_at": now, + }) + if res.Error != nil { + return fmt.Errorf("soft delete wrong warehouses: %w", res.Error) + } + summary.SoftDeletedWarehouses = res.RowsAffected + } + } + + if err := verifyNoWrongWarehouseRefsRemain(ctx, tx, rows, pwMoves, refs); err != nil { + return err + } + + return nil + }) + if err != nil { + return applySummary{}, err + } + + return summary, nil +} + +func recomputeStockLogs(ctx context.Context, tx *gorm.DB, productWarehouseIDs []uint) error { + if len(productWarehouseIDs) == 0 { + return nil + } + + query := ` +WITH recalculated AS ( + SELECT + id, + SUM(COALESCE(increase, 0) - COALESCE(decrease, 0)) + OVER (PARTITION BY product_warehouse_id ORDER BY created_at ASC, id ASC) AS running_stock + FROM stock_logs + WHERE product_warehouse_id IN ? +) +UPDATE stock_logs sl +SET stock = recalculated.running_stock +FROM recalculated +WHERE sl.id = recalculated.id +` + return tx.WithContext(ctx).Exec(query, productWarehouseIDs).Error +} + +func verifyNoWrongWarehouseRefsRemain( + ctx context.Context, + tx *gorm.DB, + rows []planRow, + pwMoves []pwMove, + refs *referencePlan, +) error { + wrongWarehouseIDs := uniqueWrongWarehouseIDs(rows) + if len(wrongWarehouseIDs) > 0 { + for _, ref := range refs.WarehouseRefs { + var remaining int64 + if err := tx.WithContext(ctx). + Table(ref.Table). + Where(fmt.Sprintf("%s IN ?", quotedIdentifier(ref.Column)), wrongWarehouseIDs). + Count(&remaining).Error; err != nil { + return err + } + if remaining > 0 { + return fmt.Errorf("verification failed: %d rows still point to wrong warehouses via %s.%s", remaining, ref.Table, ref.Column) + } + } + } + + if len(pwMoves) > 0 { + fromIDs := make([]uint, 0, len(pwMoves)) + for _, move := range pwMoves { + fromIDs = append(fromIDs, move.FromID) + } + + var remaining int64 + for _, ref := range refs.ProductWarehouseRefs { + if err := tx.WithContext(ctx). + Table(ref.Table). + Where(fmt.Sprintf("%s IN ?", quotedIdentifier(ref.Column)), fromIDs). + Count(&remaining).Error; err != nil { + return err + } + if remaining > 0 { + return fmt.Errorf("verification failed: %d rows still point to absorbed product_warehouses via %s.%s", remaining, ref.Table, ref.Column) + } + } + } + + return nil +} + +func reflowAndRecalculateProductWarehouse( + ctx context.Context, + fifoSvc commonSvc.FifoStockV2Service, + tx *gorm.DB, + productWarehouseID uint, +) error { + flagGroupCode, err := resolveFlagGroupByProductWarehouse(ctx, tx, productWarehouseID) + if err != nil { + return fmt.Errorf("resolve flag group for product_warehouse %d: %w", productWarehouseID, err) + } + + if _, err := fifoSvc.Reflow(ctx, commonSvc.FifoStockV2ReflowRequest{ + FlagGroupCode: flagGroupCode, + ProductWarehouseID: productWarehouseID, + Tx: tx, + }); err != nil { + return fmt.Errorf("reflow product_warehouse %d: %w", productWarehouseID, err) + } + + if _, err := fifoSvc.Recalculate(ctx, commonSvc.FifoStockV2RecalculateRequest{ + ProductWarehouseIDs: []uint{productWarehouseID}, + FlagGroupCodes: []string{flagGroupCode}, + FixDrift: true, + Tx: tx, + }); err != nil { + return fmt.Errorf("recalculate product_warehouse %d: %w", productWarehouseID, err) + } + + return nil +} + +func resolveFlagGroupByProductWarehouse(ctx context.Context, tx *gorm.DB, productWarehouseID uint) (string, error) { + type row struct { + FlagGroupCode string `gorm:"column:flag_group_code"` + } + + var selected row + err := tx.WithContext(ctx). + Table("fifo_stock_v2_route_rules rr"). + Select("rr.flag_group_code"). + Joins("JOIN fifo_stock_v2_flag_groups fg ON fg.code = rr.flag_group_code AND fg.is_active = TRUE"). + Where("rr.is_active = TRUE"). + Where("rr.lane = ?", "STOCKABLE"). + Where("rr.function_code = ?", "PURCHASE_IN"). + Where("rr.source_table = ?", "purchase_items"). + Where(` + EXISTS ( + SELECT 1 + FROM product_warehouses pw + JOIN flags f ON f.flagable_id = pw.product_id + JOIN fifo_stock_v2_flag_members fm ON fm.flag_name = f.name AND fm.is_active = TRUE + WHERE pw.id = ? + AND f.flagable_type = ? + AND fm.flag_group_code = rr.flag_group_code + ) + `, productWarehouseID, entity.FlagableTypeProduct). + Order("rr.id ASC"). + Limit(1). + Take(&selected).Error + if err != nil { + return "", err + } + + return strings.TrimSpace(selected.FlagGroupCode), nil +} + +func summarizePlan(rows []planRow) applySummary { + summary := applySummary{ + TargetScope: "farm", + PlanRows: len(rows), + WrongWarehouses: len(uniqueWrongWarehouseIDs(rows)), + SurvivorProductWarehouses: len(uniqueSurvivorIDs(rows)), + AbsorbedProductWarehouses: len(uniqueAbsorbedIDs(rows)), + NeedsReflowProductWarehouses: len(uniqueDuplicateSurvivorIDs(rows)), + } + return summary +} + +func renderPlan(mode string, rows []planRow, summary applySummary) { + if mode == repointOutputJSON { + payload := map[string]any{ + "rows": rows, + "summary": summary, + } + enc := json.NewEncoder(os.Stdout) + enc.SetIndent("", " ") + _ = enc.Encode(payload) + return + } + + w := tabwriter.NewWriter(os.Stdout, 2, 8, 2, ' ', 0) + fmt.Fprintln(w, "AREA\tLOKASI\tKANDANG\tWRONG_WAREHOUSE\tTARGET_WAREHOUSE\tPRODUCT\tPFK_ID\tSURVIVOR_PW\tSURVIVOR_QTY\tABSORBED_PW\tABSORBED_QTY\tNEEDS_REFLOW") + for _, row := range rows { + fmt.Fprintf( + w, + "%s\t%s\t%s\t%s\t%s\t%s\t%s\t%d\t%.3f\t%s\t%s\t%t\n", + row.AreaName, + row.KandangLocationName, + row.KandangName, + row.WrongWarehouseName, + row.CorrectWarehouseName, + row.ProductName, + displayOptionalUint(row.ProjectFlockKandangID), + row.SurvivorPWID, + row.SurvivorCurrentQty, + displayOptionalUint(row.AbsorbedPWID), + displayOptionalFloat(row.AbsorbedCurrentQty), + row.AbsorbedPWID != nil, + ) + } + _ = w.Flush() + + fmt.Printf( + "\nSummary: target_scope=%s plan_rows=%d wrong_warehouses=%d survivor_pws=%d absorbed_pws=%d needs_reflow_pws=%d deleted_product_warehouses=%d soft_deleted_warehouses=%d\n", + summary.TargetScope, + summary.PlanRows, + summary.WrongWarehouses, + summary.SurvivorProductWarehouses, + summary.AbsorbedProductWarehouses, + summary.NeedsReflowProductWarehouses, + summary.DeletedProductWarehouses, + summary.SoftDeletedWarehouses, + ) + + if len(summary.UpdatedPWRefs) > 0 { + fmt.Println("Updated product_warehouse refs:") + printSortedCounts(summary.UpdatedPWRefs) + } + if len(summary.UpdatedWarehouseRefs) > 0 { + fmt.Println("Updated warehouse refs:") + printSortedCounts(summary.UpdatedWarehouseRefs) + } +} + +func printSortedCounts(values map[string]int64) { + keys := make([]string, 0, len(values)) + for key := range values { + keys = append(keys, key) + } + sort.Strings(keys) + for _, key := range keys { + fmt.Printf(" %s=%d\n", key, values[key]) + } +} + +func (r reference) key() string { + return r.Table + "." + r.Column +} + +func referenceSet(values []reference) map[string]struct{} { + out := make(map[string]struct{}, len(values)) + for _, value := range values { + out[value.key()] = struct{}{} + } + return out +} + +func mergeReferences(groups ...[]reference) []reference { + seen := make(map[string]struct{}) + out := make([]reference, 0) + for _, group := range groups { + for _, ref := range group { + if !isSafeIdentifier(ref.Table) || !isSafeIdentifier(ref.Column) { + continue + } + if _, ok := seen[ref.key()]; ok { + continue + } + seen[ref.key()] = struct{}{} + out = append(out, ref) + } + } + sort.Slice(out, func(i, j int) bool { + if out[i].Table == out[j].Table { + return out[i].Column < out[j].Column + } + return out[i].Table < out[j].Table + }) + return out +} + +func mergeUintSlices(groups ...[]uint) []uint { + seen := make(map[uint]struct{}) + out := make([]uint, 0) + for _, group := range groups { + for _, value := range group { + if value == 0 { + continue + } + if _, ok := seen[value]; ok { + continue + } + seen[value] = struct{}{} + out = append(out, value) + } + } + sort.Slice(out, func(i, j int) bool { return out[i] < out[j] }) + return out +} + +func isSafeIdentifier(value string) bool { + return sqlIdentifierPattern.MatchString(strings.TrimSpace(value)) +} + +func quotedIdentifier(value string) string { + if !isSafeIdentifier(value) { + panic(fmt.Sprintf("unsafe SQL identifier: %s", value)) + } + return `"` + value + `"` +} + +func uniquePWMoves(rows []planRow) []pwMove { + seen := make(map[string]struct{}) + out := make([]pwMove, 0) + for _, row := range rows { + if row.AbsorbedPWID == nil || *row.AbsorbedPWID == 0 || *row.AbsorbedPWID == row.SurvivorPWID { + continue + } + key := fmt.Sprintf("%d:%d", *row.AbsorbedPWID, row.SurvivorPWID) + if _, ok := seen[key]; ok { + continue + } + seen[key] = struct{}{} + out = append(out, pwMove{FromID: *row.AbsorbedPWID, ToID: row.SurvivorPWID}) + } + return out +} + +func uniqueWarehouseMoves(rows []planRow) []warehouseMove { + seen := make(map[uint]uint) + out := make([]warehouseMove, 0) + for _, row := range rows { + if _, ok := seen[row.WrongWarehouseID]; ok { + continue + } + seen[row.WrongWarehouseID] = row.CorrectWarehouseID + out = append(out, warehouseMove{ + WrongWarehouseID: row.WrongWarehouseID, + CorrectWarehouseID: row.CorrectWarehouseID, + }) + } + return out +} + +func buildSurvivorWarehouseMap(rows []planRow) map[uint]uint { + out := make(map[uint]uint) + for _, row := range rows { + out[row.SurvivorPWID] = row.CorrectWarehouseID + } + return out +} + +func uniqueWrongWarehouseIDs(rows []planRow) []uint { + seen := make(map[uint]struct{}) + out := make([]uint, 0) + for _, row := range rows { + if _, ok := seen[row.WrongWarehouseID]; ok { + continue + } + seen[row.WrongWarehouseID] = struct{}{} + out = append(out, row.WrongWarehouseID) + } + sort.Slice(out, func(i, j int) bool { return out[i] < out[j] }) + return out +} + +func uniqueSurvivorIDs(rows []planRow) []uint { + seen := make(map[uint]struct{}) + out := make([]uint, 0) + for _, row := range rows { + if _, ok := seen[row.SurvivorPWID]; ok { + continue + } + seen[row.SurvivorPWID] = struct{}{} + out = append(out, row.SurvivorPWID) + } + sort.Slice(out, func(i, j int) bool { return out[i] < out[j] }) + return out +} + +func uniqueAbsorbedIDs(rows []planRow) []uint { + seen := make(map[uint]struct{}) + out := make([]uint, 0) + for _, row := range rows { + if row.AbsorbedPWID == nil || *row.AbsorbedPWID == 0 { + continue + } + if _, ok := seen[*row.AbsorbedPWID]; ok { + continue + } + seen[*row.AbsorbedPWID] = struct{}{} + out = append(out, *row.AbsorbedPWID) + } + sort.Slice(out, func(i, j int) bool { return out[i] < out[j] }) + return out +} + +func uniqueDuplicateSurvivorIDs(rows []planRow) []uint { + seen := make(map[uint]struct{}) + out := make([]uint, 0) + for _, row := range rows { + if row.AbsorbedPWID == nil || *row.AbsorbedPWID == 0 { + continue + } + if _, ok := seen[row.SurvivorPWID]; ok { + continue + } + seen[row.SurvivorPWID] = struct{}{} + out = append(out, row.SurvivorPWID) + } + sort.Slice(out, func(i, j int) bool { return out[i] < out[j] }) + return out +} + +func andClause(filters []string) string { + if len(filters) == 0 { + return "" + } + return " AND " + strings.Join(filters, " AND ") +} + +func displayOptionalUint(value *uint) string { + if value == nil || *value == 0 { + return "-" + } + return fmt.Sprintf("%d", *value) +} + +func displayOptionalFloat(value *float64) string { + if value == nil { + return "-" + } + return fmt.Sprintf("%.3f", *value) +} diff --git a/cmd/verify-stock-consolidation/main.go b/cmd/verify-stock-consolidation/main.go new file mode 100644 index 00000000..3e267bcc --- /dev/null +++ b/cmd/verify-stock-consolidation/main.go @@ -0,0 +1,505 @@ +package main + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "log" + "os" + "strings" + "text/tabwriter" + + "gitlab.com/mbugroup/lti-api.git/internal/config" + "gitlab.com/mbugroup/lti-api.git/internal/database" + "gorm.io/gorm" +) + +const ( + outputTable = "table" + outputJSON = "json" + + caseA = "A" + caseB = "B" + caseAll = "all" +) + +type options struct { + Output string + AreaName string + KandangLocationName string + DBSSLMode string + VerifyCase string +} + +type sourceWarehouseCheck struct { + AreaName string `json:"area_name"` + KandangLocationName string `json:"kandang_location_name"` + KandangID uint `json:"kandang_id"` + KandangName string `json:"kandang_name"` + SourceWarehouseID uint `json:"source_warehouse_id"` + SourceWarehouseName string `json:"source_warehouse_name"` + Case string `json:"case"` + DeletedAt *string `json:"deleted_at"` + StockInProductWH float64 `json:"stock_in_product_wh"` + ActivePurchaseItems int64 `json:"active_purchase_items"` + Status string `json:"status"` +} + +type destinationWarehouseCheck struct { + AreaName string `json:"area_name"` + KandangLocationName string `json:"kandang_location_name"` + FarmWarehouseID uint `json:"farm_warehouse_id"` + FarmWarehouseName string `json:"farm_warehouse_name"` + ProductID uint `json:"product_id"` + ProductName string `json:"product_name"` + CurrentQty float64 `json:"current_qty"` + StockLogsTotal float64 `json:"stock_logs_total"` + StockLogsCount int64 `json:"stock_logs_count"` + Status string `json:"status"` +} + +type orphanedReferenceCheck struct { + Table string `json:"table"` + Column string `json:"column"` + ReferenceCount int64 `json:"reference_count"` + DeletedWarehouseIDs string `json:"deleted_warehouse_ids"` +} + +type verificationSummary struct { + TotalSourceWarehouses int `json:"total_source_warehouses"` + CleanSourceWarehouses int `json:"clean_source_warehouses"` + DirtySourceWarehouses int `json:"dirty_source_warehouses"` + TotalDestinationWarehouses int `json:"total_destination_warehouses"` + MatchingDestinations int `json:"matching_destinations"` + DiscrepancyDestinations int `json:"discrepancy_destinations"` + TotalOrphanedReferences int64 `json:"total_orphaned_references"` + OverallStatus string `json:"overall_status"` +} + +func main() { + opts, err := parseFlags() + if err != nil { + log.Fatalf("invalid flags: %v", err) + } + + if opts.DBSSLMode != "" { + config.DBSSLMode = opts.DBSSLMode + } + + ctx := context.Background() + db := database.Connect(config.DBHost, config.DBName) + + // Verify source warehouses + sourceChecks, err := verifySourceWarehouses(ctx, db, opts) + if err != nil { + log.Fatalf("failed to verify source warehouses: %v", err) + } + + // Verify destination warehouses + destChecks, err := verifyDestinationWarehouses(ctx, db, opts, sourceChecks) + if err != nil { + log.Fatalf("failed to verify destination warehouses: %v", err) + } + + // Verify no orphaned references + orphanedRefs, err := verifyOrphanedReferences(ctx, db, sourceChecks) + if err != nil { + log.Fatalf("failed to verify orphaned references: %v", err) + } + + // Render results + summary := buildSummary(sourceChecks, destChecks, orphanedRefs) + renderVerification(opts.Output, sourceChecks, destChecks, orphanedRefs, summary) +} + +func parseFlags() (*options, error) { + var opts options + flag.StringVar(&opts.Output, "output", outputTable, "Output format: table or json") + flag.StringVar(&opts.AreaName, "area-name", "", "Optional exact area name filter") + flag.StringVar(&opts.KandangLocationName, "kandang-location-name", "", "Optional exact canonical kandang location filter") + flag.StringVar(&opts.DBSSLMode, "db-sslmode", "", "Optional database sslmode override, for example: require") + flag.StringVar(&opts.VerifyCase, "verify-case", caseAll, "Verify specific case: A, B, or all") + flag.Parse() + + opts.Output = strings.ToLower(strings.TrimSpace(opts.Output)) + opts.AreaName = strings.TrimSpace(opts.AreaName) + opts.KandangLocationName = strings.TrimSpace(opts.KandangLocationName) + opts.DBSSLMode = strings.TrimSpace(opts.DBSSLMode) + opts.VerifyCase = strings.ToUpper(strings.TrimSpace(opts.VerifyCase)) + + if opts.Output == "" { + opts.Output = outputTable + } + if opts.Output != outputTable && opts.Output != outputJSON { + return nil, fmt.Errorf("unsupported --output=%s", opts.Output) + } + if opts.VerifyCase == "" { + opts.VerifyCase = caseAll + } + if opts.VerifyCase != caseA && opts.VerifyCase != caseB && opts.VerifyCase != caseAll { + return nil, fmt.Errorf("unsupported --verify-case=%s", opts.VerifyCase) + } + + return &opts, nil +} + +func verifySourceWarehouses(ctx context.Context, db *gorm.DB, opts *options) ([]sourceWarehouseCheck, error) { + filters := buildFilters(opts) + query := fmt.Sprintf(` +WITH case_a_warehouses AS ( + -- Case A: Kandang-level warehouses (type != 'LOKASI' or NULL) + SELECT + w.id, + a.name AS area_name, + kl.name AS kandang_location_name, + k.id, + k.name, + 'A'::text AS case_type + FROM warehouses w + JOIN kandangs k ON k.id = w.kandang_id AND k.deleted_at IS NULL + JOIN locations kl ON kl.id = k.location_id + JOIN areas a ON a.id = kl.area_id + WHERE w.deleted_at IS NOT NULL + AND w.kandang_id IS NOT NULL + AND UPPER(COALESCE(w.type, '')) <> 'LOKASI' +), +case_b_warehouses AS ( + -- Case B: Wrong-location warehouses (location_id != kandang.location_id) + SELECT + w.id, + a.name AS area_name, + kl.name AS kandang_location_name, + k.id, + k.name, + 'B'::text AS case_type + FROM warehouses w + JOIN kandangs k ON k.id = w.kandang_id AND k.deleted_at IS NULL + JOIN locations kl ON kl.id = k.location_id + JOIN areas a ON a.id = kl.area_id + WHERE w.deleted_at IS NOT NULL + AND w.kandang_id IS NOT NULL + AND w.location_id IS DISTINCT FROM k.location_id +), +all_source_warehouses AS ( + SELECT id, area_name, kandang_location_name, id AS kandang_id, name, case_type FROM case_a_warehouses + UNION ALL + SELECT id, area_name, kandang_location_name, id AS kandang_id, name, case_type FROM case_b_warehouses +) +SELECT + asw.area_name, + asw.kandang_location_name, + asw.kandang_id, + asw.name AS kandang_name, + w.id AS source_warehouse_id, + w.name AS source_warehouse_name, + asw.case_type, + TO_CHAR(w.deleted_at, 'YYYY-MM-DD') AS deleted_at, + COALESCE(SUM(pw.qty), 0) AS stock_in_product_wh, + COUNT(DISTINCT pi.id) AS active_purchase_items +FROM all_source_warehouses asw +JOIN warehouses w ON w.id = asw.id +LEFT JOIN product_warehouses pw ON pw.warehouse_id = w.id +LEFT JOIN purchase_items pi ON pi.warehouse_id = w.id +WHERE true + %s +GROUP BY + asw.area_name, + asw.kandang_location_name, + asw.kandang_id, + asw.name, + w.id, + w.name, + asw.case_type, + w.deleted_at +ORDER BY asw.area_name ASC, asw.kandang_location_name ASC, w.name ASC +`, andClause(filters)) + + rows := make([]sourceWarehouseCheck, 0) + if err := db.WithContext(ctx).Raw(query).Scan(&rows).Error; err != nil { + return nil, err + } + + // Determine status for each row + for i := range rows { + if rows[i].StockInProductWH == 0 && rows[i].ActivePurchaseItems == 0 { + rows[i].Status = "CLEAN" + } else { + rows[i].Status = "DIRTY" + } + + // Filter by case if requested + if opts.VerifyCase != caseAll && rows[i].Case != opts.VerifyCase { + rows = append(rows[:i], rows[i+1:]...) + i-- + } + } + + return rows, nil +} + +func verifyDestinationWarehouses(ctx context.Context, db *gorm.DB, opts *options, sourceChecks []sourceWarehouseCheck) ([]destinationWarehouseCheck, error) { + filters := buildFilters(opts) + query := fmt.Sprintf(` +SELECT + a.name AS area_name, + kl.name AS kandang_location_name, + fw.id AS farm_warehouse_id, + fw.name AS farm_warehouse_name, + p.id AS product_id, + p.name AS product_name, + COALESCE(pw.qty, 0) AS current_qty, + COALESCE(SUM(sl.stock), 0) AS stock_logs_total, + COUNT(DISTINCT sl.id) AS stock_logs_count +FROM warehouses fw +JOIN locations loc ON loc.id = fw.location_id +JOIN areas a ON a.id = loc.area_id +JOIN kandangs k ON k.location_id = fw.location_id AND k.deleted_at IS NULL +JOIN locations kl ON kl.id = k.location_id +JOIN products p ON UPPER(COALESCE(p.type, '')) IN ('PAKAN', 'OVK') +LEFT JOIN product_warehouses pw ON pw.warehouse_id = fw.id AND pw.product_id = p.id +LEFT JOIN stock_logs sl ON sl.product_warehouse_id = pw.id +WHERE fw.deleted_at IS NULL + AND UPPER(COALESCE(fw.type, '')) = 'LOKASI' + %s +GROUP BY + a.name, + kl.name, + fw.id, + fw.name, + p.id, + p.name, + pw.qty +ORDER BY a.name ASC, kl.name ASC, fw.name ASC, p.name ASC +`, andClause(filters)) + + rows := make([]destinationWarehouseCheck, 0) + if err := db.WithContext(ctx).Raw(query).Scan(&rows).Error; err != nil { + return nil, err + } + + // Determine status: check if current_qty matches stock_logs + for i := range rows { + if rows[i].CurrentQty > 0 { + // Allow small floating point discrepancies + if abs(rows[i].CurrentQty-rows[i].StockLogsTotal) < 0.001 { + rows[i].Status = "MATCHED" + } else { + rows[i].Status = "DISCREPANCY" + } + } else { + rows[i].Status = "EMPTY" + } + } + + return rows, nil +} + +func verifyOrphanedReferences(ctx context.Context, db *gorm.DB, sourceChecks []sourceWarehouseCheck) ([]orphanedReferenceCheck, error) { + if len(sourceChecks) == 0 { + return []orphanedReferenceCheck{}, nil + } + + // Get unique warehouse IDs from source checks + warehouseIDs := make([]uint, 0) + for _, check := range sourceChecks { + warehouseIDs = append(warehouseIDs, check.SourceWarehouseID) + } + + // Check common references + var results []orphanedReferenceCheck + + refChecks := []struct { + table string + column string + }{ + {"purchase_items", "warehouse_id"}, + {"stock_transfers", "from_warehouse_id"}, + {"stock_transfers", "to_warehouse_id"}, + {"fifo_stock_v2_operation_log", "warehouse_id"}, + } + + for _, ref := range refChecks { + var count int64 + if err := db.Table(ref.table). + Where(fmt.Sprintf("%s IN ?", ref.column), warehouseIDs). + Count(&count).Error; err != nil { + return nil, err + } + + if count > 0 { + // Get the specific warehouse IDs + var ids []uint + if err := db.Table(ref.table). + Where(fmt.Sprintf("%s IN ?", ref.column), warehouseIDs). + Pluck(ref.column, &ids).Error; err != nil { + return nil, err + } + + idStrs := make([]string, len(ids)) + for i, id := range ids { + idStrs[i] = fmt.Sprintf("%d", id) + } + + results = append(results, orphanedReferenceCheck{ + Table: ref.table, + Column: ref.column, + ReferenceCount: count, + DeletedWarehouseIDs: strings.Join(idStrs, ", "), + }) + } + } + + return results, nil +} + +func buildFilters(opts *options) []string { + filters := make([]string, 0, 2) + if opts.AreaName != "" { + filters = append(filters, fmt.Sprintf("a.name = '%s'", opts.AreaName)) + } + if opts.KandangLocationName != "" { + filters = append(filters, fmt.Sprintf("kl.name = '%s'", opts.KandangLocationName)) + } + return filters +} + +func andClause(filters []string) string { + if len(filters) == 0 { + return "" + } + return " AND " + strings.Join(filters, " AND ") +} + +func buildSummary(sourceChecks []sourceWarehouseCheck, destChecks []destinationWarehouseCheck, orphanedRefs []orphanedReferenceCheck) verificationSummary { + summary := verificationSummary{ + TotalSourceWarehouses: len(sourceChecks), + OverallStatus: "PASS", + } + + for _, check := range sourceChecks { + if check.Status == "CLEAN" { + summary.CleanSourceWarehouses++ + } else { + summary.DirtySourceWarehouses++ + summary.OverallStatus = "FAIL" + } + } + + summary.TotalDestinationWarehouses = len(destChecks) + for _, check := range destChecks { + if check.Status == "MATCHED" || check.Status == "EMPTY" { + summary.MatchingDestinations++ + } else if check.Status == "DISCREPANCY" { + summary.DiscrepancyDestinations++ + summary.OverallStatus = "FAIL" + } + } + + for _, ref := range orphanedRefs { + summary.TotalOrphanedReferences += ref.ReferenceCount + summary.OverallStatus = "FAIL" + } + + return summary +} + +func renderVerification(mode string, sourceChecks []sourceWarehouseCheck, destChecks []destinationWarehouseCheck, orphanedRefs []orphanedReferenceCheck, summary verificationSummary) { + if mode == outputJSON { + payload := map[string]any{ + "source_warehouses": sourceChecks, + "destination_warehouses": destChecks, + "orphaned_references": orphanedRefs, + "summary": summary, + } + enc := json.NewEncoder(os.Stdout) + enc.SetIndent("", " ") + _ = enc.Encode(payload) + return + } + + // Table mode + fmt.Println("\n=== SOURCE WAREHOUSES VERIFICATION ===") + if len(sourceChecks) == 0 { + fmt.Println("No deleted warehouses found") + } else { + w := tabwriter.NewWriter(os.Stdout, 2, 8, 2, ' ', 0) + fmt.Fprintln(w, "AREA\tLOKASI\tKANDANG\tWAREHOUSE\tCASE\tDELETED_AT\tSTOCK_IN_PW\tPURCHASE_ITEMS\tSTATUS") + for _, check := range sourceChecks { + fmt.Fprintf( + w, + "%s\t%s\t%s\t%s\t%s\t%s\t%.3f\t%d\t%s\n", + check.AreaName, + check.KandangLocationName, + check.KandangName, + check.SourceWarehouseName, + check.Case, + displayOptionalString(check.DeletedAt), + check.StockInProductWH, + check.ActivePurchaseItems, + check.Status, + ) + } + _ = w.Flush() + } + + fmt.Println("\n=== DESTINATION WAREHOUSES VERIFICATION ===") + if len(destChecks) == 0 { + fmt.Println("No destination warehouses found") + } else { + w := tabwriter.NewWriter(os.Stdout, 2, 8, 2, ' ', 0) + fmt.Fprintln(w, "AREA\tLOKASI\tFARM_WAREHOUSE\tPRODUCT\tCURRENT_QTY\tSTOCK_LOGS_TOTAL\tLOGS_COUNT\tSTATUS") + for _, check := range destChecks { + fmt.Fprintf( + w, + "%s\t%s\t%s\t%s\t%.3f\t%.3f\t%d\t%s\n", + check.AreaName, + check.KandangLocationName, + check.FarmWarehouseName, + check.ProductName, + check.CurrentQty, + check.StockLogsTotal, + check.StockLogsCount, + check.Status, + ) + } + _ = w.Flush() + } + + if len(orphanedRefs) > 0 { + fmt.Println("\n=== ORPHANED REFERENCES (ERRORS) ===") + w := tabwriter.NewWriter(os.Stdout, 2, 8, 2, ' ', 0) + fmt.Fprintln(w, "TABLE\tCOLUMN\tCOUNT\tWAREHOUSE_IDS") + for _, ref := range orphanedRefs { + fmt.Fprintf( + w, + "%s\t%s\t%d\t%s\n", + ref.Table, + ref.Column, + ref.ReferenceCount, + ref.DeletedWarehouseIDs, + ) + } + _ = w.Flush() + } + + fmt.Printf("\n=== SUMMARY ===\n") + fmt.Printf("Source Warehouses: %d total, %d clean, %d dirty\n", summary.TotalSourceWarehouses, summary.CleanSourceWarehouses, summary.DirtySourceWarehouses) + fmt.Printf("Destination Warehouses: %d total, %d matching, %d discrepancies\n", summary.TotalDestinationWarehouses, summary.MatchingDestinations, summary.DiscrepancyDestinations) + fmt.Printf("Orphaned References: %d\n", summary.TotalOrphanedReferences) + fmt.Printf("Overall Status: %s\n", summary.OverallStatus) +} + +func displayOptionalString(value *string) string { + if value == nil || strings.TrimSpace(*value) == "" { + return "-" + } + return *value +} + +func abs(x float64) float64 { + if x < 0 { + return -x + } + return x +} diff --git a/docs/STOCK_CONSOLIDATION_GUIDE.md b/docs/STOCK_CONSOLIDATION_GUIDE.md new file mode 100644 index 00000000..10a3147b --- /dev/null +++ b/docs/STOCK_CONSOLIDATION_GUIDE.md @@ -0,0 +1,460 @@ +# Stock Consolidation Operations Guide + +This guide explains how to use the warehouse consolidation commands to fix misplaced PAKAN/OVK stocks and migrate them to the correct farm-level warehouses. + +## Overview + +The stock consolidation system handles two main scenarios: + +| Case | Scenario | Root Cause | Solution | +|------|----------|-----------|----------| +| **Case B** | Invalid kandang references | Purchases pointed to warehouses with location mismatch | Move unused stocks to correct farm-level warehouse | +| **Case A** | General kandang cleanup | Any kandang-level warehouse with unused PAKAN/OVK stocks | Consolidate to farm-level warehouse | + +## Recommended Execution Order + +For a complete stock consolidation operation, follow this sequence: + +``` +1. find-wrong-warehouse-records ← Diagnose issues +2. repoint-wrong-warehouse-relations ← Fix Case B (invalid references) +3. consolidate-kandang-to-farm-stocks ← Fix Case A (general cleanup) +4. verify-stock-consolidation ← Audit and verify results +``` + +--- + +## Command Reference + +### 1. `find-wrong-warehouse-records` — Diagnostic Tool + +**Purpose:** Identify problematic warehouses and their associated stocks before making any changes. + +**Applies to:** Both Case A and Case B scenarios + +**What it does:** +- Lists warehouses with location mismatches (Case B) +- Shows stock allocations that reference wrong warehouses +- Helps identify scope of work needed + +#### Usage: + +```bash +# Report 1: Find warehouses with location mismatches (Case B issues) +./find-wrong-warehouse-records --report=warehouses + +# Report 2: Find stock allocations in wrong warehouses (Case B impact) +./find-wrong-warehouse-records --report=usage + +# Filter by area +./find-wrong-warehouse-records --report=warehouses --area-name "East Region" + +# Filter by kandang location +./find-wrong-warehouse-records --report=usage --kandang-location-name "Location 1" + +# Filter by product type +./find-wrong-warehouse-records --report=usage --usable-type=RECORDING_STOCK + +# JSON output for analysis +./find-wrong-warehouse-records --report=usage --output=json > analysis.json +``` + +#### Output Columns (Warehouses Report): +- **AREA**: Geographic area +- **KANDANG_LOCATION**: Kandang's intended location +- **KANDANG**: Kandang name +- **WRONG_LOCATION**: Where the warehouse actually is +- **WRONG_WAREHOUSE**: Problematic warehouse name +- **CORRECT_WAREHOUSE**: Where stocks should be + +#### Output Columns (Usage Report): +- **USABLE_TYPE**: RECORDING_STOCK or MARKETING_DELIVERY +- **PRODUCTS**: Which products are affected +- **QTY_FROM_WRONG_STOCK**: How much stock is misplaced +- **SOURCE_PURCHASES**: Which purchase orders are affected + +**When to use:** +- Before starting any consolidation +- To understand the scope of issues +- To get metrics on how much stock needs moving +- To identify which areas are most affected + +--- + +### 2. `repoint-wrong-warehouse-relations` — Fix Case B (Invalid References) + +**Purpose:** Fix purchases pointed to invalid kandang warehouses (location mismatch). + +**Applies to:** Case B only + +**Cases it handles:** +- ✅ Warehouses with `location_id ≠ kandang.location_id` (location mismatch) +- ✅ Only PAKAN/OVK products +- ✅ Only unused/leftover stocks (no active allocations) +- ✅ Moves to farm-level warehouse at correct location + +**What it does:** +1. Finds product_warehouses in wrong locations +2. Consolidates duplicates into survivor warehouses +3. Updates all references across the system +4. Recalculates FIFO stocks if needed +5. Optionally soft-deletes the wrong warehouse + +#### Usage: + +```bash +# Dry-run: See what would be moved (always run first!) +./repoint-wrong-warehouse-relations + +# Dry-run with specific filters +./repoint-wrong-warehouse-relations --area-name "East Region" +./repoint-wrong-warehouse-relations --kandang-location-name "Location 1" + +# Actually apply the migration +./repoint-wrong-warehouse-relations --apply + +# Apply but keep the wrong warehouses (for audit trail) +./repoint-wrong-warehouse-relations --apply --delete-wrong-warehouses=false + +# JSON output for automation/logging +./repoint-wrong-warehouse-relations --apply --output=json > migration.json +``` + +#### Flags: +- `--apply`: Apply changes (omit for dry-run) +- `--output`: `table` (default) or `json` +- `--area-name`: Filter by exact area name +- `--kandang-location-name`: Filter by exact location name +- `--delete-wrong-warehouses`: Soft-delete wrong warehouses (default: true) +- `--db-sslmode`: PostgreSQL SSL mode override (e.g., `require`) + +#### Output: + +**Table mode shows:** +- AREA, LOCATION, KANDANG: Where the issue is +- WRONG_WAREHOUSE: Source (will be deleted) +- TARGET_WAREHOUSE: Destination (farm-level) +- PRODUCT: What's being moved +- SURVIVOR_PW / ABSORBED_PW: Consolidation details +- NEEDS_REFLOW: Whether FIFO recalculation is needed + +**Summary shows:** +``` +Summary: plan_rows=15 wrong_warehouses=3 survivor_pws=12 absorbed_pws=5 + needs_reflow_pws=3 deleted_product_warehouses=5 soft_deleted_warehouses=3 + +Updated product_warehouse refs: + fifo_stock_v2_operation_log.product_warehouse_id=8 + fifo_stock_v2_reflow_checkpoints.product_warehouse_id=3 + purchase_items.warehouse_id=12 + +Updated warehouse refs: + purchase_items.warehouse_id=12 +``` + +#### Safety Features: +- **Dry-run first**: Always preview before applying +- **Prechecks**: Verifies no blocked references or FIFO conflicts +- **Atomic transactions**: All-or-nothing database updates +- **Reference verification**: Confirms all references were updated +- **Stock log recalculation**: Ensures FIFO accuracy after moves + +--- + +### 3. `consolidate-kandang-to-farm-stocks` — Fix Case A (General Cleanup) + +**Purpose:** Consolidate ALL kandang-level PAKAN/OVK stocks to farm-level warehouse. + +**Applies to:** Case A only + +**Cases it handles:** +- ✅ ALL kandang-level warehouses (type ≠ 'LOKASI') +- ✅ Only PAKAN/OVK products +- ✅ Only unused/leftover stocks (no active allocations) +- ✅ Moves to farm-level warehouse regardless of warehouse validity +- ✅ No location validation (processes all kandang warehouses) + +**What it does:** +1. Finds all kandang-level warehouses with unused stocks +2. Consolidates duplicates into survivor warehouses +3. Updates all references across the system +4. Recalculates FIFO stocks if needed +5. Optionally soft-deletes the kandang warehouse + +#### Usage: + +```bash +# Dry-run: See what would be consolidated +./consolidate-kandang-to-farm-stocks + +# Dry-run with filters +./consolidate-kandang-to-farm-stocks --area-name "East Region" +./consolidate-kandang-to-farm-stocks --kandang-location-name "Location 1" + +# Actually apply the consolidation +./consolidate-kandang-to-farm-stocks --apply + +# Apply but keep kandang warehouses +./consolidate-kandang-to-farm-stocks --apply --delete-kandang-warehouses=false + +# JSON output for logging +./consolidate-kandang-to-farm-stocks --apply --output=json > consolidation.json +``` + +#### Flags: +- `--apply`: Apply changes (omit for dry-run) +- `--output`: `table` (default) or `json` +- `--area-name`: Filter by exact area name +- `--kandang-location-name`: Filter by exact location name +- `--delete-kandang-warehouses`: Soft-delete kandang warehouses (default: true) +- `--db-sslmode`: PostgreSQL SSL mode override + +#### Output Format: +Similar to Case B, shows: +- Source kandang warehouse → Destination farm warehouse +- Product and quantity details +- Consolidation and FIFO reflow information + +#### Key Differences from Case B: +| Aspect | Case B | Case A | +|--------|--------|--------| +| Scope | Wrong-location warehouses only | ALL kandang-level warehouses | +| Validation | Checks location mismatch | No validation checks | +| When to use | After finding mismatches | General cleanup/consolidation | +| Risk level | Lower (targeted fix) | Higher (broader scope) | + +--- + +### 4. `verify-stock-consolidation` — Audit and Verify + +**Purpose:** Verify that stock consolidations were successful and no stocks were lost. + +**Applies to:** Both Case A and Case B (post-migration verification) + +**What it checks:** + +#### ✅ Source Warehouse Verification +Ensures deleted warehouses are clean: +- **CLEAN**: No remaining stock or purchase references +- **DIRTY**: Still has orphaned data (migration incomplete) + +#### ✅ Destination Warehouse Verification +Ensures farm-level warehouses received stocks correctly: +- **MATCHED**: Quantity in product_warehouse matches stock_logs +- **DISCREPANCY**: Quantity mismatch (data integrity issue!) +- **EMPTY**: No stocks (correct if nothing was supposed to move) + +#### ✅ Orphaned Reference Detection +Finds any remaining references to deleted warehouses in: +- `purchase_items.warehouse_id` +- `stock_transfers.from/to_warehouse_id` +- `fifo_stock_v2_operation_log.warehouse_id` + +#### Usage: + +```bash +# Verify all consolidations (Case A + B together) +./verify-stock-consolidation + +# Verify only Case B results +./verify-stock-consolidation --verify-case=B + +# Verify only Case A results +./verify-stock-consolidation --verify-case=A + +# Filter by area +./verify-stock-consolidation --area-name "East Region" + +# Filter by location +./verify-stock-consolidation --kandang-location-name "Location 1" + +# JSON output for reporting +./verify-stock-consolidation --output=json > verification_report.json +``` + +#### Flags: +- `--verify-case`: `A`, `B`, or `all` (default) +- `--output`: `table` (default) or `json` +- `--area-name`: Filter by exact area name +- `--kandang-location-name`: Filter by exact location name +- `--db-sslmode`: PostgreSQL SSL mode override + +#### Output Sections: + +**1. Source Warehouses** +``` +AREA LOKASI KANDANG WAREHOUSE CASE DELETED_AT STOCK PURCHASES STATUS +Area A Location 1 Kandang A KWH-A-01 A 2026-04-23 0.000 0 CLEAN +Area A Location 1 Kandang B WH-WRONG-001 B 2026-04-23 2.500 1 DIRTY ❌ +``` + +**2. Destination Warehouses** +``` +AREA LOKASI FARM_WAREHOUSE PRODUCT QTY LOGS_TOTAL LOGS STATUS +Area A Location 1 FWH-LOC-001 PAKAN A 2.500 2.500 3 MATCHED ✅ +Area A Location 1 FWH-LOC-001 OVK B 5.000 4.999 5 DISCREPANCY ❌ +``` + +**3. Orphaned References** (if any) +``` +TABLE COLUMN COUNT WAREHOUSE_IDS +purchase_items warehouse_id 3 1001, 1002, 1003 +stock_transfers from_warehouse_id 1 1001 +``` + +**4. Summary** +``` +Source Warehouses: 10 total, 8 clean, 2 dirty +Destination Warehouses: 15 total, 14 matching, 1 discrepancy +Orphaned References: 4 +Overall Status: FAIL ❌ +``` + +#### Interpreting Results: + +| Scenario | Meaning | Action | +|----------|---------|--------| +| ✅ Overall Status: PASS | All migrations successful | No action needed | +| ❌ Dirty Source Warehouses | Stocks not fully moved | Re-run repoint/consolidate | +| ❌ Discrepancy Destinations | Quantity mismatch | Investigate data integrity | +| ❌ Orphaned References | Broken references remain | Manual cleanup needed | + +--- + +## Complete Workflow Example + +### Scenario: Consolidate East Region stocks + +```bash +# Step 1: Understand the scope (Case B issues) +./find-wrong-warehouse-records --report=warehouses --area-name "East Region" +./find-wrong-warehouse-records --report=usage --area-name "East Region" + +# Review the output to understand: +# - How many wrong warehouses +# - How much stock needs moving +# - Which products are affected + +# Step 2: Fix Case B (invalid kandang references) +./repoint-wrong-warehouse-relations --area-name "East Region" +# Review dry-run output + +./repoint-wrong-warehouse-relations --apply --area-name "East Region" +# Watch for summary - should show successful updates + +# Step 3: Fix Case A (general kandang cleanup) +./consolidate-kandang-to-farm-stocks --area-name "East Region" +# Review dry-run output + +./consolidate-kandang-to-farm-stocks --apply --area-name "East Region" +# Watch for summary - should show consolidation complete + +# Step 4: Verify everything worked +./verify-stock-consolidation --area-name "East Region" +# Should show: +# - All source warehouses: CLEAN +# - All destination warehouses: MATCHED +# - Orphaned references: 0 +# - Overall Status: PASS ✅ +``` + +--- + +## Flags Reference + +### Common Flags (All Commands) + +| Flag | Description | Example | +|------|-------------|---------| +| `--output` | Output format | `--output=json` | +| `--area-name` | Filter by area | `--area-name "East Region"` | +| `--kandang-location-name` | Filter by location | `--kandang-location-name "Location 1"` | +| `--db-sslmode` | PostgreSQL SSL mode | `--db-sslmode=require` | + +### Migration-Specific Flags + +| Command | Flag | Description | +|---------|------|-------------| +| `repoint-wrong-warehouse-relations` | `--apply` | Apply changes | +| `repoint-wrong-warehouse-relations` | `--delete-wrong-warehouses` | Delete wrong warehouses (default: true) | +| `consolidate-kandang-to-farm-stocks` | `--apply` | Apply changes | +| `consolidate-kandang-to-farm-stocks` | `--delete-kandang-warehouses` | Delete kandang warehouses (default: true) | +| `verify-stock-consolidation` | `--verify-case` | Verify specific case (A, B, or all) | + +--- + +## Best Practices + +### Before Running Any Command + +1. **Back up the database** — These operations modify stock data +2. **Run in dry-run mode first** — Always preview changes before applying +3. **Check during low-traffic periods** — Avoid peak hours +4. **Have a rollback plan** — Know how to restore from backup if needed + +### When Running Migrations + +1. **Start small** — Use `--area-name` to test on one area first +2. **Check the summary** — Verify numbers make sense +3. **Watch for errors** — Stop if you see unexpected error messages +4. **Run verification immediately after** — Don't wait to verify + +### Red Flags (Stop and Investigate) + +- ❌ More rows affected than expected +- ❌ Negative quantities or zero counts where expecting data +- ❌ Errors about blocked references +- ❌ FIFO conflicts or in-flight artifacts +- ❌ Very large numbers in NEEDS_REFLOW + +### JSON Output for Automation + +All commands support `--output=json` for: +- Piping to other tools +- Parsing in scripts +- Generating reports +- Integration with monitoring systems + +```bash +# Example: Extract all affected warehouses to CSV +./find-wrong-warehouse-records --report=warehouses --output=json \ + | jq -r '.rows[] | [.area_name, .kandang_name, .wrong_warehouse_name] | @csv' \ + > affected_warehouses.csv +``` + +--- + +## Troubleshooting + +### Issue: "No wrong warehouse relations found" +- **Cause**: No matching Case B issues in the filter scope +- **Solution**: Remove filters or use different criteria + +### Issue: "found X rows still point to wrong warehouses" +- **Cause**: References not fully migrated +- **Solution**: Check for blocked references, re-run command + +### Issue: "discrepancy_destinations > 0" in verification +- **Cause**: Quantity mismatch in farm warehouse +- **Solution**: Investigate manually or rollback and retry + +### Issue: "DIRTY source warehouses" in verification +- **Cause**: Deleted warehouses still have stock/references +- **Solution**: May need manual cleanup or re-run migrations + +--- + +## Performance Notes + +- Commands use efficient SQL queries with proper filtering +- Large operations (100K+ rows) may take a few minutes +- Use area/location filters to reduce scope for testing +- Dry-runs don't modify database and complete quickly + +## Support + +For issues or questions: +1. Review the relevant section of this guide +2. Check the command output for specific error messages +3. Run verification to diagnose state issues +4. Contact the development team with JSON outputs from failed operations diff --git a/repoint-wrong-warehouse-relations b/repoint-wrong-warehouse-relations new file mode 100755 index 00000000..5e2f81aa Binary files /dev/null and b/repoint-wrong-warehouse-relations differ