001/*
002 * Copyright (C) 2009 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
005 * in compliance with the License. You may obtain a copy of the License at
006 *
007 * http://www.apache.org/licenses/LICENSE-2.0
008 *
009 * Unless required by applicable law or agreed to in writing, software distributed under the License
010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
011 * or implied. See the License for the specific language governing permissions and limitations under
012 * the License.
013 */
014
015package com.google.common.util.concurrent;
016
017import static com.google.common.base.Preconditions.checkArgument;
018import static com.google.common.base.Preconditions.checkNotNull;
019import static com.google.common.base.Preconditions.checkState;
020import static com.google.common.util.concurrent.Service.State.FAILED;
021import static com.google.common.util.concurrent.Service.State.NEW;
022import static com.google.common.util.concurrent.Service.State.RUNNING;
023import static com.google.common.util.concurrent.Service.State.STARTING;
024import static com.google.common.util.concurrent.Service.State.STOPPING;
025import static com.google.common.util.concurrent.Service.State.TERMINATED;
026import static java.util.Objects.requireNonNull;
027
028import com.google.common.annotations.Beta;
029import com.google.common.annotations.GwtIncompatible;
030import com.google.common.util.concurrent.Monitor.Guard;
031import com.google.common.util.concurrent.Service.State; // javadoc needs this
032import com.google.errorprone.annotations.CanIgnoreReturnValue;
033import com.google.errorprone.annotations.ForOverride;
034import com.google.errorprone.annotations.concurrent.GuardedBy;
035import com.google.j2objc.annotations.WeakOuter;
036import java.time.Duration;
037import java.util.concurrent.Executor;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.TimeoutException;
040import javax.annotation.CheckForNull;
041
042/**
043 * Base class for implementing services that can handle {@link #doStart} and {@link #doStop}
044 * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()}
045 * callbacks. Its subclasses must manage threads manually; consider {@link
046 * AbstractExecutionThreadService} if you need only a single execution thread.
047 *
048 * @author Jesse Wilson
049 * @author Luke Sandberg
050 * @since 1.0
051 */
052@GwtIncompatible
053@ElementTypesAreNonnullByDefault
054public abstract class AbstractService implements Service {
055  private static final ListenerCallQueue.Event<Listener> STARTING_EVENT =
056      new ListenerCallQueue.Event<Listener>() {
057        @Override
058        public void call(Listener listener) {
059          listener.starting();
060        }
061
062        @Override
063        public String toString() {
064          return "starting()";
065        }
066      };
067  private static final ListenerCallQueue.Event<Listener> RUNNING_EVENT =
068      new ListenerCallQueue.Event<Listener>() {
069        @Override
070        public void call(Listener listener) {
071          listener.running();
072        }
073
074        @Override
075        public String toString() {
076          return "running()";
077        }
078      };
079  private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_STARTING_EVENT =
080      stoppingEvent(STARTING);
081  private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_RUNNING_EVENT =
082      stoppingEvent(RUNNING);
083
084  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_NEW_EVENT =
085      terminatedEvent(NEW);
086  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STARTING_EVENT =
087      terminatedEvent(STARTING);
088  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_RUNNING_EVENT =
089      terminatedEvent(RUNNING);
090  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STOPPING_EVENT =
091      terminatedEvent(STOPPING);
092
093  private static ListenerCallQueue.Event<Listener> terminatedEvent(final State from) {
094    return new ListenerCallQueue.Event<Listener>() {
095      @Override
096      public void call(Listener listener) {
097        listener.terminated(from);
098      }
099
100      @Override
101      public String toString() {
102        return "terminated({from = " + from + "})";
103      }
104    };
105  }
106
107  private static ListenerCallQueue.Event<Listener> stoppingEvent(final State from) {
108    return new ListenerCallQueue.Event<Listener>() {
109      @Override
110      public void call(Listener listener) {
111        listener.stopping(from);
112      }
113
114      @Override
115      public String toString() {
116        return "stopping({from = " + from + "})";
117      }
118    };
119  }
120
121  private final Monitor monitor = new Monitor();
122
123  private final Guard isStartable = new IsStartableGuard();
124
125  @WeakOuter
126  private final class IsStartableGuard extends Guard {
127    IsStartableGuard() {
128      super(AbstractService.this.monitor);
129    }
130
131    @Override
132    public boolean isSatisfied() {
133      return state() == NEW;
134    }
135  }
136
137  private final Guard isStoppable = new IsStoppableGuard();
138
139  @WeakOuter
140  private final class IsStoppableGuard extends Guard {
141    IsStoppableGuard() {
142      super(AbstractService.this.monitor);
143    }
144
145    @Override
146    public boolean isSatisfied() {
147      return state().compareTo(RUNNING) <= 0;
148    }
149  }
150
151  private final Guard hasReachedRunning = new HasReachedRunningGuard();
152
153  @WeakOuter
154  private final class HasReachedRunningGuard extends Guard {
155    HasReachedRunningGuard() {
156      super(AbstractService.this.monitor);
157    }
158
159    @Override
160    public boolean isSatisfied() {
161      return state().compareTo(RUNNING) >= 0;
162    }
163  }
164
165  private final Guard isStopped = new IsStoppedGuard();
166
167  @WeakOuter
168  private final class IsStoppedGuard extends Guard {
169    IsStoppedGuard() {
170      super(AbstractService.this.monitor);
171    }
172
173    @Override
174    public boolean isSatisfied() {
175      return state().compareTo(TERMINATED) >= 0;
176    }
177  }
178
179  /** The listeners to notify during a state transition. */
180  private final ListenerCallQueue<Listener> listeners = new ListenerCallQueue<>();
181
182  /**
183   * The current state of the service. This should be written with the lock held but can be read
184   * without it because it is an immutable object in a volatile field. This is desirable so that
185   * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run
186   * without grabbing the lock.
187   *
188   * <p>To update this field correctly the lock must be held to guarantee that the state is
189   * consistent.
190   */
191  private volatile StateSnapshot snapshot = new StateSnapshot(NEW);
192
193  /** Constructor for use by subclasses. */
194  protected AbstractService() {}
195
196  /**
197   * This method is called by {@link #startAsync} to initiate service startup. The invocation of
198   * this method should cause a call to {@link #notifyStarted()}, either during this method's run,
199   * or after it has returned. If startup fails, the invocation should cause a call to {@link
200   * #notifyFailed(Throwable)} instead.
201   *
202   * <p>This method should return promptly; prefer to do work on a different thread where it is
203   * convenient. It is invoked exactly once on service startup, even when {@link #startAsync} is
204   * called multiple times.
205   */
206  @ForOverride
207  protected abstract void doStart();
208
209  /**
210   * This method should be used to initiate service shutdown. The invocation of this method should
211   * cause a call to {@link #notifyStopped()}, either during this method's run, or after it has
212   * returned. If shutdown fails, the invocation should cause a call to {@link
213   * #notifyFailed(Throwable)} instead.
214   *
215   * <p>This method should return promptly; prefer to do work on a different thread where it is
216   * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is
217   * called multiple times.
218   *
219   * <p>If {@link #stopAsync} is called on a {@link State#STARTING} service, this method is not
220   * invoked immediately. Instead, it will be deferred until after the service is {@link
221   * State#RUNNING}. Services that need to cancel startup work can override {@link #doCancelStart}.
222   */
223  @ForOverride
224  protected abstract void doStop();
225
226  /**
227   * This method is called by {@link #stopAsync} when the service is still starting (i.e. {@link
228   * #startAsync} has been called but {@link #notifyStarted} has not). Subclasses can override the
229   * method to cancel pending work and then call {@link #notifyStopped} to stop the service.
230   *
231   * <p>This method should return promptly; prefer to do work on a different thread where it is
232   * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is
233   * called multiple times.
234   *
235   * <p>When this method is called {@link #state()} will return {@link State#STOPPING}, which is the
236   * external state observable by the caller of {@link #stopAsync}.
237   *
238   * @since 27.0
239   */
240  @Beta
241  @ForOverride
242  protected void doCancelStart() {}
243
244  @CanIgnoreReturnValue
245  @Override
246  public final Service startAsync() {
247    if (monitor.enterIf(isStartable)) {
248      try {
249        snapshot = new StateSnapshot(STARTING);
250        enqueueStartingEvent();
251        doStart();
252      } catch (Throwable startupFailure) {
253        notifyFailed(startupFailure);
254      } finally {
255        monitor.leave();
256        dispatchListenerEvents();
257      }
258    } else {
259      throw new IllegalStateException("Service " + this + " has already been started");
260    }
261    return this;
262  }
263
264  @CanIgnoreReturnValue
265  @Override
266  public final Service stopAsync() {
267    if (monitor.enterIf(isStoppable)) {
268      try {
269        State previous = state();
270        switch (previous) {
271          case NEW:
272            snapshot = new StateSnapshot(TERMINATED);
273            enqueueTerminatedEvent(NEW);
274            break;
275          case STARTING:
276            snapshot = new StateSnapshot(STARTING, true, null);
277            enqueueStoppingEvent(STARTING);
278            doCancelStart();
279            break;
280          case RUNNING:
281            snapshot = new StateSnapshot(STOPPING);
282            enqueueStoppingEvent(RUNNING);
283            doStop();
284            break;
285          case STOPPING:
286          case TERMINATED:
287          case FAILED:
288            // These cases are impossible due to the if statement above.
289            throw new AssertionError("isStoppable is incorrectly implemented, saw: " + previous);
290        }
291      } catch (Throwable shutdownFailure) {
292        notifyFailed(shutdownFailure);
293      } finally {
294        monitor.leave();
295        dispatchListenerEvents();
296      }
297    }
298    return this;
299  }
300
301  @Override
302  public final void awaitRunning() {
303    monitor.enterWhenUninterruptibly(hasReachedRunning);
304    try {
305      checkCurrentState(RUNNING);
306    } finally {
307      monitor.leave();
308    }
309  }
310
311  /** @since 28.0 */
312  @Override
313  public final void awaitRunning(Duration timeout) throws TimeoutException {
314    Service.super.awaitRunning(timeout);
315  }
316
317  @Override
318  public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException {
319    if (monitor.enterWhenUninterruptibly(hasReachedRunning, timeout, unit)) {
320      try {
321        checkCurrentState(RUNNING);
322      } finally {
323        monitor.leave();
324      }
325    } else {
326      // It is possible due to races the we are currently in the expected state even though we
327      // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never
328      // even check the guard. I don't think we care too much about this use case but it could lead
329      // to a confusing error message.
330      throw new TimeoutException("Timed out waiting for " + this + " to reach the RUNNING state.");
331    }
332  }
333
334  @Override
335  public final void awaitTerminated() {
336    monitor.enterWhenUninterruptibly(isStopped);
337    try {
338      checkCurrentState(TERMINATED);
339    } finally {
340      monitor.leave();
341    }
342  }
343
344  /** @since 28.0 */
345  @Override
346  public final void awaitTerminated(Duration timeout) throws TimeoutException {
347    Service.super.awaitTerminated(timeout);
348  }
349
350  @Override
351  public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException {
352    if (monitor.enterWhenUninterruptibly(isStopped, timeout, unit)) {
353      try {
354        checkCurrentState(TERMINATED);
355      } finally {
356        monitor.leave();
357      }
358    } else {
359      // It is possible due to races the we are currently in the expected state even though we
360      // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never
361      // even check the guard. I don't think we care too much about this use case but it could lead
362      // to a confusing error message.
363      throw new TimeoutException(
364          "Timed out waiting for "
365              + this
366              + " to reach a terminal state. "
367              + "Current state: "
368              + state());
369    }
370  }
371
372  /** Checks that the current state is equal to the expected state. */
373  @GuardedBy("monitor")
374  private void checkCurrentState(State expected) {
375    State actual = state();
376    if (actual != expected) {
377      if (actual == FAILED) {
378        // Handle this specially so that we can include the failureCause, if there is one.
379        throw new IllegalStateException(
380            "Expected the service " + this + " to be " + expected + ", but the service has FAILED",
381            failureCause());
382      }
383      throw new IllegalStateException(
384          "Expected the service " + this + " to be " + expected + ", but was " + actual);
385    }
386  }
387
388  /**
389   * Implementing classes should invoke this method once their service has started. It will cause
390   * the service to transition from {@link State#STARTING} to {@link State#RUNNING}.
391   *
392   * @throws IllegalStateException if the service is not {@link State#STARTING}.
393   */
394  protected final void notifyStarted() {
395    monitor.enter();
396    try {
397      // We have to examine the internal state of the snapshot here to properly handle the stop
398      // while starting case.
399      if (snapshot.state != STARTING) {
400        IllegalStateException failure =
401            new IllegalStateException(
402                "Cannot notifyStarted() when the service is " + snapshot.state);
403        notifyFailed(failure);
404        throw failure;
405      }
406
407      if (snapshot.shutdownWhenStartupFinishes) {
408        snapshot = new StateSnapshot(STOPPING);
409        // We don't call listeners here because we already did that when we set the
410        // shutdownWhenStartupFinishes flag.
411        doStop();
412      } else {
413        snapshot = new StateSnapshot(RUNNING);
414        enqueueRunningEvent();
415      }
416    } finally {
417      monitor.leave();
418      dispatchListenerEvents();
419    }
420  }
421
422  /**
423   * Implementing classes should invoke this method once their service has stopped. It will cause
424   * the service to transition from {@link State#STARTING} or {@link State#STOPPING} to {@link
425   * State#TERMINATED}.
426   *
427   * @throws IllegalStateException if the service is not one of {@link State#STOPPING}, {@link
428   *     State#STARTING}, or {@link State#RUNNING}.
429   */
430  protected final void notifyStopped() {
431    monitor.enter();
432    try {
433      State previous = state();
434      switch (previous) {
435        case NEW:
436        case TERMINATED:
437        case FAILED:
438          throw new IllegalStateException("Cannot notifyStopped() when the service is " + previous);
439        case RUNNING:
440        case STARTING:
441        case STOPPING:
442          snapshot = new StateSnapshot(TERMINATED);
443          enqueueTerminatedEvent(previous);
444          break;
445      }
446    } finally {
447      monitor.leave();
448      dispatchListenerEvents();
449    }
450  }
451
452  /**
453   * Invoke this method to transition the service to the {@link State#FAILED}. The service will
454   * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically
455   * or otherwise cannot be started nor stopped.
456   */
457  protected final void notifyFailed(Throwable cause) {
458    checkNotNull(cause);
459
460    monitor.enter();
461    try {
462      State previous = state();
463      switch (previous) {
464        case NEW:
465        case TERMINATED:
466          throw new IllegalStateException("Failed while in state:" + previous, cause);
467        case RUNNING:
468        case STARTING:
469        case STOPPING:
470          snapshot = new StateSnapshot(FAILED, false, cause);
471          enqueueFailedEvent(previous, cause);
472          break;
473        case FAILED:
474          // Do nothing
475          break;
476      }
477    } finally {
478      monitor.leave();
479      dispatchListenerEvents();
480    }
481  }
482
483  @Override
484  public final boolean isRunning() {
485    return state() == RUNNING;
486  }
487
488  @Override
489  public final State state() {
490    return snapshot.externalState();
491  }
492
493  /** @since 14.0 */
494  @Override
495  public final Throwable failureCause() {
496    return snapshot.failureCause();
497  }
498
499  /** @since 13.0 */
500  @Override
501  public final void addListener(Listener listener, Executor executor) {
502    listeners.addListener(listener, executor);
503  }
504
505  @Override
506  public String toString() {
507    return getClass().getSimpleName() + " [" + state() + "]";
508  }
509
510  /**
511   * Attempts to execute all the listeners in {@link #listeners} while not holding the {@link
512   * #monitor}.
513   */
514  private void dispatchListenerEvents() {
515    if (!monitor.isOccupiedByCurrentThread()) {
516      listeners.dispatch();
517    }
518  }
519
520  private void enqueueStartingEvent() {
521    listeners.enqueue(STARTING_EVENT);
522  }
523
524  private void enqueueRunningEvent() {
525    listeners.enqueue(RUNNING_EVENT);
526  }
527
528  private void enqueueStoppingEvent(final State from) {
529    if (from == State.STARTING) {
530      listeners.enqueue(STOPPING_FROM_STARTING_EVENT);
531    } else if (from == State.RUNNING) {
532      listeners.enqueue(STOPPING_FROM_RUNNING_EVENT);
533    } else {
534      throw new AssertionError();
535    }
536  }
537
538  private void enqueueTerminatedEvent(final State from) {
539    switch (from) {
540      case NEW:
541        listeners.enqueue(TERMINATED_FROM_NEW_EVENT);
542        break;
543      case STARTING:
544        listeners.enqueue(TERMINATED_FROM_STARTING_EVENT);
545        break;
546      case RUNNING:
547        listeners.enqueue(TERMINATED_FROM_RUNNING_EVENT);
548        break;
549      case STOPPING:
550        listeners.enqueue(TERMINATED_FROM_STOPPING_EVENT);
551        break;
552      case TERMINATED:
553      case FAILED:
554        throw new AssertionError();
555    }
556  }
557
558  private void enqueueFailedEvent(final State from, final Throwable cause) {
559    // can't memoize this one due to the exception
560    listeners.enqueue(
561        new ListenerCallQueue.Event<Listener>() {
562          @Override
563          public void call(Listener listener) {
564            listener.failed(from, cause);
565          }
566
567          @Override
568          public String toString() {
569            return "failed({from = " + from + ", cause = " + cause + "})";
570          }
571        });
572  }
573
574  /**
575   * An immutable snapshot of the current state of the service. This class represents a consistent
576   * snapshot of the state and therefore it can be used to answer simple queries without needing to
577   * grab a lock.
578   */
579  // @Immutable except that Throwable is mutable (initCause(), setStackTrace(), mutable subclasses).
580  private static final class StateSnapshot {
581    /**
582     * The internal state, which equals external state unless shutdownWhenStartupFinishes is true.
583     */
584    final State state;
585
586    /** If true, the user requested a shutdown while the service was still starting up. */
587    final boolean shutdownWhenStartupFinishes;
588
589    /**
590     * The exception that caused this service to fail. This will be {@code null} unless the service
591     * has failed.
592     */
593    @CheckForNull final Throwable failure;
594
595    StateSnapshot(State internalState) {
596      this(internalState, false, null);
597    }
598
599    StateSnapshot(
600        State internalState, boolean shutdownWhenStartupFinishes, @CheckForNull Throwable failure) {
601      checkArgument(
602          !shutdownWhenStartupFinishes || internalState == STARTING,
603          "shutdownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.",
604          internalState);
605      checkArgument(
606          (failure != null) == (internalState == FAILED),
607          "A failure cause should be set if and only if the state is failed.  Got %s and %s "
608              + "instead.",
609          internalState,
610          failure);
611      this.state = internalState;
612      this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes;
613      this.failure = failure;
614    }
615
616    /** @see Service#state() */
617    State externalState() {
618      if (shutdownWhenStartupFinishes && state == STARTING) {
619        return STOPPING;
620      } else {
621        return state;
622      }
623    }
624
625    /** @see Service#failureCause() */
626    Throwable failureCause() {
627      checkState(
628          state == FAILED,
629          "failureCause() is only valid if the service has failed, service is %s",
630          state);
631      // requireNonNull is safe because the constructor requires a non-null cause with state=FAILED.
632      return requireNonNull(failure);
633    }
634  }
635}