Ignore events from already discarded websocket listeners

This commit is contained in:
Jannis Mattheis
2020-08-01 13:59:12 +02:00
parent f3a0266694
commit 31c96e2d15

View File

@@ -13,6 +13,7 @@ import com.github.gotify.client.model.Message;
import com.github.gotify.log.Log; import com.github.gotify.log.Log;
import java.util.Calendar; import java.util.Calendar;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import okhttp3.HttpUrl; import okhttp3.HttpUrl;
import okhttp3.OkHttpClient; import okhttp3.OkHttpClient;
import okhttp3.Request; import okhttp3.Request;
@@ -21,6 +22,7 @@ import okhttp3.WebSocket;
import okhttp3.WebSocketListener; import okhttp3.WebSocketListener;
class WebSocketConnection { class WebSocketConnection {
private static final AtomicLong ID = new AtomicLong(0);
private final ConnectivityManager connectivityManager; private final ConnectivityManager connectivityManager;
private final AlarmManager alarmManager; private final AlarmManager alarmManager;
private OkHttpClient client; private OkHttpClient client;
@@ -110,15 +112,16 @@ class WebSocketConnection {
public synchronized WebSocketConnection start() { public synchronized WebSocketConnection start() {
close(); close();
isClosed = false; isClosed = false;
Log.i("WebSocket: starting..."); long nextId = ID.incrementAndGet();
Log.i("WebSocket(" + nextId + "): starting...");
webSocket = client.newWebSocket(request(), new Listener()); webSocket = client.newWebSocket(request(), new Listener(nextId));
return this; return this;
} }
public synchronized void close() { public synchronized void close() {
if (webSocket != null) { if (webSocket != null) {
Log.i("WebSocket: closing existing connection."); Log.i("WebSocket(" + ID.get() + "): closing existing connection.");
isClosed = true; isClosed = true;
webSocket.close(1000, ""); webSocket.close(1000, "");
webSocket = null; webSocket = null;
@@ -147,39 +150,48 @@ class WebSocketConnection {
} }
private class Listener extends WebSocketListener { private class Listener extends WebSocketListener {
private final long id;
public Listener(long id) {
this.id = id;
}
@Override @Override
public void onOpen(WebSocket webSocket, Response response) { public void onOpen(WebSocket webSocket, Response response) {
Log.i("WebSocket: opened"); syncExec(
synchronized (this) { () -> {
onOpen.run(); Log.i("WebSocket(" + id + "): opened");
onOpen.run();
if (errorCount > 0) { if (errorCount > 0) {
onReconnected.run(); onReconnected.run();
errorCount = 0; errorCount = 0;
} }
} });
super.onOpen(webSocket, response); super.onOpen(webSocket, response);
} }
@Override @Override
public void onMessage(WebSocket webSocket, String text) { public void onMessage(WebSocket webSocket, String text) {
Log.i("WebSocket: received message " + text); syncExec(
synchronized (this) { () -> {
Message message = Utils.JSON.fromJson(text, Message.class); Log.i("WebSocket(" + id + "): received message " + text);
onMessage.onSuccess(message); Message message = Utils.JSON.fromJson(text, Message.class);
} onMessage.onSuccess(message);
});
super.onMessage(webSocket, text); super.onMessage(webSocket, text);
} }
@Override @Override
public void onClosed(WebSocket webSocket, int code, String reason) { public void onClosed(WebSocket webSocket, int code, String reason) {
synchronized (this) { syncExec(
if (!isClosed) { () -> {
Log.w("WebSocket: closed"); if (!isClosed) {
onClose.run(); Log.w("WebSocket(" + id + "): closed");
} onClose.run();
} isClosed = true;
}
});
super.onClosed(webSocket, code, reason); super.onClosed(webSocket, code, reason);
} }
@@ -188,31 +200,40 @@ class WebSocketConnection {
public void onFailure(WebSocket webSocket, Throwable t, Response response) { public void onFailure(WebSocket webSocket, Throwable t, Response response) {
String code = response != null ? "StatusCode: " + response.code() : ""; String code = response != null ? "StatusCode: " + response.code() : "";
String message = response != null ? response.message() : ""; String message = response != null ? response.message() : "";
Log.e("WebSocket: failure " + code + " Message: " + message, t); Log.e("WebSocket(" + id + "): failure " + code + " Message: " + message, t);
synchronized (this) { syncExec(
if (response != null && response.code() >= 400 && response.code() <= 499) { () -> {
onBadRequest.execute(message); if (response != null && response.code() >= 400 && response.code() <= 499) {
close(); onBadRequest.execute(message);
return; close();
} return;
}
errorCount++; errorCount++;
NetworkInfo network = connectivityManager.getActiveNetworkInfo(); NetworkInfo network = connectivityManager.getActiveNetworkInfo();
if (network == null || !network.isConnected()) { if (network == null || !network.isConnected()) {
Log.i("WebSocket: Network not connected"); Log.i("WebSocket(" + id + "): Network not connected");
onDisconnect.run(); onDisconnect.run();
return; return;
} }
int minutes = Math.min(errorCount * 2 - 1, 20); int minutes = Math.min(errorCount * 2 - 1, 20);
onNetworkFailure.execute(minutes); onNetworkFailure.execute(minutes);
scheduleReconnect(TimeUnit.MINUTES.toSeconds(minutes)); scheduleReconnect(TimeUnit.MINUTES.toSeconds(minutes));
} });
super.onFailure(webSocket, t, response); super.onFailure(webSocket, t, response);
} }
private void syncExec(Runnable runnable) {
synchronized (this) {
if (ID.get() == id) {
runnable.run();
}
}
}
} }
interface BadRequestRunnable { interface BadRequestRunnable {