package main import ( "context" "flag" "fmt" "log" "math" "os" "sort" "strings" "time" commonSvc "gitlab.com/mbugroup/lti-api.git/internal/common/service" "gitlab.com/mbugroup/lti-api.git/internal/config" "gitlab.com/mbugroup/lti-api.git/internal/database" entity "gitlab.com/mbugroup/lti-api.git/internal/entities" "gitlab.com/mbugroup/lti-api.git/internal/utils/fifo" "gorm.io/gorm" ) type productWarehouseScopeRow struct { ProductWarehouseID uint `gorm:"column:product_warehouse_id"` ProductID uint `gorm:"column:product_id"` WarehouseID uint `gorm:"column:warehouse_id"` ProjectFlockKandangID *uint `gorm:"column:project_flock_kandang_id"` } type reflowTarget struct { ProductWarehouseID uint ProductID uint WarehouseID uint ProjectFlockKandangID *uint FlagGroupCode string } func main() { var ( projectFlockKandangID uint apply bool asOfRaw string includeShared bool ) flag.UintVar(&projectFlockKandangID, "project-flock-kandang-id", 0, "Project flock kandang ID (required)") flag.BoolVar(&apply, "apply", false, "Apply reflow. If false, run as dry-run") flag.StringVar(&asOfRaw, "as-of", "", "Optional AsOf boundary. Format: RFC3339 or YYYY-MM-DD") flag.BoolVar(&includeShared, "include-shared", true, "Include product warehouses referenced by transactions in this PFK scope (including shared/non-bound product warehouses)") flag.Parse() if projectFlockKandangID == 0 { log.Fatal("--project-flock-kandang-id is required") } asOf, err := parseAsOf(asOfRaw) if err != nil { log.Fatalf("invalid --as-of: %v", err) } ctx := context.Background() db := database.Connect(config.DBHost, config.DBName) fifoStockV2Svc := commonSvc.NewFifoStockV2Service(db, nil) exists, err := projectFlockKandangExists(ctx, db, projectFlockKandangID) if err != nil { log.Fatalf("failed to check project flock kandang: %v", err) } if !exists { log.Fatalf("project_flock_kandang_id %d not found", projectFlockKandangID) } scopedPWs, err := loadScopedProductWarehouses(ctx, db, projectFlockKandangID, includeShared) if err != nil { log.Fatalf("failed to load scoped product warehouses: %v", err) } if len(scopedPWs) == 0 { fmt.Printf("Mode: %s\n", modeLabel(apply)) fmt.Printf("Scope: project_flock_kandang_id=%d\n", projectFlockKandangID) fmt.Println("No product warehouse found in scope") return } targets := make([]reflowTarget, 0, len(scopedPWs)) skippedPW := 0 failedResolve := 0 for _, pw := range scopedPWs { flagGroups, err := resolveFlagGroupsByProductWarehouse(ctx, db, pw.ProductWarehouseID) if err != nil { fmt.Printf("FAIL pw=%d error=resolve flag groups: %v\n", pw.ProductWarehouseID, err) failedResolve++ continue } if len(flagGroups) == 0 { fmt.Printf("SKIP pw=%d reason=no active fifo v2 route by product flag\n", pw.ProductWarehouseID) skippedPW++ continue } for _, group := range flagGroups { targets = append(targets, reflowTarget{ ProductWarehouseID: pw.ProductWarehouseID, ProductID: pw.ProductID, WarehouseID: pw.WarehouseID, ProjectFlockKandangID: pw.ProjectFlockKandangID, FlagGroupCode: group, }) } } sort.Slice(targets, func(i, j int) bool { if targets[i].ProductWarehouseID == targets[j].ProductWarehouseID { return targets[i].FlagGroupCode < targets[j].FlagGroupCode } return targets[i].ProductWarehouseID < targets[j].ProductWarehouseID }) fmt.Printf("Mode: %s\n", modeLabel(apply)) fmt.Printf("Scope: project_flock_kandang_id=%d include_shared=%t\n", projectFlockKandangID, includeShared) if asOf != nil { fmt.Printf("AsOf: %s\n", asOf.UTC().Format(time.RFC3339)) } else { fmt.Println("AsOf: (full timeline)") } fmt.Printf("Product warehouses in scope: %d\n", len(scopedPWs)) fmt.Printf("Planned reflow targets: %d\n\n", len(targets)) for _, target := range targets { fmt.Printf( "PLAN pw=%d product=%d warehouse=%d pw_pfk=%s flag_group=%s\n", target.ProductWarehouseID, target.ProductID, target.WarehouseID, displayOptionalUint(target.ProjectFlockKandangID), target.FlagGroupCode, ) } if !apply { fmt.Println() fmt.Printf("Summary: planned=%d skipped_pw=%d failed_resolve=%d applied=0 failed_apply=0\n", len(targets), skippedPW, failedResolve) if failedResolve > 0 { os.Exit(1) } return } successApply := 0 failedApply := 0 for idx, target := range targets { req := commonSvc.FifoStockV2ReflowRequest{ FlagGroupCode: target.FlagGroupCode, ProductWarehouseID: target.ProductWarehouseID, AsOf: asOf, IdempotencyKey: fmt.Sprintf( "manual-pfk-reflow-%d-%d-%s-%d-%d", projectFlockKandangID, target.ProductWarehouseID, strings.ToUpper(strings.TrimSpace(target.FlagGroupCode)), time.Now().UnixNano(), idx, ), } res, err := fifoStockV2Svc.Reflow(ctx, req) if err != nil { fmt.Printf("FAIL pw=%d flag_group=%s error=%v\n", target.ProductWarehouseID, target.FlagGroupCode, err) failedApply++ continue } fmt.Printf( "DONE pw=%d flag_group=%s rollback=%.3f allocate=%.3f pending=%.3f processed_usable=%d\n", target.ProductWarehouseID, target.FlagGroupCode, res.Rollback.ReleasedQty, res.Allocate.AllocatedQty, res.Allocate.PendingQty, res.ProcessedUsables, ) successApply++ } orphanPopulationRows := int64(0) syncedPopulationQtyRows := int64(0) syncedPopulationUsedRows := int64(0) traceReleasedRows := int64(0) traceInsertedRows := int64(0) if rowsOrphan, rowsQty, rowsUsed, err := resyncProjectFlockPopulation(ctx, db, projectFlockKandangID); err != nil { fmt.Printf("FAIL population_resync project_flock_kandang_id=%d error=%v\n", projectFlockKandangID, err) failedApply++ } else { orphanPopulationRows = rowsOrphan syncedPopulationQtyRows = rowsQty syncedPopulationUsedRows = rowsUsed fmt.Printf( "SYNC project_flock_populations orphan_marked=%d qty_synced=%d used_synced=%d\n", orphanPopulationRows, syncedPopulationQtyRows, syncedPopulationUsedRows, ) } if released, inserted, err := resyncChickinTraceByProjectFlockKandang(ctx, db, fifoStockV2Svc, projectFlockKandangID); err != nil { fmt.Printf("FAIL chickin_trace_resync project_flock_kandang_id=%d error=%v\n", projectFlockKandangID, err) failedApply++ } else { traceReleasedRows = released traceInsertedRows = inserted fmt.Printf( "SYNC chickin_trace released=%d inserted=%d\n", traceReleasedRows, traceInsertedRows, ) } fmt.Println() fmt.Printf( "Summary: planned=%d skipped_pw=%d failed_resolve=%d applied=%d failed_apply=%d population_orphan=%d population_qty_synced=%d population_used_synced=%d trace_released=%d trace_inserted=%d\n", len(targets), skippedPW, failedResolve, successApply, failedApply, orphanPopulationRows, syncedPopulationQtyRows, syncedPopulationUsedRows, traceReleasedRows, traceInsertedRows, ) if failedResolve > 0 || failedApply > 0 { os.Exit(1) } } func parseAsOf(raw string) (*time.Time, error) { raw = strings.TrimSpace(raw) if raw == "" { return nil, nil } layouts := []string{ time.RFC3339Nano, time.RFC3339, "2006-01-02 15:04:05", "2006-01-02", } for _, layout := range layouts { parsed, err := time.Parse(layout, raw) if err != nil { continue } if layout == "2006-01-02" { endOfDay := time.Date(parsed.Year(), parsed.Month(), parsed.Day(), 23, 59, 59, int(time.Second-time.Nanosecond), time.UTC) return &endOfDay, nil } asOf := parsed.UTC() return &asOf, nil } return nil, fmt.Errorf("unsupported format %q", raw) } func modeLabel(apply bool) string { if apply { return "APPLY" } return "DRY-RUN" } func displayOptionalUint(v *uint) string { if v == nil { return "NULL" } return fmt.Sprintf("%d", *v) } func projectFlockKandangExists(ctx context.Context, db *gorm.DB, projectFlockKandangID uint) (bool, error) { var count int64 err := db.WithContext(ctx). Table("project_flock_kandangs"). Where("id = ?", projectFlockKandangID). Count(&count).Error if err != nil { return false, err } return count > 0, nil } func loadScopedProductWarehouses(ctx context.Context, db *gorm.DB, projectFlockKandangID uint, includeShared bool) ([]productWarehouseScopeRow, error) { if !includeShared { var rows []productWarehouseScopeRow err := db.WithContext(ctx). Table("product_warehouses"). Select("id AS product_warehouse_id, product_id, warehouse_id, project_flock_kandang_id"). Where("project_flock_kandang_id = ?", projectFlockKandangID). Order("id ASC"). Scan(&rows).Error if err != nil { return nil, err } return rows, nil } query := ` WITH scoped_pw AS ( SELECT pw.id AS product_warehouse_id FROM product_warehouses pw WHERE pw.project_flock_kandang_id = ? UNION SELECT pc.product_warehouse_id FROM project_chickins pc WHERE pc.project_flock_kandang_id = ? AND pc.deleted_at IS NULL UNION SELECT rs.product_warehouse_id FROM recordings r JOIN recording_stocks rs ON rs.recording_id = r.id WHERE r.project_flock_kandangs_id = ? AND r.deleted_at IS NULL UNION SELECT rd.product_warehouse_id FROM recordings r JOIN recording_depletions rd ON rd.recording_id = r.id WHERE r.project_flock_kandangs_id = ? AND r.deleted_at IS NULL UNION SELECT rd.source_product_warehouse_id FROM recordings r JOIN recording_depletions rd ON rd.recording_id = r.id WHERE r.project_flock_kandangs_id = ? AND r.deleted_at IS NULL AND rd.source_product_warehouse_id IS NOT NULL UNION SELECT re.product_warehouse_id FROM recordings r JOIN recording_eggs re ON re.recording_id = r.id WHERE r.project_flock_kandangs_id = ? AND r.deleted_at IS NULL UNION SELECT lts.product_warehouse_id FROM laying_transfer_sources lts WHERE lts.source_project_flock_kandang_id = ? AND lts.deleted_at IS NULL AND lts.product_warehouse_id IS NOT NULL UNION SELECT ltt.product_warehouse_id FROM laying_transfer_targets ltt WHERE ltt.target_project_flock_kandang_id = ? AND ltt.deleted_at IS NULL AND ltt.product_warehouse_id IS NOT NULL UNION SELECT pi.product_warehouse_id FROM purchase_items pi WHERE pi.project_flock_kandang_id = ? AND pi.product_warehouse_id IS NOT NULL ) SELECT DISTINCT pw.id AS product_warehouse_id, pw.product_id, pw.warehouse_id, pw.project_flock_kandang_id FROM scoped_pw s JOIN product_warehouses pw ON pw.id = s.product_warehouse_id ORDER BY pw.id ASC ` var rows []productWarehouseScopeRow err := db.WithContext(ctx). Raw( query, projectFlockKandangID, projectFlockKandangID, projectFlockKandangID, projectFlockKandangID, projectFlockKandangID, projectFlockKandangID, projectFlockKandangID, projectFlockKandangID, projectFlockKandangID, ). Scan(&rows).Error if err != nil { return nil, err } return rows, nil } func resolveFlagGroupsByProductWarehouse(ctx context.Context, db *gorm.DB, productWarehouseID uint) ([]string, error) { var groups []string err := db.WithContext(ctx). Table("fifo_stock_v2_route_rules rr"). Select("DISTINCT rr.flag_group_code"). Joins("JOIN fifo_stock_v2_flag_groups fg ON fg.code = rr.flag_group_code AND fg.is_active = TRUE"). Where("rr.is_active = TRUE"). Where(` EXISTS ( SELECT 1 FROM product_warehouses pw JOIN flags f ON f.flagable_id = pw.product_id JOIN fifo_stock_v2_flag_members fm ON fm.flag_name = f.name AND fm.is_active = TRUE WHERE pw.id = ? AND f.flagable_type = ? AND fm.flag_group_code = rr.flag_group_code ) `, productWarehouseID, entity.FlagableTypeProduct). Order("rr.flag_group_code ASC"). Scan(&groups).Error if err != nil { return nil, err } return groups, nil } func resyncProjectFlockPopulation(ctx context.Context, db *gorm.DB, projectFlockKandangID uint) (int64, int64, int64, error) { if projectFlockKandangID == 0 { return 0, 0, 0, nil } orphanResult := db.WithContext(ctx).Exec(` UPDATE project_flock_populations pfp SET deleted_at = NOW(), updated_at = NOW() FROM project_chickins pc WHERE pfp.project_chickin_id = pc.id AND pc.project_flock_kandang_id = ? AND pc.deleted_at IS NOT NULL AND pfp.deleted_at IS NULL `, projectFlockKandangID) if orphanResult.Error != nil { return 0, 0, 0, orphanResult.Error } qtyResult := db.WithContext(ctx).Exec(` UPDATE project_flock_populations p SET total_qty = GREATEST(COALESCE(pc.usage_qty, 0), 0), updated_at = NOW() FROM project_chickins pc WHERE p.project_chickin_id = pc.id AND pc.project_flock_kandang_id = ? AND pc.deleted_at IS NULL AND p.deleted_at IS NULL `, projectFlockKandangID) if qtyResult.Error != nil { return 0, 0, 0, qtyResult.Error } usedResult := db.WithContext(ctx).Exec(` WITH scoped AS ( SELECT pfp.id, pfp.total_qty FROM project_flock_populations pfp JOIN project_chickins pc ON pc.id = pfp.project_chickin_id WHERE pc.project_flock_kandang_id = ? AND pc.deleted_at IS NULL AND pfp.deleted_at IS NULL ), alloc AS ( SELECT sa.stockable_id, SUM(sa.qty) AS used_qty FROM stock_allocations sa WHERE sa.stockable_type = 'PROJECT_FLOCK_POPULATION' AND sa.status = 'ACTIVE' AND sa.allocation_purpose = 'CONSUME' GROUP BY sa.stockable_id ) UPDATE project_flock_populations p SET total_used_qty = LEAST(COALESCE(a.used_qty, 0), GREATEST(s.total_qty, 0)), updated_at = NOW() FROM scoped s LEFT JOIN alloc a ON a.stockable_id = s.id WHERE p.id = s.id `, projectFlockKandangID) if usedResult.Error != nil { return 0, 0, 0, usedResult.Error } return orphanResult.RowsAffected, qtyResult.RowsAffected, usedResult.RowsAffected, nil } func resyncChickinTraceByProjectFlockKandang( ctx context.Context, db *gorm.DB, fifoStockV2Svc commonSvc.FifoStockV2Service, projectFlockKandangID uint, ) (int64, int64, error) { if projectFlockKandangID == 0 { return 0, 0, nil } var productWarehouseIDs []uint if err := db.WithContext(ctx). Table("project_chickins"). Distinct("product_warehouse_id"). Where("project_flock_kandang_id = ?", projectFlockKandangID). Where("deleted_at IS NULL"). Order("product_warehouse_id ASC"). Pluck("product_warehouse_id", &productWarehouseIDs).Error; err != nil { return 0, 0, err } if len(productWarehouseIDs) == 0 { return 0, 0, nil } totalReleased := int64(0) totalInserted := int64(0) for _, productWarehouseID := range productWarehouseIDs { var releasedRows int64 var insertedRows int64 err := db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { flagGroups, err := resolveFlagGroupsByProductWarehouse(ctx, tx, productWarehouseID) if err != nil { return err } if len(flagGroups) == 0 { return nil } flagGroupCode := strings.TrimSpace(flagGroups[0]) if flagGroupCode == "" { return nil } released := tx.WithContext(ctx). Table("stock_allocations"). Where("product_warehouse_id = ?", productWarehouseID). Where("usable_type = ?", fifo.UsableKeyProjectChickin.String()). Where("allocation_purpose = ?", entity.StockAllocationPurposeTraceChickin). Where("status = ?", entity.StockAllocationStatusActive). Updates(map[string]any{ "status": entity.StockAllocationStatusReleased, "released_at": time.Now(), "updated_at": time.Now(), "note": "chickin_trace_reflow_reset", }) if released.Error != nil { return released.Error } releasedRows = released.RowsAffected type chickinRow struct { ID uint `gorm:"column:id"` UsageQty float64 `gorm:"column:usage_qty"` ChickIn time.Time `gorm:"column:chick_in_date"` } chickins := make([]chickinRow, 0) if err := tx.WithContext(ctx). Table("project_chickins"). Select("id, usage_qty, chick_in_date"). Where("product_warehouse_id = ?", productWarehouseID). Where("deleted_at IS NULL"). Where("usage_qty > 0"). Order("chick_in_date ASC, id ASC"). Scan(&chickins).Error; err != nil { return err } if len(chickins) == 0 { return nil } gatherRows, err := fifoStockV2Svc.Gather(ctx, commonSvc.FifoStockV2GatherRequest{ FlagGroupCode: flagGroupCode, Lane: "STOCKABLE", AllocationPurpose: entity.StockAllocationPurposeTraceChickin, IgnoreSourceUsed: true, ProductWarehouseID: productWarehouseID, Limit: 50000, Tx: tx, }) if err != nil { return err } if len(gatherRows) == 0 { return nil } type lotKey struct { StockableType string StockableID uint } remainingByLot := make(map[lotKey]float64, len(gatherRows)) for _, row := range gatherRows { key := lotKey{StockableType: row.Ref.LegacyTypeKey, StockableID: row.Ref.ID} remainingByLot[key] = row.AvailableQuantity } now := time.Now() lotIndex := 0 for _, chickinRow := range chickins { remaining := chickinRow.UsageQty for remaining > 1e-6 && lotIndex < len(gatherRows) { lot := gatherRows[lotIndex] key := lotKey{StockableType: lot.Ref.LegacyTypeKey, StockableID: lot.Ref.ID} available := remainingByLot[key] if available <= 1e-6 { lotIndex++ continue } portion := math.Min(remaining, available) if portion <= 1e-6 { lotIndex++ continue } insert := map[string]any{ "product_warehouse_id": productWarehouseID, "stockable_type": lot.Ref.LegacyTypeKey, "stockable_id": lot.Ref.ID, "usable_type": fifo.UsableKeyProjectChickin.String(), "usable_id": chickinRow.ID, "qty": portion, "status": entity.StockAllocationStatusActive, "allocation_purpose": entity.StockAllocationPurposeTraceChickin, "engine_version": "v2", "flag_group_code": flagGroupCode, "function_code": "CHICKIN_TRACE", "created_at": now, "updated_at": now, } if err := tx.WithContext(ctx).Table("stock_allocations").Create(insert).Error; err != nil { return err } insertedRows++ remaining -= portion remainingByLot[key] = available - portion } } return nil }) if err != nil { return totalReleased, totalInserted, err } totalReleased += releasedRows totalInserted += insertedRows } return totalReleased, totalInserted, nil }