Resolves #58: Message queue

This commit is contained in:
estevez 2018-10-02 17:23:19 +03:00
parent 41297150c2
commit 95d80fbbfc

View File

@ -6,6 +6,7 @@ class HomeAssistant {
String _hassioAuthType; String _hassioAuthType;
IOWebSocketChannel _hassioChannel; IOWebSocketChannel _hassioChannel;
SendMessageQueue _messageQueue;
int _currentMessageId = 0; int _currentMessageId = 0;
int _statesMessageId = 0; int _statesMessageId = 0;
@ -36,6 +37,7 @@ class HomeAssistant {
_hassioAuthType = authType; _hassioAuthType = authType;
_entities = EntityCollection(); _entities = EntityCollection();
_uiBuilder = UIBuilder(); _uiBuilder = UIBuilder();
_messageQueue = SendMessageQueue(20);
} }
Future fetch() { Future fetch() {
@ -65,7 +67,7 @@ class HomeAssistant {
Future _reConnectSocket() { Future _reConnectSocket() {
if ((_connectionCompleter != null) && (!_connectionCompleter.isCompleted)) { if ((_connectionCompleter != null) && (!_connectionCompleter.isCompleted)) {
TheLogger.log("Warning","Previous connection is not complited"); TheLogger.log("Debug","Previous connection is not complited");
} else { } else {
if ((_hassioChannel == null) || (_hassioChannel.closeCode != null)) { if ((_hassioChannel == null) || (_hassioChannel.closeCode != null)) {
TheLogger.log("Debug", "Socket connecting..."); TheLogger.log("Debug", "Socket connecting...");
@ -131,8 +133,7 @@ class HomeAssistant {
var data = json.decode(message); var data = json.decode(message);
//TheLogger.log("Debug","[Received] => Message type: ${data['type']}"); //TheLogger.log("Debug","[Received] => Message type: ${data['type']}");
if (data["type"] == "auth_required") { if (data["type"] == "auth_required") {
_finishConnecting(null); _sendAuthMessageRaw('{"type": "auth","$_hassioAuthType": "$_hassioPassword"}');
_sendMessageRaw('{"type": "auth","$_hassioAuthType": "$_hassioPassword"}');
} else if (data["type"] == "auth_ok") { } else if (data["type"] == "auth_ok") {
_finishConnecting(null); _finishConnecting(null);
_sendSubscribe(); _sendSubscribe();
@ -164,14 +165,14 @@ class HomeAssistant {
void _sendSubscribe() { void _sendSubscribe() {
_incrementMessageId(); _incrementMessageId();
_subscriptionMessageId = _currentMessageId; _subscriptionMessageId = _currentMessageId;
_sendMessageRaw('{"id": $_subscriptionMessageId, "type": "subscribe_events", "event_type": "state_changed"}'); _sendMessageRaw('{"id": $_subscriptionMessageId, "type": "subscribe_events", "event_type": "state_changed"}', false);
} }
Future _getConfig() { Future _getConfig() {
_configCompleter = new Completer(); _configCompleter = new Completer();
_incrementMessageId(); _incrementMessageId();
_configMessageId = _currentMessageId; _configMessageId = _currentMessageId;
_sendMessageRaw('{"id": $_configMessageId, "type": "get_config"}'); _sendMessageRaw('{"id": $_configMessageId, "type": "get_config"}', false);
return _configCompleter.future; return _configCompleter.future;
} }
@ -180,7 +181,7 @@ class HomeAssistant {
_statesCompleter = new Completer(); _statesCompleter = new Completer();
_incrementMessageId(); _incrementMessageId();
_statesMessageId = _currentMessageId; _statesMessageId = _currentMessageId;
_sendMessageRaw('{"id": $_statesMessageId, "type": "get_states"}'); _sendMessageRaw('{"id": $_statesMessageId, "type": "get_states"}', false);
return _statesCompleter.future; return _statesCompleter.future;
} }
@ -189,7 +190,7 @@ class HomeAssistant {
_servicesCompleter = new Completer(); _servicesCompleter = new Completer();
_incrementMessageId(); _incrementMessageId();
_servicesMessageId = _currentMessageId; _servicesMessageId = _currentMessageId;
_sendMessageRaw('{"id": $_servicesMessageId, "type": "get_services"}'); _sendMessageRaw('{"id": $_servicesMessageId, "type": "get_services"}', false);
return _servicesCompleter.future; return _servicesCompleter.future;
} }
@ -198,15 +199,23 @@ class HomeAssistant {
_currentMessageId += 1; _currentMessageId += 1;
} }
_sendMessageRaw(String message) { void _sendAuthMessageRaw(String message) {
var sendCompleter = Completer();
_reConnectSocket().then((r) {
if (message.indexOf('"type": "auth"') > 0) {
TheLogger.log("Debug", "[Sending] ==> auth request"); TheLogger.log("Debug", "[Sending] ==> auth request");
} else {
TheLogger.log("Debug", "[Sending] ==> $message");
}
_hassioChannel.sink.add(message); _hassioChannel.sink.add(message);
}
_sendMessageRaw(String message, bool queued) {
var sendCompleter = Completer();
if (queued) _messageQueue.add(message);
_reConnectSocket().then((r) {
_messageQueue.getActualMessages().forEach((message){
TheLogger.log("Debug", "[Sending queued] ==> $message");
_hassioChannel.sink.add(message);
});
if (!queued) {
TheLogger.log("Debug", "[Sending] ==> $message");
_hassioChannel.sink.add(message);
}
sendCompleter.complete(); sendCompleter.complete();
}).catchError((e){ }).catchError((e){
sendCompleter.completeError(e); sendCompleter.completeError(e);
@ -223,7 +232,7 @@ class HomeAssistant {
}); });
} }
message += '}}'; message += '}}';
return _sendMessageRaw(message); return _sendMessageRaw(message, true);
} }
void _handleEntityStateChange(Map eventData) { void _handleEntityStateChange(Map eventData) {
@ -277,3 +286,43 @@ class HomeAssistant {
_statesCompleter.complete(); _statesCompleter.complete();
} }
} }
class SendMessageQueue {
int _messageTimeout;
List<HAMessage> _queue = [];
SendMessageQueue(this._messageTimeout);
void add(String message) {
_queue.add(HAMessage(_messageTimeout, message));
}
List<String> getActualMessages() {
_queue.removeWhere((item) => item.isExpired());
List<String> result = [];
_queue.forEach((haMessage){
result.add(haMessage.message);
});
this.clear();
return result;
}
void clear() {
_queue.clear();
}
}
class HAMessage {
DateTime _timeStamp;
int _messageTimeout;
String message;
HAMessage(this._messageTimeout, this.message) {
_timeStamp = DateTime.now();
}
bool isExpired() {
return _timeStamp.difference(DateTime.now()).inSeconds > _messageTimeout;
}
}