Move payload parsing to the listener

This commit is contained in:
kolaente 2021-01-31 15:28:13 +01:00
parent 7a00cb1f6a
commit 124cc5e54f
Signed by: konrad
GPG Key ID: F40E70337AB24C9B
3 changed files with 21 additions and 11 deletions

View File

@ -19,11 +19,12 @@ package events
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"time"
"github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message" "github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware" "github.com/ThreeDotsLabs/watermill/message/router/middleware"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel" "github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
"time"
) )
var ( var (
@ -64,12 +65,7 @@ func InitEvents() (err error) {
for topic, funcs := range listeners { for topic, funcs := range listeners {
for _, handler := range funcs { for _, handler := range funcs {
router.AddNoPublisherHandler(topic+watermill.NewShortUUID(), topic, pubsub, func(msg *message.Message) error { router.AddNoPublisherHandler(topic+watermill.NewShortUUID(), topic, pubsub, func(msg *message.Message) error {
var payload interface{} return handler.Handle(msg.Payload)
err = json.Unmarshal(msg.Payload, payload)
if err != nil {
return err
}
return handler.Handle(payload)
}) })
} }
} }

View File

@ -16,8 +16,10 @@
package events package events
import "github.com/ThreeDotsLabs/watermill/message"
type Listener interface { type Listener interface {
Handle(payload interface{}) error Handle(payload message.Payload) error
} }
var listeners map[string][]Listener var listeners map[string][]Listener

View File

@ -16,7 +16,12 @@
package models package models
import "code.vikunja.io/api/pkg/events" import (
"code.vikunja.io/api/pkg/events"
"code.vikunja.io/api/pkg/log"
"encoding/json"
"github.com/ThreeDotsLabs/watermill/message"
)
func RegisterListeners() { func RegisterListeners() {
events.RegisterListener((&TaskCreatedEvent{}).TopicName(), &SendTaskCreatedNotification{}) events.RegisterListener((&TaskCreatedEvent{}).TopicName(), &SendTaskCreatedNotification{})
@ -25,6 +30,13 @@ func RegisterListeners() {
type SendTaskCreatedNotification struct { type SendTaskCreatedNotification struct {
} }
func (s *SendTaskCreatedNotification) Handle(payload interface{}) error { func (s *SendTaskCreatedNotification) Handle(payload message.Payload) (err error) {
panic("implement me") task := &Task{}
err = json.Unmarshal(payload, task)
if err != nil {
return err
}
log.Debugf("task.created: %v", task)
return nil
} }