package internal import ( "context" "fmt" "game-driver/config" "game-driver/internal/common" "game-driver/internal/middleware" "game-driver/internal/routes" "game-driver/leaf" "game-driver/pkg/tts" "github.com/eclipse/paho.golang/autopaho" "github.com/eclipse/paho.golang/paho" "log" "net/url" "os" "os/signal" "syscall" "time" ) func buildMqtt(c config.MqttConfig, r *leaf.Engine, subTopics ...string) autopaho.ClientConfig { u, err := url.Parse(c.Url) if err != nil { log.Panicln("mqtt url parse error: ", err) } subscriptions := make([]paho.SubscribeOptions, 0) for _, topic := range subTopics { subscriptions = append(subscriptions, paho.SubscribeOptions{Topic: topic, QoS: 0}) } mqttConfig := autopaho.ClientConfig{ ServerUrls: []*url.URL{u}, KeepAlive: 20, CleanStartOnInitialConnection: false, SessionExpiryInterval: 60, OnConnectionUp: func(cm *autopaho.ConnectionManager, connAck *paho.Connack) { log.Println("mqtt connection up") if _, err := cm.Subscribe(context.Background(), &paho.Subscribe{ Subscriptions: subscriptions, }); err != nil { log.Printf("failed to subscribe (%s). This is likely to mean no messages will be received.", err) return } log.Println("mqtt subscription made") }, OnConnectError: func(err error) { log.Printf("error whilst attempting connection: %s\n", err) }, ClientConfig: paho.ClientConfig{ ClientID: "TestSubscriber", OnPublishReceived: []func(paho.PublishReceived) (bool, error){ func(pr paho.PublishReceived) (bool, error) { r.Route(pr.Packet.Packet()) return true, nil }, }, OnClientError: func(err error) { fmt.Printf("client error: %s\n", err) }, OnServerDisconnect: func(d *paho.Disconnect) { if d.Properties != nil { fmt.Printf("server requested disconnect: %s\n", d.Properties.ReasonString) } else { fmt.Printf("server requested disconnect; reason code: %d\n", d.ReasonCode) } }, }, } return mqttConfig } func Run() { topicPrefix := fmt.Sprintf("server/%s/%v/", config.C.Location, config.C.Point) publishTopic := fmt.Sprintf("device/%s/%v/status", config.C.Location, config.C.Point) log.Println("topicPrefix: ", topicPrefix) ctx, cancel := context.WithCancel(context.Background()) defer cancel() router := leaf.Default(ctx) router.DefaultHandler(func(c *leaf.Context) { log.Println("接收到未处理消息: " + string(c.Payload)) }) // 构建 MQTT 连接 mqttBuild := buildMqtt(config.C.Mqtt, router, topicPrefix+"#") // 连接 MQTT cm, err := autopaho.NewConnection(ctx, mqttBuild) if err != nil { log.Panicln("连接 MQTT 异常: ", err) } // 构建语音合成对象 t := tts.New(ctx, config.C.Aliyun) // 构建继电器对象 //r, err := relay.New(config.C.Relay, func(msg string) { // log.Println("串口返回: ", msg) //}) //if err != nil { // log.Panicln("串口连接异常: ", err) //} //defer r.Close() // 构建全局设备变量 device := common.DefaultDevice(ctx, cm, publishTopic) // 设备状态变化 device.OnChange = func() { device.PublishStatus() } // 处理启动报文 router.RegisterHandler(topicPrefix+"play", middleware.PayloadJSON(), middleware.DeviceLock(device), middleware.EmergencyStop(), middleware.SoundStart(t), middleware.RelayMaster(nil), middleware.TimerAction(t, config.C.Game.MaxTimeout), middleware.PlayBgm(), func(c *leaf.Context) { log.Println("接收到启动消息: ", string(c.Payload)) select { case <-c.Done(): log.Println("程序已关闭") case <-time.After(10 * time.Second): log.Println("10s 结束") } }, ) // 处理指令 router.RegisterHandler(topicPrefix+"command", routes.Command(device)) // 启动完成发送一次设备状态 device.PublishStatus() sig := make(chan os.Signal, 1) signal.Notify(sig, os.Interrupt, syscall.SIGTERM) <-sig fmt.Println("接收到关闭命令 - 正在关闭程序") ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := cm.Disconnect(ctx); err != nil { fmt.Printf("断开连接异常: %s\n", err) } fmt.Println("关闭完成") }