加入等待继电器释放,加入程序停止执行时释放所有资源
This commit is contained in:
46
leaf/leaf.go
46
leaf/leaf.go
@@ -34,8 +34,13 @@ func (c HandlersChain) Last() HandlerFunc {
|
||||
}
|
||||
|
||||
type Engine struct {
|
||||
mu sync.RWMutex
|
||||
ctx context.Context
|
||||
mu sync.RWMutex
|
||||
queueWg sync.WaitGroup
|
||||
|
||||
ctx context.Context
|
||||
cancelCtx context.CancelFunc
|
||||
done chan struct{}
|
||||
|
||||
Handlers HandlersChain
|
||||
defaultHandler HandlersChain
|
||||
subscriptions map[string]HandlersChain
|
||||
@@ -44,8 +49,11 @@ type Engine struct {
|
||||
}
|
||||
|
||||
func New(ctx context.Context) *Engine {
|
||||
c, cancel := context.WithCancel(ctx)
|
||||
return &Engine{
|
||||
ctx: ctx,
|
||||
ctx: c,
|
||||
cancelCtx: cancel,
|
||||
done: make(chan struct{}),
|
||||
Handlers: make(HandlersChain, 0),
|
||||
subscriptions: make(map[string]HandlersChain),
|
||||
aliases: make(map[uint16]string),
|
||||
@@ -54,8 +62,11 @@ func New(ctx context.Context) *Engine {
|
||||
}
|
||||
|
||||
func Default(ctx context.Context) *Engine {
|
||||
c, cancel := context.WithCancel(ctx)
|
||||
engine := &Engine{
|
||||
ctx: ctx,
|
||||
ctx: c,
|
||||
cancelCtx: cancel,
|
||||
done: make(chan struct{}),
|
||||
Handlers: make(HandlersChain, 0),
|
||||
subscriptions: make(map[string]HandlersChain),
|
||||
aliases: make(map[uint16]string),
|
||||
@@ -108,13 +119,21 @@ func (e *Engine) Route(pb *packets.Publish) {
|
||||
for route, handlers := range e.subscriptions {
|
||||
if match(route, topic) {
|
||||
e.debug.Println("found handler for:", route)
|
||||
go WithLeafContext(e.ctx, m, e, handlers).Next()
|
||||
e.queueWg.Add(1)
|
||||
go func() {
|
||||
defer e.queueWg.Done()
|
||||
WithLeafContext(e.ctx, m, e, handlers).Next()
|
||||
}()
|
||||
handlerCalled = true
|
||||
}
|
||||
}
|
||||
|
||||
if !handlerCalled && e.defaultHandler != nil {
|
||||
go WithLeafContext(e.ctx, m, e, e.defaultHandler).Next()
|
||||
e.queueWg.Add(1)
|
||||
go func() {
|
||||
defer e.queueWg.Done()
|
||||
WithLeafContext(e.ctx, m, e, e.defaultHandler).Next()
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -134,6 +153,21 @@ func (e *Engine) DefaultHandler(h HandlerFunc) {
|
||||
e.defaultHandler = e.combineHandlers(HandlersChain{h})
|
||||
}
|
||||
|
||||
func (e *Engine) Disconnect(ctx context.Context) error {
|
||||
go func() {
|
||||
e.queueWg.Wait()
|
||||
close(e.done)
|
||||
}()
|
||||
|
||||
e.cancelCtx()
|
||||
select {
|
||||
case <-e.done: // wait for goroutine to exit
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Engine) combineHandlers(handlers HandlersChain) HandlersChain {
|
||||
finalSize := len(e.Handlers) + len(handlers)
|
||||
assert1(finalSize < int(abortIndex), "too many handlers")
|
||||
|
||||
Reference in New Issue
Block a user