- fix re-connection after connection loss to broker

- add subscribe call after reconnect
  - additional config settings for timeout values
- add more info/warn outputs
- change default log level from "warn" to "info"
This commit is contained in:
Thomas Vogl 2022-12-31 11:09:28 +01:00
parent 005b167f5e
commit 6b64e5aa00
6 changed files with 48 additions and 22 deletions

View File

@ -1,8 +1,10 @@
package MqttService
import (
"crypto/tls"
mqtt "github.com/eclipse/paho.mqtt.golang"
log "github.com/sirupsen/logrus"
"net/url"
"rpiMqttControl/internal/PinControlService"
"time"
)
@ -66,10 +68,7 @@ func (m *MqttService) _subscribe() {
}
func (m *MqttService) Start() {
token := m.mqttClient.Connect()
go m._asyncWait(token, func() {
m._subscribe()
})
m.mqttClient.Connect()
}
func NewMqttService(config MqttServiceConfig, pinService *PinControlService.PinControlService) MqttService {
@ -82,10 +81,23 @@ func NewMqttService(config MqttServiceConfig, pinService *PinControlService.PinC
}
clientOptions.SetConnectRetry(true)
clientOptions.SetConnectRetryInterval(10 * time.Second)
clientOptions.SetConnectRetryInterval(time.Duration(config.ConnectRetryIntervalMs) * time.Millisecond)
clientOptions.SetMaxReconnectInterval(time.Duration(config.MaxReconnectIntervalMs) * time.Millisecond)
clientOptions.SetAutoReconnect(config.AutoReconnect)
clientOptions.SetConnectTimeout(time.Duration(config.ConnectTimeoutMs) * time.Millisecond)
clientOptions.SetConnectionLostHandler(func(client mqtt.Client, err error) {
log.Warn("connection lost to broker")
})
clientOptions.SetConnectionAttemptHandler(func(broker *url.URL, tlsCfg *tls.Config) *tls.Config {
log.Infof("connection attempt to %s", broker.String())
return tlsCfg
})
var mqttService *MqttService
clientOptions.SetOnConnectHandler(func(client mqtt.Client) {
log.Info("successfully connected to broker")
mqttService._subscribe()
})
mqttClient := mqtt.NewClient(clientOptions)
m := MqttService{
@ -94,6 +106,7 @@ func NewMqttService(config MqttServiceConfig, pinService *PinControlService.PinC
pinService: pinService,
config: config,
}
mqttService = &m
pinService.OnCycleCallback = m.pinCyclicCallback
pinService.OnChangeCallback = m.pinEventCallback

View File

@ -1,21 +1,23 @@
package MqttService
type MqttServiceConfig struct {
BrokerAddress string `yaml:"broker-address"`
TopicPrefix string `yaml:"topic-prefix"`
AutoReconnect bool `yaml:"auto-reconnect"`
ConnectTimeoutMs int `yaml:"connect-timeout-ms"`
Username string `yaml:"username"`
Password string `yaml:"password"`
BrokerAddress string `yaml:"broker-address"`
TopicPrefix string `yaml:"topic-prefix"`
AutoReconnect bool `yaml:"auto-reconnect"`
ConnectTimeoutMs int `yaml:"connect-timeout-ms"`
ConnectRetryIntervalMs int `yaml:"connect-retry-interval-ms"`
MaxReconnectIntervalMs int `yaml:"max-reconnect-interval-ms"`
Username string `yaml:"username"`
Password string `yaml:"password"`
}
func NewMqttServiceConfig() MqttServiceConfig {
return MqttServiceConfig{
BrokerAddress: "tcp://localhost:1883",
TopicPrefix: "/rpicontrol",
AutoReconnect: true,
ConnectTimeoutMs: 10000,
BrokerAddress: "tcp://localhost:1883",
TopicPrefix: "/rpicontrol",
AutoReconnect: true,
ConnectTimeoutMs: 10000,
ConnectRetryIntervalMs: 10000,
MaxReconnectIntervalMs: 10000,
}
}

View File

@ -55,7 +55,7 @@ func (p *Pin) State() PinState {
}
func (p *Pin) Command(cmd PinCommand) error {
log.Debugf("try to send command %s for pin %s (pin no: %d)", cmd, p.Name, p.Id)
log.Infof("send command \"%s\" for pin %s (pin no: %d)", cmd, p.Name, p.Id)
if p.Direction != Output {
return errors.New("pin is not an output")
}
@ -75,9 +75,12 @@ func (p *Pin) Command(cmd PinCommand) error {
func (p *Pin) Configure() {
if p.Direction == Input {
log.Infof("configuring pin %s (pin no: %d) as Input", p.Name, p.Id)
p.PinHandle.Input()
} else if p.Direction == Output {
log.Infof("configuring pin %s (pin no: %d) as Output", p.Name, p.Id)
p.PinHandle.Output()
log.Infof("set initial state \"%s\" for pin %s (pin no: %d)", p.InitialState, p.Name, p.Id)
_ = p.Command(p.InitialState)
}
@ -94,5 +97,9 @@ func (p *Pin) Configure() {
}
func (p *Pin) Changed() bool {
return p.PinHandle.EdgeDetected()
ret := p.PinHandle.EdgeDetected()
if ret {
log.Infof("pin %s (pin no: %d) changed state", p.Name, p.Id)
}
return ret
}

View File

@ -49,7 +49,7 @@ func (p *PinControlService) _task() {
select {
case <-p.timer.C:
for pinName, pin := range p.Pins {
log.Debug("timer event")
//log.Debug("timer event")
if pin.Changed() {
log.Debugf("detected pin change for pin %s (pin no %d)", pin.Name, pin.Id)
if pin.SendChangeEvents && p.OnChangeCallback != nil {

View File

@ -12,7 +12,7 @@ import (
func main() {
flagConfig := flag.String("config", "/etc/rpicontrol/rpicontrol.conf", "path to config file")
flagLogLevel := flag.String("log", "warn", "set log level for console output")
flagLogLevel := flag.String("log", "info", "set log level for console output")
flag.Parse()
@ -20,6 +20,7 @@ func main() {
log.SetLevel(log.WarnLevel)
log.Warnf("could not set log level. %s", err.Error())
} else {
log.SetLevel(level)
}

View File

@ -1,5 +1,8 @@
mqtt-config:
broker-address: "tcp://nuc.fritz.box:1883"
broker-address: "tcp://localhost:1883"
connect-timeout-ms: 2000
connect-retry-interval-ms: 2000
max-reconnect-interval-ms: 15000
pin-control-config:
gpio-pins: