package tkasynq import ( "encoding/json" "fmt" "time" "github.com/google/uuid" "github.com/hibiken/asynq" ) type TaskInfo struct { // ID is the identifier of the task. ID string // Queue is the name of the queue in which the task belongs. Queue string // Type is the type name of the task. Type string // Payload is the payload data of the task. Payload []byte } type AsynqClient struct { client *asynq.Client inspector *asynq.Inspector } const ( // Default max retry count used if nothing is specified. defaultMaxRetry = 25 // Default timeout used if both timeout and deadline are not specified. defaultTimeout = 30 * time.Minute ) type OptionType int const ( MaxRetryOpt OptionType = iota QueueOpt TimeoutOpt ProcessAtOpt ProcessInOpt TaskIDOpt ) type ( retryOption int queueOption string taskIDOption string timeoutOption time.Duration processAtOption time.Time processInOption time.Duration ) // Option specifies the task processing behavior. type Option interface { // String returns a string representation of the option. String() string // Type describes the type of the option. Type() OptionType // Value returns a value used to create this option. Value() interface{} } // MaxRetry returns an option to specify the max number of times // the task will be retried. // // Negative retry count is treated as zero retry. func MaxRetry(n int) Option { if n < 0 { n = 0 } return retryOption(n) } func (n retryOption) String() string { return fmt.Sprintf("MaxRetry(%d)", int(n)) } func (n retryOption) Type() OptionType { return MaxRetryOpt } func (n retryOption) Value() interface{} { return int(n) } // Timeout returns an option to specify how long a task may run. // If the timeout elapses before the Handler returns, then the task // will be retried. // // Zero duration means no limit. // // If there's a conflicting Deadline option, whichever comes earliest // will be used. func Timeout(d time.Duration) Option { return timeoutOption(d) } func (d timeoutOption) String() string { return fmt.Sprintf("Timeout(%v)", time.Duration(d)) } func (d timeoutOption) Type() OptionType { return TimeoutOpt } func (d timeoutOption) Value() interface{} { return time.Duration(d) } // Queue returns an option to specify the queue to enqueue the task into. func Queue(name string) Option { return queueOption(name) } func (name queueOption) String() string { return fmt.Sprintf("Queue(%q)", string(name)) } func (name queueOption) Type() OptionType { return QueueOpt } func (name queueOption) Value() interface{} { return string(name) } // ProcessAt returns an option to specify when to process the given task. // // If there's a conflicting ProcessIn option, the last option passed to Enqueue overrides the others. func ProcessAt(t time.Time) Option { return processAtOption(t) } func (t processAtOption) String() string { return fmt.Sprintf("ProcessAt(%v)", time.Time(t).Format(time.UnixDate)) } func (t processAtOption) Type() OptionType { return ProcessAtOpt } func (t processAtOption) Value() interface{} { return time.Time(t) } // ProcessIn returns an option to specify when to process the given task relative to the current time. // // If there's a conflicting ProcessAt option, the last option passed to Enqueue overrides the others. func ProcessIn(d time.Duration) Option { return processInOption(d) } func (d processInOption) String() string { return fmt.Sprintf("ProcessIn(%v)", time.Duration(d)) } func (d processInOption) Type() OptionType { return ProcessInOpt } func (d processInOption) Value() interface{} { return time.Duration(d) } // TaskID returns an option to specify the task ID. func TaskID(id string) Option { return taskIDOption(id) } func (id taskIDOption) String() string { return fmt.Sprintf("TaskID(%q)", string(id)) } func (id taskIDOption) Type() OptionType { return TaskIDOpt } func (id taskIDOption) Value() interface{} { return string(id) } // NewClient 初始化client func NewClient(client *asynq.Client, clientOpt *asynq.RedisClientOpt) *AsynqClient { return &AsynqClient{ client: client, inspector: asynq.NewInspector(clientOpt), } } type option struct { retry int queue string timeout time.Duration processAt time.Time taskId string } func (y *AsynqClient) composeOptions(opts ...Option) option { res := option{ retry: defaultMaxRetry, queue: "default", timeout: 0, // do not set to deafultTimeout here processAt: time.Now(), taskId: uuid.NewString(), } for _, opt := range opts { switch opt := opt.(type) { case retryOption: res.retry = int(opt) case queueOption: res.queue = string(opt) case timeoutOption: res.timeout = time.Duration(opt) case processAtOption: res.processAt = time.Time(opt) case processInOption: res.processAt = time.Now().Add(time.Duration(opt)) case taskIDOption: res.taskId = string(opt) fmt.Println("here-taksId: ", res.taskId) } } return res } // EnqueueTask 支持及时任务,支持时间点任务,多少时间后任务 func (y *AsynqClient) EnqueueTask(jobTag string, v interface{}, queue string, opts ...Option) (*TaskInfo, error) { payload, err := json.Marshal(v) if err != nil { return nil, err } task := asynq.NewTask(jobTag, payload) opt := y.composeOptions(opts...) info, err := y.client.Enqueue( task, asynq.Queue(queue), asynq.MaxRetry(opt.retry), asynq.Timeout(time.Duration(opt.timeout)*time.Second), asynq.ProcessAt(opt.processAt), asynq.TaskID(opt.taskId), ) if err != nil { return nil, err } return &TaskInfo{ ID: info.ID, Queue: info.Queue, Type: info.Type, Payload: info.Payload, }, nil } // RunTask 执行对应taskId任务 func (y *AsynqClient) RunTask(queue, id string) error { return y.inspector.RunTask(queue, id) } // DeleteTask 删除对应taskId任务 func (y *AsynqClient) DeleteTask(queue, id string) error { return y.inspector.DeleteTask(queue, id) }