diff --git a/cmd/run/dealpusher.go b/cmd/run/dealpusher.go index d810f3c9..e5da075c 100644 --- a/cmd/run/dealpusher.go +++ b/cmd/run/dealpusher.go @@ -1,6 +1,8 @@ package run import ( + "time" + "github.com/cockroachdb/errors" "github.com/data-preservation-programs/singularity/database" "github.com/data-preservation-programs/singularity/service" @@ -25,6 +27,26 @@ var DealPusherCmd = &cli.Command{ Aliases: []string{"M"}, DefaultText: "Unlimited", }, + &cli.IntFlag{ + Name: "pdp-batch-size", + Usage: "Number of roots to include in each PDP add-roots transaction", + Value: 128, + }, + &cli.Uint64Flag{ + Name: "pdp-gas-limit", + Usage: "Gas limit for PDP on-chain transactions", + Value: 5000000, + }, + &cli.Uint64Flag{ + Name: "pdp-confirmation-depth", + Usage: "Number of block confirmations required for PDP transactions", + Value: 5, + }, + &cli.DurationFlag{ + Name: "pdp-poll-interval", + Usage: "Polling interval for PDP transaction confirmation checks", + Value: 30 * time.Second, + }, }, Action: func(c *cli.Context) error { db, closer, err := database.OpenFromCLI(c) @@ -39,7 +61,24 @@ var DealPusherCmd = &cli.Command{ return errors.WithStack(err) } - dm, err := dealpusher.NewDealPusher(db, c.String("lotus-api"), c.String("lotus-token"), c.Uint("deal-attempts"), c.Uint("max-replication-factor")) + pdpCfg := dealpusher.PDPSchedulingConfig{ + BatchSize: c.Int("pdp-batch-size"), + GasLimit: c.Uint64("pdp-gas-limit"), + ConfirmationDepth: c.Uint64("pdp-confirmation-depth"), + PollingInterval: c.Duration("pdp-poll-interval"), + } + if err := pdpCfg.Validate(); err != nil { + return errors.WithStack(err) + } + + dm, err := dealpusher.NewDealPusher( + db, + c.String("lotus-api"), + c.String("lotus-token"), + c.Uint("deal-attempts"), + c.Uint("max-replication-factor"), + dealpusher.WithPDPSchedulingConfig(pdpCfg), + ) if err != nil { return errors.WithStack(err) } diff --git a/service/dealpusher/dealpusher.go b/service/dealpusher/dealpusher.go index ecc89798..22caa830 100644 --- a/service/dealpusher/dealpusher.go +++ b/service/dealpusher/dealpusher.go @@ -35,11 +35,12 @@ var waitPendingInterval = time.Minute // DealPusher represents a struct that encapsulates the data and functionality related to pushing deals in a replication process. type DealPusher struct { - dbNoContext *gorm.DB // Pointer to a gorm.DB object representing a database connection. - walletChooser replication.WalletChooser // Object responsible for choosing a wallet for replication. - dealMaker replication.DealMaker // Object responsible for making a deal in replication. - pdpProofSetManager PDPProofSetManager // Optional PDP proof set lifecycle manager. - pdpTxConfirmer PDPTransactionConfirmer // Optional PDP transaction confirmer. + dbNoContext *gorm.DB // Pointer to a gorm.DB object representing a database connection. + walletChooser replication.WalletChooser // Object responsible for choosing a wallet for replication. + dealMaker replication.DealMaker // Object responsible for making a deal in replication. + pdpProofSetManager PDPProofSetManager // Optional PDP proof set lifecycle manager. + pdpTxConfirmer PDPTransactionConfirmer // Optional PDP transaction confirmer. + pdpSchedulingConfig PDPSchedulingConfig // PDP scheduling config used for root batching and tx confirmation behavior. // Resolver is injected so tests and future wiring can switch deal type behavior without coupling DealPusher to config storage. scheduleDealTypeResolver func(schedule *model.Schedule) model.DealType workerID uuid.UUID // UUID identifying the associated worker. @@ -433,20 +434,13 @@ func (d *DealPusher) runSchedule(ctx context.Context, schedule *model.Schedule) func (d *DealPusher) resolveScheduleDealType(schedule *model.Schedule) model.DealType { if d.scheduleDealTypeResolver == nil { - return model.DealTypeMarket + return inferScheduleDealType(schedule) } return d.scheduleDealTypeResolver(schedule) } -func (d *DealPusher) runPDPSchedule(_ context.Context, _ *model.Schedule) (model.ScheduleState, error) { - if d.pdpProofSetManager == nil || d.pdpTxConfirmer == nil { - return model.ScheduleError, errors.New("pdp scheduling dependencies are not configured") - } - return model.ScheduleError, errors.New("pdp scheduling path is not implemented") -} - func NewDealPusher(db *gorm.DB, lotusURL string, - lotusToken string, numAttempts uint, maxReplicas uint, + lotusToken string, numAttempts uint, maxReplicas uint, opts ...Option, ) (*DealPusher, error) { if numAttempts <= 1 { numAttempts = 1 @@ -460,20 +454,25 @@ func NewDealPusher(db *gorm.DB, lotusURL string, if err != nil { return nil, errors.Wrap(err, "failed to init deal maker") } - return &DealPusher{ + dealPusher := &DealPusher{ dbNoContext: db, activeScheduleCancelFunc: make(map[model.ScheduleID]context.CancelFunc), activeSchedule: make(map[model.ScheduleID]*model.Schedule), cronEntries: make(map[model.ScheduleID]cron.EntryID), walletChooser: &replication.RandomWalletChooser{}, dealMaker: dealMaker, + pdpSchedulingConfig: defaultPDPSchedulingConfig(), workerID: uuid.New(), cron: cron.New(cron.WithLogger(&cronLogger{}), cron.WithLocation(time.UTC), cron.WithParser(cron.NewParser(cron.SecondOptional|cron.Minute|cron.Hour|cron.Dom|cron.Month|cron.Dow|cron.Descriptor))), sendDealAttempts: numAttempts, host: h, maxReplicas: maxReplicas, - }, nil + } + for _, opt := range opts { + opt(dealPusher) + } + return dealPusher, nil } // runOnce is a method of the DealPusher type that runs a single iteration of the deal pushing logic. diff --git a/service/dealpusher/options.go b/service/dealpusher/options.go new file mode 100644 index 00000000..807b0d20 --- /dev/null +++ b/service/dealpusher/options.go @@ -0,0 +1,34 @@ +package dealpusher + +import "github.com/data-preservation-programs/singularity/model" + +// Option customizes DealPusher initialization. +type Option func(*DealPusher) + +// WithPDPProofSetManager sets the PDP proof set manager dependency. +func WithPDPProofSetManager(manager PDPProofSetManager) Option { + return func(d *DealPusher) { + d.pdpProofSetManager = manager + } +} + +// WithPDPTransactionConfirmer sets the PDP transaction confirmer dependency. +func WithPDPTransactionConfirmer(confirmer PDPTransactionConfirmer) Option { + return func(d *DealPusher) { + d.pdpTxConfirmer = confirmer + } +} + +// WithPDPSchedulingConfig overrides PDP scheduling configuration. +func WithPDPSchedulingConfig(cfg PDPSchedulingConfig) Option { + return func(d *DealPusher) { + d.pdpSchedulingConfig = cfg + } +} + +// WithScheduleDealTypeResolver overrides schedule deal type resolution logic. +func WithScheduleDealTypeResolver(resolver func(schedule *model.Schedule) model.DealType) Option { + return func(d *DealPusher) { + d.scheduleDealTypeResolver = resolver + } +} diff --git a/service/dealpusher/pdp_schedule.go b/service/dealpusher/pdp_schedule.go new file mode 100644 index 00000000..6f43c72a --- /dev/null +++ b/service/dealpusher/pdp_schedule.go @@ -0,0 +1,298 @@ +package dealpusher + +import ( + "context" + "time" + + "github.com/cockroachdb/errors" + "github.com/data-preservation-programs/singularity/database" + "github.com/data-preservation-programs/singularity/model" + "github.com/data-preservation-programs/singularity/util" + "github.com/filecoin-project/go-address" + "github.com/ipfs/go-cid" + "github.com/rjNemo/underscore" + "gorm.io/gorm" +) + +func defaultPDPSchedulingConfig() PDPSchedulingConfig { + return PDPSchedulingConfig{ + BatchSize: 128, + GasLimit: 5_000_000, + ConfirmationDepth: 5, + PollingInterval: 30 * time.Second, + } +} + +func inferScheduleDealType(schedule *model.Schedule) model.DealType { + if schedule == nil { + return model.DealTypeMarket + } + providerAddr, err := address.NewFromString(schedule.Provider) + if err != nil { + return model.DealTypeMarket + } + if providerAddr.Protocol() == address.Delegated { + return model.DealTypePDP + } + return model.DealTypeMarket +} + +func (d *DealPusher) runPDPSchedule(ctx context.Context, schedule *model.Schedule) (model.ScheduleState, error) { + if d.pdpProofSetManager == nil || d.pdpTxConfirmer == nil { + return model.ScheduleError, errors.New("pdp scheduling dependencies are not configured") + } + if err := d.pdpSchedulingConfig.Validate(); err != nil { + return model.ScheduleError, errors.Wrap(err, "invalid PDP scheduling configuration") + } + + db := d.dbNoContext.WithContext(ctx) + var attachments []model.SourceAttachment + if err := db.Model(&model.SourceAttachment{}). + Where("preparation_id = ?", schedule.PreparationID). + Find(&attachments).Error; err != nil { + return model.ScheduleError, errors.Wrap(err, "failed to find attachments") + } + + allowedPieceCIDs := make([]model.CID, 0, len(schedule.AllowedPieceCIDs)) + for _, c := range schedule.AllowedPieceCIDs { + parsed, err := cid.Parse(c) + if err != nil { + return model.ScheduleError, errors.Wrapf(err, "failed to parse CID %s", c) + } + allowedPieceCIDs = append(allowedPieceCIDs, model.CID(parsed)) + } + + overReplicatedCIDs := db. + Table("deals"). + Select("piece_cid"). + Where("state in ?", []model.DealState{model.DealProposed, model.DealPublished, model.DealActive}). + Group("piece_cid"). + Having("count(*) >= ?", d.maxReplicas) + + var timer *time.Timer + current := sumResult{} + for { + if ctx.Err() != nil { + return "", nil + } + + pending, total, err := d.getPDPScheduleCounts(ctx, schedule) + if err != nil { + return model.ScheduleError, err + } + + shouldWait := false + if schedule.MaxPendingDealNumber > 0 && pending.DealNumber >= schedule.MaxPendingDealNumber { + shouldWait = true + } + if schedule.MaxPendingDealSize > 0 && pending.DealSize >= schedule.MaxPendingDealSize { + shouldWait = true + } + if shouldWait { + if timer == nil { + timer = time.NewTimer(waitPendingInterval) + defer timer.Stop() + } else { + timer.Reset(waitPendingInterval) + } + select { + case <-ctx.Done(): + return "", nil + case <-timer.C: + } + continue + } + if schedule.TotalDealNumber > 0 && total.DealNumber >= schedule.TotalDealNumber { + return model.ScheduleCompleted, nil + } + if schedule.TotalDealSize > 0 && total.DealSize >= schedule.TotalDealSize { + return model.ScheduleCompleted, nil + } + if schedule.ScheduleCron != "" && schedule.ScheduleDealNumber > 0 && current.DealNumber >= schedule.ScheduleDealNumber { + return "", nil + } + if schedule.ScheduleCron != "" && schedule.ScheduleDealSize > 0 && current.DealSize >= schedule.ScheduleDealSize { + return "", nil + } + + cars, err := d.findPDPCars(ctx, schedule, attachments, allowedPieceCIDs, overReplicatedCIDs, d.pdpSchedulingConfig.BatchSize) + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + if schedule.ScheduleCron != "" && schedule.ScheduleCronPerpetual { + return "", nil + } + return model.ScheduleCompleted, nil + } + return model.ScheduleError, err + } + if len(cars) == 0 { + if schedule.ScheduleCron != "" && schedule.ScheduleCronPerpetual { + return "", nil + } + return model.ScheduleCompleted, nil + } + + walletObj, err := d.walletChooser.Choose(ctx, schedule.Preparation.Wallets) + if err != nil { + return model.ScheduleError, errors.Wrap(err, "failed to choose wallet") + } + + proofSetID, err := d.pdpProofSetManager.EnsureProofSet(ctx, walletObj, schedule.Provider) + if err != nil { + return model.ScheduleError, errors.Wrap(err, "failed to ensure PDP proof set") + } + + pieceCIDs := make([]cid.Cid, 0, len(cars)) + for _, car := range cars { + pieceCIDs = append(pieceCIDs, cid.Cid(car.PieceCID)) + } + queuedTx, err := d.pdpProofSetManager.QueueAddRoots(ctx, proofSetID, pieceCIDs, d.pdpSchedulingConfig) + if err != nil { + return model.ScheduleError, errors.Wrap(err, "failed to queue PDP root addition transaction") + } + + _, err = d.pdpTxConfirmer.WaitForConfirmations(ctx, queuedTx.Hash, d.pdpSchedulingConfig.ConfirmationDepth, d.pdpSchedulingConfig.PollingInterval) + if err != nil { + return model.ScheduleError, errors.Wrap(err, "failed waiting for PDP transaction confirmation") + } + + for _, car := range cars { + proofSetIDCopy := proofSetID + dealModel := &model.Deal{ + State: model.DealProposed, + DealType: model.DealTypePDP, + Provider: schedule.Provider, + PieceCID: car.PieceCID, + PieceSize: car.PieceSize, + Verified: schedule.Verified, + ScheduleID: &schedule.ID, + ClientID: walletObj.ID, + ProofSetID: &proofSetIDCopy, + } + + if err := database.DoRetry(ctx, func() error { return db.Create(dealModel).Error }); err != nil { + return model.ScheduleError, errors.Wrap(err, "failed to create PDP deal") + } + current.DealNumber++ + current.DealSize += car.PieceSize + } + continue + } +} + +func (d *DealPusher) getPDPScheduleCounts(ctx context.Context, schedule *model.Schedule) (sumResult, sumResult, error) { + db := d.dbNoContext.WithContext(ctx) + var pending sumResult + err := db.Model(&model.Deal{}). + Where("schedule_id = ? AND deal_type = ? AND state IN (?)", schedule.ID, model.DealTypePDP, []model.DealState{ + model.DealProposed, model.DealPublished, + }). + Select("COUNT(*) AS deal_number, SUM(piece_size) AS deal_size"). + Scan(&pending).Error + if err != nil { + return sumResult{}, sumResult{}, errors.Wrap(err, "failed to count pending PDP deals") + } + + var total sumResult + err = db.Model(&model.Deal{}). + Where("schedule_id = ? AND deal_type = ? AND state IN (?)", schedule.ID, model.DealTypePDP, []model.DealState{ + model.DealActive, model.DealProposed, model.DealPublished, + }). + Select("COUNT(*) AS deal_number, SUM(piece_size) AS deal_size"). + Scan(&total).Error + if err != nil { + return sumResult{}, sumResult{}, errors.Wrap(err, "failed to count total PDP deals") + } + + return pending, total, nil +} + +func (d *DealPusher) findPDPCars( + ctx context.Context, + schedule *model.Schedule, + attachments []model.SourceAttachment, + allowedPieceCIDs []model.CID, + overReplicatedCIDs *gorm.DB, + limit int, +) ([]model.Car, error) { + db := d.dbNoContext.WithContext(ctx) + attachmentIDs := underscore.Map(attachments, func(a model.SourceAttachment) uint32 { return uint32(a.ID) }) + existingPieceCIDQuery := db.Table("deals").Select("piece_cid"). + Where("provider = ? AND deal_type = ? AND state IN (?)", + schedule.Provider, + model.DealTypePDP, + []model.DealState{ + model.DealProposed, model.DealPublished, model.DealActive, + }). + Where("piece_cid IS NOT NULL") + if schedule.Force { + existingPieceCIDQuery = db.Table("deals").Select("piece_cid"). + Where("schedule_id = ? AND deal_type = ?", schedule.ID, model.DealTypePDP). + Where("piece_cid IS NOT NULL") + } + + var existingPieceCIDs []model.CID + if err := existingPieceCIDQuery.Find(&existingPieceCIDs).Error; err != nil { + return nil, errors.Wrap(err, "failed to query existing PDP piece CIDs") + } + existingSet := make(map[string]struct{}, len(existingPieceCIDs)) + for _, existing := range existingPieceCIDs { + existingSet[cid.Cid(existing).String()] = struct{}{} + } + + baseQuery := func() *gorm.DB { + query := db.Where("attachment_id IN ?", attachmentIDs) + if d.maxReplicas > 0 && !schedule.Force { + query = query.Where("piece_cid NOT IN (?)", overReplicatedCIDs) + } + return query + } + + if len(allowedPieceCIDs) == 0 { + var cars []model.Car + if err := baseQuery().Find(&cars).Error; err != nil { + return nil, errors.Wrap(err, "failed to find PDP cars") + } + filtered := make([]model.Car, 0, limit) + for _, car := range cars { + if _, exists := existingSet[cid.Cid(car.PieceCID).String()]; exists { + continue + } + filtered = append(filtered, car) + if len(filtered) >= limit { + break + } + } + if len(filtered) == 0 { + return nil, gorm.ErrRecordNotFound + } + return filtered, nil + } + + cars := make([]model.Car, 0, limit) + pieceCIDChunks := util.ChunkSlice(allowedPieceCIDs, util.BatchSize) + for _, pieceCIDChunk := range pieceCIDChunks { + if len(cars) >= limit { + break + } + var chunkCars []model.Car + if err := baseQuery(). + Where("piece_cid IN ?", pieceCIDChunk). + Find(&chunkCars).Error; err != nil { + return nil, errors.Wrap(err, "failed to find PDP cars by allowed piece CID") + } + for _, car := range chunkCars { + if _, exists := existingSet[cid.Cid(car.PieceCID).String()]; exists { + continue + } + cars = append(cars, car) + if len(cars) >= limit { + break + } + } + } + if len(cars) == 0 { + return nil, gorm.ErrRecordNotFound + } + return cars, nil +} diff --git a/service/dealpusher/pdp_wiring_test.go b/service/dealpusher/pdp_wiring_test.go index 5a7dbaa5..c7607a55 100644 --- a/service/dealpusher/pdp_wiring_test.go +++ b/service/dealpusher/pdp_wiring_test.go @@ -6,23 +6,41 @@ import ( "time" "github.com/data-preservation-programs/singularity/model" + "github.com/data-preservation-programs/singularity/util/testutil" + "github.com/filecoin-project/go-address" "github.com/ipfs/go-cid" "github.com/stretchr/testify/require" + "gorm.io/gorm" ) -type noopPDPProofSetManager struct{} +type fixedWalletChooser struct { + wallet model.Wallet +} + +func (c fixedWalletChooser) Choose(_ context.Context, _ []model.Wallet) (model.Wallet, error) { + return c.wallet, nil +} -func (noopPDPProofSetManager) EnsureProofSet(_ context.Context, _ model.Wallet, _ string) (uint64, error) { - return 1, nil +type proofSetManagerMock struct { + proofSetID uint64 + pieceCIDs []cid.Cid } -func (noopPDPProofSetManager) QueueAddRoots(_ context.Context, _ uint64, _ []cid.Cid, _ PDPSchedulingConfig) (*PDPQueuedTx, error) { - return &PDPQueuedTx{Hash: "0x1"}, nil +func (m *proofSetManagerMock) EnsureProofSet(_ context.Context, _ model.Wallet, _ string) (uint64, error) { + return m.proofSetID, nil } -type noopPDPTransactionConfirmer struct{} +func (m *proofSetManagerMock) QueueAddRoots(_ context.Context, _ uint64, pieceCIDs []cid.Cid, _ PDPSchedulingConfig) (*PDPQueuedTx, error) { + m.pieceCIDs = append([]cid.Cid(nil), pieceCIDs...) + return &PDPQueuedTx{Hash: "0xabc"}, nil +} + +type txConfirmerMock struct { + txHash string +} -func (noopPDPTransactionConfirmer) WaitForConfirmations(_ context.Context, txHash string, _ uint64, _ time.Duration) (*PDPTransactionReceipt, error) { +func (m *txConfirmerMock) WaitForConfirmations(_ context.Context, txHash string, _ uint64, _ time.Duration) (*PDPTransactionReceipt, error) { + m.txHash = txHash return &PDPTransactionReceipt{Hash: txHash}, nil } @@ -31,6 +49,15 @@ func TestDealPusher_ResolveScheduleDealType_DefaultsToMarket(t *testing.T) { require.Equal(t, model.DealTypeMarket, d.resolveScheduleDealType(&model.Schedule{})) } +func TestDealPusher_ResolveScheduleDealType_DefaultsToPDPForDelegatedProvider(t *testing.T) { + subaddr := make([]byte, 20) + subaddr[19] = 1 + providerAddr, err := address.NewDelegatedAddress(10, subaddr) + require.NoError(t, err) + d := &DealPusher{} + require.Equal(t, model.DealTypePDP, d.resolveScheduleDealType(&model.Schedule{Provider: providerAddr.String()})) +} + func TestDealPusher_RunSchedule_PDPWithoutDependenciesReturnsConfiguredError(t *testing.T) { d := &DealPusher{ scheduleDealTypeResolver: func(_ *model.Schedule) model.DealType { @@ -44,17 +71,89 @@ func TestDealPusher_RunSchedule_PDPWithoutDependenciesReturnsConfiguredError(t * require.Contains(t, err.Error(), "pdp scheduling dependencies are not configured") } -func TestDealPusher_RunSchedule_PDPWithDependenciesReturnsNotImplemented(t *testing.T) { - d := &DealPusher{ - pdpProofSetManager: noopPDPProofSetManager{}, - pdpTxConfirmer: noopPDPTransactionConfirmer{}, - scheduleDealTypeResolver: func(_ *model.Schedule) model.DealType { - return model.DealTypePDP - }, - } +func TestDealPusher_RunSchedule_PDPWithDependenciesCreatesDealsAfterConfirmation(t *testing.T) { + testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { + clientSubaddr := make([]byte, 20) + clientSubaddr[19] = 10 + clientAddr, err := address.NewDelegatedAddress(10, clientSubaddr) + require.NoError(t, err) + providerSubaddr := make([]byte, 20) + providerSubaddr[19] = 20 + providerAddr, err := address.NewDelegatedAddress(10, providerSubaddr) + require.NoError(t, err) - state, err := d.runSchedule(context.Background(), &model.Schedule{}) - require.Error(t, err) - require.Equal(t, model.ScheduleError, state) - require.Contains(t, err.Error(), "pdp scheduling path is not implemented") + prep := model.Preparation{Name: "prep"} + require.NoError(t, db.Create(&prep).Error) + require.NotZero(t, prep.ID) + wallet := model.Wallet{ + ID: clientAddr.String(), + Address: clientAddr.String(), + PrivateKey: testutil.TestPrivateKeyHex, + } + require.NoError(t, db.Create(&wallet).Error) + require.NoError(t, db.Model(&prep).Association("Wallets").Append(&wallet)) + storage := model.Storage{Name: "src-storage"} + require.NoError(t, db.Create(&storage).Error) + require.NotZero(t, storage.ID) + attachment := model.SourceAttachment{PreparationID: prep.ID, StorageID: storage.ID} + require.NoError(t, db.Create(&attachment).Error) + require.NotZero(t, attachment.ID) + + pieceCID := model.CID(calculateCommp(t, generateRandomBytes(1000), 1024)) + car := model.Car{ + AttachmentID: &attachment.ID, + PreparationID: &prep.ID, + PieceCID: pieceCID, + PieceSize: 1024, + StoragePath: "car-1", + } + require.NoError(t, db.Create(&car).Error) + + schedule := model.Schedule{ + PreparationID: prep.ID, + State: model.ScheduleActive, + Provider: providerAddr.String(), + TotalDealNumber: 1, + } + require.NoError(t, db.Create(&schedule).Error) + schedule.Preparation = &model.Preparation{Wallets: []model.Wallet{wallet}} + + psm := &proofSetManagerMock{proofSetID: 42} + conf := &txConfirmerMock{} + d := &DealPusher{ + dbNoContext: db, + walletChooser: fixedWalletChooser{wallet: wallet}, + pdpProofSetManager: psm, + pdpTxConfirmer: conf, + pdpSchedulingConfig: defaultPDPSchedulingConfig(), + scheduleDealTypeResolver: func(_ *model.Schedule) model.DealType { return model.DealTypePDP }, + } + var attachments []model.SourceAttachment + require.NoError(t, db.Where("preparation_id = ?", schedule.PreparationID).Find(&attachments).Error) + require.Len(t, attachments, 1) + overReplicatedCIDs := db. + Table("deals"). + Select("piece_cid"). + Where("state in ?", []model.DealState{model.DealProposed, model.DealPublished, model.DealActive}). + Group("piece_cid"). + Having("count(*) >= ?", d.maxReplicas) + cars, err := d.findPDPCars(ctx, &schedule, attachments, nil, overReplicatedCIDs, d.pdpSchedulingConfig.BatchSize) + require.NoError(t, err) + require.Len(t, cars, 1) + + state, err := d.runSchedule(ctx, &schedule) + require.NoError(t, err) + require.Equal(t, model.ScheduleCompleted, state) + require.Equal(t, "0xabc", conf.txHash) + require.Len(t, psm.pieceCIDs, 1) + require.Equal(t, cid.Cid(pieceCID), psm.pieceCIDs[0]) + + var deals []model.Deal + require.NoError(t, db.Where("schedule_id = ?", schedule.ID).Find(&deals).Error) + require.Len(t, deals, 1) + require.Equal(t, model.DealTypePDP, deals[0].DealType) + require.Equal(t, model.DealProposed, deals[0].State) + require.NotNil(t, deals[0].ProofSetID) + require.Equal(t, uint64(42), *deals[0].ProofSetID) + }) }