Moved Broker to seperate package
continuous-integration/drone/push Build is passing Details

This commit is contained in:
kolaente 2019-09-04 18:53:49 +02:00
parent 8d164cf2d8
commit 10ebc05799
Signed by: konrad
GPG Key ID: F40E70337AB24C9B
3 changed files with 42 additions and 33 deletions

View File

@ -1,4 +1,4 @@
package router package broker
import ( import (
"encoding/json" "encoding/json"
@ -33,7 +33,9 @@ type Broker struct {
// //
messages chan Message messages chan Message
handler *Handler // This function is called every time a new clients connects.
// Its output is sent to the client as initial connect message.
init func(c echo.Context) (interface{}, error)
} }
type MessageKind string type MessageKind string
@ -50,12 +52,15 @@ type Message struct {
Data interface{} `json:"data"` Data interface{} `json:"data"`
} }
func NewBroker() *Broker { var broker *Broker
return &Broker{
func Init(init func(c echo.Context) (interface{}, error)) {
broker = &Broker{
clients: make(map[chan Message]bool), clients: make(map[chan Message]bool),
newClients: make(chan (chan Message)), newClients: make(chan (chan Message)),
defunctClients: make(chan (chan Message)), defunctClients: make(chan (chan Message)),
messages: make(chan Message), messages: make(chan Message),
init: init,
} }
} }
@ -63,10 +68,10 @@ func NewBroker() *Broker {
// the addition & removal of clients, as well as the broadcasting // the addition & removal of clients, as well as the broadcasting
// of messages out to clients that are currently attached. // of messages out to clients that are currently attached.
// //
func (b *Broker) Start() { func Start() {
if b.handler == nil { if broker.init == nil {
panic("handler must not be nil!") panic("Init function must not be nil!")
} }
// Start a goroutine // Start a goroutine
@ -81,37 +86,37 @@ func (b *Broker) Start() {
// three following channels. // three following channels.
select { select {
case s := <-b.newClients: case s := <-broker.newClients:
// There is a new client attached and we // There is a new client attached and we
// want to start sending them messages. // want to start sending them messages.
b.clients[s] = true broker.clients[s] = true
log.Debug("Added new client") log.Debug("Added new client")
case s := <-b.defunctClients: case s := <-broker.defunctClients:
// A client has dettached and we want to // A client has dettached and we want to
// stop sending them messages. // stop sending them messages.
delete(b.clients, s) delete(broker.clients, s)
close(s) close(s)
log.Debug("Removed client") log.Debug("Removed client")
case msg := <-b.messages: case msg := <-broker.messages:
// There is a new message to send. For each // There is a new message to send. For each
// attached client, push the new message // attached client, push the new message
// into the client's message channel. // into the client's message channel.
for s := range b.clients { for s := range broker.clients {
s <- msg s <- msg
} }
log.Printf("Broadcast message to %d clients", len(b.clients)) log.Printf("Broadcast message to %d clients", len(broker.clients))
} }
} }
}() }()
} }
func (b *Broker) Serve(c echo.Context) error { func Serve(c echo.Context) error {
rw := c.Response().Writer rw := c.Response().Writer
@ -128,7 +133,7 @@ func (b *Broker) Serve(c echo.Context) error {
// Add this client to the map of those that should // Add this client to the map of those that should
// receive updates // receive updates
b.newClients <- messageChan broker.newClients <- messageChan
// Listen to the closing of the http connection via the CloseNotifier // Listen to the closing of the http connection via the CloseNotifier
// FIXME: use Done() // FIXME: use Done()
@ -137,7 +142,7 @@ func (b *Broker) Serve(c echo.Context) error {
<-notify <-notify
// Remove this client from the map of attached clients // Remove this client from the map of attached clients
// when `EventHandler` exits. // when `EventHandler` exits.
b.defunctClients <- messageChan broker.defunctClients <- messageChan
log.Debug("HTTP connection closed.") log.Debug("HTTP connection closed.")
}() }()
@ -148,7 +153,7 @@ func (b *Broker) Serve(c echo.Context) error {
rw.Header().Set("Transfer-Encoding", "chunked") rw.Header().Set("Transfer-Encoding", "chunked")
// Push the initial list to the user // Push the initial list to the user
data, err := b.handler.readAll(c) data, err := broker.init(c)
if err != nil { if err != nil {
log.Errorf("Error getting initial data: %s", err) log.Errorf("Error getting initial data: %s", err)
} }
@ -178,6 +183,10 @@ func (b *Broker) Serve(c echo.Context) error {
return nil return nil
} }
func SendMessage(m Message) {
broker.messages <- m
}
func sendJSONMessage(m *Message, f http.Flusher, rw http.ResponseWriter) { func sendJSONMessage(m *Message, f http.Flusher, rw http.ResponseWriter) {
jsonMessage, err := json.Marshal(m.Data) jsonMessage, err := json.Marshal(m.Data)
if err != nil { if err != nil {

View File

@ -1,6 +1,7 @@
package router package router
import ( import (
"git.kolaente.de/konrad/Konfi-Castle-Kasino/pkg/broker"
"net/http" "net/http"
"strconv" "strconv"
@ -11,7 +12,7 @@ import (
type Handler struct { type Handler struct {
str func() models.Managable str func() models.Managable
broker *Broker broker *broker.Broker
} }
type UpdatedMessage struct { type UpdatedMessage struct {
@ -55,10 +56,10 @@ func (h *Handler) Create(c echo.Context) error {
} }
// Notify the broker // Notify the broker
h.broker.messages <- Message{ broker.SendMessage(broker.Message{
Kind: KindCreate, Kind: broker.KindCreate,
Data: str, Data: str,
} })
return c.JSON(http.StatusOK, "success") return c.JSON(http.StatusOK, "success")
} }
@ -75,10 +76,10 @@ func (h *Handler) Delete(c echo.Context) error {
} }
// Notify the broker // Notify the broker
h.broker.messages <- Message{ broker.SendMessage(broker.Message{
Kind: KindDelete, Kind: broker.KindDelete,
Data: str, Data: str,
} })
return c.JSON(http.StatusOK, "success") return c.JSON(http.StatusOK, "success")
} }
@ -97,10 +98,10 @@ func (h *Handler) Update(c echo.Context) error {
} }
// Notify the broker // Notify the broker
h.broker.messages <- Message{ broker.SendMessage(broker.Message{
Kind: KindUpdate, Kind: broker.KindUpdate,
Data: str, Data: str,
} })
return c.JSON(http.StatusOK, UpdatedMessage{ return c.JSON(http.StatusOK, UpdatedMessage{
Message: "success", Message: "success",

View File

@ -1,6 +1,7 @@
package router package router
import ( import (
"git.kolaente.de/konrad/Konfi-Castle-Kasino/pkg/broker"
"html/template" "html/template"
"net/http" "net/http"
@ -65,11 +66,9 @@ func RegisterRoutes(e *echo.Echo) {
} }
// Fancy message broker with SSE // Fancy message broker with SSE
b := NewBroker() broker.Init(handler.readAll)
handler.broker = b broker.Start()
b.handler = &handler e.GET("/events", broker.Serve)
b.Start()
e.GET("/events", b.Serve)
e.GET("/list", handler.ReadAll) e.GET("/list", handler.ReadAll)
// Routes with auth // Routes with auth