From 3a2fc431ac0f9e9304cb47b3be8ca590e9c37b46 Mon Sep 17 00:00:00 2001 From: mapleafgo Date: Fri, 28 Feb 2025 17:05:07 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8A=A0=E5=85=A5=E7=AD=89=E5=BE=85=E7=BB=A7?= =?UTF-8?q?=E7=94=B5=E5=99=A8=E9=87=8A=E6=94=BE=EF=BC=8C=E5=8A=A0=E5=85=A5?= =?UTF-8?q?=E7=A8=8B=E5=BA=8F=E5=81=9C=E6=AD=A2=E6=89=A7=E8=A1=8C=E6=97=B6?= =?UTF-8?q?=E9=87=8A=E6=94=BE=E6=89=80=E6=9C=89=E8=B5=84=E6=BA=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.yml | 2 +- internal/server.go | 5 ++++- leaf/leaf.go | 46 ++++++++++++++++++++++++++++++++++++++++------ logger/logger.go | 2 +- pkg/relay/relay.go | 29 +++++++++++++++++++++++++++++ 5 files changed, 75 insertions(+), 9 deletions(-) diff --git a/config.yml b/config.yml index b383814..f404df6 100755 --- a/config.yml +++ b/config.yml @@ -11,7 +11,7 @@ log: maxAge: 30 compress: true mqtt: - url: mqtt://58.144.199.46:1883 + url: mqtt://wushan-mqtt.chaoshengshuzi.com:1883 aliyun: accessKeyID: accessKeySecret: diff --git a/internal/server.go b/internal/server.go index 66079a7..305dbf9 100644 --- a/internal/server.go +++ b/internal/server.go @@ -126,7 +126,7 @@ func Run() { if config.C.Relay != "" { r, err = relay.New(config.C.Relay) if err != nil { - zap.S().Panicln("继电器连接异常: ", err) + zap.S().Errorln("继电器连接异常: ", err) } defer r.Close() } @@ -179,6 +179,9 @@ func Run() { if e := cm.Disconnect(ctx); e != nil { zap.S().Errorln("断开连接异常", e) } + if e := router.Disconnect(ctx); e != nil { + zap.S().Errorln("停止所以任务超时", e) + } zap.S().Infoln("关闭完成") } diff --git a/leaf/leaf.go b/leaf/leaf.go index d63d2b4..1e5d92d 100644 --- a/leaf/leaf.go +++ b/leaf/leaf.go @@ -34,8 +34,13 @@ func (c HandlersChain) Last() HandlerFunc { } type Engine struct { - mu sync.RWMutex - ctx context.Context + mu sync.RWMutex + queueWg sync.WaitGroup + + ctx context.Context + cancelCtx context.CancelFunc + done chan struct{} + Handlers HandlersChain defaultHandler HandlersChain subscriptions map[string]HandlersChain @@ -44,8 +49,11 @@ type Engine struct { } func New(ctx context.Context) *Engine { + c, cancel := context.WithCancel(ctx) return &Engine{ - ctx: ctx, + ctx: c, + cancelCtx: cancel, + done: make(chan struct{}), Handlers: make(HandlersChain, 0), subscriptions: make(map[string]HandlersChain), aliases: make(map[uint16]string), @@ -54,8 +62,11 @@ func New(ctx context.Context) *Engine { } func Default(ctx context.Context) *Engine { + c, cancel := context.WithCancel(ctx) engine := &Engine{ - ctx: ctx, + ctx: c, + cancelCtx: cancel, + done: make(chan struct{}), Handlers: make(HandlersChain, 0), subscriptions: make(map[string]HandlersChain), aliases: make(map[uint16]string), @@ -108,13 +119,21 @@ func (e *Engine) Route(pb *packets.Publish) { for route, handlers := range e.subscriptions { if match(route, topic) { e.debug.Println("found handler for:", route) - go WithLeafContext(e.ctx, m, e, handlers).Next() + e.queueWg.Add(1) + go func() { + defer e.queueWg.Done() + WithLeafContext(e.ctx, m, e, handlers).Next() + }() handlerCalled = true } } if !handlerCalled && e.defaultHandler != nil { - go WithLeafContext(e.ctx, m, e, e.defaultHandler).Next() + e.queueWg.Add(1) + go func() { + defer e.queueWg.Done() + WithLeafContext(e.ctx, m, e, e.defaultHandler).Next() + }() } } @@ -134,6 +153,21 @@ func (e *Engine) DefaultHandler(h HandlerFunc) { e.defaultHandler = e.combineHandlers(HandlersChain{h}) } +func (e *Engine) Disconnect(ctx context.Context) error { + go func() { + e.queueWg.Wait() + close(e.done) + }() + + e.cancelCtx() + select { + case <-e.done: // wait for goroutine to exit + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + func (e *Engine) combineHandlers(handlers HandlersChain) HandlersChain { finalSize := len(e.Handlers) + len(handlers) assert1(finalSize < int(abortIndex), "too many handlers") diff --git a/logger/logger.go b/logger/logger.go index 0f5911e..ab1f722 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -51,7 +51,7 @@ func InitProLogger(cls *TenCls) { cn := zapcore.NewJSONEncoder(nc) // 多个输出 - mws := zapcore.NewMultiWriteSyncer(zapcore.AddSync(cls), zapcore.AddSync(os.Stdout)) + mws := zapcore.NewMultiWriteSyncer(zapcore.AddSync(cls), zapcore.AddSync(config.C.Log.File), zapcore.AddSync(os.Stdout)) // 核心配置 core := zapcore.NewCore(cn, mws, level) diff --git a/pkg/relay/relay.go b/pkg/relay/relay.go index d1fc53c..73d0b3a 100644 --- a/pkg/relay/relay.go +++ b/pkg/relay/relay.go @@ -1,9 +1,15 @@ package relay import ( + "context" + "errors" "github.com/grid-x/modbus" "go.uber.org/zap" "io" + "io/fs" + "os" + "strings" + "time" ) type Relay interface { @@ -56,6 +62,29 @@ func (r *device) Off(num int) error { func New(address string) (Relay, error) { zap.S().Infoln("连接继电器: ", address) + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + a := make(chan struct{}) + go func() { + defer close(a) + for { + select { + case <-ctx.Done(): + zap.S().Infoln("等待继电器资源释放超时:", address) + return + case <-time.After(3 * time.Second): + _, err := os.OpenFile(address, os.O_RDWR, 0666) + var e *fs.PathError + if errors.As(err, &e) && strings.Contains(e.Error(), "busy") { + zap.S().Infoln("等待继电器资源释放:", address) + } else { + return + } + } + } + }() + <-a + h := modbus.NewRTUClientHandler(address) h.SlaveID = 1 h.BaudRate = 9600