diff --git a/currentmonitor/lantern-currentmonitor/pom.xml b/currentmonitor/lantern-currentmonitor/pom.xml index b6966ad..9a79aa3 100644 --- a/currentmonitor/lantern-currentmonitor/pom.xml +++ b/currentmonitor/lantern-currentmonitor/pom.xml @@ -42,6 +42,11 @@ lantern-util-http 1.0.0 + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.2.5 + diff --git a/currentmonitor/lantern-currentmonitor/src/main/java/com/lanternsoftware/currentmonitor/MonitorApp.java b/currentmonitor/lantern-currentmonitor/src/main/java/com/lanternsoftware/currentmonitor/MonitorApp.java index ac5d61f..a70423c 100644 --- a/currentmonitor/lantern-currentmonitor/src/main/java/com/lanternsoftware/currentmonitor/MonitorApp.java +++ b/currentmonitor/lantern-currentmonitor/src/main/java/com/lanternsoftware/currentmonitor/MonitorApp.java @@ -6,6 +6,7 @@ import com.lanternsoftware.currentmonitor.util.NetworkMonitor; import com.lanternsoftware.currentmonitor.wifi.WifiConfig; import com.lanternsoftware.datamodel.currentmonitor.Breaker; import com.lanternsoftware.datamodel.currentmonitor.BreakerConfig; +import com.lanternsoftware.datamodel.currentmonitor.BreakerGroup; import com.lanternsoftware.datamodel.currentmonitor.BreakerHub; import com.lanternsoftware.datamodel.currentmonitor.BreakerPower; import com.lanternsoftware.datamodel.currentmonitor.BreakerPowerMinute; @@ -72,6 +73,7 @@ public class MonitorApp { } else LOG.info("Panel{} - Space{} Power: {}W", _p.getPanel(), Breaker.toSpaceDisplay(_p.getSpace()), String.format("%.3f", _p.getPower())); }; + private static MqttPoster mqttPoster; public static void main(String[] args) { config = DaoSerializer.parse(ResourceLoader.loadFileAsString(WORKING_DIR + "config.json"), MonitorConfig.class); @@ -80,7 +82,8 @@ public class MonitorApp { return; } pool = new HttpPool(10, 10, config.getSocketTimeout(), config.getConnectTimeout(), config.getSocketTimeout()); - host = NullUtils.terminateWith(config.getHost(), "/"); + if (NullUtils.isNotEmpty(config.getHost())) + host = NullUtils.terminateWith(config.getHost(), "/"); monitor.setDebug(config.isDebug()); monitor.start(); LEDFlasher.setLEDOn(false); @@ -168,19 +171,43 @@ public class MonitorApp { bluetoothConfig.start(); if (NullUtils.isNotEmpty(config.getAuthCode())) authCode = config.getAuthCode(); - else { + else if (NullUtils.isNotEmpty(host)) { HttpGet auth = new HttpGet(host + "auth"); HttpPool.addBasicAuthHeader(auth, config.getUsername(), config.getPassword()); authCode = DaoSerializer.getString(DaoSerializer.parse(pool.executeToString(auth)), "auth_code"); } - while (true) { - HttpGet get = new HttpGet(host + "config"); - get.addHeader("auth_code", authCode); - breakerConfig = DaoSerializer.parse(pool.executeToString(get), BreakerConfig.class); - if (breakerConfig != null) - break; - LOG.error("Failed to load breaker config. Retrying in 5 seconds..."); - ConcurrencyUtils.sleep(5000); + if (NullUtils.isNotEmpty(config.getMqttBrokerUrl())) + mqttPoster = new MqttPoster(config); + if (NullUtils.isNotEmpty(host)) { + while (true) { + HttpGet get = new HttpGet(host + "config"); + get.addHeader("auth_code", authCode); + breakerConfig = DaoSerializer.parse(pool.executeToString(get), BreakerConfig.class); + if ((breakerConfig != null) || (mqttPoster != null)) + break; + LOG.error("Failed to load breaker config. Retrying in 5 seconds..."); + ConcurrencyUtils.sleep(5000); + } + } + if ((mqttPoster != null) && (breakerConfig == null)) { + LOG.info("Hub not configured by a Lantern Power Monitor server, defaulting to MQTT mode only"); + BreakerHub hub = new BreakerHub(); + hub.setHub(config.getHub()); + hub.setVoltageCalibrationFactor(config.getFinalVoltageCalibrationFactor()); + hub.setPortCalibrationFactor(config.getMqttPortCalibrationFactor()); + hub.setFrequency(config.getMqttFrequency()); + breakerConfig = new BreakerConfig(); + breakerConfig.setBreakerHubs(CollectionUtils.asArrayList(hub)); + int groupId = 0; + breakerConfig.setBreakerGroups(new ArrayList<>()); + for (Breaker b : CollectionUtils.makeNotNull(config.getMqttBreakers())) { + BreakerGroup g = new BreakerGroup(); + g.setName(b.getName()); + g.setBreakers(CollectionUtils.asArrayList(b)); + g.setId(String.valueOf(++groupId)); + g.setAccountId(-1); + breakerConfig.getBreakerGroups().add(g); + } } LOG.info("Breaker Config loaded"); BreakerHub hub = breakerConfig.getHub(config.getHub()); @@ -246,8 +273,10 @@ public class MonitorApp { DaoEntity post = null; DaoEntity minutePost = null; int curMinute = (int) (new Date().getTime() / 60000); + List mqttReadings = new ArrayList<>(); synchronized (readings) { if (!readings.isEmpty()) { + mqttReadings.addAll(readings); post = new DaoEntity("readings", DaoSerializer.toDaoEntities(readings)); if (curMinute != lastMinute) { HubPowerMinute minute = new HubPowerMinute(); @@ -272,28 +301,35 @@ public class MonitorApp { readings.clear(); } } - if (minutePost != null) { - byte[] payload = DaoSerializer.toZipBson(minutePost); - if (!post(payload, "power/hub")) { - LOG.info("Failed Posting HubPowerMinute, writing cache"); - ResourceLoader.writeFile(WORKING_DIR + "cache/" + UUID.randomUUID().toString() + ".min", payload); + if (NullUtils.isNotEmpty(host)) { + if (minutePost != null) { + byte[] payload = DaoSerializer.toZipBson(minutePost); + if (!post(payload, "power/hub")) { + LOG.info("Failed Posting HubPowerMinute, writing cache"); + ResourceLoader.writeFile(WORKING_DIR + "cache/" + UUID.randomUUID().toString() + ".min", payload); + } } - } - if (post != null) { - byte[] payload = DaoSerializer.toZipBson(post); - if (post(payload, "power/batch")) { - File[] files = new File(WORKING_DIR + "cache").listFiles(); - if (files != null) { - for (File file : files) { - payload = ResourceLoader.loadFile(file.getAbsolutePath()); - if (post(payload, file.getName().endsWith("dat") ? "power/batch" : "power/hub")) - file.delete(); - else - break; + if (post != null) { + byte[] payload = DaoSerializer.toZipBson(post); + if (post(payload, "power/batch")) { + File[] files = new File(WORKING_DIR + "cache").listFiles(); + if (files != null) { + for (File file : files) { + payload = ResourceLoader.loadFile(file.getAbsolutePath()); + if (post(payload, file.getName().endsWith("dat") ? "power/batch" : "power/hub")) + file.delete(); + else + break; + } } } } } + if (mqttPoster != null) { + for (BreakerPower p : mqttReadings) { + monitor.submit(() -> mqttPoster.postPower(p)); + } + } if (DateUtils.diffInSeconds(new Date(), lastUpdateCheck) >= config.getUpdateInterval()) { lastUpdateCheck = new Date(); monitor.submit(new UpdateChecker()); @@ -323,6 +359,8 @@ public class MonitorApp { } private static boolean post(byte[] _payload, String _path) { + if (NullUtils.isEmpty(host)) + return false; HttpPost post = new HttpPost(host + _path); post.addHeader("auth_code", authCode); post.setEntity(new ByteArrayEntity(_payload, ContentType.APPLICATION_OCTET_STREAM)); @@ -338,19 +376,21 @@ public class MonitorApp { private static final class UpdateChecker implements Runnable { @Override public void run() { - DaoEntity meta = DaoSerializer.fromZipBson(pool.executeToByteArray(new HttpGet(host + "update/version"))); - String newVersion = DaoSerializer.getString(meta, "version"); - if (NullUtils.isNotEqual(newVersion, version)) { - LOG.info("New version found, {}, downloading...", newVersion); - byte[] jar = pool.executeToByteArray(new HttpGet(host + "update")); - if (CollectionUtils.length(jar) == DaoSerializer.getInteger(meta, "size") && NullUtils.isEqual(DigestUtils.md5Hex(jar), DaoSerializer.getString(meta, "checksum"))) { - LOG.info("Update downloaded, writing jar and restarting..."); - ResourceLoader.writeFile(WORKING_DIR + "lantern-currentmonitor.jar", jar); - ConcurrencyUtils.sleep(10000); - try { - Runtime.getRuntime().exec(new String[]{"systemctl","restart","currentmonitor"}); - } catch (IOException _e) { - LOG.error("Exception occurred while trying to restart", _e); + if (NullUtils.isNotEmpty(host)) { + DaoEntity meta = DaoSerializer.fromZipBson(pool.executeToByteArray(new HttpGet(host + "update/version"))); + String newVersion = DaoSerializer.getString(meta, "version"); + if (NullUtils.isNotEqual(newVersion, version)) { + LOG.info("New version found, {}, downloading...", newVersion); + byte[] jar = pool.executeToByteArray(new HttpGet(host + "update")); + if (CollectionUtils.length(jar) == DaoSerializer.getInteger(meta, "size") && NullUtils.isEqual(DigestUtils.md5Hex(jar), DaoSerializer.getString(meta, "checksum"))) { + LOG.info("Update downloaded, writing jar and restarting..."); + ResourceLoader.writeFile(WORKING_DIR + "lantern-currentmonitor.jar", jar); + ConcurrencyUtils.sleep(10000); + try { + Runtime.getRuntime().exec(new String[]{"systemctl", "restart", "currentmonitor"}); + } catch (IOException _e) { + LOG.error("Exception occurred while trying to restart", _e); + } } } } @@ -360,42 +400,40 @@ public class MonitorApp { private static final class CommandChecker implements Runnable { @Override public void run() { - HttpGet get = new HttpGet(host + "command"); - get.addHeader("auth_code", authCode); - DaoEntity meta = DaoSerializer.fromZipBson(pool.executeToByteArray(get)); - for (String command : DaoSerializer.getList(meta, "commands", String.class)) { - if (NullUtils.isEqual(command, "log")) { - uploadLog(); - } - else if (NullUtils.makeNotNull(command).startsWith("timeout")) { - LOG.info("Updating timeouts..."); - String[] timeouts = NullUtils.cleanSplit(command, "-"); - if (CollectionUtils.size(timeouts) != 3) - continue; - config.setConnectTimeout(DaoSerializer.toInteger(timeouts[1])); - config.setSocketTimeout(DaoSerializer.toInteger(timeouts[2])); - HttpPool old = pool; - pool = new HttpPool(10, 10, config.getSocketTimeout(), config.getConnectTimeout(), config.getSocketTimeout()); - old.shutdown(); - ResourceLoader.writeFile(WORKING_DIR+"config.json", DaoSerializer.toJson(config)); - } - else if (NullUtils.isEqual(command, "extend_filesystem")) { - LOG.info("Extending filesystem and rebooting"); - try { - Runtime.getRuntime().exec(new String[]{"sudo","raspi-config","--expand-rootfs"}); - ConcurrencyUtils.sleep(5000); - Runtime.getRuntime().exec(new String[]{"reboot","now"}); - } catch (IOException _e) { - LOG.error("Exception occurred while trying to extend filesystem", _e); - } - - } - else if (NullUtils.isEqual(command, "restart")) { - LOG.info("Restarting..."); - try { - Runtime.getRuntime().exec(new String[]{"systemctl","restart","currentmonitor"}); - } catch (IOException _e) { - LOG.error("Exception occurred while trying to restart", _e); + if (NullUtils.isNotEmpty(host)) { + HttpGet get = new HttpGet(host + "command"); + get.addHeader("auth_code", authCode); + DaoEntity meta = DaoSerializer.fromZipBson(pool.executeToByteArray(get)); + for (String command : DaoSerializer.getList(meta, "commands", String.class)) { + if (NullUtils.isEqual(command, "log")) { + uploadLog(); + } else if (NullUtils.makeNotNull(command).startsWith("timeout")) { + LOG.info("Updating timeouts..."); + String[] timeouts = NullUtils.cleanSplit(command, "-"); + if (CollectionUtils.size(timeouts) != 3) + continue; + config.setConnectTimeout(DaoSerializer.toInteger(timeouts[1])); + config.setSocketTimeout(DaoSerializer.toInteger(timeouts[2])); + HttpPool old = pool; + pool = new HttpPool(10, 10, config.getSocketTimeout(), config.getConnectTimeout(), config.getSocketTimeout()); + old.shutdown(); + ResourceLoader.writeFile(WORKING_DIR + "config.json", DaoSerializer.toJson(config)); + } else if (NullUtils.isEqual(command, "extend_filesystem")) { + LOG.info("Extending filesystem and rebooting"); + try { + Runtime.getRuntime().exec(new String[]{"sudo", "raspi-config", "--expand-rootfs"}); + ConcurrencyUtils.sleep(5000); + Runtime.getRuntime().exec(new String[]{"reboot", "now"}); + } catch (IOException _e) { + LOG.error("Exception occurred while trying to extend filesystem", _e); + } + } else if (NullUtils.isEqual(command, "restart")) { + LOG.info("Restarting..."); + try { + Runtime.getRuntime().exec(new String[]{"systemctl", "restart", "currentmonitor"}); + } catch (IOException _e) { + LOG.error("Exception occurred while trying to restart", _e); + } } } } diff --git a/currentmonitor/lantern-currentmonitor/src/main/java/com/lanternsoftware/currentmonitor/MonitorConfig.java b/currentmonitor/lantern-currentmonitor/src/main/java/com/lanternsoftware/currentmonitor/MonitorConfig.java index dcadc8f..f39a5af 100644 --- a/currentmonitor/lantern-currentmonitor/src/main/java/com/lanternsoftware/currentmonitor/MonitorConfig.java +++ b/currentmonitor/lantern-currentmonitor/src/main/java/com/lanternsoftware/currentmonitor/MonitorConfig.java @@ -1,115 +1,191 @@ package com.lanternsoftware.currentmonitor; +import com.lanternsoftware.datamodel.currentmonitor.Breaker; import com.lanternsoftware.util.dao.annotations.DBSerializable; +import java.util.List; + @DBSerializable public class MonitorConfig { - private String host; - private String authCode; - private String username; - private String password; - private int hub; - private boolean debug; - private int connectTimeout; - private int socketTimeout; - private int updateInterval; - private float autoCalibrationVoltage; - private boolean needsCalibration; + private String host; + private String authCode; + private String username; + private String password; + private int hub; + private boolean debug; + private int connectTimeout; + private int socketTimeout; + private int updateInterval; + private float autoCalibrationVoltage; + private boolean needsCalibration; + private String mqttBrokerUrl; + private String mqttUserName; + private String mqttPassword; + private double mqttVoltageCalibrationFactor; + private double mqttPortCalibrationFactor; + private int mqttFrequency; + private List mqttBreakers; - public MonitorConfig() { - } + public MonitorConfig() { + } - public MonitorConfig(int _hub, String _host) { - hub = _hub; - host = _host; - } + public MonitorConfig(int _hub, String _host) { + hub = _hub; + host = _host; + } - public String getHost() { - return host; - } + public String getHost() { + return host; + } - public void setHost(String _host) { - host = _host; - } + public void setHost(String _host) { + host = _host; + } - public String getAuthCode() { - return authCode; - } + public String getAuthCode() { + return authCode; + } - public void setAuthCode(String _authCode) { - authCode = _authCode; - } + public void setAuthCode(String _authCode) { + authCode = _authCode; + } - public String getUsername() { - return username; - } + public String getUsername() { + return username; + } - public void setUsername(String _username) { - username = _username; - } + public void setUsername(String _username) { + username = _username; + } - public String getPassword() { - return password; - } + public String getPassword() { + return password; + } - public void setPassword(String _password) { - password = _password; - } + public void setPassword(String _password) { + password = _password; + } - public int getHub() { - return hub; - } + public int getHub() { + return hub; + } - public void setHub(int _hub) { - hub = _hub; - } + public void setHub(int _hub) { + hub = _hub; + } - public boolean isDebug() { - return debug; - } + public boolean isDebug() { + return debug; + } - public void setDebug(boolean _debug) { - debug = _debug; - } + public void setDebug(boolean _debug) { + debug = _debug; + } - public int getConnectTimeout() { - return connectTimeout == 0?3000:connectTimeout; - } + public int getConnectTimeout() { + return connectTimeout == 0 ? 3000 : connectTimeout; + } - public void setConnectTimeout(int _connectTimeout) { - connectTimeout = _connectTimeout; - } + public void setConnectTimeout(int _connectTimeout) { + connectTimeout = _connectTimeout; + } - public int getSocketTimeout() { - return socketTimeout == 0?5000:socketTimeout; - } + public int getSocketTimeout() { + return socketTimeout == 0 ? 5000 : socketTimeout; + } - public void setSocketTimeout(int _socketTimeout) { - socketTimeout = _socketTimeout; - } + public void setSocketTimeout(int _socketTimeout) { + socketTimeout = _socketTimeout; + } - public int getUpdateInterval() { - return updateInterval == 0?300:updateInterval; - } + public int getUpdateInterval() { + return updateInterval == 0 ? 300 : updateInterval; + } - public void setUpdateInterval(int _updateInterval) { - updateInterval = _updateInterval; - } + public void setUpdateInterval(int _updateInterval) { + updateInterval = _updateInterval; + } - public float getAutoCalibrationVoltage() { - return autoCalibrationVoltage; - } + public float getAutoCalibrationVoltage() { + return autoCalibrationVoltage; + } - public void setAutoCalibrationVoltage(float _autoCalibrationVoltage) { - autoCalibrationVoltage = _autoCalibrationVoltage; - } + public void setAutoCalibrationVoltage(float _autoCalibrationVoltage) { + autoCalibrationVoltage = _autoCalibrationVoltage; + } - public boolean isNeedsCalibration() { - return needsCalibration; - } + public boolean isNeedsCalibration() { + return needsCalibration; + } - public void setNeedsCalibration(boolean _needsCalibration) { - needsCalibration = _needsCalibration; - } + public void setNeedsCalibration(boolean _needsCalibration) { + needsCalibration = _needsCalibration; + } + + public String getMqttBrokerUrl() { + return mqttBrokerUrl; + } + + public void setMqttBrokerUrl(String _mqttBrokerUrl) { + mqttBrokerUrl = _mqttBrokerUrl; + } + + public String getMqttUserName() { + return mqttUserName; + } + + public void setMqttUserName(String _mqttUserName) { + mqttUserName = _mqttUserName; + } + + public String getMqttPassword() { + return mqttPassword; + } + + public void setMqttPassword(String _mqttPassword) { + mqttPassword = _mqttPassword; + } + + public double getMqttVoltageCalibrationFactor() { + return mqttVoltageCalibrationFactor; + } + + public double getFinalVoltageCalibrationFactor() { + if (mqttVoltageCalibrationFactor == 0.0) + mqttVoltageCalibrationFactor = 1.0; + return 0.3445* mqttVoltageCalibrationFactor; + } + + public void setMqttVoltageCalibrationFactor(double _mqttVoltageCalibrationFactor) { + mqttVoltageCalibrationFactor = _mqttVoltageCalibrationFactor; + } + + public double getMqttPortCalibrationFactor() { + if (mqttPortCalibrationFactor == 0.0) + mqttPortCalibrationFactor = 1.0; + return mqttPortCalibrationFactor; + } + + public void setMqttPortCalibrationFactor(double _mqttPortCalibrationFactor) { + mqttPortCalibrationFactor = _mqttPortCalibrationFactor; + } + + public int getMqttFrequency() { + if (mqttFrequency == 0) + mqttFrequency = 60; + return mqttFrequency; + } + + public void setMqttFrequency(int _mqttFrequency) { + mqttFrequency = _mqttFrequency; + } + + public List getMqttBreakers() { + return mqttBreakers; + } + + public void setMqttBreakers(List _mqttBreakers) { + mqttBreakers = _mqttBreakers; + } } diff --git a/currentmonitor/lantern-currentmonitor/src/main/java/com/lanternsoftware/currentmonitor/MqttPoster.java b/currentmonitor/lantern-currentmonitor/src/main/java/com/lanternsoftware/currentmonitor/MqttPoster.java new file mode 100644 index 0000000..a9b21bb --- /dev/null +++ b/currentmonitor/lantern-currentmonitor/src/main/java/com/lanternsoftware/currentmonitor/MqttPoster.java @@ -0,0 +1,45 @@ +package com.lanternsoftware.currentmonitor; + +import com.lanternsoftware.datamodel.currentmonitor.BreakerPower; +import com.lanternsoftware.util.NullUtils; +import com.lanternsoftware.util.dao.DaoSerializer; +import org.eclipse.paho.client.mqttv3.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MqttPoster { + private static final Logger LOG = LoggerFactory.getLogger(MqttPoster.class); + + private final IMqttClient client; + + public MqttPoster(MonitorConfig _config) { + IMqttClient c = null; + try { + c = new MqttClient(_config.getMqttBrokerUrl(), String.format("Lantern_Power_Monitor_Hub_%d", _config.getHub())); + MqttConnectOptions options = new MqttConnectOptions(); + options.setAutomaticReconnect(true); + options.setCleanSession(true); + options.setConnectionTimeout(10); + if (NullUtils.isNotEmpty(_config.getMqttUserName())) + options.setUserName(_config.getMqttUserName()); + if (NullUtils.isNotEmpty(_config.getMqttPassword())) + options.setUserName(_config.getMqttPassword()); + c.connect(options); + } catch (MqttException e) { + LOG.error("Failed to create MQTT client", e); + } + client = c; + } + + public void postPower(BreakerPower _power) { + String topic = "lantern_power_monitor/breaker_power/" + _power.getKey(); + MqttMessage msg = new MqttMessage(NullUtils.toByteArray(DaoSerializer.toJson(_power))); + msg.setQos(2); + msg.setRetained(true); + try { + client.publish(topic, msg); + } catch (MqttException e) { + LOG.error("Failed to publish message to {}", topic, e); + } + } +} diff --git a/currentmonitor/lantern-currentmonitor/src/main/java/com/lanternsoftware/currentmonitor/bluetooth/BleApplication.java b/currentmonitor/lantern-currentmonitor/src/main/java/com/lanternsoftware/currentmonitor/bluetooth/BleApplication.java index e4ca956..2b2f639 100644 --- a/currentmonitor/lantern-currentmonitor/src/main/java/com/lanternsoftware/currentmonitor/bluetooth/BleApplication.java +++ b/currentmonitor/lantern-currentmonitor/src/main/java/com/lanternsoftware/currentmonitor/bluetooth/BleApplication.java @@ -71,7 +71,10 @@ public class BleApplication implements GattApplication1, ObjectManager { try { advertisement.stop(); appManager.UnregisterApplication(appPath); + BleHelper.unExportObject(this); + getManagedObjects().forEach(BleHelper::unExportObject); BleHelper.connection.disconnect(); + LOG.info("Bluetooth service and advertisement stopped"); } catch (Exception _e) { LOG.error("Failed to unregister application", _e); diff --git a/currentmonitor/lantern-currentmonitor/src/main/java/com/lanternsoftware/currentmonitor/dao/MonitorConfigSerializer.java b/currentmonitor/lantern-currentmonitor/src/main/java/com/lanternsoftware/currentmonitor/dao/MonitorConfigSerializer.java index f60bb0a..2206cd8 100644 --- a/currentmonitor/lantern-currentmonitor/src/main/java/com/lanternsoftware/currentmonitor/dao/MonitorConfigSerializer.java +++ b/currentmonitor/lantern-currentmonitor/src/main/java/com/lanternsoftware/currentmonitor/dao/MonitorConfigSerializer.java @@ -1,6 +1,7 @@ package com.lanternsoftware.currentmonitor.dao; import com.lanternsoftware.currentmonitor.MonitorConfig; +import com.lanternsoftware.datamodel.currentmonitor.Breaker; import com.lanternsoftware.util.dao.AbstractDaoSerializer; import com.lanternsoftware.util.dao.DaoEntity; import com.lanternsoftware.util.dao.DaoProxyType; @@ -36,6 +37,13 @@ public class MonitorConfigSerializer extends AbstractDaoSerializer