001/** 002 * 003 * Copyright 2018 Florian Schmaus 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.jivesoftware.smack; 018 019import java.util.Map; 020import java.util.Queue; 021import java.util.WeakHashMap; 022import java.util.concurrent.ConcurrentLinkedQueue; 023import java.util.concurrent.Executor; 024 025/** 026 * Helper class to perform an operation asynchronous but keeping the order in respect to a given key. 027 * <p> 028 * A typical use pattern for this helper class consists of callbacks for an abstract entity where the order of callbacks 029 * matters, which eventually call user code in form of listeners. Since the order the callbacks matters, you need to use 030 * synchronous connection listeners. But if those listeners would invoke the user provided listeners, and if those user 031 * provided listeners would take a long time to complete, or even worse, block, then Smack's total progress is stalled, 032 * since synchronous connection listeners are invoked from the main event loop. 033 * </p> 034 * <p> 035 * It is common for those situations that the order of callbacks is not globally important, but only important in 036 * respect to an particular entity. Take chat state notifications (CSN) for example: Assume there are two contacts which 037 * send you CSNs. If a contact sends you first 'active' and then 'inactive, it is crucial that first the listener is 038 * called with 'active' and afterwards with 'inactive'. But if there is another contact is sending 'composing' followed 039 * by 'paused', then it is also important that the listeners are invoked in the correct order, but the order in which 040 * the listeners for those two contacts are invoked does not matter. 041 * </p> 042 * <p> 043 * Using this helper class, one would call {@link #performAsyncButOrdered(Object, Runnable)} which the remote contacts 044 * JID as first argument and a {@link Runnable} invoking the user listeners as second. This class guarantees that 045 * runnables of subsequent invocations are always executed after the runnables of previous invocations using the same 046 * key. 047 * </p> 048 * 049 * @param <K> the type of the key 050 * @since 4.3 051 */ 052public class AsyncButOrdered<K> { 053 054 private final Map<K, Queue<Runnable>> pendingRunnables = new WeakHashMap<>(); 055 056 private final Map<K, Boolean> threadActiveMap = new WeakHashMap<>(); 057 058 private final Executor executor; 059 060 public AsyncButOrdered() { 061 this(null); 062 } 063 064 public AsyncButOrdered(Executor executor) { 065 this.executor = executor; 066 } 067 068 /** 069 * Invoke the given {@link Runnable} asynchronous but ordered in respect to the given key. 070 * 071 * @param key the key deriving the order 072 * @param runnable the {@link Runnable} to run 073 * @return true if a new thread was created 074 */ 075 public boolean performAsyncButOrdered(K key, Runnable runnable) { 076 Queue<Runnable> keyQueue; 077 synchronized (pendingRunnables) { 078 keyQueue = pendingRunnables.get(key); 079 if (keyQueue == null) { 080 keyQueue = new ConcurrentLinkedQueue<>(); 081 pendingRunnables.put(key, keyQueue); 082 } 083 } 084 085 keyQueue.add(runnable); 086 087 boolean newHandler; 088 synchronized (threadActiveMap) { 089 Boolean threadActive = threadActiveMap.get(key); 090 if (threadActive == null) { 091 threadActive = false; 092 threadActiveMap.put(key, threadActive); 093 } 094 095 newHandler = !threadActive; 096 if (newHandler) { 097 Handler handler = new Handler(keyQueue, key); 098 threadActiveMap.put(key, true); 099 if (executor == null) { 100 AbstractXMPPConnection.asyncGo(handler); 101 } else { 102 executor.execute(handler); 103 } 104 } 105 } 106 107 return newHandler; 108 } 109 110 public Executor asExecutorFor(final K key) { 111 return new Executor() { 112 @Override 113 public void execute(Runnable runnable) { 114 performAsyncButOrdered(key, runnable); 115 } 116 }; 117 } 118 119 private class Handler implements Runnable { 120 private final Queue<Runnable> keyQueue; 121 private final K key; 122 123 Handler(Queue<Runnable> keyQueue, K key) { 124 this.keyQueue = keyQueue; 125 this.key = key; 126 } 127 128 @Override 129 public void run() { 130 mainloop: 131 while (true) { 132 Runnable runnable = null; 133 while ((runnable = keyQueue.poll()) != null) { 134 try { 135 runnable.run(); 136 } catch (Throwable t) { 137 // The run() method threw, this handler thread is going to terminate because of that. Ensure we note 138 // that in the map. 139 synchronized (threadActiveMap) { 140 threadActiveMap.put(key, false); 141 } 142 throw t; 143 } 144 } 145 146 synchronized (threadActiveMap) { 147 // If the queue is empty, stop this handler, otherwise continue looping. 148 if (keyQueue.isEmpty()) { 149 threadActiveMap.put(key, false); 150 break mainloop; 151 } 152 } 153 } 154 } 155 } 156}