001/** 002 * 003 * Copyright 2003-2007 Jive Software. 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.jivesoftware.smack.tcp; 018 019import java.io.BufferedReader; 020import java.io.ByteArrayInputStream; 021import java.io.FileInputStream; 022import java.io.IOException; 023import java.io.InputStream; 024import java.io.InputStreamReader; 025import java.io.OutputStream; 026import java.io.OutputStreamWriter; 027import java.io.Writer; 028import java.lang.reflect.Constructor; 029import java.net.InetAddress; 030import java.net.InetSocketAddress; 031import java.net.Socket; 032import java.security.KeyManagementException; 033import java.security.KeyStore; 034import java.security.KeyStoreException; 035import java.security.NoSuchAlgorithmException; 036import java.security.NoSuchProviderException; 037import java.security.Provider; 038import java.security.SecureRandom; 039import java.security.Security; 040import java.security.UnrecoverableKeyException; 041import java.security.cert.CertificateException; 042import java.util.ArrayList; 043import java.util.Collection; 044import java.util.Iterator; 045import java.util.LinkedHashSet; 046import java.util.LinkedList; 047import java.util.List; 048import java.util.Map; 049import java.util.Set; 050import java.util.concurrent.ArrayBlockingQueue; 051import java.util.concurrent.BlockingQueue; 052import java.util.concurrent.ConcurrentHashMap; 053import 
java.util.concurrent.ConcurrentLinkedQueue; 054import java.util.concurrent.Semaphore; 055import java.util.concurrent.TimeUnit; 056import java.util.concurrent.atomic.AtomicBoolean; 057import java.util.logging.Level; 058import java.util.logging.Logger; 059 060import javax.net.SocketFactory; 061import javax.net.ssl.HostnameVerifier; 062import javax.net.ssl.KeyManager; 063import javax.net.ssl.KeyManagerFactory; 064import javax.net.ssl.SSLContext; 065import javax.net.ssl.SSLSession; 066import javax.net.ssl.SSLSocket; 067import javax.net.ssl.TrustManager; 068import javax.net.ssl.X509TrustManager; 069import javax.security.auth.callback.Callback; 070import javax.security.auth.callback.CallbackHandler; 071import javax.security.auth.callback.PasswordCallback; 072 073import org.jivesoftware.smack.AbstractConnectionListener; 074import org.jivesoftware.smack.AbstractXMPPConnection; 075import org.jivesoftware.smack.ConnectionConfiguration; 076import org.jivesoftware.smack.ConnectionConfiguration.DnssecMode; 077import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode; 078import org.jivesoftware.smack.SmackConfiguration; 079import org.jivesoftware.smack.SmackException; 080import org.jivesoftware.smack.SmackException.AlreadyConnectedException; 081import org.jivesoftware.smack.SmackException.AlreadyLoggedInException; 082import org.jivesoftware.smack.SmackException.ConnectionException; 083import org.jivesoftware.smack.SmackException.NoResponseException; 084import org.jivesoftware.smack.SmackException.NotConnectedException; 085import org.jivesoftware.smack.SmackException.NotLoggedInException; 086import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException; 087import org.jivesoftware.smack.SmackException.SmackWrappedException; 088import org.jivesoftware.smack.SmackFuture; 089import org.jivesoftware.smack.StanzaListener; 090import org.jivesoftware.smack.SynchronizationPoint; 091import org.jivesoftware.smack.XMPPConnection; 092import 
org.jivesoftware.smack.XMPPException; 093import org.jivesoftware.smack.XMPPException.FailedNonzaException; 094import org.jivesoftware.smack.XMPPException.StreamErrorException; 095import org.jivesoftware.smack.compress.packet.Compress; 096import org.jivesoftware.smack.compress.packet.Compressed; 097import org.jivesoftware.smack.compression.XMPPInputOutputStream; 098import org.jivesoftware.smack.filter.StanzaFilter; 099import org.jivesoftware.smack.packet.Element; 100import org.jivesoftware.smack.packet.IQ; 101import org.jivesoftware.smack.packet.Message; 102import org.jivesoftware.smack.packet.Nonza; 103import org.jivesoftware.smack.packet.Presence; 104import org.jivesoftware.smack.packet.Stanza; 105import org.jivesoftware.smack.packet.StartTls; 106import org.jivesoftware.smack.packet.StreamError; 107import org.jivesoftware.smack.packet.StreamOpen; 108import org.jivesoftware.smack.proxy.ProxyInfo; 109import org.jivesoftware.smack.sasl.packet.SaslStreamElements; 110import org.jivesoftware.smack.sasl.packet.SaslStreamElements.Challenge; 111import org.jivesoftware.smack.sasl.packet.SaslStreamElements.SASLFailure; 112import org.jivesoftware.smack.sasl.packet.SaslStreamElements.Success; 113import org.jivesoftware.smack.sm.SMUtils; 114import org.jivesoftware.smack.sm.StreamManagementException; 115import org.jivesoftware.smack.sm.StreamManagementException.StreamIdDoesNotMatchException; 116import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementCounterError; 117import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementNotEnabledException; 118import org.jivesoftware.smack.sm.packet.StreamManagement; 119import org.jivesoftware.smack.sm.packet.StreamManagement.AckAnswer; 120import org.jivesoftware.smack.sm.packet.StreamManagement.AckRequest; 121import org.jivesoftware.smack.sm.packet.StreamManagement.Enable; 122import org.jivesoftware.smack.sm.packet.StreamManagement.Enabled; 123import 
org.jivesoftware.smack.sm.packet.StreamManagement.Failed; 124import org.jivesoftware.smack.sm.packet.StreamManagement.Resume; 125import org.jivesoftware.smack.sm.packet.StreamManagement.Resumed; 126import org.jivesoftware.smack.sm.packet.StreamManagement.StreamManagementFeature; 127import org.jivesoftware.smack.sm.predicates.Predicate; 128import org.jivesoftware.smack.sm.provider.ParseStreamManagement; 129import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown; 130import org.jivesoftware.smack.util.Async; 131import org.jivesoftware.smack.util.DNSUtil; 132import org.jivesoftware.smack.util.PacketParserUtils; 133import org.jivesoftware.smack.util.StringUtils; 134import org.jivesoftware.smack.util.TLSUtils; 135import org.jivesoftware.smack.util.XmlStringBuilder; 136import org.jivesoftware.smack.util.dns.HostAddress; 137import org.jivesoftware.smack.util.dns.SmackDaneProvider; 138import org.jivesoftware.smack.util.dns.SmackDaneVerifier; 139 140import org.jxmpp.jid.impl.JidCreate; 141import org.jxmpp.jid.parts.Resourcepart; 142import org.jxmpp.stringprep.XmppStringprepException; 143import org.jxmpp.util.XmppStringUtils; 144import org.minidns.dnsname.DnsName; 145import org.xmlpull.v1.XmlPullParser; 146import org.xmlpull.v1.XmlPullParserException; 147 148/** 149 * Creates a socket connection to an XMPP server. This is the default connection 150 * to an XMPP server and is specified in the XMPP Core (RFC 6120). 151 * 152 * @see XMPPConnection 153 * @author Matt Tucker 154 */ 155public class XMPPTCPConnection extends AbstractXMPPConnection { 156 157 private static final int QUEUE_SIZE = 500; 158 private static final Logger LOGGER = Logger.getLogger(XMPPTCPConnection.class.getName()); 159 160 /** 161 * The socket which is used for this connection. 
162 */ 163 private Socket socket; 164 165 /** 166 * 167 */ 168 private boolean disconnectedButResumeable = false; 169 170 private SSLSocket secureSocket; 171 172 private final Semaphore readerWriterSemaphore = new Semaphore(2); 173 174 /** 175 * Protected access level because of unit test purposes 176 */ 177 protected final PacketWriter packetWriter = new PacketWriter(); 178 179 /** 180 * Protected access level because of unit test purposes 181 */ 182 protected final PacketReader packetReader = new PacketReader(); 183 184 private final SynchronizationPoint<Exception> initialOpenStreamSend = new SynchronizationPoint<>( 185 this, "initial open stream element send to server"); 186 187 /** 188 * 189 */ 190 private final SynchronizationPoint<XMPPException> maybeCompressFeaturesReceived = new SynchronizationPoint<XMPPException>( 191 this, "stream compression feature"); 192 193 /** 194 * 195 */ 196 private final SynchronizationPoint<SmackException> compressSyncPoint = new SynchronizationPoint<>( 197 this, "stream compression"); 198 199 /** 200 * A synchronization point which is successful if this connection has received the closing 201 * stream element from the remote end-point, i.e. the server. 202 */ 203 private final SynchronizationPoint<Exception> closingStreamReceived = new SynchronizationPoint<>( 204 this, "stream closing element received"); 205 206 /** 207 * The default bundle and defer callback, used for new connections. 208 * @see bundleAndDeferCallback 209 */ 210 private static BundleAndDeferCallback defaultBundleAndDeferCallback; 211 212 /** 213 * The used bundle and defer callback. 214 * <p> 215 * Although this field may be set concurrently, the 'volatile' keyword was deliberately not added, in order to avoid 216 * having a 'volatile' read within the writer threads loop. 
217 * </p> 218 */ 219 private BundleAndDeferCallback bundleAndDeferCallback = defaultBundleAndDeferCallback; 220 221 private static boolean useSmDefault = true; 222 223 private static boolean useSmResumptionDefault = true; 224 225 /** 226 * The stream ID of the stream that is currently resumable, ie. the stream we hold the state 227 * for in {@link #clientHandledStanzasCount}, {@link #serverHandledStanzasCount} and 228 * {@link #unacknowledgedStanzas}. 229 */ 230 private String smSessionId; 231 232 private final SynchronizationPoint<FailedNonzaException> smResumedSyncPoint = new SynchronizationPoint<>( 233 this, "stream resumed element"); 234 235 private final SynchronizationPoint<SmackException> smEnabledSyncPoint = new SynchronizationPoint<>( 236 this, "stream enabled element"); 237 238 /** 239 * The client's preferred maximum resumption time in seconds. 240 */ 241 private int smClientMaxResumptionTime = -1; 242 243 /** 244 * The server's preferred maximum resumption time in seconds. 245 */ 246 private int smServerMaxResumptionTime = -1; 247 248 /** 249 * Indicates whether Stream Management (XEP-198) should be used if it's supported by the server. 250 */ 251 private boolean useSm = useSmDefault; 252 private boolean useSmResumption = useSmResumptionDefault; 253 254 /** 255 * The counter that the server sends the client about it's current height. For example, if the server sends 256 * {@code <a h='42'/>}, then this will be set to 42 (while also handling the {@link #unacknowledgedStanzas} queue). 257 */ 258 private long serverHandledStanzasCount = 0; 259 260 /** 261 * The counter for stanzas handled ("received") by the client. 262 * <p> 263 * Note that we don't need to synchronize this counter. Although JLS 17.7 states that reads and writes to longs are 264 * not atomic, it guarantees that there are at most 2 separate writes, one to each 32-bit half. 
And since 265 * {@link SMUtils#incrementHeight(long)} masks the lower 32 bit, we only operate on one half of the long and 266 * therefore have no concurrency problem because the read/write operations on one half are guaranteed to be atomic. 267 * </p> 268 */ 269 private long clientHandledStanzasCount = 0; 270 271 private BlockingQueue<Stanza> unacknowledgedStanzas; 272 273 /** 274 * Set to true if Stream Management was at least once enabled for this connection. 275 */ 276 private boolean smWasEnabledAtLeastOnce = false; 277 278 /** 279 * This listeners are invoked for every stanza that got acknowledged. 280 * <p> 281 * We use a {@link ConcurrentLinkedQueue} here in order to allow the listeners to remove 282 * themselves after they have been invoked. 283 * </p> 284 */ 285 private final Collection<StanzaListener> stanzaAcknowledgedListeners = new ConcurrentLinkedQueue<>(); 286 287 /** 288 * These listeners are invoked for every stanza that got dropped. 289 * <p> 290 * We use a {@link ConcurrentLinkedQueue} here in order to allow the listeners to remove 291 * themselves after they have been invoked. 292 * </p> 293 */ 294 private final Collection<StanzaListener> stanzaDroppedListeners = new ConcurrentLinkedQueue<>(); 295 296 /** 297 * This listeners are invoked for a acknowledged stanza that has the given stanza ID. They will 298 * only be invoked once and automatically removed after that. 299 */ 300 private final Map<String, StanzaListener> stanzaIdAcknowledgedListeners = new ConcurrentHashMap<>(); 301 302 /** 303 * Predicates that determine if an stream management ack should be requested from the server. 304 * <p> 305 * We use a linked hash set here, so that the order how the predicates are added matches the 306 * order in which they are invoked in order to determine if an ack request should be send or not. 
307 * </p> 308 */ 309 private final Set<StanzaFilter> requestAckPredicates = new LinkedHashSet<>(); 310 311 @SuppressWarnings("HidingField") 312 private final XMPPTCPConnectionConfiguration config; 313 314 /** 315 * Creates a new XMPP connection over TCP (optionally using proxies). 316 * <p> 317 * Note that XMPPTCPConnection constructors do not establish a connection to the server 318 * and you must call {@link #connect()}. 319 * </p> 320 * 321 * @param config the connection configuration. 322 */ 323 public XMPPTCPConnection(XMPPTCPConnectionConfiguration config) { 324 super(config); 325 this.config = config; 326 addConnectionListener(new AbstractConnectionListener() { 327 @Override 328 public void connectionClosedOnError(Exception e) { 329 if (e instanceof XMPPException.StreamErrorException || e instanceof StreamManagementException) { 330 dropSmState(); 331 } 332 } 333 }); 334 } 335 336 /** 337 * Creates a new XMPP connection over TCP. 338 * <p> 339 * Note that {@code jid} must be the bare JID, e.g. "user@example.org". More fine-grained control over the 340 * connection settings is available using the {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)} 341 * constructor. 342 * </p> 343 * 344 * @param jid the bare JID used by the client. 345 * @param password the password or authentication token. 346 * @throws XmppStringprepException 347 */ 348 public XMPPTCPConnection(CharSequence jid, String password) throws XmppStringprepException { 349 this(XmppStringUtils.parseLocalpart(jid.toString()), password, XmppStringUtils.parseDomain(jid.toString())); 350 } 351 352 /** 353 * Creates a new XMPP connection over TCP. 354 * <p> 355 * This is the simplest constructor for connecting to an XMPP server. Alternatively, 356 * you can get fine-grained control over connection settings using the 357 * {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)} constructor. 
358 * </p> 359 * @param username 360 * @param password 361 * @param serviceName 362 * @throws XmppStringprepException 363 */ 364 public XMPPTCPConnection(CharSequence username, String password, String serviceName) throws XmppStringprepException { 365 this(XMPPTCPConnectionConfiguration.builder().setUsernameAndPassword(username, password).setXmppDomain( 366 JidCreate.domainBareFrom(serviceName)).build()); 367 } 368 369 @Override 370 protected void throwNotConnectedExceptionIfAppropriate() throws NotConnectedException { 371 if (packetWriter == null) { 372 throw new NotConnectedException(); 373 } 374 packetWriter.throwNotConnectedExceptionIfDoneAndResumptionNotPossible(); 375 } 376 377 @Override 378 protected void throwAlreadyConnectedExceptionIfAppropriate() throws AlreadyConnectedException { 379 if (isConnected() && !disconnectedButResumeable) { 380 throw new AlreadyConnectedException(); 381 } 382 } 383 384 @Override 385 protected void throwAlreadyLoggedInExceptionIfAppropriate() throws AlreadyLoggedInException { 386 if (isAuthenticated() && !disconnectedButResumeable) { 387 throw new AlreadyLoggedInException(); 388 } 389 } 390 391 @Override 392 protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException, InterruptedException { 393 // Reset the flag in case it was set 394 disconnectedButResumeable = false; 395 super.afterSuccessfulLogin(resumed); 396 } 397 398 @Override 399 protected synchronized void loginInternal(String username, String password, Resourcepart resource) throws XMPPException, 400 SmackException, IOException, InterruptedException { 401 // Authenticate using SASL 402 SSLSession sslSession = secureSocket != null ? secureSocket.getSession() : null; 403 saslAuthentication.authenticate(username, password, config.getAuthzid(), sslSession); 404 405 // Wait for stream features after the authentication. 406 // TODO: The name of this synchronization point "maybeCompressFeaturesReceived" is not perfect. 
It should be 407 // renamed to "streamFeaturesAfterAuthenticationReceived". 408 maybeCompressFeaturesReceived.checkIfSuccessOrWait(); 409 410 // If compression is enabled then request the server to use stream compression. XEP-170 411 // recommends to perform stream compression before resource binding. 412 maybeEnableCompression(); 413 414 if (isSmResumptionPossible()) { 415 smResumedSyncPoint.sendAndWaitForResponse(new Resume(clientHandledStanzasCount, smSessionId)); 416 if (smResumedSyncPoint.wasSuccessful()) { 417 // We successfully resumed the stream, be done here 418 afterSuccessfulLogin(true); 419 return; 420 } 421 // SM resumption failed, what Smack does here is to report success of 422 // lastFeaturesReceived in case of sm resumption was answered with 'failed' so that 423 // normal resource binding can be tried. 424 LOGGER.fine("Stream resumption failed, continuing with normal stream establishment process"); 425 } 426 427 List<Stanza> previouslyUnackedStanzas = new LinkedList<Stanza>(); 428 if (unacknowledgedStanzas != null) { 429 // There was a previous connection with SM enabled but that was either not resumable or 430 // failed to resume. Make sure that we (re-)send the unacknowledged stanzas. 431 unacknowledgedStanzas.drainTo(previouslyUnackedStanzas); 432 // Reset unacknowledged stanzas to 'null' to signal that we never send 'enable' in this 433 // XMPP session (There maybe was an enabled in a previous XMPP session of this 434 // connection instance though). This is used in writePackets to decide if stanzas should 435 // be added to the unacknowledged stanzas queue, because they have to be added right 436 // after the 'enable' stream element has been sent. 437 dropSmState(); 438 } 439 440 // Now bind the resource. It is important to do this *after* we dropped an eventually 441 // existing Stream Management state. As otherwise <bind/> and <session/> may end up in 442 // unacknowledgedStanzas and become duplicated on reconnect. See SMACK-706. 
443 bindResourceAndEstablishSession(resource); 444 445 if (isSmAvailable() && useSm) { 446 // Remove what is maybe left from previously stream managed sessions 447 serverHandledStanzasCount = 0; 448 // XEP-198 3. Enabling Stream Management. If the server response to 'Enable' is 'Failed' 449 // then this is a non recoverable error and we therefore throw an exception. 450 smEnabledSyncPoint.sendAndWaitForResponseOrThrow(new Enable(useSmResumption, smClientMaxResumptionTime)); 451 synchronized (requestAckPredicates) { 452 if (requestAckPredicates.isEmpty()) { 453 // Assure that we have at lest one predicate set up that so that we request acks 454 // for the server and eventually flush some stanzas from the unacknowledged 455 // stanza queue 456 requestAckPredicates.add(Predicate.forMessagesOrAfter5Stanzas()); 457 } 458 } 459 } 460 // Inform client about failed resumption if possible, resend stanzas otherwise 461 // Process the stanzas synchronously so a client can re-queue them for transmission 462 // before it is informed about connection success 463 if (!stanzaDroppedListeners.isEmpty()) { 464 for (Stanza stanza : previouslyUnackedStanzas) { 465 for (StanzaListener listener : stanzaDroppedListeners) { 466 try { 467 listener.processStanza(stanza); 468 } 469 catch (InterruptedException | NotConnectedException | NotLoggedInException e) { 470 LOGGER.log(Level.FINER, "StanzaDroppedListener received exception", e); 471 } 472 } 473 } 474 } else { 475 for (Stanza stanza : previouslyUnackedStanzas) { 476 sendStanzaInternal(stanza); 477 } 478 } 479 480 afterSuccessfulLogin(false); 481 } 482 483 @Override 484 public boolean isSecureConnection() { 485 return secureSocket != null; 486 } 487 488 /** 489 * Shuts the current connection down. After this method returns, the connection must be ready 490 * for re-use by connect. 491 */ 492 @Override 493 protected void shutdown() { 494 if (isSmEnabled()) { 495 try { 496 // Try to send a last SM Acknowledgement. 
Most servers won't find this information helpful, as the SM 497 // state is dropped after a clean disconnect anyways. OTOH it doesn't hurt much either. 498 sendSmAcknowledgementInternal(); 499 } catch (InterruptedException | NotConnectedException e) { 500 LOGGER.log(Level.FINE, "Can not send final SM ack as connection is not connected", e); 501 } 502 } 503 shutdown(false); 504 } 505 506 @Override 507 public synchronized void instantShutdown() { 508 shutdown(true); 509 } 510 511 private void shutdown(boolean instant) { 512 // First shutdown the writer, this will result in a closing stream element getting send to 513 // the server 514 LOGGER.finer("PacketWriter shutdown()"); 515 packetWriter.shutdown(instant); 516 LOGGER.finer("PacketWriter has been shut down"); 517 518 if (!instant) { 519 try { 520 // After we send the closing stream element, check if there was already a 521 // closing stream element sent by the server or wait with a timeout for a 522 // closing stream element to be received from the server. 523 @SuppressWarnings("unused") 524 Exception res = closingStreamReceived.checkIfSuccessOrWait(); 525 } catch (InterruptedException | NoResponseException e) { 526 LOGGER.log(Level.INFO, "Exception while waiting for closing stream element from the server " + this, e); 527 } 528 } 529 530 LOGGER.finer("PacketReader shutdown()"); 531 packetReader.shutdown(); 532 LOGGER.finer("PacketReader has been shut down"); 533 534 final Socket socket = this.socket; 535 if (socket != null && socket.isConnected()) { 536 try { 537 socket.close(); 538 } catch (Exception e) { 539 LOGGER.log(Level.WARNING, "shutdown", e); 540 } 541 } 542 543 setWasAuthenticated(); 544 545 // Wait for reader and writer threads to be terminated. 
546 readerWriterSemaphore.acquireUninterruptibly(2); 547 readerWriterSemaphore.release(2); 548 549 if (disconnectedButResumeable) { 550 return; 551 } 552 553 // If we are able to resume the stream, then don't set 554 // connected/authenticated/usingTLS to false since we like behave like we are still 555 // connected (e.g. sendStanza should not throw a NotConnectedException). 556 if (isSmResumptionPossible() && instant) { 557 disconnectedButResumeable = true; 558 } else { 559 disconnectedButResumeable = false; 560 // Reset the stream management session id to null, since if the stream is cleanly closed, i.e. sending a closing 561 // stream tag, there is no longer a stream to resume. 562 smSessionId = null; 563 // Note that we deliberately do not reset authenticatedConnectionInitiallyEstablishedTimestamp here, so that the 564 // information is available in the connectionClosedOnError() listeners. 565 } 566 authenticated = false; 567 connected = false; 568 secureSocket = null; 569 reader = null; 570 writer = null; 571 572 initState(); 573 } 574 575 @Override 576 protected void initState() { 577 super.initState(); 578 maybeCompressFeaturesReceived.init(); 579 compressSyncPoint.init(); 580 smResumedSyncPoint.init(); 581 smEnabledSyncPoint.init(); 582 initialOpenStreamSend.init(); 583 } 584 585 @Override 586 public void sendNonza(Nonza element) throws NotConnectedException, InterruptedException { 587 packetWriter.sendStreamElement(element); 588 } 589 590 @Override 591 protected void sendStanzaInternal(Stanza packet) throws NotConnectedException, InterruptedException { 592 packetWriter.sendStreamElement(packet); 593 if (isSmEnabled()) { 594 for (StanzaFilter requestAckPredicate : requestAckPredicates) { 595 if (requestAckPredicate.accept(packet)) { 596 requestSmAcknowledgementInternal(); 597 break; 598 } 599 } 600 } 601 } 602 603 private void connectUsingConfiguration() throws ConnectionException, IOException, InterruptedException { 604 List<HostAddress> failedAddresses = 
populateHostAddresses(); 605 SocketFactory socketFactory = config.getSocketFactory(); 606 ProxyInfo proxyInfo = config.getProxyInfo(); 607 int timeout = config.getConnectTimeout(); 608 if (socketFactory == null) { 609 socketFactory = SocketFactory.getDefault(); 610 } 611 for (HostAddress hostAddress : hostAddresses) { 612 Iterator<InetAddress> inetAddresses; 613 String host = hostAddress.getHost(); 614 int port = hostAddress.getPort(); 615 if (proxyInfo == null) { 616 inetAddresses = hostAddress.getInetAddresses().iterator(); 617 assert (inetAddresses.hasNext()); 618 619 innerloop: while (inetAddresses.hasNext()) { 620 // Create a *new* Socket before every connection attempt, i.e. connect() call, since Sockets are not 621 // re-usable after a failed connection attempt. See also SMACK-724. 622 SmackFuture.SocketFuture socketFuture = new SmackFuture.SocketFuture(socketFactory); 623 624 final InetAddress inetAddress = inetAddresses.next(); 625 final InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, port); 626 LOGGER.finer("Trying to establish TCP connection to " + inetSocketAddress); 627 socketFuture.connectAsync(inetSocketAddress, timeout); 628 629 try { 630 socket = socketFuture.getOrThrow(); 631 } catch (IOException e) { 632 hostAddress.setException(inetAddress, e); 633 if (inetAddresses.hasNext()) { 634 continue innerloop; 635 } else { 636 break innerloop; 637 } 638 } 639 LOGGER.finer("Established TCP connection to " + inetSocketAddress); 640 // We found a host to connect to, return here 641 this.host = host; 642 this.port = port; 643 return; 644 } 645 failedAddresses.add(hostAddress); 646 } else { 647 socket = socketFactory.createSocket(); 648 StringUtils.requireNotNullOrEmpty(host, "Host of HostAddress " + hostAddress + " must not be null when using a Proxy"); 649 final String hostAndPort = host + " at port " + port; 650 LOGGER.finer("Trying to establish TCP connection via Proxy to " + hostAndPort); 651 try { 652 
proxyInfo.getProxySocketConnection().connect(socket, host, port, timeout); 653 } catch (IOException e) { 654 hostAddress.setException(e); 655 failedAddresses.add(hostAddress); 656 continue; 657 } 658 LOGGER.finer("Established TCP connection to " + hostAndPort); 659 // We found a host to connect to, return here 660 this.host = host; 661 this.port = port; 662 return; 663 } 664 } 665 // There are no more host addresses to try 666 // throw an exception and report all tried 667 // HostAddresses in the exception 668 throw ConnectionException.from(failedAddresses); 669 } 670 671 /** 672 * Initializes the connection by creating a stanza reader and writer and opening a 673 * XMPP stream to the server. 674 * 675 * @throws XMPPException if establishing a connection to the server fails. 676 * @throws SmackException if the server fails to respond back or if there is anther error. 677 * @throws IOException 678 * @throws InterruptedException 679 */ 680 private void initConnection() throws IOException, InterruptedException { 681 compressionHandler = null; 682 683 // Set the reader and writer instance variables 684 initReaderAndWriter(); 685 686 int availableReaderWriterSemaphorePermits = readerWriterSemaphore.availablePermits(); 687 if (availableReaderWriterSemaphorePermits < 2) { 688 Object[] logObjects = new Object[] { 689 this, 690 availableReaderWriterSemaphorePermits, 691 }; 692 LOGGER.log(Level.FINE, "Not every reader/writer threads where terminated on connection re-initializtion of {0}. Available permits {1}", logObjects); 693 } 694 readerWriterSemaphore.acquire(2); 695 // Start the writer thread. This will open an XMPP stream to the server 696 packetWriter.init(); 697 // Start the reader thread. 
The startup() method will block until we 698 // get an opening stream packet back from server 699 packetReader.init(); 700 } 701 702 private void initReaderAndWriter() throws IOException { 703 InputStream is = socket.getInputStream(); 704 OutputStream os = socket.getOutputStream(); 705 if (compressionHandler != null) { 706 is = compressionHandler.getInputStream(is); 707 os = compressionHandler.getOutputStream(os); 708 } 709 // OutputStreamWriter is already buffered, no need to wrap it into a BufferedWriter 710 writer = new OutputStreamWriter(os, "UTF-8"); 711 reader = new BufferedReader(new InputStreamReader(is, "UTF-8")); 712 713 // If debugging is enabled, we open a window and write out all network traffic. 714 initDebugger(); 715 } 716 717 /** 718 * The server has indicated that TLS negotiation can start. We now need to secure the 719 * existing plain connection and perform a handshake. This method won't return until the 720 * connection has finished the handshake or an error occurred while securing the connection. 721 * @throws IOException 722 * @throws CertificateException 723 * @throws NoSuchAlgorithmException 724 * @throws NoSuchProviderException 725 * @throws KeyStoreException 726 * @throws UnrecoverableKeyException 727 * @throws KeyManagementException 728 * @throws SmackException 729 * @throws Exception if an exception occurs. 
730 */ 731 @SuppressWarnings("LiteralClassName") 732 private void proceedTLSReceived() throws NoSuchAlgorithmException, CertificateException, IOException, KeyStoreException, NoSuchProviderException, UnrecoverableKeyException, KeyManagementException, SmackException { 733 SmackDaneVerifier daneVerifier = null; 734 735 if (config.getDnssecMode() == DnssecMode.needsDnssecAndDane) { 736 SmackDaneProvider daneProvider = DNSUtil.getDaneProvider(); 737 if (daneProvider == null) { 738 throw new UnsupportedOperationException("DANE enabled but no SmackDaneProvider configured"); 739 } 740 daneVerifier = daneProvider.newInstance(); 741 if (daneVerifier == null) { 742 throw new IllegalStateException("DANE requested but DANE provider did not return a DANE verifier"); 743 } 744 } 745 746 SSLContext context = this.config.getCustomSSLContext(); 747 KeyStore ks = null; 748 PasswordCallback pcb = null; 749 750 if (context == null) { 751 final String keyStoreType = config.getKeystoreType(); 752 final CallbackHandler callbackHandler = config.getCallbackHandler(); 753 final String keystorePath = config.getKeystorePath(); 754 if ("PKCS11".equals(keyStoreType)) { 755 try { 756 Constructor<?> c = Class.forName("sun.security.pkcs11.SunPKCS11").getConstructor(InputStream.class); 757 String pkcs11Config = "name = SmartCard\nlibrary = " + config.getPKCS11Library(); 758 ByteArrayInputStream config = new ByteArrayInputStream(pkcs11Config.getBytes(StringUtils.UTF8)); 759 Provider p = (Provider) c.newInstance(config); 760 Security.addProvider(p); 761 ks = KeyStore.getInstance("PKCS11",p); 762 pcb = new PasswordCallback("PKCS11 Password: ",false); 763 callbackHandler.handle(new Callback[] {pcb}); 764 ks.load(null,pcb.getPassword()); 765 } 766 catch (Exception e) { 767 LOGGER.log(Level.WARNING, "Exception", e); 768 ks = null; 769 } 770 } 771 else if ("Apple".equals(keyStoreType)) { 772 ks = KeyStore.getInstance("KeychainStore","Apple"); 773 ks.load(null,null); 774 // pcb = new PasswordCallback("Apple 
Keychain",false); 775 // pcb.setPassword(null); 776 } 777 else if (keyStoreType != null) { 778 ks = KeyStore.getInstance(keyStoreType); 779 if (callbackHandler != null && StringUtils.isNotEmpty(keystorePath)) { 780 try { 781 pcb = new PasswordCallback("Keystore Password: ", false); 782 callbackHandler.handle(new Callback[] { pcb }); 783 ks.load(new FileInputStream(keystorePath), pcb.getPassword()); 784 } 785 catch (Exception e) { 786 LOGGER.log(Level.WARNING, "Exception", e); 787 ks = null; 788 } 789 } else { 790 ks.load(null, null); 791 } 792 } 793 794 KeyManager[] kms = null; 795 796 if (ks != null) { 797 String keyManagerFactoryAlgorithm = KeyManagerFactory.getDefaultAlgorithm(); 798 KeyManagerFactory kmf = null; 799 try { 800 kmf = KeyManagerFactory.getInstance(keyManagerFactoryAlgorithm); 801 } 802 catch (NoSuchAlgorithmException e) { 803 LOGGER.log(Level.FINE, "Could get the default KeyManagerFactory for the '" 804 + keyManagerFactoryAlgorithm + "' algorithm", e); 805 } 806 if (kmf != null) { 807 try { 808 if (pcb == null) { 809 kmf.init(ks, null); 810 } 811 else { 812 kmf.init(ks, pcb.getPassword()); 813 pcb.clearPassword(); 814 } 815 kms = kmf.getKeyManagers(); 816 } 817 catch (NullPointerException npe) { 818 LOGGER.log(Level.WARNING, "NullPointerException", npe); 819 } 820 } 821 } 822 823 // If the user didn't specify a SSLContext, use the default one 824 context = SSLContext.getInstance("TLS"); 825 826 final SecureRandom secureRandom = new java.security.SecureRandom(); 827 X509TrustManager customTrustManager = config.getCustomX509TrustManager(); 828 829 if (daneVerifier != null) { 830 // User requested DANE verification. 
831 daneVerifier.init(context, kms, customTrustManager, secureRandom); 832 } else { 833 TrustManager[] customTrustManagers = null; 834 if (customTrustManager != null) { 835 customTrustManagers = new TrustManager[] { customTrustManager }; 836 } 837 context.init(kms, customTrustManagers, secureRandom); 838 } 839 } 840 841 Socket plain = socket; 842 // Secure the plain connection 843 socket = context.getSocketFactory().createSocket(plain, 844 config.getXMPPServiceDomain().toString(), plain.getPort(), true); 845 846 final SSLSocket sslSocket = (SSLSocket) socket; 847 // Immediately set the enabled SSL protocols and ciphers. See SMACK-712 why this is 848 // important (at least on certain platforms) and it seems to be a good idea anyways to 849 // prevent an accidental implicit handshake. 850 TLSUtils.setEnabledProtocolsAndCiphers(sslSocket, config.getEnabledSSLProtocols(), config.getEnabledSSLCiphers()); 851 852 // Initialize the reader and writer with the new secured version 853 initReaderAndWriter(); 854 855 // Proceed to do the handshake 856 sslSocket.startHandshake(); 857 858 if (daneVerifier != null) { 859 daneVerifier.finish(sslSocket); 860 } 861 862 final HostnameVerifier verifier = getConfiguration().getHostnameVerifier(); 863 if (verifier == null) { 864 throw new IllegalStateException("No HostnameVerifier set. Use connectionConfiguration.setHostnameVerifier() to configure."); 865 } 866 867 final String verifierHostname; 868 { 869 DnsName xmppServiceDomainDnsName = getConfiguration().getXmppServiceDomainAsDnsNameIfPossible(); 870 // Try to convert the XMPP service domain, which potentially includes Unicode characters, into ASCII 871 // Compatible Encoding (ACE) to match RFC3280 dNSname IA5String constraint. 
872 // See also: https://bugzilla.mozilla.org/show_bug.cgi?id=280839#c1 873 if (xmppServiceDomainDnsName != null) { 874 verifierHostname = xmppServiceDomainDnsName.ace; 875 } 876 else { 877 LOGGER.log(Level.WARNING, "XMPP service domain name '" + getXMPPServiceDomain() 878 + "' can not be represented as DNS name. TLS X.509 certificate validiation may fail."); 879 verifierHostname = getXMPPServiceDomain().toString(); 880 } 881 } 882 883 final boolean verificationSuccessful; 884 // Verify the TLS session. 885 verificationSuccessful = verifier.verify(verifierHostname, sslSocket.getSession()); 886 if (!verificationSuccessful) { 887 throw new CertificateException( 888 "Hostname verification of certificate failed. Certificate does not authenticate " 889 + getXMPPServiceDomain()); 890 } 891 892 // Set that TLS was successful 893 secureSocket = sslSocket; 894 } 895 896 /** 897 * Returns the compression handler that can be used for one compression methods offered by the server. 898 * 899 * @return a instance of XMPPInputOutputStream or null if no suitable instance was found 900 * 901 */ 902 private static XMPPInputOutputStream maybeGetCompressionHandler(Compress.Feature compression) { 903 for (XMPPInputOutputStream handler : SmackConfiguration.getCompressionHandlers()) { 904 String method = handler.getCompressionMethod(); 905 if (compression.getMethods().contains(method)) 906 return handler; 907 } 908 return null; 909 } 910 911 @Override 912 public boolean isUsingCompression() { 913 return compressionHandler != null && compressSyncPoint.wasSuccessful(); 914 } 915 916 /** 917 * <p> 918 * Starts using stream compression that will compress network traffic. Traffic can be 919 * reduced up to 90%. Therefore, stream compression is ideal when using a slow speed network 920 * connection. However, the server and the client will need to use more CPU time in order to 921 * un/compress network data so under high load the server performance might be affected. 
922 * </p> 923 * <p> 924 * Stream compression has to have been previously offered by the server. Currently only the 925 * zlib method is supported by the client. Stream compression negotiation has to be done 926 * before authentication took place. 927 * </p> 928 * 929 * @throws NotConnectedException 930 * @throws SmackException 931 * @throws NoResponseException 932 * @throws InterruptedException 933 */ 934 private void maybeEnableCompression() throws SmackException, InterruptedException { 935 if (!config.isCompressionEnabled()) { 936 return; 937 } 938 939 Compress.Feature compression = getFeature(Compress.Feature.ELEMENT, Compress.NAMESPACE); 940 if (compression == null) { 941 // Server does not support compression 942 return; 943 } 944 // If stream compression was offered by the server and we want to use 945 // compression then send compression request to the server 946 if ((compressionHandler = maybeGetCompressionHandler(compression)) != null) { 947 compressSyncPoint.sendAndWaitForResponseOrThrow(new Compress(compressionHandler.getCompressionMethod())); 948 } else { 949 LOGGER.warning("Could not enable compression because no matching handler/method pair was found"); 950 } 951 } 952 953 /** 954 * Establishes a connection to the XMPP server. It basically 955 * creates and maintains a socket connection to the server. 956 * <p> 957 * Listeners will be preserved from a previous connection if the reconnection 958 * occurs after an abrupt termination. 959 * </p> 960 * 961 * @throws XMPPException if an error occurs while trying to establish the connection. 962 * @throws SmackException 963 * @throws IOException 964 * @throws InterruptedException 965 */ 966 @Override 967 protected void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException { 968 closingStreamReceived.init(); 969 // Establishes the TCP connection to the server and does setup the reader and writer. 
Throws an exception if 970 // there is an error establishing the connection 971 connectUsingConfiguration(); 972 973 // We connected successfully to the servers TCP port 974 initConnection(); 975 976 // TLS handled will be successful either if TLS was established, or if it was not mandatory. 977 tlsHandled.checkIfSuccessOrWaitOrThrow(); 978 979 // Wait with SASL auth until the SASL mechanisms have been received 980 saslFeatureReceived.checkIfSuccessOrWaitOrThrow(); 981 } 982 983 /** 984 * Sends out a notification that there was an error with the connection 985 * and closes the connection. Also prints the stack trace of the given exception 986 * 987 * @param e the exception that causes the connection close event. 988 */ 989 private void notifyConnectionError(final Exception e) { 990 ASYNC_BUT_ORDERED.performAsyncButOrdered(this, new Runnable() { 991 @Override 992 public void run() { 993 // Listeners were already notified of the exception, return right here. 994 if (packetReader.done || packetWriter.done()) return; 995 996 // Report the failure outside the synchronized block, so that a thread waiting within a synchronized 997 // function like connect() throws the wrapped exception. 998 SmackWrappedException smackWrappedException = new SmackWrappedException(e); 999 tlsHandled.reportGenericFailure(smackWrappedException); 1000 saslFeatureReceived.reportGenericFailure(smackWrappedException); 1001 maybeCompressFeaturesReceived.reportGenericFailure(smackWrappedException); 1002 lastFeaturesReceived.reportGenericFailure(smackWrappedException); 1003 1004 synchronized (XMPPTCPConnection.this) { 1005 // Within this synchronized block, either *both* reader and writer threads must be terminated, or 1006 // none. 1007 assert ((packetReader.done && packetWriter.done()) 1008 || (!packetReader.done && !packetWriter.done())); 1009 1010 // Closes the connection temporary. 
A reconnection is possible 1011 // Note that a connection listener of XMPPTCPConnection will drop the SM state in 1012 // case the Exception is a StreamErrorException. 1013 instantShutdown(); 1014 } 1015 1016 Async.go(new Runnable() { 1017 @Override 1018 public void run() { 1019 // Notify connection listeners of the error. 1020 callConnectionClosedOnErrorListener(e); 1021 } 1022 }, XMPPTCPConnection.this + " callConnectionClosedOnErrorListener()"); 1023 } 1024 }); 1025 } 1026 1027 /** 1028 * For unit testing purposes 1029 * 1030 * @param writer 1031 */ 1032 protected void setWriter(Writer writer) { 1033 this.writer = writer; 1034 } 1035 1036 @Override 1037 protected void afterFeaturesReceived() throws NotConnectedException, InterruptedException, SecurityRequiredByServerException { 1038 StartTls startTlsFeature = getFeature(StartTls.ELEMENT, StartTls.NAMESPACE); 1039 if (startTlsFeature != null) { 1040 if (startTlsFeature.required() && config.getSecurityMode() == SecurityMode.disabled) { 1041 SecurityRequiredByServerException smackException = new SecurityRequiredByServerException(); 1042 tlsHandled.reportFailure(smackException); 1043 throw smackException; 1044 } 1045 1046 if (config.getSecurityMode() != ConnectionConfiguration.SecurityMode.disabled) { 1047 sendNonza(new StartTls()); 1048 } else { 1049 tlsHandled.reportSuccess(); 1050 } 1051 } else { 1052 tlsHandled.reportSuccess(); 1053 } 1054 1055 if (getSASLAuthentication().authenticationSuccessful()) { 1056 // If we have received features after the SASL has been successfully completed, then we 1057 // have also *maybe* received, as it is an optional feature, the compression feature 1058 // from the server. 1059 maybeCompressFeaturesReceived.reportSuccess(); 1060 } 1061 } 1062 1063 /** 1064 * Resets the parser using the latest connection's reader. Resetting the parser is necessary 1065 * when the plain connection has been secured or when a new opening stream element is going 1066 * to be sent by the server. 
*
     * @throws SmackException if the parser could not be reset.
     * @throws InterruptedException
     */
    void openStream() throws SmackException, InterruptedException {
        // If possible, provide the receiving entity of the stream open tag, i.e. the server, as much information as
        // possible. The 'to' attribute is *always* available. The 'from' attribute if set by the user and no external
        // mechanism is used to determine the local entity (user). And the 'id' attribute is available after the first
        // response from the server (see e.g. RFC 6120 § 9.1.1 Step 2.)
        CharSequence to = getXMPPServiceDomain();
        CharSequence from = null;
        CharSequence localpart = config.getUsername();
        if (localpart != null) {
            from = XmppStringUtils.completeJidFrom(localpart, to);
        }
        String id = getStreamId();
        sendNonza(new StreamOpen(to, from, id));
        try {
            // A fresh parser is created on the current reader and handed to the packet reader.
            packetReader.parser = PacketParserUtils.newXmppParser(reader);
        }
        catch (XmlPullParserException e) {
            throw new SmackException(e);
        }
    }

    /**
     * Reads the incoming XML stream on its own thread and dispatches every top-level element.
     */
    protected class PacketReader {

        private final String threadName = "Smack Reader (" + getConnectionCounter() + ')';

        // Assigned by openStream() every time a new stream is opened.
        XmlPullParser parser;

        // Set by shutdown(); ends the loop in parsePackets().
        private volatile boolean done;

        /**
         * Initializes the reader in order to be used. The reader is initialized during the
         * first connection and when reconnecting due to an abruptly disconnection.
         * <p>
         * Starts a thread running {@link #parsePackets()} and releases the reader/writer semaphore
         * once that method returns.
         * </p>
         */
        void init() {
            done = false;

            Async.go(new Runnable() {
                @Override
                public void run() {
                    LOGGER.finer(threadName + " start");
                    try {
                        parsePackets();
                    } finally {
                        LOGGER.finer(threadName + " exit");
                        XMPPTCPConnection.this.readerWriterSemaphore.release();
                    }
                }
            }, threadName);
        }

        /**
         * Shuts the stanza reader down. This method simply sets the 'done' flag to true.
         */
        void shutdown() {
            done = true;
        }

        /**
         * Parse top-level packets in order to process them further.
         * <p>
         * Waits until the initial stream open element has been sent, then loops over the parser
         * events until {@code done} is set. Any exception is reported to the closing-stream sync
         * point and, unless the connection is already shutting down, turned into a connection error.
         * </p>
         */
        private void parsePackets() {
            try {
                initialOpenStreamSend.checkIfSuccessOrWait();
                int eventType = parser.getEventType();
                while (!done) {
                    switch (eventType) {
                    case XmlPullParser.START_TAG:
                        final String name = parser.getName();
                        switch (name) {
                        case Message.ELEMENT:
                        case IQ.IQ_ELEMENT:
                        case Presence.ELEMENT:
                            try {
                                parseAndProcessStanza(parser);
                            } finally {
                                // Count the stanza as handled even if processing it threw.
                                clientHandledStanzasCount = SMUtils.incrementHeight(clientHandledStanzasCount);
                            }
                            break;
                        case "stream":
                            // We found an opening stream.
                            if ("jabber:client".equals(parser.getNamespace(null))) {
                                streamId = parser.getAttributeValue("", "id");
                                String reportedServerDomain = parser.getAttributeValue("", "from");
                                assert (config.getXMPPServiceDomain().equals(reportedServerDomain));
                            }
                            break;
                        case "error":
                            StreamError streamError = PacketParserUtils.parseStreamError(parser);
                            saslFeatureReceived.reportFailure(new StreamErrorException(streamError));
                            // Mark the tlsHandled sync point as success, we will use the saslFeatureReceived sync
                            // point to report the error, which is checked immediately after tlsHandled in
                            // connectInternal().
                            tlsHandled.reportSuccess();
                            throw new StreamErrorException(streamError);
                        case "features":
                            parseFeatures(parser);
                            break;
                        case "proceed":
                            try {
                                // Secure the connection by negotiating TLS
                                proceedTLSReceived();
                                // Send a new opening stream to the server
                                openStream();
                            }
                            catch (Exception e) {
                                // Report the failure to the TLS sync point, then let the outer catch handle it.
                                SmackException smackException = new SmackException(e);
                                tlsHandled.reportFailure(smackException);
                                throw e;
                            }
                            break;
                        case "failure":
                            // The namespace tells which negotiation failed.
                            String namespace = parser.getNamespace(null);
                            switch (namespace) {
                            case "urn:ietf:params:xml:ns:xmpp-tls":
                                // TLS negotiation has failed. The server will close the connection
                                // TODO Parse failure stanza
                                throw new SmackException("TLS negotiation has failed");
                            case "http://jabber.org/protocol/compress":
                                // Stream compression has been denied. This is a recoverable
                                // situation. It is still possible to authenticate and
                                // use the connection but using an uncompressed connection
                                // TODO Parse failure stanza
                                compressSyncPoint.reportFailure(new SmackException(
                                                "Could not establish compression"));
                                break;
                            case SaslStreamElements.NAMESPACE:
                                // SASL authentication has failed. The server may close the connection
                                // depending on the number of retries
                                final SASLFailure failure = PacketParserUtils.parseSASLFailure(parser);
                                getSASLAuthentication().authenticationFailed(failure);
                                break;
                            }
                            break;
                        case Challenge.ELEMENT:
                            // The server is challenging the SASL authentication made by the client
                            String challengeData = parser.nextText();
                            getSASLAuthentication().challengeReceived(challengeData);
                            break;
                        case Success.ELEMENT:
                            Success success = new Success(parser.nextText());
                            // We now need to bind a resource for the connection
                            // Open a new stream and wait for the response
                            openStream();
                            // The SASL authentication with the server was successful. The next step
                            // will be to bind the resource
                            getSASLAuthentication().authenticated(success);
                            break;
                        case Compressed.ELEMENT:
                            // Server confirmed that it's possible to use stream compression. Start
                            // stream compression
                            // Initialize the reader and writer with the new compressed version
                            initReaderAndWriter();
                            // Send a new opening stream to the server
                            openStream();
                            // Notify that compression is being used
                            compressSyncPoint.reportSuccess();
                            break;
                        case Enabled.ELEMENT:
                            Enabled enabled = ParseStreamManagement.enabled(parser);
                            if (enabled.isResumeSet()) {
                                smSessionId = enabled.getId();
                                if (StringUtils.isNullOrEmpty(smSessionId)) {
                                    SmackException xmppException = new SmackException("Stream Management 'enabled' element with resume attribute but without session id received");
                                    smEnabledSyncPoint.reportFailure(xmppException);
                                    throw xmppException;
                                }
                                smServerMaxResumptionTime = enabled.getMaxResumptionTime();
                            } else {
                                // Mark this a non-resumable stream by setting smSessionId to null
                                smSessionId = null;
                            }
                            clientHandledStanzasCount = 0;
                            smWasEnabledAtLeastOnce = true;
                            smEnabledSyncPoint.reportSuccess();
                            LOGGER.fine("Stream Management (XEP-198): successfully enabled");
                            break;
                        case Failed.ELEMENT:
                            Failed failed = ParseStreamManagement.failed(parser);
                            FailedNonzaException xmppException = new FailedNonzaException(failed, failed.getStanzaErrorCondition());
                            // If only XEP-198 would specify different failure elements for the SM
                            // enable and SM resume failure case. But this is not the case, so we
                            // need to determine if this is a 'Failed' response for either 'Enable'
                            // or 'Resume'.
                            if (smResumedSyncPoint.requestSent()) {
                                smResumedSyncPoint.reportFailure(xmppException);
                            }
                            else {
                                if (!smEnabledSyncPoint.requestSent()) {
                                    throw new IllegalStateException("Failed element received but SM was not previously enabled");
                                }
                                smEnabledSyncPoint.reportFailure(new SmackException(xmppException));
                                // Report success for last lastFeaturesReceived so that in case a
                                // failed resumption, we can continue with normal resource binding.
                                // See text of XEP-198 5. below Example 11.
                                lastFeaturesReceived.reportSuccess();
                            }
                            break;
                        case Resumed.ELEMENT:
                            Resumed resumed = ParseStreamManagement.resumed(parser);
                            // NOTE(review): throws a NullPointerException if smSessionId is null here; it would
                            // be caught by the outer catch block - confirm this is the intended handling.
                            if (!smSessionId.equals(resumed.getPrevId())) {
                                throw new StreamIdDoesNotMatchException(smSessionId, resumed.getPrevId());
                            }
                            // Mark SM as enabled
                            smEnabledSyncPoint.reportSuccess();
                            // First, drop the stanzas already handled by the server
                            processHandledCount(resumed.getHandledCount());
                            // Then re-send what is left in the unacknowledged queue
                            List<Stanza> stanzasToResend = new ArrayList<>(unacknowledgedStanzas.size());
                            unacknowledgedStanzas.drainTo(stanzasToResend);
                            for (Stanza stanza : stanzasToResend) {
                                sendStanzaInternal(stanza);
                            }
                            // If there were stanzas resent, then request a SM ack for them.
                            // Writer's sendStreamElement() won't do it automatically based on
                            // predicates.
                            if (!stanzasToResend.isEmpty()) {
                                requestSmAcknowledgementInternal();
                            }
                            // Mark SM resumption as successful
                            smResumedSyncPoint.reportSuccess();
                            LOGGER.fine("Stream Management (XEP-198): Stream resumed");
                            break;
                        case AckAnswer.ELEMENT:
                            AckAnswer ackAnswer = ParseStreamManagement.ackAnswer(parser);
                            processHandledCount(ackAnswer.getHandledCount());
                            break;
                        case AckRequest.ELEMENT:
                            ParseStreamManagement.ackRequest(parser);
                            if (smEnabledSyncPoint.wasSuccessful()) {
                                sendSmAcknowledgementInternal();
                            } else {
                                LOGGER.warning("SM Ack Request received while SM is not enabled");
                            }
                            break;
                        default:
                            LOGGER.warning("Unknown top level stream element: " + name);
                            break;
                        }
                        break;
                    case XmlPullParser.END_TAG:
                        final String endTagName = parser.getName();
                        if ("stream".equals(endTagName)) {
                            if (!parser.getNamespace().equals("http://etherx.jabber.org/streams")) {
                                LOGGER.warning(XMPPTCPConnection.this + " </stream> but different namespace " + parser.getNamespace());
                                break;
                            }

                            // Check if the queue was already shut down before reporting success on closing stream tag
                            // received. This avoids a race if there is a disconnect(), followed by a connect(), which
                            // did re-start the queue again, causing this writer to assume that the queue is not
                            // shutdown, which results in a call to disconnect().
                            final boolean queueWasShutdown = packetWriter.queue.isShutdown();
                            closingStreamReceived.reportSuccess();

                            if (queueWasShutdown) {
                                // We received a closing stream element *after* we initiated the
                                // termination of the session by sending a closing stream element to
                                // the server first
                                return;
                            } else {
                                // We received a closing stream element from the server without us
                                // sending a closing stream element first. This means that the
                                // server wants to terminate the session, therefore disconnect
                                // the connection
                                LOGGER.info(XMPPTCPConnection.this
                                                + " received closing </stream> element."
                                                + " Server wants to terminate the connection, calling disconnect()");
                                ASYNC_BUT_ORDERED.performAsyncButOrdered(XMPPTCPConnection.this, new Runnable() {
                                    @Override
                                    public void run() {
                                        disconnect();
                                    }});
                            }
                        }
                        break;
                    case XmlPullParser.END_DOCUMENT:
                        // END_DOCUMENT only happens in an error case, as otherwise we would see a
                        // closing stream element before.
                        throw new SmackException(
                                        "Parser got END_DOCUMENT event. This could happen e.g. if the server closed the connection without sending a closing stream element");
                    }
                    eventType = parser.next();
                }
            }
            catch (Exception e) {
                closingStreamReceived.reportFailure(e);
                // The exception can be ignored if the connection is 'done'
                // or if it was caused because the socket got closed
                if (!(done || packetWriter.queue.isShutdown())) {
                    // Close the connection and notify connection listeners of the
                    // error.
                    notifyConnectionError(e);
                }
            }
        }
    }

    /**
     * Writes queued stream elements to the connection's writer on its own thread.
     */
    protected class PacketWriter {
        public static final int QUEUE_SIZE = XMPPTCPConnection.QUEUE_SIZE;

        private final String threadName = "Smack Writer (" + getConnectionCounter() + ')';

        // Elements waiting to be written; shut down by shutdown(boolean) and restarted by init().
        private final ArrayBlockingQueueWithShutdown<Element> queue = new ArrayBlockingQueueWithShutdown<>(
                        QUEUE_SIZE, true);

        /**
         * Needs to be protected for unit testing purposes.
         */
        protected SynchronizationPoint<NoResponseException> shutdownDone = new SynchronizationPoint<>(
                        XMPPTCPConnection.this, "shutdown completed");

        /**
         * If set, the stanza writer is shut down
         */
        protected volatile Long shutdownTimestamp = null;

        // Set by shutdown(boolean); read by the writer thread after its main loop ends.
        private volatile boolean instantShutdown;

        /**
         * True if some preconditions are given to start the bundle and defer mechanism.
         * <p>
         * This will likely get set to true right after the start of the writer thread, because
         * {@link #nextStreamElement()} will check if {@link #queue} is empty, which is probably the case, and then set
         * this field to true.
         * </p>
         */
        private boolean shouldBundleAndDefer;

        /**
         * Initializes the writer in order to be used. It is called at the first connection and also
         * is invoked if the connection is disconnected by an error.
1402 */ 1403 void init() { 1404 shutdownDone.init(); 1405 shutdownTimestamp = null; 1406 1407 if (unacknowledgedStanzas != null) { 1408 // It's possible that there are new stanzas in the writer queue that 1409 // came in while we were disconnected but resumable, drain those into 1410 // the unacknowledged queue so that they get resent now 1411 drainWriterQueueToUnacknowledgedStanzas(); 1412 } 1413 1414 queue.start(); 1415 Async.go(new Runnable() { 1416 @Override 1417 public void run() { 1418 LOGGER.finer(threadName + " start"); 1419 try { 1420 writePackets(); 1421 } finally { 1422 LOGGER.finer(threadName + " exit"); 1423 XMPPTCPConnection.this.readerWriterSemaphore.release(); 1424 } 1425 } 1426 }, threadName); 1427 } 1428 1429 private boolean done() { 1430 return shutdownTimestamp != null; 1431 } 1432 1433 protected void throwNotConnectedExceptionIfDoneAndResumptionNotPossible() throws NotConnectedException { 1434 final boolean done = done(); 1435 if (done) { 1436 final boolean smResumptionPossible = isSmResumptionPossible(); 1437 // Don't throw a NotConnectedException is there is an resumable stream available 1438 if (!smResumptionPossible) { 1439 throw new NotConnectedException(XMPPTCPConnection.this, "done=" + done 1440 + " smResumptionPossible=" + smResumptionPossible); 1441 } 1442 } 1443 } 1444 1445 /** 1446 * Sends the specified element to the server. 1447 * 1448 * @param element the element to send. 1449 * @throws NotConnectedException 1450 * @throws InterruptedException 1451 */ 1452 protected void sendStreamElement(Element element) throws NotConnectedException, InterruptedException { 1453 throwNotConnectedExceptionIfDoneAndResumptionNotPossible(); 1454 try { 1455 queue.put(element); 1456 } 1457 catch (InterruptedException e) { 1458 // put() may throw an InterruptedException for two reasons: 1459 // 1. If the queue was shut down 1460 // 2. 
If the thread was interrupted 1461 // so we have to check which is the case 1462 throwNotConnectedExceptionIfDoneAndResumptionNotPossible(); 1463 // If the method above did not throw, then the sending thread was interrupted 1464 throw e; 1465 } 1466 } 1467 1468 /** 1469 * Shuts down the stanza writer. Once this method has been called, no further 1470 * packets will be written to the server. 1471 * @throws InterruptedException 1472 */ 1473 void shutdown(boolean instant) { 1474 instantShutdown = instant; 1475 queue.shutdown(); 1476 shutdownTimestamp = System.currentTimeMillis(); 1477 if (shutdownDone.isNotInInitialState()) { 1478 try { 1479 shutdownDone.checkIfSuccessOrWait(); 1480 } catch (NoResponseException | InterruptedException e) { 1481 LOGGER.log(Level.WARNING, "shutdownDone was not marked as successful by the writer thread", e); 1482 } 1483 } 1484 } 1485 1486 /** 1487 * Maybe return the next available element from the queue for writing. If the queue is shut down <b>or</b> a 1488 * spurious interrupt occurs, <code>null</code> is returned. So it is important to check the 'done' condition in 1489 * that case. 1490 * 1491 * @return the next element for writing or null. 1492 */ 1493 private Element nextStreamElement() { 1494 // It is important the we check if the queue is empty before removing an element from it 1495 if (queue.isEmpty()) { 1496 shouldBundleAndDefer = true; 1497 } 1498 Element packet = null; 1499 try { 1500 packet = queue.take(); 1501 } 1502 catch (InterruptedException e) { 1503 if (!queue.isShutdown()) { 1504 // Users shouldn't try to interrupt the packet writer thread 1505 LOGGER.log(Level.WARNING, "Writer thread was interrupted. Don't do that. Use disconnect() instead.", e); 1506 } 1507 } 1508 return packet; 1509 } 1510 1511 private void writePackets() { 1512 Exception writerException = null; 1513 try { 1514 openStream(); 1515 initialOpenStreamSend.reportSuccess(); 1516 // Write out packets from the queue. 
1517 while (!done()) { 1518 Element element = nextStreamElement(); 1519 if (element == null) { 1520 continue; 1521 } 1522 1523 // Get a local version of the bundle and defer callback, in case it's unset 1524 // between the null check and the method invocation 1525 final BundleAndDeferCallback localBundleAndDeferCallback = bundleAndDeferCallback; 1526 // If the preconditions are given (e.g. bundleAndDefer callback is set, queue is 1527 // empty), then we could wait a bit for further stanzas attempting to decrease 1528 // our energy consumption 1529 if (localBundleAndDeferCallback != null && isAuthenticated() && shouldBundleAndDefer) { 1530 // Reset shouldBundleAndDefer to false, nextStreamElement() will set it to true once the 1531 // queue is empty again. 1532 shouldBundleAndDefer = false; 1533 final AtomicBoolean bundlingAndDeferringStopped = new AtomicBoolean(); 1534 final int bundleAndDeferMillis = localBundleAndDeferCallback.getBundleAndDeferMillis(new BundleAndDefer( 1535 bundlingAndDeferringStopped)); 1536 if (bundleAndDeferMillis > 0) { 1537 long remainingWait = bundleAndDeferMillis; 1538 final long waitStart = System.currentTimeMillis(); 1539 synchronized (bundlingAndDeferringStopped) { 1540 while (!bundlingAndDeferringStopped.get() && remainingWait > 0) { 1541 bundlingAndDeferringStopped.wait(remainingWait); 1542 remainingWait = bundleAndDeferMillis 1543 - (System.currentTimeMillis() - waitStart); 1544 } 1545 } 1546 } 1547 } 1548 1549 Stanza packet = null; 1550 if (element instanceof Stanza) { 1551 packet = (Stanza) element; 1552 } 1553 else if (element instanceof Enable) { 1554 // The client needs to add messages to the unacknowledged stanzas queue 1555 // right after it sent 'enabled'. Stanza will be added once 1556 // unacknowledgedStanzas is not null. 
1557 unacknowledgedStanzas = new ArrayBlockingQueue<>(QUEUE_SIZE); 1558 } 1559 maybeAddToUnacknowledgedStanzas(packet); 1560 1561 CharSequence elementXml = element.toXML(StreamOpen.CLIENT_NAMESPACE); 1562 if (elementXml instanceof XmlStringBuilder) { 1563 ((XmlStringBuilder) elementXml).write(writer, StreamOpen.CLIENT_NAMESPACE); 1564 } 1565 else { 1566 writer.write(elementXml.toString()); 1567 } 1568 1569 if (queue.isEmpty()) { 1570 writer.flush(); 1571 } 1572 if (packet != null) { 1573 firePacketSendingListeners(packet); 1574 } 1575 } 1576 if (!instantShutdown) { 1577 // Flush out the rest of the queue. 1578 try { 1579 while (!queue.isEmpty()) { 1580 Element packet = queue.remove(); 1581 if (packet instanceof Stanza) { 1582 Stanza stanza = (Stanza) packet; 1583 maybeAddToUnacknowledgedStanzas(stanza); 1584 } 1585 writer.write(packet.toXML(null).toString()); 1586 } 1587 writer.flush(); 1588 } 1589 catch (Exception e) { 1590 LOGGER.log(Level.WARNING, 1591 "Exception flushing queue during shutdown, ignore and continue", 1592 e); 1593 } 1594 1595 // Close the stream. 1596 try { 1597 writer.write("</stream:stream>"); 1598 writer.flush(); 1599 } 1600 catch (Exception e) { 1601 LOGGER.log(Level.WARNING, "Exception writing closing stream element", e); 1602 } 1603 1604 // Delete the queue contents (hopefully nothing is left). 1605 queue.clear(); 1606 } else if (instantShutdown && isSmEnabled()) { 1607 // This was an instantShutdown and SM is enabled, drain all remaining stanzas 1608 // into the unacknowledgedStanzas queue 1609 drainWriterQueueToUnacknowledgedStanzas(); 1610 } 1611 // Do *not* close the writer here, as it will cause the socket 1612 // to get closed. But we may want to receive further stanzas 1613 // until the closing stream tag is received. The socket will be 1614 // closed in shutdown(). 
}
            catch (Exception e) {
                // The exception can be ignored if the connection is 'done'
                // or if it was caused because the socket got closed
                if (!(done() || queue.isShutdown())) {
                    writerException = e;
                } else {
                    LOGGER.log(Level.FINE, "Ignoring Exception in writePackets()", e);
                }
            } finally {
                LOGGER.fine("Reporting shutdownDone success in writer thread");
                shutdownDone.reportSuccess();
            }
            // Delay notifyConnectionError after shutdownDone has been reported in the finally block.
            if (writerException != null) {
                notifyConnectionError(writerException);
            }
        }

        /**
         * Drain the writer queue and add every {@link Stanza} found in it to the unacknowledged stanzas queue.
         * <p>
         * Drained elements which are not stanzas are not added. If the unacknowledged stanzas queue has no
         * remaining capacity, a warning is logged and the method returns without processing the remaining
         * drained elements. See SMACK-844.
         * </p>
         */
        private void drainWriterQueueToUnacknowledgedStanzas() {
            List<Element> elements = new ArrayList<>(queue.size());
            queue.drainTo(elements);
            for (int i = 0; i < elements.size(); i++) {
                Element element = elements.get(i);
                // If the unacknowledgedStanza queue is full, then bail out with a warning message. See SMACK-844.
                if (unacknowledgedStanzas.remainingCapacity() == 0) {
                    StreamManagementException.UnacknowledgedQueueFullException exception = StreamManagementException.UnacknowledgedQueueFullException
                            .newWith(i, elements, unacknowledgedStanzas);
                    LOGGER.log(Level.WARNING,
                            "Some stanzas may be lost as not all could be drained to the unacknowledged stanzas queue", exception);
                    return;
                }
                if (element instanceof Stanza) {
                    unacknowledgedStanzas.add((Stanza) element);
                }
            }
        }

        /**
         * Put the given stanza into the unacknowledged stanzas queue, if such a queue exists and the stanza is
         * not <code>null</code>.
         * <p>
         * Before the stanza is queued, an ack request is written and flushed if the queue currently holds
         * exactly 80% of <code>XMPPTCPConnection.QUEUE_SIZE</code> entries.
         * </p>
         *
         * @param stanza the stanza to queue, may be <code>null</code>.
         * @throws IOException if writing or flushing the ack request fails.
         */
        private void maybeAddToUnacknowledgedStanzas(Stanza stanza) throws IOException {
            // Check if the stream element should be put to the unacknowledgedStanza
            // queue. Note that we can not do the put() in sendStanzaInternal() and the
            // packet order is not stable at this point (sendStanzaInternal() can be
            // called concurrently).
            if (unacknowledgedStanzas != null && stanza != null) {
                // If the unacknowledgedStanza queue is nearly full, request a new ack
                // from the server in order to drain it.
                // NOTE(review): this is an exact equality test against a floating point product, so it only
                // triggers if 0.8 * QUEUE_SIZE is a whole number - confirm against the QUEUE_SIZE declaration.
                if (unacknowledgedStanzas.size() == 0.8 * XMPPTCPConnection.QUEUE_SIZE) {
                    writer.write(AckRequest.INSTANCE.toXML(null).toString());
                    writer.flush();
                }
                try {
                    // It is important that we put the stanza in the unacknowledged stanza
                    // queue before we put it on the wire
                    unacknowledgedStanzas.put(stanza);
                }
                catch (InterruptedException e) {
                    // NOTE(review): the interrupt status of the thread is not restored here - confirm that
                    // this is intended.
                    throw new IllegalStateException(e);
                }
            }
        }
    }

    /**
     * Set if Stream Management should be used by default for new connections.
     *
     * @param useSmDefault true to use Stream Management for new connections.
     */
    public static void setUseStreamManagementDefault(boolean useSmDefault) {
        XMPPTCPConnection.useSmDefault = useSmDefault;
    }

    /**
     * Set if Stream Management resumption should be used by default for new connections.
     *
     * @param useSmResumptionDefault true to use Stream Management resumption for new connections.
     * @deprecated use {@link #setUseStreamManagementResumptionDefault(boolean)} instead.
     */
    @Deprecated
    public static void setUseStreamManagementResumptiodDefault(boolean useSmResumptionDefault) {
        setUseStreamManagementResumptionDefault(useSmResumptionDefault);
    }

    /**
     * Set if Stream Management resumption should be used by default for new connections.
     * <p>
     * Enabling resumption also enables Stream Management by default. Disabling resumption leaves the Stream
     * Management default untouched.
     * </p>
     *
     * @param useSmResumptionDefault true to use Stream Management resumption for new connections.
     */
    public static void setUseStreamManagementResumptionDefault(boolean useSmResumptionDefault) {
        if (useSmResumptionDefault) {
            // Also enable SM if resumption is enabled
            setUseStreamManagementDefault(useSmResumptionDefault);
        }
        XMPPTCPConnection.useSmResumptionDefault = useSmResumptionDefault;
    }

    /**
     * Set if Stream Management should be used if supported by the server.
     *
     * @param useSm true to use Stream Management.
     */
    public void setUseStreamManagement(boolean useSm) {
        this.useSm = useSm;
    }

    /**
     * Set if Stream Management resumption should be used if supported by the server.
     * <p>
     * Enabling resumption also enables Stream Management for this connection. Disabling resumption leaves
     * the Stream Management setting untouched.
     * </p>
     *
     * @param useSmResumption true to use Stream Management resumption.
     */
    public void setUseStreamManagementResumption(boolean useSmResumption) {
        if (useSmResumption) {
            // Also enable SM if resumption is enabled
            setUseStreamManagement(useSmResumption);
        }
        this.useSmResumption = useSmResumption;
    }

    /**
     * Set the preferred resumption time in seconds.
     *
     * @param resumptionTime the preferred resumption time in seconds
     */
    public void setPreferredResumptionTime(int resumptionTime) {
        smClientMaxResumptionTime = resumptionTime;
    }

    /**
     * Add a predicate for Stream Management acknowledgment requests.
     * <p>
     * Those predicates are used to determine when a Stream Management acknowledgement request is sent to the server.
     * Some pre-defined predicates are found in the <code>org.jivesoftware.smack.sm.predicates</code> package.
     * </p>
     * <p>
     * If no predicate is configured, {@link Predicate#forMessagesOrAfter5Stanzas()} will be used.
     * </p>
     *
     * @param predicate the predicate to add.
     * @return true if the predicate was not already active.
     */
    public boolean addRequestAckPredicate(StanzaFilter predicate) {
        synchronized (requestAckPredicates) {
            return requestAckPredicates.add(predicate);
        }
    }

    /**
     * Remove the given predicate for Stream Management acknowledgment requests.
     *
     * @param predicate the predicate to remove.
     * @return true if the predicate was removed.
     */
    public boolean removeRequestAckPredicate(StanzaFilter predicate) {
        synchronized (requestAckPredicates) {
            return requestAckPredicates.remove(predicate);
        }
    }

    /**
     * Remove all predicates for Stream Management acknowledgment requests.
     */
    public void removeAllRequestAckPredicates() {
        synchronized (requestAckPredicates) {
            requestAckPredicates.clear();
        }
    }

    /**
     * Send an unconditional Stream Management acknowledgement request to the server.
     *
     * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
     * @throws NotConnectedException if the connection is not connected.
     * @throws InterruptedException if the calling thread was interrupted.
     */
    public void requestSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException {
        if (!isSmEnabled()) {
            throw new StreamManagementException.StreamManagementNotEnabledException();
        }
        requestSmAcknowledgementInternal();
    }

    /**
     * Hand an {@link AckRequest} to the packet writer, without checking whether Stream Management is enabled.
     *
     * @throws NotConnectedException if the connection is not connected.
     * @throws InterruptedException if the calling thread was interrupted.
     */
    private void requestSmAcknowledgementInternal() throws NotConnectedException, InterruptedException {
        packetWriter.sendStreamElement(AckRequest.INSTANCE);
    }

    /**
     * Send an unconditional Stream Management acknowledgment to the server.
     * <p>
     * See <a href="http://xmpp.org/extensions/xep-0198.html#acking">XEP-198: Stream Management &sect; 4. Acks</a>:
     * "Either party MAY send an &lt;a/&gt; element at any time (e.g., after it has received a certain number of stanzas,
     * or after a certain period of time), even if it has not received an &lt;r/&gt; element from the other party."
     * </p>
     *
     * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
     * @throws NotConnectedException if the connection is not connected.
     * @throws InterruptedException if the calling thread was interrupted.
     */
    public void sendSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException {
        if (!isSmEnabled()) {
            throw new StreamManagementException.StreamManagementNotEnabledException();
        }
        sendSmAcknowledgementInternal();
    }

    /**
     * Hand an {@link AckAnswer} carrying the current client handled stanzas count to the packet writer, without
     * checking whether Stream Management is enabled.
     *
     * @throws NotConnectedException if the connection is not connected.
     * @throws InterruptedException if the calling thread was interrupted.
     */
    private void sendSmAcknowledgementInternal() throws NotConnectedException, InterruptedException {
        packetWriter.sendStreamElement(new AckAnswer(clientHandledStanzasCount));
    }

    /**
     * Add a Stanza acknowledged listener.
     * <p>
     * Those listeners will be invoked every time a Stanza has been acknowledged by the server. They will not get
     * automatically removed. Consider using {@link #addStanzaIdAcknowledgedListener(String, StanzaListener)} when
     * possible.
     * </p>
     *
     * @param listener the listener to add.
     */
    public void addStanzaAcknowledgedListener(StanzaListener listener) {
        stanzaAcknowledgedListeners.add(listener);
    }

    /**
     * Remove the given Stanza acknowledged listener.
     *
     * @param listener the listener.
     * @return true if the listener was removed.
     */
    public boolean removeStanzaAcknowledgedListener(StanzaListener listener) {
        return stanzaAcknowledgedListeners.remove(listener);
    }

    /**
     * Remove all stanza acknowledged listeners.
     */
    public void removeAllStanzaAcknowledgedListeners() {
        stanzaAcknowledgedListeners.clear();
    }

    /**
     * Add a Stanza dropped listener.
     * <p>
     * Those listeners will be invoked every time a Stanza has been dropped due to a failed SM resume. They will not get
     * automatically removed. If at least one StanzaDroppedListener is configured, no attempt will be made to retransmit
     * the Stanzas.
     * </p>
     *
     * @param listener the listener to add.
     * @since 4.3.3
     */
    public void addStanzaDroppedListener(StanzaListener listener) {
        stanzaDroppedListeners.add(listener);
    }

    /**
     * Remove the given Stanza dropped listener.
     *
     * @param listener the listener.
     * @return true if the listener was removed.
     * @since 4.3.3
     */
    public boolean removeStanzaDroppedListener(StanzaListener listener) {
        return stanzaDroppedListeners.remove(listener);
    }

    /**
     * Add a new Stanza ID acknowledged listener for the given ID.
     * <p>
     * The listener will be invoked if the stanza with the given ID was acknowledged by the server. It will
     * automatically be removed after the listener was run.
     * </p>
     * <p>
     * Additionally, a removal of the listener registered for the ID is scheduled after the maximum Stream
     * Management resumption time, but at most after 3 hours. Note that this timed removal is keyed by the stanza ID
     * only.
     * </p>
     *
     * @param id the stanza ID.
     * @param listener the listener to invoke.
     * @return the previous listener for this stanza ID or null.
     * @throws StreamManagementNotEnabledException if Stream Management was never enabled on this connection.
     */
    @SuppressWarnings("FutureReturnValueIgnored")
    public StanzaListener addStanzaIdAcknowledgedListener(final String id, StanzaListener listener) throws StreamManagementNotEnabledException {
        // Prevent users from adding callbacks that will never get removed
        if (!smWasEnabledAtLeastOnce) {
            throw new StreamManagementException.StreamManagementNotEnabledException();
        }
        // Remove the listener after max. 3 hours
        final int removeAfterSeconds = Math.min(getMaxSmResumptionTime(), 3 * 60 * 60);
        schedule(new Runnable() {
            @Override
            public void run() {
                stanzaIdAcknowledgedListeners.remove(id);
            }
        }, removeAfterSeconds, TimeUnit.SECONDS);
        return stanzaIdAcknowledgedListeners.put(id, listener);
    }

    /**
     * Remove the Stanza ID acknowledged listener for the given ID.
     *
     * @param id the stanza ID.
     * @return the listener that was removed, or <code>null</code> if no listener was registered for the ID.
     */
    public StanzaListener removeStanzaIdAcknowledgedListener(String id) {
        return stanzaIdAcknowledgedListeners.remove(id);
    }

    /**
     * Removes all Stanza ID acknowledged listeners.
     */
    public void removeAllStanzaIdAcknowledgedListeners() {
        stanzaIdAcknowledgedListeners.clear();
    }

    /**
     * Returns true if Stream Management is supported by the server.
     *
     * @return true if Stream Management is supported by the server.
     */
    public boolean isSmAvailable() {
        return hasFeature(StreamManagementFeature.ELEMENT, StreamManagement.NAMESPACE);
    }

    /**
     * Returns true if Stream Management was successfully negotiated with the server.
     *
     * @return true if Stream Management was negotiated.
     */
    public boolean isSmEnabled() {
        return smEnabledSyncPoint.wasSuccessful();
    }

    /**
     * Returns true if the stream was successfully resumed with help of Stream Management.
     *
     * @return true if the stream was resumed.
     */
    public boolean streamWasResumed() {
        return smResumedSyncPoint.wasSuccessful();
    }

    /**
     * Returns true if the connection is disconnected but a Stream resumption via Stream Management is possible.
     *
     * @return true if disconnected but resumption possible.
     */
    public boolean isDisconnectedButSmResumptionPossible() {
        return disconnectedButResumeable && isSmResumptionPossible();
    }

    /**
     * Returns true if the stream is resumable.
     * <p>
     * This is the case if there is a Stream Management session ID and either no shutdown timestamp is recorded
     * or the maximum resumption time has not yet elapsed since the recorded shutdown timestamp.
     * </p>
     *
     * @return true if the stream is resumable.
     */
    public boolean isSmResumptionPossible() {
        // There is no resumable stream available
        if (smSessionId == null) {
            return false;
        }

        final Long shutdownTimestamp = packetWriter.shutdownTimestamp;
        // Seems like we are already reconnected, report true
        if (shutdownTimestamp == null) {
            return true;
        }

        // See if resumption time is over: stream resumption is *not* possible if the current timestamp is greater
        // than the greatest timestamp where resumption is possible.
        long current = System.currentTimeMillis();
        // Widen to long before multiplying, getMaxSmResumptionTime() may return Integer.MAX_VALUE
        long maxResumptionMillies = ((long) getMaxSmResumptionTime()) * 1000;
        return current <= shutdownTimestamp + maxResumptionMillies;
    }

    /**
     * Drop the stream management state. Sets {@link #smSessionId} and
     * {@link #unacknowledgedStanzas} to <code>null</code>.
     */
    private void dropSmState() {
        // clientHandledCount and serverHandledCount will be reset on <enable/> and <enabled/>
        // respectively. No need to reset them here.
        smSessionId = null;
        unacknowledgedStanzas = null;
    }

    /**
     * Get the maximum resumption time in seconds after which a managed stream can be resumed.
     * <p>
     * This is the smaller of the client and the server maximum resumption time, where a value which is not
     * greater than zero counts as "not specified". This method will return {@link Integer#MAX_VALUE} if neither
     * the client nor the server specify a maximum resumption time. Be aware of integer overflows when using this
     * value, e.g. do not add arbitrary values to it without checking for overflows before.
     * </p>
     *
     * @return the maximum resumption time in seconds or {@link Integer#MAX_VALUE} if none set.
     */
    public int getMaxSmResumptionTime() {
        int clientResumptionTime = smClientMaxResumptionTime > 0 ? smClientMaxResumptionTime : Integer.MAX_VALUE;
        int serverResumptionTime = smServerMaxResumptionTime > 0 ? smServerMaxResumptionTime : Integer.MAX_VALUE;
        return Math.min(clientResumptionTime, serverResumptionTime);
    }

    /**
     * Process a new handled stanzas count.
     * <p>
     * The delta between the given count and the last stored server handled count determines how many stanzas
     * are taken from the unacknowledged stanzas queue. If there is a chance that a listener matches, the stanza
     * acknowledged listeners and the matching stanza ID acknowledged listeners are invoked asynchronously for
     * those stanzas. Finally the given count is stored as the new server handled count.
     * </p>
     *
     * @param handledCount the new handled stanzas count.
     * @throws StreamManagementCounterError if the unacknowledged stanzas queue holds fewer stanzas than the
     *         delta says have been acknowledged.
     */
    private void processHandledCount(long handledCount) throws StreamManagementCounterError {
        long ackedStanzasCount = SMUtils.calculateDelta(handledCount, serverHandledStanzasCount);
        // The delta is derived from a counter we do not control, so it must not be used unbounded as initial
        // capacity: a bogus value would otherwise request a huge array before the loop below gets the chance to
        // report the counter error. The capacity is only a hint, the list grows if required.
        final int initialCapacity = (int) Math.min(ackedStanzasCount, XMPPTCPConnection.QUEUE_SIZE);
        final List<Stanza> ackedStanzas = new ArrayList<>(initialCapacity);
        for (long i = 0; i < ackedStanzasCount; i++) {
            Stanza ackedStanza = unacknowledgedStanzas.poll();
            // If the server ack'ed a stanza, then it must be in the
            // unacknowledged stanza queue. There can be no exception.
            if (ackedStanza == null) {
                throw new StreamManagementCounterError(handledCount, serverHandledStanzasCount,
                                ackedStanzasCount, ackedStanzas);
            }
            ackedStanzas.add(ackedStanza);
        }

        boolean atLeastOneStanzaAcknowledgedListener = false;
        if (!stanzaAcknowledgedListeners.isEmpty()) {
            // If stanzaAcknowledgedListeners is not empty, then we have at least one
            atLeastOneStanzaAcknowledgedListener = true;
        }
        else {
            // Otherwise we look for a matching id in the stanza *id* acknowledged listeners
            for (Stanza ackedStanza : ackedStanzas) {
                String id = ackedStanza.getStanzaId();
                if (id != null && stanzaIdAcknowledgedListeners.containsKey(id)) {
                    atLeastOneStanzaAcknowledgedListener = true;
                    break;
                }
            }
        }

        // Only spawn a new thread if there is a chance that some listener is invoked
        if (atLeastOneStanzaAcknowledgedListener) {
            asyncGo(new Runnable() {
                @Override
                public void run() {
                    for (Stanza ackedStanza : ackedStanzas) {
                        for (StanzaListener listener : stanzaAcknowledgedListeners) {
                            try {
                                listener.processStanza(ackedStanza);
                            }
                            catch (InterruptedException | NotConnectedException | NotLoggedInException e) {
                                LOGGER.log(Level.FINER, "Received exception", e);
                            }
                        }
                        String id = ackedStanza.getStanzaId();
                        if (StringUtils.isNullOrEmpty(id)) {
                            continue;
                        }
                        // The ID listener is removed before it is invoked, so it runs at most once
                        StanzaListener listener = stanzaIdAcknowledgedListeners.remove(id);
                        if (listener != null) {
                            try {
                                listener.processStanza(ackedStanza);
                            }
                            catch (InterruptedException | NotConnectedException | NotLoggedInException e) {
                                LOGGER.log(Level.FINER, "Received exception", e);
                            }
                        }
                    }
                }
            });
        }

        serverHandledStanzasCount = handledCount;
    }

    /**
     * Set the default bundle and defer callback used for new connections.
     *
     * @param defaultBundleAndDeferCallback the default bundle and defer callback for new connections.
     * @see BundleAndDeferCallback
     * @since 4.1
     */
    public static void setDefaultBundleAndDeferCallback(BundleAndDeferCallback defaultBundleAndDeferCallback) {
        XMPPTCPConnection.defaultBundleAndDeferCallback = defaultBundleAndDeferCallback;
    }

    /**
     * Set the bundle and defer callback used for this connection.
     * <p>
     * You can use <code>null</code> as argument to reset the callback. Outgoing stanzas will then
     * no longer get deferred.
     * </p>
     *
     * @param bundleAndDeferCallback the callback or <code>null</code>.
     * @see BundleAndDeferCallback
     * @since 4.1
     */
    public void setBundleandDeferCallback(BundleAndDeferCallback bundleAndDeferCallback) {
        this.bundleAndDeferCallback = bundleAndDeferCallback;
    }

}