// Package leaf routes incoming MQTT publish packets to chains of handler
// functions registered per topic filter.
package leaf

import (
	"context"
	"sync"

	"github.com/eclipse/paho.golang/packets"
	"github.com/eclipse/paho.golang/paho"
	"github.com/eclipse/paho.golang/paho/log"
)

// Router is the set of operations needed to register topic handlers and to
// dispatch incoming publish packets to them.
type Router interface {
	RegisterHandler(string, ...HandlerFunc)
	UnregisterHandler(string)
	Route(*packets.Publish)
	SetDebugLogger(log.Logger)
	Use(...HandlerFunc)
}

// HandlerFunc is the signature shared by handlers and middleware; it receives
// the *Context of the message being processed.
type HandlerFunc func(*Context)

// OptionFunc defines the function to change the default configuration.
type OptionFunc func(*Engine)

// HandlersChain defines a HandlerFunc slice.
type HandlersChain []HandlerFunc

// Last returns the last handler in the chain, i.e. the main one, or nil when
// the chain is empty.
func (c HandlersChain) Last() HandlerFunc {
	if length := len(c); length > 0 {
		return c[length-1]
	}
	return nil
}

// Engine is a Router implementation that runs every matching handler chain in
// its own goroutine.
type Engine struct {
	mu        sync.RWMutex   // guards subscriptions, aliases and defaultHandler
	queueWg   sync.WaitGroup // counts handler goroutines started by Route
	ctx       context.Context
	cancelCtx context.CancelFunc
	done      chan struct{} // closed by Disconnect once queueWg drains
	doneOnce  sync.Once     // ensures done is closed at most once

	// Handlers is the engine-wide middleware chain that is prepended to the
	// handlers of every subsequent registration.
	Handlers       HandlersChain
	defaultHandler HandlersChain
	subscriptions  map[string]HandlersChain
	aliases        map[uint16]string
	debug          log.Logger
}

// New returns an Engine without any middleware. The engine derives its own
// cancellable context from ctx; that context is cancelled by Disconnect.
func New(ctx context.Context) *Engine {
	c, cancel := context.WithCancel(ctx)
	return &Engine{
		ctx:           c,
		cancelCtx:     cancel,
		done:          make(chan struct{}),
		Handlers:      make(HandlersChain, 0),
		subscriptions: make(map[string]HandlersChain),
		aliases:       make(map[uint16]string),
		debug:         log.NOOPLogger{},
	}
}

// Default returns an Engine configured like New, with the Recovery middleware
// already attached.
func Default(ctx context.Context) *Engine {
	engine := New(ctx)
	engine.Use(Recovery())
	return engine
}

// RegisterHandler associates topic with the given handlers, prefixed by the
// middleware currently in e.Handlers. Any previous registration for the same
// topic is replaced.
func (e *Engine) RegisterHandler(topic string, handlers ...HandlerFunc) {
	e.debug.Println("registering handler for:", topic)
	e.mu.Lock()
	defer e.mu.Unlock()

	e.subscriptions[topic] = e.combineHandlers(handlers)
}

// UnregisterHandler removes the handlers registered for topic, if any.
func (e *Engine) UnregisterHandler(topic string) {
	e.debug.Println("unregistering handler for:", topic)
	e.mu.Lock()
	defer e.mu.Unlock()

	delete(e.subscriptions, topic)
}

// Route dispatches pb to every registered chain whose route matches the
// message topic, resolving (and recording) topic aliases first. Each matching
// chain runs in its own goroutine. When nothing matches and a default handler
// is set, the default handler runs instead.
func (e *Engine) Route(pb *packets.Publish) {
	e.debug.Println("routing message for:", pb.Topic)
	// A write lock is required because the alias table may be updated below.
	e.mu.Lock()
	defer e.mu.Unlock()

	m := paho.PublishFromPacketPublish(pb)

	var topic string
	if pb.Properties.TopicAlias != nil {
		e.debug.Println("message is using topic aliasing")
		if pb.Topic != "" {
			// Register new alias
			e.debug.Printf("registering new topic alias '%d' for topic '%s'", *pb.Properties.TopicAlias, m.Topic)
			e.aliases[*pb.Properties.TopicAlias] = pb.Topic
		}
		if t, ok := e.aliases[*pb.Properties.TopicAlias]; ok {
			// Log the resolved topic: m.Topic is empty when only the alias
			// was sent.
			e.debug.Printf("aliased topic '%d' translates to '%s'", *pb.Properties.TopicAlias, t)
			topic = t
		}
	} else {
		topic = m.Topic
	}

	handlerCalled := false
	for route, handlers := range e.subscriptions {
		if !match(route, topic) {
			continue
		}
		e.debug.Println("found handler for:", route)

		// Give each goroutine its own copy: before Go 1.22 the range variable
		// is shared by all iterations and would be overwritten while earlier
		// goroutines are still reading it.
		handlers := handlers
		e.queueWg.Add(1)
		go func() {
			defer e.queueWg.Done()
			WithLeafContext(e.ctx, m, e, handlers).Next()
		}()
		handlerCalled = true
	}

	if !handlerCalled && e.defaultHandler != nil {
		// Snapshot under the lock so the goroutine does not read the field
		// after Route has released e.mu.
		fallback := e.defaultHandler
		e.queueWg.Add(1)
		go func() {
			defer e.queueWg.Done()
			WithLeafContext(e.ctx, m, e, fallback).Next()
		}()
	}
}

// SetDebugLogger replaces the logger used for debug output. It does not
// acquire e.mu.
func (e *Engine) SetDebugLogger(l log.Logger) {
	e.debug = l
}

// Use appends middleware to the engine-wide chain. Because the chain is
// copied at registration time, only handlers registered after the call
// include the new middleware. Use does not acquire e.mu.
func (e *Engine) Use(middleware ...HandlerFunc) {
	e.Handlers = append(e.Handlers, middleware...)
}

// DefaultHandler sets the handler that Route invokes for messages that match
// no registered route. The handler is prefixed by the middleware currently in
// e.Handlers.
func (e *Engine) DefaultHandler(h HandlerFunc) {
	e.debug.Println("registering default handler")
	e.mu.Lock()
	defer e.mu.Unlock()

	e.defaultHandler = e.combineHandlers(HandlersChain{h})
}

// Disconnect cancels the engine context and waits for the handler goroutines
// started by Route to finish. It returns nil once they have all exited, or
// ctx.Err() if ctx is done first. It is safe to call more than once.
func (e *Engine) Disconnect(ctx context.Context) error {
	go func() {
		e.queueWg.Wait()
		// Closing an already-closed channel panics, so guard repeated calls.
		e.doneOnce.Do(func() { close(e.done) })
	}()

	e.cancelCtx()

	select {
	case <-e.done: // wait for goroutine to exit
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// combineHandlers returns a new chain made of the engine-wide middleware
// followed by handlers. The result does not share backing storage with
// e.Handlers. The total length must stay below abortIndex; assert1 (defined
// elsewhere in the package) enforces this.
func (e *Engine) combineHandlers(handlers HandlersChain) HandlersChain {
	finalSize := len(e.Handlers) + len(handlers)
	assert1(finalSize < int(abortIndex), "too many handlers")
	mergedHandlers := make(HandlersChain, finalSize)
	copy(mergedHandlers, e.Handlers)
	copy(mergedHandlers[len(e.Handlers):], handlers)
	return mergedHandlers
}