基本逻辑完成
This commit is contained in:
144
leaf/leaf.go
Normal file
144
leaf/leaf.go
Normal file
@@ -0,0 +1,144 @@
|
||||
package leaf
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/eclipse/paho.golang/packets"
|
||||
"github.com/eclipse/paho.golang/paho"
|
||||
"github.com/eclipse/paho.golang/paho/log"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type Router interface {
|
||||
RegisterHandler(string, ...HandlerFunc)
|
||||
UnregisterHandler(string)
|
||||
Route(*packets.Publish)
|
||||
SetDebugLogger(log.Logger)
|
||||
Use(...HandlerFunc)
|
||||
}
|
||||
|
||||
// HandlerFunc defines the handler used by gin middleware as return value.
|
||||
type HandlerFunc func(*Context)
|
||||
|
||||
// OptionFunc defines the function to change the default configuration
|
||||
type OptionFunc func(*Engine)
|
||||
|
||||
// HandlersChain defines a HandlerFunc slice.
|
||||
type HandlersChain []HandlerFunc
|
||||
|
||||
// Last returns the last handler in the chain. i.e. the last handler is the main one.
|
||||
func (c HandlersChain) Last() HandlerFunc {
|
||||
if length := len(c); length > 0 {
|
||||
return c[length-1]
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type Engine struct {
|
||||
mu sync.RWMutex
|
||||
ctx context.Context
|
||||
Handlers HandlersChain
|
||||
defaultHandler HandlersChain
|
||||
subscriptions map[string]HandlersChain
|
||||
aliases map[uint16]string
|
||||
debug log.Logger
|
||||
}
|
||||
|
||||
func New(ctx context.Context) *Engine {
|
||||
return &Engine{
|
||||
ctx: ctx,
|
||||
Handlers: make(HandlersChain, 0),
|
||||
subscriptions: make(map[string]HandlersChain),
|
||||
aliases: make(map[uint16]string),
|
||||
debug: log.NOOPLogger{},
|
||||
}
|
||||
}
|
||||
|
||||
func Default(ctx context.Context) *Engine {
|
||||
engine := &Engine{
|
||||
ctx: ctx,
|
||||
Handlers: make(HandlersChain, 0),
|
||||
subscriptions: make(map[string]HandlersChain),
|
||||
aliases: make(map[uint16]string),
|
||||
debug: log.NOOPLogger{},
|
||||
}
|
||||
engine.Use(Recovery())
|
||||
return engine
|
||||
}
|
||||
|
||||
func (e *Engine) RegisterHandler(topic string, handlers ...HandlerFunc) {
|
||||
e.debug.Println("registering handler for:", topic)
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
|
||||
e.subscriptions[topic] = e.combineHandlers(handlers)
|
||||
}
|
||||
|
||||
func (e *Engine) UnregisterHandler(topic string) {
|
||||
e.debug.Println("unregistering handler for:", topic)
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
|
||||
delete(e.subscriptions, topic)
|
||||
}
|
||||
|
||||
func (e *Engine) Route(pb *packets.Publish) {
|
||||
e.debug.Println("routing message for:", pb.Topic)
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
|
||||
m := paho.PublishFromPacketPublish(pb)
|
||||
|
||||
var topic string
|
||||
if pb.Properties.TopicAlias != nil {
|
||||
e.debug.Println("message is using topic aliasing")
|
||||
if pb.Topic != "" {
|
||||
// Register new alias
|
||||
e.debug.Printf("registering new topic alias '%d' for topic '%s'", *pb.Properties.TopicAlias, m.Topic)
|
||||
e.aliases[*pb.Properties.TopicAlias] = pb.Topic
|
||||
}
|
||||
if t, ok := e.aliases[*pb.Properties.TopicAlias]; ok {
|
||||
e.debug.Printf("aliased topic '%d' translates to '%s'", *pb.Properties.TopicAlias, m.Topic)
|
||||
topic = t
|
||||
}
|
||||
} else {
|
||||
topic = m.Topic
|
||||
}
|
||||
|
||||
handlerCalled := false
|
||||
for route, handlers := range e.subscriptions {
|
||||
if match(route, topic) {
|
||||
e.debug.Println("found handler for:", route)
|
||||
go WithLeafContext(e.ctx, m, e, handlers).Next()
|
||||
handlerCalled = true
|
||||
}
|
||||
}
|
||||
|
||||
if !handlerCalled && e.defaultHandler != nil {
|
||||
go WithLeafContext(e.ctx, m, e, e.defaultHandler).Next()
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Engine) SetDebugLogger(l log.Logger) {
|
||||
e.debug = l
|
||||
}
|
||||
|
||||
func (e *Engine) Use(middleware ...HandlerFunc) {
|
||||
e.Handlers = append(e.Handlers, middleware...)
|
||||
}
|
||||
|
||||
func (e *Engine) DefaultHandler(h HandlerFunc) {
|
||||
e.debug.Println("registering default handler")
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
|
||||
e.defaultHandler = e.combineHandlers(HandlersChain{h})
|
||||
}
|
||||
|
||||
func (e *Engine) combineHandlers(handlers HandlersChain) HandlersChain {
|
||||
finalSize := len(e.Handlers) + len(handlers)
|
||||
assert1(finalSize < int(abortIndex), "too many handlers")
|
||||
mergedHandlers := make(HandlersChain, finalSize)
|
||||
copy(mergedHandlers, e.Handlers)
|
||||
copy(mergedHandlers[len(e.Handlers):], handlers)
|
||||
return mergedHandlers
|
||||
}
|
||||
Reference in New Issue
Block a user