Started adding sse broker
continuous-integration/drone/push Build is passing Details

This commit is contained in:
kolaente 2019-09-03 21:24:47 +02:00
parent 00ea9111ef
commit 6b007411cd
Signed by: konrad
GPG Key ID: F40E70337AB24C9B
3 changed files with 214 additions and 10 deletions

View File

@ -17,18 +17,26 @@ type UpdatedMessage struct {
Data models.Managable `json:"data"`
}
func (h *Handler) ReadAll(c echo.Context) error {
func (h *Handler) readAll(c echo.Context) (interface{}, error) {
str := h.str()
if err := c.Bind(str); err != nil {
return echo.NewHTTPError(http.StatusBadRequest, err.Error())
return nil, echo.NewHTTPError(http.StatusBadRequest, err.Error())
}
asc := c.QueryParam("asc")
data, err := str.ReadAll(asc)
if err != nil {
log.Error(err)
return echo.ErrInternalServerError
return nil, echo.ErrInternalServerError
}
return data, nil
}
func (h *Handler) ReadAll(c echo.Context) error {
data, err := h.readAll(c)
if err != nil {
return err
}
return c.JSON(http.StatusOK, data)
}

View File

@ -1,6 +1,9 @@
package router
import (
"html/template"
"net/http"
"git.kolaente.de/konrad/Konfi-Castle-Kasino/pkg/config"
"git.kolaente.de/konrad/Konfi-Castle-Kasino/pkg/models"
"github.com/gorilla/sessions"
@ -8,8 +11,6 @@ import (
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
"github.com/labstack/gommon/log"
"html/template"
"net/http"
)
func NewEcho() *echo.Echo {
@ -65,8 +66,16 @@ func RegisterRoutes(e *echo.Echo) {
}
e.GET("/list", handler.ReadAll)
// Fancy message broker with SSE
b := NewBroker()
b.Start()
b.handler = &handler
e.GET("/events", b.Serve)
// Routes with auth
e.Use(func(next echo.HandlerFunc) echo.HandlerFunc {
a := e.Group("")
a.Use(func(next echo.HandlerFunc) echo.HandlerFunc {
return func(c echo.Context) error {
if !isLoggedIn(c) {
return echo.NewHTTPError(http.StatusForbidden, "Login first.")
@ -74,7 +83,7 @@ func RegisterRoutes(e *echo.Echo) {
return next(c)
}
})
e.POST("/update", handler.Update)
e.POST("/delete", handler.Delete)
e.POST("/add", handler.Create)
a.POST("/update", handler.Update)
a.POST("/delete", handler.Delete)
a.POST("/add", handler.Create)
}

187
pkg/router/sse.go Normal file
View File

@ -0,0 +1,187 @@
package router
import (
"encoding/json"
"fmt"
"github.com/labstack/echo/v4"
"github.com/labstack/gommon/log"
"net/http"
)
// A single Broker will be created in this program. It is responsible
// for keeping a list of which clients (browsers) are currently attached
// and broadcasting events (messages) to those clients.
//
type Broker struct {
// Create a map of clients, the keys of the map are the channels
// over which we can push messages to attached clients. (The values
// are just booleans and are meaningless.)
//
clients map[chan Message]bool
// Channel into which new clients can be pushed
//
newClients chan chan Message
// Channel into which disconnected clients should be pushed
//
defunctClients chan chan Message
// Channel into which messages are pushed to be broadcast out
// to attahed clients.
//
messages chan Message
handler *Handler
}
type MessageKind string
const (
KindInit MessageKind = `init`
)
type Message struct {
Kind MessageKind `json:"kind"`
Data interface{} `json:"data"`
}
func NewBroker() *Broker {
return &Broker{
clients: make(map[chan Message]bool),
newClients: make(chan (chan Message)),
defunctClients: make(chan (chan Message)),
messages: make(chan Message),
}
}
// This Broker method starts a new goroutine. It handles
// the addition & removal of clients, as well as the broadcasting
// of messages out to clients that are currently attached.
//
func (b *Broker) Start() {
// Start a goroutine
//
go func() {
// Loop endlessly
//
for {
// Block until we receive from one of the
// three following channels.
select {
case s := <-b.newClients:
// There is a new client attached and we
// want to start sending them messages.
b.clients[s] = true
log.Info("Added new client")
case s := <-b.defunctClients:
// A client has dettached and we want to
// stop sending them messages.
delete(b.clients, s)
close(s)
log.Info("Removed client")
case msg := <-b.messages:
// There is a new message to send. For each
// attached client, push the new message
// into the client's message channel.
for s := range b.clients {
s <- msg
}
log.Printf("Broadcast message to %d clients", len(b.clients))
}
}
}()
}
func (b *Broker) Serve(c echo.Context) error {
rw := c.Response().Writer
// Make sure that the writer supports flushing.
//
f, ok := rw.(http.Flusher)
if !ok {
return echo.NewHTTPError(http.StatusInternalServerError, "Streaming not supported!")
}
// Create a new channel, over which the broker can
// send this client messages.
messageChan := make(chan Message)
// Add this client to the map of those that should
// receive updates
b.newClients <- messageChan
// Listen to the closing of the http connection via the CloseNotifier
// FIXME: use Done()
notify := rw.(http.CloseNotifier).CloseNotify()
go func() {
<-notify
// Remove this client from the map of attached clients
// when `EventHandler` exits.
b.defunctClients <- messageChan
log.Info("HTTP connection closed.")
}()
// Set the headers related to event streaming.
rw.Header().Set("Content-Type", "text/event-stream")
rw.Header().Set("Cache-Control", "no-cache")
rw.Header().Set("Connection", "keep-alive")
rw.Header().Set("Transfer-Encoding", "chunked")
// Push the initial list to the user
data, err := b.handler.readAll(c)
if err != nil {
log.Errorf("Error getting initial data: %s", err)
}
sendJSONMessage(&Message{
Kind: KindInit,
Data: data,
}, f, rw)
// Don't close the connection, instead loop endlessly.
for {
// Read from our messageChan.
msg, open := <-messageChan
if !open {
// If our messageChan was closed, this means that the client has
// disconnected.
break
}
sendJSONMessage(&msg, f, rw)
}
// Done.
log.Info("Finished HTTP request")
return nil
}
func sendJSONMessage(m *Message, f http.Flusher, rw http.ResponseWriter) {
jsonMessage, err := json.Marshal(m)
if err != nil {
log.Errorf("Error serializing json: %s", err)
}
_, err = fmt.Fprintf(rw, "%s\n\n", string(jsonMessage))
if err != nil {
log.Errorf("Error sending message to client: %s", err)
}
// Flush the response. This is only possible if
// the repsonse supports streaming.
f.Flush()
}