diff --git a/pkg/events/events.go b/pkg/events/events.go index c1ddac2eb..0aec2eed7 100644 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -49,16 +49,6 @@ func InitEvents() (err error) { return err } - router.AddMiddleware( - middleware.Retry{ - MaxRetries: 5, - InitialInterval: time.Millisecond * 100, - Logger: logger, - Multiplier: 2, - }.Middleware, - middleware.Recoverer, - ) - metricsBuilder := metrics.NewPrometheusMetricsBuilder(vmetrics.GetRegistry(), "", "") metricsBuilder.AddPrometheusRouterMetrics(router) @@ -69,6 +59,33 @@ func InitEvents() (err error) { logger, ) + poison, err := middleware.PoisonQueue(pubsub, "poison") + if err != nil { + return err + } + router.AddNoPublisherHandler("poison.logger", "poison", pubsub, func(msg *message.Message) error { + meta := "" + for s, m := range msg.Metadata { + meta += s + "=" + m + ", " + } + log.Errorf("Error while handling message %s, %s payload=%s", msg.UUID, meta, string(msg.Payload)) + return nil + }) + + router.AddMiddleware( + poison, + middleware.Retry{ + MaxRetries: 5, + InitialInterval: time.Millisecond * 100, + MaxInterval: time.Hour, + Multiplier: 2, + MaxElapsedTime: 0, + RandomizationFactor: 1, + Logger: logger, + }.Middleware, + middleware.Recoverer, + ) + for topic, funcs := range listeners { for _, handler := range funcs { router.AddNoPublisherHandler(topic+"."+handler.Name(), topic, pubsub, handler.Handle)