mqtt加入认证,优化视频播放与浏览器

This commit is contained in:
2025-03-07 16:04:19 +08:00
parent c71e8bc13d
commit febcdfdbf7
13 changed files with 60 additions and 82 deletions

View File

@@ -5,7 +5,9 @@ import (
) )
type MqttConfig struct { type MqttConfig struct {
Url string Url string
ClientID string
Password string
} }
type AliyunConfig struct { type AliyunConfig struct {

View File

@@ -1,45 +1,50 @@
package common package common
import "sync" import (
"context"
"sync"
)
type PauseSub struct { type PauseSub struct {
ctrl *CtrlWait ctrl *CtrlWait
// 回调函数
items []chan int8 items []chan int8
m sync.RWMutex m sync.RWMutex
} }
// Add 添加一个暂停项 func (p *PauseSub) GetNew() chan int8 {
func (p *PauseSub) Add(item chan int8) {
p.m.Lock() p.m.Lock()
defer p.m.Unlock() defer p.m.Unlock()
p.items = append(p.items, item) sub := make(chan int8)
p.items = append(p.items, sub)
return sub
} }
// Remove 移除一个暂停项 func (p *PauseSub) Close(sub chan int8) {
func (p *PauseSub) Remove(item chan int8) {
p.m.Lock() p.m.Lock()
defer p.m.Unlock() defer p.m.Unlock()
close(sub)
for i, v := range p.items { for i, v := range p.items {
if v == item { if v == sub {
p.items = append(p.items[:i], p.items[i+1:]...) p.items = append(p.items[:i], p.items[i+1:]...)
} }
} }
} }
// Run 开始监听 // Run 开始监听
func (p *PauseSub) Run() { func (p *PauseSub) Run(ctx context.Context) {
p.ctrl.Open() p.ctrl.Open()
defer p.ctrl.Close() defer p.ctrl.Close()
for { for {
select { select {
case <-p.ctrl.C: case <-ctx.Done():
return
case c := <-p.ctrl.C:
go func() { go func() {
p.m.RLock() p.m.RLock()
defer p.m.RUnlock() defer p.m.RUnlock()
for _, item := range p.items { for _, item := range p.items {
item <- 1 item <- c
} }
}() }()
} }
@@ -48,6 +53,7 @@ func (p *PauseSub) Run() {
func NewPauseSub(c *CtrlWait) *PauseSub { func NewPauseSub(c *CtrlWait) *PauseSub {
return &PauseSub{ return &PauseSub{
ctrl: c, ctrl: c,
items: make([]chan int8, 0),
} }
} }

View File

@@ -13,6 +13,9 @@ func EmergencyStop(stopper common.Stopper) leaf.HandlerFunc {
cancel := leaf.WithCancel(c) cancel := leaf.WithCancel(c)
defer stopper.Reset() defer stopper.Reset()
zap.S().Infoln("监听停止信号")
defer zap.S().Infoln("结束停止信号监听")
// 等待组 // 等待组
var wait sync.WaitGroup var wait sync.WaitGroup
defer wait.Wait() defer wait.Wait()
@@ -22,11 +25,9 @@ func EmergencyStop(stopper common.Stopper) leaf.HandlerFunc {
// 发送结束信号 // 发送结束信号
defer close(a) defer close(a)
zap.S().Infoln("监听停止信号")
wait.Add(1) wait.Add(1)
go func() { go func() {
defer wait.Done() defer wait.Done()
defer zap.S().Infoln("结束停止信号监听")
select { select {
case <-a: case <-a:

View File

@@ -16,11 +16,11 @@ func OnlyVideo(c *leaf.Context) {
defer utils.BlankClose() defer utils.BlankClose()
if url, ok := payload.Game["video"]; ok { if url, ok := payload.Game["video"]; ok {
local, err := utils.LinkVideo(url.(string)) path, local, err := utils.LinkVideo(url.(string))
if err != nil { if err != nil {
zap.S().Errorln("视频文件获取异常: ", err) zap.S().Errorln("视频文件获取异常: ", err)
return return
} }
_ = video.Play(c, local) _ = video.Play(c, path, local)
} }
} }

View File

@@ -12,7 +12,7 @@ import (
func Video(item schema.WaitItemModel) func(c context.Context) error { func Video(item schema.WaitItemModel) func(c context.Context) error {
return func(c context.Context) error { return func(c context.Context) error {
local, err := utils.LinkVideo(item.Data) path, local, err := utils.LinkVideo(item.Data)
if err != nil { if err != nil {
return fmt.Errorf("视频文件获取异常: %w", err) return fmt.Errorf("视频文件获取异常: %w", err)
} }
@@ -24,7 +24,7 @@ func Video(item schema.WaitItemModel) func(c context.Context) error {
defer utils.BlankClose() defer utils.BlankClose()
for { for {
err := video.Play(c, local) err := video.Play(c, path, local)
if err != nil { if err != nil {
return fmt.Errorf("视频播放异常: %w", err) return fmt.Errorf("视频播放异常: %w", err)
} }

View File

@@ -13,14 +13,11 @@ func Pause(ps *common.PauseSub, isPause bool, play func(c context.Context) error
run := true run := true
if isPause { if isPause {
p := make(chan int8) zap.S().Infoln("待机暂停控制器")
defer close(p) defer zap.S().Infoln("待机暂停控制器结束")
ps.Add(p) p := ps.GetNew()
defer ps.Remove(p) defer ps.Close(p)
zap.S().Infoln("待机控制器")
defer zap.S().Infoln("待机控制器结束")
// 等待组 // 等待组
var wait sync.WaitGroup var wait sync.WaitGroup

View File

@@ -25,6 +25,9 @@ func Time(rootRules []cronrange.Rule, cron string, play func(c context.Context)
a := make(chan bool) a := make(chan bool)
defer close(a) defer close(a)
zap.S().Infoln("待机时间控制器")
defer zap.S().Infoln("待机时间控制器结束")
// 等待组 // 等待组
var waitGroup sync.WaitGroup var waitGroup sync.WaitGroup
defer waitGroup.Wait() defer waitGroup.Wait()

View File

@@ -38,7 +38,7 @@ func WaitAction(ctrl *common.CtrlWait) leaf.HandlerFunc {
waitGroup.Add(1) waitGroup.Add(1)
go func() { go func() {
defer waitGroup.Done() defer waitGroup.Done()
ps.Run() ps.Run(c)
}() }()
// 处理每个待机控制 // 处理每个待机控制

View File

@@ -40,6 +40,7 @@ func buildMqtt(c config.MqttConfig, r *leaf.Engine, subTopics ...string) autopah
mqttConfig := autopaho.ClientConfig{ mqttConfig := autopaho.ClientConfig{
ServerUrls: []*url.URL{u}, ServerUrls: []*url.URL{u},
KeepAlive: 20, KeepAlive: 20,
ConnectPassword: []byte(c.Password),
CleanStartOnInitialConnection: false, CleanStartOnInitialConnection: false,
SessionExpiryInterval: 60, SessionExpiryInterval: 60,
OnConnectionUp: func(cm *autopaho.ConnectionManager, _ *paho.Connack) { OnConnectionUp: func(cm *autopaho.ConnectionManager, _ *paho.Connack) {
@@ -56,7 +57,7 @@ func buildMqtt(c config.MqttConfig, r *leaf.Engine, subTopics ...string) autopah
zap.S().Infof("MQTT 连接异常: %s\n", err) zap.S().Infof("MQTT 连接异常: %s\n", err)
}, },
ClientConfig: paho.ClientConfig{ ClientConfig: paho.ClientConfig{
ClientID: fmt.Sprintf("game-driver-%s-%v", config.C.Location, config.C.Point), ClientID: c.ClientID,
OnPublishReceived: []func(paho.PublishReceived) (bool, error){ OnPublishReceived: []func(paho.PublishReceived) (bool, error){
func(pr paho.PublishReceived) (bool, error) { func(pr paho.PublishReceived) (bool, error) {
r.Route(pr.Packet.Packet()) r.Route(pr.Packet.Packet())
@@ -109,8 +110,8 @@ func Run() {
router := leaf.Default(ctx) router := leaf.Default(ctx)
log, _ := zap.NewStdLogAt(zap.L(), zap.DebugLevel) logAt, _ := zap.NewStdLogAt(zap.L(), zap.DebugLevel)
router.SetDebugLogger(log) router.SetDebugLogger(logAt)
router.DefaultHandler(func(c *leaf.Context) { router.DefaultHandler(func(c *leaf.Context) {
zap.S().Infof("未处理消息topic: %s\n payload: %s\n", c.Topic, c.Payload) zap.S().Infof("未处理消息topic: %s\n payload: %s\n", c.Topic, c.Payload)

View File

@@ -5,8 +5,6 @@ import (
"github.com/go-rod/rod" "github.com/go-rod/rod"
"github.com/go-rod/rod/lib/launcher" "github.com/go-rod/rod/lib/launcher"
"github.com/go-rod/rod/lib/launcher/flags" "github.com/go-rod/rod/lib/launcher/flags"
"github.com/go-rod/rod/lib/proto"
"go.uber.org/zap"
) )
// OpenApp 用APP模式打开网页 // OpenApp 用APP模式打开网页
@@ -26,18 +24,5 @@ func OpenApp(c context.Context, url string) {
b := rod.New().ControlURL(p).MustConnect() b := rod.New().ControlURL(p).MustConnect()
defer b.MustClose() defer b.MustClose()
s := make(chan struct{}) <-c.Done()
wait := b.EachEvent(func(e *proto.TargetTargetDestroyed) {
zap.S().Infoln("浏览器关闭事件")
s <- struct{}{}
})
go wait()
select {
case <-c.Done():
b.MustClose()
<-s
case <-s:
}
} }

View File

@@ -2,16 +2,12 @@ package utils
import ( import (
"fmt" "fmt"
"io"
"net/http"
"net/url" "net/url"
"os"
"path"
"strings" "strings"
) )
// LinkVideo 链接视频,解析链接,网络文件会下载到临时目录并返回本地路径 // LinkVideo 链接视频,解析链接,网络文件会下载到临时目录并返回本地路径
func LinkVideo(link string) (local string, err error) { func LinkVideo(link string) (path string, local bool, err error) {
if link == "" { if link == "" {
return return
} }
@@ -20,35 +16,14 @@ func LinkVideo(link string) (local string, err error) {
err = fmt.Errorf("URL 解析错误: %v", err) err = fmt.Errorf("URL 解析错误: %v", err)
} else { } else {
if u.Scheme == "file" { if u.Scheme == "file" {
local, _ = strings.CutPrefix(link, "file://") local = true
path, _ = strings.CutPrefix(link, "file://")
} else if u.Scheme == "http" || u.Scheme == "https" { } else if u.Scheme == "http" || u.Scheme == "https" {
p, _ := url.PathUnescape(u.EscapedPath()) local = false
tmpLocal := path.Join(os.TempDir(), path.Base(p)) path = link
err = Download(link, tmpLocal)
if err != nil {
err = fmt.Errorf("链接文件获取失败: %v", err)
return
}
local = tmpLocal
} else { } else {
err = fmt.Errorf("不支持的链接协议: %v", u.String()) err = fmt.Errorf("不支持的链接协议: %v", u.String())
} }
} }
return return
} }
// Download 下载文件
func Download(link string, local string) (err error) {
resp, err := http.Get(link)
if err != nil {
return
}
defer resp.Body.Close()
f, err := os.OpenFile(local, os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
return
}
defer f.Close()
_, err = io.Copy(f, resp.Body)
return
}

View File

@@ -6,7 +6,7 @@ import (
libvlc "github.com/adrg/libvlc-go/v3" libvlc "github.com/adrg/libvlc-go/v3"
) )
func Play(ctx context.Context, file string) error { func Play(ctx context.Context, path string, local bool) error {
// 1. 初始化 VLC // 1. 初始化 VLC
if err := libvlc.Init("--no-xlib"); err != nil { if err := libvlc.Init("--no-xlib"); err != nil {
return fmt.Errorf("VLC初始化失败: %w", err) return fmt.Errorf("VLC初始化失败: %w", err)
@@ -38,8 +38,14 @@ func Play(ctx context.Context, file string) error {
} }
// 4. 加载并播放文件 // 4. 加载并播放文件
if _, err := player.LoadMediaFromPath(file); err != nil { if local {
return fmt.Errorf("文件加载失败: %w", err) if _, err := player.LoadMediaFromPath(path); err != nil {
return fmt.Errorf("文件加载失败: %w", err)
}
} else {
if _, err := player.LoadMediaFromURL(path); err != nil {
return fmt.Errorf("文件加载失败: %w", err)
}
} }
if err := player.Play(); err != nil { if err := player.Play(); err != nil {
@@ -57,7 +63,7 @@ func Play(ctx context.Context, file string) error {
} }
// 5. 等待事件 // 5. 等待事件
fmt.Printf("正在播放: %s\n", file) fmt.Printf("正在播放: %s\n", path)
select { select {
case <-ctx.Done(): case <-ctx.Done():
return fmt.Errorf("播放被用户中断") return fmt.Errorf("播放被用户中断")

View File

@@ -138,6 +138,8 @@ Payload:
"interval": 0, "interval": 0,
// 事件类型(0: 音频; 1: 视频; 2: TTS; 3: 继电器; 4: 网页), default 0 // 事件类型(0: 音频; 1: 视频; 2: TTS; 3: 继电器; 4: 网页), default 0
"type": 2, "type": 2,
// Game 指令执行时是否暂停(默认 false
"pause": true,
// 事件数据(TTS为文字, 继电器为端口号, 其他都为地址链接。支持 file:// 本地文件地址、 http(s):// 远程文件地址) // 事件数据(TTS为文字, 继电器为端口号, 其他都为地址链接。支持 file:// 本地文件地址、 http(s):// 远程文件地址)
"data": "", "data": "",
}, },