/*
* Copyright (C) 2013 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.android.accessorydisplay.common;
import android.os.Handler;
import android.os.Looper;
import android.os.Message;
import android.util.SparseArray;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
* A simple message transport.
*
* This object's interface is thread-safe, however incoming messages
* are always delivered on the {@link Looper} thread on which the transport
* was created.
*
*/
public abstract class Transport {
private static final int MAX_INPUT_BUFFERS = 8;
private final Logger mLogger;
// The transport thread looper and handler.
private final TransportHandler mHandler;
// Lock to guard all mutable state.
private final Object mLock = new Object();
// The output buffer. Set to null when the transport is closed.
private ByteBuffer mOutputBuffer;
// The input buffer pool.
private BufferPool mInputBufferPool;
// The reader thread. Initialized when reading starts.
private ReaderThread mThread;
// The list of callbacks indexed by service id.
private final SparseArray mServices = new SparseArray();
public Transport(Logger logger, int maxPacketSize) {
mLogger = logger;
mHandler = new TransportHandler();
mOutputBuffer = ByteBuffer.allocate(maxPacketSize);
mInputBufferPool = new BufferPool(
maxPacketSize, Protocol.MAX_ENVELOPE_SIZE, MAX_INPUT_BUFFERS);
}
/**
* Gets the logger for debugging.
*/
public Logger getLogger() {
return mLogger;
}
/**
* Gets the handler on the transport's thread.
*/
public Handler getHandler() {
return mHandler;
}
/**
* Closes the transport.
*/
public void close() {
synchronized (mLock) {
if (mOutputBuffer != null) {
if (mThread == null) {
ioClose();
} else {
// If the thread was started then it will be responsible for
// closing the stream when it quits because it may currently
// be in the process of reading from the stream so we can't simply
// shut it down right now.
mThread.quit();
}
mOutputBuffer = null;
}
}
}
/**
* Sends a message.
*
* @param service The service to whom the message is addressed.
* @param what The message type.
* @param content The content, or null if there is none.
* @return True if the message was sent successfully, false if an error occurred.
*/
public boolean sendMessage(int service, int what, ByteBuffer content) {
checkServiceId(service);
checkMessageId(what);
try {
synchronized (mLock) {
if (mOutputBuffer == null) {
mLogger.logError("Send message failed because transport was closed.");
return false;
}
final byte[] outputArray = mOutputBuffer.array();
final int capacity = mOutputBuffer.capacity();
mOutputBuffer.clear();
mOutputBuffer.putShort((short)service);
mOutputBuffer.putShort((short)what);
if (content == null) {
mOutputBuffer.putInt(0);
} else {
final int contentLimit = content.limit();
int contentPosition = content.position();
int contentRemaining = contentLimit - contentPosition;
if (contentRemaining > Protocol.MAX_CONTENT_SIZE) {
throw new IllegalArgumentException("Message content too large: "
+ contentRemaining + " > " + Protocol.MAX_CONTENT_SIZE);
}
mOutputBuffer.putInt(contentRemaining);
while (contentRemaining != 0) {
final int outputAvailable = capacity - mOutputBuffer.position();
if (contentRemaining <= outputAvailable) {
mOutputBuffer.put(content);
break;
}
content.limit(contentPosition + outputAvailable);
mOutputBuffer.put(content);
content.limit(contentLimit);
ioWrite(outputArray, 0, capacity);
contentPosition += outputAvailable;
contentRemaining -= outputAvailable;
mOutputBuffer.clear();
}
}
ioWrite(outputArray, 0, mOutputBuffer.position());
return true;
}
} catch (IOException ex) {
mLogger.logError("Send message failed: " + ex);
return false;
}
}
/**
* Starts reading messages on a separate thread.
*/
public void startReading() {
synchronized (mLock) {
if (mOutputBuffer == null) {
throw new IllegalStateException("Transport has been closed");
}
mThread = new ReaderThread();
mThread.start();
}
}
/**
* Registers a service and provides a callback to receive messages.
*
* @param service The service id.
* @param callback The callback to use.
*/
public void registerService(int service, Callback callback) {
checkServiceId(service);
if (callback == null) {
throw new IllegalArgumentException("callback must not be null");
}
synchronized (mLock) {
mServices.put(service, callback);
}
}
/**
* Unregisters a service.
*
* @param service The service to unregister.
*/
public void unregisterService(int service) {
checkServiceId(service);
synchronized (mLock) {
mServices.remove(service);
}
}
private void dispatchMessageReceived(int service, int what, ByteBuffer content) {
final Callback callback;
synchronized (mLock) {
callback = mServices.get(service);
}
if (callback != null) {
callback.onMessageReceived(service, what, content);
} else {
mLogger.log("Discarding message " + what
+ " for unregistered service " + service);
}
}
private static void checkServiceId(int service) {
if (service < 0 || service > 0xffff) {
throw new IllegalArgumentException("service id out of range: " + service);
}
}
private static void checkMessageId(int what) {
if (what < 0 || what > 0xffff) {
throw new IllegalArgumentException("message id out of range: " + what);
}
}
// The IO methods must be safe to call on any thread.
// They may be called concurrently.
protected abstract void ioClose();
protected abstract int ioRead(byte[] buffer, int offset, int count)
throws IOException;
protected abstract void ioWrite(byte[] buffer, int offset, int count)
throws IOException;
/**
* Callback for services that handle received messages.
*/
public interface Callback {
/**
* Indicates that a message was received.
*
* @param service The service to whom the message is addressed.
* @param what The message type.
* @param content The content, or null if there is none.
*/
public void onMessageReceived(int service, int what, ByteBuffer content);
}
final class TransportHandler extends Handler {
@Override
public void handleMessage(Message msg) {
final ByteBuffer buffer = (ByteBuffer)msg.obj;
try {
final int limit = buffer.limit();
while (buffer.position() < limit) {
final int service = buffer.getShort() & 0xffff;
final int what = buffer.getShort() & 0xffff;
final int contentSize = buffer.getInt();
if (contentSize == 0) {
dispatchMessageReceived(service, what, null);
} else {
final int end = buffer.position() + contentSize;
buffer.limit(end);
dispatchMessageReceived(service, what, buffer);
buffer.limit(limit);
buffer.position(end);
}
}
} finally {
mInputBufferPool.release(buffer);
}
}
}
final class ReaderThread extends Thread {
// Set to true when quitting.
private volatile boolean mQuitting;
public ReaderThread() {
super("Accessory Display Transport");
}
@Override
public void run() {
loop();
ioClose();
}
private void loop() {
ByteBuffer buffer = null;
int length = Protocol.HEADER_SIZE;
int contentSize = -1;
outer: while (!mQuitting) {
// Get a buffer.
if (buffer == null) {
buffer = mInputBufferPool.acquire(length);
} else {
buffer = mInputBufferPool.grow(buffer, length);
}
// Read more data until needed number of bytes obtained.
int position = buffer.position();
int count;
try {
count = ioRead(buffer.array(), position, buffer.capacity() - position);
if (count < 0) {
break; // end of stream
}
} catch (IOException ex) {
mLogger.logError("Read failed: " + ex);
break; // error
}
position += count;
buffer.position(position);
if (contentSize < 0 && position >= Protocol.HEADER_SIZE) {
contentSize = buffer.getInt(4);
if (contentSize < 0 || contentSize > Protocol.MAX_CONTENT_SIZE) {
mLogger.logError("Encountered invalid content size: " + contentSize);
break; // malformed stream
}
length += contentSize;
}
if (position < length) {
continue; // need more data
}
// There is at least one complete message in the buffer.
// Find the end of a contiguous chunk of complete messages.
int next = length;
int remaining;
for (;;) {
length = Protocol.HEADER_SIZE;
remaining = position - next;
if (remaining < length) {
contentSize = -1;
break; // incomplete header, need more data
}
contentSize = buffer.getInt(next + 4);
if (contentSize < 0 || contentSize > Protocol.MAX_CONTENT_SIZE) {
mLogger.logError("Encountered invalid content size: " + contentSize);
break outer; // malformed stream
}
length += contentSize;
if (remaining < length) {
break; // incomplete content, need more data
}
next += length;
}
// Post the buffer then don't modify it anymore.
// Now this is kind of sneaky. We know that no other threads will
// be acquiring buffers from the buffer pool so we can keep on
// referring to this buffer as long as we don't modify its contents.
// This allows us to operate in a single-buffered mode if desired.
buffer.limit(next);
buffer.rewind();
mHandler.obtainMessage(0, buffer).sendToTarget();
// If there is an incomplete message at the end, then we will need
// to copy it to a fresh buffer before continuing. In the single-buffered
// case, we may acquire the same buffer as before which is fine.
if (remaining == 0) {
buffer = null;
} else {
final ByteBuffer oldBuffer = buffer;
buffer = mInputBufferPool.acquire(length);
System.arraycopy(oldBuffer.array(), next, buffer.array(), 0, remaining);
buffer.position(remaining);
}
}
if (buffer != null) {
mInputBufferPool.release(buffer);
}
}
public void quit() {
mQuitting = true;
}
}
}