Files
game-driver/leaf/leaf.go

184 lines
4.4 KiB
Go

package leaf
import (
"context"
"github.com/eclipse/paho.golang/packets"
"github.com/eclipse/paho.golang/paho"
"github.com/eclipse/paho.golang/paho/log"
"sync"
)
// Router is the routing contract satisfied by Engine: it keeps handler chains
// keyed by topic filter and dispatches incoming publishes to them.
type Router interface {
	// RegisterHandler associates the handlers with the given topic filter.
	RegisterHandler(topic string, handlers ...HandlerFunc)
	// UnregisterHandler removes whatever is registered for the topic filter.
	UnregisterHandler(topic string)
	// Route dispatches a raw publish packet to the matching handler chain.
	Route(pb *packets.Publish)
	// HandlerRun starts the chain matching topic t for publish p and reports
	// whether a matching chain was found.
	HandlerRun(t string, p *paho.Publish) bool
	// SetDebugLogger sets the logger used for debug output.
	SetDebugLogger(l log.Logger)
	// Use appends middleware to the router-wide chain.
	Use(middleware ...HandlerFunc)
}
type (
	// HandlerFunc is one step of a handler chain; it receives the Context of
	// the message being processed.
	HandlerFunc func(c *Context)

	// OptionFunc mutates an Engine to override its default configuration.
	OptionFunc func(e *Engine)

	// HandlersChain is an ordered list of HandlerFunc values.
	HandlersChain []HandlerFunc
)
// Last returns the final handler of the chain, or nil when the chain is empty.
// By convention the final entry is the main handler.
func (c HandlersChain) Last() HandlerFunc {
	if len(c) == 0 {
		return nil
	}
	return c[len(c)-1]
}
// Engine routes incoming publishes to handler chains registered per topic
// filter and runs each chain in its own goroutine.
type Engine struct {
// mu is taken by RegisterHandler, UnregisterHandler, DefaultHandler and
// Route around their access to the routing tables below.
mu sync.RWMutex
// queueWg counts handler goroutines that are still running; Disconnect
// waits on it.
queueWg sync.WaitGroup
// ctx is handed to every handler context; cancelCtx cancels it and is
// called by Disconnect.
ctx context.Context
cancelCtx context.CancelFunc
// done is closed by Disconnect once queueWg has drained.
done chan struct{}
// Handlers is the engine-wide middleware chain; combineHandlers puts it in
// front of every chain it builds.
Handlers HandlersChain
// defaultHandler is the chain Route runs when no subscription matches;
// nil means there is none.
defaultHandler HandlersChain
// subscriptions maps a topic filter to its handler chain.
subscriptions map[string]HandlersChain
// aliases maps a topic alias number to the topic it stands for; Route
// fills it in as aliased publishes arrive.
aliases map[uint16]string
// debug receives debug output.
debug log.Logger
}
// New returns an Engine with empty routing tables, no middleware and a no-op
// debug logger. The engine's internal context is derived from ctx, so
// cancelling ctx also cancels the context passed to handlers.
func New(ctx context.Context) *Engine {
	engine := &Engine{
		done:          make(chan struct{}),
		Handlers:      HandlersChain{},
		subscriptions: map[string]HandlersChain{},
		aliases:       map[uint16]string{},
		debug:         log.NOOPLogger{},
	}
	engine.ctx, engine.cancelCtx = context.WithCancel(ctx)
	return engine
}
// Default returns an Engine built exactly like New, with the Recovery
// middleware already installed.
//
// Construction is delegated to New so the two constructors cannot drift apart.
func Default(ctx context.Context) *Engine {
	engine := New(ctx)
	engine.Use(Recovery())
	return engine
}
// RegisterHandler stores the handler chain for topic, replacing any chain
// already registered under the same key. The stored chain is the engine-wide
// middleware followed by handlers, as assembled by combineHandlers.
func (e *Engine) RegisterHandler(topic string, handlers ...HandlerFunc) {
	e.debug.Println("registering handler for:", topic)

	e.mu.Lock()
	defer e.mu.Unlock() // deferred so the lock is released even if combineHandlers panics

	chain := e.combineHandlers(handlers)
	e.subscriptions[topic] = chain
}
// UnregisterHandler drops the handler chain stored for topic. Removing a
// topic that was never registered is a no-op.
func (e *Engine) UnregisterHandler(topic string) {
	e.debug.Println("unregistering handler for:", topic)

	e.mu.Lock()
	delete(e.subscriptions, topic)
	e.mu.Unlock()
}
// HandlerRun looks for a subscription whose topic filter matches t and, if one
// is found, starts its handler chain for p in a new goroutine tracked by
// queueWg. It reports whether a chain was started.
//
// Only the first matching subscription is run. Subscriptions live in a map, so
// when several filters match t the one chosen is not deterministic.
//
// HandlerRun does not lock the engine itself; Route calls it while holding the
// engine mutex.
func (e *Engine) HandlerRun(t string, p *paho.Publish) bool {
	for filter, chain := range e.subscriptions {
		if !match(filter, t) {
			continue
		}

		e.debug.Println("found handler for:", filter)
		e.queueWg.Add(1)
		go func() {
			defer e.queueWg.Done()
			WithLeafContext(e.ctx, p, e, chain).Next()
		}()
		return true
	}
	return false
}
// Route dispatches an incoming publish packet to a handler chain.
//
// The topic used for matching is resolved first: when the packet carries a
// topic alias, a non-empty packet topic (re)registers the alias, and the alias
// table then supplies the topic; otherwise the packet's own topic is used. An
// alias that was never registered resolves to the empty topic.
//
// The resolved topic is passed to HandlerRun. If no subscription matches and a
// default handler is set, the default chain is started in its own goroutine
// tracked by queueWg.
//
// The engine mutex is held for the whole call because the alias table may be
// written.
func (e *Engine) Route(pb *packets.Publish) {
	e.debug.Println("routing message for:", pb.Topic)
	e.mu.Lock()
	defer e.mu.Unlock()

	m := paho.PublishFromPacketPublish(pb)

	var topic string
	// Check Properties before TopicAlias so a packet without properties
	// cannot cause a nil dereference in the alias test.
	if pb.Properties != nil && pb.Properties.TopicAlias != nil {
		alias := *pb.Properties.TopicAlias
		e.debug.Println("message is using topic aliasing")
		if pb.Topic != "" {
			// Register new alias
			e.debug.Printf("registering new topic alias '%d' for topic '%s'", alias, m.Topic)
			e.aliases[alias] = pb.Topic
		}
		if t, ok := e.aliases[alias]; ok {
			// Log the resolved topic; m.Topic is empty for an alias-only publish.
			e.debug.Printf("aliased topic '%d' translates to '%s'", alias, t)
			topic = t
		}
	} else {
		topic = m.Topic
	}

	if e.HandlerRun(topic, m) || e.defaultHandler == nil {
		return
	}

	e.queueWg.Add(1)
	go func() {
		defer e.queueWg.Done()
		WithLeafContext(e.ctx, m, e, e.defaultHandler).Next()
	}()
}
// SetDebugLogger replaces the logger that receives the engine's debug output.
// The field is assigned without taking the engine mutex.
func (e *Engine) SetDebugLogger(l log.Logger) {
e.debug = l
}
// Use appends middleware to the engine-wide chain.
//
// combineHandlers copies that chain into each handler chain when the chain is
// built, so middleware added here only affects handlers registered afterwards.
//
// The append is done under the engine mutex because combineHandlers reads
// e.Handlers while RegisterHandler and DefaultHandler hold that mutex.
func (e *Engine) Use(middleware ...HandlerFunc) {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.Handlers = append(e.Handlers, middleware...)
}
// DefaultHandler sets the handler that Route runs when no subscription matches
// a message. The stored chain is the engine-wide middleware followed by h.
//
// Passing nil removes the default handler. A nil function cannot be called, and
// Route only dispatches to the default chain when it is non-nil, so no chain
// is stored in that case.
func (e *Engine) DefaultHandler(h HandlerFunc) {
	e.debug.Println("registering default handler")
	e.mu.Lock()
	defer e.mu.Unlock()

	if h == nil {
		e.defaultHandler = nil
		return
	}
	e.defaultHandler = e.combineHandlers(HandlersChain{h})
}
// Disconnect cancels the engine context and waits for every in-flight handler
// goroutine to finish. It returns nil once they have all returned, or
// ctx.Err() if ctx is done first; in that case the wait carries on in the
// background.
//
// Disconnect may be called more than once. Each call starts its own waiter,
// but the done channel is closed at most once: the check and the close happen
// together under the engine mutex.
func (e *Engine) Disconnect(ctx context.Context) error {
	go func() {
		e.queueWg.Wait()

		e.mu.Lock()
		defer e.mu.Unlock()
		select {
		case <-e.done:
			// Already closed by an earlier call; closing again would panic.
		default:
			close(e.done)
		}
	}()

	e.cancelCtx()

	select {
	case <-e.done: // all handler goroutines have exited
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
// combineHandlers returns a freshly allocated chain made of the engine-wide
// middleware followed by handlers. It asserts that the combined length stays
// below abortIndex.
func (e *Engine) combineHandlers(handlers HandlersChain) HandlersChain {
	total := len(e.Handlers) + len(handlers)
	assert1(total < int(abortIndex), "too many handlers")

	merged := make(HandlersChain, 0, total)
	merged = append(merged, e.Handlers...)
	merged = append(merged, handlers...)
	return merged
}