From 6b64e5aa00ee9fec51908f502feff216131dacd2 Mon Sep 17 00:00:00 2001 From: Thomas Vogl Date: Sat, 31 Dec 2022 11:09:28 +0100 Subject: [PATCH] - fix re-connection after connection loss to broker - add subscribe call after reconnect - additional config settings for timeout values - add more info/warn outputs - change default log level from "warn" to "info" --- internal/MqttService/MqttService.go | 23 ++++++++++++---- internal/MqttService/MqttServiceConfig.go | 26 ++++++++++--------- internal/PinControlService/Pin.go | 11 ++++++-- .../PinControlService/PinControlService.go | 2 +- main.go | 3 ++- rpicontrol.config.example | 5 +++- 6 files changed, 48 insertions(+), 22 deletions(-) diff --git a/internal/MqttService/MqttService.go b/internal/MqttService/MqttService.go index 8e33b24..47574fe 100644 --- a/internal/MqttService/MqttService.go +++ b/internal/MqttService/MqttService.go @@ -1,8 +1,10 @@ package MqttService import ( + "crypto/tls" mqtt "github.com/eclipse/paho.mqtt.golang" log "github.com/sirupsen/logrus" + "net/url" "rpiMqttControl/internal/PinControlService" "time" ) @@ -66,10 +68,7 @@ func (m *MqttService) _subscribe() { } func (m *MqttService) Start() { - token := m.mqttClient.Connect() - go m._asyncWait(token, func() { - m._subscribe() - }) + m.mqttClient.Connect() } func NewMqttService(config MqttServiceConfig, pinService *PinControlService.PinControlService) MqttService { @@ -82,10 +81,23 @@ func NewMqttService(config MqttServiceConfig, pinService *PinControlService.PinC } clientOptions.SetConnectRetry(true) - clientOptions.SetConnectRetryInterval(10 * time.Second) + clientOptions.SetConnectRetryInterval(time.Duration(config.ConnectRetryIntervalMs) * time.Millisecond) + clientOptions.SetMaxReconnectInterval(time.Duration(config.MaxReconnectIntervalMs) * time.Millisecond) clientOptions.SetAutoReconnect(config.AutoReconnect) clientOptions.SetConnectTimeout(time.Duration(config.ConnectTimeoutMs) * time.Millisecond) + clientOptions.SetConnectionLostHandler(func(client mqtt.Client, err error) { + log.Warn("connection lost to broker") + }) + clientOptions.SetConnectionAttemptHandler(func(broker *url.URL, tlsCfg *tls.Config) *tls.Config { + log.Infof("connection attempt to %s", broker.String()) + return tlsCfg + }) + var mqttService *MqttService + clientOptions.SetOnConnectHandler(func(client mqtt.Client) { + log.Info("successfully connected to broker") + mqttService._subscribe() + }) mqttClient := mqtt.NewClient(clientOptions) m := MqttService{ @@ -94,6 +106,7 @@ func NewMqttService(config MqttServiceConfig, pinService *PinControlService.PinC pinService: pinService, config: config, } + mqttService = &m pinService.OnCycleCallback = m.pinCyclicCallback pinService.OnChangeCallback = m.pinEventCallback diff --git a/internal/MqttService/MqttServiceConfig.go b/internal/MqttService/MqttServiceConfig.go index 9ef279c..6e7cf66 100644 --- a/internal/MqttService/MqttServiceConfig.go +++ b/internal/MqttService/MqttServiceConfig.go @@ -1,21 +1,23 @@ package MqttService type MqttServiceConfig struct { - BrokerAddress string `yaml:"broker-address"` - TopicPrefix string `yaml:"topic-prefix"` - AutoReconnect bool `yaml:"auto-reconnect"` - ConnectTimeoutMs int `yaml:"connect-timeout-ms"` - Username string `yaml:"username"` - Password string `yaml:"password"` - + BrokerAddress string `yaml:"broker-address"` + TopicPrefix string `yaml:"topic-prefix"` + AutoReconnect bool `yaml:"auto-reconnect"` + ConnectTimeoutMs int `yaml:"connect-timeout-ms"` + ConnectRetryIntervalMs int `yaml:"connect-retry-interval-ms"` + MaxReconnectIntervalMs int `yaml:"max-reconnect-interval-ms"` + Username string `yaml:"username"` + Password string `yaml:"password"` } func NewMqttServiceConfig() MqttServiceConfig { return MqttServiceConfig{ - BrokerAddress: "tcp://localhost:1883", - TopicPrefix: "/rpicontrol", - AutoReconnect: true, - ConnectTimeoutMs: 10000, - + BrokerAddress: "tcp://localhost:1883", + TopicPrefix: "/rpicontrol", + AutoReconnect: true, + ConnectTimeoutMs: 10000, + ConnectRetryIntervalMs: 10000, + MaxReconnectIntervalMs: 10000, } } diff --git a/internal/PinControlService/Pin.go b/internal/PinControlService/Pin.go index 8c01ef1..49a2383 100644 --- a/internal/PinControlService/Pin.go +++ b/internal/PinControlService/Pin.go @@ -55,7 +55,7 @@ func (p *Pin) State() PinState { } func (p *Pin) Command(cmd PinCommand) error { - log.Debugf("try to send command %s for pin %s (pin no: %d)", cmd, p.Name, p.Id) + log.Infof("send command \"%s\" for pin %s (pin no: %d)", cmd, p.Name, p.Id) if p.Direction != Output { return errors.New("pin is not an output") } @@ -75,9 +75,12 @@ func (p *Pin) Command(cmd PinCommand) error { func (p *Pin) Configure() { if p.Direction == Input { + log.Infof("configuring pin %s (pin no: %d) as Input", p.Name, p.Id) p.PinHandle.Input() } else if p.Direction == Output { + log.Infof("configuring pin %s (pin no: %d) as Output", p.Name, p.Id) p.PinHandle.Output() + log.Infof("set initial state \"%s\" for pin %s (pin no: %d)", p.InitialState, p.Name, p.Id) _ = p.Command(p.InitialState) } @@ -94,5 +97,9 @@ func (p *Pin) Configure() { } func (p *Pin) Changed() bool { - return p.PinHandle.EdgeDetected() + ret := p.PinHandle.EdgeDetected() + if ret { + log.Infof("pin %s (pin no: %d) changed state", p.Name, p.Id) + } + return ret } diff --git a/internal/PinControlService/PinControlService.go b/internal/PinControlService/PinControlService.go index 7dad069..b1fd6f0 100644 --- a/internal/PinControlService/PinControlService.go +++ b/internal/PinControlService/PinControlService.go @@ -49,7 +49,7 @@ func (p *PinControlService) _task() { select { case <-p.timer.C: for pinName, pin := range p.Pins { - log.Debug("timer event") + //log.Debug("timer event") if pin.Changed() { log.Debugf("detected pin change for pin %s (pin no %d)", pin.Name, pin.Id) if pin.SendChangeEvents && p.OnChangeCallback != nil { diff --git a/main.go b/main.go index e23c0f4..811270b 100644 --- a/main.go +++ b/main.go @@ -12,7 +12,7 @@ import ( func main() { flagConfig := flag.String("config", "/etc/rpicontrol/rpicontrol.conf", "path to config file") - flagLogLevel := flag.String("log", "warn", "set log level for console output") + flagLogLevel := flag.String("log", "info", "set log level for console output") flag.Parse() @@ -20,6 +20,7 @@ func main() { log.SetLevel(log.WarnLevel) log.Warnf("could not set log level. %s", err.Error()) } else { + log.SetLevel(level) } diff --git a/rpicontrol.config.example b/rpicontrol.config.example index 9422932..9d32ad6 100644 --- a/rpicontrol.config.example +++ b/rpicontrol.config.example @@ -1,5 +1,8 @@ mqtt-config: - broker-address: "tcp://nuc.fritz.box:1883" + broker-address: "tcp://localhost:1883" + connect-timeout-ms: 2000 + connect-retry-interval-ms: 2000 + max-reconnect-interval-ms: 15000 pin-control-config: gpio-pins: