From 7a00cb1f6a587f37072adc8bae351e60094c7391 Mon Sep 17 00:00:00 2001 From: kolaente Date: Sun, 31 Jan 2021 15:08:39 +0100 Subject: [PATCH] Add handlers and listeners --- pkg/events/events.go | 25 ++++++++++++++++++++++--- pkg/events/listeners.go | 16 ++++++++++++---- pkg/initialize/init.go | 1 + pkg/models/events.go | 29 +++++++++++++++++++++++++++++ pkg/models/listeners.go | 30 ++++++++++++++++++++++++++++++ pkg/models/tasks.go | 5 +++++ 6 files changed, 99 insertions(+), 7 deletions(-) create mode 100644 pkg/models/events.go create mode 100644 pkg/models/listeners.go diff --git a/pkg/events/events.go b/pkg/events/events.go index 072362395..26bbbbf68 100644 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -18,6 +18,7 @@ package events import ( "context" + "encoding/json" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" "github.com/ThreeDotsLabs/watermill/message/router/middleware" @@ -30,6 +31,11 @@ var ( logger = watermill.NewStdLogger(false, false) ) +type Event interface { + TopicName() string + Message() interface{} +} + func InitEvents() (err error) { router, err := message.NewRouter( message.RouterConfig{}, @@ -57,13 +63,26 @@ func InitEvents() (err error) { for topic, funcs := range listeners { for _, handler := range funcs { - router.AddNoPublisherHandler(topic+watermill.NewShortUUID(), topic, pubsub, handler) + router.AddNoPublisherHandler(topic+watermill.NewShortUUID(), topic, pubsub, func(msg *message.Message) error { + var payload interface{} + err = json.Unmarshal(msg.Payload, payload) + if err != nil { + return err + } + return handler.Handle(payload) + }) } } return router.Run(context.Background()) } -func Publish(topic string, message ...*message.Message) error { - return pubsub.Publish(topic, message...) +func Publish(event Event) error { + content, err := json.Marshal(event.Message()) + if err != nil { + return err + } + + msg := message.NewMessage(watermill.NewUUID(), content) + return pubsub.Publish(event.TopicName(), msg) } diff --git a/pkg/events/listeners.go b/pkg/events/listeners.go index 3d6d692d0..33227cf4d 100644 --- a/pkg/events/listeners.go +++ b/pkg/events/listeners.go @@ -16,8 +16,16 @@ package events -import "github.com/ThreeDotsLabs/watermill/message" - -var listeners = map[string][]message.NoPublishHandlerFunc{ - "task.created": {}, +type Listener interface { + Handle(payload interface{}) error +} + +var listeners map[string][]Listener + +func init() { + listeners = make(map[string][]Listener) +} + +func RegisterListener(topicName string, listener Listener) { + listeners[topicName] = append(listeners[topicName], listener) } diff --git a/pkg/initialize/init.go b/pkg/initialize/init.go index ed4c62a68..be09a614d 100644 --- a/pkg/initialize/init.go +++ b/pkg/initialize/init.go @@ -89,6 +89,7 @@ func FullInit() { // Start processing events go func() { + models.RegisterListeners() err := events.InitEvents() if err != nil { log.Fatal(err.Error()) diff --git a/pkg/models/events.go b/pkg/models/events.go new file mode 100644 index 000000000..6e59dd231 --- /dev/null +++ b/pkg/models/events.go @@ -0,0 +1,29 @@ +// Vikunja is a to-do list application to facilitate your life. +// Copyright 2018-2021 Vikunja and contributors. All rights reserved. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public Licensee as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public Licensee for more details. +// +// You should have received a copy of the GNU Affero General Public Licensee +// along with this program. If not, see . + +package models + +type TaskCreatedEvent struct { + Task *Task +} + +func (t *TaskCreatedEvent) TopicName() string { + return "task.created" +} + +func (t *TaskCreatedEvent) Message() interface{} { + return t.Task +} diff --git a/pkg/models/listeners.go b/pkg/models/listeners.go new file mode 100644 index 000000000..29ccc6a2d --- /dev/null +++ b/pkg/models/listeners.go @@ -0,0 +1,30 @@ +// Vikunja is a to-do list application to facilitate your life. +// Copyright 2018-2021 Vikunja and contributors. All rights reserved. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public Licensee as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public Licensee for more details. +// +// You should have received a copy of the GNU Affero General Public Licensee +// along with this program. If not, see . + +package models + +import "code.vikunja.io/api/pkg/events" + +func RegisterListeners() { + events.RegisterListener((&TaskCreatedEvent{}).TopicName(), &SendTaskCreatedNotification{}) +} + +type SendTaskCreatedNotification struct { +} + +func (s *SendTaskCreatedNotification) Handle(payload interface{}) error { + panic("implement me") +} diff --git a/pkg/models/tasks.go b/pkg/models/tasks.go index ff0b9165d..c5d78b5b9 100644 --- a/pkg/models/tasks.go +++ b/pkg/models/tasks.go @@ -17,6 +17,7 @@ package models import ( + "code.vikunja.io/api/pkg/events" "math" "sort" "strconv" @@ -828,6 +829,10 @@ func createTask(s *xorm.Session, t *Task, a web.Auth, updateAssignees bool) (err t.setIdentifier(l) + if err := events.Publish(&TaskCreatedEvent{Task: t}); err != nil { + return err + } + err = updateListLastUpdated(s, &List{ID: t.ListID}) return }