package leaf import ( "context" "github.com/eclipse/paho.golang/packets" "github.com/eclipse/paho.golang/paho" "github.com/eclipse/paho.golang/paho/log" "sync" ) type Router interface { RegisterHandler(string, ...HandlerFunc) UnregisterHandler(string) Route(*packets.Publish) SetDebugLogger(log.Logger) Use(...HandlerFunc) } // HandlerFunc defines the handler used by gin middleware as return value. type HandlerFunc func(*Context) // OptionFunc defines the function to change the default configuration type OptionFunc func(*Engine) // HandlersChain defines a HandlerFunc slice. type HandlersChain []HandlerFunc // Last returns the last handler in the chain. i.e. the last handler is the main one. func (c HandlersChain) Last() HandlerFunc { if length := len(c); length > 0 { return c[length-1] } return nil } type Engine struct { mu sync.RWMutex ctx context.Context Handlers HandlersChain defaultHandler HandlersChain subscriptions map[string]HandlersChain aliases map[uint16]string debug log.Logger } func New(ctx context.Context) *Engine { return &Engine{ ctx: ctx, Handlers: make(HandlersChain, 0), subscriptions: make(map[string]HandlersChain), aliases: make(map[uint16]string), debug: log.NOOPLogger{}, } } func Default(ctx context.Context) *Engine { engine := &Engine{ ctx: ctx, Handlers: make(HandlersChain, 0), subscriptions: make(map[string]HandlersChain), aliases: make(map[uint16]string), debug: log.NOOPLogger{}, } engine.Use(Recovery()) return engine } func (e *Engine) RegisterHandler(topic string, handlers ...HandlerFunc) { e.debug.Println("registering handler for:", topic) e.mu.Lock() defer e.mu.Unlock() e.subscriptions[topic] = e.combineHandlers(handlers) } func (e *Engine) UnregisterHandler(topic string) { e.debug.Println("unregistering handler for:", topic) e.mu.Lock() defer e.mu.Unlock() delete(e.subscriptions, topic) } func (e *Engine) Route(pb *packets.Publish) { e.debug.Println("routing message for:", pb.Topic) e.mu.Lock() defer e.mu.Unlock() m := paho.PublishFromPacketPublish(pb) var topic string if pb.Properties.TopicAlias != nil { e.debug.Println("message is using topic aliasing") if pb.Topic != "" { // Register new alias e.debug.Printf("registering new topic alias '%d' for topic '%s'", *pb.Properties.TopicAlias, m.Topic) e.aliases[*pb.Properties.TopicAlias] = pb.Topic } if t, ok := e.aliases[*pb.Properties.TopicAlias]; ok { e.debug.Printf("aliased topic '%d' translates to '%s'", *pb.Properties.TopicAlias, m.Topic) topic = t } } else { topic = m.Topic } handlerCalled := false for route, handlers := range e.subscriptions { if match(route, topic) { e.debug.Println("found handler for:", route) go WithLeafContext(e.ctx, m, e, handlers).Next() handlerCalled = true } } if !handlerCalled && e.defaultHandler != nil { go WithLeafContext(e.ctx, m, e, e.defaultHandler).Next() } } func (e *Engine) SetDebugLogger(l log.Logger) { e.debug = l } func (e *Engine) Use(middleware ...HandlerFunc) { e.Handlers = append(e.Handlers, middleware...) } func (e *Engine) DefaultHandler(h HandlerFunc) { e.debug.Println("registering default handler") e.mu.Lock() defer e.mu.Unlock() e.defaultHandler = e.combineHandlers(HandlersChain{h}) } func (e *Engine) combineHandlers(handlers HandlersChain) HandlersChain { finalSize := len(e.Handlers) + len(handlers) assert1(finalSize < int(abortIndex), "too many handlers") mergedHandlers := make(HandlersChain, finalSize) copy(mergedHandlers, e.Handlers) copy(mergedHandlers[len(e.Handlers):], handlers) return mergedHandlers }