mirror of
https://gitlab.com/mbugroup/lti-api.git
synced 2026-05-20 13:31:56 +00:00
1054 lines
31 KiB
Go
1054 lines
31 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"flag"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"regexp"
|
|
"sort"
|
|
"strings"
|
|
"text/tabwriter"
|
|
"time"
|
|
|
|
commonSvc "gitlab.com/mbugroup/lti-api.git/internal/common/service"
|
|
"gitlab.com/mbugroup/lti-api.git/internal/config"
|
|
"gitlab.com/mbugroup/lti-api.git/internal/database"
|
|
entity "gitlab.com/mbugroup/lti-api.git/internal/entities"
|
|
"gorm.io/gorm"
|
|
)
|
|
|
|
// outputTable selects the tab-aligned, human-readable rendering. It is the
// default value of the --output flag.
const outputTable = "table"

// outputJSON selects the indented JSON rendering of the plan and summary.
const outputJSON = "json"
|
|
|
|
// options carries the command-line settings produced by parseFlags.
type options struct {
	// Apply runs the consolidation; when false the command only prints the plan.
	Apply bool
	// Output is the rendering format: "table" or "json".
	Output string
	// AreaName optionally restricts the plan to one exact area name.
	AreaName string
	// KandangLocationName optionally restricts the plan to one exact location name.
	KandangLocationName string
	// DBSSLMode optionally overrides the configured database sslmode.
	DBSSLMode string
	// DeleteKandangWarehouses soft deletes the kandang warehouse rows after the move.
	DeleteKandangWarehouses bool
	// SkipBlockedRefsCheck disables the precheck on blocked warehouse references.
	SkipBlockedRefsCheck bool
	// SkipIncompleteLocations adds a filter requiring a farm-level warehouse.
	SkipIncompleteLocations bool
}
|
|
|
|
// consolidateRow is one line of the consolidation plan as scanned from the
// query in loadConsolidateRows. The "survivor" is the product_warehouses row
// currently attached to the kandang warehouse; the "absorbed" row, when
// present, is the matching row that already exists on the farm warehouse.
type consolidateRow struct {
	// AreaName is the name of the area owning the location.
	AreaName string `gorm:"column:area_name" json:"area_name"`
	// KandangLocationName is the name of the kandang's location.
	KandangLocationName string `gorm:"column:kandang_location_name" json:"kandang_location_name"`
	// KandangID identifies the kandang that owns the source warehouse.
	KandangID uint `gorm:"column:kandang_id" json:"kandang_id"`
	// KandangName is the kandang's display name.
	KandangName string `gorm:"column:kandang_name" json:"kandang_name"`
	// KandangWarehouseID is the kandang-level warehouse being emptied.
	KandangWarehouseID uint `gorm:"column:kandang_warehouse_id" json:"kandang_warehouse_id"`
	// KandangWarehouseName is the kandang-level warehouse name.
	KandangWarehouseName string `gorm:"column:kandang_warehouse_name" json:"kandang_warehouse_name"`
	// FarmWarehouseID is the farm-level ('LOKASI') warehouse receiving the stock.
	FarmWarehouseID uint `gorm:"column:farm_warehouse_id" json:"farm_warehouse_id"`
	// FarmWarehouseName is the farm-level warehouse name.
	FarmWarehouseName string `gorm:"column:farm_warehouse_name" json:"farm_warehouse_name"`
	// ProductID identifies the product held by the survivor row.
	ProductID uint `gorm:"column:product_id" json:"product_id"`
	// ProductName is the product's display name.
	ProductName string `gorm:"column:product_name" json:"product_name"`
	// ProjectFlockKandangID is the survivor's project_flock_kandang_id; nil when NULL.
	ProjectFlockKandangID *uint `gorm:"column:project_flock_kandang_id" json:"project_flock_kandang_id,omitempty"`
	// SurvivorPWID is the kandang-level product_warehouses row that is kept.
	SurvivorPWID uint `gorm:"column:survivor_pw_id" json:"survivor_pw_id"`
	// SurvivorCurrentQty is the survivor's qty, with NULL read as 0.
	SurvivorCurrentQty float64 `gorm:"column:survivor_current_qty" json:"survivor_current_qty"`
	// AbsorbedPWID is the farm-level row to merge into the survivor; nil when none matched.
	AbsorbedPWID *uint `gorm:"column:absorbed_pw_id" json:"absorbed_pw_id,omitempty"`
	// AbsorbedCurrentQty is the absorbed row's qty; nil when none matched or NULL.
	AbsorbedCurrentQty *float64 `gorm:"column:absorbed_current_qty" json:"absorbed_current_qty,omitempty"`
}
|
|
|
|
// pwMove describes repointing references from one product_warehouses row
// (FromID, the absorbed row) to another (ToID, the survivor row).
type pwMove struct {
	FromID, ToID uint
}
|
|
|
|
// warehouseMove pairs a kandang-level warehouse with the farm-level warehouse
// that its references are rewritten to.
type warehouseMove struct {
	KandangWarehouseID, FarmWarehouseID uint
}
|
|
|
|
// consolidateSummary aggregates the plan size and, after an apply run, the
// number of rows each step touched.
type consolidateSummary struct {
	// PlanRows is the number of rows in the plan.
	PlanRows int `json:"plan_rows"`
	// KandangWarehouses is the number of distinct kandang warehouses in the plan.
	KandangWarehouses int `json:"kandang_warehouses"`
	// SurvivorProductWarehouses is the number of distinct survivor rows.
	SurvivorProductWarehouses int `json:"survivor_product_warehouses"`
	// AbsorbedProductWarehouses is the number of distinct absorbed rows.
	AbsorbedProductWarehouses int `json:"absorbed_product_warehouses"`
	// NeedsReflowProductWarehouses is the number of survivors that have an absorbed counterpart.
	NeedsReflowProductWarehouses int `json:"needs_reflow_product_warehouses"`
	// UpdatedPWRefs counts rewritten product-warehouse references per "table.column".
	UpdatedPWRefs map[string]int64 `json:"updated_pw_refs,omitempty"`
	// UpdatedWarehouseRefs counts rewritten warehouse references per "table.column".
	UpdatedWarehouseRefs map[string]int64 `json:"updated_warehouse_refs,omitempty"`
	// DeletedProductWarehouses is the number of absorbed rows deleted.
	DeletedProductWarehouses int64 `json:"deleted_product_warehouses,omitempty"`
	// SoftDeletedWarehouses is the number of kandang warehouses soft deleted.
	SoftDeletedWarehouses int64 `json:"soft_deleted_warehouses,omitempty"`
}
|
|
|
|
// reference names one column (Table.Column) that stores a warehouse id or a
// product_warehouse id and may need rewriting.
type reference struct {
	Table, Column string
}
|
|
|
|
type referencePlan struct {
|
|
ProductWarehouseRefs []reference
|
|
WarehouseRefs []reference
|
|
BlockedWarehouseRefs []reference
|
|
}
|
|
|
|
var sqlIdentifierPattern = regexp.MustCompile(`^[a-z_][a-z0-9_]*$`)
|
|
|
|
var extraProductWarehouseRefs = []reference{
|
|
{Table: "fifo_stock_v2_operation_log", Column: "product_warehouse_id"},
|
|
{Table: "fifo_stock_v2_reflow_checkpoints", Column: "product_warehouse_id"},
|
|
{Table: "fifo_stock_v2_shadow_allocations", Column: "product_warehouse_id"},
|
|
{Table: "recording_depletions", Column: "source_product_warehouse_id"},
|
|
}
|
|
|
|
var allowedWarehouseRefs = []reference{
|
|
{Table: "product_warehouses", Column: "warehouse_id"},
|
|
{Table: "purchase_items", Column: "warehouse_id"},
|
|
}
|
|
|
|
var blockedWarehouseRefs = []reference{
|
|
{Table: "stock_transfers", Column: "from_warehouse_id"},
|
|
{Table: "stock_transfers", Column: "to_warehouse_id"},
|
|
}
|
|
|
|
func main() {
|
|
opts, err := parseFlags()
|
|
if err != nil {
|
|
log.Fatalf("invalid flags: %v", err)
|
|
}
|
|
|
|
if opts.DBSSLMode != "" {
|
|
config.DBSSLMode = opts.DBSSLMode
|
|
}
|
|
|
|
ctx := context.Background()
|
|
db := database.Connect(config.DBHost, config.DBName)
|
|
|
|
rows, err := loadConsolidateRows(ctx, db, opts)
|
|
if err != nil {
|
|
log.Fatalf("failed to load consolidate rows: %v", err)
|
|
}
|
|
|
|
if len(rows) == 0 {
|
|
fmt.Println("No kandang-level PAKAN/OVK stocks found to consolidate")
|
|
return
|
|
}
|
|
|
|
refs, err := buildReferencePlan(ctx, db)
|
|
if err != nil {
|
|
log.Fatalf("failed to inspect warehouse references: %v", err)
|
|
}
|
|
|
|
if err := runPrechecks(ctx, db, rows, refs, opts); err != nil {
|
|
log.Fatalf("precheck failed: %v", err)
|
|
}
|
|
|
|
summary := summarizeConsolidate(rows)
|
|
if !opts.Apply {
|
|
renderConsolidate(opts.Output, rows, summary)
|
|
return
|
|
}
|
|
|
|
applied, err := applyConsolidate(ctx, db, rows, opts, refs)
|
|
if err != nil {
|
|
log.Fatalf("apply failed: %v", err)
|
|
}
|
|
renderConsolidate(opts.Output, rows, applied)
|
|
}
|
|
|
|
func parseFlags() (*options, error) {
|
|
var opts options
|
|
flag.BoolVar(&opts.Apply, "apply", false, "Apply the consolidation. If false, run as dry-run")
|
|
flag.StringVar(&opts.Output, "output", outputTable, "Output format: table or json")
|
|
flag.StringVar(&opts.AreaName, "area-name", "", "Optional exact area name filter")
|
|
flag.StringVar(&opts.KandangLocationName, "kandang-location-name", "", "Optional exact canonical kandang location filter")
|
|
flag.StringVar(&opts.DBSSLMode, "db-sslmode", "", "Optional database sslmode override, for example: require")
|
|
flag.BoolVar(&opts.DeleteKandangWarehouses, "delete-kandang-warehouses", true, "Soft delete kandang warehouse rows after all stocks have been moved")
|
|
flag.BoolVar(&opts.SkipBlockedRefsCheck, "skip-blocked-refs-check", false, "Skip blocked references check (use with caution - only if you understand the stock_transfers references)")
|
|
flag.BoolVar(&opts.SkipIncompleteLocations, "skip-incomplete-locations", false, "Skip locations that don't have farm-level warehouses and process the rest")
|
|
flag.Parse()
|
|
|
|
opts.Output = strings.ToLower(strings.TrimSpace(opts.Output))
|
|
opts.AreaName = strings.TrimSpace(opts.AreaName)
|
|
opts.KandangLocationName = strings.TrimSpace(opts.KandangLocationName)
|
|
opts.DBSSLMode = strings.TrimSpace(opts.DBSSLMode)
|
|
|
|
if opts.Output == "" {
|
|
opts.Output = outputTable
|
|
}
|
|
if opts.Output != outputTable && opts.Output != outputJSON {
|
|
return nil, fmt.Errorf("unsupported --output=%s", opts.Output)
|
|
}
|
|
|
|
return &opts, nil
|
|
}
|
|
|
|
func loadConsolidateRows(ctx context.Context, db *gorm.DB, opts *options) ([]consolidateRow, error) {
|
|
filters := make([]string, 0, 2)
|
|
args := make([]any, 0, 2)
|
|
if opts.AreaName != "" {
|
|
filters = append(filters, "a.name = ?")
|
|
args = append(args, opts.AreaName)
|
|
}
|
|
if opts.KandangLocationName != "" {
|
|
filters = append(filters, "kl.name = ?")
|
|
args = append(args, opts.KandangLocationName)
|
|
}
|
|
|
|
// If skipping incomplete locations, filter out NULL farm warehouses
|
|
whereClause := ""
|
|
if opts.SkipIncompleteLocations {
|
|
whereClause = "AND fw.id IS NOT NULL"
|
|
}
|
|
|
|
query := fmt.Sprintf(`
|
|
SELECT
|
|
a.name AS area_name,
|
|
kl.name AS kandang_location_name,
|
|
k.id AS kandang_id,
|
|
k.name AS kandang_name,
|
|
w.id AS kandang_warehouse_id,
|
|
w.name AS kandang_warehouse_name,
|
|
fw.id AS farm_warehouse_id,
|
|
fw.name AS farm_warehouse_name,
|
|
p.id AS product_id,
|
|
p.name AS product_name,
|
|
wp.project_flock_kandang_id,
|
|
wp.id AS survivor_pw_id,
|
|
COALESCE(wp.qty, 0) AS survivor_current_qty,
|
|
fpw.id AS absorbed_pw_id,
|
|
fpw.qty AS absorbed_current_qty
|
|
FROM warehouses w
|
|
JOIN kandangs k
|
|
ON k.id = w.kandang_id
|
|
AND k.deleted_at IS NULL
|
|
JOIN locations kl
|
|
ON kl.id = k.location_id
|
|
JOIN areas a
|
|
ON a.id = kl.area_id
|
|
JOIN LATERAL (
|
|
SELECT w2.id, w2.name
|
|
FROM warehouses w2
|
|
WHERE w2.location_id = k.location_id
|
|
AND UPPER(COALESCE(w2.type, '')) = 'LOKASI'
|
|
AND w2.deleted_at IS NULL
|
|
ORDER BY w2.id ASC
|
|
LIMIT 1
|
|
) AS fw ON TRUE
|
|
JOIN product_warehouses wp
|
|
ON wp.warehouse_id = w.id
|
|
JOIN products p
|
|
ON p.id = wp.product_id
|
|
JOIN flags f
|
|
ON f.flagable_id = p.id
|
|
AND f.flagable_type = 'products'
|
|
AND UPPER(f.name) IN ('PAKAN', 'OVK')
|
|
LEFT JOIN product_warehouses fpw
|
|
ON fpw.product_id = wp.product_id
|
|
AND fpw.warehouse_id = fw.id
|
|
AND fpw.project_flock_kandang_id IS NOT DISTINCT FROM wp.project_flock_kandang_id
|
|
WHERE w.deleted_at IS NULL
|
|
AND w.kandang_id IS NOT NULL
|
|
AND NOT EXISTS (
|
|
SELECT 1
|
|
FROM stock_allocations sa
|
|
WHERE sa.stockable_type = 'PURCHASE_ITEMS'
|
|
AND sa.stockable_id IN (
|
|
SELECT pi.id
|
|
FROM purchase_items pi
|
|
WHERE pi.warehouse_id = w.id
|
|
AND pi.product_id = p.id
|
|
)
|
|
AND sa.status = 'ACTIVE'
|
|
AND sa.allocation_purpose = 'CONSUME'
|
|
AND sa.deleted_at IS NULL
|
|
)
|
|
%s
|
|
%s
|
|
ORDER BY a.name ASC, kl.name ASC, k.name ASC, wp.id ASC
|
|
`,
|
|
andClause(filters),
|
|
whereClause)
|
|
|
|
rows := make([]consolidateRow, 0)
|
|
if err := db.WithContext(ctx).Raw(query, args...).Scan(&rows).Error; err != nil {
|
|
return nil, err
|
|
}
|
|
return rows, nil
|
|
}
|
|
|
|
func buildReferencePlan(ctx context.Context, db *gorm.DB) (*referencePlan, error) {
|
|
productWarehouseRefs, err := discoverSingleColumnFKReferences(ctx, db, "product_warehouses")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
productWarehouseRefs = mergeReferences(productWarehouseRefs, extraProductWarehouseRefs)
|
|
if err := ensureHandledColumns(ctx, db, "%product_warehouse_id%", productWarehouseRefs, nil); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
discoveredWarehouseRefs, err := discoverSingleColumnFKReferences(ctx, db, "warehouses")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
allowedWarehouseSet := referenceSet(allowedWarehouseRefs)
|
|
blockedWarehouseSet := referenceSet(blockedWarehouseRefs)
|
|
|
|
warehouseRefs := make([]reference, 0, len(discoveredWarehouseRefs))
|
|
for _, ref := range discoveredWarehouseRefs {
|
|
key := ref.key()
|
|
if _, ok := allowedWarehouseSet[key]; ok {
|
|
warehouseRefs = append(warehouseRefs, ref)
|
|
continue
|
|
}
|
|
if _, ok := blockedWarehouseSet[key]; ok {
|
|
continue
|
|
}
|
|
return nil, fmt.Errorf("unsupported warehouse foreign-key reference discovered: %s", key)
|
|
}
|
|
if err := ensureHandledColumns(ctx, db, "%warehouse_id%", mergeReferences(warehouseRefs, blockedWarehouseRefs), []string{"%product_warehouse_id%"}); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &referencePlan{
|
|
ProductWarehouseRefs: productWarehouseRefs,
|
|
WarehouseRefs: warehouseRefs,
|
|
BlockedWarehouseRefs: append([]reference(nil), blockedWarehouseRefs...),
|
|
}, nil
|
|
}
|
|
|
|
func runPrechecks(ctx context.Context, db *gorm.DB, rows []consolidateRow, refs *referencePlan, opts *options) error {
|
|
// Only check blocked references if we're actually deleting the warehouses
|
|
if opts.DeleteKandangWarehouses && !opts.SkipBlockedRefsCheck {
|
|
if err := ensureNoBlockedWarehouseRefsConsolidate(ctx, db, rows, refs.BlockedWarehouseRefs); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if err := ensureNoPurchaseItemWarehouseConflictsConsolidate(ctx, db, rows); err != nil {
|
|
return err
|
|
}
|
|
if err := ensureNoInFlightFifoArtifactsConsolidate(ctx, db, rows); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func discoverSingleColumnFKReferences(ctx context.Context, db *gorm.DB, referencedTable string) ([]reference, error) {
|
|
type countRow struct {
|
|
Count int64 `gorm:"column:cnt"`
|
|
}
|
|
|
|
var multiColumn countRow
|
|
if err := db.WithContext(ctx).Raw(`
|
|
SELECT COUNT(*) AS cnt
|
|
FROM pg_constraint c
|
|
JOIN pg_class ref ON ref.oid = c.confrelid
|
|
JOIN pg_namespace ref_ns ON ref_ns.oid = ref.relnamespace
|
|
WHERE c.contype = 'f'
|
|
AND ref_ns.nspname = 'public'
|
|
AND ref.relname = ?
|
|
AND COALESCE(array_length(c.conkey, 1), 0) <> 1
|
|
`, referencedTable).Scan(&multiColumn).Error; err != nil {
|
|
return nil, err
|
|
}
|
|
if multiColumn.Count > 0 {
|
|
return nil, fmt.Errorf("table %s has %d multi-column foreign keys; command only supports single-column rewrites", referencedTable, multiColumn.Count)
|
|
}
|
|
|
|
type row struct {
|
|
Table string `gorm:"column:table_name"`
|
|
Column string `gorm:"column:column_name"`
|
|
}
|
|
selected := make([]row, 0)
|
|
if err := db.WithContext(ctx).Raw(`
|
|
SELECT
|
|
src.relname AS table_name,
|
|
src_att.attname AS column_name
|
|
FROM pg_constraint c
|
|
JOIN pg_class src ON src.oid = c.conrelid
|
|
JOIN pg_namespace src_ns ON src_ns.oid = src.relnamespace
|
|
JOIN pg_class ref ON ref.oid = c.confrelid
|
|
JOIN pg_namespace ref_ns ON ref_ns.oid = ref.relnamespace
|
|
JOIN pg_attribute src_att ON src_att.attrelid = src.oid AND src_att.attnum = c.conkey[1]
|
|
WHERE c.contype = 'f'
|
|
AND src_ns.nspname = 'public'
|
|
AND ref_ns.nspname = 'public'
|
|
AND ref.relname = ?
|
|
AND COALESCE(array_length(c.conkey, 1), 0) = 1
|
|
ORDER BY src.relname ASC, src_att.attname ASC
|
|
`, referencedTable).Scan(&selected).Error; err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
refs := make([]reference, 0, len(selected))
|
|
for _, item := range selected {
|
|
ref := reference{
|
|
Table: strings.TrimSpace(item.Table),
|
|
Column: strings.TrimSpace(item.Column),
|
|
}
|
|
if !isSafeIdentifier(ref.Table) || !isSafeIdentifier(ref.Column) {
|
|
return nil, fmt.Errorf("unsafe identifier discovered while inspecting %s references: %s.%s", referencedTable, ref.Table, ref.Column)
|
|
}
|
|
refs = append(refs, ref)
|
|
}
|
|
return refs, nil
|
|
}
|
|
|
|
func ensureHandledColumns(
|
|
ctx context.Context,
|
|
db *gorm.DB,
|
|
columnPattern string,
|
|
handled []reference,
|
|
excludePatterns []string,
|
|
) error {
|
|
type row struct {
|
|
Table string `gorm:"column:table_name"`
|
|
Column string `gorm:"column:column_name"`
|
|
}
|
|
|
|
query := `
|
|
SELECT table_name, column_name
|
|
FROM information_schema.columns
|
|
WHERE table_schema = 'public'
|
|
AND column_name LIKE ?
|
|
`
|
|
args := []any{columnPattern}
|
|
for _, pattern := range excludePatterns {
|
|
query += " AND column_name NOT LIKE ?\n"
|
|
args = append(args, pattern)
|
|
}
|
|
query += "ORDER BY table_name ASC, column_name ASC"
|
|
|
|
rows := make([]row, 0)
|
|
if err := db.WithContext(ctx).Raw(query, args...).Scan(&rows).Error; err != nil {
|
|
return err
|
|
}
|
|
|
|
handledSet := referenceSet(handled)
|
|
for _, item := range rows {
|
|
ref := reference{
|
|
Table: strings.TrimSpace(item.Table),
|
|
Column: strings.TrimSpace(item.Column),
|
|
}
|
|
if _, ok := handledSet[ref.key()]; ok {
|
|
continue
|
|
}
|
|
return fmt.Errorf("unhandled warehouse-related column discovered: %s", ref.key())
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func ensureNoBlockedWarehouseRefsConsolidate(ctx context.Context, db *gorm.DB, rows []consolidateRow, blocked []reference) error {
|
|
kandangWarehouseIDs := uniqueKandangWarehouseIDs(rows)
|
|
if len(kandangWarehouseIDs) == 0 {
|
|
return nil
|
|
}
|
|
|
|
for _, ref := range blocked {
|
|
var count int64
|
|
if err := db.WithContext(ctx).
|
|
Table(ref.Table).
|
|
Where(fmt.Sprintf("%s IN ?", quotedIdentifier(ref.Column)), kandangWarehouseIDs).
|
|
Count(&count).Error; err != nil {
|
|
return err
|
|
}
|
|
if count > 0 {
|
|
return fmt.Errorf("found %d rows in %s.%s referencing the kandang warehouse ids; aborting", count, ref.Table, ref.Column)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func ensureNoPurchaseItemWarehouseConflictsConsolidate(ctx context.Context, db *gorm.DB, rows []consolidateRow) error {
|
|
kandangWarehouseIDs := uniqueKandangWarehouseIDs(rows)
|
|
if len(kandangWarehouseIDs) == 0 {
|
|
return nil
|
|
}
|
|
|
|
type countRow struct {
|
|
Count int64 `gorm:"column:cnt"`
|
|
}
|
|
|
|
var row countRow
|
|
if err := db.WithContext(ctx).Raw(`
|
|
WITH kandang_pairs AS (
|
|
SELECT
|
|
pi.id AS kandang_purchase_item_id,
|
|
pi.purchase_id,
|
|
pi.product_id,
|
|
pi.warehouse_id AS kandang_warehouse_id,
|
|
fw.id AS farm_warehouse_id
|
|
FROM purchase_items pi
|
|
JOIN warehouses w
|
|
ON w.id = pi.warehouse_id
|
|
AND w.deleted_at IS NULL
|
|
JOIN kandangs k
|
|
ON k.id = w.kandang_id
|
|
AND k.deleted_at IS NULL
|
|
JOIN LATERAL (
|
|
SELECT w2.id, w2.name
|
|
FROM warehouses w2
|
|
WHERE w2.location_id = k.location_id
|
|
AND UPPER(COALESCE(w2.type, '')) = 'LOKASI'
|
|
AND w2.deleted_at IS NULL
|
|
ORDER BY w2.id ASC
|
|
LIMIT 1
|
|
) AS fw ON TRUE
|
|
WHERE pi.warehouse_id IN ?
|
|
AND w.kandang_id IS NOT NULL
|
|
)
|
|
SELECT COUNT(*) AS cnt
|
|
FROM kandang_pairs kp
|
|
JOIN purchase_items target
|
|
ON target.purchase_id = kp.purchase_id
|
|
AND target.product_id = kp.product_id
|
|
AND target.warehouse_id = kp.farm_warehouse_id
|
|
`, kandangWarehouseIDs).Scan(&row).Error; err != nil {
|
|
return err
|
|
}
|
|
if row.Count > 0 {
|
|
return fmt.Errorf("found %d purchase_item uniqueness collisions on (purchase_id, product_id, warehouse_id); aborting", row.Count)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func ensureNoInFlightFifoArtifactsConsolidate(ctx context.Context, db *gorm.DB, rows []consolidateRow) error {
|
|
affectedPWIDs := mergeUintSlices(uniqueSurvivorIDsConsolidate(rows), uniqueAbsorbedIDsConsolidate(rows))
|
|
if len(affectedPWIDs) == 0 {
|
|
return nil
|
|
}
|
|
|
|
for _, ref := range []reference{
|
|
{Table: "fifo_stock_v2_reflow_checkpoints", Column: "product_warehouse_id"},
|
|
{Table: "fifo_stock_v2_shadow_allocations", Column: "product_warehouse_id"},
|
|
} {
|
|
var count int64
|
|
if err := db.WithContext(ctx).
|
|
Table(ref.Table).
|
|
Where(fmt.Sprintf("%s IN ?", quotedIdentifier(ref.Column)), affectedPWIDs).
|
|
Count(&count).Error; err != nil {
|
|
return err
|
|
}
|
|
if count > 0 {
|
|
return fmt.Errorf("found %d in-flight FIFO rows in %s for affected product warehouses; aborting", count, ref.Table)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func applyConsolidate(
|
|
ctx context.Context,
|
|
db *gorm.DB,
|
|
rows []consolidateRow,
|
|
opts *options,
|
|
refs *referencePlan,
|
|
) (consolidateSummary, error) {
|
|
summary := summarizeConsolidate(rows)
|
|
summary.UpdatedPWRefs = make(map[string]int64)
|
|
summary.UpdatedWarehouseRefs = make(map[string]int64)
|
|
|
|
pwMoves := uniquePWMovesConsolidate(rows)
|
|
warehouseMoves := uniqueWarehouseMovesConsolidate(rows)
|
|
survivorWarehouseMap := buildSurvivorWarehouseMapConsolidate(rows)
|
|
duplicateSurvivors := uniqueDuplicateSurvivorIDsConsolidate(rows)
|
|
|
|
fifoSvc := commonSvc.NewFifoStockV2Service(db, nil)
|
|
now := time.Now().UTC()
|
|
|
|
err := db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
|
|
for _, move := range pwMoves {
|
|
for _, ref := range refs.ProductWarehouseRefs {
|
|
res := tx.WithContext(ctx).
|
|
Table(ref.Table).
|
|
Where(fmt.Sprintf("%s = ?", quotedIdentifier(ref.Column)), move.FromID).
|
|
Update(ref.Column, move.ToID)
|
|
if res.Error != nil {
|
|
return fmt.Errorf("move %s from pw %d to %d: %w", ref.Table+"."+ref.Column, move.FromID, move.ToID, res.Error)
|
|
}
|
|
if res.RowsAffected > 0 {
|
|
summary.UpdatedPWRefs[ref.Table+"."+ref.Column] += res.RowsAffected
|
|
}
|
|
}
|
|
}
|
|
|
|
if len(pwMoves) > 0 {
|
|
fromIDs := make([]uint, 0, len(pwMoves))
|
|
for _, move := range pwMoves {
|
|
fromIDs = append(fromIDs, move.FromID)
|
|
}
|
|
res := tx.WithContext(ctx).
|
|
Where("id IN ?", fromIDs).
|
|
Delete(&entity.ProductWarehouse{})
|
|
if res.Error != nil {
|
|
return fmt.Errorf("delete absorbed product warehouses: %w", res.Error)
|
|
}
|
|
summary.DeletedProductWarehouses = res.RowsAffected
|
|
}
|
|
|
|
for survivorID, farmWarehouseID := range survivorWarehouseMap {
|
|
res := tx.WithContext(ctx).
|
|
Table("product_warehouses").
|
|
Where("id = ?", survivorID).
|
|
Update("warehouse_id", farmWarehouseID)
|
|
if res.Error != nil {
|
|
return fmt.Errorf("update survivor product_warehouse %d -> warehouse %d: %w", survivorID, farmWarehouseID, res.Error)
|
|
}
|
|
if res.RowsAffected > 0 {
|
|
summary.UpdatedWarehouseRefs["product_warehouses.warehouse_id"] += res.RowsAffected
|
|
}
|
|
}
|
|
|
|
for _, move := range warehouseMoves {
|
|
for _, ref := range refs.WarehouseRefs {
|
|
res := tx.WithContext(ctx).
|
|
Table(ref.Table).
|
|
Where(fmt.Sprintf("%s = ?", quotedIdentifier(ref.Column)), move.KandangWarehouseID).
|
|
Update(ref.Column, move.FarmWarehouseID)
|
|
if res.Error != nil {
|
|
return fmt.Errorf("update %s from warehouse %d to %d: %w", ref.Table+"."+ref.Column, move.KandangWarehouseID, move.FarmWarehouseID, res.Error)
|
|
}
|
|
if res.RowsAffected > 0 {
|
|
summary.UpdatedWarehouseRefs[ref.Table+"."+ref.Column] += res.RowsAffected
|
|
}
|
|
}
|
|
}
|
|
|
|
if len(duplicateSurvivors) > 0 {
|
|
if err := recomputeStockLogsConsolidate(ctx, tx, duplicateSurvivors); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
for _, survivorID := range duplicateSurvivors {
|
|
if err := reflowAndRecalculateProductWarehouse(ctx, fifoSvc, tx, survivorID); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if opts.DeleteKandangWarehouses {
|
|
kandangWarehouseIDs := uniqueKandangWarehouseIDs(rows)
|
|
if len(kandangWarehouseIDs) > 0 {
|
|
res := tx.WithContext(ctx).
|
|
Table("warehouses").
|
|
Where("id IN ? AND deleted_at IS NULL", kandangWarehouseIDs).
|
|
Updates(map[string]any{
|
|
"deleted_at": now,
|
|
"updated_at": now,
|
|
})
|
|
if res.Error != nil {
|
|
return fmt.Errorf("soft delete kandang warehouses: %w", res.Error)
|
|
}
|
|
summary.SoftDeletedWarehouses = res.RowsAffected
|
|
}
|
|
}
|
|
|
|
if err := verifyNoKandangWarehouseRefsRemain(ctx, tx, rows, pwMoves, refs); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return consolidateSummary{}, err
|
|
}
|
|
|
|
return summary, nil
|
|
}
|
|
|
|
func recomputeStockLogsConsolidate(ctx context.Context, tx *gorm.DB, productWarehouseIDs []uint) error {
|
|
if len(productWarehouseIDs) == 0 {
|
|
return nil
|
|
}
|
|
|
|
query := `
|
|
WITH recalculated AS (
|
|
SELECT
|
|
id,
|
|
SUM(COALESCE(increase, 0) - COALESCE(decrease, 0))
|
|
OVER (PARTITION BY product_warehouse_id ORDER BY created_at ASC, id ASC) AS running_stock
|
|
FROM stock_logs
|
|
WHERE product_warehouse_id IN ?
|
|
)
|
|
UPDATE stock_logs sl
|
|
SET stock = recalculated.running_stock
|
|
FROM recalculated
|
|
WHERE sl.id = recalculated.id
|
|
`
|
|
return tx.WithContext(ctx).Exec(query, productWarehouseIDs).Error
|
|
}
|
|
|
|
func verifyNoKandangWarehouseRefsRemain(
|
|
ctx context.Context,
|
|
tx *gorm.DB,
|
|
rows []consolidateRow,
|
|
pwMoves []pwMove,
|
|
refs *referencePlan,
|
|
) error {
|
|
kandangWarehouseIDs := uniqueKandangWarehouseIDs(rows)
|
|
if len(kandangWarehouseIDs) > 0 {
|
|
for _, ref := range refs.WarehouseRefs {
|
|
var remaining int64
|
|
if err := tx.WithContext(ctx).
|
|
Table(ref.Table).
|
|
Where(fmt.Sprintf("%s IN ?", quotedIdentifier(ref.Column)), kandangWarehouseIDs).
|
|
Count(&remaining).Error; err != nil {
|
|
return err
|
|
}
|
|
if remaining > 0 {
|
|
return fmt.Errorf("verification failed: %d rows still point to kandang warehouses via %s.%s", remaining, ref.Table, ref.Column)
|
|
}
|
|
}
|
|
}
|
|
|
|
if len(pwMoves) > 0 {
|
|
fromIDs := make([]uint, 0, len(pwMoves))
|
|
for _, move := range pwMoves {
|
|
fromIDs = append(fromIDs, move.FromID)
|
|
}
|
|
|
|
var remaining int64
|
|
for _, ref := range refs.ProductWarehouseRefs {
|
|
if err := tx.WithContext(ctx).
|
|
Table(ref.Table).
|
|
Where(fmt.Sprintf("%s IN ?", quotedIdentifier(ref.Column)), fromIDs).
|
|
Count(&remaining).Error; err != nil {
|
|
return err
|
|
}
|
|
if remaining > 0 {
|
|
return fmt.Errorf("verification failed: %d rows still point to absorbed product_warehouses via %s.%s", remaining, ref.Table, ref.Column)
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func reflowAndRecalculateProductWarehouse(
|
|
ctx context.Context,
|
|
fifoSvc commonSvc.FifoStockV2Service,
|
|
tx *gorm.DB,
|
|
productWarehouseID uint,
|
|
) error {
|
|
flagGroupCode, err := resolveFlagGroupByProductWarehouse(ctx, tx, productWarehouseID)
|
|
if err != nil {
|
|
return fmt.Errorf("resolve flag group for product_warehouse %d: %w", productWarehouseID, err)
|
|
}
|
|
|
|
if _, err := fifoSvc.Reflow(ctx, commonSvc.FifoStockV2ReflowRequest{
|
|
FlagGroupCode: flagGroupCode,
|
|
ProductWarehouseID: productWarehouseID,
|
|
Tx: tx,
|
|
}); err != nil {
|
|
return fmt.Errorf("reflow product_warehouse %d: %w", productWarehouseID, err)
|
|
}
|
|
|
|
if _, err := fifoSvc.Recalculate(ctx, commonSvc.FifoStockV2RecalculateRequest{
|
|
ProductWarehouseIDs: []uint{productWarehouseID},
|
|
FlagGroupCodes: []string{flagGroupCode},
|
|
FixDrift: true,
|
|
Tx: tx,
|
|
}); err != nil {
|
|
return fmt.Errorf("recalculate product_warehouse %d: %w", productWarehouseID, err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func resolveFlagGroupByProductWarehouse(ctx context.Context, tx *gorm.DB, productWarehouseID uint) (string, error) {
|
|
type row struct {
|
|
FlagGroupCode string `gorm:"column:flag_group_code"`
|
|
}
|
|
|
|
var selected row
|
|
err := tx.WithContext(ctx).
|
|
Table("fifo_stock_v2_route_rules rr").
|
|
Select("rr.flag_group_code").
|
|
Joins("JOIN fifo_stock_v2_flag_groups fg ON fg.code = rr.flag_group_code AND fg.is_active = TRUE").
|
|
Where("rr.is_active = TRUE").
|
|
Where("rr.lane = ?", "STOCKABLE").
|
|
Where("rr.function_code = ?", "PURCHASE_IN").
|
|
Where("rr.source_table = ?", "purchase_items").
|
|
Where(`
|
|
EXISTS (
|
|
SELECT 1
|
|
FROM product_warehouses pw
|
|
JOIN flags f ON f.flagable_id = pw.product_id
|
|
JOIN fifo_stock_v2_flag_members fm ON fm.flag_name = f.name AND fm.is_active = TRUE
|
|
WHERE pw.id = ?
|
|
AND f.flagable_type = ?
|
|
AND fm.flag_group_code = rr.flag_group_code
|
|
)
|
|
`, productWarehouseID, entity.FlagableTypeProduct).
|
|
Order("rr.id ASC").
|
|
Limit(1).
|
|
Take(&selected).Error
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
return strings.TrimSpace(selected.FlagGroupCode), nil
|
|
}
|
|
|
|
func summarizeConsolidate(rows []consolidateRow) consolidateSummary {
|
|
summary := consolidateSummary{
|
|
PlanRows: len(rows),
|
|
KandangWarehouses: len(uniqueKandangWarehouseIDs(rows)),
|
|
SurvivorProductWarehouses: len(uniqueSurvivorIDsConsolidate(rows)),
|
|
AbsorbedProductWarehouses: len(uniqueAbsorbedIDsConsolidate(rows)),
|
|
NeedsReflowProductWarehouses: len(uniqueDuplicateSurvivorIDsConsolidate(rows)),
|
|
}
|
|
return summary
|
|
}
|
|
|
|
func renderConsolidate(mode string, rows []consolidateRow, summary consolidateSummary) {
|
|
if mode == outputJSON {
|
|
payload := map[string]any{
|
|
"rows": rows,
|
|
"summary": summary,
|
|
}
|
|
enc := json.NewEncoder(os.Stdout)
|
|
enc.SetIndent("", " ")
|
|
_ = enc.Encode(payload)
|
|
return
|
|
}
|
|
|
|
w := tabwriter.NewWriter(os.Stdout, 2, 8, 2, ' ', 0)
|
|
fmt.Fprintln(w, "AREA\tLOKASI\tKANDANG\tKANDANG_WAREHOUSE\tFARM_WAREHOUSE\tPRODUCT\tPFK_ID\tSURVIVOR_PW\tSURVIVOR_QTY\tABSORBED_PW\tABSORBED_QTY\tNEEDS_REFLOW")
|
|
for _, row := range rows {
|
|
fmt.Fprintf(
|
|
w,
|
|
"%s\t%s\t%s\t%s\t%s\t%s\t%s\t%d\t%.3f\t%s\t%s\t%t\n",
|
|
row.AreaName,
|
|
row.KandangLocationName,
|
|
row.KandangName,
|
|
row.KandangWarehouseName,
|
|
row.FarmWarehouseName,
|
|
row.ProductName,
|
|
displayOptionalUint(row.ProjectFlockKandangID),
|
|
row.SurvivorPWID,
|
|
row.SurvivorCurrentQty,
|
|
displayOptionalUint(row.AbsorbedPWID),
|
|
displayOptionalFloat(row.AbsorbedCurrentQty),
|
|
row.AbsorbedPWID != nil,
|
|
)
|
|
}
|
|
_ = w.Flush()
|
|
|
|
fmt.Printf(
|
|
"\nSummary: plan_rows=%d kandang_whs=%d survivor_pws=%d absorbed_pws=%d needs_reflow_pws=%d deleted_product_warehouses=%d soft_deleted_warehouses=%d\n",
|
|
summary.PlanRows,
|
|
summary.KandangWarehouses,
|
|
summary.SurvivorProductWarehouses,
|
|
summary.AbsorbedProductWarehouses,
|
|
summary.NeedsReflowProductWarehouses,
|
|
summary.DeletedProductWarehouses,
|
|
summary.SoftDeletedWarehouses,
|
|
)
|
|
|
|
if len(summary.UpdatedPWRefs) > 0 {
|
|
fmt.Println("Updated product_warehouse refs:")
|
|
printSortedCounts(summary.UpdatedPWRefs)
|
|
}
|
|
if len(summary.UpdatedWarehouseRefs) > 0 {
|
|
fmt.Println("Updated warehouse refs:")
|
|
printSortedCounts(summary.UpdatedWarehouseRefs)
|
|
}
|
|
}
|
|
|
|
// printSortedCounts prints one "key=count" line per map entry to standard
// output, in ascending key order so the output is deterministic.
func printSortedCounts(values map[string]int64) {
	names := make([]string, 0, len(values))
	for name := range values {
		names = append(names, name)
	}
	sort.Strings(names)

	for i := range names {
		fmt.Printf(" %s=%d\n", names[i], values[names[i]])
	}
}
|
|
|
|
func (r reference) key() string {
|
|
return r.Table + "." + r.Column
|
|
}
|
|
|
|
func referenceSet(values []reference) map[string]struct{} {
|
|
out := make(map[string]struct{}, len(values))
|
|
for _, value := range values {
|
|
out[value.key()] = struct{}{}
|
|
}
|
|
return out
|
|
}
|
|
|
|
func mergeReferences(groups ...[]reference) []reference {
|
|
seen := make(map[string]struct{})
|
|
out := make([]reference, 0)
|
|
for _, group := range groups {
|
|
for _, ref := range group {
|
|
if !isSafeIdentifier(ref.Table) || !isSafeIdentifier(ref.Column) {
|
|
continue
|
|
}
|
|
if _, ok := seen[ref.key()]; ok {
|
|
continue
|
|
}
|
|
seen[ref.key()] = struct{}{}
|
|
out = append(out, ref)
|
|
}
|
|
}
|
|
sort.Slice(out, func(i, j int) bool {
|
|
if out[i].Table == out[j].Table {
|
|
return out[i].Column < out[j].Column
|
|
}
|
|
return out[i].Table < out[j].Table
|
|
})
|
|
return out
|
|
}
|
|
|
|
// mergeUintSlices returns the distinct non-zero values found in groups, in
// ascending order. The result is never nil.
func mergeUintSlices(groups ...[]uint) []uint {
	distinct := make(map[uint]struct{})
	for _, group := range groups {
		for _, id := range group {
			if id != 0 {
				distinct[id] = struct{}{}
			}
		}
	}

	merged := make([]uint, 0, len(distinct))
	for id := range distinct {
		merged = append(merged, id)
	}
	sort.Slice(merged, func(i, j int) bool { return merged[i] < merged[j] })
	return merged
}
|
|
|
|
func isSafeIdentifier(value string) bool {
|
|
return sqlIdentifierPattern.MatchString(strings.TrimSpace(value))
|
|
}
|
|
|
|
func quotedIdentifier(value string) string {
|
|
if !isSafeIdentifier(value) {
|
|
panic(fmt.Sprintf("unsafe SQL identifier: %s", value))
|
|
}
|
|
return `"` + value + `"`
|
|
}
|
|
|
|
func uniquePWMovesConsolidate(rows []consolidateRow) []pwMove {
|
|
seen := make(map[string]struct{})
|
|
out := make([]pwMove, 0)
|
|
for _, row := range rows {
|
|
if row.AbsorbedPWID == nil || *row.AbsorbedPWID == 0 || *row.AbsorbedPWID == row.SurvivorPWID {
|
|
continue
|
|
}
|
|
key := fmt.Sprintf("%d:%d", *row.AbsorbedPWID, row.SurvivorPWID)
|
|
if _, ok := seen[key]; ok {
|
|
continue
|
|
}
|
|
seen[key] = struct{}{}
|
|
out = append(out, pwMove{FromID: *row.AbsorbedPWID, ToID: row.SurvivorPWID})
|
|
}
|
|
return out
|
|
}
|
|
|
|
func uniqueWarehouseMovesConsolidate(rows []consolidateRow) []warehouseMove {
|
|
seen := make(map[uint]uint)
|
|
out := make([]warehouseMove, 0)
|
|
for _, row := range rows {
|
|
if _, ok := seen[row.KandangWarehouseID]; ok {
|
|
continue
|
|
}
|
|
seen[row.KandangWarehouseID] = row.FarmWarehouseID
|
|
out = append(out, warehouseMove{
|
|
KandangWarehouseID: row.KandangWarehouseID,
|
|
FarmWarehouseID: row.FarmWarehouseID,
|
|
})
|
|
}
|
|
return out
|
|
}
|
|
|
|
func buildSurvivorWarehouseMapConsolidate(rows []consolidateRow) map[uint]uint {
|
|
out := make(map[uint]uint)
|
|
for _, row := range rows {
|
|
out[row.SurvivorPWID] = row.FarmWarehouseID
|
|
}
|
|
return out
|
|
}
|
|
|
|
func uniqueKandangWarehouseIDs(rows []consolidateRow) []uint {
|
|
seen := make(map[uint]struct{})
|
|
out := make([]uint, 0)
|
|
for _, row := range rows {
|
|
if _, ok := seen[row.KandangWarehouseID]; ok {
|
|
continue
|
|
}
|
|
seen[row.KandangWarehouseID] = struct{}{}
|
|
out = append(out, row.KandangWarehouseID)
|
|
}
|
|
sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
|
|
return out
|
|
}
|
|
|
|
func uniqueSurvivorIDsConsolidate(rows []consolidateRow) []uint {
|
|
seen := make(map[uint]struct{})
|
|
out := make([]uint, 0)
|
|
for _, row := range rows {
|
|
if _, ok := seen[row.SurvivorPWID]; ok {
|
|
continue
|
|
}
|
|
seen[row.SurvivorPWID] = struct{}{}
|
|
out = append(out, row.SurvivorPWID)
|
|
}
|
|
sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
|
|
return out
|
|
}
|
|
|
|
func uniqueAbsorbedIDsConsolidate(rows []consolidateRow) []uint {
|
|
seen := make(map[uint]struct{})
|
|
out := make([]uint, 0)
|
|
for _, row := range rows {
|
|
if row.AbsorbedPWID == nil || *row.AbsorbedPWID == 0 {
|
|
continue
|
|
}
|
|
if _, ok := seen[*row.AbsorbedPWID]; ok {
|
|
continue
|
|
}
|
|
seen[*row.AbsorbedPWID] = struct{}{}
|
|
out = append(out, *row.AbsorbedPWID)
|
|
}
|
|
sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
|
|
return out
|
|
}
|
|
|
|
func uniqueDuplicateSurvivorIDsConsolidate(rows []consolidateRow) []uint {
|
|
seen := make(map[uint]struct{})
|
|
out := make([]uint, 0)
|
|
for _, row := range rows {
|
|
if row.AbsorbedPWID == nil || *row.AbsorbedPWID == 0 {
|
|
continue
|
|
}
|
|
if _, ok := seen[row.SurvivorPWID]; ok {
|
|
continue
|
|
}
|
|
seen[row.SurvivorPWID] = struct{}{}
|
|
out = append(out, row.SurvivorPWID)
|
|
}
|
|
sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
|
|
return out
|
|
}
|
|
|
|
// andClause renders filters as a run of " AND <filter>" fragments that can be
// appended to an existing WHERE clause. It returns "" when there are no
// filters.
func andClause(filters []string) string {
	var clause strings.Builder
	for _, filter := range filters {
		clause.WriteString(" AND ")
		clause.WriteString(filter)
	}
	return clause.String()
}
|
|
|
|
// displayOptionalUint formats value in decimal for table output, using "-"
// when the pointer is nil or the value is zero.
func displayOptionalUint(value *uint) string {
	if value != nil && *value != 0 {
		return fmt.Sprintf("%d", *value)
	}
	return "-"
}
|
|
|
|
// displayOptionalFloat formats value with three decimals for table output,
// using "-" when the pointer is nil. Zero is printed as "0.000".
func displayOptionalFloat(value *float64) string {
	if value != nil {
		return fmt.Sprintf("%.3f", *value)
	}
	return "-"
}
|