Files
lti-api/cmd/reconcile-fifo-total-used/main.go
T

485 lines
15 KiB
Go

// Command reconcile-fifo-total-used memperbaiki "phantom total_used" pada
// stockable lot FIFO v2 (recording_eggs, stock_transfer_details, dst.).
//
// LATAR BELAKANG
// Sebelum fix di population_allocation.go, ReleaseByUsable melepas SEMUA alokasi
// CONSUME sebuah usable (termasuk RECORDING_EGG / STOCK_TRANSFER_IN) tanpa
// men-decrement total_used stockable-nya. Akibatnya total_used "nyangkut" lebih
// besar dari jumlah alokasi ACTIVE yang membackup-nya (phantom) → available
// dihitung 0 padahal stok fisik ada → Delivery Order telur nyangkut di pending.
//
// PERBAIKAN
// Sumber kebenaran konsumsi = stock_allocations status ACTIVE & purpose CONSUME.
// Command ini menyetel ulang total_used setiap lot = SUM(alokasi ACTIVE CONSUME
// untuk lot itu), lalu menjalankan FIFO v2 Reflow per (PW, flag group) sehingga
// pending dialokasi ulang ke stok yang kini available dan product_warehouses.qty
// dihitung ulang.
//
// PENTING: jalankan command ini SETELAH fix kode (population_allocation.go)
// ter-deploy, dan SEBELUM mengaktifkan blok over-sell telur.
//
// Cara pakai:
//
//	go run ./cmd/reconcile-fifo-total-used/ -pw=1292                      # dry-run 1 PW
//	go run ./cmd/reconcile-fifo-total-used/ -pw=1292 -apply               # apply 1 PW
//	go run ./cmd/reconcile-fifo-total-used/ -pw=1292,1296,1268 -apply
//	go run ./cmd/reconcile-fifo-total-used/ -pw=1292 -apply -output=json
package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"log"
"os"
"regexp"
"strconv"
"strings"
"time"
commonSvc "gitlab.com/mbugroup/lti-api.git/internal/common/service"
"gitlab.com/mbugroup/lti-api.git/internal/config"
"gitlab.com/mbugroup/lti-api.git/internal/database"
"github.com/sirupsen/logrus"
"gorm.io/gorm"
gormlogger "gorm.io/gorm/logger"
)
// Accepted values of the -output flag; parseFlags rejects anything else.
const (
// outputTable selects the human-readable text report (the default).
outputTable = "table"
// outputJSON selects the indented JSON report.
outputJSON = "json"
)
// identifierRe matches a plain identifier: an ASCII letter or underscore
// followed by ASCII letters, digits, or underscores. validIdentifiers uses it
// to vet table and column names read from the route rules before those names
// are interpolated into SQL text.
var identifierRe = regexp.MustCompile(`^[a-zA-Z_][a-zA-Z0-9_]*$`)
// options holds the parsed command-line flags.
type options struct {
// Apply is true when -apply was given; otherwise the run is a dry-run.
Apply bool
// Output is the report format, either outputTable or outputJSON.
Output string
// DBSSLMode, when non-empty, overrides config.DBSSLMode before connecting.
DBSSLMode string
// PWs lists the product_warehouse ids to reconcile, in command-line order.
PWs []uint
}
// stockableRule describes one kind of stockable (e.g. RECORDING_EGG) together
// with the table and columns that FIFO v2 uses to track incoming stock.
//
// loadUsablePendingRules reuses this type for USABLE-lane rules; in that case
// UsedQuantityCol holds the pending-quantity column and the remaining fields
// other than SourceTable, ProductWarehouseCol, and ScopeSQL are left empty.
//
// The field order must stay in sync with the row type in loadStockableRules,
// which is converted to stockableRule directly.
type stockableRule struct {
// LegacyTypeKey is bound as stock_allocations.stockable_type in recomputeUsed.
LegacyTypeKey string
// SourceTable is the table that stores the lots.
SourceTable string
// SourceIDColumn is the lot id column matched against stock_allocations.stockable_id.
SourceIDColumn string
// UsedQuantityCol is the column that is summed and, on apply, rewritten.
UsedQuantityCol string
// ProductWarehouseCol is the column compared with the product_warehouse id.
ProductWarehouseCol string
// QuantityCol is loaded from the route rule; it is not read by the code visible in this file section.
QuantityCol string
// ScopeSQL is an optional raw SQL predicate appended as "AND (<ScopeSQL>)".
ScopeSQL string
}
// pwResult is the per-product-warehouse outcome produced by reconcilePW and
// printed by render.
type pwResult struct {
// ProductWarehouseID is the product_warehouses id that was reconciled.
ProductWarehouseID uint `json:"product_warehouse_id"`
// Product and Warehouse are the names returned by loadPWIdentity.
Product string `json:"product"`
Warehouse string `json:"warehouse"`
// FlagGroups are the flag group codes returned by loadFlagGroups; one reflow runs per code on apply.
FlagGroups []string `json:"flag_groups"`
// QtyBefore is product_warehouses.qty measured before any change.
QtyBefore float64 `json:"qty_before"`
// TotalUsedBefore is the used quantity summed over all stockable rules.
TotalUsedBefore float64 `json:"total_used_before"`
// ActiveConsume is the summed qty of ACTIVE allocations with purpose CONSUME.
ActiveConsume float64 `json:"active_consume"`
// Phantom is TotalUsedBefore minus ActiveConsume.
Phantom float64 `json:"phantom"`
// PendingBefore is the pending quantity summed over all usable rules before any change.
PendingBefore float64 `json:"pending_before"`
// QtyAfter and PendingAfter are measured only after a successful apply;
// a zero value is omitted from the JSON output.
QtyAfter float64 `json:"qty_after,omitempty"`
PendingAfter float64 `json:"pending_after,omitempty"`
// Status is "OK" or "FAIL".
Status string `json:"status"`
// Error describes the failing step when Status is "FAIL".
Error string `json:"error,omitempty"`
}
// runSummary is the report for one invocation of the command.
type runSummary struct {
// Mode is "APPLY" or "DRY_RUN" (see modeLabel).
Mode string `json:"mode"`
// TargetPWs are the product_warehouse ids requested with -pw.
TargetPWs []uint `json:"target_pws"`
// Results holds one entry per processed product warehouse.
Results []pwResult `json:"results"`
// DurationSeconds is the wall-clock time from rule loading to the last result.
DurationSeconds float64 `json:"duration_seconds"`
// OverallStatus is "PASS", or "FAIL" when any result has status "FAIL".
OverallStatus string `json:"overall_status"`
}
// main is the command entry point. It parses the flags, connects to the
// database, loads the FIFO v2 route rules, reconciles every requested product
// warehouse in command-line order, and renders the summary. The process exits
// with status 1 when any product warehouse finishes with status "FAIL".
func main() {
	opts, err := parseFlags()
	if err != nil {
		log.Fatalf("invalid flags: %v", err)
	}
	if opts.DBSSLMode != "" {
		config.DBSSLMode = opts.DBSSLMode
	}

	ctx := context.Background()
	db := database.Connect(config.DBHost, config.DBName)
	// Quiet the per-query GORM logging; this command emits its own summary and
	// the reflow step would otherwise produce a very noisy query log.
	db = db.Session(&gorm.Session{Logger: gormlogger.Default.LogMode(gormlogger.Silent)})

	logger := logrus.New()
	logger.SetLevel(logrus.WarnLevel)
	svc := commonSvc.NewFifoStockV2Service(db, logger)

	start := time.Now()
	stockableRules, err := loadStockableRules(ctx, db)
	if err != nil {
		log.Fatalf("failed to load stockable route rules: %v", err)
	}
	pendingRules, err := loadUsablePendingRules(ctx, db)
	if err != nil {
		log.Fatalf("failed to load usable route rules: %v", err)
	}

	summary := runSummary{
		Mode:          modeLabel(opts.Apply),
		TargetPWs:     opts.PWs,
		Results:       make([]pwResult, 0, len(opts.PWs)),
		OverallStatus: "PASS",
	}
	for _, pw := range opts.PWs {
		res := reconcilePW(ctx, db, svc, pw, stockableRules, pendingRules, opts.Apply)
		if res.Status == "FAIL" {
			summary.OverallStatus = "FAIL"
		}
		summary.Results = append(summary.Results, res)
	}
	summary.DurationSeconds = time.Since(start).Seconds()

	render(opts.Output, summary)
	if !opts.Apply {
		// In JSON mode stdout must contain nothing but the JSON document, so
		// the human-readable hint is written to stderr there. Table mode keeps
		// printing it on stdout.
		hintOut := os.Stdout
		if opts.Output == outputJSON {
			hintOut = os.Stderr
		}
		fmt.Fprintln(hintOut, "\nDry-run only. Re-run with -apply to reset total_used and reflow the PW(s) above.")
	}
	if summary.OverallStatus == "FAIL" {
		os.Exit(1)
	}
}
// reconcilePW measures the current state of one product warehouse and, when
// apply is true, resets the used quantity of every lot and runs the reflow for
// each flag group, all inside a single transaction.
//
// The returned result has Status "OK" on success. When any step fails the
// result has Status "FAIL" and Error names the step. A measurement that fails
// before the transaction aborts the reconciliation, so nothing is modified
// based on a state that could not be read. A measurement that fails after the
// transaction is reported as a failure whose message states that the changes
// were already committed.
func reconcilePW(
	ctx context.Context,
	db *gorm.DB,
	svc commonSvc.FifoStockV2Service,
	pw uint,
	stockableRules []stockableRule,
	pendingRules []stockableRule,
	apply bool,
) pwResult {
	res := pwResult{ProductWarehouseID: pw, Status: "OK"}
	// fail marks the result as failed at the named step and returns it.
	fail := func(step string, err error) pwResult {
		res.Status = "FAIL"
		res.Error = fmt.Sprintf("%s: %v", step, err)
		return res
	}

	product, warehouse, err := loadPWIdentity(ctx, db, pw)
	if err != nil {
		return fail("load identity", err)
	}
	res.Product, res.Warehouse = product, warehouse

	flagGroups, err := loadFlagGroups(ctx, db, pw)
	if err != nil {
		return fail("load flag groups", err)
	}
	res.FlagGroups = flagGroups

	// Each "before" figure is stored only once it was read successfully, so a
	// failed query is never reported as a measured zero.
	qtyBefore, err := loadQty(ctx, db, pw)
	if err != nil {
		return fail("load qty", err)
	}
	res.QtyBefore = qtyBefore

	totalUsed, err := sumStockableUsed(ctx, db, pw, stockableRules)
	if err != nil {
		return fail("sum total_used", err)
	}
	res.TotalUsedBefore = totalUsed

	activeConsume, err := loadActiveConsume(ctx, db, pw)
	if err != nil {
		return fail("load active consume", err)
	}
	res.ActiveConsume = activeConsume

	pendingBefore, err := sumPending(ctx, db, pw, pendingRules)
	if err != nil {
		return fail("sum pending", err)
	}
	res.PendingBefore = pendingBefore

	res.Phantom = res.TotalUsedBefore - res.ActiveConsume
	if !apply {
		return res
	}

	err = db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
		for _, rule := range stockableRules {
			if err := recomputeUsed(ctx, tx, rule, pw); err != nil {
				return fmt.Errorf("recompute %s: %w", rule.LegacyTypeKey, err)
			}
		}
		for _, fg := range flagGroups {
			if _, err := svc.Reflow(ctx, commonSvc.FifoStockV2ReflowRequest{
				FlagGroupCode:      fg,
				ProductWarehouseID: pw,
				Tx:                 tx,
			}); err != nil {
				return fmt.Errorf("reflow flag_group=%s: %w", fg, err)
			}
		}
		return nil
	})
	if err != nil {
		res.Status = "FAIL"
		res.Error = err.Error()
		return res
	}

	qtyAfter, err := loadQty(ctx, db, pw)
	if err != nil {
		return fail("changes committed, but reloading qty failed", err)
	}
	res.QtyAfter = qtyAfter

	pendingAfter, err := sumPending(ctx, db, pw, pendingRules)
	if err != nil {
		return fail("changes committed, but re-summing pending failed", err)
	}
	res.PendingAfter = pendingAfter
	return res
}
// recomputeUsed rewrites the used-quantity column of the rule's source table
// for every row belonging to product warehouse pw (further narrowed by the
// rule's scope predicate, when present). Each row receives the summed qty of
// its ACTIVE allocations with purpose CONSUME, or 0 when it has none.
//
// The table and column names are interpolated into the statement text; only
// the stockable type and the product warehouse id are bound as parameters.
func recomputeUsed(ctx context.Context, tx *gorm.DB, rule stockableRule, pw uint) error {
	stmt := fmt.Sprintf(`
UPDATE %s t
SET %s = COALESCE((
SELECT SUM(sa.qty) FROM stock_allocations sa
WHERE sa.stockable_type = ?
AND sa.stockable_id = t.%s
AND sa.status = 'ACTIVE'
AND sa.allocation_purpose = 'CONSUME'
), 0)
WHERE t.%s = ?`, rule.SourceTable, rule.UsedQuantityCol, rule.SourceIDColumn, rule.ProductWarehouseCol)

	hasScope := strings.TrimSpace(rule.ScopeSQL) != ""
	if hasScope {
		stmt = stmt + " AND (" + rule.ScopeSQL + ")"
	}

	result := tx.WithContext(ctx).Exec(stmt, rule.LegacyTypeKey, pw)
	return result.Error
}
// ---- loaders ----
// loadStockableRules reads the active STOCKABLE-lane rows of
// fifo_stock_v2_route_rules that define a used-quantity column.
//
// Table and column names are vetted with validIdentifiers because they are
// later interpolated into SQL text; the first rule failing the check aborts
// the load with an error. Rules that share the same legacy type key, source
// table, used-quantity column, and product-warehouse column are collapsed into
// the first one the query returned.
func loadStockableRules(ctx context.Context, db *gorm.DB) ([]stockableRule, error) {
	type row struct {
		LegacyTypeKey       string `gorm:"column:legacy_type_key"`
		SourceTable         string `gorm:"column:source_table"`
		SourceIDColumn      string `gorm:"column:source_id_column"`
		UsedQuantityCol     string `gorm:"column:used_quantity_col"`
		ProductWarehouseCol string `gorm:"column:product_warehouse_col"`
		QuantityCol         string `gorm:"column:quantity_col"`
		ScopeSQL            string `gorm:"column:scope_sql"`
	}

	var fetched []row
	query := db.WithContext(ctx).
		Table("fifo_stock_v2_route_rules").
		Select("DISTINCT legacy_type_key, source_table, source_id_column, COALESCE(used_quantity_col,'') AS used_quantity_col, product_warehouse_col, COALESCE(quantity_col,'') AS quantity_col, COALESCE(scope_sql,'') AS scope_sql").
		Where("lane = ? AND is_active = TRUE", "STOCKABLE").
		Where("used_quantity_col IS NOT NULL AND used_quantity_col <> ''")
	if err := query.Scan(&fetched).Error; err != nil {
		return nil, err
	}

	rules := make([]stockableRule, 0, len(fetched))
	visited := map[string]struct{}{}
	for _, rec := range fetched {
		if !validIdentifiers(rec.SourceTable, rec.SourceIDColumn, rec.UsedQuantityCol, rec.ProductWarehouseCol) {
			return nil, fmt.Errorf("unsafe identifier in route rule %s (table=%s used=%s pw=%s)", rec.LegacyTypeKey, rec.SourceTable, rec.UsedQuantityCol, rec.ProductWarehouseCol)
		}

		dedupKey := strings.Join([]string{rec.LegacyTypeKey, rec.SourceTable, rec.UsedQuantityCol, rec.ProductWarehouseCol}, "|")
		if _, dup := visited[dedupKey]; dup {
			continue
		}
		visited[dedupKey] = struct{}{}

		rules = append(rules, stockableRule{
			LegacyTypeKey:       rec.LegacyTypeKey,
			SourceTable:         rec.SourceTable,
			SourceIDColumn:      rec.SourceIDColumn,
			UsedQuantityCol:     rec.UsedQuantityCol,
			ProductWarehouseCol: rec.ProductWarehouseCol,
			QuantityCol:         rec.QuantityCol,
			ScopeSQL:            rec.ScopeSQL,
		})
	}
	return rules, nil
}
// loadUsablePendingRules reads the active USABLE-lane rows of
// fifo_stock_v2_route_rules that define a pending-quantity column.
//
// The result reuses stockableRule: UsedQuantityCol carries the pending column
// so the same summing helper can be applied to it. Identifiers are vetted with
// validIdentifiers, and rules sharing the same source table, pending column,
// and product-warehouse column are collapsed into the first one returned.
func loadUsablePendingRules(ctx context.Context, db *gorm.DB) ([]stockableRule, error) {
	type row struct {
		SourceTable         string `gorm:"column:source_table"`
		ProductWarehouseCol string `gorm:"column:product_warehouse_col"`
		PendingCol          string `gorm:"column:pending_quantity_col"`
		ScopeSQL            string `gorm:"column:scope_sql"`
	}

	var fetched []row
	query := db.WithContext(ctx).
		Table("fifo_stock_v2_route_rules").
		Select("DISTINCT source_table, product_warehouse_col, pending_quantity_col, COALESCE(scope_sql,'') AS scope_sql").
		Where("lane = ? AND is_active = TRUE", "USABLE").
		Where("pending_quantity_col IS NOT NULL AND pending_quantity_col <> ''")
	if err := query.Scan(&fetched).Error; err != nil {
		return nil, err
	}

	rules := make([]stockableRule, 0, len(fetched))
	visited := map[string]struct{}{}
	for _, rec := range fetched {
		if !validIdentifiers(rec.SourceTable, rec.ProductWarehouseCol, rec.PendingCol) {
			return nil, fmt.Errorf("unsafe identifier in usable rule (table=%s pw=%s pending=%s)", rec.SourceTable, rec.ProductWarehouseCol, rec.PendingCol)
		}

		dedupKey := strings.Join([]string{rec.SourceTable, rec.PendingCol, rec.ProductWarehouseCol}, "|")
		if _, dup := visited[dedupKey]; dup {
			continue
		}
		visited[dedupKey] = struct{}{}

		var rule stockableRule
		rule.SourceTable = rec.SourceTable
		rule.ProductWarehouseCol = rec.ProductWarehouseCol
		rule.UsedQuantityCol = rec.PendingCol // the column to SUM travels in this field
		rule.ScopeSQL = rec.ScopeSQL
		rules = append(rules, rule)
	}
	return rules, nil
}
// loadPWIdentity returns the product name and warehouse name of product
// warehouse pw by joining product_warehouses with products and warehouses.
// The error comes from the single-row fetch; both names are empty strings
// when nothing was scanned.
func loadPWIdentity(ctx context.Context, db *gorm.DB, pw uint) (string, string, error) {
	type row struct {
		Product   string `gorm:"column:product"`
		Warehouse string `gorm:"column:warehouse"`
	}

	query := db.WithContext(ctx).
		Table("product_warehouses pw").
		Select("p.name AS product, w.name AS warehouse").
		Joins("JOIN products p ON p.id = pw.product_id").
		Joins("JOIN warehouses w ON w.id = pw.warehouse_id").
		Where("pw.id = ?", pw)

	var identity row
	fetchErr := query.Take(&identity).Error
	return identity.Product, identity.Warehouse, fetchErr
}
// loadFlagGroups returns the distinct, non-empty flag group codes found on the
// stock allocations of product warehouse pw, in ascending order.
func loadFlagGroups(ctx context.Context, db *gorm.DB, pw uint) ([]string, error) {
	const filter = "product_warehouse_id = ? AND flag_group_code IS NOT NULL AND flag_group_code <> ''"

	var codes []string
	result := db.WithContext(ctx).
		Table("stock_allocations").
		Distinct("flag_group_code").
		Where(filter, pw).
		Order("flag_group_code ASC").
		Scan(&codes)
	return codes, result.Error
}
// loadQty returns product_warehouses.qty for product warehouse pw, with a NULL
// quantity read as 0.
func loadQty(ctx context.Context, db *gorm.DB, pw uint) (float64, error) {
	var qty float64
	result := db.WithContext(ctx).
		Table("product_warehouses").
		Select("COALESCE(qty,0)").
		Where("id = ?", pw).
		Scan(&qty)
	return qty, result.Error
}
// loadActiveConsume returns the summed qty of all stock allocations of product
// warehouse pw that are ACTIVE and have purpose CONSUME; the sum is 0 when no
// allocation matches.
func loadActiveConsume(ctx context.Context, db *gorm.DB, pw uint) (float64, error) {
	const filter = "product_warehouse_id = ? AND status = 'ACTIVE' AND allocation_purpose = 'CONSUME'"

	var consumed float64
	result := db.WithContext(ctx).
		Table("stock_allocations").
		Select("COALESCE(SUM(qty),0)").
		Where(filter, pw).
		Scan(&consumed)
	return consumed, result.Error
}
// sumStockableUsed adds up the used-quantity column of every stockable rule
// for product warehouse pw. On the first failing rule it stops and returns the
// error together with the total accumulated up to that point.
func sumStockableUsed(ctx context.Context, db *gorm.DB, pw uint, rules []stockableRule) (float64, error) {
	var sum float64
	for i := range rules {
		r := rules[i]
		part, err := sumColumn(ctx, db, r.SourceTable, r.UsedQuantityCol, r.ProductWarehouseCol, r.ScopeSQL, pw)
		if err != nil {
			return sum, err
		}
		sum += part
	}
	return sum, nil
}
// sumPending adds up the pending-quantity column of every usable rule for
// product warehouse pw. The rules are expected to carry the pending column in
// UsedQuantityCol, as produced by loadUsablePendingRules. On the first failing
// rule it stops and returns the error together with the total accumulated up
// to that point.
func sumPending(ctx context.Context, db *gorm.DB, pw uint, rules []stockableRule) (float64, error) {
	var sum float64
	for i := range rules {
		r := rules[i]
		part, err := sumColumn(ctx, db, r.SourceTable, r.UsedQuantityCol, r.ProductWarehouseCol, r.ScopeSQL, pw)
		if err != nil {
			return sum, err
		}
		sum += part
	}
	return sum, nil
}
// sumColumn returns SUM(col) over the rows of table whose pwCol equals pw,
// reading an empty result as 0. A non-blank scope is appended to the WHERE
// clause as "AND (<scope>)".
//
// table, col, pwCol, and scope are placed into the SQL text as given; only pw
// is bound as a parameter.
func sumColumn(ctx context.Context, db *gorm.DB, table, col, pwCol, scope string, pw uint) (float64, error) {
	query := "SELECT COALESCE(SUM(" + col + "),0) FROM " + table + " WHERE " + pwCol + " = ?"
	if hasScope := strings.TrimSpace(scope) != ""; hasScope {
		query = query + " AND (" + scope + ")"
	}

	var sum float64
	result := db.WithContext(ctx).Raw(query, pw).Scan(&sum)
	return sum, result.Error
}
// ---- flags / render ----
// parseFlags registers the command's flags on the default flag set, parses
// the process arguments, and validates the result.
//
// The -output value is trimmed and lower-cased and must be "table" or "json"
// (empty means "table"). The -pw value is a comma-separated list of positive
// product_warehouse ids; blank entries are skipped and repeated ids are kept
// only once, in first-occurrence order, so no product warehouse is reconciled
// twice in one run. An error is returned for an unsupported output format, a
// missing -pw, or an id that is not a positive integer fitting in uint.
func parseFlags() (*options, error) {
	var (
		opts   options
		pwsRaw string
	)
	flag.BoolVar(&opts.Apply, "apply", false, "Apply the reconciliation (omit for dry-run)")
	flag.StringVar(&opts.Output, "output", outputTable, "Output format: table or json")
	flag.StringVar(&opts.DBSSLMode, "db-sslmode", "", "Database sslmode override")
	flag.StringVar(&pwsRaw, "pw", "", "Comma-separated product_warehouse ids to reconcile (required)")
	flag.Parse()

	opts.Output = strings.ToLower(strings.TrimSpace(opts.Output))
	if opts.Output == "" {
		opts.Output = outputTable
	}
	if opts.Output != outputTable && opts.Output != outputJSON {
		return nil, fmt.Errorf("unsupported --output=%s", opts.Output)
	}

	pwsRaw = strings.TrimSpace(pwsRaw)
	if pwsRaw == "" {
		return nil, fmt.Errorf("-pw is required (e.g. -pw=1292 or -pw=1292,1296)")
	}

	seen := make(map[uint]struct{})
	for _, part := range strings.Split(pwsRaw, ",") {
		part = strings.TrimSpace(part)
		if part == "" {
			continue
		}
		// bitSize 0 limits the value to the platform's uint, so an id that
		// does not fit is rejected instead of being silently truncated by the
		// conversion below.
		id, err := strconv.ParseUint(part, 10, 0)
		if err != nil || id == 0 {
			return nil, fmt.Errorf("invalid product_warehouse id %q", part)
		}
		pw := uint(id)
		if _, dup := seen[pw]; dup {
			continue
		}
		seen[pw] = struct{}{}
		opts.PWs = append(opts.PWs, pw)
	}
	if len(opts.PWs) == 0 {
		return nil, fmt.Errorf("no valid product_warehouse ids parsed from -pw")
	}
	return &opts, nil
}
// validIdentifiers reports whether every given string matches identifierRe.
// It returns true when called without arguments.
func validIdentifiers(ids ...string) bool {
	for i := 0; i < len(ids); i++ {
		if identifierRe.MatchString(ids[i]) {
			continue
		}
		return false
	}
	return true
}
// modeLabel returns the run-mode name shown in the summary: "APPLY" when
// apply is true, "DRY_RUN" otherwise.
func modeLabel(apply bool) string {
	label := "DRY_RUN"
	if apply {
		label = "APPLY"
	}
	return label
}
// render writes the run summary to stdout. When mode is outputJSON the
// summary is emitted as one indented JSON document; any other mode prints the
// text report with one section per product warehouse.
//
// A JSON encoding or write failure is reported on stderr rather than being
// dropped, so an empty stdout is never left without an explanation.
func render(mode string, summary runSummary) {
	if mode == outputJSON {
		enc := json.NewEncoder(os.Stdout)
		enc.SetIndent("", " ")
		if err := enc.Encode(summary); err != nil {
			fmt.Fprintf(os.Stderr, "render: encode summary as JSON: %v\n", err)
		}
		return
	}

	fmt.Printf("=== Reconcile FIFO total_used ===\n")
	fmt.Printf("Mode : %s\n", summary.Mode)
	for _, r := range summary.Results {
		fmt.Printf("\n--- PW %d (%s @ %s) [%s] ---\n", r.ProductWarehouseID, r.Product, r.Warehouse, r.Status)
		if r.Error != "" {
			fmt.Printf("ERROR : %s\n", r.Error)
		}
		fmt.Printf("Flag groups : %s\n", strings.Join(r.FlagGroups, ", "))
		fmt.Printf("qty (before) : %.3f\n", r.QtyBefore)
		fmt.Printf("Σ total_used : %.3f\n", r.TotalUsedBefore)
		fmt.Printf("Σ active CONSUME: %.3f\n", r.ActiveConsume)
		fmt.Printf("PHANTOM : %.3f (total_used yang akan dilepas)\n", r.Phantom)
		fmt.Printf("pending (before): %.3f\n", r.PendingBefore)
		// The "after" figures exist only for a product warehouse that was
		// actually applied without error.
		if summary.Mode == "APPLY" && r.Status == "OK" {
			fmt.Printf("qty (after) : %.3f\n", r.QtyAfter)
			fmt.Printf("pending (after) : %.3f\n", r.PendingAfter)
		}
	}
	fmt.Printf("\nDuration : %.2fs\n", summary.DurationSeconds)
	fmt.Printf("Overall status : %s\n", summary.OverallStatus)
}