196 lines
5.3 KiB
Go
196 lines
5.3 KiB
Go
package internal
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"game-driver/config"
|
||
"game-driver/internal/common"
|
||
"game-driver/internal/middleware"
|
||
"game-driver/internal/routes"
|
||
"game-driver/internal/schema"
|
||
"game-driver/leaf"
|
||
"game-driver/logger"
|
||
"game-driver/pkg/relay"
|
||
"game-driver/pkg/tts"
|
||
"game-driver/pkg/utils"
|
||
"github.com/eclipse/paho.golang/autopaho"
|
||
"github.com/eclipse/paho.golang/paho"
|
||
"go.uber.org/zap"
|
||
"log"
|
||
"net"
|
||
"net/url"
|
||
"os"
|
||
"os/signal"
|
||
"syscall"
|
||
"time"
|
||
)
|
||
|
||
func buildMqtt(c config.MqttConfig, r *leaf.Engine, subTopics ...string) autopaho.ClientConfig {
|
||
u, err := url.Parse(c.Url)
|
||
if err != nil {
|
||
zap.S().Panicln("mqtt url parse error: ", err)
|
||
}
|
||
|
||
subscriptions := make([]paho.SubscribeOptions, 0)
|
||
for _, topic := range subTopics {
|
||
zap.S().Infoln("订阅主题: ", topic)
|
||
subscriptions = append(subscriptions, paho.SubscribeOptions{Topic: topic, QoS: 0})
|
||
}
|
||
|
||
mqttConfig := autopaho.ClientConfig{
|
||
ServerUrls: []*url.URL{u},
|
||
KeepAlive: 20,
|
||
ConnectPassword: []byte(c.Password),
|
||
CleanStartOnInitialConnection: false,
|
||
SessionExpiryInterval: 60,
|
||
OnConnectionUp: func(cm *autopaho.ConnectionManager, _ *paho.Connack) {
|
||
zap.S().Infoln("MQTT 连接成功")
|
||
if _, err := cm.Subscribe(context.Background(), &paho.Subscribe{
|
||
Subscriptions: subscriptions,
|
||
}); err != nil {
|
||
zap.S().Errorf("failed to subscribe (%s). This is likely to mean no messages will be received.", err)
|
||
return
|
||
}
|
||
zap.S().Infoln("订阅完成")
|
||
},
|
||
OnConnectError: func(err error) {
|
||
zap.S().Infof("MQTT 连接异常: %s\n", err)
|
||
},
|
||
ClientConfig: paho.ClientConfig{
|
||
ClientID: c.ClientID,
|
||
OnPublishReceived: []func(paho.PublishReceived) (bool, error){
|
||
func(pr paho.PublishReceived) (bool, error) {
|
||
r.Route(pr.Packet.Packet())
|
||
return true, nil
|
||
},
|
||
},
|
||
OnClientError: func(err error) { zap.S().Errorf("client error: %s\n", err) },
|
||
OnServerDisconnect: func(d *paho.Disconnect) {
|
||
if d.Properties != nil {
|
||
zap.S().Warnf("server requested disconnect: %s\n", d.Properties.ReasonString)
|
||
} else {
|
||
zap.S().Warnf("server requested disconnect; reason code: %d\n", d.ReasonCode)
|
||
}
|
||
},
|
||
},
|
||
}
|
||
|
||
return mqttConfig
|
||
}
|
||
|
||
func Run() {
|
||
cls, err := logger.NewTenCls(fmt.Sprintf("game-driver-%s-%v", config.C.Location, config.C.Point))
|
||
if err != nil {
|
||
log.Println("初始化腾讯云日志服务异常: ", err)
|
||
}
|
||
cls.Start()
|
||
defer cls.Close()
|
||
|
||
logger.InitProLogger(cls)
|
||
|
||
//logger.InitDevLogger()
|
||
// 应用退出时刷新所有缓冲日志
|
||
defer logger.Sync()
|
||
|
||
// 获取当前IP,并打印当前IP
|
||
addrs, err := net.InterfaceAddrs()
|
||
if err != nil {
|
||
log.Panicln("网络连接异常: ", err)
|
||
}
|
||
zap.S().Infoln("当前IP: ", addrs)
|
||
|
||
// 启动时关闭屏幕
|
||
utils.BlankClose()
|
||
|
||
topicPrefix := fmt.Sprintf("server/%s/%v/", config.C.Location, config.C.Point)
|
||
publishTopic := fmt.Sprintf("device/%s/%v/status", config.C.Location, config.C.Point)
|
||
|
||
ctx, cancel := context.WithCancel(context.Background())
|
||
defer cancel()
|
||
|
||
router := leaf.Default(ctx)
|
||
|
||
logAt, _ := zap.NewStdLogAt(zap.L(), zap.DebugLevel)
|
||
router.SetDebugLogger(logAt)
|
||
|
||
router.DefaultHandler(func(c *leaf.Context) {
|
||
zap.S().Infof("未处理消息,topic: %s\n payload: %s\n", c.Topic, c.Payload)
|
||
})
|
||
|
||
// 构建语音合成对象
|
||
tts.DefaultTTS = tts.New(ctx, config.C.Aliyun)
|
||
|
||
// 构建继电器对象
|
||
var r relay.Relay
|
||
if config.C.Relay != "" {
|
||
r, err = relay.New(config.C.Relay)
|
||
if err != nil {
|
||
zap.S().Errorln("继电器连接异常: ", err)
|
||
}
|
||
defer r.Close()
|
||
}
|
||
|
||
// 构建全局设备变量
|
||
device := common.DefaultDevice(ctx, publishTopic)
|
||
// 设备状态变化
|
||
device.OnChange = func() { device.PublishStatus() }
|
||
|
||
// 处理启动报文
|
||
router.RegisterHandler(topicPrefix+"play",
|
||
middleware.RunLog(),
|
||
middleware.PayloadJSON[schema.PlayModal](),
|
||
middleware.DeviceLock(device),
|
||
middleware.PauseWait(common.PassCtrl),
|
||
middleware.EmergencyStop(common.GlobalStopper),
|
||
middleware.SoundStart(),
|
||
middleware.RelayMaster(r),
|
||
middleware.TimeoutOver(config.C.MaxTimeout),
|
||
middleware.TickerAction(),
|
||
middleware.PlayBgm(),
|
||
routes.PlayRouter(ctx, config.C.Location, config.C.Point),
|
||
)
|
||
// 处理待机报文
|
||
router.RegisterHandler(topicPrefix+"wait",
|
||
middleware.RunLog(),
|
||
middleware.PayloadJSON[schema.WaitModel](),
|
||
middleware.Unique(common.GlobalBgStopper),
|
||
middleware.EmergencyStop(common.GlobalBgStopper),
|
||
routes.WaitAction(common.PassCtrl),
|
||
)
|
||
// 处理指令
|
||
router.RegisterHandler(topicPrefix+"command",
|
||
middleware.RunLog(),
|
||
routes.Command(device),
|
||
)
|
||
|
||
// 构建 MQTT 连接
|
||
mqttBuild := buildMqtt(config.C.Mqtt, router, topicPrefix+"#")
|
||
|
||
// 连接 MQTT
|
||
cm, err := autopaho.NewConnection(ctx, mqttBuild)
|
||
if err != nil {
|
||
zap.S().Panicln("连接 MQTT 异常: ", err)
|
||
}
|
||
utils.GlobalMqttClient = cm
|
||
|
||
// 启动完成发送一次设备状态
|
||
device.PublishStatus()
|
||
|
||
sig := make(chan os.Signal, 1)
|
||
signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
|
||
|
||
<-sig
|
||
zap.S().Infoln("接收到关闭命令 - 正在关闭程序")
|
||
|
||
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
|
||
defer cancel()
|
||
if e := cm.Disconnect(ctx); e != nil {
|
||
zap.S().Errorln("断开连接异常", e)
|
||
}
|
||
if e := router.Disconnect(ctx); e != nil {
|
||
zap.S().Errorln("停止所以任务超时", e)
|
||
}
|
||
|
||
zap.S().Infoln("关闭完成")
|
||
}
|