交互流程
使用主进程handler 发送同步或者异步消息。
类图结构
Handler模块解析
构造函数说明
通过注入Looper实例,完成Handler和Looper的绑定。
是否支持异步:async,如果为true,则:
- 异步消息表示不需要相对于同步消息进行全局排序的中断或事件。
- 异步消息不受显示vsync等条件引入的同步屏障的影响。
/**
 * Creates a handler bound to {@code looper}, with no callback and with
 * asynchronous delivery disabled.
 *
 * @param looper looper this handler is bound to
 */
public Handler(@NonNull Looper looper) {
    this(looper, null, false);
}
/**
 * Binds this handler to {@code looper} and initializes its member fields.
 *
 * @param looper   looper this handler is bound to; its message queue is reused
 * @param callback optional callback stored in {@code mCallback}
 * @param async    stored in {@code mAsynchronous}; whether messages sent through
 *                 this handler are flagged as asynchronous
 */
public Handler(@NonNull Looper looper, @Nullable Callback callback, boolean async) {
    mLooper = looper;
    mQueue = looper.mQueue; // the message queue is read from the looper
    mAsynchronous = async;
    mCallback = callback;
}
发送Message
通过多个api,提供立即发送、延迟发送Message的功能。
/**
 * Sends {@code msg} without any delay by delegating to
 * {@link #sendMessageDelayed} with a delay of zero.
 *
 * @param msg message to send
 * @return the result of {@link #sendMessageDelayed}
 */
public final boolean sendMessage(@NonNull Message msg) {
    final long noDelay = 0;
    return sendMessageDelayed(msg, noDelay);
}
/**
 * Sends {@code msg} so that it becomes due {@code delayMillis} milliseconds from now.
 *
 * <p>Negative delays are treated as zero. The delivery time is computed on the
 * {@link SystemClock#uptimeMillis} time-base and saturates at {@link Long#MAX_VALUE}:
 * a plain {@code now + delayMillis} wraps to a negative timestamp for very large
 * delays, and a message with such a timestamp is inserted at the head of the queue
 * instead of far in the future.
 *
 * @param msg         message to send
 * @param delayMillis delay in milliseconds; values below zero are clamped to zero
 * @return the result of {@link #sendMessageAtTime}
 */
public final boolean sendMessageDelayed(@NonNull Message msg, long delayMillis) {
    final long delay = Math.max(delayMillis, 0L);
    final long now = SystemClock.uptimeMillis();
    // Saturating add: never let now + delay overflow past Long.MAX_VALUE.
    final long when = (delay > Long.MAX_VALUE - now) ? Long.MAX_VALUE : now + delay;
    return sendMessageAtTime(msg, when);
}
/**
 * Common entry point that all send/post variants end up in: hands {@code msg}
 * to this handler's message queue with the absolute delivery time
 * {@code uptimeMillis}.
 *
 * @param msg          message to enqueue
 * @param uptimeMillis absolute delivery time passed on to the queue
 * @return {@code false} if this handler has no queue, otherwise the result of
 *         {@code enqueueMessage}
 */
public boolean sendMessageAtTime(@NonNull Message msg, long uptimeMillis) {
    final MessageQueue target = mQueue;
    if (target != null) {
        return enqueueMessage(target, msg, uptimeMillis);
    }
    // No queue attached: log a warning (with stack trace) and report failure.
    final RuntimeException missingQueue = new RuntimeException(
            this + " sendMessageAtTime() called with no mQueue");
    Log.w("Looper", missingQueue.getMessage(), missingQueue);
    return false;
}
最终走到enqueueMessage方法,调用MessageQueue#enqueueMessage
将Message实例注入到MessageQueue队列中。
/**
 * Stamps {@code msg} with this handler as its target and with the current
 * work-source uid, flags it asynchronous when this handler was created as
 * asynchronous, and inserts it into {@code queue}.
 *
 * @param queue        queue that receives the message
 * @param msg          message to insert
 * @param uptimeMillis absolute delivery time
 * @return the result of {@code MessageQueue#enqueueMessage}
 */
private boolean enqueueMessage(@NonNull MessageQueue queue, @NonNull Message msg,
        long uptimeMillis) {
    msg.target = this;
    msg.workSourceUid = ThreadLocalWorkSource.getUid();

    final boolean markAsync = mAsynchronous;
    if (markAsync) {
        // Asynchronous messages are treated specially when the queue holds a barrier.
        msg.setAsynchronous(true);
    }

    return queue.enqueueMessage(msg, uptimeMillis);
}
Post Runnable或者Message
通过调用post相关的api(实际上会复用上面发送Message的方法),最终将Runnable实例绑定到Message实例的callback上,等到Message被消费时,就会去执行callback。
/**
 * Wraps {@code r} in a message and sends it with a delay of zero.
 *
 * @param r runnable to run when the message is dispatched
 * @return the result of {@link #sendMessageDelayed}
 */
public final boolean post(@NonNull Runnable r) {
    final Message wrapped = getPostMessage(r);
    return sendMessageDelayed(wrapped, 0);
}
/**
 * Wraps {@code r} in a message and sends it with the given delay.
 *
 * @param r           runnable to run when the message is dispatched
 * @param delayMillis delay in milliseconds
 * @return the result of {@link #sendMessageDelayed}
 */
public final boolean postDelayed(@NonNull Runnable r, long delayMillis) {
    final Message wrapped = getPostMessage(r);
    return sendMessageDelayed(wrapped, delayMillis);
}
/**
 * Wraps {@code r} together with {@code token} in a message and sends it for
 * delivery at the absolute time {@code uptimeMillis}.
 *
 * @param r            runnable to run when the message is dispatched
 * @param token        object stored in the message's {@code obj} field
 * @param uptimeMillis absolute delivery time
 * @return the result of {@link #sendMessageAtTime}
 */
public final boolean postAtTime(
        @NonNull Runnable r, @Nullable Object token, long uptimeMillis) {
    final Message wrapped = getPostMessage(r, token);
    return sendMessageAtTime(wrapped, uptimeMillis);
}
/**
 * Obtains a message and attaches {@code r} as its callback.
 *
 * @param r runnable to attach
 * @return the message carrying {@code r}
 */
private static Message getPostMessage(Runnable r) {
    final Message carrier = Message.obtain();
    carrier.callback = r;
    return carrier;
}
/**
 * Obtains a message, attaches {@code r} as its callback and stores
 * {@code token} in its {@code obj} field as a marker.
 *
 * @param r     runnable to attach
 * @param token marker object stored in {@code obj}
 * @return the message carrying {@code r} and {@code token}
 */
@UnsupportedAppUsage
private static Message getPostMessage(Runnable r, Object token) {
    final Message carrier = Message.obtain();
    carrier.callback = r;
    carrier.obj = token; // tag the message with the token
    return carrier;
}
分发Message逻辑
/**
 * Handle system messages here.
 *
 * <p>Dispatch order: a runnable attached to the message wins; otherwise the
 * handler-level {@code mCallback} gets the message first and may consume it
 * by returning {@code true}; otherwise {@link #handleMessage} is called.
 *
 * @param msg message to dispatch
 */
public void dispatchMessage(@NonNull Message msg) {
    if (msg.callback != null) {
        handleCallback(msg);
        return;
    }
    // mCallback may consume the message, which skips handleMessage() below.
    if (mCallback != null && mCallback.handleMessage(msg)) {
        return;
    }
    handleMessage(msg);
}
/**
 * Runs the runnable attached to {@code message} directly on the calling thread.
 *
 * @param message message whose {@code callback} is executed
 */
private static void handleCallback(Message message) {
    final Runnable task = message.callback;
    task.run();
}
/**
 * Callback interface you can use when instantiating a Handler to avoid
 * having to implement your own subclass of Handler.
 */
public interface Callback {
    /**
     * Offers a message to the callback before the handler's own
     * {@code handleMessage} runs.
     *
     * @param msg A {@link android.os.Message Message} object
     * @return True if no further handling is desired
     */
    boolean handleMessage(@NonNull Message msg);
}
/**
 * Subclasses must implement this to receive messages.
 * The default implementation is empty.
 *
 * @param msg message to handle
 */
public void handleMessage(@NonNull Message msg) {
}
Looper模块解析
构造函数说明
构造函数是私有的,需要调用一些静态方法才能获取到一个Looper实例。
/**
 * Creates the looper's message queue and records the thread that is
 * constructing this looper.
 *
 * @param quitAllowed forwarded to the {@code MessageQueue} constructor
 */
private Looper(boolean quitAllowed) {
    mQueue = new MessageQueue(quitAllowed);
    mThread = Thread.currentThread(); // remember the creating thread
}
获取一个Looper实例
需要先调用静态方法prepare(),才能获取Looper实例。
- prepare -> Looper实例创建并装入到ThreadLocal实例中
/**
 * Initialize the current thread as a looper.
 * This gives you a chance to create handlers that then reference
 * this looper, before actually starting the loop. Be sure to call
 * {@link #loop()} after calling this method, and end it by calling
 * {@link #quit()}.
 */
public static void prepare() {
    final boolean quitAllowed = true;
    prepare(quitAllowed);
}
/**
 * Creates a Looper and stores it in {@code sThreadLocal}, so that it is
 * associated with the calling thread only.
 *
 * @param quitAllowed forwarded to the Looper constructor
 * @throws RuntimeException if the calling thread already has a Looper
 */
private static void prepare(boolean quitAllowed) {
    final Looper existing = sThreadLocal.get();
    if (existing != null) {
        throw new RuntimeException("Only one Looper may be created per thread");
    }
    final Looper created = new Looper(quitAllowed);
    sThreadLocal.set(created);
}
/**
 * Initialize the current thread as a looper, marking it as an
 * application's main looper. See also: {@link #prepare()}
 *
 * <p>The looper is prepared with quitting disallowed and then published
 * in {@code sMainLooper}.
 *
 * @throws IllegalStateException if a main looper has already been set
 * @deprecated The main looper for your application is created by the Android environment,
 *   so you should never need to call this function yourself.
 */
@Deprecated
public static void prepareMainLooper() {
    prepare(false);
    synchronized (Looper.class) {
        final boolean alreadyPrepared = sMainLooper != null;
        if (alreadyPrepared) {
            throw new IllegalStateException("The main Looper has already been prepared.");
        }
        sMainLooper = myLooper();
    }
}
/**
 * Returns the application's main looper, which lives in the main thread of the application.
 *
 * @return the value of {@code sMainLooper}, read while holding the class lock
 */
public static Looper getMainLooper() {
    final Looper main;
    synchronized (Looper.class) {
        main = sMainLooper;
    }
    return main;
}
/**
 * Return the Looper object associated with the current thread. Returns
 * null if the calling thread is not associated with a Looper.
 */
public static @Nullable Looper myLooper() {
    final Looper current = sThreadLocal.get();
    return current;
}
Looper 死循环获取Message并消费
/**
 * Poll and deliver single message, return true if the outer loop should continue.
 * Takes the next Message from the queue and dispatches it to its target handler.
 *
 * @param me the looper whose queue is polled
 * @param ident thread identity token captured by the caller; compared after dispatch
 * @param thresholdOverride when positive, replaces both slow-log thresholds
 * @return false when the queue returned null (the queue is quitting), true otherwise
 */
@SuppressWarnings("AndroidFrameworkBinderIdentity")
private static boolean loopOnce(final Looper me,
final long ident, final int thresholdOverride) {
Message msg = me.mQueue.next(); // might block; the waiting itself is implemented in MessageQueue
if (msg == null) {
// No message indicates that the message queue is quitting.
return false;
}
// This must be in a local variable, in case a UI event sets the logger
final Printer logging = me.mLogging;
if (logging != null) {
logging.println(">>>>> Dispatching to " + msg.target + " "
+ msg.callback + ": " + msg.what);
}
// Make sure the observer won't change while processing a transaction.
final Observer observer = sObserver;
final long traceTag = me.mTraceTag;
long slowDispatchThresholdMs = me.mSlowDispatchThresholdMs;
long slowDeliveryThresholdMs = me.mSlowDeliveryThresholdMs;
if (thresholdOverride > 0) {
slowDispatchThresholdMs = thresholdOverride;
slowDeliveryThresholdMs = thresholdOverride;
}
final boolean logSlowDelivery = (slowDeliveryThresholdMs > 0) && (msg.when > 0);
final boolean logSlowDispatch = (slowDispatchThresholdMs > 0);
final boolean needStartTime = logSlowDelivery || logSlowDispatch;
final boolean needEndTime = logSlowDispatch;
if (traceTag != 0 && Trace.isTagEnabled(traceTag)) {
Trace.traceBegin(traceTag, msg.target.getTraceName(msg));
}
final long dispatchStart = needStartTime ? SystemClock.uptimeMillis() : 0;
final long dispatchEnd;
Object token = null;
if (observer != null) {
token = observer.messageDispatchStarting(); // observer hook: dispatch is about to start
}
long origWorkSource = ThreadLocalWorkSource.setUid(msg.workSourceUid);
try {
msg.target.dispatchMessage(msg); // dispatch the message; this calls back into the Handler
if (observer != null) {
observer.messageDispatched(token, msg); // observer hook: the message has been dispatched
}
dispatchEnd = needEndTime ? SystemClock.uptimeMillis() : 0;
} catch (Exception exception) {
if (observer != null) {
// observer hook: dispatching the message threw an exception
observer.dispatchingThrewException(token, msg, exception);
}
throw exception;
} finally {
ThreadLocalWorkSource.restore(origWorkSource);
if (traceTag != 0) {
Trace.traceEnd(traceTag);
}
}
if (logSlowDelivery) {
if (me.mSlowDeliveryDetected) {
if ((dispatchStart - msg.when) <= 10) {
Slog.w(TAG, "Drained");
me.mSlowDeliveryDetected = false;
}
} else {
if (showSlowLog(slowDeliveryThresholdMs, msg.when, dispatchStart, "delivery",
msg)) {
// Once we write a slow delivery log, suppress until the queue drains.
me.mSlowDeliveryDetected = true;
}
}
}
if (logSlowDispatch) {
showSlowLog(slowDispatchThresholdMs, dispatchStart, dispatchEnd, "dispatch", msg);
}
if (logging != null) {
logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
}
// Make sure that during the course of dispatching the
// identity of the thread wasn't corrupted.
final long newIdent = Binder.clearCallingIdentity();
if (ident != newIdent) {
Log.wtf(TAG, "Thread identity changed from 0x"
+ Long.toHexString(ident) + " to 0x"
+ Long.toHexString(newIdent) + " while dispatching to "
+ msg.target.getClass().getName() + " "
+ msg.callback + " what=" + msg.what);
}
// The message has been fully handled; recycle it.
msg.recycleUnchecked();
return true;
}
/**
 * Run the message queue in this thread. Be sure to call
 * {@link #quit()} to end the loop.
 *
 * @throws RuntimeException if the calling thread has no Looper
 */
@SuppressWarnings("AndroidFrameworkBinderIdentity")
public static void loop() {
    final Looper me = myLooper();
    if (me == null) {
        throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
    }
    if (me.mInLoop) {
        Slog.w(TAG, "Loop again would have the queued messages be executed"
                + " before this one completed.");
    }
    me.mInLoop = true; // mark this looper as running its loop

    // Make sure the identity of this thread is that of the local process,
    // and keep track of what that identity token actually is.
    Binder.clearCallingIdentity();
    final long ident = Binder.clearCallingIdentity();

    // Allow overriding a threshold with a system prop. e.g.
    // adb shell 'setprop log.looper.1000.main.slow 1 && stop && start'
    final String slowLogProperty = "log.looper."
            + Process.myUid() + "."
            + Thread.currentThread().getName()
            + ".slow";
    final int thresholdOverride = SystemProperties.getInt(slowLogProperty, 0);

    me.mSlowDeliveryDetected = false;

    // Endless loop: keep taking and dispatching messages until loopOnce
    // reports that the loop should stop.
    while (loopOnce(me, ident, thresholdOverride)) {
        // Nothing to do here; all the work happens inside loopOnce.
    }
}
Message模块解析
成员变量
/**
 * User-defined message code so that the recipient can identify
 * what this message is about. Each {@link Handler} has its own name-space
 * for message codes, so you do not need to worry about yours conflicting
 * with other handlers.
 */
public int what; // user-defined message type
/**
 * arg1 and arg2 are lower-cost alternatives to using
 * {@link #setData(Bundle) setData()} if you only need to store a
 * few integer values.
 */
public int arg1; // user-defined message argument
/**
 * arg1 and arg2 are lower-cost alternatives to using
 * {@link #setData(Bundle) setData()} if you only need to store a
 * few integer values.
 */
public int arg2;// user-defined message argument
/**
 * An arbitrary object to send to the recipient. When using
 * {@link Messenger} to send the message across processes this can only
 * be non-null if it contains a Parcelable of a framework class (not one
 * implemented by the application). For other data transfer use
 * {@link #setData}.
 *
 * <p>Note that Parcelable objects here are not supported prior to
 * the {@link android.os.Build.VERSION_CODES#FROYO} release.
 */
public Object obj; // user-defined payload; any object may be passed and is cast back by the consumer
/**
 * Optional Messenger where replies to this message can be sent. The
 * semantics of exactly how this is used are up to the sender and
 * receiver.
 */
public Messenger replyTo; // optional; supports cross-process communication
/**
 * Indicates that the uid is not set;
 *
 * @hide Only for use within the system server.
 */
public static final int UID_NONE = -1;
/**
 * Optional field indicating the uid that sent the message. This is
 * only valid for messages posted by a {@link Messenger}; otherwise,
 * it will be -1.
 */
public int sendingUid = UID_NONE;
/**
 * Optional field indicating the uid that caused this message to be enqueued.
 *
 * @hide Only for use within the system server.
 */
public int workSourceUid = UID_NONE;
/** If set message is in use.
 * This flag is set when the message is enqueued and remains set while it
 * is delivered and afterwards when it is recycled. The flag is only cleared
 * when a new message is created or obtained since that is the only time that
 * applications are allowed to modify the contents of the message.
 *
 * It is an error to attempt to enqueue or recycle a message that is already in use.
 */
/*package*/ static final int FLAG_IN_USE = 1 << 0;
/** If set message is asynchronous */
/*package*/ static final int FLAG_ASYNCHRONOUS = 1 << 1;
/** Flags to clear in the copyFrom method */
/*package*/ static final int FLAGS_TO_CLEAR_ON_COPY_FROM = FLAG_IN_USE;
// Bit field holding the message state flags (FLAG_IN_USE, FLAG_ASYNCHRONOUS)
@UnsupportedAppUsage
/*package*/ int flags;
/**
 * The targeted delivery time of this message. The time-base is
 * {@link SystemClock#uptimeMillis}.
 * @hide Only for use within the tests.
 */
@UnsupportedAppUsage
@VisibleForTesting(visibility = VisibleForTesting.Visibility.PACKAGE)
public long when;
/*package*/ Bundle data;
@UnsupportedAppUsage
/*package*/ Handler target;
@UnsupportedAppUsage
/*package*/ Runnable callback;
// sometimes we store linked lists of these things (messages can be chained through this field)
@UnsupportedAppUsage
/*package*/ Message next;
/** @hide */
public static final Object sPoolSync = new Object();
private static Message sPool;// head of the pool of reusable messages
private static int sPoolSize = 0;
private static final int MAX_POOL_SIZE = 50;
private static boolean gCheckRecycle = true;
构造一个Message
/**
 * Return a new Message instance from the global pool. Allows us to
 * avoid allocating new objects in many cases.
 *
 * <p>When the pool is empty a fresh {@code Message} is allocated instead.
 */
public static Message obtain() {
    synchronized (sPoolSync) {
        final Message pooled = sPool;
        if (pooled != null) {
            // Unlink the head of the pool and hand it out.
            sPool = pooled.next;
            pooled.next = null;
            pooled.flags = 0; // clear in-use flag
            sPoolSize--;
            return pooled;
        }
    }
    return new Message();
}
/**
 * Same as {@link #obtain()}, but copies the values of an existing
 * message (including its target) into the new one.
 *
 * <p>The data bundle, when present, is copied into a new {@code Bundle}
 * rather than shared.
 *
 * @param orig Original message to copy.
 * @return A Message object from the global pool.
 */
public static Message obtain(Message orig) {
    final Message copy = obtain();

    copy.what = orig.what;
    copy.arg1 = orig.arg1;
    copy.arg2 = orig.arg2;
    copy.obj = orig.obj;
    copy.replyTo = orig.replyTo;
    copy.sendingUid = orig.sendingUid;
    copy.workSourceUid = orig.workSourceUid;

    final Bundle sourceData = orig.data;
    if (sourceData != null) {
        copy.data = new Bundle(sourceData);
    }

    copy.target = orig.target;
    copy.callback = orig.callback;
    return copy;
}
/**
 * Overload of {@link #obtain()} that also sets the message target.
 *
 * @param h handler stored in {@code target}
 * @return A Message object from the global pool.
 */
public static Message obtain(Handler h) {
    final Message fresh = obtain();
    fresh.target = h;
    return fresh;
}
/**
 * Overload of {@link #obtain()} that also sets the target and the callback.
 *
 * @param h        handler stored in {@code target}
 * @param callback runnable stored in {@code callback}
 * @return A Message object from the global pool.
 */
public static Message obtain(Handler h, Runnable callback) {
    final Message fresh = obtain();
    fresh.target = h;
    fresh.callback = callback;
    return fresh;
}
/**
 * Overload of {@link #obtain()} that also sets the target and {@code what}.
 *
 * @param h    handler stored in {@code target}
 * @param what value stored in {@code what}
 * @return A Message object from the global pool.
 */
public static Message obtain(Handler h, int what) {
    final Message fresh = obtain();
    fresh.target = h;
    fresh.what = what;
    return fresh;
}
/**
 * Overload of {@link #obtain()} that also sets the target, {@code what}
 * and {@code obj}.
 *
 * @param h    handler stored in {@code target}
 * @param what value stored in {@code what}
 * @param obj  value stored in {@code obj}
 * @return A Message object from the global pool.
 */
public static Message obtain(Handler h, int what, Object obj) {
    final Message fresh = obtain();
    fresh.target = h;
    fresh.what = what;
    fresh.obj = obj;
    return fresh;
}
/**
 * Overload of {@link #obtain()} that also sets the target, {@code what},
 * {@code arg1} and {@code arg2}.
 *
 * @param h    handler stored in {@code target}
 * @param what value stored in {@code what}
 * @param arg1 value stored in {@code arg1}
 * @param arg2 value stored in {@code arg2}
 * @return A Message object from the global pool.
 */
public static Message obtain(Handler h, int what, int arg1, int arg2) {
    final Message fresh = obtain();
    fresh.target = h;
    fresh.what = what;
    fresh.arg1 = arg1;
    fresh.arg2 = arg2;
    return fresh;
}
/**
 * Overload of {@link #obtain()} that also sets the target, {@code what},
 * {@code arg1}, {@code arg2} and {@code obj}.
 *
 * @param h    handler stored in {@code target}
 * @param what value stored in {@code what}
 * @param arg1 value stored in {@code arg1}
 * @param arg2 value stored in {@code arg2}
 * @param obj  value stored in {@code obj}
 * @return A Message object from the global pool.
 */
public static Message obtain(Handler h, int what,
        int arg1, int arg2, Object obj) {
    final Message fresh = obtain();
    fresh.target = h;
    fresh.what = what;
    fresh.arg1 = arg1;
    fresh.arg2 = arg2;
    fresh.obj = obj;
    return fresh;
}
/**
 * Turns off {@code gCheckRecycle} when the target SDK version is older
 * than LOLLIPOP; otherwise leaves it unchanged.
 *
 * @param targetSdkVersion target SDK version to check
 * @hide
 */
public static void updateCheckRecycle(int targetSdkVersion) {
    final boolean preLollipop = targetSdkVersion < Build.VERSION_CODES.LOLLIPOP;
    if (preLollipop) {
        gCheckRecycle = false;
    }
}
Message对象状态标记
/**
 * Sets whether the message is asynchronous, meaning that it is not
 * subject to {@link Looper} synchronization barriers.
 *
 * @param async true to set {@code FLAG_ASYNCHRONOUS}, false to clear it
 */
public void setAsynchronous(boolean async) {
    flags = async
            ? (flags | FLAG_ASYNCHRONOUS)
            : (flags & ~FLAG_ASYNCHRONOUS);
}
/**
 * Reports whether this message is currently marked as in use.
 *
 * @return true when {@code FLAG_IN_USE} is set in {@code flags}
 */
/*package*/ boolean isInUse() {
    // FLAG_IN_USE is a single bit, so a non-zero mask result means it is set.
    return (flags & FLAG_IN_USE) != 0;
}
/** Marks this message as in use by setting {@code FLAG_IN_USE}. */
@UnsupportedAppUsage
/*package*/ void markInUse() {
    flags = flags | FLAG_IN_USE;
}
MessageQueue模块解析
构造函数
/**
 * Records whether quitting is allowed and initializes the native side,
 * keeping the handle returned by {@code nativeInit()} in {@code mPtr}.
 *
 * @param quitAllowed stored in {@code mQuitAllowed}
 */
MessageQueue(boolean quitAllowed) {
    mQuitAllowed = quitAllowed;
    mPtr = nativeInit(); // handle to the native-layer object
}
native方法分析
// Native entry points of MessageQueue; ptr is the handle returned by nativeInit().
private native static long nativeInit(); // initialize the native side
private native static void nativeDestroy(long ptr); // destroy the native side
// The call that can block the calling thread
@UnsupportedAppUsage
private native void nativePollOnce(long ptr, int timeoutMillis); /*non-static for callbacks*/
private native static void nativeWake(long ptr);
private native static boolean nativeIsPolling(long ptr);
private native static void nativeSetFileDescriptorEvents(long ptr, int fd, int events);
nativePollOnce源码
// android_os_MessageQueue.cpp
// JNI entry point for MessageQueue#nativePollOnce: turns the jlong handle back
// into the NativeMessageQueue it refers to and forwards the poll request.
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
        jlong ptr, jint timeoutMillis) {
    auto* queue = reinterpret_cast<NativeMessageQueue*>(ptr);
    queue->pollOnce(env, obj, timeoutMillis);
}
// Polls the native Looper once. The JNI environment and the Java object are
// published in mPollEnv/mPollObj for the duration of the poll and cleared
// afterwards; an exception recorded in mExceptionObj during the poll is
// rethrown into Java and then released.
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    mPollEnv = env;
    mPollObj = pollObj;
    mLooper->pollOnce(timeoutMillis);  // delegates to the native Looper class
    mPollObj = NULL;
    mPollEnv = NULL;

    if (!mExceptionObj) {
        return;
    }
    env->Throw(mExceptionObj);
    env->DeleteLocalRef(mExceptionObj);
    mExceptionObj = NULL;
}
// Looper.cpp
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {// 死循环
while (mResponseIndex < mResponses.size()) {
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
if (ident >= 0) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
if (outFd != nullptr) *outFd = fd;
if (outEvents != nullptr) *outEvents = events;
if (outData != nullptr) *outData = data;// 数据输出
return ident; // 回来一个int 实例,形似没有用
}
}
if (result != 0) {
if (outFd != nullptr) *outFd = 0;
if (outEvents != nullptr) *outEvents = 0;
if (outData != nullptr) *outData = nullptr;// 没有数据了
return result;
}
// 只需没有return 就一向调用
result = pollInner(timeoutMillis);
}
}
// Performs one poll cycle: waits in epoll_wait() for up to timeoutMillis, collects
// the reported fd events into mResponses, runs the native message callbacks that are
// due, then runs the callbacks of responses registered with POLL_CALLBACK.
// Returns POLL_WAKE, POLL_TIMEOUT, POLL_ERROR or POLL_CALLBACK.
int Looper::pollInner(int timeoutMillis) {
// Adjust the timeout based on when the next message is due.
if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
if (messageTimeoutMillis >= 0
&& (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
timeoutMillis = messageTimeoutMillis;
}
}
// Poll.
int result = POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;
// We are about to idle.
mPolling = true;
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
// Wait in the epoll_wait system call
int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// No longer idling.
mPolling = false;
// Acquire lock.
mLock.lock();
// Rebuild epoll set if needed.
if (mEpollRebuildRequired) {
mEpollRebuildRequired = false;
rebuildEpollLocked();
goto Done;
}
// Check for poll error.
if (eventCount < 0) {
if (errno == EINTR) {
goto Done;
}
ALOGW("Poll failed with an unexpected error: %s", strerror(errno));
result = POLL_ERROR;
goto Done;
}
// Check for poll timeout.
if (eventCount == 0) {
result = POLL_TIMEOUT;
goto Done;
}
// Handle all events.
for (int i = 0; i < eventCount; i++) {
const SequenceNumber seq = eventItems[i].data.u64;
uint32_t epollEvents = eventItems[i].events;
if (seq == WAKE_EVENT_FD_SEQ) {
if (epollEvents & EPOLLIN) {
awoken();
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
}
} else {
const auto& request_it = mRequests.find(seq);
if (request_it != mRequests.end()) {
const auto& request = request_it->second;
int events = 0;
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
mResponses.push({.seq = seq, .events = events, .request = request});
} else {
ALOGW("Ignoring unexpected epoll events 0x%x for sequence number %" PRIu64
" that is no longer registered.",
epollEvents, seq);
}
}
}
Done: ;
// Invoke pending message callbacks.
mNextMessageUptime = LLONG_MAX;
while (mMessageEnvelopes.size() != 0) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
// Remove the envelope from the list.
// We keep a strong reference to the handler until the call to handleMessage
// finishes. Then we drop it so that the handler can be deleted *before*
// we reacquire our lock.
{ // obtain handler
sp<MessageHandler> handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
mLock.unlock(); // release the lock while the handler runs
handler->handleMessage(message); // handle the message
} // release handler
mLock.lock();
mSendingMessage = false;
result = POLL_CALLBACK;
} else {
// The last message left at the head of the queue determines the next wakeup time.
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}
// Release lock.
mLock.unlock();
// Invoke all response callbacks.
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
if (response.request.ident == POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
// Invoke the callback. Note that the file descriptor may be closed by
// the callback (and potentially even reused) before the function returns so
// we need to be a little careful when removing the file descriptor afterwards.
int callbackResult = response.request.callback->handleEvent(fd, events, data);
if (callbackResult == 0) {
AutoMutex _l(mLock);
removeSequenceNumberLocked(response.seq);
}
// Clear the callback reference in the response structure promptly because we
// will not clear the response vector itself until the next poll.
response.request.callback.clear();
result = POLL_CALLBACK;
}
}
return result;
}
nativeWake源码
// JNI entry point for MessageQueue#nativeWake: turns the jlong handle back into
// the NativeMessageQueue it refers to and asks it to wake up.
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
    auto* queue = reinterpret_cast<NativeMessageQueue*>(ptr);
    queue->wake();
}
// Forwards the wake request to the native Looper owned by this queue.
void NativeMessageQueue::wake() {
    mLooper->wake();
}
void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ wake", this);
#endif
uint64_t inc = 1;
// fd 写音讯 确保线程能持续读
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint64_t)));
if (nWrite != sizeof(uint64_t)) {
if (errno != EAGAIN) {
LOG_ALWAYS_FATAL("Could not write wake signal to fd %d (returned %zd): %s",
mWakeEventFd.get(), nWrite, strerror(errno));
}
}
}
next 获取一个Message分析
/**
 * Returns the next message that is due, blocking in nativePollOnce until one is
 * available. While the queue is idle, registered idle handlers are run once per call.
 *
 * @return the next message to dispatch, or null when the native handle is already
 *         released or the queue is quitting
 */
@UnsupportedAppUsage
Message next() {
// Return here if the message loop has already quit and been disposed.
// This can happen if the application tries to restart a looper after quit
// which is not supported.
final long ptr = mPtr;
if (ptr == 0) {
return null;
}
int pendingIdleHandlerCount = -1; // -1 only during first iteration
int nextPollTimeoutMillis = 0;
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
// Call into the native layer; this is where the calling thread may block
nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this) {
// Try to retrieve the next message. Return if found.
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
if (msg != null && msg.target == null) {
// Stalled by a barrier. Find the next asynchronous message in the queue.
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
if (now < msg.when) {
// Next message is not ready. Set a timeout to wake up when it is ready.
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// Got a message.
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);
msg.markInUse();
return msg;
}
} else {
// No more messages.
nextPollTimeoutMillis = -1;
}
// Process the quit message now that all pending messages have been handled.
if (mQuitting) {
dispose();
return null;
}
// If first time idle, then get the number of idlers to run.
// Idle handles only run if the queue is empty or if the first message
// in the queue (possibly a barrier) is due to be handled in the future.
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
if (pendingIdleHandlerCount <= 0) {
// No idle handlers to run. Loop and wait some more.
mBlocked = true;
continue;
}
if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}
// Run the idle handlers.
// We only ever reach this code block during the first iteration.
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; // release the reference to the handler
boolean keep = false;
try {
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}
if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}
// Reset the idle handler count to 0 so we do not run them again.
pendingIdleHandlerCount = 0;
// While calling an idle handler, a new message could have been delivered
// so go back and look again for a pending message without waiting.
nextPollTimeoutMillis = 0;
}
}
enqueueMessage 装入一个Message 实例
/**
 * Inserts msg into the queue, which is a linked list ordered by delivery time, and
 * wakes the native side when needed.
 *
 * @param msg message to insert; must have a target and must not be in use
 * @param when delivery time stored in msg.when
 * @return false if the queue is quitting (the message is recycled), true otherwise
 * @throws IllegalArgumentException if msg has no target
 * @throws IllegalStateException if msg is already in use
 */
boolean enqueueMessage(Message msg, long when) {
if (msg.target == null) {
throw new IllegalArgumentException("Message must have a target.");
}
synchronized (this) {
if (msg.isInUse()) {
throw new IllegalStateException(msg + " This message is already in use.");
}
if (mQuitting) {
IllegalStateException e = new IllegalStateException(
msg.target + " sending message to a Handler on a dead thread");
Log.w(TAG, e.getMessage(), e);
msg.recycle();
return false;
}
msg.markInUse();// mark the message as in use
msg.when = when;
Message p = mMessages;
boolean needWake;
if (p == null || when == 0 || when < p.when) { // empty queue, when == 0, or due before the current head: insert at the head
// New head, wake up the event queue if blocked.
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
// Inserted within the middle of the queue. Usually we don't have to wake
// up the event queue unless there is a barrier at the head of the queue
// and the message is the earliest asynchronous message in the queue.
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
// Walk the list to find the insertion point
for (;;) {
prev = p;
p = p.next;
// Stop at the end of the list or at the first message due later than the new one
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
// We can assume mPtr != 0 because mQuitting is false.
if (needWake) {
nativeWake(mPtr); // wake up through the native layer
}
}
return true;
}
内存释放
/**
 * Releases the native message queue when this object is finalized, then
 * runs the superclass finalizer.
 */
@Override
protected void finalize() throws Throwable {
    try {
        dispose();
    } finally {
        super.finalize();
    }
}
// Disposes of the underlying message queue.
// Must only be called on the looper thread or the finalizer.
// Does nothing when the native handle has already been released.
private void dispose() {
    if (mPtr == 0) {
        return;
    }
    nativeDestroy(mPtr);
    mPtr = 0;
}
消息屏障能力
/**
 * Posts a synchronization barrier to the Looper's message queue.
 *
 * Message processing occurs as usual until the message queue encounters the
 * synchronization barrier that has been posted. When the barrier is encountered,
 * later synchronous messages in the queue are stalled (prevented from being executed)
 * until the barrier is released by calling {@link #removeSyncBarrier} and specifying
 * the token that identifies the synchronization barrier.
 *
 * This method is used to immediately postpone execution of all subsequently posted
 * synchronous messages until a condition is met that releases the barrier.
 * Asynchronous messages (see {@link Message#isAsynchronous}) are exempt from the barrier
 * and continue to be processed as usual.
 *
 * This call must be always matched by a call to {@link #removeSyncBarrier} with
 * the same token to ensure that the message queue resumes normal operation.
 * Otherwise the application will probably hang!
 *
 * @return A token that uniquely identifies the barrier. This token must be
 * passed to {@link #removeSyncBarrier} to release the barrier.
 *
 * @hide
 */
@UnsupportedAppUsage
@TestApi
public int postSyncBarrier() {
    // The barrier is posted at the current uptime.
    final long now = SystemClock.uptimeMillis();
    return postSyncBarrier(now);
}
/**
 * Inserts a barrier message into the queue at the position given by {@code when}.
 * The barrier is a message obtained from the pool that carries the token in
 * {@code arg1}; no target is assigned to it here.
 *
 * @param when delivery time of the barrier; when non-zero the barrier is placed
 *             after every message whose time is less than or equal to it
 * @return the token identifying the barrier
 */
private int postSyncBarrier(long when) {
    // Enqueue a new sync barrier token.
    // We don't need to wake the queue because the purpose of a barrier is to stall it.
    synchronized (this) {
        final int token = mNextBarrierToken++;
        final Message barrier = Message.obtain();
        barrier.markInUse();
        barrier.when = when;
        barrier.arg1 = token;

        // Find the neighbours of the insertion point.
        Message before = null;
        Message after = mMessages;
        if (when != 0) {
            for (; after != null && after.when <= when; after = after.next) {
                before = after;
            }
        }

        // Link the barrier in; invariant: after == before.next when before != null.
        barrier.next = after;
        if (before == null) {
            mMessages = barrier;
        } else {
            before.next = barrier;
        }
        return token;
    }
}
/**
 * Removes a synchronization barrier.
 *
 * <p>The barrier is the queued message that has no target and whose
 * {@code arg1} equals {@code token}. It is unlinked and recycled; the queue
 * is woken only when the barrier was at the head and the new head is either
 * absent or not a barrier itself.
 *
 * @param token The synchronization barrier token that was returned by
 * {@link #postSyncBarrier}.
 *
 * @throws IllegalStateException if the barrier was not found.
 *
 * @hide
 */
@UnsupportedAppUsage
@TestApi
public void removeSyncBarrier(int token) {
    // Remove a sync barrier token from the queue.
    // If the queue is no longer stalled by a barrier then wake it.
    synchronized (this) {
        Message before = null;
        Message barrier = mMessages;
        while (barrier != null && !(barrier.target == null && barrier.arg1 == token)) {
            before = barrier;
            barrier = barrier.next;
        }
        if (barrier == null) {
            throw new IllegalStateException("The specified message queue synchronization "
                    + " barrier token has not been posted or has already been removed.");
        }

        final boolean needWake;
        if (before == null) {
            // The barrier was the head of the queue.
            mMessages = barrier.next;
            needWake = mMessages == null || mMessages.target != null;
        } else {
            before.next = barrier.next;
            needWake = false;
        }
        barrier.recycleUnchecked();

        // If the loop is quitting then it is already awake.
        // We can assume mPtr != 0 when mQuitting is false.
        if (needWake && !mQuitting) {
            nativeWake(mPtr);
        }
    }
}
Linux I/O多路复用
epoll
epoll 全称 eventpoll,是 Linux I/O 多路复用的其中一个实现,除了 epoll 外,还有 select 和 poll。
在 Linux 中,任何可以进行 I/O 操作的对象都可以看做是流,一个文件、socket、pipe,我们都可以把它们看作流。
epoll 提供的三个函数:
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
- epoll_create() 用于创建一个 epoll 池
- epoll_ctl() 用来执行 fd 的“增删改”操作,最后一个参数 event 是告诉内核需要监听什么事件。还是以网络请求举例,socketfd 监听的就是可读事件,一旦接收到服务器返回的数据,监听 socketfd 的对象将会收到回调通知,表示 socket 中有数据可以读了
- epoll_wait() 是使用户线程阻塞的方法,它的第二个参数 events 接收的是一个集合对象,如果有多个事件同时发生,events 对象可以从内核得到发生的事件的集合。
Linux eventfd
理解了 epoll 我们再来看 Linux eventfd,eventfd 是专门用来传递事件的 fd,它提供的功能也非常简单:累计计数
int efd = eventfd();
write(efd, 1); // 写入数字1
write(efd, 2); // 再写入数字2
int res = read(efd);
printf(res); // 输出值为 3
通过 write() 函数,我们可以向 eventfd 中写入一个 int 类型的值,并且,只要没有发生读操作,eventfd 中保存的值将会一直累加
通过 read() 函数可以将 eventfd 保存的值读出来,并且,在没有新的值加入之前,再次调用 read() 方法会发生阻塞,直到有人重新向 eventfd 写入值
eventfd 实现的是计数的功能,只要 eventfd 计数不为 0,那么表示 fd 是可读的。再结合 epoll 的特性,我们可以非常轻松地创建出生产者/消费者模型
epoll + eventfd 作为消费者大部分时候处于阻塞休眠状态(只占用线程资源,不占用cpu资源),而一旦有请求入队(eventfd 被写入值),消费者就马上唤醒处理,Handler 机制的底层逻辑就是利用 epoll + eventfd
Native Handler 相关逻辑
native层的handler是给Java层服务的,提供更高效的阻塞机制
android_os_MessageQueue.cpp
留给读者自己探索
handler.cpp
留给读者自己探索
Looper.cpp
留给读者自己探索
总结
loop循环调用链
Looper#loop()
-> MessageQueue#next()
-> MessageQueue#nativePollOnce()
-> NativeMessageQueue#pollOnce() //注意,进入 Native 层
-> Looper#pollOnce()
-> Looper#pollInner()
-> epoll_wait() // 线程阻塞,让出cpu资源