initial commit

This commit is contained in:
2022-12-16 23:27:24 +01:00
commit 60487a79db
13 changed files with 550 additions and 0 deletions

View File

@@ -0,0 +1,17 @@
package MqttService
import (
"errors"
"strings"
)
func PinNameFromTopic(topic string) (string,error) {
topicComponents := strings.Split(topic, "/")
if len(topicComponents) >= 2 {
gpioCh := topicComponents[len(topicComponents)-2]
return gpioCh, nil
} else {
return "", errors.New("invalid topic")
}
}

View File

@@ -0,0 +1,102 @@
package MqttService
import (
mqtt "github.com/eclipse/paho.mqtt.golang"
log "github.com/sirupsen/logrus"
"rpiMqttControl/internal/PinControlService"
"time"
)
type OnCompletionHandler func()
type MqttService struct {
mqttClient mqtt.Client
quit chan struct{}
config MqttServiceConfig
pinService *PinControlService.PinControlService
}
func (m *MqttService) pinEventCallback(pinName string, pinState PinControlService.PinState) {
topic := m.config.TopicPrefix + "/gpio/" + pinName + "/event"
token := m.mqttClient.Publish(topic, 1, false, string(pinState))
go m._asyncWait(token, nil)
}
func (m *MqttService) pinCyclicCallback(pinName string, pinState PinControlService.PinState) {
topic := m.config.TopicPrefix + "/gpio/" + pinName + "/value"
token := m.mqttClient.Publish(topic, 1, false, string(pinState))
go m._asyncWait(token, nil)
}
func (m *MqttService) controlCallback(client mqtt.Client, message mqtt.Message) {
if pinNo, err := PinNameFromTopic(message.Topic()); err == nil {
if err := m.pinService.Command(pinNo, PinControlService.PinCommand(message.Payload())); err != nil {
log.Error(err.Error())
}
} else {
log.Error(err.Error())
}
}
func (m *MqttService) Stop() {
m.quit <- struct{}{}
m.mqttClient.Disconnect(100)
}
func (m *MqttService) _asyncWait(token mqtt.Token, onComplete OnCompletionHandler) {
for {
select {
case <-token.Done():
if err := token.Error(); err != nil {
log.Fatal(err)
} else {
if onComplete != nil {
onComplete()
}
}
return
case <-m.quit:
return
}
}
}
func (m *MqttService) _subscribe() {
topic := m.config.TopicPrefix + "/gpio/+/control"
token := m.mqttClient.Subscribe(topic, 1, m.controlCallback)
go m._asyncWait(token, nil)
}
func (m *MqttService) Start() {
token := m.mqttClient.Connect()
go m._asyncWait(token, func() {
m._subscribe()
})
}
func NewMqttService(config MqttServiceConfig, pinService *PinControlService.PinControlService) MqttService {
clientOptions := mqtt.NewClientOptions()
clientOptions.AddBroker(config.BrokerAddress)
if config.Username != "" {
clientOptions.Username = config.Username
clientOptions.Password = config.Password
}
clientOptions.SetConnectRetry(true)
clientOptions.SetConnectRetryInterval(10 * time.Second)
clientOptions.SetAutoReconnect(config.AutoReconnect)
clientOptions.SetConnectTimeout(time.Duration(config.ConnectTimeoutMs) * time.Millisecond)
mqttClient := mqtt.NewClient(clientOptions)
m := MqttService{
mqttClient: mqttClient,
quit: make(chan struct{}, 1),
pinService: pinService,
config: config,
}
pinService.OnCycleCallback = m.pinCyclicCallback
pinService.OnChangeCallback = m.pinEventCallback
return m
}

View File

@@ -0,0 +1,21 @@
package MqttService
type MqttServiceConfig struct {
BrokerAddress string `yaml:"broker-address"`
TopicPrefix string `yaml:"topic-prefix"`
AutoReconnect bool `yaml:"auto-reconnect"`
ConnectTimeoutMs int `yaml:"connect-timeout-ms"`
Username string `yaml:"username"`
Password string `yaml:"password"`
}
func NewMqttServiceConfig() MqttServiceConfig {
return MqttServiceConfig{
BrokerAddress: "tcp://localhost:1883",
TopicPrefix: "/rpicontrol",
AutoReconnect: true,
ConnectTimeoutMs: 10000,
}
}