diff --git a/lib/connection.class.dart b/lib/connection.class.dart index f949b5c..4de29c5 100644 --- a/lib/connection.class.dart +++ b/lib/connection.class.dart @@ -61,16 +61,26 @@ class Connection { _tempToken = token; }); } - completer.complete(_connect()); + _connect().timeout(connectTimeout, onTimeout: () { + _disconnect().then((_) { + completer.completeError( + {"errorCode": 1, "errorMessage": "Connection timeout"}); + }); + }).then((_) => completer.complete()).catchError((e) { + completer.completeError(e); + }); } return completer.future; } + Completer connecting; + Future _connect() async { - Completer completer = Completer(); - Timer connectionTimer = Timer(connectTimeout, () { - if (!completer.isCompleted) completer.completeError({"errorCode": 1, "errorMessage": "Connection timeout"}); - }); + if (connecting != null && !connecting.isCompleted) { + Logger.w(""); + return connecting.future; + } + connecting = Completer(); await _disconnect(); Logger.d( "Socket connecting..."); _socket = IOWebSocketChannel.connect( @@ -78,18 +88,17 @@ class Connection { _socketSubscription = _socket.stream.listen( (message) { isConnected = true; - connectionTimer.cancel(); var data = json.decode(message); if (data["type"] == "auth_required") { Logger.d("[Received] <== ${data.toString()}"); - _authenticate().then((_) => completer.complete()).catchError((e) { - if (!completer.isCompleted) completer.completeError(e); + _authenticate().then((_) => connecting.complete()).catchError((e) { + if (!connecting.isCompleted) connecting.completeError(e); }); } else if (data["type"] == "auth_ok") { Logger.d("[Received] <== ${data.toString()}"); _messageResolver["auth"]?.complete(); _messageResolver.remove("auth"); - if (!completer.isCompleted) completer.complete(sendSocketMessage( + if (!connecting.isCompleted) connecting.complete(sendSocketMessage( type: "subscribe_events", additionalData: {"event_type": "state_changed"}, )); @@ -98,17 +107,17 @@ class Connection { _messageResolver["auth"]?.completeError({"errorCode": 62, "errorMessage": "${data["message"]}"}); _messageResolver.remove("auth"); logout().then((_) { - if (!completer.isCompleted) completer.completeError({"errorCode": 62, "errorMessage": "${data["message"]}"}); + if (!connecting.isCompleted) connecting.completeError({"errorCode": 62, "errorMessage": "${data["message"]}"}); }); } else { _handleMessage(data); } }, cancelOnError: true, - onDone: () => _handleSocketClose(completer), - onError: (e) => _handleSocketError(e, completer) + onDone: () => _handleSocketClose(connecting), + onError: (e) => _handleSocketError(e, connecting) ); - return completer.future; + return connecting.future; } Future _disconnect() async { @@ -150,7 +159,6 @@ class Connection { if (!connectionCompleter.isCompleted) { connectionCompleter.completeError({"errorCode": 82, "errorMessage": "Disconnected"}); } else { - //TODO improve _disconnect().then((_) { Timer(Duration(seconds: 5), () { Logger.d("Trying to reconnect..."); @@ -166,7 +174,6 @@ class Connection { if (!connectionCompleter.isCompleted) { connectionCompleter.completeError({"errorCode": 81, "errorMessage": "Unable to connect to Home Assistant"}); } else { - //TODO improve _disconnect().then((_) { Timer(Duration(seconds: 5), () { Logger.d("Trying to reconnect..."); @@ -237,9 +244,6 @@ class Connection { Future sendSocketMessage({String type, Map additionalData, bool auth: false}) { Completer _completer = Completer(); - if (!isConnected) { - _completer.completeError({"errorCode": 8, "errorMessage": "No connection to Home Assistant"}); - } Map dataObject = {"type": "$type"}; String callbackName; if (!auth) { @@ -253,10 +257,19 @@ class Connection { dataObject.addAll(additionalData); } _messageResolver[callbackName] = _completer; - //TODO add message to q and send after reconnect String rawMessage = json.encode(dataObject); Logger.d("[Sending] ==> $rawMessage"); - _socket.sink.add(rawMessage); + if (!isConnected) { + _connect().timeout(connectTimeout, onTimeout: (){ + _completer.completeError({"errorCode": 8, "errorMessage": "No connection to Home Assistant"}); + }).then((_) { + _socket.sink.add(rawMessage); + }).catchError((e) { + _completer.completeError(e); + }); + } else { + _socket.sink.add(rawMessage); + } return _completer.future; }