Resolves #347, Resolves #346 Connection and reconnection

This commit is contained in:
estevez-dev 2019-03-29 11:04:43 +02:00
parent 7c010359c3
commit b9581d3762

View File

@ -61,16 +61,26 @@ class Connection {
_tempToken = token; _tempToken = token;
}); });
} }
completer.complete(_connect()); _connect().timeout(connectTimeout, onTimeout: () {
_disconnect().then((_) {
completer.completeError(
{"errorCode": 1, "errorMessage": "Connection timeout"});
});
}).then((_) => completer.complete()).catchError((e) {
completer.completeError(e);
});
} }
return completer.future; return completer.future;
} }
Completer connecting;
Future _connect() async { Future _connect() async {
Completer completer = Completer(); if (connecting != null && !connecting.isCompleted) {
Timer connectionTimer = Timer(connectTimeout, () { Logger.w("");
if (!completer.isCompleted) completer.completeError({"errorCode": 1, "errorMessage": "Connection timeout"}); return connecting.future;
}); }
connecting = Completer();
await _disconnect(); await _disconnect();
Logger.d( "Socket connecting..."); Logger.d( "Socket connecting...");
_socket = IOWebSocketChannel.connect( _socket = IOWebSocketChannel.connect(
@ -78,18 +88,17 @@ class Connection {
_socketSubscription = _socket.stream.listen( _socketSubscription = _socket.stream.listen(
(message) { (message) {
isConnected = true; isConnected = true;
connectionTimer.cancel();
var data = json.decode(message); var data = json.decode(message);
if (data["type"] == "auth_required") { if (data["type"] == "auth_required") {
Logger.d("[Received] <== ${data.toString()}"); Logger.d("[Received] <== ${data.toString()}");
_authenticate().then((_) => completer.complete()).catchError((e) { _authenticate().then((_) => connecting.complete()).catchError((e) {
if (!completer.isCompleted) completer.completeError(e); if (!connecting.isCompleted) connecting.completeError(e);
}); });
} else if (data["type"] == "auth_ok") { } else if (data["type"] == "auth_ok") {
Logger.d("[Received] <== ${data.toString()}"); Logger.d("[Received] <== ${data.toString()}");
_messageResolver["auth"]?.complete(); _messageResolver["auth"]?.complete();
_messageResolver.remove("auth"); _messageResolver.remove("auth");
if (!completer.isCompleted) completer.complete(sendSocketMessage( if (!connecting.isCompleted) connecting.complete(sendSocketMessage(
type: "subscribe_events", type: "subscribe_events",
additionalData: {"event_type": "state_changed"}, additionalData: {"event_type": "state_changed"},
)); ));
@ -98,17 +107,17 @@ class Connection {
_messageResolver["auth"]?.completeError({"errorCode": 62, "errorMessage": "${data["message"]}"}); _messageResolver["auth"]?.completeError({"errorCode": 62, "errorMessage": "${data["message"]}"});
_messageResolver.remove("auth"); _messageResolver.remove("auth");
logout().then((_) { logout().then((_) {
if (!completer.isCompleted) completer.completeError({"errorCode": 62, "errorMessage": "${data["message"]}"}); if (!connecting.isCompleted) connecting.completeError({"errorCode": 62, "errorMessage": "${data["message"]}"});
}); });
} else { } else {
_handleMessage(data); _handleMessage(data);
} }
}, },
cancelOnError: true, cancelOnError: true,
onDone: () => _handleSocketClose(completer), onDone: () => _handleSocketClose(connecting),
onError: (e) => _handleSocketError(e, completer) onError: (e) => _handleSocketError(e, connecting)
); );
return completer.future; return connecting.future;
} }
Future _disconnect() async { Future _disconnect() async {
@ -150,7 +159,6 @@ class Connection {
if (!connectionCompleter.isCompleted) { if (!connectionCompleter.isCompleted) {
connectionCompleter.completeError({"errorCode": 82, "errorMessage": "Disconnected"}); connectionCompleter.completeError({"errorCode": 82, "errorMessage": "Disconnected"});
} else { } else {
//TODO improve
_disconnect().then((_) { _disconnect().then((_) {
Timer(Duration(seconds: 5), () { Timer(Duration(seconds: 5), () {
Logger.d("Trying to reconnect..."); Logger.d("Trying to reconnect...");
@ -166,7 +174,6 @@ class Connection {
if (!connectionCompleter.isCompleted) { if (!connectionCompleter.isCompleted) {
connectionCompleter.completeError({"errorCode": 81, "errorMessage": "Unable to connect to Home Assistant"}); connectionCompleter.completeError({"errorCode": 81, "errorMessage": "Unable to connect to Home Assistant"});
} else { } else {
//TODO improve
_disconnect().then((_) { _disconnect().then((_) {
Timer(Duration(seconds: 5), () { Timer(Duration(seconds: 5), () {
Logger.d("Trying to reconnect..."); Logger.d("Trying to reconnect...");
@ -237,9 +244,6 @@ class Connection {
Future sendSocketMessage({String type, Map additionalData, bool auth: false}) { Future sendSocketMessage({String type, Map additionalData, bool auth: false}) {
Completer _completer = Completer(); Completer _completer = Completer();
if (!isConnected) {
_completer.completeError({"errorCode": 8, "errorMessage": "No connection to Home Assistant"});
}
Map dataObject = {"type": "$type"}; Map dataObject = {"type": "$type"};
String callbackName; String callbackName;
if (!auth) { if (!auth) {
@ -253,10 +257,19 @@ class Connection {
dataObject.addAll(additionalData); dataObject.addAll(additionalData);
} }
_messageResolver[callbackName] = _completer; _messageResolver[callbackName] = _completer;
//TODO add message to q and send after reconnect
String rawMessage = json.encode(dataObject); String rawMessage = json.encode(dataObject);
Logger.d("[Sending] ==> $rawMessage"); Logger.d("[Sending] ==> $rawMessage");
if (!isConnected) {
_connect().timeout(connectTimeout, onTimeout: (){
_completer.completeError({"errorCode": 8, "errorMessage": "No connection to Home Assistant"});
}).then((_) {
_socket.sink.add(rawMessage); _socket.sink.add(rawMessage);
}).catchError((e) {
_completer.completeError(e);
});
} else {
_socket.sink.add(rawMessage);
}
return _completer.future; return _completer.future;
} }