router/middlware based webhooks.
continuous-integration/drone/pr Build is failing Details

This commit is contained in:
Ubuntu 2022-06-14 00:30:15 +00:00
parent 9466b1fba5
commit 44128ceb13
7 changed files with 282 additions and 324 deletions

View File

@ -1149,3 +1149,33 @@ Full path: `metrics.password`
Environment path: `VIKUNJA_METRICS_PASSWORD`
---
## webhook
Webhook configuration
### enabled
If set to true, HTTP POST is executed
Default: `true`
Full path: `webhook.enabled`
Environment path: `VIKUNJA_WEBHOOK_ENABLED`
### url
URL for webhook
Default: `http://127.0.0.1/test/`
Full path: `webhook.url`
Environment path: `VIKUNJA_WEBHOOK_URL`

View File

@ -160,12 +160,15 @@ const (
MetricsUsername Key = `metrics.username`
MetricsPassword Key = `metrics.password`
WebhookEnabled Key = `webhook.enabled`
WebhookURL Key = `webhook.url`
WebhookConf Key = `webhooks`
)
// Unmarshal object directly from config
func (k Key) GetUnmarshaled(rawVal interface{}) interface{} {
return viper.UnmarshalKey(string(k), &rawVal)
}
// GetString returns a string config value
func (k Key) GetString() string {
return viper.GetString(string(k))
@ -373,8 +376,9 @@ func InitDefaultConfig() {
// Metrics
MetricsEnabled.setDefault(false)
WebhookEnabled.setDefault(false)
WebhookURL.setDefault("")
WebhookConf.setDefault( []int{} )
// WebhookEnabled.setDefault(false)
// WebhookURL.setDefault("")
}
// InitConfig initializes the config, sets defaults etc.

View File

@ -28,6 +28,8 @@ import (
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
"code.vikunja.io/api/pkg/webhooks"
)
var pubsub *gochannel.GoChannel
@ -72,7 +74,10 @@ func InitEvents() (err error) {
return nil
})
whMiddleware := webhooks.GenerateMiddleware()
router.AddMiddleware(
whMiddleware, //webhooks.WebhookMiddleware,
poison,
middleware.Retry{
MaxRetries: 5,
@ -86,6 +91,10 @@ func InitEvents() (err error) {
middleware.Recoverer,
)
router.AddMiddleware(
)
for topic, funcs := range listeners {
for _, handler := range funcs {
router.AddNoPublisherHandler(topic+"."+handler.Name(), topic, pubsub, handler.Handle)

View File

@ -32,7 +32,6 @@ import (
"code.vikunja.io/api/pkg/notifications"
"code.vikunja.io/api/pkg/red"
"code.vikunja.io/api/pkg/user"
"code.vikunja.io/api/pkg/webhooks"
)
// LightInit will only fullInit config, redis, logger but no db connection.
@ -104,7 +103,6 @@ func FullInit() {
go func() {
models.RegisterListeners()
user.RegisterListeners()
webhooks.RegisterListeners()
err := events.InitEvents()
if err != nil {
log.Fatal(err.Error())

View File

@ -1,316 +0,0 @@
// TESTING
package webhooks
import (
// "encoding/json"
"fmt"
"net/http"
"bytes"
"code.vikunja.io/api/pkg/config"
"code.vikunja.io/api/pkg/events"
"code.vikunja.io/api/pkg/models"
"code.vikunja.io/api/pkg/log"
"github.com/ThreeDotsLabs/watermill/message"
)
// RegisterListeners registers all event listeners
func RegisterListeners() {
events.RegisterListener((&models.ListCreatedEvent{}).Name(), &WebHookListCreatedEvent{})
events.RegisterListener((&models.ListDeletedEvent{}).Name(), &WebHookListDeletedEvent{})
events.RegisterListener((&models.NamespaceCreatedEvent{}).Name(), &WebHookNamespaceCreatedEvent{})
events.RegisterListener((&models.NamespaceDeletedEvent{}).Name(), &WebHookNamespaceDeletedEvent{})
events.RegisterListener((&models.TaskCreatedEvent{}).Name(), &WebHookTaskCreatedEvent{})
events.RegisterListener((&models.TaskDeletedEvent{}).Name(), &WebHookTaskDeletedEvent{})
events.RegisterListener((&models.TeamDeletedEvent{}).Name(), &WebHookTeamDeletedEvent{})
events.RegisterListener((&models.TeamCreatedEvent{}).Name(), &WebHookTeamCreatedEvent{})
events.RegisterListener((&models.TeamMemberAddedEvent{}).Name(), &WebHookTeamMemberAddedEvent{})
events.RegisterListener((&models.TaskCommentCreatedEvent{}).Name(), &WebHookTaskCommentCreatedEvent{})
events.RegisterListener((&models.TaskAssigneeCreatedEvent{}).Name(), &WebHookTaskAssigneeCreatedEvent{})
events.RegisterListener((&models.TaskCommentUpdatedEvent{}).Name(), &WebHookTaskCommentUpdatedEvent{})
events.RegisterListener((&models.TaskUpdatedEvent{}).Name(), &WebHookTaskUpdatedEvent{})
}
///////
// Event Listeners
// WebHookListCreatedEvent represents a listener
type WebHookListCreatedEvent struct {
}
// Name defines the name for the WebHookListCreatedEvent listener
func (s *WebHookListCreatedEvent) Name() string {
return "web.hook.list.created.event"
}
// Handle is executed when the event WebHookListCreatedEvent listens on is fired
func (s *WebHookListCreatedEvent) Handle(msg *message.Message) (err error) {
// -> global handler
TheGreatWebhookHandler( (&models.ListCreatedEvent{}).Name(), msg)
// end
return nil
}
// WebHookListDeletedEvent represents a listener
type WebHookListDeletedEvent struct {
}
// Name defines the name for the WebHookListDeletedEvent listener
func (s *WebHookListDeletedEvent) Name() string {
return "web.hook.list.deleted.event"
}
// Handle is executed when the event WebHookListDeletedEvent listens on is fired
func (s *WebHookListDeletedEvent) Handle(msg *message.Message) (err error) {
// -> global handler
TheGreatWebhookHandler( (&models.ListDeletedEvent{}).Name(), msg)
// end
return nil
}
// WebHookNamespaceCreatedEvent represents a listener
type WebHookNamespaceCreatedEvent struct {
}
// Name defines the name for the WebHookNamespaceCreatedEvent listener
func (s *WebHookNamespaceCreatedEvent) Name() string {
return "web.hook.namespace.created.event"
}
// Handle is executed when the event WebHookNamespaceCreatedEvent listens on is fired
func (s *WebHookNamespaceCreatedEvent) Handle(msg *message.Message) (err error) {
// -> global handler
TheGreatWebhookHandler( (&models.NamespaceCreatedEvent{}).Name(), msg)
// end
return nil
}
// WebHookNamespaceDeletedEvent represents a listener
type WebHookNamespaceDeletedEvent struct {
}
// Name defines the name for the WebHookNamespaceDeletedEvent listener
func (s *WebHookNamespaceDeletedEvent) Name() string {
return "web.hook.namespace.deleted.event"
}
// Handle is executed when the event WebHookNamespaceDeletedEvent listens on is fired
func (s *WebHookNamespaceDeletedEvent) Handle(msg *message.Message) (err error) {
// -> global handler
TheGreatWebhookHandler( (&models.NamespaceDeletedEvent{}).Name(), msg)
// end
return nil
}
// WebHookTaskCreatedEvent represents a listener
type WebHookTaskCreatedEvent struct {
}
// Name defines the name for the WebHookTaskCreatedEvent listener
func (s *WebHookTaskCreatedEvent) Name() string {
return "web.hook.task.created.event"
}
// Handle is executed when the event WebHookTaskCreatedEvent listens on is fired
func (s *WebHookTaskCreatedEvent) Handle(msg *message.Message) (err error) {
// -> global handler
TheGreatWebhookHandler( (&models.TaskCreatedEvent{}).Name(), msg)
// end
return nil
}
// WebHookTaskDeletedEvent represents a listener
type WebHookTaskDeletedEvent struct {
}
// Name defines the name for the WebHookTaskDeletedEvent listener
func (s *WebHookTaskDeletedEvent) Name() string {
return "web.hook.task.deleted.event"
}
// Handle is executed when the event WebHookTaskDeletedEvent listens on is fired
func (s *WebHookTaskDeletedEvent) Handle(msg *message.Message) (err error) {
// -> global handler
TheGreatWebhookHandler( (&models.TaskDeletedEvent{}).Name(), msg)
// end
return nil
}
// WebHookTeamDeletedEvent represents a listener
type WebHookTeamDeletedEvent struct {
}
// Name defines the name for the WebHookTeamDeletedEvent listener
func (s *WebHookTeamDeletedEvent) Name() string {
return "web.hook.team.deleted.event"
}
// Handle is executed when the event WebHookTeamDeletedEvent listens on is fired
func (s *WebHookTeamDeletedEvent) Handle(msg *message.Message) (err error) {
// -> global handler
TheGreatWebhookHandler( (&models.TeamDeletedEvent{}).Name(), msg)
// end
return nil
}
// WebHookTeamCreatedEvent represents a listener
type WebHookTeamCreatedEvent struct {
}
// Name defines the name for the WebHookTeamCreatedEvent listener
func (s *WebHookTeamCreatedEvent) Name() string {
return "web.hook.team.created.event"
}
// Handle is executed when the event WebHookTeamCreatedEvent listens on is fired
func (s *WebHookTeamCreatedEvent) Handle(msg *message.Message) (err error) {
// -> global handler
TheGreatWebhookHandler( (&models.TeamCreatedEvent{}).Name(), msg)
// end
return nil
}
// WebHookTeamMemberAddedEvent represents a listener
type WebHookTeamMemberAddedEvent struct {
}
// Name defines the name for the WebHookTeamMemberAddedEvent listener
func (s *WebHookTeamMemberAddedEvent) Name() string {
return "web.hook.team.member.added.event"
}
// Handle is executed when the event WebHookTeamMemberAddedEvent listens on is fired
func (s *WebHookTeamMemberAddedEvent) Handle(msg *message.Message) (err error) {
// -> global handler
TheGreatWebhookHandler( (&models.TeamMemberAddedEvent{}).Name(), msg)
// end
return nil
}
// WebHookTaskCommentCreatedEvent represents a listener
type WebHookTaskCommentCreatedEvent struct {
}
// Name defines the name for the WebHookTaskCommentCreatedEvent listener
func (s *WebHookTaskCommentCreatedEvent) Name() string {
return "web.hook.task.comment.created.event"
}
// Handle is executed when the event WebHookTaskCommentCreatedEvent listens on is fired
func (s *WebHookTaskCommentCreatedEvent) Handle(msg *message.Message) (err error) {
// -> global handler
TheGreatWebhookHandler( (&models.TaskCommentCreatedEvent{}).Name(), msg)
// end
return nil
}
// WebHookTaskAssigneeCreatedEvent represents a listener
type WebHookTaskAssigneeCreatedEvent struct {
}
// Name defines the name for the WebHookTaskAssigneeCreatedEvent listener
func (s *WebHookTaskAssigneeCreatedEvent) Name() string {
return "web.hook.task.assignee.created.event"
}
// Handle is executed when the event WebHookTaskAssigneeCreatedEvent listens on is fired
func (s *WebHookTaskAssigneeCreatedEvent) Handle(msg *message.Message) (err error) {
// -> global handler
TheGreatWebhookHandler( (&models.TaskAssigneeCreatedEvent{}).Name(), msg)
// end
return nil
}
// WebHookTaskCommentUpdatedEvent represents a listener
type WebHookTaskCommentUpdatedEvent struct {
}
// Name defines the name for the WebHookTaskCommentUpdatedEvent listener
func (s *WebHookTaskCommentUpdatedEvent) Name() string {
return "web.hook.task.comment.updated.event"
}
// Handle is executed when the event WebHookTaskCommentUpdatedEvent listens on is fired
func (s *WebHookTaskCommentUpdatedEvent) Handle(msg *message.Message) (err error) {
// -> global handler
TheGreatWebhookHandler( (&models.TaskCommentUpdatedEvent{}).Name(), msg)
// end
return nil
}
// WebHookTaskUpdatedEvent represents a listener
type WebHookTaskUpdatedEvent struct {
}
// Name defines the name for the WebHookTaskUpdatedEvent listener
func (s *WebHookTaskUpdatedEvent) Name() string {
return "web.hook.task.updated.event"
}
// Handle is executed when the event WebHookTaskUpdatedEvent listens on is fired
func (s *WebHookTaskUpdatedEvent) Handle(msg *message.Message) (err error) {
// -> global handler
TheGreatWebhookHandler( (&models.TaskUpdatedEvent{}).Name(), msg)
// end
return nil
}
func TheGreatWebhookHandler(name string, msg *message.Message) {
log.Debugf("Webhook for %s => %s", name, msg.Payload)
if !config.WebhookEnabled.GetBool() {
return
}
base_url := config.WebhookURL.GetString()
webhook_url := fmt.Sprintf("%s?event=%s", base_url, name)
data := bytes.NewBuffer(msg.Payload)
_, err := http.Post(webhook_url, "application/json",data)
if err != nil {
log.Debugf("Webhook failed : %s ", webhook_url )
} else {
log.Debugf("Webhook success : %s ", webhook_url )
}
}

View File

@ -0,0 +1,91 @@
package webhooks
import (
"code.vikunja.io/api/pkg/log"
"github.com/ThreeDotsLabs/watermill/message"
)
// type middlwareFunc func(message.HandlerFunc) message.HandlerFunc
func GenerateMiddleware() message.HandlerMiddleware {
runtimeConfig := ProcessConfig()
return func(hf message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
topic := message.SubscribeTopicFromCtx(msg.Context() )
TheGreatWebhookHandler(runtimeConfig, topic, msg)
newMsg, err := hf(msg)
if err != nil {
return nil, err
}
return newMsg, nil
}
}
}
func TheGreatWebhookHandler(cfg []WebhookRuntimeConfig, topic string, msg *message.Message) {
log.Debugf("Webhook handler for '%s'", topic)
for i, entry := range cfg {
log.Debugf("Checking webhook config [%d]", i)
if entry.FilterFunc(topic) {
log.Debugf("Executing webhook [%d]", i)
entry.ExecuteFunc(topic, msg)
}
}
}
/*
func OldWebhookMiddleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
topic := message.SubscribeTopicFromCtx(msg.Context() )
//TheGreatWebhookHandler(topic, msg)
*
log.Debugf("MIDDLEWARE: %s ", msg )
log.Debugf("MIDDLEWARE META : %s ", msg.Metadata )
log.Debugf("MIDDLEWARE PAYLOAD : %s ", msg.Payload )
log.Debugf("MIDDLEWARE PUB : %s ", message.PublishTopicFromCtx(msg.Context()) )
log.Debugf("MIDDLEWARE SUB : %s ", message.SubscribeTopicFromCtx(msg.Context()) )
*
newMsg, err := h(msg)
if err != nil {
return nil, err
}
return newMsg, nil
}
}
func OldTheGreatWebhookHandler(name string, msg *message.Message) {
log.Debugf("Webhook for %s => %s", name, msg.Payload)
for endpoint := range config.WebhookConf {
log.Debugf("ENDPOINT : %s ", endpoint )
}
base_url := "123" // config.WebhookURL.GetString()
webhook_url := fmt.Sprintf("%s?event=%s", base_url, name)
data := bytes.NewBuffer(msg.Payload)
_, err := http.Post(webhook_url, "application/json",data)
if err != nil {
log.Debugf("Webhook failed : %s ", webhook_url )
} else {
log.Debugf("Webhook success : %s ", webhook_url )
}
}
*/

142
pkg/webhooks/runtime.go Normal file
View File

@ -0,0 +1,142 @@
package webhooks
import (
"fmt"
"strings"
"net/http"
"bytes"
"time"
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"code.vikunja.io/api/pkg/log"
"code.vikunja.io/api/pkg/config"
"github.com/ThreeDotsLabs/watermill/message"
)
const (
timeoutVal = 5 * time.Second
ctHeader = "Content-Type"
ctValue = "application/json"
hmacHeader = "X-signature"
)
type FilteringFunction func(string) (bool)
type WebhookCallFunction func(string, *message.Message) (error)
// Single configuration entry
type SingleConfEntry struct {
Events []string `json:"events"`
URL string `json:"url"`
Secret string `json:"secret"`
}
type WebhookRuntimeConfig struct {
FilterFunc FilteringFunction
ExecuteFunc WebhookCallFunction
}
func GetWebhookFilterFunc(cfg SingleConfEntry) FilteringFunction {
return func(topic string) (iteresting bool) {
for _, filter := range cfg.Events {
log.Debugf("Match pattern:'%s' agains:'%s'", filter, topic)
if filter == "*" {
log.Debugf(" '*' == Always match ")
return true
} else {
if strings.HasPrefix(topic, filter) {
log.Debugf(" Positive match ")
return true
}
}
}
return false
}
}
func GetWebhookCallFunc(cfg SingleConfEntry) WebhookCallFunction {
return func(topic string, msg *message.Message) (error) {
endpointURL := cfg.URL
hmacKey := cfg.Secret
log.Debugf("Webhook Call : %s (%s)", endpointURL, hmacKey)
webhookUrl := fmt.Sprintf("%s%s", endpointURL, topic)
rawData := msg.Payload
req, err1 := http.NewRequest( http.MethodPost, webhookUrl, bytes.NewBuffer(rawData))
if err1 != nil {
return err1
}
client := &http.Client{
Timeout: timeoutVal,
}
req.Header.Set(ctHeader, ctValue)
if len(hmacKey) > 1 {
signature := GenerateHMAC(rawData, hmacKey)
req.Header.Set(hmacHeader, signature)
}
_, err2 := client.Do(req)
//_, err := http.Post(webhook_url, "application/json",data) "x-vikunja-signature"
if err2 != nil {
log.Debugf("Webhook failed : %s , +%v", webhookUrl , err2)
return err2
}
log.Debugf("Webhook success : %s ", webhookUrl )
return nil
}
}
func GenerateHMAC(data []byte, key string) string {
h := hmac.New(sha256.New, []byte(key))
h.Write(data)
return hex.EncodeToString(h.Sum(nil))
}
// Process config and prepare mapping
func ProcessConfig() ([]WebhookRuntimeConfig) {
var items []SingleConfEntry
config.WebhookConf.GetUnmarshaled(&items)
runtime := make([]WebhookRuntimeConfig, len(items))
fmt.Printf("RAW: %+v\n", items)
for i, item := range items {
fmt.Printf("RAW ITEM: %d : %+v\n", i, item)
for j, exp := range item.Events {
fmt.Printf("RAW exp : %d %s\n", j, exp)
}
runtime[i].FilterFunc = GetWebhookFilterFunc(item)
runtime[i].ExecuteFunc = GetWebhookCallFunc(item)
}
// fmt.Printf("RAW: %+v\n", runtime)
//for i := range items {
// runtime[i].FilterFunc("test")
//}
return runtime
}