mirror of
https://gitlab.com/mbugroup/lti-api.git
synced 2026-05-20 13:31:56 +00:00
416 lines
12 KiB
Go
416 lines
12 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"flag"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"sort"
|
|
"strings"
|
|
"text/tabwriter"
|
|
|
|
commonSvc "gitlab.com/mbugroup/lti-api.git/internal/common/service"
|
|
"gitlab.com/mbugroup/lti-api.git/internal/config"
|
|
"gitlab.com/mbugroup/lti-api.git/internal/database"
|
|
"gorm.io/gorm"
|
|
)
|
|
|
|
const (
|
|
outputTable = "table"
|
|
outputJSON = "json"
|
|
)
|
|
|
|
type options struct {
|
|
Apply bool
|
|
Output string
|
|
DBSSLMode string
|
|
AreaName string
|
|
}
|
|
|
|
type duplicateGroup struct {
|
|
WarehouseID uint `json:"warehouse_id"`
|
|
WarehouseName string `json:"warehouse_name"`
|
|
ProductID uint `json:"product_id"`
|
|
ProductName string `json:"product_name"`
|
|
AreaName string `json:"area_name"`
|
|
LocationName string `json:"location_name"`
|
|
|
|
SurvivorID uint `json:"survivor_id"`
|
|
SurvivorQty float64 `json:"survivor_qty"`
|
|
AbsorbedCount int `json:"absorbed_count"`
|
|
TotalMergedQty float64 `json:"total_merged_qty"`
|
|
AbsorbedIDs string `json:"absorbed_ids"`
|
|
}
|
|
|
|
type consolidateSummary struct {
|
|
TotalDuplicateGroups int `json:"total_duplicate_groups"`
|
|
TotalProductWarehouses int64 `json:"total_product_warehouses"`
|
|
UpdatedReferences map[string]int64 `json:"updated_references,omitempty"`
|
|
DeletedProductWarehouses int64 `json:"deleted_product_warehouses,omitempty"`
|
|
OverallStatus string `json:"overall_status"`
|
|
}
|
|
|
|
func main() {
|
|
opts, err := parseFlags()
|
|
if err != nil {
|
|
log.Fatalf("invalid flags: %v", err)
|
|
}
|
|
|
|
if opts.DBSSLMode != "" {
|
|
config.DBSSLMode = opts.DBSSLMode
|
|
}
|
|
|
|
ctx := context.Background()
|
|
db := database.Connect(config.DBHost, config.DBName)
|
|
|
|
// Find duplicate groups
|
|
groups, err := findDuplicateProductWarehouses(ctx, db, opts)
|
|
if err != nil {
|
|
log.Fatalf("failed to find duplicates: %v", err)
|
|
}
|
|
|
|
if len(groups) == 0 {
|
|
fmt.Println("No duplicate product_warehouses found")
|
|
return
|
|
}
|
|
|
|
summary := summarizeGroups(groups)
|
|
if !opts.Apply {
|
|
renderConsolidation(opts.Output, groups, summary)
|
|
return
|
|
}
|
|
|
|
applied, err := applyConsolidation(ctx, db, groups)
|
|
if err != nil {
|
|
log.Fatalf("apply failed: %v", err)
|
|
}
|
|
renderConsolidation(opts.Output, groups, applied)
|
|
}
|
|
|
|
func parseFlags() (*options, error) {
|
|
var opts options
|
|
flag.BoolVar(&opts.Apply, "apply", false, "Apply consolidation (omit for dry-run)")
|
|
flag.StringVar(&opts.Output, "output", outputTable, "Output format: table or json")
|
|
flag.StringVar(&opts.DBSSLMode, "db-sslmode", "", "Database sslmode override")
|
|
flag.StringVar(&opts.AreaName, "area-name", "", "Optional area filter")
|
|
flag.Parse()
|
|
|
|
opts.Output = strings.ToLower(strings.TrimSpace(opts.Output))
|
|
opts.AreaName = strings.TrimSpace(opts.AreaName)
|
|
opts.DBSSLMode = strings.TrimSpace(opts.DBSSLMode)
|
|
|
|
if opts.Output == "" {
|
|
opts.Output = outputTable
|
|
}
|
|
if opts.Output != outputTable && opts.Output != outputJSON {
|
|
return nil, fmt.Errorf("unsupported --output=%s", opts.Output)
|
|
}
|
|
|
|
return &opts, nil
|
|
}
|
|
|
|
func findDuplicateProductWarehouses(ctx context.Context, db *gorm.DB, opts *options) ([]duplicateGroup, error) {
|
|
filters := ""
|
|
args := []any{}
|
|
if opts.AreaName != "" {
|
|
filters = "WHERE a.name = ?"
|
|
args = append(args, opts.AreaName)
|
|
}
|
|
|
|
query := fmt.Sprintf(`
|
|
WITH duplicates AS (
|
|
SELECT
|
|
pw.warehouse_id,
|
|
w.name AS warehouse_name,
|
|
pw.product_id,
|
|
p.name AS product_name,
|
|
COALESCE(a.name, 'N/A') AS area_name,
|
|
COALESCE(l.name, 'N/A') AS location_name,
|
|
pw.id,
|
|
pw.qty,
|
|
MIN(pw.id) OVER (PARTITION BY pw.warehouse_id, pw.product_id) AS survivor_id,
|
|
COUNT(*) OVER (PARTITION BY pw.warehouse_id, pw.product_id) AS duplicate_count,
|
|
SUM(pw.qty) OVER (PARTITION BY pw.warehouse_id, pw.product_id) AS total_qty
|
|
FROM product_warehouses pw
|
|
JOIN warehouses w ON w.id = pw.warehouse_id
|
|
JOIN products p ON p.id = pw.product_id
|
|
LEFT JOIN locations l ON l.id = w.location_id
|
|
LEFT JOIN areas a ON a.id = l.area_id
|
|
%s
|
|
)
|
|
SELECT
|
|
warehouse_id,
|
|
warehouse_name,
|
|
product_id,
|
|
product_name,
|
|
area_name,
|
|
location_name,
|
|
survivor_id,
|
|
(SELECT qty FROM duplicates d2 WHERE d2.id = survivor_id LIMIT 1) AS survivor_qty,
|
|
duplicate_count - 1 AS absorbed_count,
|
|
total_qty AS total_merged_qty,
|
|
STRING_AGG(id::text, ', ' ORDER BY id::text) FILTER (WHERE id <> survivor_id) AS absorbed_ids
|
|
FROM duplicates
|
|
WHERE duplicate_count > 1
|
|
GROUP BY warehouse_id, warehouse_name, product_id, product_name, area_name, location_name, survivor_id, total_qty, duplicate_count
|
|
ORDER BY area_name, location_name, warehouse_name, product_name
|
|
`, filters)
|
|
|
|
rows := make([]duplicateGroup, 0)
|
|
if err := db.WithContext(ctx).Raw(query, args...).Scan(&rows).Error; err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return rows, nil
|
|
}
|
|
|
|
func applyConsolidation(ctx context.Context, db *gorm.DB, groups []duplicateGroup) (consolidateSummary, error) {
|
|
summary := consolidateSummary{
|
|
TotalDuplicateGroups: len(groups),
|
|
UpdatedReferences: make(map[string]int64),
|
|
OverallStatus: "PASS",
|
|
}
|
|
|
|
fifoSvc := commonSvc.NewFifoStockV2Service(db, nil)
|
|
|
|
err := db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
|
|
for _, group := range groups {
|
|
absorbedIDs := []uint{}
|
|
if group.AbsorbedIDs != "" {
|
|
parts := strings.Split(group.AbsorbedIDs, ", ")
|
|
for _, p := range parts {
|
|
var id uint
|
|
fmt.Sscanf(p, "%d", &id)
|
|
absorbedIDs = append(absorbedIDs, id)
|
|
}
|
|
}
|
|
|
|
if len(absorbedIDs) == 0 {
|
|
continue
|
|
}
|
|
|
|
// Update all references to point to survivor
|
|
refTables := []struct {
|
|
table string
|
|
column string
|
|
}{
|
|
{"stock_allocations", "product_warehouse_id"},
|
|
{"stock_logs", "product_warehouse_id"},
|
|
{"purchase_items", "product_warehouse_id"},
|
|
{"recording_stocks", "product_warehouse_id"},
|
|
{"recording_eggs", "product_warehouse_id"},
|
|
{"recording_depletions", "product_warehouse_id"},
|
|
{"recording_depletions", "source_product_warehouse_id"},
|
|
{"marketing_delivery_products", "product_warehouse_id"},
|
|
{"marketing_products", "product_warehouse_id"},
|
|
{"stock_transfer_details", "source_product_warehouse_id"},
|
|
{"stock_transfer_details", "dest_product_warehouse_id"},
|
|
{"adjustment_stocks", "product_warehouse_id"},
|
|
{"laying_transfer_sources", "product_warehouse_id"},
|
|
{"laying_transfer_targets", "product_warehouse_id"},
|
|
{"laying_transfers", "source_product_warehouse_id"},
|
|
{"project_chickin_details", "product_warehouse_id"},
|
|
{"project_chickins", "product_warehouse_id"},
|
|
{"project_flock_populations", "product_warehouse_id"},
|
|
{"fifo_stock_v2_operation_log", "product_warehouse_id"},
|
|
{"fifo_stock_v2_reflow_checkpoints", "product_warehouse_id"},
|
|
{"fifo_stock_v2_shadow_allocations", "product_warehouse_id"},
|
|
}
|
|
|
|
for _, ref := range refTables {
|
|
res := tx.WithContext(ctx).
|
|
Table(ref.table).
|
|
Where(fmt.Sprintf("%s IN ?", ref.column), absorbedIDs).
|
|
Update(ref.column, group.SurvivorID)
|
|
if res.Error != nil {
|
|
return fmt.Errorf("update %s.%s: %w", ref.table, ref.column, res.Error)
|
|
}
|
|
if res.RowsAffected > 0 {
|
|
summary.UpdatedReferences[ref.table+"."+ref.column] += res.RowsAffected
|
|
}
|
|
}
|
|
|
|
// Update survivor qty to merged total
|
|
res := tx.WithContext(ctx).
|
|
Table("product_warehouses").
|
|
Where("id = ?", group.SurvivorID).
|
|
Update("qty", group.TotalMergedQty)
|
|
if res.Error != nil {
|
|
return fmt.Errorf("update survivor qty: %w", res.Error)
|
|
}
|
|
|
|
// Delete absorbed product_warehouses
|
|
res = tx.WithContext(ctx).
|
|
Table("product_warehouses").
|
|
Where("id IN ?", absorbedIDs).
|
|
Delete(nil)
|
|
if res.Error != nil {
|
|
return fmt.Errorf("delete absorbed: %w", res.Error)
|
|
}
|
|
summary.DeletedProductWarehouses += res.RowsAffected
|
|
|
|
// Recalculate stock_logs for survivor
|
|
if err := recalculateStockLogs(ctx, tx, []uint{group.SurvivorID}); err != nil {
|
|
return fmt.Errorf("recalculate stock_logs: %w", err)
|
|
}
|
|
|
|
// Reflow and recalculate FIFO
|
|
if err := reflowProductWarehouse(ctx, fifoSvc, tx, group.SurvivorID); err != nil {
|
|
return fmt.Errorf("reflow product_warehouse %d: %w", group.SurvivorID, err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
summary.OverallStatus = "FAIL"
|
|
return summary, err
|
|
}
|
|
|
|
return summary, nil
|
|
}
|
|
|
|
func recalculateStockLogs(ctx context.Context, tx *gorm.DB, productWarehouseIDs []uint) error {
|
|
if len(productWarehouseIDs) == 0 {
|
|
return nil
|
|
}
|
|
|
|
query := `
|
|
WITH recalculated AS (
|
|
SELECT
|
|
id,
|
|
SUM(COALESCE(increase, 0) - COALESCE(decrease, 0))
|
|
OVER (PARTITION BY product_warehouse_id ORDER BY created_at ASC, id ASC) AS running_stock
|
|
FROM stock_logs
|
|
WHERE product_warehouse_id IN ?
|
|
)
|
|
UPDATE stock_logs sl
|
|
SET stock = recalculated.running_stock
|
|
FROM recalculated
|
|
WHERE sl.id = recalculated.id
|
|
`
|
|
return tx.WithContext(ctx).Exec(query, productWarehouseIDs).Error
|
|
}
|
|
|
|
func reflowProductWarehouse(ctx context.Context, fifoSvc commonSvc.FifoStockV2Service, tx *gorm.DB, productWarehouseID uint) error {
|
|
type row struct {
|
|
FlagGroupCode string `gorm:"column:flag_group_code"`
|
|
}
|
|
|
|
var selected row
|
|
err := tx.WithContext(ctx).
|
|
Table("fifo_stock_v2_route_rules rr").
|
|
Select("rr.flag_group_code").
|
|
Joins("JOIN fifo_stock_v2_flag_groups fg ON fg.code = rr.flag_group_code AND fg.is_active = TRUE").
|
|
Where("rr.is_active = TRUE").
|
|
Where("rr.lane = ?", "STOCKABLE").
|
|
Where("rr.function_code = ?", "PURCHASE_IN").
|
|
Where("rr.source_table = ?", "purchase_items").
|
|
Where(`EXISTS (
|
|
SELECT 1
|
|
FROM product_warehouses pw
|
|
JOIN flags f ON f.flagable_id = pw.product_id
|
|
JOIN fifo_stock_v2_flag_members fm ON fm.flag_name = f.name AND fm.is_active = TRUE
|
|
WHERE pw.id = ?
|
|
AND f.flagable_type = 'products'
|
|
AND fm.flag_group_code = rr.flag_group_code
|
|
)`, productWarehouseID).
|
|
Order("rr.id ASC").
|
|
Limit(1).
|
|
Take(&selected).Error
|
|
|
|
if err != nil && err != gorm.ErrRecordNotFound {
|
|
return err
|
|
}
|
|
|
|
if err == gorm.ErrRecordNotFound {
|
|
return nil
|
|
}
|
|
|
|
flagGroupCode := strings.TrimSpace(selected.FlagGroupCode)
|
|
|
|
if _, err := fifoSvc.Reflow(ctx, commonSvc.FifoStockV2ReflowRequest{
|
|
FlagGroupCode: flagGroupCode,
|
|
ProductWarehouseID: productWarehouseID,
|
|
Tx: tx,
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
|
|
if _, err := fifoSvc.Recalculate(ctx, commonSvc.FifoStockV2RecalculateRequest{
|
|
ProductWarehouseIDs: []uint{productWarehouseID},
|
|
FlagGroupCodes: []string{flagGroupCode},
|
|
FixDrift: true,
|
|
Tx: tx,
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func summarizeGroups(groups []duplicateGroup) consolidateSummary {
|
|
var totalQty int64
|
|
for _, g := range groups {
|
|
totalQty += int64(g.AbsorbedCount)
|
|
}
|
|
|
|
return consolidateSummary{
|
|
TotalDuplicateGroups: len(groups),
|
|
TotalProductWarehouses: totalQty,
|
|
OverallStatus: "PASS",
|
|
}
|
|
}
|
|
|
|
func renderConsolidation(mode string, groups []duplicateGroup, summary consolidateSummary) {
|
|
if mode == outputJSON {
|
|
payload := map[string]any{
|
|
"groups": groups,
|
|
"summary": summary,
|
|
}
|
|
enc := json.NewEncoder(os.Stdout)
|
|
enc.SetIndent("", " ")
|
|
_ = enc.Encode(payload)
|
|
return
|
|
}
|
|
|
|
w := tabwriter.NewWriter(os.Stdout, 2, 8, 2, ' ', 0)
|
|
fmt.Fprintln(w, "AREA\tLOCATION\tWAREHOUSE\tPRODUCT\tSURVIVOR_ID\tSURVIVOR_QTY\tABSORBED_COUNT\tTOTAL_MERGED_QTY\tABSORBED_IDS")
|
|
for _, g := range groups {
|
|
fmt.Fprintf(
|
|
w,
|
|
"%s\t%s\t%s\t%s\t%d\t%.3f\t%d\t%.3f\t%s\n",
|
|
g.AreaName,
|
|
g.LocationName,
|
|
g.WarehouseName,
|
|
g.ProductName,
|
|
g.SurvivorID,
|
|
g.SurvivorQty,
|
|
g.AbsorbedCount,
|
|
g.TotalMergedQty,
|
|
g.AbsorbedIDs,
|
|
)
|
|
}
|
|
_ = w.Flush()
|
|
|
|
fmt.Printf("\n=== SUMMARY ===\n")
|
|
fmt.Printf("Duplicate groups found: %d\n", summary.TotalDuplicateGroups)
|
|
fmt.Printf("Product warehouses to delete: %d\n", summary.TotalProductWarehouses)
|
|
fmt.Printf("Overall status: %s\n", summary.OverallStatus)
|
|
|
|
if len(summary.UpdatedReferences) > 0 {
|
|
fmt.Println("\nUpdated references:")
|
|
keys := make([]string, 0, len(summary.UpdatedReferences))
|
|
for k := range summary.UpdatedReferences {
|
|
keys = append(keys, k)
|
|
}
|
|
sort.Strings(keys)
|
|
for _, k := range keys {
|
|
fmt.Printf(" %s=%d\n", k, summary.UpdatedReferences[k])
|
|
}
|
|
}
|
|
}
|