QueuedMessageWriter.java

/*
 * Copyright 2015-2016 Providence Authors
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package net.morimekta.providence.logging;

import net.morimekta.providence.PMessage;
import net.morimekta.providence.PMessageOrBuilder;
import net.morimekta.providence.PServiceCall;
import net.morimekta.util.concurrent.NamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * A queued message writer that takes in messages onto a queue, and let
 * a single thread handle all the writes to the contained writer. This
 * writer is thread safe, and should be much faster than having multiple
 * threads fight over the file IO.
 * <p>
 * Note that the writer will continue to accept messages after it has been
 * closed.
 */
public class QueuedMessageWriter implements MessageWriter {
    private static final Logger LOGGER = LoggerFactory.getLogger(QueuedMessageWriter.class);
    private static final int    DEFAULT_MAX_QUEUE_LEN = 65536;

    private final Queue<PMessageOrBuilder> messageQueue;
    private final Queue<PServiceCall> callQueue;
    private final ExecutorService     executor;
    private final MessageWriter       writer;
    private final int                 maxQueueLength;

    /**
     * Create a queued message writer.
     *
     * @param writer The message writer to write to.
     */
    public QueuedMessageWriter(MessageWriter writer) {
        this(writer, Executors.newSingleThreadExecutor(
                NamedThreadFactory.builder()
                                  .setDaemon(true)
                                  .setNameFormat("providence-queued-writer")
                                  .build()));
    }

    /**
     * Create a queued message writer using the given executor service.
     * Note that the executor service will be shut down with the message queue.
     *
     * @param writer The message writer to write to.
     * @param executor The executor service running the write loop thread.
     */
    public QueuedMessageWriter(MessageWriter writer,
                               ExecutorService executor) {
        this(writer, executor, DEFAULT_MAX_QUEUE_LEN);
    }

    /**
     * Create a queued message writer using the given executor service.
     * Note that the executor service will be shut down with the message queue.
     *
     * @param writer The message writer to write to.
     * @param executor The executor service running the write loop thread.
     * @param maxQueueLength The max queue length. If 0 or less, no limit is enforced.
     *                       Default is 65536 (64k).
     */
    public QueuedMessageWriter(MessageWriter writer,
                               ExecutorService executor,
                               int maxQueueLength) {
        this.writer = writer;
        this.executor = executor;
        this.maxQueueLength = maxQueueLength;
        this.messageQueue = new ConcurrentLinkedQueue<>();
        this.callQueue = new ConcurrentLinkedQueue<>();
        this.executor.submit(this::writeLoop);
    }

    /**
     * @return The current size of the in-memory queue.
     */
    public int size() {
        return callQueue.size() + messageQueue.size();
    }

    @Override
    public <Message extends PMessage<Message>>
    int write(PMessageOrBuilder<Message> message) throws IOException {
        if (maxQueueLength <= 0 || size() < maxQueueLength) {
            messageQueue.offer(message);
            return 1;
        }
        return 0;
    }

    @Override
    public <Message extends PMessage<Message>>
    int write(PServiceCall<Message> call) throws IOException {
        if (maxQueueLength <= 0 || size() < maxQueueLength) {
            callQueue.offer(call);
            return 1;
        }
        return 0;
    }

    @Override
    public int separator() {
        return 0;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void close() throws IOException {
        if (!executor.isShutdown()) {
            try {
                executor.shutdown();
                if (!executor.awaitTermination(1000L, TimeUnit.MILLISECONDS)) {
                    executor.shutdownNow();
                }
            } catch (InterruptedException e) {
                LOGGER.error("Interrupted while stopping writer loop thread", e);
                throw new RuntimeException(e.getMessage(), e);
            } finally {
                try {
                    while (messageQueue.size() > 0) {
                        writer.write(messageQueue.poll());
                        writer.separator();
                    }
                    while (callQueue.size() > 0) {
                        writer.write(callQueue.poll());
                        writer.separator();
                    }
                } catch (IOException e) {
                    LOGGER.error("Unable to write messages on close", e);
                }
                writer.close();
            }
        }
    }

    @SuppressWarnings("unchecked")
    private void writeLoop() {
        try {
            long failDelay = 137L;
            while (!executor.isShutdown()) {
                try {
                    while (messageQueue.size() > 0) {
                        writer.write(messageQueue.poll());
                        failDelay = 137L;
                    }
                    while (callQueue.size() > 0) {
                        writer.write(callQueue.poll());
                        failDelay = 137L;
                    }
                    sleep(3L);  // 3ms should be enough to do actual work.
                    // This is a very tight loop, so should be expensive to
                    // to have a short sleep time.
                } catch (IOException e) {
                    if (failDelay >= 10_000) {
                        LOGGER.error("Unable to write message, sleeping {}s",
                                     (failDelay / 1000), e);
                    } else {
                        LOGGER.error("Unable to write message, sleeping {}ms",
                                     failDelay, e);
                    }

                    // Continue but with longer sleep on errors.
                    try {
                        sleep(failDelay);
                    } finally {
                        failDelay = Math.min(
                                TimeUnit.MINUTES.toMillis(10),
                                // add 2/3 to the time for each consecutive failure.
                                (long) (failDelay * 1.66666667));
                    }
                }
            }
        } catch (InterruptedException ignore) {
            // thread is interrupted, just stop. Not tested.
            Thread.currentThread().interrupt();
        }
    }

    protected void sleep(long ms) throws InterruptedException {
        Thread.sleep(ms);
    }
}