Files

1086 lines
34 KiB
Go

package main
import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"log"
"os"
"sort"
"strconv"
"strings"
"text/tabwriter"
"time"
"github.com/go-playground/validator/v10"
"github.com/sirupsen/logrus"
commonSvc "gitlab.com/mbugroup/lti-api.git/internal/common/service"
fifoStockV2 "gitlab.com/mbugroup/lti-api.git/internal/common/service/fifo_stock_v2"
"gitlab.com/mbugroup/lti-api.git/internal/config"
"gitlab.com/mbugroup/lti-api.git/internal/database"
entity "gitlab.com/mbugroup/lti-api.git/internal/entities"
pwRepo "gitlab.com/mbugroup/lti-api.git/internal/modules/inventory/product-warehouses/repositories"
transferRepo "gitlab.com/mbugroup/lti-api.git/internal/modules/inventory/transfers/repositories"
transferSvc "gitlab.com/mbugroup/lti-api.git/internal/modules/inventory/transfers/services"
warehouseRepo "gitlab.com/mbugroup/lti-api.git/internal/modules/master/warehouses/repositories"
pfkRepo "gitlab.com/mbugroup/lti-api.git/internal/modules/production/project_flocks/repositories"
stockLogRepo "gitlab.com/mbugroup/lti-api.git/internal/modules/shared/repositories"
"gitlab.com/mbugroup/lti-api.git/internal/utils"
"gorm.io/gorm"
)
const (
adjustmentCutoverReasonPrefix = "EGG_FARM_ADJUSTMENT_CUTOVER"
adjustmentOutputModeTable = "table"
adjustmentOutputModeJSON = "json"
)
type adjustmentCommandOptions struct {
Apply bool
DryRun bool
RollbackRunID string
LocationID uint
LocationName string
CutoverDate time.Time
CutoverDateRaw string
Output string
ActorID uint
RunID string
}
type adjustmentLocationTiming struct {
LocationID uint
LocationName string
FirstKandangDate *time.Time
LastKandangDate *time.Time
FirstFarmDate *time.Time
LastFarmDate *time.Time
Status string
}
type adjustmentLegacyEggRow struct {
LocationID uint
LocationName string
SourceWarehouseID uint
SourceWarehouseName string
FarmWarehouseID *uint
FarmWarehouseName *string
ProductWarehouseID uint
ProductID uint
ProductName string
RemainingQty float64
CurrentPWQty float64
AdjustmentIDs []uint
}
type adjustmentMigrationReportRow struct {
RunID string `json:"run_id"`
LocationID uint `json:"location_id"`
LocationName string `json:"location_name"`
SourceWarehouseID uint `json:"source_warehouse_id"`
SourceWarehouseName string `json:"source_warehouse_name"`
FarmWarehouseID *uint `json:"farm_warehouse_id,omitempty"`
FarmWarehouseName *string `json:"farm_warehouse_name,omitempty"`
ProductWarehouseID uint `json:"product_warehouse_id"`
ProductID uint `json:"product_id"`
ProductName string `json:"product_name"`
RemainingQty float64 `json:"remaining_qty"`
CurrentPWQty float64 `json:"current_pw_qty"`
VerifiedQty float64 `json:"verified_qty"`
LocationStatus string `json:"location_status"`
AdjustmentIDs []uint `json:"adjustment_ids,omitempty"`
Status string `json:"status"`
Reason string `json:"reason,omitempty"`
TransferID *uint64 `json:"transfer_id,omitempty"`
MovementNumber *string `json:"movement_number,omitempty"`
}
type adjustmentApplySummary struct {
RowsPlanned int `json:"rows_planned"`
RowsApplied int `json:"rows_applied"`
RowsSkipped int `json:"rows_skipped"`
RowsFailed int `json:"rows_failed"`
GroupsPlanned int `json:"groups_planned"`
GroupsApplied int `json:"groups_applied"`
}
type adjustmentRollbackDetailRow struct {
RunID string `json:"run_id"`
TransferID uint64 `json:"transfer_id"`
MovementNumber string `json:"movement_number"`
LocationName string `json:"location_name"`
SourceWarehouseName string `json:"source_warehouse_name"`
FarmWarehouseName string `json:"farm_warehouse_name"`
ProductName string `json:"product_name"`
Qty float64 `json:"qty"`
Status string `json:"status"`
Reason string `json:"reason,omitempty"`
}
type adjustmentTransferGroup struct {
LocationID uint
LocationName string
SourceWarehouseID uint
SourceWarehouseName string
FarmWarehouseID uint
FarmWarehouseName string
Rows []*adjustmentMigrationReportRow
}
type adjustmentCandidateValidation struct {
Status string
Reason string
VerifiedQty float64
}
type adjustmentCandidateValidator interface {
ValidateCandidate(ctx context.Context, row adjustmentLegacyEggRow) (adjustmentCandidateValidation, error)
}
type liveAdjustmentCandidateValidator struct {
db *gorm.DB
fifoSvc commonSvc.FifoStockV2Service
}
type adjustmentSystemTransferExecutor interface {
CreateSystemTransfer(ctx context.Context, req *transferSvc.SystemTransferRequest) (*entity.StockTransfer, error)
DeleteSystemTransfer(ctx context.Context, id uint, actorID uint) error
}
func main() {
opts, err := parseAdjustmentFlags()
if err != nil {
log.Fatalf("invalid flags: %v", err)
}
db := database.Connect(config.DBHost, config.DBName)
ctx := context.Background()
validator := &liveAdjustmentCandidateValidator{
db: db,
fifoSvc: commonSvc.NewFifoStockV2Service(db, logrus.StandardLogger()),
}
if strings.TrimSpace(opts.RollbackRunID) != "" {
rows, err := loadAdjustmentRollbackDetails(ctx, db, opts.RollbackRunID)
if err != nil {
log.Fatalf("failed to load rollback details: %v", err)
}
if !opts.Apply {
for i := range rows {
rows[i].Status = "eligible"
}
renderAdjustmentRollbackReport(opts.Output, rows)
return
}
if err := executeAdjustmentRollback(ctx, newAdjustmentSystemTransferService(db), rows, opts.ActorID); err != nil {
log.Fatalf("rollback failed: %v", err)
}
renderAdjustmentRollbackReport(opts.Output, rows)
return
}
timings, err := loadAdjustmentLocationTimings(ctx, db, opts)
if err != nil {
log.Fatalf("failed to load location timings: %v", err)
}
candidates, err := loadLegacyEggAdjustmentRows(ctx, db, opts)
if err != nil {
log.Fatalf("failed to load legacy adjustment egg rows: %v", err)
}
reportRows, groups := buildAdjustmentMigrationPlan(ctx, opts, timings, candidates, validator)
if !opts.Apply {
renderAdjustmentMigrationReport(opts.Output, reportRows, summarizeAdjustmentApply(reportRows, groups, 0))
return
}
summary, err := executeAdjustmentApply(ctx, newAdjustmentSystemTransferService(db), validator, opts, groups)
if err != nil {
log.Fatalf("apply failed: %v", err)
}
finalRows := flattenAdjustmentGroups(groups, reportRows)
summary = summarizeAdjustmentApply(finalRows, groups, summary.GroupsApplied)
renderAdjustmentMigrationReport(opts.Output, finalRows, summary)
}
func parseAdjustmentFlags() (*adjustmentCommandOptions, error) {
var opts adjustmentCommandOptions
flag.BoolVar(&opts.Apply, "apply", false, "Apply migration. If false, run as dry-run")
flag.BoolVar(&opts.DryRun, "dry-run", true, "Run as dry-run")
flag.StringVar(&opts.RollbackRunID, "rollback-run-id", "", "Rollback all transfers created by the provided run id")
flag.UintVar(&opts.LocationID, "location-id", 0, "Filter by location id")
flag.StringVar(&opts.LocationName, "location-name", "", "Filter by exact location name")
flag.StringVar(&opts.CutoverDateRaw, "cutover-date", "", "Cutover date in YYYY-MM-DD format")
flag.StringVar(&opts.Output, "output", adjustmentOutputModeTable, "Output format: table or json")
flag.UintVar(&opts.ActorID, "actor-id", 1, "Actor id used for created/deleted transfers")
flag.Parse()
opts.LocationName = strings.TrimSpace(opts.LocationName)
opts.RollbackRunID = strings.TrimSpace(opts.RollbackRunID)
opts.Output = strings.ToLower(strings.TrimSpace(opts.Output))
if opts.Output == "" {
opts.Output = adjustmentOutputModeTable
}
if opts.Output != adjustmentOutputModeTable && opts.Output != adjustmentOutputModeJSON {
return nil, fmt.Errorf("unsupported --output=%s", opts.Output)
}
if opts.Apply {
opts.DryRun = false
}
if opts.LocationID > 0 && opts.LocationName != "" {
return nil, errors.New("use either --location-id or --location-name, not both")
}
if opts.RollbackRunID != "" {
if opts.LocationID > 0 || opts.LocationName != "" {
return nil, errors.New("location filters are not supported with --rollback-run-id")
}
if opts.CutoverDateRaw != "" {
return nil, errors.New("--cutover-date is not used with --rollback-run-id")
}
} else if opts.Apply {
if opts.LocationID == 0 && opts.LocationName == "" {
return nil, errors.New("apply mode requires --location-id or --location-name for safety")
}
if strings.TrimSpace(opts.CutoverDateRaw) == "" {
return nil, errors.New("--cutover-date is required in apply mode")
}
}
if strings.TrimSpace(opts.CutoverDateRaw) == "" {
opts.CutoverDate = normalizeAdjustmentDateOnly(time.Now().In(time.FixedZone("Asia/Jakarta", 7*3600)))
} else {
t, err := time.Parse("2006-01-02", opts.CutoverDateRaw)
if err != nil {
return nil, fmt.Errorf("invalid --cutover-date: %w", err)
}
opts.CutoverDate = normalizeAdjustmentDateOnly(t)
}
opts.RunID = buildAdjustmentRunID()
return &opts, nil
}
func newAdjustmentSystemTransferService(db *gorm.DB) adjustmentSystemTransferExecutor {
validate := validator.New()
stockTransferRepo := transferRepo.NewStockTransferRepository(db)
stockTransferDetailRepo := transferRepo.NewStockTransferDetailRepository(db)
stockTransferDeliveryRepo := transferRepo.NewStockTransferDeliveryRepository(db)
stockTransferDeliveryItemRepo := transferRepo.NewStockTransferDeliveryItemRepository(db)
stockLogsRepo := stockLogRepo.NewStockLogRepository(db)
productWarehouseRepo := pwRepo.NewProductWarehouseRepository(db)
warehouseRepository := warehouseRepo.NewWarehouseRepository(db)
projectFlockKandangRepo := pfkRepo.NewProjectFlockKandangRepository(db)
projectFlockPopulationRepo := pfkRepo.NewProjectFlockPopulationRepository(db)
fifoSvc := commonSvc.NewFifoStockV2Service(db, logrus.StandardLogger())
return transferSvc.NewTransferService(
validate,
stockTransferRepo,
stockTransferDetailRepo,
stockTransferDeliveryRepo,
stockTransferDeliveryItemRepo,
stockLogsRepo,
productWarehouseRepo,
nil,
warehouseRepository,
projectFlockKandangRepo,
projectFlockPopulationRepo,
nil,
fifoSvc,
nil,
)
}
func loadAdjustmentLocationTimings(ctx context.Context, db *gorm.DB, opts *adjustmentCommandOptions) (map[uint]adjustmentLocationTiming, error) {
type row struct {
LocationID uint `gorm:"column:location_id"`
LocationName string `gorm:"column:location_name"`
FirstKandangDate *time.Time `gorm:"column:first_kandang_date"`
LastKandangDate *time.Time `gorm:"column:last_kandang_date"`
FirstFarmDate *time.Time `gorm:"column:first_farm_date"`
LastFarmDate *time.Time `gorm:"column:last_farm_date"`
}
query := db.WithContext(ctx).
Table("recording_eggs re").
Select(`
pf.location_id AS location_id,
l.name AS location_name,
MIN(CASE WHEN w.type = 'KANDANG' THEN DATE(r.record_datetime) END) AS first_kandang_date,
MAX(CASE WHEN w.type = 'KANDANG' THEN DATE(r.record_datetime) END) AS last_kandang_date,
MIN(CASE WHEN w.type = 'LOKASI' THEN DATE(r.record_datetime) END) AS first_farm_date,
MAX(CASE WHEN w.type = 'LOKASI' THEN DATE(r.record_datetime) END) AS last_farm_date
`).
Joins("JOIN recordings r ON r.id = re.recording_id").
Joins("JOIN project_flock_kandangs pk ON pk.id = COALESCE(re.project_flock_kandang_id, r.project_flock_kandangs_id)").
Joins("JOIN project_flocks pf ON pf.id = pk.project_flock_id").
Joins("JOIN locations l ON l.id = pf.location_id").
Joins("JOIN product_warehouses pw ON pw.id = re.product_warehouse_id").
Joins("JOIN warehouses w ON w.id = pw.warehouse_id").
Group("pf.location_id, l.name")
query = applyAdjustmentTimingLocationFilter(query, opts)
var rows []row
if err := query.Scan(&rows).Error; err != nil {
return nil, err
}
result := make(map[uint]adjustmentLocationTiming, len(rows))
for _, row := range rows {
status := "KANDANG_ONLY"
if row.FirstFarmDate != nil {
status = "OVERLAP"
if row.LastKandangDate == nil || row.FirstFarmDate.After(normalizeAdjustmentDateOnly(*row.LastKandangDate)) {
status = "CLEAN_CUTOVER"
}
}
result[row.LocationID] = adjustmentLocationTiming{
LocationID: row.LocationID,
LocationName: row.LocationName,
FirstKandangDate: normalizeAdjustmentDatePtr(row.FirstKandangDate),
LastKandangDate: normalizeAdjustmentDatePtr(row.LastKandangDate),
FirstFarmDate: normalizeAdjustmentDatePtr(row.FirstFarmDate),
LastFarmDate: normalizeAdjustmentDatePtr(row.LastFarmDate),
Status: status,
}
}
return result, nil
}
func loadLegacyEggAdjustmentRows(ctx context.Context, db *gorm.DB, opts *adjustmentCommandOptions) ([]adjustmentLegacyEggRow, error) {
type row struct {
LocationID uint `gorm:"column:location_id"`
LocationName string `gorm:"column:location_name"`
SourceWarehouseID uint `gorm:"column:source_warehouse_id"`
SourceWarehouseName string `gorm:"column:source_warehouse_name"`
FarmWarehouseID *uint `gorm:"column:farm_warehouse_id"`
FarmWarehouseName *string `gorm:"column:farm_warehouse_name"`
ProductWarehouseID uint `gorm:"column:product_warehouse_id"`
ProductID uint `gorm:"column:product_id"`
ProductName string `gorm:"column:product_name"`
RemainingQty float64 `gorm:"column:remaining_qty"`
CurrentPWQty float64 `gorm:"column:current_pw_qty"`
AdjustmentIDs string `gorm:"column:adjustment_ids"`
}
firstFarmSub := db.WithContext(ctx).
Table("warehouses fw").
Select("fw.location_id AS location_id, MIN(fw.id) AS farm_warehouse_id").
Where("fw.deleted_at IS NULL").
Where("fw.type = ?", "LOKASI").
Group("fw.location_id")
query := db.WithContext(ctx).
Table("adjustment_stocks ast").
Select(`
kw.location_id AS location_id,
l.name AS location_name,
kw.id AS source_warehouse_id,
kw.name AS source_warehouse_name,
fw.id AS farm_warehouse_id,
fw.name AS farm_warehouse_name,
pw.id AS product_warehouse_id,
pw.product_id AS product_id,
p.name AS product_name,
COALESCE(pw.qty, 0) AS current_pw_qty,
SUM(GREATEST(ast.total_qty - ast.total_used, 0)) AS remaining_qty,
STRING_AGG(ast.id::text, ',' ORDER BY ast.id) AS adjustment_ids
`).
Joins("JOIN product_warehouses pw ON pw.id = ast.product_warehouse_id").
Joins("JOIN warehouses kw ON kw.id = pw.warehouse_id AND kw.deleted_at IS NULL").
Joins("JOIN locations l ON l.id = kw.location_id").
Joins("JOIN products p ON p.id = pw.product_id").
Joins("LEFT JOIN product_categories pc ON pc.id = p.product_category_id").
Joins("LEFT JOIN (?) ff ON ff.location_id = kw.location_id", firstFarmSub).
Joins("LEFT JOIN warehouses fw ON fw.id = ff.farm_warehouse_id").
Where("kw.type = ?", "KANDANG").
Where("ast.total_qty > ast.total_used").
Where(`
EXISTS (
SELECT 1
FROM flags f
WHERE f.flagable_type = ?
AND f.flagable_id = p.id
AND (UPPER(f.name) = 'TELUR' OR UPPER(f.name) LIKE 'TELUR-%')
)
OR (
NOT EXISTS (
SELECT 1
FROM flags f_any
WHERE f_any.flagable_type = ?
AND f_any.flagable_id = p.id
)
AND UPPER(COALESCE(pc.code, '')) = 'EGG'
)
`, entity.FlagableTypeProduct, entity.FlagableTypeProduct).
Group(`
kw.location_id,
l.name,
kw.id,
kw.name,
fw.id,
fw.name,
pw.id,
pw.product_id,
p.name,
pw.qty
`).
Order("l.name ASC, kw.name ASC, p.name ASC")
query = applyAdjustmentLegacyLocationFilter(query, opts)
var rows []row
if err := query.Scan(&rows).Error; err != nil {
return nil, err
}
result := make([]adjustmentLegacyEggRow, 0, len(rows))
for _, row := range rows {
result = append(result, adjustmentLegacyEggRow{
LocationID: row.LocationID,
LocationName: row.LocationName,
SourceWarehouseID: row.SourceWarehouseID,
SourceWarehouseName: row.SourceWarehouseName,
FarmWarehouseID: row.FarmWarehouseID,
FarmWarehouseName: row.FarmWarehouseName,
ProductWarehouseID: row.ProductWarehouseID,
ProductID: row.ProductID,
ProductName: row.ProductName,
RemainingQty: row.RemainingQty,
CurrentPWQty: row.CurrentPWQty,
AdjustmentIDs: parseAdjustmentIDs(row.AdjustmentIDs),
})
}
return result, nil
}
func (v *liveAdjustmentCandidateValidator) ValidateCandidate(ctx context.Context, row adjustmentLegacyEggRow) (adjustmentCandidateValidation, error) {
if row.RemainingQty <= 0 {
return adjustmentCandidateValidation{Status: "skipped", Reason: "non_positive_remaining_qty"}, nil
}
if row.ProductWarehouseID == 0 || row.ProductID == 0 {
return adjustmentCandidateValidation{Status: "failed", Reason: "invalid_candidate"}, nil
}
flagGroupCode, err := resolveAdjustmentEggFlagGroup(ctx, v.db, row.ProductID)
if err != nil {
return adjustmentCandidateValidation{}, err
}
gatherRows, err := v.fifoSvc.Gather(ctx, commonSvc.FifoStockV2GatherRequest{
FlagGroupCode: flagGroupCode,
Lane: fifoStockV2.LaneStockable,
AllocationPurpose: entity.StockAllocationPurposeConsume,
ProductWarehouseID: row.ProductWarehouseID,
Limit: 1000,
})
if err != nil {
return adjustmentCandidateValidation{}, err
}
return validateAdjustmentGatherAgainstAllowedIDs(row.RemainingQty, row.AdjustmentIDs, gatherRows), nil
}
func validateAdjustmentGatherAgainstAllowedIDs(
needQty float64,
allowedIDs []uint,
rows []commonSvc.FifoStockV2GatherRow,
) adjustmentCandidateValidation {
if needQty <= 0 {
return adjustmentCandidateValidation{Status: "skipped", Reason: "non_positive_remaining_qty"}
}
allowed := make(map[uint]struct{}, len(allowedIDs))
for _, id := range allowedIDs {
if id > 0 {
allowed[id] = struct{}{}
}
}
if len(allowed) == 0 {
return adjustmentCandidateValidation{Status: "failed", Reason: "missing_adjustment_ids"}
}
const epsilon = 1e-6
verifiedQty := 0.0
for _, row := range rows {
if row.AvailableQuantity <= epsilon {
continue
}
takeQty := row.AvailableQuantity
if remaining := needQty - verifiedQty; takeQty > remaining {
takeQty = remaining
}
if takeQty <= epsilon {
continue
}
if row.SourceTable != "adjustment_stocks" {
return adjustmentCandidateValidation{
Status: "skipped",
Reason: fmt.Sprintf("mixed_fifo_source_%s", strings.ToLower(strings.TrimSpace(row.SourceTable))),
VerifiedQty: verifiedQty,
}
}
if _, ok := allowed[row.SourceID]; !ok {
return adjustmentCandidateValidation{
Status: "skipped",
Reason: "mixed_adjustment_source",
VerifiedQty: verifiedQty,
}
}
verifiedQty += takeQty
if verifiedQty >= needQty-epsilon {
return adjustmentCandidateValidation{
Status: "eligible",
VerifiedQty: verifiedQty,
}
}
}
return adjustmentCandidateValidation{
Status: "skipped",
Reason: "insufficient_fifo_available",
VerifiedQty: verifiedQty,
}
}
func resolveAdjustmentEggFlagGroup(ctx context.Context, db *gorm.DB, productID uint) (string, error) {
type row struct {
FlagGroupCode string `gorm:"column:flag_group_code"`
}
var selected row
err := db.WithContext(ctx).
Table("fifo_stock_v2_route_rules rr").
Select("rr.flag_group_code").
Joins("JOIN fifo_stock_v2_flag_groups fg ON fg.code = rr.flag_group_code AND fg.is_active = TRUE").
Where("rr.is_active = TRUE").
Where("rr.lane = ?", "USABLE").
Where("rr.function_code = ?", "STOCK_TRANSFER_OUT").
Where("rr.source_table = ?", "stock_transfer_details").
Where(`
EXISTS (
SELECT 1
FROM products p
LEFT JOIN product_categories pc ON pc.id = p.product_category_id
WHERE p.id = ?
AND (
EXISTS (
SELECT 1
FROM flags f
JOIN fifo_stock_v2_flag_members fm ON fm.flag_name = f.name AND fm.is_active = TRUE
WHERE f.flagable_type = ?
AND f.flagable_id = p.id
AND fm.flag_group_code = rr.flag_group_code
)
OR (
NOT EXISTS (
SELECT 1
FROM flags f_any
WHERE f_any.flagable_type = ?
AND f_any.flagable_id = p.id
)
AND rr.flag_group_code = ?
AND UPPER(COALESCE(pc.code, '')) = 'EGG'
)
)
)
`, productID, entity.FlagableTypeProduct, entity.FlagableTypeProduct, utils.LegacyFlagGroupCodeByProductCategoryCode("EGG")).
Order("rr.id ASC").
Limit(1).
Take(&selected).Error
if err != nil {
return "", err
}
return strings.TrimSpace(selected.FlagGroupCode), nil
}
func buildAdjustmentMigrationPlan(
ctx context.Context,
opts *adjustmentCommandOptions,
timings map[uint]adjustmentLocationTiming,
rows []adjustmentLegacyEggRow,
validator adjustmentCandidateValidator,
) ([]adjustmentMigrationReportRow, []adjustmentTransferGroup) {
reportRows := make([]adjustmentMigrationReportRow, 0, len(rows))
groupMap := make(map[string]*adjustmentTransferGroup)
for _, row := range rows {
locationStatus := "UNKNOWN"
if timing, ok := timings[row.LocationID]; ok {
locationStatus = timing.Status
}
report := adjustmentMigrationReportRow{
RunID: opts.RunID,
LocationID: row.LocationID,
LocationName: row.LocationName,
SourceWarehouseID: row.SourceWarehouseID,
SourceWarehouseName: row.SourceWarehouseName,
FarmWarehouseID: row.FarmWarehouseID,
FarmWarehouseName: row.FarmWarehouseName,
ProductWarehouseID: row.ProductWarehouseID,
ProductID: row.ProductID,
ProductName: row.ProductName,
RemainingQty: row.RemainingQty,
CurrentPWQty: row.CurrentPWQty,
LocationStatus: locationStatus,
AdjustmentIDs: row.AdjustmentIDs,
Status: "eligible",
}
switch {
case row.FarmWarehouseID == nil || row.FarmWarehouseName == nil:
report.Status = "skipped"
report.Reason = "missing_farm_warehouse"
case row.RemainingQty <= 0:
report.Status = "skipped"
report.Reason = "non_positive_remaining_qty"
default:
validation, err := validator.ValidateCandidate(ctx, row)
if err != nil {
report.Status = "failed"
report.Reason = err.Error()
} else {
report.Status = validation.Status
report.Reason = validation.Reason
report.VerifiedQty = validation.VerifiedQty
}
}
reportRows = append(reportRows, report)
if report.Status != "eligible" {
continue
}
groupKey := fmt.Sprintf("%d:%d", row.SourceWarehouseID, *row.FarmWarehouseID)
group := groupMap[groupKey]
if group == nil {
group = &adjustmentTransferGroup{
LocationID: row.LocationID,
LocationName: row.LocationName,
SourceWarehouseID: row.SourceWarehouseID,
SourceWarehouseName: row.SourceWarehouseName,
FarmWarehouseID: *row.FarmWarehouseID,
FarmWarehouseName: derefAdjustmentString(row.FarmWarehouseName),
}
groupMap[groupKey] = group
}
group.Rows = append(group.Rows, &reportRows[len(reportRows)-1])
}
groups := make([]adjustmentTransferGroup, 0, len(groupMap))
for _, group := range groupMap {
sort.Slice(group.Rows, func(i, j int) bool {
return group.Rows[i].ProductName < group.Rows[j].ProductName
})
groups = append(groups, *group)
}
sort.Slice(groups, func(i, j int) bool {
if groups[i].LocationName == groups[j].LocationName {
return groups[i].SourceWarehouseName < groups[j].SourceWarehouseName
}
return groups[i].LocationName < groups[j].LocationName
})
return reportRows, groups
}
func executeAdjustmentApply(
ctx context.Context,
svc adjustmentSystemTransferExecutor,
validator adjustmentCandidateValidator,
opts *adjustmentCommandOptions,
groups []adjustmentTransferGroup,
) (adjustmentApplySummary, error) {
summary := adjustmentApplySummary{GroupsPlanned: len(groups)}
for _, group := range groups {
eligibleRows := make([]*adjustmentMigrationReportRow, 0, len(group.Rows))
for _, row := range group.Rows {
validation, err := validator.ValidateCandidate(ctx, adjustmentLegacyEggRow{
LocationID: row.LocationID,
LocationName: row.LocationName,
SourceWarehouseID: row.SourceWarehouseID,
SourceWarehouseName: row.SourceWarehouseName,
FarmWarehouseID: row.FarmWarehouseID,
FarmWarehouseName: row.FarmWarehouseName,
ProductWarehouseID: row.ProductWarehouseID,
ProductID: row.ProductID,
ProductName: row.ProductName,
RemainingQty: row.RemainingQty,
CurrentPWQty: row.CurrentPWQty,
AdjustmentIDs: row.AdjustmentIDs,
})
if err != nil {
row.Status = "failed"
row.Reason = err.Error()
summary.RowsFailed++
continue
}
if validation.Status != "eligible" {
row.Status = "failed"
row.Reason = validation.Reason
row.VerifiedQty = validation.VerifiedQty
summary.RowsFailed++
continue
}
row.VerifiedQty = validation.VerifiedQty
eligibleRows = append(eligibleRows, row)
}
if len(eligibleRows) == 0 {
continue
}
products := make([]transferSvc.SystemTransferProduct, 0, len(eligibleRows))
for _, row := range eligibleRows {
products = append(products, transferSvc.SystemTransferProduct{
ProductID: row.ProductID,
ProductQty: row.RemainingQty,
})
}
reason := buildAdjustmentCutoverReason(opts.RunID, group.LocationName, opts.CutoverDate)
transfer, err := svc.CreateSystemTransfer(ctx, &transferSvc.SystemTransferRequest{
TransferReason: reason,
TransferDate: opts.CutoverDate,
SourceWarehouseID: group.SourceWarehouseID,
DestinationWarehouseID: group.FarmWarehouseID,
Products: products,
ActorID: opts.ActorID,
StockLogNotes: reason,
})
if err != nil {
for _, row := range eligibleRows {
row.Status = "failed"
row.Reason = err.Error()
summary.RowsFailed++
}
continue
}
summary.GroupsApplied++
for _, row := range eligibleRows {
row.Status = "applied"
row.TransferID = &transfer.Id
row.MovementNumber = &transfer.MovementNumber
summary.RowsApplied++
}
}
return summary, nil
}
func executeAdjustmentRollback(
ctx context.Context,
svc adjustmentSystemTransferExecutor,
rows []adjustmentRollbackDetailRow,
actorID uint,
) error {
if actorID == 0 {
return fmt.Errorf("actor id is required for rollback")
}
byTransfer := make(map[uint64][]int)
for idx, row := range rows {
byTransfer[row.TransferID] = append(byTransfer[row.TransferID], idx)
}
transferIDs := make([]uint64, 0, len(byTransfer))
for transferID := range byTransfer {
transferIDs = append(transferIDs, transferID)
}
sort.Slice(transferIDs, func(i, j int) bool { return transferIDs[i] > transferIDs[j] })
var firstErr error
for _, transferID := range transferIDs {
err := svc.DeleteSystemTransfer(ctx, uint(transferID), actorID)
for _, idx := range byTransfer[transferID] {
if err != nil {
rows[idx].Status = "failed"
rows[idx].Reason = err.Error()
} else {
rows[idx].Status = "rolled_back"
}
}
if err != nil && firstErr == nil {
firstErr = err
}
}
return firstErr
}
func loadAdjustmentRollbackDetails(ctx context.Context, db *gorm.DB, runID string) ([]adjustmentRollbackDetailRow, error) {
type row struct {
TransferID uint64 `gorm:"column:transfer_id"`
MovementNumber string `gorm:"column:movement_number"`
LocationName string `gorm:"column:location_name"`
SourceWarehouseName string `gorm:"column:source_warehouse_name"`
FarmWarehouseName string `gorm:"column:farm_warehouse_name"`
ProductName string `gorm:"column:product_name"`
Qty float64 `gorm:"column:qty"`
}
needle := buildAdjustmentRunReasonMatcher(runID)
var dbRows []row
err := db.WithContext(ctx).
Table("stock_transfers st").
Select(`
st.id AS transfer_id,
st.movement_number AS movement_number,
COALESCE(loc.name, '') AS location_name,
ws.name AS source_warehouse_name,
wd.name AS farm_warehouse_name,
p.name AS product_name,
COALESCE(std.total_qty, std.usage_qty, 0) AS qty
`).
Joins("JOIN warehouses ws ON ws.id = st.from_warehouse_id").
Joins("JOIN warehouses wd ON wd.id = st.to_warehouse_id").
Joins("LEFT JOIN locations loc ON loc.id = COALESCE(ws.location_id, wd.location_id)").
Joins("JOIN stock_transfer_details std ON std.stock_transfer_id = st.id AND std.deleted_at IS NULL").
Joins("JOIN products p ON p.id = std.product_id").
Where("st.deleted_at IS NULL").
Where("st.reason LIKE ?", needle).
Order("st.id DESC, std.id ASC").
Scan(&dbRows).Error
if err != nil {
return nil, err
}
rows := make([]adjustmentRollbackDetailRow, 0, len(dbRows))
for _, row := range dbRows {
rows = append(rows, adjustmentRollbackDetailRow{
RunID: runID,
TransferID: row.TransferID,
MovementNumber: row.MovementNumber,
LocationName: row.LocationName,
SourceWarehouseName: row.SourceWarehouseName,
FarmWarehouseName: row.FarmWarehouseName,
ProductName: row.ProductName,
Qty: row.Qty,
})
}
return rows, nil
}
func applyAdjustmentTimingLocationFilter(db *gorm.DB, opts *adjustmentCommandOptions) *gorm.DB {
if opts == nil {
return db
}
switch {
case opts.LocationID > 0:
return db.Where("pf.location_id = ?", opts.LocationID)
case opts.LocationName != "":
return db.Where("LOWER(l.name) = LOWER(?)", opts.LocationName)
default:
return db
}
}
func applyAdjustmentLegacyLocationFilter(db *gorm.DB, opts *adjustmentCommandOptions) *gorm.DB {
if opts == nil {
return db
}
switch {
case opts.LocationID > 0:
return db.Where("kw.location_id = ?", opts.LocationID)
case opts.LocationName != "":
return db.Where("LOWER(l.name) = LOWER(?)", opts.LocationName)
default:
return db
}
}
func buildAdjustmentCutoverReason(runID, locationName string, cutoverDate time.Time) string {
locationName = strings.ReplaceAll(strings.TrimSpace(locationName), "|", "/")
return fmt.Sprintf("%s|run_id=%s|location=%s|cutover_date=%s", adjustmentCutoverReasonPrefix, runID, locationName, cutoverDate.Format("2006-01-02"))
}
func buildAdjustmentRunReasonMatcher(runID string) string {
return fmt.Sprintf("%s|run_id=%s|%%", adjustmentCutoverReasonPrefix, strings.TrimSpace(runID))
}
func buildAdjustmentRunID() string {
return fmt.Sprintf("egg-adjustment-cutover-%s", time.Now().UTC().Format("20060102T150405.000000000Z"))
}
func normalizeAdjustmentDateOnly(value time.Time) time.Time {
return time.Date(value.Year(), value.Month(), value.Day(), 0, 0, 0, 0, time.UTC)
}
func normalizeAdjustmentDatePtr(value *time.Time) *time.Time {
if value == nil {
return nil
}
normalized := normalizeAdjustmentDateOnly(*value)
return &normalized
}
func parseAdjustmentIDs(raw string) []uint {
raw = strings.TrimSpace(raw)
if raw == "" {
return nil
}
parts := strings.Split(raw, ",")
out := make([]uint, 0, len(parts))
for _, part := range parts {
part = strings.TrimSpace(part)
if part == "" {
continue
}
value, err := strconv.ParseUint(part, 10, 64)
if err != nil || value == 0 {
continue
}
out = append(out, uint(value))
}
return out
}
func derefAdjustmentString(value *string) string {
if value == nil {
return ""
}
return *value
}
func summarizeAdjustmentApply(rows []adjustmentMigrationReportRow, groups []adjustmentTransferGroup, appliedGroups int) adjustmentApplySummary {
summary := adjustmentApplySummary{
GroupsPlanned: len(groups),
GroupsApplied: appliedGroups,
}
for _, row := range rows {
switch row.Status {
case "eligible":
summary.RowsPlanned++
case "applied":
summary.RowsPlanned++
summary.RowsApplied++
case "failed":
summary.RowsPlanned++
summary.RowsFailed++
case "skipped":
summary.RowsSkipped++
}
}
return summary
}
func flattenAdjustmentGroups(groups []adjustmentTransferGroup, fallback []adjustmentMigrationReportRow) []adjustmentMigrationReportRow {
if len(groups) == 0 {
return fallback
}
rows := make([]adjustmentMigrationReportRow, 0, len(fallback))
for _, group := range groups {
for _, row := range group.Rows {
rows = append(rows, *row)
}
}
for _, row := range fallback {
if row.Status == "skipped" {
rows = append(rows, row)
}
}
sort.Slice(rows, func(i, j int) bool {
if rows[i].LocationName == rows[j].LocationName {
if rows[i].SourceWarehouseName == rows[j].SourceWarehouseName {
return rows[i].ProductName < rows[j].ProductName
}
return rows[i].SourceWarehouseName < rows[j].SourceWarehouseName
}
return rows[i].LocationName < rows[j].LocationName
})
return rows
}
func renderAdjustmentMigrationReport(mode string, rows []adjustmentMigrationReportRow, summary adjustmentApplySummary) {
if mode == adjustmentOutputModeJSON {
payload := map[string]any{
"rows": rows,
"summary": summary,
}
enc := json.NewEncoder(os.Stdout)
enc.SetIndent("", " ")
_ = enc.Encode(payload)
return
}
w := tabwriter.NewWriter(os.Stdout, 2, 8, 2, ' ', 0)
fmt.Fprintln(w, "RUN_ID\tLOCATION\tSOURCE_WAREHOUSE\tFARM_WAREHOUSE\tPRODUCT\tREMAINING_QTY\tCURRENT_PW_QTY\tVERIFIED_QTY\tLOCATION_STATUS\tSTATUS\tREASON\tTRANSFER_ID\tMOVEMENT_NUMBER")
for _, row := range rows {
transferID := "-"
if row.TransferID != nil {
transferID = fmt.Sprintf("%d", *row.TransferID)
}
movementNumber := "-"
if row.MovementNumber != nil {
movementNumber = *row.MovementNumber
}
fmt.Fprintf(
w,
"%s\t%s\t%s\t%s\t%s\t%.3f\t%.3f\t%.3f\t%s\t%s\t%s\t%s\t%s\n",
row.RunID,
row.LocationName,
row.SourceWarehouseName,
derefAdjustmentString(row.FarmWarehouseName),
row.ProductName,
row.RemainingQty,
row.CurrentPWQty,
row.VerifiedQty,
row.LocationStatus,
row.Status,
row.Reason,
transferID,
movementNumber,
)
}
_ = w.Flush()
fmt.Printf(
"\nSUMMARY: rows_planned=%d rows_applied=%d rows_skipped=%d rows_failed=%d groups_planned=%d groups_applied=%d\n",
summary.RowsPlanned,
summary.RowsApplied,
summary.RowsSkipped,
summary.RowsFailed,
summary.GroupsPlanned,
summary.GroupsApplied,
)
}
func renderAdjustmentRollbackReport(mode string, rows []adjustmentRollbackDetailRow) {
if mode == adjustmentOutputModeJSON {
enc := json.NewEncoder(os.Stdout)
enc.SetIndent("", " ")
_ = enc.Encode(map[string]any{"rows": rows})
return
}
w := tabwriter.NewWriter(os.Stdout, 2, 8, 2, ' ', 0)
fmt.Fprintln(w, "RUN_ID\tTRANSFER_ID\tMOVEMENT_NUMBER\tLOCATION\tSOURCE_WAREHOUSE\tFARM_WAREHOUSE\tPRODUCT\tQTY\tSTATUS\tREASON")
for _, row := range rows {
fmt.Fprintf(
w,
"%s\t%d\t%s\t%s\t%s\t%s\t%s\t%.3f\t%s\t%s\n",
row.RunID,
row.TransferID,
row.MovementNumber,
row.LocationName,
row.SourceWarehouseName,
row.FarmWarehouseName,
row.ProductName,
row.Qty,
row.Status,
row.Reason,
)
}
_ = w.Flush()
}