Support posting power to an MQTT topic for Home Assistant.

This commit is contained in:
MarkBryanMilligan 2021-02-21 18:56:58 -06:00 committed by mmilligan
parent 90002ab4d4
commit ea07715c46
7 changed files with 342 additions and 160 deletions

View File

@ -42,6 +42,11 @@
<artifactId>lantern-util-http</artifactId> <artifactId>lantern-util-http</artifactId>
<version>1.0.0</version> <version>1.0.0</version>
</dependency> </dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
</dependencies> </dependencies>
<build> <build>
<resources> <resources>

View File

@ -6,6 +6,7 @@ import com.lanternsoftware.currentmonitor.util.NetworkMonitor;
import com.lanternsoftware.currentmonitor.wifi.WifiConfig; import com.lanternsoftware.currentmonitor.wifi.WifiConfig;
import com.lanternsoftware.datamodel.currentmonitor.Breaker; import com.lanternsoftware.datamodel.currentmonitor.Breaker;
import com.lanternsoftware.datamodel.currentmonitor.BreakerConfig; import com.lanternsoftware.datamodel.currentmonitor.BreakerConfig;
import com.lanternsoftware.datamodel.currentmonitor.BreakerGroup;
import com.lanternsoftware.datamodel.currentmonitor.BreakerHub; import com.lanternsoftware.datamodel.currentmonitor.BreakerHub;
import com.lanternsoftware.datamodel.currentmonitor.BreakerPower; import com.lanternsoftware.datamodel.currentmonitor.BreakerPower;
import com.lanternsoftware.datamodel.currentmonitor.BreakerPowerMinute; import com.lanternsoftware.datamodel.currentmonitor.BreakerPowerMinute;
@ -72,6 +73,7 @@ public class MonitorApp {
} else } else
LOG.info("Panel{} - Space{} Power: {}W", _p.getPanel(), Breaker.toSpaceDisplay(_p.getSpace()), String.format("%.3f", _p.getPower())); LOG.info("Panel{} - Space{} Power: {}W", _p.getPanel(), Breaker.toSpaceDisplay(_p.getSpace()), String.format("%.3f", _p.getPower()));
}; };
private static MqttPoster mqttPoster;
public static void main(String[] args) { public static void main(String[] args) {
config = DaoSerializer.parse(ResourceLoader.loadFileAsString(WORKING_DIR + "config.json"), MonitorConfig.class); config = DaoSerializer.parse(ResourceLoader.loadFileAsString(WORKING_DIR + "config.json"), MonitorConfig.class);
@ -80,6 +82,7 @@ public class MonitorApp {
return; return;
} }
pool = new HttpPool(10, 10, config.getSocketTimeout(), config.getConnectTimeout(), config.getSocketTimeout()); pool = new HttpPool(10, 10, config.getSocketTimeout(), config.getConnectTimeout(), config.getSocketTimeout());
if (NullUtils.isNotEmpty(config.getHost()))
host = NullUtils.terminateWith(config.getHost(), "/"); host = NullUtils.terminateWith(config.getHost(), "/");
monitor.setDebug(config.isDebug()); monitor.setDebug(config.isDebug());
monitor.start(); monitor.start();
@ -168,20 +171,44 @@ public class MonitorApp {
bluetoothConfig.start(); bluetoothConfig.start();
if (NullUtils.isNotEmpty(config.getAuthCode())) if (NullUtils.isNotEmpty(config.getAuthCode()))
authCode = config.getAuthCode(); authCode = config.getAuthCode();
else { else if (NullUtils.isNotEmpty(host)) {
HttpGet auth = new HttpGet(host + "auth"); HttpGet auth = new HttpGet(host + "auth");
HttpPool.addBasicAuthHeader(auth, config.getUsername(), config.getPassword()); HttpPool.addBasicAuthHeader(auth, config.getUsername(), config.getPassword());
authCode = DaoSerializer.getString(DaoSerializer.parse(pool.executeToString(auth)), "auth_code"); authCode = DaoSerializer.getString(DaoSerializer.parse(pool.executeToString(auth)), "auth_code");
} }
if (NullUtils.isNotEmpty(config.getMqttBrokerUrl()))
mqttPoster = new MqttPoster(config);
if (NullUtils.isNotEmpty(host)) {
while (true) { while (true) {
HttpGet get = new HttpGet(host + "config"); HttpGet get = new HttpGet(host + "config");
get.addHeader("auth_code", authCode); get.addHeader("auth_code", authCode);
breakerConfig = DaoSerializer.parse(pool.executeToString(get), BreakerConfig.class); breakerConfig = DaoSerializer.parse(pool.executeToString(get), BreakerConfig.class);
if (breakerConfig != null) if ((breakerConfig != null) || (mqttPoster != null))
break; break;
LOG.error("Failed to load breaker config. Retrying in 5 seconds..."); LOG.error("Failed to load breaker config. Retrying in 5 seconds...");
ConcurrencyUtils.sleep(5000); ConcurrencyUtils.sleep(5000);
} }
}
if ((mqttPoster != null) && (breakerConfig == null)) {
LOG.info("Hub not configured by a Lantern Power Monitor server, defaulting to MQTT mode only");
BreakerHub hub = new BreakerHub();
hub.setHub(config.getHub());
hub.setVoltageCalibrationFactor(config.getFinalVoltageCalibrationFactor());
hub.setPortCalibrationFactor(config.getMqttPortCalibrationFactor());
hub.setFrequency(config.getMqttFrequency());
breakerConfig = new BreakerConfig();
breakerConfig.setBreakerHubs(CollectionUtils.asArrayList(hub));
int groupId = 0;
breakerConfig.setBreakerGroups(new ArrayList<>());
for (Breaker b : CollectionUtils.makeNotNull(config.getMqttBreakers())) {
BreakerGroup g = new BreakerGroup();
g.setName(b.getName());
g.setBreakers(CollectionUtils.asArrayList(b));
g.setId(String.valueOf(++groupId));
g.setAccountId(-1);
breakerConfig.getBreakerGroups().add(g);
}
}
LOG.info("Breaker Config loaded"); LOG.info("Breaker Config loaded");
BreakerHub hub = breakerConfig.getHub(config.getHub()); BreakerHub hub = breakerConfig.getHub(config.getHub());
if (hub != null) { if (hub != null) {
@ -246,8 +273,10 @@ public class MonitorApp {
DaoEntity post = null; DaoEntity post = null;
DaoEntity minutePost = null; DaoEntity minutePost = null;
int curMinute = (int) (new Date().getTime() / 60000); int curMinute = (int) (new Date().getTime() / 60000);
List<BreakerPower> mqttReadings = new ArrayList<>();
synchronized (readings) { synchronized (readings) {
if (!readings.isEmpty()) { if (!readings.isEmpty()) {
mqttReadings.addAll(readings);
post = new DaoEntity("readings", DaoSerializer.toDaoEntities(readings)); post = new DaoEntity("readings", DaoSerializer.toDaoEntities(readings));
if (curMinute != lastMinute) { if (curMinute != lastMinute) {
HubPowerMinute minute = new HubPowerMinute(); HubPowerMinute minute = new HubPowerMinute();
@ -272,6 +301,7 @@ public class MonitorApp {
readings.clear(); readings.clear();
} }
} }
if (NullUtils.isNotEmpty(host)) {
if (minutePost != null) { if (minutePost != null) {
byte[] payload = DaoSerializer.toZipBson(minutePost); byte[] payload = DaoSerializer.toZipBson(minutePost);
if (!post(payload, "power/hub")) { if (!post(payload, "power/hub")) {
@ -294,6 +324,12 @@ public class MonitorApp {
} }
} }
} }
}
if (mqttPoster != null) {
for (BreakerPower p : mqttReadings) {
monitor.submit(() -> mqttPoster.postPower(p));
}
}
if (DateUtils.diffInSeconds(new Date(), lastUpdateCheck) >= config.getUpdateInterval()) { if (DateUtils.diffInSeconds(new Date(), lastUpdateCheck) >= config.getUpdateInterval()) {
lastUpdateCheck = new Date(); lastUpdateCheck = new Date();
monitor.submit(new UpdateChecker()); monitor.submit(new UpdateChecker());
@ -323,6 +359,8 @@ public class MonitorApp {
} }
private static boolean post(byte[] _payload, String _path) { private static boolean post(byte[] _payload, String _path) {
if (NullUtils.isEmpty(host))
return false;
HttpPost post = new HttpPost(host + _path); HttpPost post = new HttpPost(host + _path);
post.addHeader("auth_code", authCode); post.addHeader("auth_code", authCode);
post.setEntity(new ByteArrayEntity(_payload, ContentType.APPLICATION_OCTET_STREAM)); post.setEntity(new ByteArrayEntity(_payload, ContentType.APPLICATION_OCTET_STREAM));
@ -338,6 +376,7 @@ public class MonitorApp {
private static final class UpdateChecker implements Runnable { private static final class UpdateChecker implements Runnable {
@Override @Override
public void run() { public void run() {
if (NullUtils.isNotEmpty(host)) {
DaoEntity meta = DaoSerializer.fromZipBson(pool.executeToByteArray(new HttpGet(host + "update/version"))); DaoEntity meta = DaoSerializer.fromZipBson(pool.executeToByteArray(new HttpGet(host + "update/version")));
String newVersion = DaoSerializer.getString(meta, "version"); String newVersion = DaoSerializer.getString(meta, "version");
if (NullUtils.isNotEqual(newVersion, version)) { if (NullUtils.isNotEqual(newVersion, version)) {
@ -348,7 +387,7 @@ public class MonitorApp {
ResourceLoader.writeFile(WORKING_DIR + "lantern-currentmonitor.jar", jar); ResourceLoader.writeFile(WORKING_DIR + "lantern-currentmonitor.jar", jar);
ConcurrencyUtils.sleep(10000); ConcurrencyUtils.sleep(10000);
try { try {
Runtime.getRuntime().exec(new String[]{"systemctl","restart","currentmonitor"}); Runtime.getRuntime().exec(new String[]{"systemctl", "restart", "currentmonitor"});
} catch (IOException _e) { } catch (IOException _e) {
LOG.error("Exception occurred while trying to restart", _e); LOG.error("Exception occurred while trying to restart", _e);
} }
@ -356,18 +395,19 @@ public class MonitorApp {
} }
} }
} }
}
private static final class CommandChecker implements Runnable { private static final class CommandChecker implements Runnable {
@Override @Override
public void run() { public void run() {
if (NullUtils.isNotEmpty(host)) {
HttpGet get = new HttpGet(host + "command"); HttpGet get = new HttpGet(host + "command");
get.addHeader("auth_code", authCode); get.addHeader("auth_code", authCode);
DaoEntity meta = DaoSerializer.fromZipBson(pool.executeToByteArray(get)); DaoEntity meta = DaoSerializer.fromZipBson(pool.executeToByteArray(get));
for (String command : DaoSerializer.getList(meta, "commands", String.class)) { for (String command : DaoSerializer.getList(meta, "commands", String.class)) {
if (NullUtils.isEqual(command, "log")) { if (NullUtils.isEqual(command, "log")) {
uploadLog(); uploadLog();
} } else if (NullUtils.makeNotNull(command).startsWith("timeout")) {
else if (NullUtils.makeNotNull(command).startsWith("timeout")) {
LOG.info("Updating timeouts..."); LOG.info("Updating timeouts...");
String[] timeouts = NullUtils.cleanSplit(command, "-"); String[] timeouts = NullUtils.cleanSplit(command, "-");
if (CollectionUtils.size(timeouts) != 3) if (CollectionUtils.size(timeouts) != 3)
@ -377,23 +417,20 @@ public class MonitorApp {
HttpPool old = pool; HttpPool old = pool;
pool = new HttpPool(10, 10, config.getSocketTimeout(), config.getConnectTimeout(), config.getSocketTimeout()); pool = new HttpPool(10, 10, config.getSocketTimeout(), config.getConnectTimeout(), config.getSocketTimeout());
old.shutdown(); old.shutdown();
ResourceLoader.writeFile(WORKING_DIR+"config.json", DaoSerializer.toJson(config)); ResourceLoader.writeFile(WORKING_DIR + "config.json", DaoSerializer.toJson(config));
} } else if (NullUtils.isEqual(command, "extend_filesystem")) {
else if (NullUtils.isEqual(command, "extend_filesystem")) {
LOG.info("Extending filesystem and rebooting"); LOG.info("Extending filesystem and rebooting");
try { try {
Runtime.getRuntime().exec(new String[]{"sudo","raspi-config","--expand-rootfs"}); Runtime.getRuntime().exec(new String[]{"sudo", "raspi-config", "--expand-rootfs"});
ConcurrencyUtils.sleep(5000); ConcurrencyUtils.sleep(5000);
Runtime.getRuntime().exec(new String[]{"reboot","now"}); Runtime.getRuntime().exec(new String[]{"reboot", "now"});
} catch (IOException _e) { } catch (IOException _e) {
LOG.error("Exception occurred while trying to extend filesystem", _e); LOG.error("Exception occurred while trying to extend filesystem", _e);
} }
} else if (NullUtils.isEqual(command, "restart")) {
}
else if (NullUtils.isEqual(command, "restart")) {
LOG.info("Restarting..."); LOG.info("Restarting...");
try { try {
Runtime.getRuntime().exec(new String[]{"systemctl","restart","currentmonitor"}); Runtime.getRuntime().exec(new String[]{"systemctl", "restart", "currentmonitor"});
} catch (IOException _e) { } catch (IOException _e) {
LOG.error("Exception occurred while trying to restart", _e); LOG.error("Exception occurred while trying to restart", _e);
} }
@ -401,6 +438,7 @@ public class MonitorApp {
} }
} }
} }
}
private static String getVersionNumber() { private static String getVersionNumber() {
InputStream is = null; InputStream is = null;

View File

@ -1,8 +1,11 @@
package com.lanternsoftware.currentmonitor; package com.lanternsoftware.currentmonitor;
import com.lanternsoftware.datamodel.currentmonitor.Breaker;
import com.lanternsoftware.util.dao.annotations.DBSerializable; import com.lanternsoftware.util.dao.annotations.DBSerializable;
import java.util.List;
@DBSerializable @DBSerializable
public class MonitorConfig { public class MonitorConfig {
private String host; private String host;
@ -16,6 +19,13 @@ public class MonitorConfig {
private int updateInterval; private int updateInterval;
private float autoCalibrationVoltage; private float autoCalibrationVoltage;
private boolean needsCalibration; private boolean needsCalibration;
private String mqttBrokerUrl;
private String mqttUserName;
private String mqttPassword;
private double mqttVoltageCalibrationFactor;
private double mqttPortCalibrationFactor;
private int mqttFrequency;
private List<Breaker> mqttBreakers;
public MonitorConfig() { public MonitorConfig() {
} }
@ -74,7 +84,7 @@ public class MonitorConfig {
} }
public int getConnectTimeout() { public int getConnectTimeout() {
return connectTimeout == 0?3000:connectTimeout; return connectTimeout == 0 ? 3000 : connectTimeout;
} }
public void setConnectTimeout(int _connectTimeout) { public void setConnectTimeout(int _connectTimeout) {
@ -82,7 +92,7 @@ public class MonitorConfig {
} }
public int getSocketTimeout() { public int getSocketTimeout() {
return socketTimeout == 0?5000:socketTimeout; return socketTimeout == 0 ? 5000 : socketTimeout;
} }
public void setSocketTimeout(int _socketTimeout) { public void setSocketTimeout(int _socketTimeout) {
@ -90,7 +100,7 @@ public class MonitorConfig {
} }
public int getUpdateInterval() { public int getUpdateInterval() {
return updateInterval == 0?300:updateInterval; return updateInterval == 0 ? 300 : updateInterval;
} }
public void setUpdateInterval(int _updateInterval) { public void setUpdateInterval(int _updateInterval) {
@ -112,4 +122,70 @@ public class MonitorConfig {
public void setNeedsCalibration(boolean _needsCalibration) { public void setNeedsCalibration(boolean _needsCalibration) {
needsCalibration = _needsCalibration; needsCalibration = _needsCalibration;
} }
public String getMqttBrokerUrl() {
return mqttBrokerUrl;
}
public void setMqttBrokerUrl(String _mqttBrokerUrl) {
mqttBrokerUrl = _mqttBrokerUrl;
}
public String getMqttUserName() {
return mqttUserName;
}
public void setMqttUserName(String _mqttUserName) {
mqttUserName = _mqttUserName;
}
public String getMqttPassword() {
return mqttPassword;
}
public void setMqttPassword(String _mqttPassword) {
mqttPassword = _mqttPassword;
}
public double getMqttVoltageCalibrationFactor() {
return mqttVoltageCalibrationFactor;
}
public double getFinalVoltageCalibrationFactor() {
if (mqttVoltageCalibrationFactor == 0.0)
mqttVoltageCalibrationFactor = 1.0;
return 0.3445* mqttVoltageCalibrationFactor;
}
public void setMqttVoltageCalibrationFactor(double _mqttVoltageCalibrationFactor) {
mqttVoltageCalibrationFactor = _mqttVoltageCalibrationFactor;
}
public double getMqttPortCalibrationFactor() {
if (mqttPortCalibrationFactor == 0.0)
mqttPortCalibrationFactor = 1.0;
return mqttPortCalibrationFactor;
}
public void setMqttPortCalibrationFactor(double _mqttPortCalibrationFactor) {
mqttPortCalibrationFactor = _mqttPortCalibrationFactor;
}
public int getMqttFrequency() {
if (mqttFrequency == 0)
mqttFrequency = 60;
return mqttFrequency;
}
public void setMqttFrequency(int _mqttFrequency) {
mqttFrequency = _mqttFrequency;
}
public List<Breaker> getMqttBreakers() {
return mqttBreakers;
}
public void setMqttBreakers(List<Breaker> _mqttBreakers) {
mqttBreakers = _mqttBreakers;
}
} }

View File

@ -0,0 +1,45 @@
package com.lanternsoftware.currentmonitor;
import com.lanternsoftware.datamodel.currentmonitor.BreakerPower;
import com.lanternsoftware.util.NullUtils;
import com.lanternsoftware.util.dao.DaoSerializer;
import org.eclipse.paho.client.mqttv3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MqttPoster {
private static final Logger LOG = LoggerFactory.getLogger(MqttPoster.class);
private final IMqttClient client;
public MqttPoster(MonitorConfig _config) {
IMqttClient c = null;
try {
c = new MqttClient(_config.getMqttBrokerUrl(), String.format("Lantern_Power_Monitor_Hub_%d", _config.getHub()));
MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(true);
options.setCleanSession(true);
options.setConnectionTimeout(10);
if (NullUtils.isNotEmpty(_config.getMqttUserName()))
options.setUserName(_config.getMqttUserName());
if (NullUtils.isNotEmpty(_config.getMqttPassword()))
options.setUserName(_config.getMqttPassword());
c.connect(options);
} catch (MqttException e) {
LOG.error("Failed to create MQTT client", e);
}
client = c;
}
public void postPower(BreakerPower _power) {
String topic = "lantern_power_monitor/breaker_power/" + _power.getKey();
MqttMessage msg = new MqttMessage(NullUtils.toByteArray(DaoSerializer.toJson(_power)));
msg.setQos(2);
msg.setRetained(true);
try {
client.publish(topic, msg);
} catch (MqttException e) {
LOG.error("Failed to publish message to {}", topic, e);
}
}
}

View File

@ -71,7 +71,10 @@ public class BleApplication implements GattApplication1, ObjectManager {
try { try {
advertisement.stop(); advertisement.stop();
appManager.UnregisterApplication(appPath); appManager.UnregisterApplication(appPath);
BleHelper.unExportObject(this);
getManagedObjects().forEach(BleHelper::unExportObject);
BleHelper.connection.disconnect(); BleHelper.connection.disconnect();
LOG.info("Bluetooth service and advertisement stopped");
} }
catch (Exception _e) { catch (Exception _e) {
LOG.error("Failed to unregister application", _e); LOG.error("Failed to unregister application", _e);

View File

@ -1,6 +1,7 @@
package com.lanternsoftware.currentmonitor.dao; package com.lanternsoftware.currentmonitor.dao;
import com.lanternsoftware.currentmonitor.MonitorConfig; import com.lanternsoftware.currentmonitor.MonitorConfig;
import com.lanternsoftware.datamodel.currentmonitor.Breaker;
import com.lanternsoftware.util.dao.AbstractDaoSerializer; import com.lanternsoftware.util.dao.AbstractDaoSerializer;
import com.lanternsoftware.util.dao.DaoEntity; import com.lanternsoftware.util.dao.DaoEntity;
import com.lanternsoftware.util.dao.DaoProxyType; import com.lanternsoftware.util.dao.DaoProxyType;
@ -36,6 +37,13 @@ public class MonitorConfigSerializer extends AbstractDaoSerializer<MonitorConfig
d.put("update_interval", _o.getUpdateInterval()); d.put("update_interval", _o.getUpdateInterval());
d.put("auto_calibration_voltage", _o.getAutoCalibrationVoltage()); d.put("auto_calibration_voltage", _o.getAutoCalibrationVoltage());
d.put("needs_calibration", _o.isNeedsCalibration()); d.put("needs_calibration", _o.isNeedsCalibration());
d.put("mqtt_broker_url", _o.getMqttBrokerUrl());
d.put("mqtt_user_name", _o.getMqttUserName());
d.put("mqtt_password", _o.getMqttPassword());
d.put("mqtt_voltage_calibration_factor", _o.getMqttVoltageCalibrationFactor());
d.put("mqtt_port_calibration_factor", _o.getMqttPortCalibrationFactor());
d.put("mqtt_frequency", _o.getMqttFrequency());
d.put("mqtt_breakers", DaoSerializer.toDaoEntities(_o.getMqttBreakers(), DaoProxyType.MONGO));
return d; return d;
} }
@ -54,6 +62,13 @@ public class MonitorConfigSerializer extends AbstractDaoSerializer<MonitorConfig
o.setUpdateInterval(DaoSerializer.getInteger(_d, "update_interval")); o.setUpdateInterval(DaoSerializer.getInteger(_d, "update_interval"));
o.setAutoCalibrationVoltage(DaoSerializer.getFloat(_d, "auto_calibration_voltage")); o.setAutoCalibrationVoltage(DaoSerializer.getFloat(_d, "auto_calibration_voltage"));
o.setNeedsCalibration(DaoSerializer.getBoolean(_d, "needs_calibration")); o.setNeedsCalibration(DaoSerializer.getBoolean(_d, "needs_calibration"));
o.setMqttBrokerUrl(DaoSerializer.getString(_d, "mqtt_broker_url"));
o.setMqttUserName(DaoSerializer.getString(_d, "mqtt_user_name"));
o.setMqttPassword(DaoSerializer.getString(_d, "mqtt_password"));
o.setMqttVoltageCalibrationFactor(DaoSerializer.getDouble(_d, "mqtt_voltage_calibration_factor"));
o.setMqttPortCalibrationFactor(DaoSerializer.getDouble(_d, "mqtt_port_calibration_factor"));
o.setMqttFrequency(DaoSerializer.getInteger(_d, "mqtt_frequency"));
o.setMqttBreakers(DaoSerializer.getList(_d, "mqtt_breakers", Breaker.class));
return o; return o;
} }
} }

View File

@ -136,7 +136,7 @@ public class Breaker {
} }
public double getLowPassFilter() { public double getLowPassFilter() {
return lowPassFilter; return Math.abs(lowPassFilter) < 0.05 ? 1.6: lowPassFilter;
} }
public void setLowPassFilter(double _lowPassFilter) { public void setLowPassFilter(double _lowPassFilter) {