116 lines
3.2 KiB
Go
116 lines
3.2 KiB
Go
package MqttService
|
|
|
|
import (
|
|
"crypto/tls"
|
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
log "github.com/sirupsen/logrus"
|
|
"net/url"
|
|
"rpiMqttControl/internal/PinControlService"
|
|
"time"
|
|
)
|
|
|
|
// OnCompletionHandler is an optional, argument-less callback. _asyncWait
// invokes it once the awaited MQTT token has completed without an error;
// a nil handler is simply skipped.
type OnCompletionHandler func()
|
|
|
|
type MqttService struct {
|
|
mqttClient mqtt.Client
|
|
quit chan struct{}
|
|
config MqttServiceConfig
|
|
pinService *PinControlService.PinControlService
|
|
}
|
|
|
|
func (m *MqttService) pinEventCallback(pinName string, pinState PinControlService.PinState) {
|
|
topic := m.config.TopicPrefix + "/gpio/" + pinName + "/event"
|
|
token := m.mqttClient.Publish(topic, 1, false, string(pinState))
|
|
go m._asyncWait(token, nil)
|
|
}
|
|
|
|
func (m *MqttService) pinCyclicCallback(pinName string, pinState PinControlService.PinState) {
|
|
topic := m.config.TopicPrefix + "/gpio/" + pinName + "/value"
|
|
token := m.mqttClient.Publish(topic, 1, false, string(pinState))
|
|
go m._asyncWait(token, nil)
|
|
}
|
|
func (m *MqttService) controlCallback(_ mqtt.Client, message mqtt.Message) {
|
|
if pinNo, err := PinNameFromTopic(message.Topic()); err == nil {
|
|
if err := m.pinService.Command(pinNo, PinControlService.PinCommand(message.Payload())); err != nil {
|
|
log.Error(err.Error())
|
|
}
|
|
} else {
|
|
log.Error(err.Error())
|
|
}
|
|
}
|
|
func (m *MqttService) Stop() {
|
|
m.quit <- struct{}{}
|
|
m.mqttClient.Disconnect(100)
|
|
}
|
|
|
|
func (m *MqttService) _asyncWait(token mqtt.Token, onComplete OnCompletionHandler) {
|
|
for {
|
|
select {
|
|
case <-token.Done():
|
|
if err := token.Error(); err != nil {
|
|
log.Fatal(err)
|
|
} else {
|
|
if onComplete != nil {
|
|
onComplete()
|
|
}
|
|
}
|
|
return
|
|
case <-m.quit:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
func (m *MqttService) _subscribe() {
|
|
topic := m.config.TopicPrefix + "/gpio/+/control"
|
|
token := m.mqttClient.Subscribe(topic, 1, m.controlCallback)
|
|
|
|
go m._asyncWait(token, nil)
|
|
}
|
|
|
|
func (m *MqttService) Start() {
|
|
m.mqttClient.Connect()
|
|
}
|
|
|
|
func NewMqttService(config MqttServiceConfig, pinService *PinControlService.PinControlService) MqttService {
|
|
clientOptions := mqtt.NewClientOptions()
|
|
clientOptions.AddBroker(config.BrokerAddress)
|
|
|
|
if config.Username != "" {
|
|
clientOptions.Username = config.Username
|
|
clientOptions.Password = config.Password
|
|
}
|
|
|
|
clientOptions.SetConnectRetry(true)
|
|
clientOptions.SetConnectRetryInterval(time.Duration(config.ConnectRetryIntervalMs) * time.Millisecond)
|
|
clientOptions.SetMaxReconnectInterval(time.Duration(config.MaxReconnectIntervalMs) * time.Millisecond)
|
|
clientOptions.SetAutoReconnect(config.AutoReconnect)
|
|
clientOptions.SetConnectTimeout(time.Duration(config.ConnectTimeoutMs) * time.Millisecond)
|
|
clientOptions.SetConnectionLostHandler(func(client mqtt.Client, err error) {
|
|
log.Warn("connection lost to broker")
|
|
})
|
|
|
|
clientOptions.SetConnectionAttemptHandler(func(broker *url.URL, tlsCfg *tls.Config) *tls.Config {
|
|
log.Infof("connection attempt to %s", broker.String())
|
|
return tlsCfg
|
|
})
|
|
var mqttService *MqttService
|
|
clientOptions.SetOnConnectHandler(func(client mqtt.Client) {
|
|
log.Info("successfully connected to broker")
|
|
mqttService._subscribe()
|
|
})
|
|
mqttClient := mqtt.NewClient(clientOptions)
|
|
|
|
m := MqttService{
|
|
mqttClient: mqttClient,
|
|
quit: make(chan struct{}, 1),
|
|
pinService: pinService,
|
|
config: config,
|
|
}
|
|
mqttService = &m
|
|
|
|
pinService.OnCycleCallback = m.pinCyclicCallback
|
|
pinService.OnChangeCallback = m.pinEventCallback
|
|
|
|
return m
|
|
}
|