// Command reconcile-fifo-total-used memperbaiki "phantom total_used" pada // stockable lot FIFO v2 (recording_eggs, stock_transfer_details, dst.). // // LATAR BELAKANG // Sebelum fix di population_allocation.go, ReleaseByUsable melepas SEMUA alokasi // CONSUME sebuah usable (termasuk RECORDING_EGG / STOCK_TRANSFER_IN) tanpa // men-decrement total_used stockable-nya. Akibatnya total_used "nyangkut" lebih // besar dari jumlah alokasi ACTIVE yang membackup-nya (phantom) → available // dihitung 0 padahal stok fisik ada → Delivery Order telur nyangkut di pending. // // PERBAIKAN // Sumber kebenaran konsumsi = stock_allocations status ACTIVE & purpose CONSUME. // Command ini menyetel ulang total_used setiap lot = SUM(alokasi ACTIVE CONSUME // untuk lot itu), lalu menjalankan FIFO v2 Reflow per (PW, flag group) sehingga // pending dialokasi ulang ke stok yang kini available dan product_warehouses.qty // dihitung ulang. // // PENTING: jalankan command ini SETELAH fix kode (population_allocation.go) // ter-deploy, dan SEBELUM mengaktifkan blok over-sell telur. // // Cara pakai: // // go run ./cmd/reconcile-fifo-total-used/ -pw=1292 # dry-run 1 PW // go run ./cmd/reconcile-fifo-total-used/ -pw=1292 -apply # apply 1 PW // go run ./cmd/reconcile-fifo-total-used/ -pw=1292,1296,1268 -apply // go run ./cmd/reconcile-fifo-total-used/ -pw=1292 -apply -output=json package main import ( "context" "encoding/json" "flag" "fmt" "log" "os" "regexp" "strconv" "strings" "time" commonSvc "gitlab.com/mbugroup/lti-api.git/internal/common/service" "gitlab.com/mbugroup/lti-api.git/internal/config" "gitlab.com/mbugroup/lti-api.git/internal/database" "github.com/sirupsen/logrus" "gorm.io/gorm" gormlogger "gorm.io/gorm/logger" ) const ( outputTable = "table" outputJSON = "json" ) var identifierRe = regexp.MustCompile(`^[a-zA-Z_][a-zA-Z0-9_]*$`) type options struct { Apply bool Output string DBSSLMode string PWs []uint } // stockableRule menggambarkan satu jenis stockable (mis. RECORDING_EGG) beserta // tabel & kolom yang dipakai FIFO v2 untuk melacak stok masuk. type stockableRule struct { LegacyTypeKey string SourceTable string SourceIDColumn string UsedQuantityCol string ProductWarehouseCol string QuantityCol string ScopeSQL string } type pwResult struct { ProductWarehouseID uint `json:"product_warehouse_id"` Product string `json:"product"` Warehouse string `json:"warehouse"` FlagGroups []string `json:"flag_groups"` QtyBefore float64 `json:"qty_before"` TotalUsedBefore float64 `json:"total_used_before"` ActiveConsume float64 `json:"active_consume"` Phantom float64 `json:"phantom"` PendingBefore float64 `json:"pending_before"` QtyAfter float64 `json:"qty_after,omitempty"` PendingAfter float64 `json:"pending_after,omitempty"` Status string `json:"status"` Error string `json:"error,omitempty"` } type runSummary struct { Mode string `json:"mode"` TargetPWs []uint `json:"target_pws"` Results []pwResult `json:"results"` DurationSeconds float64 `json:"duration_seconds"` OverallStatus string `json:"overall_status"` } func main() { opts, err := parseFlags() if err != nil { log.Fatalf("invalid flags: %v", err) } if opts.DBSSLMode != "" { config.DBSSLMode = opts.DBSSLMode } ctx := context.Background() db := database.Connect(config.DBHost, config.DBName) // Quiet the per-query GORM logging; this command emits its own summary and // the reflow step would otherwise produce a very noisy query log. db = db.Session(&gorm.Session{Logger: gormlogger.Default.LogMode(gormlogger.Silent)}) logger := logrus.New() logger.SetLevel(logrus.WarnLevel) svc := commonSvc.NewFifoStockV2Service(db, logger) start := time.Now() stockableRules, err := loadStockableRules(ctx, db) if err != nil { log.Fatalf("failed to load stockable route rules: %v", err) } pendingRules, err := loadUsablePendingRules(ctx, db) if err != nil { log.Fatalf("failed to load usable route rules: %v", err) } summary := runSummary{ Mode: modeLabel(opts.Apply), TargetPWs: opts.PWs, OverallStatus: "PASS", } for _, pw := range opts.PWs { res := reconcilePW(ctx, db, svc, pw, stockableRules, pendingRules, opts.Apply) if res.Status == "FAIL" { summary.OverallStatus = "FAIL" } summary.Results = append(summary.Results, res) } summary.DurationSeconds = time.Since(start).Seconds() render(opts.Output, summary) if !opts.Apply { fmt.Println("\nDry-run only. Re-run with -apply to reset total_used and reflow the PW(s) above.") } if summary.OverallStatus == "FAIL" { os.Exit(1) } } // reconcilePW mengukur kondisi PW, lalu (jika -apply) menyetel ulang total_used // tiap lot dan menjalankan reflow, semuanya dalam satu transaksi. func reconcilePW( ctx context.Context, db *gorm.DB, svc commonSvc.FifoStockV2Service, pw uint, stockableRules []stockableRule, pendingRules []stockableRule, apply bool, ) pwResult { res := pwResult{ProductWarehouseID: pw, Status: "OK"} if name, wh, err := loadPWIdentity(ctx, db, pw); err != nil { res.Status = "FAIL" res.Error = fmt.Sprintf("load identity: %v", err) return res } else { res.Product, res.Warehouse = name, wh } flagGroups, err := loadFlagGroups(ctx, db, pw) if err != nil { res.Status = "FAIL" res.Error = fmt.Sprintf("load flag groups: %v", err) return res } res.FlagGroups = flagGroups res.QtyBefore, _ = loadQty(ctx, db, pw) res.TotalUsedBefore, _ = sumStockableUsed(ctx, db, pw, stockableRules) res.ActiveConsume, _ = loadActiveConsume(ctx, db, pw) res.PendingBefore, _ = sumPending(ctx, db, pw, pendingRules) res.Phantom = res.TotalUsedBefore - res.ActiveConsume if !apply { return res } err = db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { for _, rule := range stockableRules { if err := recomputeUsed(ctx, tx, rule, pw); err != nil { return fmt.Errorf("recompute %s: %w", rule.LegacyTypeKey, err) } } for _, fg := range flagGroups { if _, err := svc.Reflow(ctx, commonSvc.FifoStockV2ReflowRequest{ FlagGroupCode: fg, ProductWarehouseID: pw, Tx: tx, }); err != nil { return fmt.Errorf("reflow flag_group=%s: %w", fg, err) } } return nil }) if err != nil { res.Status = "FAIL" res.Error = err.Error() return res } res.QtyAfter, _ = loadQty(ctx, db, pw) res.PendingAfter, _ = sumPending(ctx, db, pw, pendingRules) return res } func recomputeUsed(ctx context.Context, tx *gorm.DB, rule stockableRule, pw uint) error { q := fmt.Sprintf(` UPDATE %s t SET %s = COALESCE(( SELECT SUM(sa.qty) FROM stock_allocations sa WHERE sa.stockable_type = ? AND sa.stockable_id = t.%s AND sa.status = 'ACTIVE' AND sa.allocation_purpose = 'CONSUME' ), 0) WHERE t.%s = ?`, rule.SourceTable, rule.UsedQuantityCol, rule.SourceIDColumn, rule.ProductWarehouseCol) if strings.TrimSpace(rule.ScopeSQL) != "" { q += " AND (" + rule.ScopeSQL + ")" } return tx.WithContext(ctx).Exec(q, rule.LegacyTypeKey, pw).Error } // ---- loaders ---- func loadStockableRules(ctx context.Context, db *gorm.DB) ([]stockableRule, error) { type row struct { LegacyTypeKey string `gorm:"column:legacy_type_key"` SourceTable string `gorm:"column:source_table"` SourceIDColumn string `gorm:"column:source_id_column"` UsedQuantityCol string `gorm:"column:used_quantity_col"` ProductWarehouseCol string `gorm:"column:product_warehouse_col"` QuantityCol string `gorm:"column:quantity_col"` ScopeSQL string `gorm:"column:scope_sql"` } var rows []row err := db.WithContext(ctx). Table("fifo_stock_v2_route_rules"). Select("DISTINCT legacy_type_key, source_table, source_id_column, COALESCE(used_quantity_col,'') AS used_quantity_col, product_warehouse_col, COALESCE(quantity_col,'') AS quantity_col, COALESCE(scope_sql,'') AS scope_sql"). Where("lane = ? AND is_active = TRUE", "STOCKABLE"). Where("used_quantity_col IS NOT NULL AND used_quantity_col <> ''"). Scan(&rows).Error if err != nil { return nil, err } out := make([]stockableRule, 0, len(rows)) seen := map[string]bool{} for _, r := range rows { if !validIdentifiers(r.SourceTable, r.SourceIDColumn, r.UsedQuantityCol, r.ProductWarehouseCol) { return nil, fmt.Errorf("unsafe identifier in route rule %s (table=%s used=%s pw=%s)", r.LegacyTypeKey, r.SourceTable, r.UsedQuantityCol, r.ProductWarehouseCol) } key := r.LegacyTypeKey + "|" + r.SourceTable + "|" + r.UsedQuantityCol + "|" + r.ProductWarehouseCol if seen[key] { continue } seen[key] = true out = append(out, stockableRule(r)) } return out, nil } func loadUsablePendingRules(ctx context.Context, db *gorm.DB) ([]stockableRule, error) { type row struct { SourceTable string `gorm:"column:source_table"` ProductWarehouseCol string `gorm:"column:product_warehouse_col"` PendingCol string `gorm:"column:pending_quantity_col"` ScopeSQL string `gorm:"column:scope_sql"` } var rows []row err := db.WithContext(ctx). Table("fifo_stock_v2_route_rules"). Select("DISTINCT source_table, product_warehouse_col, pending_quantity_col, COALESCE(scope_sql,'') AS scope_sql"). Where("lane = ? AND is_active = TRUE", "USABLE"). Where("pending_quantity_col IS NOT NULL AND pending_quantity_col <> ''"). Scan(&rows).Error if err != nil { return nil, err } out := make([]stockableRule, 0, len(rows)) seen := map[string]bool{} for _, r := range rows { if !validIdentifiers(r.SourceTable, r.ProductWarehouseCol, r.PendingCol) { return nil, fmt.Errorf("unsafe identifier in usable rule (table=%s pw=%s pending=%s)", r.SourceTable, r.ProductWarehouseCol, r.PendingCol) } key := r.SourceTable + "|" + r.PendingCol + "|" + r.ProductWarehouseCol if seen[key] { continue } seen[key] = true out = append(out, stockableRule{ SourceTable: r.SourceTable, ProductWarehouseCol: r.ProductWarehouseCol, UsedQuantityCol: r.PendingCol, // reuse field as the column to SUM ScopeSQL: r.ScopeSQL, }) } return out, nil } func loadPWIdentity(ctx context.Context, db *gorm.DB, pw uint) (string, string, error) { type row struct { Product string `gorm:"column:product"` Warehouse string `gorm:"column:warehouse"` } var out row err := db.WithContext(ctx). Table("product_warehouses pw"). Select("p.name AS product, w.name AS warehouse"). Joins("JOIN products p ON p.id = pw.product_id"). Joins("JOIN warehouses w ON w.id = pw.warehouse_id"). Where("pw.id = ?", pw). Take(&out).Error return out.Product, out.Warehouse, err } func loadFlagGroups(ctx context.Context, db *gorm.DB, pw uint) ([]string, error) { var groups []string err := db.WithContext(ctx). Table("stock_allocations"). Distinct("flag_group_code"). Where("product_warehouse_id = ? AND flag_group_code IS NOT NULL AND flag_group_code <> ''", pw). Order("flag_group_code ASC"). Scan(&groups).Error return groups, err } func loadQty(ctx context.Context, db *gorm.DB, pw uint) (float64, error) { var v float64 err := db.WithContext(ctx). Table("product_warehouses"). Select("COALESCE(qty,0)"). Where("id = ?", pw). Scan(&v).Error return v, err } func loadActiveConsume(ctx context.Context, db *gorm.DB, pw uint) (float64, error) { var v float64 err := db.WithContext(ctx). Table("stock_allocations"). Select("COALESCE(SUM(qty),0)"). Where("product_warehouse_id = ? AND status = 'ACTIVE' AND allocation_purpose = 'CONSUME'", pw). Scan(&v).Error return v, err } func sumStockableUsed(ctx context.Context, db *gorm.DB, pw uint, rules []stockableRule) (float64, error) { total := 0.0 for _, rule := range rules { v, err := sumColumn(ctx, db, rule.SourceTable, rule.UsedQuantityCol, rule.ProductWarehouseCol, rule.ScopeSQL, pw) if err != nil { return total, err } total += v } return total, nil } func sumPending(ctx context.Context, db *gorm.DB, pw uint, rules []stockableRule) (float64, error) { total := 0.0 for _, rule := range rules { v, err := sumColumn(ctx, db, rule.SourceTable, rule.UsedQuantityCol, rule.ProductWarehouseCol, rule.ScopeSQL, pw) if err != nil { return total, err } total += v } return total, nil } func sumColumn(ctx context.Context, db *gorm.DB, table, col, pwCol, scope string, pw uint) (float64, error) { q := fmt.Sprintf("SELECT COALESCE(SUM(%s),0) FROM %s WHERE %s = ?", col, table, pwCol) if strings.TrimSpace(scope) != "" { q += " AND (" + scope + ")" } var v float64 err := db.WithContext(ctx).Raw(q, pw).Scan(&v).Error return v, err } // ---- flags / render ---- func parseFlags() (*options, error) { var opts options var pwsRaw string flag.BoolVar(&opts.Apply, "apply", false, "Apply the reconciliation (omit for dry-run)") flag.StringVar(&opts.Output, "output", outputTable, "Output format: table or json") flag.StringVar(&opts.DBSSLMode, "db-sslmode", "", "Database sslmode override") flag.StringVar(&pwsRaw, "pw", "", "Comma-separated product_warehouse ids to reconcile (required)") flag.Parse() opts.Output = strings.ToLower(strings.TrimSpace(opts.Output)) if opts.Output == "" { opts.Output = outputTable } if opts.Output != outputTable && opts.Output != outputJSON { return nil, fmt.Errorf("unsupported --output=%s", opts.Output) } pwsRaw = strings.TrimSpace(pwsRaw) if pwsRaw == "" { return nil, fmt.Errorf("-pw is required (e.g. -pw=1292 or -pw=1292,1296)") } for _, part := range strings.Split(pwsRaw, ",") { part = strings.TrimSpace(part) if part == "" { continue } id, err := strconv.ParseUint(part, 10, 64) if err != nil || id == 0 { return nil, fmt.Errorf("invalid product_warehouse id %q", part) } opts.PWs = append(opts.PWs, uint(id)) } if len(opts.PWs) == 0 { return nil, fmt.Errorf("no valid product_warehouse ids parsed from -pw") } return &opts, nil } func validIdentifiers(ids ...string) bool { for _, id := range ids { if !identifierRe.MatchString(id) { return false } } return true } func modeLabel(apply bool) string { if apply { return "APPLY" } return "DRY_RUN" } func render(mode string, summary runSummary) { if mode == outputJSON { enc := json.NewEncoder(os.Stdout) enc.SetIndent("", " ") _ = enc.Encode(summary) return } fmt.Printf("=== Reconcile FIFO total_used ===\n") fmt.Printf("Mode : %s\n", summary.Mode) for _, r := range summary.Results { fmt.Printf("\n--- PW %d (%s @ %s) [%s] ---\n", r.ProductWarehouseID, r.Product, r.Warehouse, r.Status) if r.Error != "" { fmt.Printf("ERROR : %s\n", r.Error) } fmt.Printf("Flag groups : %s\n", strings.Join(r.FlagGroups, ", ")) fmt.Printf("qty (before) : %.3f\n", r.QtyBefore) fmt.Printf("Σ total_used : %.3f\n", r.TotalUsedBefore) fmt.Printf("Σ active CONSUME: %.3f\n", r.ActiveConsume) fmt.Printf("PHANTOM : %.3f (total_used yang akan dilepas)\n", r.Phantom) fmt.Printf("pending (before): %.3f\n", r.PendingBefore) if summary.Mode == "APPLY" && r.Status == "OK" { fmt.Printf("qty (after) : %.3f\n", r.QtyAfter) fmt.Printf("pending (after) : %.3f\n", r.PendingAfter) } } fmt.Printf("\nDuration : %.2fs\n", summary.DurationSeconds) fmt.Printf("Overall status : %s\n", summary.OverallStatus) }