Files
lti-api/cmd/repoint-wrong-warehouse-relations/main.go
T
2026-04-23 22:41:32 +07:00

1044 lines
30 KiB
Go

package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"log"
"os"
"regexp"
"sort"
"strings"
"text/tabwriter"
"time"
commonSvc "gitlab.com/mbugroup/lti-api.git/internal/common/service"
"gitlab.com/mbugroup/lti-api.git/internal/config"
"gitlab.com/mbugroup/lti-api.git/internal/database"
entity "gitlab.com/mbugroup/lti-api.git/internal/entities"
"gorm.io/gorm"
)
const (
repointOutputTable = "table"
repointOutputJSON = "json"
)
type options struct {
Apply bool
Output string
AreaName string
KandangLocationName string
DBSSLMode string
DeleteWrongWarehouses bool
}
type planRow struct {
AreaName string `gorm:"column:area_name" json:"area_name"`
KandangLocationName string `gorm:"column:kandang_location_name" json:"kandang_location_name"`
KandangID uint `gorm:"column:kandang_id" json:"kandang_id"`
KandangName string `gorm:"column:kandang_name" json:"kandang_name"`
WrongWarehouseID uint `gorm:"column:wrong_warehouse_id" json:"wrong_warehouse_id"`
WrongWarehouseName string `gorm:"column:wrong_warehouse_name" json:"wrong_warehouse_name"`
CorrectWarehouseID uint `gorm:"column:correct_warehouse_id" json:"correct_warehouse_id"`
CorrectWarehouseName string `gorm:"column:correct_warehouse_name" json:"correct_warehouse_name"`
ProductID uint `gorm:"column:product_id" json:"product_id"`
ProductName string `gorm:"column:product_name" json:"product_name"`
ProjectFlockKandangID *uint `gorm:"column:project_flock_kandang_id" json:"project_flock_kandang_id,omitempty"`
SurvivorPWID uint `gorm:"column:survivor_pw_id" json:"survivor_pw_id"`
SurvivorCurrentQty float64 `gorm:"column:survivor_current_qty" json:"survivor_current_qty"`
AbsorbedPWID *uint `gorm:"column:absorbed_pw_id" json:"absorbed_pw_id,omitempty"`
AbsorbedCurrentQty *float64 `gorm:"column:absorbed_current_qty" json:"absorbed_current_qty,omitempty"`
}
type pwMove struct {
FromID uint
ToID uint
}
type warehouseMove struct {
WrongWarehouseID uint
CorrectWarehouseID uint
}
type applySummary struct {
TargetScope string `json:"target_scope,omitempty"`
PlanRows int `json:"plan_rows"`
WrongWarehouses int `json:"wrong_warehouses"`
SurvivorProductWarehouses int `json:"survivor_product_warehouses"`
AbsorbedProductWarehouses int `json:"absorbed_product_warehouses"`
NeedsReflowProductWarehouses int `json:"needs_reflow_product_warehouses"`
UpdatedPWRefs map[string]int64 `json:"updated_pw_refs,omitempty"`
UpdatedWarehouseRefs map[string]int64 `json:"updated_warehouse_refs,omitempty"`
DeletedProductWarehouses int64 `json:"deleted_product_warehouses,omitempty"`
SoftDeletedWarehouses int64 `json:"soft_deleted_warehouses,omitempty"`
}
type reference struct {
Table string
Column string
}
type referencePlan struct {
ProductWarehouseRefs []reference
WarehouseRefs []reference
BlockedWarehouseRefs []reference
}
var sqlIdentifierPattern = regexp.MustCompile(`^[a-z_][a-z0-9_]*$`)
var extraProductWarehouseRefs = []reference{
{Table: "fifo_stock_v2_operation_log", Column: "product_warehouse_id"},
{Table: "fifo_stock_v2_reflow_checkpoints", Column: "product_warehouse_id"},
{Table: "fifo_stock_v2_shadow_allocations", Column: "product_warehouse_id"},
{Table: "recording_depletions", Column: "source_product_warehouse_id"},
}
var allowedWarehouseRefs = []reference{
{Table: "product_warehouses", Column: "warehouse_id"},
{Table: "purchase_items", Column: "warehouse_id"},
}
var blockedWarehouseRefs = []reference{
{Table: "stock_transfers", Column: "from_warehouse_id"},
{Table: "stock_transfers", Column: "to_warehouse_id"},
}
func main() {
opts, err := parseFlags()
if err != nil {
log.Fatalf("invalid flags: %v", err)
}
if opts.DBSSLMode != "" {
config.DBSSLMode = opts.DBSSLMode
}
ctx := context.Background()
db := database.Connect(config.DBHost, config.DBName)
rows, err := loadPlanRows(ctx, db, opts)
if err != nil {
log.Fatalf("failed to load plan rows: %v", err)
}
if len(rows) == 0 {
fmt.Println("No misplaced PAKAN/OVK stocks found in wrong-location warehouses")
return
}
refs, err := buildReferencePlan(ctx, db)
if err != nil {
log.Fatalf("failed to inspect warehouse references: %v", err)
}
if err := runPrechecks(ctx, db, rows, refs, opts); err != nil {
log.Fatalf("precheck failed: %v", err)
}
summary := summarizePlan(rows)
if !opts.Apply {
renderPlan(opts.Output, rows, summary)
return
}
applied, err := applyPlan(ctx, db, rows, opts, refs)
if err != nil {
log.Fatalf("apply failed: %v", err)
}
renderPlan(opts.Output, rows, applied)
}
func parseFlags() (*options, error) {
var opts options
flag.BoolVar(&opts.Apply, "apply", false, "Apply the migration. If false, run as dry-run")
flag.StringVar(&opts.Output, "output", repointOutputTable, "Output format: table or json")
flag.StringVar(&opts.AreaName, "area-name", "", "Optional exact area name filter")
flag.StringVar(&opts.KandangLocationName, "kandang-location-name", "", "Optional exact canonical kandang location filter")
flag.StringVar(&opts.DBSSLMode, "db-sslmode", "", "Optional database sslmode override, for example: require")
flag.BoolVar(&opts.DeleteWrongWarehouses, "delete-wrong-warehouses", true, "Soft delete wrong warehouse rows after all references have been moved")
flag.Parse()
opts.Output = strings.ToLower(strings.TrimSpace(opts.Output))
opts.AreaName = strings.TrimSpace(opts.AreaName)
opts.KandangLocationName = strings.TrimSpace(opts.KandangLocationName)
opts.DBSSLMode = strings.TrimSpace(opts.DBSSLMode)
if opts.Output == "" {
opts.Output = repointOutputTable
}
if opts.Output != repointOutputTable && opts.Output != repointOutputJSON {
return nil, fmt.Errorf("unsupported --output=%s", opts.Output)
}
return &opts, nil
}
func loadPlanRows(ctx context.Context, db *gorm.DB, opts *options) ([]planRow, error) {
filters := make([]string, 0, 2)
args := make([]any, 0, 2)
if opts.AreaName != "" {
filters = append(filters, "a.name = ?")
args = append(args, opts.AreaName)
}
if opts.KandangLocationName != "" {
filters = append(filters, "kl.name = ?")
args = append(args, opts.KandangLocationName)
}
query := fmt.Sprintf(`
SELECT
a.name AS area_name,
kl.name AS kandang_location_name,
k.id AS kandang_id,
k.name AS kandang_name,
w.id AS wrong_warehouse_id,
w.name AS wrong_warehouse_name,
correct_w.id AS correct_warehouse_id,
correct_w.name AS correct_warehouse_name,
p.id AS product_id,
p.name AS product_name,
wp.project_flock_kandang_id,
wp.id AS survivor_pw_id,
COALESCE(wp.qty, 0) AS survivor_current_qty,
cpw.id AS absorbed_pw_id,
cpw.qty AS absorbed_current_qty
FROM warehouses w
JOIN kandangs k
ON k.id = w.kandang_id
AND k.deleted_at IS NULL
JOIN locations kl
ON kl.id = k.location_id
JOIN areas a
ON a.id = kl.area_id
JOIN LATERAL (
SELECT w2.id, w2.name
FROM warehouses w2
WHERE w2.location_id = k.location_id
AND UPPER(COALESCE(w2.type, '')) = 'LOKASI'
AND w2.deleted_at IS NULL
ORDER BY w2.id ASC
LIMIT 1
) AS correct_w ON TRUE
JOIN product_warehouses wp
ON wp.warehouse_id = w.id
JOIN products p
ON p.id = wp.product_id
JOIN flags f
ON f.flagable_id = p.id
AND f.flagable_type = 'products'
AND UPPER(f.name) IN ('PAKAN', 'OVK')
LEFT JOIN product_warehouses cpw
ON cpw.product_id = wp.product_id
AND cpw.warehouse_id = correct_w.id
AND cpw.project_flock_kandang_id IS NOT DISTINCT FROM wp.project_flock_kandang_id
WHERE w.deleted_at IS NULL
AND w.kandang_id IS NOT NULL
AND w.location_id IS DISTINCT FROM k.location_id
AND NOT EXISTS (
SELECT 1
FROM stock_allocations sa
WHERE sa.stockable_type = 'PURCHASE_ITEMS'
AND sa.stockable_id IN (
SELECT pi.id
FROM purchase_items pi
WHERE pi.warehouse_id = w.id
AND pi.product_id = p.id
)
AND sa.status = 'ACTIVE'
AND sa.allocation_purpose = 'CONSUME'
AND sa.deleted_at IS NULL
)
%s
ORDER BY a.name ASC, kl.name ASC, k.name ASC, wp.id ASC
`, andClause(filters))
rows := make([]planRow, 0)
if err := db.WithContext(ctx).Raw(query, args...).Scan(&rows).Error; err != nil {
return nil, err
}
return rows, nil
}
func buildReferencePlan(ctx context.Context, db *gorm.DB) (*referencePlan, error) {
productWarehouseRefs, err := discoverSingleColumnFKReferences(ctx, db, "product_warehouses")
if err != nil {
return nil, err
}
productWarehouseRefs = mergeReferences(productWarehouseRefs, extraProductWarehouseRefs)
if err := ensureHandledColumns(ctx, db, "%product_warehouse_id%", productWarehouseRefs, nil); err != nil {
return nil, err
}
discoveredWarehouseRefs, err := discoverSingleColumnFKReferences(ctx, db, "warehouses")
if err != nil {
return nil, err
}
allowedWarehouseSet := referenceSet(allowedWarehouseRefs)
blockedWarehouseSet := referenceSet(blockedWarehouseRefs)
warehouseRefs := make([]reference, 0, len(discoveredWarehouseRefs))
for _, ref := range discoveredWarehouseRefs {
key := ref.key()
if _, ok := allowedWarehouseSet[key]; ok {
warehouseRefs = append(warehouseRefs, ref)
continue
}
if _, ok := blockedWarehouseSet[key]; ok {
continue
}
return nil, fmt.Errorf("unsupported warehouse foreign-key reference discovered: %s", key)
}
if err := ensureHandledColumns(ctx, db, "%warehouse_id%", mergeReferences(warehouseRefs, blockedWarehouseRefs), []string{"%product_warehouse_id%"}); err != nil {
return nil, err
}
return &referencePlan{
ProductWarehouseRefs: productWarehouseRefs,
WarehouseRefs: warehouseRefs,
BlockedWarehouseRefs: append([]reference(nil), blockedWarehouseRefs...),
}, nil
}
func runPrechecks(ctx context.Context, db *gorm.DB, rows []planRow, refs *referencePlan, opts *options) error {
if err := ensureNoBlockedWarehouseRefs(ctx, db, rows, refs.BlockedWarehouseRefs); err != nil {
return err
}
if err := ensureNoPurchaseItemWarehouseConflicts(ctx, db, rows); err != nil {
return err
}
if err := ensureNoInFlightFifoArtifacts(ctx, db, rows); err != nil {
return err
}
return nil
}
func discoverSingleColumnFKReferences(ctx context.Context, db *gorm.DB, referencedTable string) ([]reference, error) {
type countRow struct {
Count int64 `gorm:"column:cnt"`
}
var multiColumn countRow
if err := db.WithContext(ctx).Raw(`
SELECT COUNT(*) AS cnt
FROM pg_constraint c
JOIN pg_class ref ON ref.oid = c.confrelid
JOIN pg_namespace ref_ns ON ref_ns.oid = ref.relnamespace
WHERE c.contype = 'f'
AND ref_ns.nspname = 'public'
AND ref.relname = ?
AND COALESCE(array_length(c.conkey, 1), 0) <> 1
`, referencedTable).Scan(&multiColumn).Error; err != nil {
return nil, err
}
if multiColumn.Count > 0 {
return nil, fmt.Errorf("table %s has %d multi-column foreign keys; command only supports single-column rewrites", referencedTable, multiColumn.Count)
}
type row struct {
Table string `gorm:"column:table_name"`
Column string `gorm:"column:column_name"`
}
selected := make([]row, 0)
if err := db.WithContext(ctx).Raw(`
SELECT
src.relname AS table_name,
src_att.attname AS column_name
FROM pg_constraint c
JOIN pg_class src ON src.oid = c.conrelid
JOIN pg_namespace src_ns ON src_ns.oid = src.relnamespace
JOIN pg_class ref ON ref.oid = c.confrelid
JOIN pg_namespace ref_ns ON ref_ns.oid = ref.relnamespace
JOIN pg_attribute src_att ON src_att.attrelid = src.oid AND src_att.attnum = c.conkey[1]
WHERE c.contype = 'f'
AND src_ns.nspname = 'public'
AND ref_ns.nspname = 'public'
AND ref.relname = ?
AND COALESCE(array_length(c.conkey, 1), 0) = 1
ORDER BY src.relname ASC, src_att.attname ASC
`, referencedTable).Scan(&selected).Error; err != nil {
return nil, err
}
refs := make([]reference, 0, len(selected))
for _, item := range selected {
ref := reference{
Table: strings.TrimSpace(item.Table),
Column: strings.TrimSpace(item.Column),
}
if !isSafeIdentifier(ref.Table) || !isSafeIdentifier(ref.Column) {
return nil, fmt.Errorf("unsafe identifier discovered while inspecting %s references: %s.%s", referencedTable, ref.Table, ref.Column)
}
refs = append(refs, ref)
}
return refs, nil
}
func ensureHandledColumns(
ctx context.Context,
db *gorm.DB,
columnPattern string,
handled []reference,
excludePatterns []string,
) error {
type row struct {
Table string `gorm:"column:table_name"`
Column string `gorm:"column:column_name"`
}
query := `
SELECT table_name, column_name
FROM information_schema.columns
WHERE table_schema = 'public'
AND column_name LIKE ?
`
args := []any{columnPattern}
for _, pattern := range excludePatterns {
query += " AND column_name NOT LIKE ?\n"
args = append(args, pattern)
}
query += "ORDER BY table_name ASC, column_name ASC"
rows := make([]row, 0)
if err := db.WithContext(ctx).Raw(query, args...).Scan(&rows).Error; err != nil {
return err
}
handledSet := referenceSet(handled)
for _, item := range rows {
ref := reference{
Table: strings.TrimSpace(item.Table),
Column: strings.TrimSpace(item.Column),
}
if _, ok := handledSet[ref.key()]; ok {
continue
}
return fmt.Errorf("unhandled warehouse-related column discovered: %s", ref.key())
}
return nil
}
func ensureNoBlockedWarehouseRefs(ctx context.Context, db *gorm.DB, rows []planRow, blocked []reference) error {
wrongWarehouseIDs := uniqueWrongWarehouseIDs(rows)
if len(wrongWarehouseIDs) == 0 {
return nil
}
for _, ref := range blocked {
var count int64
if err := db.WithContext(ctx).
Table(ref.Table).
Where(fmt.Sprintf("%s IN ?", quotedIdentifier(ref.Column)), wrongWarehouseIDs).
Count(&count).Error; err != nil {
return err
}
if count > 0 {
return fmt.Errorf("found %d rows in %s.%s referencing the wrong warehouse ids; aborting", count, ref.Table, ref.Column)
}
}
return nil
}
func ensureNoPurchaseItemWarehouseConflicts(ctx context.Context, db *gorm.DB, rows []planRow) error {
wrongWarehouseIDs := uniqueWrongWarehouseIDs(rows)
if len(wrongWarehouseIDs) == 0 {
return nil
}
type countRow struct {
Count int64 `gorm:"column:cnt"`
}
var row countRow
if err := db.WithContext(ctx).Raw(`
WITH wrong_pairs AS (
SELECT
pi.id AS wrong_purchase_item_id,
pi.purchase_id,
pi.product_id,
pi.warehouse_id AS wrong_warehouse_id,
w2.id AS correct_warehouse_id
FROM purchase_items pi
JOIN warehouses w
ON w.id = pi.warehouse_id
AND w.deleted_at IS NULL
JOIN kandangs k
ON k.id = w.kandang_id
AND k.deleted_at IS NULL
JOIN LATERAL (
SELECT w2.id, w2.name
FROM warehouses w2
WHERE w2.location_id = k.location_id
AND UPPER(COALESCE(w2.type, '')) = 'LOKASI'
AND w2.deleted_at IS NULL
ORDER BY w2.id ASC
LIMIT 1
) AS w2 ON TRUE
WHERE pi.warehouse_id IN ?
AND w.kandang_id IS NOT NULL
AND w.location_id IS DISTINCT FROM k.location_id
)
SELECT COUNT(*) AS cnt
FROM wrong_pairs wp
JOIN purchase_items target
ON target.purchase_id = wp.purchase_id
AND target.product_id = wp.product_id
AND target.warehouse_id = wp.correct_warehouse_id
`, wrongWarehouseIDs).Scan(&row).Error; err != nil {
return err
}
if row.Count > 0 {
return fmt.Errorf("found %d purchase_item uniqueness collisions on (purchase_id, product_id, warehouse_id); aborting", row.Count)
}
return nil
}
func ensureNoInFlightFifoArtifacts(ctx context.Context, db *gorm.DB, rows []planRow) error {
affectedPWIDs := mergeUintSlices(uniqueSurvivorIDs(rows), uniqueAbsorbedIDs(rows))
if len(affectedPWIDs) == 0 {
return nil
}
for _, ref := range []reference{
{Table: "fifo_stock_v2_reflow_checkpoints", Column: "product_warehouse_id"},
{Table: "fifo_stock_v2_shadow_allocations", Column: "product_warehouse_id"},
} {
var count int64
if err := db.WithContext(ctx).
Table(ref.Table).
Where(fmt.Sprintf("%s IN ?", quotedIdentifier(ref.Column)), affectedPWIDs).
Count(&count).Error; err != nil {
return err
}
if count > 0 {
return fmt.Errorf("found %d in-flight FIFO rows in %s for affected product warehouses; aborting", count, ref.Table)
}
}
return nil
}
func applyPlan(
ctx context.Context,
db *gorm.DB,
rows []planRow,
opts *options,
refs *referencePlan,
) (applySummary, error) {
summary := summarizePlan(rows)
summary.UpdatedPWRefs = make(map[string]int64)
summary.UpdatedWarehouseRefs = make(map[string]int64)
pwMoves := uniquePWMoves(rows)
warehouseMoves := uniqueWarehouseMoves(rows)
survivorWarehouseMap := buildSurvivorWarehouseMap(rows)
duplicateSurvivors := uniqueDuplicateSurvivorIDs(rows)
fifoSvc := commonSvc.NewFifoStockV2Service(db, nil)
now := time.Now().UTC()
err := db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
for _, move := range pwMoves {
for _, ref := range refs.ProductWarehouseRefs {
res := tx.WithContext(ctx).
Table(ref.Table).
Where(fmt.Sprintf("%s = ?", quotedIdentifier(ref.Column)), move.FromID).
Update(ref.Column, move.ToID)
if res.Error != nil {
return fmt.Errorf("move %s from pw %d to %d: %w", ref.Table+"."+ref.Column, move.FromID, move.ToID, res.Error)
}
if res.RowsAffected > 0 {
summary.UpdatedPWRefs[ref.Table+"."+ref.Column] += res.RowsAffected
}
}
}
if len(pwMoves) > 0 {
fromIDs := make([]uint, 0, len(pwMoves))
for _, move := range pwMoves {
fromIDs = append(fromIDs, move.FromID)
}
res := tx.WithContext(ctx).
Where("id IN ?", fromIDs).
Delete(&entity.ProductWarehouse{})
if res.Error != nil {
return fmt.Errorf("delete absorbed product warehouses: %w", res.Error)
}
summary.DeletedProductWarehouses = res.RowsAffected
}
for survivorID, correctWarehouseID := range survivorWarehouseMap {
res := tx.WithContext(ctx).
Table("product_warehouses").
Where("id = ?", survivorID).
Update("warehouse_id", correctWarehouseID)
if res.Error != nil {
return fmt.Errorf("update survivor product_warehouse %d -> warehouse %d: %w", survivorID, correctWarehouseID, res.Error)
}
if res.RowsAffected > 0 {
summary.UpdatedWarehouseRefs["product_warehouses.warehouse_id"] += res.RowsAffected
}
}
for _, move := range warehouseMoves {
for _, ref := range refs.WarehouseRefs {
res := tx.WithContext(ctx).
Table(ref.Table).
Where(fmt.Sprintf("%s = ?", quotedIdentifier(ref.Column)), move.WrongWarehouseID).
Update(ref.Column, move.CorrectWarehouseID)
if res.Error != nil {
return fmt.Errorf("update %s from warehouse %d to %d: %w", ref.Table+"."+ref.Column, move.WrongWarehouseID, move.CorrectWarehouseID, res.Error)
}
if res.RowsAffected > 0 {
summary.UpdatedWarehouseRefs[ref.Table+"."+ref.Column] += res.RowsAffected
}
}
}
if len(duplicateSurvivors) > 0 {
if err := recomputeStockLogs(ctx, tx, duplicateSurvivors); err != nil {
return err
}
}
for _, survivorID := range duplicateSurvivors {
if err := reflowAndRecalculateProductWarehouse(ctx, fifoSvc, tx, survivorID); err != nil {
return err
}
}
if opts.DeleteWrongWarehouses {
wrongWarehouseIDs := uniqueWrongWarehouseIDs(rows)
if len(wrongWarehouseIDs) > 0 {
res := tx.WithContext(ctx).
Table("warehouses").
Where("id IN ? AND deleted_at IS NULL", wrongWarehouseIDs).
Updates(map[string]any{
"deleted_at": now,
"updated_at": now,
})
if res.Error != nil {
return fmt.Errorf("soft delete wrong warehouses: %w", res.Error)
}
summary.SoftDeletedWarehouses = res.RowsAffected
}
}
if err := verifyNoWrongWarehouseRefsRemain(ctx, tx, rows, pwMoves, refs); err != nil {
return err
}
return nil
})
if err != nil {
return applySummary{}, err
}
return summary, nil
}
func recomputeStockLogs(ctx context.Context, tx *gorm.DB, productWarehouseIDs []uint) error {
if len(productWarehouseIDs) == 0 {
return nil
}
query := `
WITH recalculated AS (
SELECT
id,
SUM(COALESCE(increase, 0) - COALESCE(decrease, 0))
OVER (PARTITION BY product_warehouse_id ORDER BY created_at ASC, id ASC) AS running_stock
FROM stock_logs
WHERE product_warehouse_id IN ?
)
UPDATE stock_logs sl
SET stock = recalculated.running_stock
FROM recalculated
WHERE sl.id = recalculated.id
`
return tx.WithContext(ctx).Exec(query, productWarehouseIDs).Error
}
func verifyNoWrongWarehouseRefsRemain(
ctx context.Context,
tx *gorm.DB,
rows []planRow,
pwMoves []pwMove,
refs *referencePlan,
) error {
wrongWarehouseIDs := uniqueWrongWarehouseIDs(rows)
if len(wrongWarehouseIDs) > 0 {
for _, ref := range refs.WarehouseRefs {
var remaining int64
if err := tx.WithContext(ctx).
Table(ref.Table).
Where(fmt.Sprintf("%s IN ?", quotedIdentifier(ref.Column)), wrongWarehouseIDs).
Count(&remaining).Error; err != nil {
return err
}
if remaining > 0 {
return fmt.Errorf("verification failed: %d rows still point to wrong warehouses via %s.%s", remaining, ref.Table, ref.Column)
}
}
}
if len(pwMoves) > 0 {
fromIDs := make([]uint, 0, len(pwMoves))
for _, move := range pwMoves {
fromIDs = append(fromIDs, move.FromID)
}
var remaining int64
for _, ref := range refs.ProductWarehouseRefs {
if err := tx.WithContext(ctx).
Table(ref.Table).
Where(fmt.Sprintf("%s IN ?", quotedIdentifier(ref.Column)), fromIDs).
Count(&remaining).Error; err != nil {
return err
}
if remaining > 0 {
return fmt.Errorf("verification failed: %d rows still point to absorbed product_warehouses via %s.%s", remaining, ref.Table, ref.Column)
}
}
}
return nil
}
func reflowAndRecalculateProductWarehouse(
ctx context.Context,
fifoSvc commonSvc.FifoStockV2Service,
tx *gorm.DB,
productWarehouseID uint,
) error {
flagGroupCode, err := resolveFlagGroupByProductWarehouse(ctx, tx, productWarehouseID)
if err != nil {
return fmt.Errorf("resolve flag group for product_warehouse %d: %w", productWarehouseID, err)
}
if _, err := fifoSvc.Reflow(ctx, commonSvc.FifoStockV2ReflowRequest{
FlagGroupCode: flagGroupCode,
ProductWarehouseID: productWarehouseID,
Tx: tx,
}); err != nil {
return fmt.Errorf("reflow product_warehouse %d: %w", productWarehouseID, err)
}
if _, err := fifoSvc.Recalculate(ctx, commonSvc.FifoStockV2RecalculateRequest{
ProductWarehouseIDs: []uint{productWarehouseID},
FlagGroupCodes: []string{flagGroupCode},
FixDrift: true,
Tx: tx,
}); err != nil {
return fmt.Errorf("recalculate product_warehouse %d: %w", productWarehouseID, err)
}
return nil
}
func resolveFlagGroupByProductWarehouse(ctx context.Context, tx *gorm.DB, productWarehouseID uint) (string, error) {
type row struct {
FlagGroupCode string `gorm:"column:flag_group_code"`
}
var selected row
err := tx.WithContext(ctx).
Table("fifo_stock_v2_route_rules rr").
Select("rr.flag_group_code").
Joins("JOIN fifo_stock_v2_flag_groups fg ON fg.code = rr.flag_group_code AND fg.is_active = TRUE").
Where("rr.is_active = TRUE").
Where("rr.lane = ?", "STOCKABLE").
Where("rr.function_code = ?", "PURCHASE_IN").
Where("rr.source_table = ?", "purchase_items").
Where(`
EXISTS (
SELECT 1
FROM product_warehouses pw
JOIN flags f ON f.flagable_id = pw.product_id
JOIN fifo_stock_v2_flag_members fm ON fm.flag_name = f.name AND fm.is_active = TRUE
WHERE pw.id = ?
AND f.flagable_type = ?
AND fm.flag_group_code = rr.flag_group_code
)
`, productWarehouseID, entity.FlagableTypeProduct).
Order("rr.id ASC").
Limit(1).
Take(&selected).Error
if err != nil {
return "", err
}
return strings.TrimSpace(selected.FlagGroupCode), nil
}
func summarizePlan(rows []planRow) applySummary {
summary := applySummary{
TargetScope: "farm",
PlanRows: len(rows),
WrongWarehouses: len(uniqueWrongWarehouseIDs(rows)),
SurvivorProductWarehouses: len(uniqueSurvivorIDs(rows)),
AbsorbedProductWarehouses: len(uniqueAbsorbedIDs(rows)),
NeedsReflowProductWarehouses: len(uniqueDuplicateSurvivorIDs(rows)),
}
return summary
}
func renderPlan(mode string, rows []planRow, summary applySummary) {
if mode == repointOutputJSON {
payload := map[string]any{
"rows": rows,
"summary": summary,
}
enc := json.NewEncoder(os.Stdout)
enc.SetIndent("", " ")
_ = enc.Encode(payload)
return
}
w := tabwriter.NewWriter(os.Stdout, 2, 8, 2, ' ', 0)
fmt.Fprintln(w, "AREA\tLOKASI\tKANDANG\tWRONG_WAREHOUSE\tTARGET_WAREHOUSE\tPRODUCT\tPFK_ID\tSURVIVOR_PW\tSURVIVOR_QTY\tABSORBED_PW\tABSORBED_QTY\tNEEDS_REFLOW")
for _, row := range rows {
fmt.Fprintf(
w,
"%s\t%s\t%s\t%s\t%s\t%s\t%s\t%d\t%.3f\t%s\t%s\t%t\n",
row.AreaName,
row.KandangLocationName,
row.KandangName,
row.WrongWarehouseName,
row.CorrectWarehouseName,
row.ProductName,
displayOptionalUint(row.ProjectFlockKandangID),
row.SurvivorPWID,
row.SurvivorCurrentQty,
displayOptionalUint(row.AbsorbedPWID),
displayOptionalFloat(row.AbsorbedCurrentQty),
row.AbsorbedPWID != nil,
)
}
_ = w.Flush()
fmt.Printf(
"\nSummary: target_scope=%s plan_rows=%d wrong_warehouses=%d survivor_pws=%d absorbed_pws=%d needs_reflow_pws=%d deleted_product_warehouses=%d soft_deleted_warehouses=%d\n",
summary.TargetScope,
summary.PlanRows,
summary.WrongWarehouses,
summary.SurvivorProductWarehouses,
summary.AbsorbedProductWarehouses,
summary.NeedsReflowProductWarehouses,
summary.DeletedProductWarehouses,
summary.SoftDeletedWarehouses,
)
if len(summary.UpdatedPWRefs) > 0 {
fmt.Println("Updated product_warehouse refs:")
printSortedCounts(summary.UpdatedPWRefs)
}
if len(summary.UpdatedWarehouseRefs) > 0 {
fmt.Println("Updated warehouse refs:")
printSortedCounts(summary.UpdatedWarehouseRefs)
}
}
func printSortedCounts(values map[string]int64) {
keys := make([]string, 0, len(values))
for key := range values {
keys = append(keys, key)
}
sort.Strings(keys)
for _, key := range keys {
fmt.Printf(" %s=%d\n", key, values[key])
}
}
func (r reference) key() string {
return r.Table + "." + r.Column
}
func referenceSet(values []reference) map[string]struct{} {
out := make(map[string]struct{}, len(values))
for _, value := range values {
out[value.key()] = struct{}{}
}
return out
}
func mergeReferences(groups ...[]reference) []reference {
seen := make(map[string]struct{})
out := make([]reference, 0)
for _, group := range groups {
for _, ref := range group {
if !isSafeIdentifier(ref.Table) || !isSafeIdentifier(ref.Column) {
continue
}
if _, ok := seen[ref.key()]; ok {
continue
}
seen[ref.key()] = struct{}{}
out = append(out, ref)
}
}
sort.Slice(out, func(i, j int) bool {
if out[i].Table == out[j].Table {
return out[i].Column < out[j].Column
}
return out[i].Table < out[j].Table
})
return out
}
func mergeUintSlices(groups ...[]uint) []uint {
seen := make(map[uint]struct{})
out := make([]uint, 0)
for _, group := range groups {
for _, value := range group {
if value == 0 {
continue
}
if _, ok := seen[value]; ok {
continue
}
seen[value] = struct{}{}
out = append(out, value)
}
}
sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
return out
}
func isSafeIdentifier(value string) bool {
return sqlIdentifierPattern.MatchString(strings.TrimSpace(value))
}
func quotedIdentifier(value string) string {
if !isSafeIdentifier(value) {
panic(fmt.Sprintf("unsafe SQL identifier: %s", value))
}
return `"` + value + `"`
}
func uniquePWMoves(rows []planRow) []pwMove {
seen := make(map[string]struct{})
out := make([]pwMove, 0)
for _, row := range rows {
if row.AbsorbedPWID == nil || *row.AbsorbedPWID == 0 || *row.AbsorbedPWID == row.SurvivorPWID {
continue
}
key := fmt.Sprintf("%d:%d", *row.AbsorbedPWID, row.SurvivorPWID)
if _, ok := seen[key]; ok {
continue
}
seen[key] = struct{}{}
out = append(out, pwMove{FromID: *row.AbsorbedPWID, ToID: row.SurvivorPWID})
}
return out
}
func uniqueWarehouseMoves(rows []planRow) []warehouseMove {
seen := make(map[uint]uint)
out := make([]warehouseMove, 0)
for _, row := range rows {
if _, ok := seen[row.WrongWarehouseID]; ok {
continue
}
seen[row.WrongWarehouseID] = row.CorrectWarehouseID
out = append(out, warehouseMove{
WrongWarehouseID: row.WrongWarehouseID,
CorrectWarehouseID: row.CorrectWarehouseID,
})
}
return out
}
func buildSurvivorWarehouseMap(rows []planRow) map[uint]uint {
out := make(map[uint]uint)
for _, row := range rows {
out[row.SurvivorPWID] = row.CorrectWarehouseID
}
return out
}
func uniqueWrongWarehouseIDs(rows []planRow) []uint {
seen := make(map[uint]struct{})
out := make([]uint, 0)
for _, row := range rows {
if _, ok := seen[row.WrongWarehouseID]; ok {
continue
}
seen[row.WrongWarehouseID] = struct{}{}
out = append(out, row.WrongWarehouseID)
}
sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
return out
}
func uniqueSurvivorIDs(rows []planRow) []uint {
seen := make(map[uint]struct{})
out := make([]uint, 0)
for _, row := range rows {
if _, ok := seen[row.SurvivorPWID]; ok {
continue
}
seen[row.SurvivorPWID] = struct{}{}
out = append(out, row.SurvivorPWID)
}
sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
return out
}
func uniqueAbsorbedIDs(rows []planRow) []uint {
seen := make(map[uint]struct{})
out := make([]uint, 0)
for _, row := range rows {
if row.AbsorbedPWID == nil || *row.AbsorbedPWID == 0 {
continue
}
if _, ok := seen[*row.AbsorbedPWID]; ok {
continue
}
seen[*row.AbsorbedPWID] = struct{}{}
out = append(out, *row.AbsorbedPWID)
}
sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
return out
}
func uniqueDuplicateSurvivorIDs(rows []planRow) []uint {
seen := make(map[uint]struct{})
out := make([]uint, 0)
for _, row := range rows {
if row.AbsorbedPWID == nil || *row.AbsorbedPWID == 0 {
continue
}
if _, ok := seen[row.SurvivorPWID]; ok {
continue
}
seen[row.SurvivorPWID] = struct{}{}
out = append(out, row.SurvivorPWID)
}
sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
return out
}
func andClause(filters []string) string {
if len(filters) == 0 {
return ""
}
return " AND " + strings.Join(filters, " AND ")
}
func displayOptionalUint(value *uint) string {
if value == nil || *value == 0 {
return "-"
}
return fmt.Sprintf("%d", *value)
}
func displayOptionalFloat(value *float64) string {
if value == nil {
return "-"
}
return fmt.Sprintf("%.3f", *value)
}