From 6b007411cd9f7112607a96a6e69bb6d302f78f74 Mon Sep 17 00:00:00 2001 From: kolaente Date: Tue, 3 Sep 2019 21:24:47 +0200 Subject: [PATCH] Started adding sse broker --- pkg/router/handler.go | 16 +++- pkg/router/router.go | 21 +++-- pkg/router/sse.go | 187 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 214 insertions(+), 10 deletions(-) create mode 100644 pkg/router/sse.go diff --git a/pkg/router/handler.go b/pkg/router/handler.go index 713a0aa..6f51132 100644 --- a/pkg/router/handler.go +++ b/pkg/router/handler.go @@ -17,18 +17,26 @@ type UpdatedMessage struct { Data models.Managable `json:"data"` } -func (h *Handler) ReadAll(c echo.Context) error { - +func (h *Handler) readAll(c echo.Context) (interface{}, error) { str := h.str() if err := c.Bind(str); err != nil { - return echo.NewHTTPError(http.StatusBadRequest, err.Error()) + return nil, echo.NewHTTPError(http.StatusBadRequest, err.Error()) } asc := c.QueryParam("asc") data, err := str.ReadAll(asc) if err != nil { log.Error(err) - return echo.ErrInternalServerError + return nil, echo.ErrInternalServerError + } + return data, nil +} + +func (h *Handler) ReadAll(c echo.Context) error { + + data, err := h.readAll(c) + if err != nil { + return err } return c.JSON(http.StatusOK, data) } diff --git a/pkg/router/router.go b/pkg/router/router.go index dda55d0..309da9c 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -1,6 +1,9 @@ package router import ( + "html/template" + "net/http" + "git.kolaente.de/konrad/Konfi-Castle-Kasino/pkg/config" "git.kolaente.de/konrad/Konfi-Castle-Kasino/pkg/models" "github.com/gorilla/sessions" @@ -8,8 +11,6 @@ import ( "github.com/labstack/echo/v4" "github.com/labstack/echo/v4/middleware" "github.com/labstack/gommon/log" - "html/template" - "net/http" ) func NewEcho() *echo.Echo { @@ -65,8 +66,16 @@ func RegisterRoutes(e *echo.Echo) { } e.GET("/list", handler.ReadAll) + + // Fancy message broker with SSE + b := NewBroker() + b.Start() + b.handler = &handler + e.GET("/events", b.Serve) + // Routes with auth - e.Use(func(next echo.HandlerFunc) echo.HandlerFunc { + a := e.Group("") + a.Use(func(next echo.HandlerFunc) echo.HandlerFunc { return func(c echo.Context) error { if !isLoggedIn(c) { return echo.NewHTTPError(http.StatusForbidden, "Login first.") @@ -74,7 +83,7 @@ func RegisterRoutes(e *echo.Echo) { return next(c) } }) - e.POST("/update", handler.Update) - e.POST("/delete", handler.Delete) - e.POST("/add", handler.Create) + a.POST("/update", handler.Update) + a.POST("/delete", handler.Delete) + a.POST("/add", handler.Create) } diff --git a/pkg/router/sse.go b/pkg/router/sse.go new file mode 100644 index 0000000..c577c1a --- /dev/null +++ b/pkg/router/sse.go @@ -0,0 +1,187 @@ +package router + +import ( + "encoding/json" + "fmt" + "github.com/labstack/echo/v4" + "github.com/labstack/gommon/log" + "net/http" +) + +// A single Broker will be created in this program. It is responsible +// for keeping a list of which clients (browsers) are currently attached +// and broadcasting events (messages) to those clients. +// +type Broker struct { + + // Create a map of clients, the keys of the map are the channels + // over which we can push messages to attached clients. (The values + // are just booleans and are meaningless.) + // + clients map[chan Message]bool + + // Channel into which new clients can be pushed + // + newClients chan chan Message + + // Channel into which disconnected clients should be pushed + // + defunctClients chan chan Message + + // Channel into which messages are pushed to be broadcast out + // to attahed clients. + // + messages chan Message + + handler *Handler +} + +type MessageKind string + +const ( + KindInit MessageKind = `init` +) + +type Message struct { + Kind MessageKind `json:"kind"` + Data interface{} `json:"data"` +} + +func NewBroker() *Broker { + return &Broker{ + clients: make(map[chan Message]bool), + newClients: make(chan (chan Message)), + defunctClients: make(chan (chan Message)), + messages: make(chan Message), + } +} + +// This Broker method starts a new goroutine. It handles +// the addition & removal of clients, as well as the broadcasting +// of messages out to clients that are currently attached. +// +func (b *Broker) Start() { + + // Start a goroutine + // + go func() { + + // Loop endlessly + // + for { + + // Block until we receive from one of the + // three following channels. + select { + + case s := <-b.newClients: + + // There is a new client attached and we + // want to start sending them messages. + b.clients[s] = true + log.Info("Added new client") + + case s := <-b.defunctClients: + + // A client has dettached and we want to + // stop sending them messages. + delete(b.clients, s) + close(s) + + log.Info("Removed client") + + case msg := <-b.messages: + + // There is a new message to send. For each + // attached client, push the new message + // into the client's message channel. + for s := range b.clients { + s <- msg + } + log.Printf("Broadcast message to %d clients", len(b.clients)) + } + } + }() +} + +func (b *Broker) Serve(c echo.Context) error { + + rw := c.Response().Writer + + // Make sure that the writer supports flushing. + // + f, ok := rw.(http.Flusher) + if !ok { + return echo.NewHTTPError(http.StatusInternalServerError, "Streaming not supported!") + } + + // Create a new channel, over which the broker can + // send this client messages. + messageChan := make(chan Message) + + // Add this client to the map of those that should + // receive updates + b.newClients <- messageChan + + // Listen to the closing of the http connection via the CloseNotifier + // FIXME: use Done() + notify := rw.(http.CloseNotifier).CloseNotify() + go func() { + <-notify + // Remove this client from the map of attached clients + // when `EventHandler` exits. + b.defunctClients <- messageChan + log.Info("HTTP connection closed.") + }() + + // Set the headers related to event streaming. + rw.Header().Set("Content-Type", "text/event-stream") + rw.Header().Set("Cache-Control", "no-cache") + rw.Header().Set("Connection", "keep-alive") + rw.Header().Set("Transfer-Encoding", "chunked") + + // Push the initial list to the user + data, err := b.handler.readAll(c) + if err != nil { + log.Errorf("Error getting initial data: %s", err) + } + + sendJSONMessage(&Message{ + Kind: KindInit, + Data: data, + }, f, rw) + + // Don't close the connection, instead loop endlessly. + for { + + // Read from our messageChan. + msg, open := <-messageChan + + if !open { + // If our messageChan was closed, this means that the client has + // disconnected. + break + } + + sendJSONMessage(&msg, f, rw) + } + + // Done. + log.Info("Finished HTTP request") + return nil +} + +func sendJSONMessage(m *Message, f http.Flusher, rw http.ResponseWriter) { + jsonMessage, err := json.Marshal(m) + if err != nil { + log.Errorf("Error serializing json: %s", err) + } + + _, err = fmt.Fprintf(rw, "%s\n\n", string(jsonMessage)) + if err != nil { + log.Errorf("Error sending message to client: %s", err) + } + // Flush the response. This is only possible if + // the repsonse supports streaming. + f.Flush() +}