package internal

import (
	"context"
	"fmt"
	"game-driver/config"
	"game-driver/internal/common"
	"game-driver/internal/middleware"
	"game-driver/internal/routes"
	"game-driver/internal/schema"
	"game-driver/leaf"
	"game-driver/logger"
	"game-driver/pkg/relay"
	"game-driver/pkg/tts"
	"game-driver/pkg/utils"
	"log"
	"net"
	"net/url"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/eclipse/paho.golang/autopaho"
	"github.com/eclipse/paho.golang/paho"
	"go.uber.org/zap"
)

// buildMqtt assembles the autopaho client configuration for the broker at
// c.Url.
//
// Every topic in subTopics is subscribed at QoS 0 from the OnConnectionUp
// callback, and every received publish is forwarded to r.Route. The function
// panics (through zap's Panicln) when c.Url cannot be parsed.
func buildMqtt(c config.MqttConfig, r *leaf.Engine, subTopics ...string) autopaho.ClientConfig {
	u, err := url.Parse(c.Url)
	if err != nil {
		zap.S().Panicln("mqtt url parse error: ", err)
	}

	// The number of subscriptions is known up front, so size the slice once.
	subscriptions := make([]paho.SubscribeOptions, 0, len(subTopics))
	for _, topic := range subTopics {
		zap.S().Infoln("订阅主题: ", topic)
		subscriptions = append(subscriptions, paho.SubscribeOptions{Topic: topic, QoS: 0})
	}

	mqttConfig := autopaho.ClientConfig{
		ServerUrls: []*url.URL{u},
		KeepAlive:  20,
		// NOTE(review): only a password is supplied here; no ConnectUsername
		// is set. Confirm the broker is expected to accept that.
		ConnectPassword:               []byte(c.Password),
		CleanStartOnInitialConnection: false,
		SessionExpiryInterval:         60,
		// The subscribe request is issued from the connection-up callback.
		OnConnectionUp: func(cm *autopaho.ConnectionManager, _ *paho.Connack) {
			zap.S().Infoln("MQTT 连接成功")
			if _, err := cm.Subscribe(context.Background(), &paho.Subscribe{
				Subscriptions: subscriptions,
			}); err != nil {
				zap.S().Errorf("failed to subscribe (%s). This is likely to mean no messages will be received.", err)
				return
			}
			zap.S().Infoln("订阅完成")
		},
		OnConnectError: func(err error) {
			zap.S().Infof("MQTT 连接异常: %s\n", err)
		},
		ClientConfig: paho.ClientConfig{
			ClientID: c.ClientID,
			OnPublishReceived: []func(paho.PublishReceived) (bool, error){
				// Hand every inbound publish to the leaf router.
				func(pr paho.PublishReceived) (bool, error) {
					r.Route(pr.Packet.Packet())
					return true, nil
				},
			},
			OnClientError: func(err error) {
				zap.S().Errorf("client error: %s\n", err)
			},
			OnServerDisconnect: func(d *paho.Disconnect) {
				if d.Properties != nil {
					zap.S().Warnf("server requested disconnect: %s\n", d.Properties.ReasonString)
				} else {
					zap.S().Warnf("server requested disconnect; reason code: %d\n", d.ReasonCode)
				}
			},
		},
	}
	return mqttConfig
}

// Run starts the driver and blocks until the process receives os.Interrupt
// or SIGTERM.
//
// It initialises logging, builds the leaf router and registers the "play",
// "wait" and "command" handlers under server/<location>/<point>/, replays a
// cached standby message when one can be read, connects to MQTT and publishes
// the device status once. On shutdown it disconnects the MQTT connection and
// the router, sharing a single 5 second timeout between both calls.
//
// Run panics when the interface addresses cannot be listed or when the MQTT
// connection manager cannot be created.
func Run() {
	cls, err := logger.NewTenCls(fmt.Sprintf("game-driver-%s-%v", config.C.Location, config.C.Point))
	if err != nil {
		// Fall back to the development logger when the cloud log service
		// cannot be initialised.
		log.Println("初始化腾讯云日志服务异常: ", err)
		logger.InitDevLogger()
	} else {
		cls.Start()
		defer cls.Close()
		logger.InitProLogger(cls)
	}
	// Flush all buffered log entries when the application exits.
	defer logger.Sync()

	// Look up the current interface addresses and log them.
	addrs, err := net.InterfaceAddrs()
	if err != nil {
		log.Panicln("网络连接异常: ", err)
	}
	zap.S().Infoln("当前IP: ", addrs)

	// Turn the screen off at startup.
	utils.BlankClose()

	topicPrefix := fmt.Sprintf("server/%s/%v/", config.C.Location, config.C.Point)
	publishTopic := fmt.Sprintf("device/%s/%v/status", config.C.Location, config.C.Point)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	router := leaf.Default(ctx)
	// Only install the debug logger when it was actually created, instead of
	// discarding the error and passing along a possibly nil logger.
	if logAt, logErr := zap.NewStdLogAt(zap.L(), zap.DebugLevel); logErr != nil {
		zap.S().Warnln("router debug logger unavailable: ", logErr)
	} else {
		router.SetDebugLogger(logAt)
	}
	router.DefaultHandler(func(c *leaf.Context) {
		zap.S().Infof("未处理消息,topic: %s\n payload: %s\n", c.Topic, c.Payload)
	})

	// Build the text-to-speech object.
	tts.DefaultTTS = tts.New(ctx, config.C.Aliyun)

	// Build the relay object.
	var r relay.Relay
	if config.C.Relay != "" {
		r, err = relay.New(config.C.Relay)
		if err != nil {
			zap.S().Errorln("继电器连接异常: ", err)
		} else {
			// Register Close only for a relay that was opened successfully;
			// deferring it after a failed New could panic at exit.
			defer r.Close()
		}
	}

	// Build the global device variable.
	device := common.DefaultDevice(ctx, publishTopic)
	// Publish the status whenever the device state changes.
	device.OnChange = func() {
		device.PublishStatus()
	}

	// Handle start ("play") messages.
	router.RegisterHandler(topicPrefix+"play",
		middleware.RunLog(),
		middleware.PayloadJSON[schema.PlayModal](),
		middleware.DeviceLock(device),
		middleware.PauseWait(common.PassCtrl),
		middleware.EmergencyStop(common.GlobalStopper),
		middleware.SoundStart(),
		middleware.RelayMaster(r),
		middleware.TimeoutOver(config.C.MaxTimeout),
		middleware.TickerAction(),
		middleware.PlayBgm(),
		routes.PlayRouter(ctx, config.C.Location, config.C.Point),
	)

	// Handle standby ("wait") messages.
	router.RegisterHandler(topicPrefix+"wait",
		middleware.RunLog(),
		middleware.PayloadJSON[schema.WaitModel](),
		middleware.Cache(config.C.StandbyCache),
		middleware.Unique(common.GlobalBgStopper),
		middleware.EmergencyStop(common.GlobalBgStopper),
		routes.StandbyAction(common.PassCtrl, device),
	)

	// Handle commands.
	router.RegisterHandler(topicPrefix+"command",
		middleware.RunLog(),
		routes.Command(device),
	)

	// Read the standby message from the cache and run it right away if present.
	publish, err := config.C.StandbyCache.Get()
	if err != nil {
		zap.S().Infoln("读取待机缓存失败: ", err)
	} else {
		router.HandlerRun(topicPrefix+"wait", publish)
	}

	// Build the MQTT configuration, subscribing to everything under the prefix.
	mqttBuild := buildMqtt(config.C.Mqtt, router, topicPrefix+"#")

	// Start connecting to MQTT.
	cm, err := autopaho.NewConnection(ctx, mqttBuild)
	if err != nil {
		zap.S().Panicln("创建 MQTT 连接器异常: ", err)
	}
	utils.GlobalMqttClient = cm

	// Publish the device status once after startup completes.
	device.PublishStatus()

	sig := make(chan os.Signal, 1)
	signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
	<-sig

	zap.S().Infoln("接收到关闭命令 - 正在关闭程序")

	// A dedicated context bounds the shutdown; it is kept separate from the
	// root ctx so the two are never confused.
	shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer shutdownCancel()

	if e := cm.Disconnect(shutdownCtx); e != nil {
		zap.S().Errorln("断开连接异常", e)
	}
	if e := router.Disconnect(shutdownCtx); e != nil {
		zap.S().Errorln("停止所以任务超时", e)
	}

	zap.S().Infoln("关闭完成")
}