package main
import (
"fmt"
redisbackend "github.com/RichardKnop/machinery/v2/backends/redis"
redisbroker "github.com/RichardKnop/machinery/v2/brokers/redis"
eagerlock "github.com/RichardKnop/machinery/v2/locks/eager"
"github.com/RichardKnop/machinery/v2"
"github.com/RichardKnop/machinery/v2/config"
"github.com/RichardKnop/machinery/v2/tasks"
"os"
"time"
)
// main runs the program in one of two modes, selected by the command line:
//
//   - With the single argument "worker" it runs a task worker and exits when
//     the worker stops, panicking if the worker reports an error.
//   - Otherwise it acts as a producer: it registers and sends the periodic
//     task, sends the "add" task and prints its result, then stays alive.
func main() {
	if len(os.Args) == 2 && os.Args[1] == "worker" {
		if err := worker(); err != nil {
			panic(err)
		}
		// A worker process must not fall through into the producer path
		// below once it has shut down cleanly.
		return
	}

	TestPeriodicTask()
	TestAdd()

	// Keep the process running after the tasks have been sent; presumably so
	// the periodic schedule registered above can keep firing — TODO confirm.
	time.Sleep(time.Second * 1000)
}
// TestAdd sends a single "add" task with the int64 arguments 4 and 1, waits
// for its result and prints every returned value to stdout.
//
// Any failure (server setup, sending the task, fetching the result) is
// printed and aborts the function instead of being silently discarded.
func TestAdd() {
	server, err := startServer()
	if err != nil {
		fmt.Println("add: starting server:", err)
		return
	}

	signature := &tasks.Signature{
		Name: "add",
		Args: []tasks.Arg{
			{
				Type:  "int64",
				Value: 4,
			},
			{
				Type:  "int64",
				Value: 1,
			},
		},
	}

	asyncResult, err := server.SendTask(signature)
	if err != nil {
		// Do not touch asyncResult after a failed send.
		fmt.Println("add: sending task:", err)
		return
	}

	// NOTE(review): the 5ms argument is presumably the polling interval used
	// while waiting for the result, not an overall timeout — confirm against
	// the machinery AsyncResult.Get documentation.
	results, err := asyncResult.Get(time.Millisecond * 5)
	if err != nil {
		fmt.Println("add: getting result:", err)
		return
	}

	for _, result := range results {
		fmt.Println(result.Interface())
	}
}
// TestPeriodicTask registers the argument-less "periodicTask" signature on
// the schedule "*/1 * * * ?" under the name "periodic-task", then also sends
// the same signature once immediately and prints the resulting async handle.
//
// Server setup and send failures are printed and abort the function. A
// registration failure is printed but the one-off send is still attempted.
func TestPeriodicTask() {
	server, err := startServer()
	if err != nil {
		fmt.Println("periodic task: starting server:", err)
		return
	}

	signature := &tasks.Signature{
		Name: "periodicTask",
		Args: []tasks.Arg{},
	}

	if err := server.RegisterPeriodicTask("*/1 * * * ?", "periodic-task", signature); err != nil {
		fmt.Println(err)
	}

	asyncResult, err := server.SendTask(signature)
	if err != nil {
		// Report the send error rather than printing a meaningless nil result.
		fmt.Println("periodic task: sending task:", err)
		return
	}
	fmt.Println(asyncResult)
}
// startServer builds a machinery server that uses Redis at localhost:6379
// (database 0) as both broker and result backend, together with an eager
// lock, and registers the "add" and "periodicTask" handlers on it.
//
// The server is returned together with the error from task registration;
// note that the server value is returned even when that error is non-nil.
func startServer() (*machinery.Server, error) {
	const (
		redisAddr = "localhost:6379"
		redisDB   = 0
	)

	redisCnf := &config.RedisConfig{
		MaxIdle:                3,
		IdleTimeout:            240,
		ReadTimeout:            15,
		WriteTimeout:           15,
		ConnectTimeout:         15,
		NormalTasksPollPeriod:  1000,
		DelayedTasksPollPeriod: 500,
	}
	cnf := &config.Config{
		DefaultQueue:    "machinery_tasks",
		ResultsExpireIn: 3600,
		Redis:           redisCnf,
	}

	// Broker and backend each get their own address slice, as before.
	srv := machinery.NewServer(
		cnf,
		redisbroker.NewGR(cnf, []string{redisAddr}, redisDB),
		redisbackend.NewGR(cnf, []string{redisAddr}, redisDB),
		eagerlock.New(),
	)

	// Task names used in signatures map to the handler functions below.
	handlers := map[string]interface{}{
		"add":          Add,
		"periodicTask": PeriodicTask,
	}
	registerErr := srv.RegisterTasks(handlers)
	return srv, registerErr
}
// worker starts a machinery worker tagged "machinery_worker" on a freshly
// built server and returns whatever Launch returns. A server setup error is
// returned immediately without launching anything.
//
// The post-task, error and pre-task hooks are installed as no-ops.
func worker() error {
	const tag = "machinery_worker"

	srv, err := startServer()
	if err != nil {
		return err
	}

	w := srv.NewWorker(tag, 0)

	// Placeholder hooks: each one intentionally does nothing.
	w.SetPostTaskHandler(func(signature *tasks.Signature) {})
	w.SetErrorHandler(func(err error) {})
	w.SetPreTaskHandler(func(signature *tasks.Signature) {})

	return w.Launch()
}
// Add is the handler for the "add" task. It prints a banner via the builtin
// println (the text reads "executing the Add method") and returns the sum of
// all args, which is 0 when no arguments are given. The error is always nil.
func Add(args ...int64) (int64, error) {
	println("############# 执行Add方法 #############")

	var total int64
	for i := 0; i < len(args); i++ {
		total += args[i]
	}
	return total, nil
}
// PeriodicTask is the handler for the "periodicTask" task. It only prints a
// banner line to stdout (the text reads "executing the periodic task
// PeriodicTask") and always returns nil.
func PeriodicTask() error {
	const banner = "################ 执行周期任务PeriodicTask #################"
	fmt.Println(banner)
	return nil
}