Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 40 additions & 1 deletion cmd/run/dealpusher.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package run

import (
"time"

"github.com/cockroachdb/errors"
"github.com/data-preservation-programs/singularity/database"
"github.com/data-preservation-programs/singularity/service"
Expand All @@ -25,6 +27,26 @@ var DealPusherCmd = &cli.Command{
Aliases: []string{"M"},
DefaultText: "Unlimited",
},
&cli.IntFlag{
Name: "pdp-batch-size",
Usage: "Number of roots to include in each PDP add-roots transaction",
Value: 128,
},
&cli.Uint64Flag{
Name: "pdp-gas-limit",
Usage: "Gas limit for PDP on-chain transactions",
Value: 5000000,
},
&cli.Uint64Flag{
Name: "pdp-confirmation-depth",
Usage: "Number of block confirmations required for PDP transactions",
Value: 5,
},
&cli.DurationFlag{
Name: "pdp-poll-interval",
Usage: "Polling interval for PDP transaction confirmation checks",
Value: 30 * time.Second,
},
},
Action: func(c *cli.Context) error {
db, closer, err := database.OpenFromCLI(c)
Expand All @@ -39,7 +61,24 @@ var DealPusherCmd = &cli.Command{
return errors.WithStack(err)
}

dm, err := dealpusher.NewDealPusher(db, c.String("lotus-api"), c.String("lotus-token"), c.Uint("deal-attempts"), c.Uint("max-replication-factor"))
pdpCfg := dealpusher.PDPSchedulingConfig{
BatchSize: c.Int("pdp-batch-size"),
GasLimit: c.Uint64("pdp-gas-limit"),
ConfirmationDepth: c.Uint64("pdp-confirmation-depth"),
PollingInterval: c.Duration("pdp-poll-interval"),
}
if err := pdpCfg.Validate(); err != nil {
return errors.WithStack(err)
}

dm, err := dealpusher.NewDealPusher(
db,
c.String("lotus-api"),
c.String("lotus-token"),
c.Uint("deal-attempts"),
c.Uint("max-replication-factor"),
dealpusher.WithPDPSchedulingConfig(pdpCfg),
)
if err != nil {
return errors.WithStack(err)
}
Expand Down
31 changes: 15 additions & 16 deletions service/dealpusher/dealpusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,12 @@ var waitPendingInterval = time.Minute

// DealPusher represents a struct that encapsulates the data and functionality related to pushing deals in a replication process.
type DealPusher struct {
dbNoContext *gorm.DB // Pointer to a gorm.DB object representing a database connection.
walletChooser replication.WalletChooser // Object responsible for choosing a wallet for replication.
dealMaker replication.DealMaker // Object responsible for making a deal in replication.
pdpProofSetManager PDPProofSetManager // Optional PDP proof set lifecycle manager.
pdpTxConfirmer PDPTransactionConfirmer // Optional PDP transaction confirmer.
dbNoContext *gorm.DB // Pointer to a gorm.DB object representing a database connection.
walletChooser replication.WalletChooser // Object responsible for choosing a wallet for replication.
dealMaker replication.DealMaker // Object responsible for making a deal in replication.
pdpProofSetManager PDPProofSetManager // Optional PDP proof set lifecycle manager.
pdpTxConfirmer PDPTransactionConfirmer // Optional PDP transaction confirmer.
pdpSchedulingConfig PDPSchedulingConfig // PDP scheduling config used for root batching and tx confirmation behavior.
// Resolver is injected so tests and future wiring can switch deal type behavior without coupling DealPusher to config storage.
scheduleDealTypeResolver func(schedule *model.Schedule) model.DealType
workerID uuid.UUID // UUID identifying the associated worker.
Expand Down Expand Up @@ -433,20 +434,13 @@ func (d *DealPusher) runSchedule(ctx context.Context, schedule *model.Schedule)

func (d *DealPusher) resolveScheduleDealType(schedule *model.Schedule) model.DealType {
if d.scheduleDealTypeResolver == nil {
return model.DealTypeMarket
return inferScheduleDealType(schedule)
}
return d.scheduleDealTypeResolver(schedule)
}

func (d *DealPusher) runPDPSchedule(_ context.Context, _ *model.Schedule) (model.ScheduleState, error) {
if d.pdpProofSetManager == nil || d.pdpTxConfirmer == nil {
return model.ScheduleError, errors.New("pdp scheduling dependencies are not configured")
}
return model.ScheduleError, errors.New("pdp scheduling path is not implemented")
}

func NewDealPusher(db *gorm.DB, lotusURL string,
lotusToken string, numAttempts uint, maxReplicas uint,
lotusToken string, numAttempts uint, maxReplicas uint, opts ...Option,
) (*DealPusher, error) {
if numAttempts <= 1 {
numAttempts = 1
Expand All @@ -460,20 +454,25 @@ func NewDealPusher(db *gorm.DB, lotusURL string,
if err != nil {
return nil, errors.Wrap(err, "failed to init deal maker")
}
return &DealPusher{
dealPusher := &DealPusher{
dbNoContext: db,
activeScheduleCancelFunc: make(map[model.ScheduleID]context.CancelFunc),
activeSchedule: make(map[model.ScheduleID]*model.Schedule),
cronEntries: make(map[model.ScheduleID]cron.EntryID),
walletChooser: &replication.RandomWalletChooser{},
dealMaker: dealMaker,
pdpSchedulingConfig: defaultPDPSchedulingConfig(),
workerID: uuid.New(),
cron: cron.New(cron.WithLogger(&cronLogger{}), cron.WithLocation(time.UTC),
cron.WithParser(cron.NewParser(cron.SecondOptional|cron.Minute|cron.Hour|cron.Dom|cron.Month|cron.Dow|cron.Descriptor))),
sendDealAttempts: numAttempts,
host: h,
maxReplicas: maxReplicas,
}, nil
}
for _, opt := range opts {
opt(dealPusher)
}
return dealPusher, nil
}

// runOnce is a method of the DealPusher type that runs a single iteration of the deal pushing logic.
Expand Down
34 changes: 34 additions & 0 deletions service/dealpusher/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package dealpusher

import "github.com/data-preservation-programs/singularity/model"

// Option customizes DealPusher initialization.
type Option func(*DealPusher)

// WithPDPProofSetManager sets the PDP proof set manager dependency.
func WithPDPProofSetManager(manager PDPProofSetManager) Option {
return func(d *DealPusher) {
d.pdpProofSetManager = manager
}
}

// WithPDPTransactionConfirmer sets the PDP transaction confirmer dependency.
func WithPDPTransactionConfirmer(confirmer PDPTransactionConfirmer) Option {
return func(d *DealPusher) {
d.pdpTxConfirmer = confirmer
}
}

// WithPDPSchedulingConfig overrides PDP scheduling configuration.
func WithPDPSchedulingConfig(cfg PDPSchedulingConfig) Option {
return func(d *DealPusher) {
d.pdpSchedulingConfig = cfg
}
}

// WithScheduleDealTypeResolver overrides schedule deal type resolution logic.
func WithScheduleDealTypeResolver(resolver func(schedule *model.Schedule) model.DealType) Option {
return func(d *DealPusher) {
d.scheduleDealTypeResolver = resolver
}
}
Loading
Loading