157 lines
4.3 KiB
Go
157 lines
4.3 KiB
Go
package internal
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"game-driver/config"
|
|
"game-driver/internal/common"
|
|
"game-driver/internal/middleware"
|
|
"game-driver/internal/routes"
|
|
"game-driver/internal/schema"
|
|
"game-driver/leaf"
|
|
"game-driver/pkg/tts"
|
|
"github.com/eclipse/paho.golang/autopaho"
|
|
"github.com/eclipse/paho.golang/paho"
|
|
"log"
|
|
"net/url"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
"time"
|
|
)
|
|
|
|
func buildMqtt(c config.MqttConfig, r *leaf.Engine, subTopics ...string) autopaho.ClientConfig {
|
|
u, err := url.Parse(c.Url)
|
|
if err != nil {
|
|
log.Panicln("mqtt url parse error: ", err)
|
|
}
|
|
|
|
subscriptions := make([]paho.SubscribeOptions, 0)
|
|
for _, topic := range subTopics {
|
|
log.Println("订阅主题: ", topic)
|
|
subscriptions = append(subscriptions, paho.SubscribeOptions{Topic: topic, QoS: 0})
|
|
}
|
|
|
|
mqttConfig := autopaho.ClientConfig{
|
|
ServerUrls: []*url.URL{u},
|
|
KeepAlive: 20,
|
|
CleanStartOnInitialConnection: false,
|
|
SessionExpiryInterval: 60,
|
|
OnConnectionUp: func(cm *autopaho.ConnectionManager, connAck *paho.Connack) {
|
|
log.Println("mqtt connection up")
|
|
if _, err := cm.Subscribe(context.Background(), &paho.Subscribe{
|
|
Subscriptions: subscriptions,
|
|
}); err != nil {
|
|
log.Printf("failed to subscribe (%s). This is likely to mean no messages will be received.", err)
|
|
return
|
|
}
|
|
log.Println("mqtt subscription made")
|
|
},
|
|
OnConnectError: func(err error) {
|
|
log.Printf("error whilst attempting connection: %s\n", err)
|
|
},
|
|
ClientConfig: paho.ClientConfig{
|
|
ClientID: "TestSubscriber",
|
|
OnPublishReceived: []func(paho.PublishReceived) (bool, error){
|
|
func(pr paho.PublishReceived) (bool, error) {
|
|
r.Route(pr.Packet.Packet())
|
|
return true, nil
|
|
},
|
|
},
|
|
OnClientError: func(err error) { fmt.Printf("client error: %s\n", err) },
|
|
OnServerDisconnect: func(d *paho.Disconnect) {
|
|
if d.Properties != nil {
|
|
fmt.Printf("server requested disconnect: %s\n", d.Properties.ReasonString)
|
|
} else {
|
|
fmt.Printf("server requested disconnect; reason code: %d\n", d.ReasonCode)
|
|
}
|
|
},
|
|
},
|
|
}
|
|
|
|
return mqttConfig
|
|
}
|
|
|
|
func Run() {
|
|
topicPrefix := fmt.Sprintf("server/%s/%v/", config.C.Location, config.C.Point)
|
|
publishTopic := fmt.Sprintf("device/%s/%v/status", config.C.Location, config.C.Point)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
router := leaf.Default(ctx)
|
|
|
|
router.DefaultHandler(func(c *leaf.Context) {
|
|
log.Printf("未处理消息 topic: %s\n payload: %s\n", c.Topic, c.Payload)
|
|
})
|
|
|
|
// 构建 MQTT 连接
|
|
mqttBuild := buildMqtt(config.C.Mqtt, router, topicPrefix+"#")
|
|
|
|
// 连接 MQTT
|
|
cm, err := autopaho.NewConnection(ctx, mqttBuild)
|
|
if err != nil {
|
|
log.Panicln("连接 MQTT 异常: ", err)
|
|
}
|
|
|
|
// 构建语音合成对象
|
|
tts.DefaultTTS = tts.New(ctx, config.C.Aliyun)
|
|
|
|
// 构建继电器对象
|
|
//r, err := relay.New(config.C.Relay, func(msg string) {
|
|
// log.Println("串口返回: ", msg)
|
|
//})
|
|
//if err != nil {
|
|
// log.Panicln("串口连接异常: ", err)
|
|
//}
|
|
//defer r.Close()
|
|
|
|
// 构建全局设备变量
|
|
device := common.DefaultDevice(ctx, cm, publishTopic)
|
|
// 设备状态变化
|
|
device.OnChange = func() { device.PublishStatus() }
|
|
|
|
// 处理启动报文
|
|
router.RegisterHandler(topicPrefix+"play",
|
|
middleware.RunLog(),
|
|
middleware.PayloadJSON[schema.PlayModal](),
|
|
middleware.DeviceLock(device),
|
|
middleware.EmergencyStop(common.GlobalStopper),
|
|
middleware.SoundStart(),
|
|
middleware.RelayMaster(nil),
|
|
middleware.TimeoutOver(config.C.Game.MaxTimeout),
|
|
middleware.TickerAction(),
|
|
middleware.PlayBgm(),
|
|
routes.PlayRouter(config.C.Location, config.C.Point),
|
|
)
|
|
// 处理待机线程报文
|
|
router.RegisterHandler(topicPrefix+"wait",
|
|
middleware.RunLog(),
|
|
middleware.PayloadJSON[schema.WaitModel](),
|
|
middleware.EmergencyStop(common.GlobalBgStopper),
|
|
routes.WaitAction,
|
|
)
|
|
// 处理指令
|
|
router.RegisterHandler(topicPrefix+"command",
|
|
middleware.RunLog(),
|
|
routes.Command(device),
|
|
)
|
|
|
|
// 启动完成发送一次设备状态
|
|
device.PublishStatus()
|
|
|
|
sig := make(chan os.Signal, 1)
|
|
signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
|
|
|
|
<-sig
|
|
fmt.Println("接收到关闭命令 - 正在关闭程序")
|
|
|
|
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
if e := cm.Disconnect(ctx); e != nil {
|
|
fmt.Printf("断开连接异常: %s\n", e)
|
|
}
|
|
|
|
fmt.Println("关闭完成")
|
|
}
|