Konfi-Castle-Kasino/pkg/broker/broker.go

208 lines
5.1 KiB
Go

package broker
import (
"encoding/json"
"fmt"
"github.com/labstack/echo/v4"
"github.com/labstack/gommon/log"
"net/http"
)
// Broker is is responsible for keeping a list of which clients (browsers)
// are currently attached and broadcasting events (messages) to those clients.
type Broker struct {
// Create a map of clients, the keys of the map are the channels
// over which we can push messages to attached clients. (The values
// are just booleans and are meaningless.)
//
clients map[chan Message]bool
// Channel into which new clients can be pushed
//
newClients chan chan Message
// Channel into which disconnected clients should be pushed
//
defunctClients chan chan Message
// Channel into which messages are pushed to be broadcast out
// to attahed clients.
//
messages chan Message
// This function is called every time a new clients connects.
// Its output is sent to the client as initial connect message.
init func(c echo.Context) (interface{}, error)
}
// MessageKind represents a message kind
type MessageKind string
// These are all valid message kinds
const (
KindInit MessageKind = `init`
KindUpdate MessageKind = `update`
KindCreate MessageKind = `create`
KindDelete MessageKind = `delete`
)
// Message represents a message
type Message struct {
Kind MessageKind `json:"kind"`
Data interface{} `json:"data"`
}
var broker *Broker
// Init creates a new local broker
func Init(init func(c echo.Context) (interface{}, error)) {
broker = &Broker{
clients: make(map[chan Message]bool),
newClients: make(chan (chan Message)),
defunctClients: make(chan (chan Message)),
messages: make(chan Message),
init: init,
}
}
// Start starts a new goroutine. It handles the addition & removal of clients,
// as well as the broadcasting of messages out to clients that are currently attached.
func Start() {
if broker.init == nil {
panic("Init function must not be nil!")
}
// Start a goroutine
//
go func() {
// Loop endlessly
//
for {
// Block until we receive from one of the
// three following channels.
select {
case s := <-broker.newClients:
// There is a new client attached and we
// want to start sending them messages.
broker.clients[s] = true
log.Debug("Added new client")
case s := <-broker.defunctClients:
// A client has dettached and we want to
// stop sending them messages.
delete(broker.clients, s)
close(s)
log.Debug("Removed client")
case msg := <-broker.messages:
// There is a new message to send. For each
// attached client, push the new message
// into the client's message channel.
for s := range broker.clients {
s <- msg
}
log.Printf("Broadcast message to %d clients", len(broker.clients))
}
}
}()
}
// Serve is the web handler clients use to connect to the broker
func Serve(c echo.Context) error {
rw := c.Response().Writer
// Make sure that the writer supports flushing.
//
f, ok := rw.(http.Flusher)
if !ok {
return echo.NewHTTPError(http.StatusInternalServerError, "Streaming not supported!")
}
// Create a new channel, over which the broker can
// send this client messages.
messageChan := make(chan Message)
// Add this client to the map of those that should
// receive updates
broker.newClients <- messageChan
// Listen to the closing of the http connection via the CloseNotifier
// FIXME: use Done()
notify := rw.(http.CloseNotifier).CloseNotify()
go func() {
<-notify
// Remove this client from the map of attached clients
// when `EventHandler` exits.
broker.defunctClients <- messageChan
log.Debug("HTTP connection closed.")
}()
// Set the headers related to event streaming.
rw.Header().Set("Content-Type", "text/event-stream")
rw.Header().Set("Cache-Control", "no-cache")
rw.Header().Set("Connection", "keep-alive")
rw.Header().Set("Transfer-Encoding", "chunked")
// Push the initial list to the user
data, err := broker.init(c)
if err != nil {
log.Errorf("Error getting initial data: %s", err)
}
sendJSONMessage(&Message{
Kind: KindInit,
Data: data,
}, f, rw)
// Don't close the connection, instead loop endlessly.
for {
// Read from our messageChan.
msg, open := <-messageChan
if !open {
// If our messageChan was closed, this means that the client has
// disconnected.
break
}
sendJSONMessage(&msg, f, rw)
}
// Done.
log.Debug("Finished HTTP request")
return nil
}
// SendMessage lets any package send a message to clients
func SendMessage(m Message) {
broker.messages <- m
}
func sendJSONMessage(m *Message, f http.Flusher, rw http.ResponseWriter) {
jsonMessage, err := json.Marshal(m.Data)
if err != nil {
log.Errorf("Error serializing json: %s", err)
}
// The format is defined and has to be like this.
// This makes it however possible to register different event handlers for each event kind
_, err = fmt.Fprintf(rw, "event:%s\ndata: %s\n\n", m.Kind, string(jsonMessage))
if err != nil {
log.Errorf("Error sending message to client: %s", err)
}
// Flush the response. This is only possible if
// the repsonse supports streaming.
f.Flush()
}