/* * Copyright (C) 2013 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.android.accessorydisplay.common; import android.os.Handler; import android.os.Looper; import android.os.Message; import android.util.SparseArray; import java.io.IOException; import java.nio.ByteBuffer; /** * A simple message transport. *

* This object's interface is thread-safe, however incoming messages * are always delivered on the {@link Looper} thread on which the transport * was created. *

*/ public abstract class Transport { private static final int MAX_INPUT_BUFFERS = 8; private final Logger mLogger; // The transport thread looper and handler. private final TransportHandler mHandler; // Lock to guard all mutable state. private final Object mLock = new Object(); // The output buffer. Set to null when the transport is closed. private ByteBuffer mOutputBuffer; // The input buffer pool. private BufferPool mInputBufferPool; // The reader thread. Initialized when reading starts. private ReaderThread mThread; // The list of callbacks indexed by service id. private final SparseArray mServices = new SparseArray(); public Transport(Logger logger, int maxPacketSize) { mLogger = logger; mHandler = new TransportHandler(); mOutputBuffer = ByteBuffer.allocate(maxPacketSize); mInputBufferPool = new BufferPool( maxPacketSize, Protocol.MAX_ENVELOPE_SIZE, MAX_INPUT_BUFFERS); } /** * Gets the logger for debugging. */ public Logger getLogger() { return mLogger; } /** * Gets the handler on the transport's thread. */ public Handler getHandler() { return mHandler; } /** * Closes the transport. */ public void close() { synchronized (mLock) { if (mOutputBuffer != null) { if (mThread == null) { ioClose(); } else { // If the thread was started then it will be responsible for // closing the stream when it quits because it may currently // be in the process of reading from the stream so we can't simply // shut it down right now. mThread.quit(); } mOutputBuffer = null; } } } /** * Sends a message. * * @param service The service to whom the message is addressed. * @param what The message type. * @param content The content, or null if there is none. * @return True if the message was sent successfully, false if an error occurred. */ public boolean sendMessage(int service, int what, ByteBuffer content) { checkServiceId(service); checkMessageId(what); try { synchronized (mLock) { if (mOutputBuffer == null) { mLogger.logError("Send message failed because transport was closed."); return false; } final byte[] outputArray = mOutputBuffer.array(); final int capacity = mOutputBuffer.capacity(); mOutputBuffer.clear(); mOutputBuffer.putShort((short)service); mOutputBuffer.putShort((short)what); if (content == null) { mOutputBuffer.putInt(0); } else { final int contentLimit = content.limit(); int contentPosition = content.position(); int contentRemaining = contentLimit - contentPosition; if (contentRemaining > Protocol.MAX_CONTENT_SIZE) { throw new IllegalArgumentException("Message content too large: " + contentRemaining + " > " + Protocol.MAX_CONTENT_SIZE); } mOutputBuffer.putInt(contentRemaining); while (contentRemaining != 0) { final int outputAvailable = capacity - mOutputBuffer.position(); if (contentRemaining <= outputAvailable) { mOutputBuffer.put(content); break; } content.limit(contentPosition + outputAvailable); mOutputBuffer.put(content); content.limit(contentLimit); ioWrite(outputArray, 0, capacity); contentPosition += outputAvailable; contentRemaining -= outputAvailable; mOutputBuffer.clear(); } } ioWrite(outputArray, 0, mOutputBuffer.position()); return true; } } catch (IOException ex) { mLogger.logError("Send message failed: " + ex); return false; } } /** * Starts reading messages on a separate thread. */ public void startReading() { synchronized (mLock) { if (mOutputBuffer == null) { throw new IllegalStateException("Transport has been closed"); } mThread = new ReaderThread(); mThread.start(); } } /** * Registers a service and provides a callback to receive messages. * * @param service The service id. * @param callback The callback to use. */ public void registerService(int service, Callback callback) { checkServiceId(service); if (callback == null) { throw new IllegalArgumentException("callback must not be null"); } synchronized (mLock) { mServices.put(service, callback); } } /** * Unregisters a service. * * @param service The service to unregister. */ public void unregisterService(int service) { checkServiceId(service); synchronized (mLock) { mServices.remove(service); } } private void dispatchMessageReceived(int service, int what, ByteBuffer content) { final Callback callback; synchronized (mLock) { callback = mServices.get(service); } if (callback != null) { callback.onMessageReceived(service, what, content); } else { mLogger.log("Discarding message " + what + " for unregistered service " + service); } } private static void checkServiceId(int service) { if (service < 0 || service > 0xffff) { throw new IllegalArgumentException("service id out of range: " + service); } } private static void checkMessageId(int what) { if (what < 0 || what > 0xffff) { throw new IllegalArgumentException("message id out of range: " + what); } } // The IO methods must be safe to call on any thread. // They may be called concurrently. protected abstract void ioClose(); protected abstract int ioRead(byte[] buffer, int offset, int count) throws IOException; protected abstract void ioWrite(byte[] buffer, int offset, int count) throws IOException; /** * Callback for services that handle received messages. */ public interface Callback { /** * Indicates that a message was received. * * @param service The service to whom the message is addressed. * @param what The message type. * @param content The content, or null if there is none. */ public void onMessageReceived(int service, int what, ByteBuffer content); } final class TransportHandler extends Handler { @Override public void handleMessage(Message msg) { final ByteBuffer buffer = (ByteBuffer)msg.obj; try { final int limit = buffer.limit(); while (buffer.position() < limit) { final int service = buffer.getShort() & 0xffff; final int what = buffer.getShort() & 0xffff; final int contentSize = buffer.getInt(); if (contentSize == 0) { dispatchMessageReceived(service, what, null); } else { final int end = buffer.position() + contentSize; buffer.limit(end); dispatchMessageReceived(service, what, buffer); buffer.limit(limit); buffer.position(end); } } } finally { mInputBufferPool.release(buffer); } } } final class ReaderThread extends Thread { // Set to true when quitting. private volatile boolean mQuitting; public ReaderThread() { super("Accessory Display Transport"); } @Override public void run() { loop(); ioClose(); } private void loop() { ByteBuffer buffer = null; int length = Protocol.HEADER_SIZE; int contentSize = -1; outer: while (!mQuitting) { // Get a buffer. if (buffer == null) { buffer = mInputBufferPool.acquire(length); } else { buffer = mInputBufferPool.grow(buffer, length); } // Read more data until needed number of bytes obtained. int position = buffer.position(); int count; try { count = ioRead(buffer.array(), position, buffer.capacity() - position); if (count < 0) { break; // end of stream } } catch (IOException ex) { mLogger.logError("Read failed: " + ex); break; // error } position += count; buffer.position(position); if (contentSize < 0 && position >= Protocol.HEADER_SIZE) { contentSize = buffer.getInt(4); if (contentSize < 0 || contentSize > Protocol.MAX_CONTENT_SIZE) { mLogger.logError("Encountered invalid content size: " + contentSize); break; // malformed stream } length += contentSize; } if (position < length) { continue; // need more data } // There is at least one complete message in the buffer. // Find the end of a contiguous chunk of complete messages. int next = length; int remaining; for (;;) { length = Protocol.HEADER_SIZE; remaining = position - next; if (remaining < length) { contentSize = -1; break; // incomplete header, need more data } contentSize = buffer.getInt(next + 4); if (contentSize < 0 || contentSize > Protocol.MAX_CONTENT_SIZE) { mLogger.logError("Encountered invalid content size: " + contentSize); break outer; // malformed stream } length += contentSize; if (remaining < length) { break; // incomplete content, need more data } next += length; } // Post the buffer then don't modify it anymore. // Now this is kind of sneaky. We know that no other threads will // be acquiring buffers from the buffer pool so we can keep on // referring to this buffer as long as we don't modify its contents. // This allows us to operate in a single-buffered mode if desired. buffer.limit(next); buffer.rewind(); mHandler.obtainMessage(0, buffer).sendToTarget(); // If there is an incomplete message at the end, then we will need // to copy it to a fresh buffer before continuing. In the single-buffered // case, we may acquire the same buffer as before which is fine. if (remaining == 0) { buffer = null; } else { final ByteBuffer oldBuffer = buffer; buffer = mInputBufferPool.acquire(length); System.arraycopy(oldBuffer.array(), next, buffer.array(), 0, remaining); buffer.position(remaining); } } if (buffer != null) { mInputBufferPool.release(buffer); } } public void quit() { mQuitting = true; } } }