145 lines
3.7 KiB
Go
145 lines
3.7 KiB
Go
package leaf
|
|
|
|
import (
|
|
"context"
|
|
"github.com/eclipse/paho.golang/packets"
|
|
"github.com/eclipse/paho.golang/paho"
|
|
"github.com/eclipse/paho.golang/paho/log"
|
|
"sync"
|
|
)
|
|
|
|
type Router interface {
|
|
RegisterHandler(string, ...HandlerFunc)
|
|
UnregisterHandler(string)
|
|
Route(*packets.Publish)
|
|
SetDebugLogger(log.Logger)
|
|
Use(...HandlerFunc)
|
|
}
|
|
|
|
// HandlerFunc defines the handler used by gin middleware as return value.
|
|
type HandlerFunc func(*Context)
|
|
|
|
// OptionFunc defines the function to change the default configuration
|
|
type OptionFunc func(*Engine)
|
|
|
|
// HandlersChain defines a HandlerFunc slice.
|
|
type HandlersChain []HandlerFunc
|
|
|
|
// Last returns the last handler in the chain. i.e. the last handler is the main one.
|
|
func (c HandlersChain) Last() HandlerFunc {
|
|
if length := len(c); length > 0 {
|
|
return c[length-1]
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type Engine struct {
|
|
mu sync.RWMutex
|
|
ctx context.Context
|
|
Handlers HandlersChain
|
|
defaultHandler HandlersChain
|
|
subscriptions map[string]HandlersChain
|
|
aliases map[uint16]string
|
|
debug log.Logger
|
|
}
|
|
|
|
func New(ctx context.Context) *Engine {
|
|
return &Engine{
|
|
ctx: ctx,
|
|
Handlers: make(HandlersChain, 0),
|
|
subscriptions: make(map[string]HandlersChain),
|
|
aliases: make(map[uint16]string),
|
|
debug: log.NOOPLogger{},
|
|
}
|
|
}
|
|
|
|
func Default(ctx context.Context) *Engine {
|
|
engine := &Engine{
|
|
ctx: ctx,
|
|
Handlers: make(HandlersChain, 0),
|
|
subscriptions: make(map[string]HandlersChain),
|
|
aliases: make(map[uint16]string),
|
|
debug: log.NOOPLogger{},
|
|
}
|
|
engine.Use(Recovery())
|
|
return engine
|
|
}
|
|
|
|
func (e *Engine) RegisterHandler(topic string, handlers ...HandlerFunc) {
|
|
e.debug.Println("registering handler for:", topic)
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
|
|
e.subscriptions[topic] = e.combineHandlers(handlers)
|
|
}
|
|
|
|
func (e *Engine) UnregisterHandler(topic string) {
|
|
e.debug.Println("unregistering handler for:", topic)
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
|
|
delete(e.subscriptions, topic)
|
|
}
|
|
|
|
func (e *Engine) Route(pb *packets.Publish) {
|
|
e.debug.Println("routing message for:", pb.Topic)
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
|
|
m := paho.PublishFromPacketPublish(pb)
|
|
|
|
var topic string
|
|
if pb.Properties.TopicAlias != nil {
|
|
e.debug.Println("message is using topic aliasing")
|
|
if pb.Topic != "" {
|
|
// Register new alias
|
|
e.debug.Printf("registering new topic alias '%d' for topic '%s'", *pb.Properties.TopicAlias, m.Topic)
|
|
e.aliases[*pb.Properties.TopicAlias] = pb.Topic
|
|
}
|
|
if t, ok := e.aliases[*pb.Properties.TopicAlias]; ok {
|
|
e.debug.Printf("aliased topic '%d' translates to '%s'", *pb.Properties.TopicAlias, m.Topic)
|
|
topic = t
|
|
}
|
|
} else {
|
|
topic = m.Topic
|
|
}
|
|
|
|
handlerCalled := false
|
|
for route, handlers := range e.subscriptions {
|
|
if match(route, topic) {
|
|
e.debug.Println("found handler for:", route)
|
|
go WithLeafContext(e.ctx, m, e, handlers).Next()
|
|
handlerCalled = true
|
|
}
|
|
}
|
|
|
|
if !handlerCalled && e.defaultHandler != nil {
|
|
go WithLeafContext(e.ctx, m, e, e.defaultHandler).Next()
|
|
}
|
|
}
|
|
|
|
func (e *Engine) SetDebugLogger(l log.Logger) {
|
|
e.debug = l
|
|
}
|
|
|
|
func (e *Engine) Use(middleware ...HandlerFunc) {
|
|
e.Handlers = append(e.Handlers, middleware...)
|
|
}
|
|
|
|
func (e *Engine) DefaultHandler(h HandlerFunc) {
|
|
e.debug.Println("registering default handler")
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
|
|
e.defaultHandler = e.combineHandlers(HandlersChain{h})
|
|
}
|
|
|
|
func (e *Engine) combineHandlers(handlers HandlersChain) HandlersChain {
|
|
finalSize := len(e.Handlers) + len(handlers)
|
|
assert1(finalSize < int(abortIndex), "too many handlers")
|
|
mergedHandlers := make(HandlersChain, finalSize)
|
|
copy(mergedHandlers, e.Handlers)
|
|
copy(mergedHandlers[len(e.Handlers):], handlers)
|
|
return mergedHandlers
|
|
}
|