From febcdfdbf79522b762417e5945a0bc2eb051cf1f Mon Sep 17 00:00:00 2001 From: mapleafgo Date: Fri, 7 Mar 2025 16:04:19 +0800 Subject: [PATCH] =?UTF-8?q?mqtt=E5=8A=A0=E5=85=A5=E8=AE=A4=E8=AF=81?= =?UTF-8?q?=EF=BC=8C=E4=BC=98=E5=8C=96=E8=A7=86=E9=A2=91=E6=92=AD=E6=94=BE?= =?UTF-8?q?=E4=B8=8E=E6=B5=8F=E8=A7=88=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/config.go | 4 ++- internal/common/pause_sub.go | 32 ++++++++++++++---------- internal/middleware/stop.go | 5 ++-- internal/routes/play/only_video.go | 4 +-- internal/routes/standby/video.go | 4 +-- internal/routes/standby_ctrl/pause.go | 11 +++------ internal/routes/standby_ctrl/time.go | 3 +++ internal/routes/wait.go | 2 +- internal/server.go | 7 +++--- pkg/browser/browser.go | 17 +------------ pkg/utils/link_video.go | 35 ++++----------------------- pkg/video/play.go | 14 ++++++++--- readme.md | 4 ++- 13 files changed, 60 insertions(+), 82 deletions(-) diff --git a/config/config.go b/config/config.go index 3a75f00..91b543f 100644 --- a/config/config.go +++ b/config/config.go @@ -5,7 +5,9 @@ import ( ) type MqttConfig struct { - Url string + Url string + ClientID string + Password string } type AliyunConfig struct { diff --git a/internal/common/pause_sub.go b/internal/common/pause_sub.go index 1b211a8..2fa1e0b 100644 --- a/internal/common/pause_sub.go +++ b/internal/common/pause_sub.go @@ -1,45 +1,50 @@ package common -import "sync" +import ( + "context" + "sync" +) type PauseSub struct { - ctrl *CtrlWait - // 回调函数 + ctrl *CtrlWait items []chan int8 m sync.RWMutex } -// Add 添加一个暂停项 -func (p *PauseSub) Add(item chan int8) { +func (p *PauseSub) GetNew() chan int8 { p.m.Lock() defer p.m.Unlock() - p.items = append(p.items, item) + sub := make(chan int8) + p.items = append(p.items, sub) + return sub } -// Remove 移除一个暂停项 -func (p *PauseSub) Remove(item chan int8) { +func (p *PauseSub) Close(sub chan int8) { p.m.Lock() defer p.m.Unlock() + close(sub) for i, v := range p.items { - if v == item { + if v == sub { p.items = append(p.items[:i], p.items[i+1:]...) } } } // Run 开始监听 -func (p *PauseSub) Run() { +func (p *PauseSub) Run(ctx context.Context) { p.ctrl.Open() defer p.ctrl.Close() for { select { - case <-p.ctrl.C: + case <-ctx.Done(): + return + case c := <-p.ctrl.C: go func() { p.m.RLock() defer p.m.RUnlock() for _, item := range p.items { - item <- 1 + item <- c } }() } @@ -48,6 +53,7 @@ func (p *PauseSub) Run() { func NewPauseSub(c *CtrlWait) *PauseSub { return &PauseSub{ - ctrl: c, + ctrl: c, + items: make([]chan int8, 0), } } diff --git a/internal/middleware/stop.go b/internal/middleware/stop.go index cc1c3a3..366eb7c 100644 --- a/internal/middleware/stop.go +++ b/internal/middleware/stop.go @@ -13,6 +13,9 @@ func EmergencyStop(stopper common.Stopper) leaf.HandlerFunc { cancel := leaf.WithCancel(c) defer stopper.Reset() + zap.S().Infoln("监听停止信号") + defer zap.S().Infoln("结束停止信号监听") + // 等待组 var wait sync.WaitGroup defer wait.Wait() @@ -22,11 +25,9 @@ func EmergencyStop(stopper common.Stopper) leaf.HandlerFunc { // 发送结束信号 defer close(a) - zap.S().Infoln("监听停止信号") wait.Add(1) go func() { defer wait.Done() - defer zap.S().Infoln("结束停止信号监听") select { case <-a: diff --git a/internal/routes/play/only_video.go b/internal/routes/play/only_video.go index 94d1d71..f42c368 100644 --- a/internal/routes/play/only_video.go +++ b/internal/routes/play/only_video.go @@ -16,11 +16,11 @@ func OnlyVideo(c *leaf.Context) { defer utils.BlankClose() if url, ok := payload.Game["video"]; ok { - local, err := utils.LinkVideo(url.(string)) + path, local, err := utils.LinkVideo(url.(string)) if err != nil { zap.S().Errorln("视频文件获取异常: ", err) return } - _ = video.Play(c, local) + _ = video.Play(c, path, local) } } diff --git a/internal/routes/standby/video.go b/internal/routes/standby/video.go index 6cf1846..d99d03d 100644 --- a/internal/routes/standby/video.go +++ b/internal/routes/standby/video.go @@ -12,7 +12,7 @@ import ( func Video(item schema.WaitItemModel) func(c context.Context) error { return func(c context.Context) error { - local, err := utils.LinkVideo(item.Data) + path, local, err := utils.LinkVideo(item.Data) if err != nil { return fmt.Errorf("视频文件获取异常: %w", err) } @@ -24,7 +24,7 @@ func Video(item schema.WaitItemModel) func(c context.Context) error { defer utils.BlankClose() for { - err := video.Play(c, local) + err := video.Play(c, path, local) if err != nil { return fmt.Errorf("视频播放异常: %w", err) } diff --git a/internal/routes/standby_ctrl/pause.go b/internal/routes/standby_ctrl/pause.go index 82ba5da..68bf0b1 100644 --- a/internal/routes/standby_ctrl/pause.go +++ b/internal/routes/standby_ctrl/pause.go @@ -13,14 +13,11 @@ func Pause(ps *common.PauseSub, isPause bool, play func(c context.Context) error run := true if isPause { - p := make(chan int8) - defer close(p) + zap.S().Infoln("待机暂停控制器") + defer zap.S().Infoln("待机暂停控制器结束") - ps.Add(p) - defer ps.Remove(p) - - zap.S().Infoln("待机控制器") - defer zap.S().Infoln("待机控制器结束") + p := ps.GetNew() + defer ps.Close(p) // 等待组 var wait sync.WaitGroup diff --git a/internal/routes/standby_ctrl/time.go b/internal/routes/standby_ctrl/time.go index 3bd2e0e..7a4e987 100644 --- a/internal/routes/standby_ctrl/time.go +++ b/internal/routes/standby_ctrl/time.go @@ -25,6 +25,9 @@ func Time(rootRules []cronrange.Rule, cron string, play func(c context.Context) a := make(chan bool) defer close(a) + zap.S().Infoln("待机时间控制器") + defer zap.S().Infoln("待机时间控制器结束") + // 等待组 var waitGroup sync.WaitGroup defer waitGroup.Wait() diff --git a/internal/routes/wait.go b/internal/routes/wait.go index c5476bf..a296c14 100644 --- a/internal/routes/wait.go +++ b/internal/routes/wait.go @@ -38,7 +38,7 @@ func WaitAction(ctrl *common.CtrlWait) leaf.HandlerFunc { waitGroup.Add(1) go func() { defer waitGroup.Done() - ps.Run() + ps.Run(c) }() // 处理每个待机控制 diff --git a/internal/server.go b/internal/server.go index 79e7434..ef82b57 100644 --- a/internal/server.go +++ b/internal/server.go @@ -40,6 +40,7 @@ func buildMqtt(c config.MqttConfig, r *leaf.Engine, subTopics ...string) autopah mqttConfig := autopaho.ClientConfig{ ServerUrls: []*url.URL{u}, KeepAlive: 20, + ConnectPassword: []byte(c.Password), CleanStartOnInitialConnection: false, SessionExpiryInterval: 60, OnConnectionUp: func(cm *autopaho.ConnectionManager, _ *paho.Connack) { @@ -56,7 +57,7 @@ func buildMqtt(c config.MqttConfig, r *leaf.Engine, subTopics ...string) autopah zap.S().Infof("MQTT 连接异常: %s\n", err) }, ClientConfig: paho.ClientConfig{ - ClientID: fmt.Sprintf("game-driver-%s-%v", config.C.Location, config.C.Point), + ClientID: c.ClientID, OnPublishReceived: []func(paho.PublishReceived) (bool, error){ func(pr paho.PublishReceived) (bool, error) { r.Route(pr.Packet.Packet()) @@ -109,8 +110,8 @@ func Run() { router := leaf.Default(ctx) - log, _ := zap.NewStdLogAt(zap.L(), zap.DebugLevel) - router.SetDebugLogger(log) + logAt, _ := zap.NewStdLogAt(zap.L(), zap.DebugLevel) + router.SetDebugLogger(logAt) router.DefaultHandler(func(c *leaf.Context) { zap.S().Infof("未处理消息,topic: %s\n payload: %s\n", c.Topic, c.Payload) diff --git a/pkg/browser/browser.go b/pkg/browser/browser.go index b547ebb..5bffcac 100644 --- a/pkg/browser/browser.go +++ b/pkg/browser/browser.go @@ -5,8 +5,6 @@ import ( "github.com/go-rod/rod" "github.com/go-rod/rod/lib/launcher" "github.com/go-rod/rod/lib/launcher/flags" - "github.com/go-rod/rod/lib/proto" - "go.uber.org/zap" ) // OpenApp 用APP模式打开网页 @@ -26,18 +24,5 @@ func OpenApp(c context.Context, url string) { b := rod.New().ControlURL(p).MustConnect() defer b.MustClose() - s := make(chan struct{}) - - wait := b.EachEvent(func(e *proto.TargetTargetDestroyed) { - zap.S().Infoln("浏览器关闭事件") - s <- struct{}{} - }) - go wait() - - select { - case <-c.Done(): - b.MustClose() - <-s - case <-s: - } + <-c.Done() } diff --git a/pkg/utils/link_video.go b/pkg/utils/link_video.go index 52415c1..7aa4f38 100644 --- a/pkg/utils/link_video.go +++ b/pkg/utils/link_video.go @@ -2,16 +2,12 @@ package utils import ( "fmt" - "io" - "net/http" "net/url" - "os" - "path" "strings" ) // LinkVideo 链接视频,解析链接,网络文件会下载到临时目录并返回本地路径 -func LinkVideo(link string) (local string, err error) { +func LinkVideo(link string) (path string, local bool, err error) { if link == "" { return } @@ -20,35 +16,14 @@ func LinkVideo(link string) (local string, err error) { err = fmt.Errorf("URL 解析错误: %v", err) } else { if u.Scheme == "file" { - local, _ = strings.CutPrefix(link, "file://") + local = true + path, _ = strings.CutPrefix(link, "file://") } else if u.Scheme == "http" || u.Scheme == "https" { - p, _ := url.PathUnescape(u.EscapedPath()) - tmpLocal := path.Join(os.TempDir(), path.Base(p)) - err = Download(link, tmpLocal) - if err != nil { - err = fmt.Errorf("链接文件获取失败: %v", err) - return - } - local = tmpLocal + local = false + path = link } else { err = fmt.Errorf("不支持的链接协议: %v", u.String()) } } return } - -// Download 下载文件 -func Download(link string, local string) (err error) { - resp, err := http.Get(link) - if err != nil { - return - } - defer resp.Body.Close() - f, err := os.OpenFile(local, os.O_CREATE|os.O_WRONLY, 0666) - if err != nil { - return - } - defer f.Close() - _, err = io.Copy(f, resp.Body) - return -} diff --git a/pkg/video/play.go b/pkg/video/play.go index dda8be4..a69cd80 100644 --- a/pkg/video/play.go +++ b/pkg/video/play.go @@ -6,7 +6,7 @@ import ( libvlc "github.com/adrg/libvlc-go/v3" ) -func Play(ctx context.Context, file string) error { +func Play(ctx context.Context, path string, local bool) error { // 1. 初始化 VLC if err := libvlc.Init("--no-xlib"); err != nil { return fmt.Errorf("VLC初始化失败: %w", err) @@ -38,8 +38,14 @@ func Play(ctx context.Context, file string) error { } // 4. 加载并播放文件 - if _, err := player.LoadMediaFromPath(file); err != nil { - return fmt.Errorf("文件加载失败: %w", err) + if local { + if _, err := player.LoadMediaFromPath(path); err != nil { + return fmt.Errorf("文件加载失败: %w", err) + } + } else { + if _, err := player.LoadMediaFromURL(path); err != nil { + return fmt.Errorf("文件加载失败: %w", err) + } } if err := player.Play(); err != nil { @@ -57,7 +63,7 @@ func Play(ctx context.Context, file string) error { } // 5. 等待事件 - fmt.Printf("正在播放: %s\n", file) + fmt.Printf("正在播放: %s\n", path) select { case <-ctx.Done(): return fmt.Errorf("播放被用户中断") diff --git a/readme.md b/readme.md index bdc63c4..6813eee 100644 --- a/readme.md +++ b/readme.md @@ -133,11 +133,13 @@ Payload: "items": [ { // 执行的时间区间 - "cron": "17:20-21:35 1-5 * *", + "cron": "17:20-21:35 1-5 * *", // 间隔时间(s), 类型>2时, 该项无效, default 0 "interval": 0, // 事件类型(0: 音频; 1: 视频; 2: TTS; 3: 继电器; 4: 网页), default 0 "type": 2, + // Game 指令执行时是否暂停(默认 false) + "pause": true, // 事件数据(TTS为文字, 继电器为端口号, 其他都为地址链接。支持 file:// 本地文件地址、 http(s):// 远程文件地址) "data": "", },