Files
game-driver/internal/server.go

206 lines
5.6 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package internal
import (
"context"
"fmt"
"game-driver/config"
"game-driver/internal/common"
"game-driver/internal/middleware"
"game-driver/internal/routes"
"game-driver/internal/schema"
"game-driver/leaf"
"game-driver/logger"
"game-driver/pkg/relay"
"game-driver/pkg/tts"
"game-driver/pkg/utils"
"log"
"net"
"net/url"
"os"
"os/signal"
"syscall"
"time"
"github.com/eclipse/paho.golang/autopaho"
"github.com/eclipse/paho.golang/paho"
"go.uber.org/zap"
)
// buildMqtt assembles the autopaho client configuration for the broker
// described by c.
//
// Every publish received from the broker is handed to r.Route. Each entry of
// subTopics is subscribed at QoS 0 from the OnConnectionUp callback; when no
// topics are given the subscribe step is skipped.
//
// It panics (through zap) when c.Url cannot be parsed.
func buildMqtt(c config.MqttConfig, r *leaf.Engine, subTopics ...string) autopaho.ClientConfig {
	u, err := url.Parse(c.Url)
	if err != nil {
		zap.S().Panicln("mqtt url parse error: ", err)
	}

	// The final length is known up front, so size the slice once.
	subscriptions := make([]paho.SubscribeOptions, 0, len(subTopics))
	for _, topic := range subTopics {
		zap.S().Infoln("订阅主题: ", topic)
		subscriptions = append(subscriptions, paho.SubscribeOptions{Topic: topic, QoS: 0})
	}

	return autopaho.ClientConfig{
		ServerUrls: []*url.URL{u},
		KeepAlive:  20,
		// NOTE(review): only a password is supplied here; no ConnectUsername is
		// set — confirm the broker does not require one.
		ConnectPassword:               []byte(c.Password),
		CleanStartOnInitialConnection: false,
		SessionExpiryInterval:         60,
		OnConnectionUp: func(cm *autopaho.ConnectionManager, _ *paho.Connack) {
			zap.S().Infoln("MQTT 连接成功")
			if len(subscriptions) == 0 {
				// A SUBSCRIBE packet without any topic filter is a protocol
				// error, so there is nothing to send.
				return
			}
			if _, err := cm.Subscribe(context.Background(), &paho.Subscribe{
				Subscriptions: subscriptions,
			}); err != nil {
				zap.S().Errorf("failed to subscribe (%s). This is likely to mean no messages will be received.", err)
				return
			}
			zap.S().Infoln("订阅完成")
		},
		OnConnectError: func(err error) {
			// Connection failures are logged at warn level, in line with the
			// other error callbacks below.
			zap.S().Warnf("MQTT 连接异常: %s\n", err)
		},
		ClientConfig: paho.ClientConfig{
			ClientID: c.ClientID,
			OnPublishReceived: []func(paho.PublishReceived) (bool, error){
				// Forward every incoming publish to the router.
				func(pr paho.PublishReceived) (bool, error) {
					r.Route(pr.Packet.Packet())
					return true, nil
				},
			},
			OnClientError: func(err error) { zap.S().Errorf("client error: %s\n", err) },
			OnServerDisconnect: func(d *paho.Disconnect) {
				// Log the reason string when properties are present, otherwise
				// the numeric reason code.
				if d.Properties != nil {
					zap.S().Warnf("server requested disconnect: %s\n", d.Properties.ReasonString)
				} else {
					zap.S().Warnf("server requested disconnect; reason code: %d\n", d.ReasonCode)
				}
			},
		},
	}
}
// Run boots the game driver and blocks until the process receives an
// interrupt or SIGTERM.
//
// Startup order: logging, screen blanking, router and handler registration,
// replay of the cached standby message, MQTT connection, initial status
// publish. On shutdown the MQTT connection and the router share a five second
// deadline to stop.
//
// It panics when the log service, the interface address lookup or the MQTT
// connection manager cannot be initialised.
func Run() {
	cls, err := logger.NewTenCls(fmt.Sprintf("game-driver-%s-%v", config.C.Location, config.C.Point))
	if err != nil {
		// cls must not be used after a failed construction (Start/Close below
		// would run on an unusable value), so fail fast like the other startup
		// errors in this function.
		log.Panicln("初始化腾讯云日志服务异常: ", err)
	}
	cls.Start()
	defer cls.Close()
	logger.InitProLogger(cls)
	//logger.InitDevLogger()
	// Flush all buffered log entries when the application exits.
	defer logger.Sync()

	// Look up the local interface addresses and log them.
	addrs, err := net.InterfaceAddrs()
	if err != nil {
		log.Panicln("网络连接异常: ", err)
	}
	zap.S().Infoln("当前IP: ", addrs)

	// Turn the screen off at startup.
	utils.BlankClose()

	topicPrefix := fmt.Sprintf("server/%s/%v/", config.C.Location, config.C.Point)
	publishTopic := fmt.Sprintf("device/%s/%v/status", config.C.Location, config.C.Point)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	router := leaf.Default(ctx)
	// NewStdLogAt only returns an error for a level it does not recognise;
	// DebugLevel is always valid, so the error is deliberately dropped.
	logAt, _ := zap.NewStdLogAt(zap.L(), zap.DebugLevel)
	router.SetDebugLogger(logAt)
	router.DefaultHandler(func(c *leaf.Context) {
		zap.S().Infof("未处理消息topic: %s\n payload: %s\n", c.Topic, c.Payload)
	})

	// Build the text-to-speech object.
	tts.DefaultTTS = tts.New(ctx, config.C.Aliyun)

	// Build the relay. r keeps its zero value both when no relay is configured
	// and when opening it fails, so Close is only deferred for a relay that was
	// actually opened.
	var r relay.Relay
	if config.C.Relay != "" {
		opened, openErr := relay.New(config.C.Relay)
		if openErr != nil {
			zap.S().Errorln("继电器连接异常: ", openErr)
		} else {
			r = opened
			defer r.Close()
		}
	}

	// Build the global device value.
	device := common.DefaultDevice(ctx, publishTopic)
	// Publish the status whenever the device state changes.
	device.OnChange = func() { device.PublishStatus() }

	// Handle the "play" (start) message.
	router.RegisterHandler(topicPrefix+"play",
		middleware.RunLog(),
		middleware.PayloadJSON[schema.PlayModal](),
		middleware.DeviceLock(device),
		middleware.PauseWait(common.PassCtrl),
		middleware.EmergencyStop(common.GlobalStopper),
		middleware.SoundStart(),
		middleware.RelayMaster(r),
		middleware.TimeoutOver(config.C.MaxTimeout),
		middleware.TickerAction(),
		middleware.PlayBgm(),
		routes.PlayRouter(ctx, config.C.Location, config.C.Point),
	)
	// Handle the "wait" (standby) message.
	router.RegisterHandler(topicPrefix+"wait",
		middleware.RunLog(),
		middleware.PayloadJSON[schema.WaitModel](),
		middleware.Cache(config.C.StandbyCache),
		middleware.Unique(common.GlobalBgStopper),
		middleware.EmergencyStop(common.GlobalBgStopper),
		routes.StandbyAction(common.PassCtrl, device),
	)
	// Handle commands.
	router.RegisterHandler(topicPrefix+"command",
		middleware.RunLog(),
		routes.Command(device),
	)

	// Read the standby message from the cache and, if one is available, run it
	// through the "wait" handler straight away.
	publish, err := config.C.StandbyCache.Get()
	if err != nil {
		zap.S().Infoln("读取待机缓存失败: ", err)
	} else {
		router.HandlerRun(topicPrefix+"wait", publish)
	}

	// Build the MQTT configuration and start connecting.
	mqttBuild := buildMqtt(config.C.Mqtt, router, topicPrefix+"#")
	cm, err := autopaho.NewConnection(ctx, mqttBuild)
	if err != nil {
		zap.S().Panicln("创建 MQTT 连接器异常: ", err)
	}
	utils.GlobalMqttClient = cm

	// NewConnection returns before the connection is established, so wait a
	// bounded time for it before the initial status publish. The publish is
	// still attempted if the wait fails, as it was before.
	const connectWait = 10 * time.Second
	awaitCtx, stopAwait := context.WithTimeout(ctx, connectWait)
	if e := cm.AwaitConnection(awaitCtx); e != nil {
		zap.S().Warnln("mqtt connection not ready before initial status publish: ", e)
	}
	stopAwait()

	// Send the device status once after startup.
	device.PublishStatus()

	// Block until an interrupt or SIGTERM arrives.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
	<-sig
	zap.S().Infoln("接收到关闭命令 - 正在关闭程序")

	// Shutdown gets its own context so the deadline is independent of the
	// application context created above.
	shutdownCtx, stopShutdown := context.WithTimeout(context.Background(), 5*time.Second)
	defer stopShutdown()
	if e := cm.Disconnect(shutdownCtx); e != nil {
		zap.S().Errorln("断开连接异常", e)
	}
	if e := router.Disconnect(shutdownCtx); e != nil {
		zap.S().Errorln("停止所以任务超时", e)
	}
	zap.S().Infoln("关闭完成")
}