经过前几章的学习,咱们对
GCD
的运用和行列的原理有了根本的了解,可是GCD
底层到底是怎样拓荒线程,怎样履行函数等还不是很清楚,本章就来一探终究。
系列文章传送门:
☞ iOS底层学习 – 多线程之基础原理篇
☞ iOS底层学习 – 多线程之GCD初探
☞ iOS底层学习 – 多线程之GCD行列原理篇
☞ iO. v Q ]S底层学习 – 多线程之GCb r M s c b )D运用篇
关于GCD
的底层来P S U Q说,首要有行列创立,函数履行,同步异步原理和其他运用函数的原理。关于行列原理的,咱们之前的华章现已讲过,相信关于GCD
是怎样创立行列的,现已有了知道,今日就来持续看其他的底层原理,仍是经过源码来深化研究
同步C D 3 U ( 4 : G履行dispatch_sync
死锁的原因
咱们都知道,当运用dispatch_sync
在串行行列上履行时,会形成dispatch_sync
块使命和内部履行使命的彼此等候,然后形成死锁崩溃。那么我就从这个问题来触发,看一下为什么会形成死锁,然后了解同步履行的原理
仍是老规矩,talk is cheap,show me the code
咱们经过源码找到了dispatch_sync
的调用如下,因为unlikely
一般运转的较少,多为容错处理,所以咱们先跟主流程,终究来到了函数_dispatch_sync_f_inline
中
void
dispatch_sync(dispatche p X K 9 a U 2_queue_t dq, dispatch_block_t work)
{
uintptr_t 4 m k C F dc_flags = DC_FLAG_BLM H 8 h - POCK;
if (unlikely(_dispatch_block m % ` l k 1 [ *_has_private_dataK R Y K 3 ~ m W E(work))) {
return+ X 1 o - | q ! } _dispatch_sync_block_with_privdata(dq, work, dc_flags);
}
_dispatch_sync_f(dq, work, _dispB ) Q I V Catch_B9 { o K Q _ } Mlock_invoke(- M p c P )work), dc_flags);
}
----------------r b I d q 5 y & .------------------------------------------a y ~ ; a 7-------------------------------
static void
_dispatch_sync_f(dispatch_qu- l H V { Geue_t dq, void *ctxt, dispatch_function_t func,
uin x X 3 j G V # Dtptr_t dc_i _ # V q tflags)
{
_dispatch_sync_f_inline(dq, ctxt, func, dc_flags);
}
--------------------------? W H s b------------------------------------------------------------5 S 5 3 I---
在_dispatch_sync_f_inline
中发现了一个判别likely(dq->dq_wB 2 f S s jidth == 1
,经过之前行列的原理咱们能够知道,串行行列G Y D b O 6 * m的width
是为1
的,所以串行的履行办法,是在_dispatch_barrier_sync_f
中的。
并且依据函数名,咱们( q Y )能够知道_dispatct = + b 5 / ~ :h_barrier
是之前讲的栅门函数的调用,所} v # k @ / I以说栅门函数也会走到此办法中。
因为咱们先找死锁的原因,所以在这儿就先不看下面并发的逻辑了。
DISPATCH_R g ` { 3A& Q l k b 9LWAYS_INLINE
static inline void
_dispatch_syn+ 0 m e | I }c_f_inline(din 2 | ? Z [ G |s{ G t U 0 3 /patch_queue_t dq, void *ctxt,
dispatch_function_t funs F + A Kc, uintptr_t dc_flags)
{
✅// 串@ ( = k 0 J r W行 来到这儿
if (likely(dq->dq_widT U O g Y Ith == 1)) {
return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);r ; 7 M s n
}
if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
DISPATCH_CLIENT_CRASH(0, "Queue type doesn6 6 W M K f T I ('t suppL N P 7ort dispatch_sync");
}
dispatch_lane_t dl = upcast(dq)._dl;
// GlobaL 8 fl concurrent queues and queues bound to non-disp0 : ak F }tch threads
// always fall into the slow c( # k & 3 / . base, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
i( k %f (unlikely(!_dispatch_queue_try_reservS f O K , De_sync_width(dl))) {
rA q ( 6 y B #eturn _dispatch_sync_f_slo$ C 0 % b ^ Iw(dl, ctxt, func, 0, dl, dc_h U U m Rfla$ j M Z | Tgs);
}
if (unlikely(dq->do_targetq->do_targetq)) {
return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
}
_w d qdispatch_introspection_sync_begin(dl);
_dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
_dispatch_trace_item_sync_& D y d 9 5 T Npush_pop(dq, ctxt, func,z k V V dc_flags)));
}
---------------------f c v _--------------r q / G g [----------T $ K E 5 ~------------------------------A Z t i--------------
static void
_dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctx~ g A dt,
dispatch_function_t func, uintptr_t dc_flags)
{
_dispatch_barrier_sync_f_inline(dq, ctxt, func, dc_flags);
}
终究,咱们来到了_dispatch_barrier_sync_f_inline
函数中。
首要履行了_dispatch_tX W * = ` #id_self
办法。经过源码盯梢,咱们能够发现其为宏界说的办法,底层首要履行了_dS { & 1 LispatcU t m b P h v y Rh_thread_getspecific
。这个函数书首要是经过KeyValue
的方式来获取线程的一些信息。在这儿I M r k便是获取当时线程的tid
,即唯一ID。
咱们知道,N R z H o x A形成死锁的原因便是串行行列上使命的彼此等候。那么必然会经过tid
来判别是否满意条件,然后找到了_dispav p Y 2tch_queue_try_acquire_barrier_synH t : _ e +c
函数
#define _dispatch_tid_self() ((dispatch_tid)_dispatch_thread_port())
#define _dispatch_thread_port() ((mach_port_t)(uintptr_t)
_dispatch_thread_getspecific(_PTHREAD_TSD_SLOT_MACH_THREAD_SELF))
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
dispatch n * V { t g_function_t func, uintptr_t dc_flags)
{
✅// 获取线程ID -- mach pthread --
dispatch_tid tid = _dispatch_6 s &tid_self();
if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
}
dispatch6 y b a Y , D / o_lane_t dl = upcH U qast(dq)._dl;
// The more correct thing to do would be to merge the qos of the threaf i A i b - Nd
// that just acquired the barrier lockJ _ 6 K K & into the queue5 f M W W state.
//
// However this is too expensive for the fast path, so skip doin + qng itX b m : a e.
// The chosen tradeoff is that if an enqueue on a lower priority thread
// contends with this fast path, tF t a } jhis threadw $ * 3 W v l may receive a useless overrid- ) | T N ] de.
//
// GK 0 w : ; Klobal concurrent queues and queues bs B 0 ! e 7 Z Lound to non-dispatch threads
// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
✅// 死锁
if (unlikely(!_dispatch_queue_try_acquirN z Ne_barrier_sync(dl, tid))) {
return _dispatch_sync_fd j g A . [ Z_slow(dl, ctxt, func, DC_( & bFLAG_BARRg | E B j ~IER, dl,
DC_FLAG_BARRIER | dc_flags);
}
if (unlikely(dl->do_targetq->do_targetq)) {
return _diI T m r p & 8spatch_sync_recurse(dl, ctxt, func,
DC_FLAG_BARRIER | dc_flagsH C 6 [ ~ J C);
}
_dispatch_introspection_sync_begin(dl);
_dispatch_lane_barrier_sync_invoke_and_co& ( ( C G Vmplete(dl,E 9 % 7 Z ctxt, func
D^ | 5 _ @ 5 Y 8 )ISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
}
在函数_dispatch_queue_try_acquire_barrier_sync_and_suspend
中,从该函数咱们能够知道,经过os_atomic_rmw_loop2o
函数回调,从OS底层获取到了状况信息,并回来。
De e , 9 U kISPATCH_ALWAYS_C D p 9 V DINLINE DISPATCHJ P u_WARN_RES~ 4 : 2 s _ULT
static inline bp c u g d J U !ool
_dispatch_queue_try_acquire_barrier_sync(dispatch_queue_class_t dq, uint32_tB t Y ~ _ * I $ tid)
{
returB b | M * $ p I rn _dispatch_queue_trH v ^ T a V . ~y_acquire_barrier_sync_and_suspend(dq._= x ydl, tid, 0);
}
------K h t j o *-----------------------------------------------------------------------------------
DISPATCH_ALWAYS_INLINE DISPATCH_WARN_RESULT
static inline bool, ( C [ W : G j P
_q P ! Cdispatch_queue_try_acquire_barrier_sO h s %ync_and_suspend(dispatch_lane_t dq,
uint32_t tid, uint64_t suspend_count)
{
uint64_t init = DISM x : a v Q :PATCH_QUEUE_STATE_INIT_VALUE(dq->dq_width);
uint64_t value = DISPATCH_QUEUE_WIDTH_FULL_BIT | DISPATCH_QUEUE_IN_BARRIER |
_dip J r ^ &spatch_lock_value_from_tid(tid) |
(suspend_count * DISPATCH_QX p gUEUE_SUSPEND_INTERVAL);
uint64_t old_state, new_state;
✅// 从底层获取信息 -- 状况信息 - 当时行列 - 线程
return os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, acquire, {
uint64_t role = old_state & DISPATCH_QUEUE_ROLE_MASK;
if (old_state != (init | r: X F ] h 1 c role)) {
os_atomic_rmw_loop_give_up(br} $ { [eak);
}
n7 l t | 0 4ew_state = value | role;
});
}
那么回来之后,就履行了_dispatch_sync_f_slow
函数。经过下图崩溃堆栈咱们也能够从侧方面验证。
其中经过源码能够发P t 1 0 R 5 ( t c现,首要是生成了一些使命的信息,然后经过_dispatch. h $ G R O_trace_item_push
来进行压栈操作,然后存放在咱们的同步行列中(FIFO),然后完成函数的履行。
_dispatch_^ , ] 0 Lsync_f_slow(dispatc@ b Eh_queue_class_t top_dqu, void *ctb Y O oxt,
d5 P ^ a t 9ih 9 { ospatch_function_t I n -t func, uintptr_t top_dc_flags,
dispatch_queue_class_t dqu, uintptr_t I K W w dc_flags)
{
...
pthread_priority_t pp = _dispatch_get_priority();
sts } y 7 xruct dispa/ 2 a 7 Gtch_G H D s _sync_context_s dsc = {
.dc_flags = DC_FLAG_SYNC_WAITER | dc_flags,
.dc_func = _dispatch_async_and_wait_invoke,
.dc_ctxt = &dsP R 6 k f kc,
.dc_other = top_dq,
.dZ } l T j h k Oc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
.dc_voucheG v ^ 8r = _voucher_get(),
.dsc_func = func,
.dsc_cF I e + a * ^ @ $txt = ctxt,
.dsc_waiter = _dispatch_tid_self(),
};
_dispatch_trac9 # B {e_item_push(top_dq,c M F Z 4 f ! &dsc);
__DISPATJ ( A z . - 2 kCH_* U F uWAIT_FOR_QUEUE__(@ S L z&N ! 0 D Zamp;dsc, dq);
...
}
那么发生死锁的首要检测就再__DISPATCH_b S ^ 6WAIT_FOR_QUEUE__
这个函数中了,经过检查函数,发现它会获取到行列的状况,w _ + D Q ? H B看其是否为等候状况,然后调用_dq_state_drain_locked_by
中的异或运算,判别行列和线程的等候状况,假如两者都在等候,那么就会回来YES
,然后形成死锁的崩溃。
static void
__DISPATCH_WAIT_FOR_QUEUE__(; _ r k 4 y 5dispatch_sync_context3 H x_t dsc, dispatch_queue_t dq)
{
// 获取行列的状况,看是否是处于等候状况
uint64_t dq_state = _dispatch_wait_prepare(dq);
if (unlw m [ + M E yikely(_dq_state_drainz q J N &_locked_by(dqg 2 6 [ i E_state, dsc->dsc_waiter))) {
DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
"dispatch_sync called on queue "
"already owned by current thread");
}
...
}
--2 | m +---------------------------------------------------------------------------------------
static inline bool
_dispatch_lock_is_locked_by(din 5 3 ] h ) P !spatch_lock lock? m ^ 4 2 R 7_val@ p u M T g fue, diq / &spatch_tid tid)
{ /} J L U * } 0 ; D/ lock_vz m S 2 1 X t M RalueP ? k q * ; ) 为行列T I _ P f ? + a状况,tid 为线程 id
// ^ (异或o E v x @ . ~运算法) 两个相同就会呈现 0 不然为1
return ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0;
}
小结一下
-
_dispatch_sync
首要获取当时线程的tid
- 获取到体系底层回来的
status
- 获取到行列的等候状况和tL z X 9 8 ~ * [ Yid比较,假如相同,则表明正在死锁,然后崩溃
block
使命的履行
关于同步使命的block
履行,咱5 – ! B @ S m _ {们在持续跟进之前的源码8 n : { n B G 4_dispatch_sync
源码中_dispatch_barrier_sync_f_inline
函数,观看其函数完成,函数的履行首要是在_dispatch_client_callout
办法中。
DISPATCH_NOINLINE
static void
_dispatch_lane_ba) = [ { 5 /rrier_sync_invoke_and_cZ . @ f _ :omplete(diG ; Kspatch_lane_t dq,
void *ctxt, dispatch_function_t func DISPATCH_TRACE_ARG(vy U ~ G noid *dc))
{
_+ n 4 ] = Y ;dispatch_sync_function_invoke_inlineL V z D =(dq, ctxt, func);
...
}
--------------------------------------------X ! P P B U p---------------------------------------------
DISPATCH_AL5 x * ZWAYS_INLINE
static inline void
_dispatch_sync_function_invoke_inline(dispatch_queue_class_t dq, void *ctxt,
dispatch_function_t func)
{
dispatc7 S q Lh_thX @ ( +read_frame_s dtf;
_dispatch_thread_frame_pug H m N + o W ush(&dtf, dq);
// f(ctxt) -- funcA & b ; m W t 3(ctxt)
_dispatch_client_callout(ctxt, func);
_dispatch_perfmon_workitem_inc();
_dispatch_thread_framex ( ` T ?_pop(&dtf);
}
检查_dispatch_client_callout
办法,里边果然有2 1 S X x函数的调用f(ctxt)` X K;
至此,同# b E d B ? H步函数的block
调用完成
_dispatch_cx 4 O `lient_cE K u _ L ~ E 3allout(void *ctxt, dispatch_fun0 A { o f 2 Action_t f)- & h n
{
_dispatch_get_tsd_base();
void# | 3 7 U . B *u = _dispatch_get_unwind_tsd();
if (likely(!u)) return f(ctxt);
_dispatch_set_unwind_tsd(NULL);
f(ctxt);
_dispatA x y m hch_free_unwind_tsd();
_dispatch_set_unwind_tsd(u);
}
小结一下
同步函数的blocu q ; ; 2 8 5 Qk调用过程
dispatch_sync
└──_dispatch_bw @ % 5 A ! = Narrier_sync_f_inline
└──_dispatch_sP T : . Z } : 3ync_invoke_and_complete
└──_dispatch_sync_function_d h E 9invoke_inline
└──_w n 7 w B ^ 7 t 3dispatch_client_c; h a 0 R a )allout
└──f(ctxt7 ; / u b g j);
异步履行v ` $dispatch_async
看完了同步履行的相关源码,下面咱们来) 1 8 v P看异步的履行就简略多了。
检查其源码,首要履行了两个函数_dispatch_continuation_init
和_disp[ X : q H hatch_contin~ , = v r L s ~uation_async
,下面咱们一个个来看一下
void
dispatch_async(dispatch_queue_t dq, dispatch_block_t wo! r ) Ark)
{J 7 M ] / w ; : S
dispatch_continuation_t dc = _dispatch_continuation_alloc();
uintptr_t dc_flags = DC_FLAG_CONSUME;
dispatch: H = ? ` J 5_qos_t qos;
qos = _dispatch_continuation_init(dc, dq, wo3 [ W ! Drk, 0, dc_flags);
_di# 4 kspatch_con3 F v k :tinuation_async(dq, dc, qos, d; + A x gc->dc_flags);
}
_dispatch_conn E ! jtinuation_init
经过源码咱们可知,这个函数dispatch_qos_t
这个$ X ! g ? n / 5对象,里边的完成必然是对其进c ~ E : d行初始化赋值的操作。
经过_dispatch_Block_invoke
的宏界说,咱们能够发现其对传入的dispatch_block_t
回调参数进行了封装。
DISPA) - # + O jTCH_ALWAYS_I| & A ~ H H a rNLINE
static 8 R = & L inline dispatch_qosF O 7 Z_t
_dispatch_continuationI , % b H E @ (_init(dispatch_continuation_t dc,
dispatch_queue_class_t dqu, dispX ~ -atch_block_t work,
dispatch_block_flags_t flaP ) * q f L R C 8gs, uintk t T 2 - { I rptr_t dc_fZ Z R 4 ( L V rlags)
{
voidV W . j . *ctxt = _dispa^ 4 ~tch_Block_copy(work);
dc_flags |= DC_FLAG_BLOCK | DC_FLAG_ALLOCATED;
if (unlikely(_dispatch_block_has_private_data(work))) {
dc->dc_flags = dc_flags;
dc->dc_ctxt = cT ( z _ ) V F V Otxt;
// will initialize all fields but requires dc_flags & dc_ctxt to be set
return _dispatch_continuation_init_slow(dc, dqu, flags);
}
diS & { 8 Ispatch_functiov 3 L u f in_t func = _dispatch_Block_% U x 2 1invoke(work);
if (dc_flags & DC_FLAG_CONSUME) {
func = _dispatch_call_block_and_reln k ( 1 0 n } dease;
}
return _dispatch_continuation_init_f(dc, dqu, ctxt, func, flags, dc_flags);
}
-----------------------------------------------------------------------------------------
#define _dispatch_Block_i{ @ W Cnvoke(bu , p u G I L 8b)
((dispatch_function_t)((struct Block_layout *)bb)->invj B ! . a p Coke)
终究的封装会在_disp? N R X P H x katch_continuation_init_f
中,其代码也十分的简略,仍旧是函数式保存的赋值的相关操作,对回调等也* s ) @ s进行了封装保存。
而进行封装保存的意义也很简略:因: u 3为异步需要在适宜的时机进行线程回调block
DISPATCH_ALWAYS_INLINE
static inline dispatch_qos_t
_dispatch_continuation_init_f(dispatch_continuation_t dcL ! h,
dispatch_queue_class_t dqu, void *ctxt, dispatch_function_t f,
dispatch_block_flags_t flags, uintptr_t dc_flags)
{
pthread_priority_t pp = 0;
dc->dc_flags = dc_flags | DC_FLAG_ALLOCATE: ) + Y e P c d (D;
dc->dc_func = f;
dc->dc_ctxt = ctxt;
// in this context DISPATCH_By ^ } s ) b q [LOCK_HAS_PRIORITY means that the priority
// should not be propagated, only taken from the handler if it has o} _ s ( T , 3ne
if (!(flags & DISPATCH_BLOCK_HAS_PRIORITY)) {
pp = _dispatch_priority_propagate();
}
_dispatch_Z ! G 1 `continuation_voucher_set(dc, flags);
return _dispatch_continuation_prK = !iority_set(dc, dqu, pp, flagD / z 8 F - L u .s);
}
_dispatch_continuationo R v 9_async
咱们知道了上一步对信息进行函数式封装,那么关于一个异步履行来说,最重要的便是何时创立线程和函数履行呢,那么就再这个办法里边了。
检查办法,发现r : i 6完成十分的简略,可是越 5 U简略的东西,其内里就越复杂。这个办法首要便是履行了dx_push
办法,检查其代码,发现为宏界说,首要履行了dq_push
办法.
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_async(dispatch_queue_class_t dqu,
dispatch_contiY ^ B y r Z unuation_t dcq C * J _ u 5 4, dispatch_qos_t qos, uintptr_t dc_flags)
{
#if DISPATCH_INy y 5 s 3 C `TROSPECTION
if (!(dcy T % C y_flags & DC_FLA+ ^ Y , , V $G_NO_INTROSPECTION)) {
_dispatch_trace_item_push(dqu, dc);
}
#else
(void)dc_flaB _ s N q cgs;
#endif
return dx_pu) ^ k e )sh(dqu._dq,p T W * v @ dc, qos);
}
------------------S 8 ` 2 : . L g---------------------------------------{ f F 2 p 4 ^ K--------z s ~------------------------
#define dx_push(x, y, z) dx_vtable(& ~ . t $x)->dq_push(x, y, z)
那么dq_push
又是怎样赋值的呢,因为其是一个属性,所以咱们能够查找.dq_pus
来检查其赋值Y u N。咱们发现其赋值的地方十分多,可是大体的意思咱们r i V能够了解,便是首要在根行列,自界说行列,主行列等等进行push
操作的时分调用。
咱们知道线程的创立一般都是z ? k u } i在根行列上进行创立的,所以_ [ I _咱们直接找根行V j } i ( T q列的dq_push
赋值,这r k ^ * b E B样比较快速,当然其他的也能够,终究都会走到这A w u ~ r S N儿。
咱们发现_dispatch_root_queue_push
办法终究会调用_dispatch_root_queue_push_inline
办法,而_dispatch_rw Z F A 4 Hoot_queue_push& ; X { +_inline
办法终究又会调用_dispatch_root_queuS v Fe_F + ` H Rpoke
。
_O ! $dispatch_root/ 1 b i L )_queue_poke
这个函数首要进行了一些容错的判别,终b i ^ P @ `究走到了_K o u Rdispatch_root_queue_poke_slow
相关的办法里
void
_dispatch_root_queue_push(dispatch_queue_global_t rq, dispatch_object_t dou,
dispatch_qos_t qos)
{
#if DISPATCH_USE_KEVENT_WORKQUEUE
dispatch_def? V u X L 2erry n K I ^ w * Zed3 % } 1 f P_items_t ddi = _dispatch_deferred_items_get();
if (un, [ b } M e .likely(ddi && ddi->ddi_S 7 k $can_stash))
...一些不重要的操( } C 9 c 1作 ...
#endif
#if HAVE_PTHREAD_WOO h p J E ]RKQUEUE_QOS
if (_dispatch_root_queue_push_needs_override(rq, qos)) {
return _dispatch_root_queue_push_override(rq, dou, qos);
}
#else
(voidM D l o : j)qos;
#endif
_dispatch_root_queue_push_inline(rq, dou, dou, 1w * 8 p 7 A ; c w);
}X k A W v G S
----------------------------------------------------? ^ R-------------------------------------
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_root_queue_push_inline(dispatch_queue_gloz 4 A T S g , bal_t dq,
dispatch_object_t _head, dispatch_] I m b uobject_t _tail, int n)
{
struct dispatch_object_s *hd = _head._do, *tl = _tail._do;
if (unlikely(oE A Us_mpsc_push_lin { yst(os_mpsc(dq, dq_items), hd, tl, do_next))) {
return _dispatch_root_queue_J 7 = U W = 5poke(dq, n,= V K J 6 U ( w { 0);
}
}
------------------------------------------------------` @ ~ D 2 *------j Y + t o s ------------------------------
void
_dispatch_root_queue_poke(dispatch_queue_global_t dq, in= p O U ^ { xt n, int floor)
{
if (!- ] ~ [_dispatch_6 J - s -queue_class_pro a u x =obe(dq)) {
reX D w |turn;
}
#if !DISPATCH_USp p ( _ J ( + ) mE_INTERNAL_WORKQUO = 6 T o J * 9EUE
#if DISPATCH_USE_PTHREAD_POOL
if (lA 9 r jikely(dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE))
#endif
{
if (unlikely(!7 F T c T Wos_atomic_cmpxchg2o(dq, dgq_penB c Z s n a b W 5ding6 I W } z 4 M `, 0, n, relaxed))) {
_dispatch_root_queue_debug("worker thread request still pend` l R Q cing "
"for global qux T w ;eue: %p= l [ 9 q I ! Q", dq);
return;
}
}
#endif // !DISPATCH_USE_INB I D ~ 9TERNAL_WORKQUEUI t NE
return _dispatch_root_queue_poke_slow(dq, n, floor);
}
_dispatch_root_queue_poke_slow
这个办法便是异步履行的首要办法,创立线程也是在此,2 f ( _ 1 H z *因为代码比较长,咱们仍是寻找代码中的关键节点来讲。
DISPATCH_NOINLINE
static void
_dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
{
...
✅//行列初始化,runtime强转等操作,避免类型无法匹配J B j Z H m等状况
_dispatch_root_queues_init();7 } 5 4
_dispatch_debug_roo! a S + n { zt_queue(dq, __func__);
_dispatch_trace_runtime_event(worker_request, dq, (uint64_t)n);
...
int can_request, t_count;
// seq_cst with atomic store t; 1 ko tail <rdar://problem/16932833>
✅// 获取线程池的巨细
t_count = os_atomic_load2o(dq, dgq_thread_pool_size, ordered);
do {
✅// 核算能够恳求的数量
can_request = t_count < floor ? 0 : t_count - floor;
if (remaining > can_request) {
_dispatch_root_queue_debug("pthread pm # Wool reducing request from %d to %d",
remain D + /ning,O 4 - [ can_request);
os_atomic_sub2o(dq, dgq_pending, remainingf N r f _ K ^ | - can_request, relaxed);
reh # 8 ~ m 8 V Wmaining = can_request;
}
if (remaining == 0) {
// 线程池无可用将会报_ W & ^ E v m O错
_dispatch_root_queue_debug("pthread pool is full for root queue: "
"%p", dq);
return;
}
} while (!os_atomiU N 5 c_cmpxchgvw2o(dq, dgq_thread_pool_size, t_count,
t_count - remaining, &t_count, acquire));
pthread_attr_t *attr = &aV f q Qmp;pqc->dpq_thread_attr;
pthread_t tid, *pthr = &tid;
#if DISPATCH_USE_MGR_THRE^ % Q N + ` 0 2AD && DISPATS ^ F 5 h H = ,CH_USE_PTHREAD_ROOT_QUEUES
if (unlikely(dq == &_dispatch_mgr_I F k u F ] V *root_queue)) {
pthr = _dispatch_mgr_root_queuw r T R / H D &e_init();
}
#endif
do {
_dH s $ t [ O J -ispa: _ T Z 7 y ktch_retain(dq); // released in _dispatch_worker_thread
✅✅✅//拓荒线程✅✅✅
while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
if (O C Y R } S ( ^ =r != EAGAIN)= n e 0 * {
(void)dispatch_assume_zerv ) 4 ] ; _o(C h g Sr);
}
_dispatch_temporary_resource_shortage();
}
} while (--remaining);
#else
(voi+ # @ ;d)floor;
#endif // DISPATCH_USE_PTHREAD_POOL
}
---8 J H Y--------------------------------------------------------------------------G 4 H v Y Z M c ;------------
#define _dispatch_tf L J urace. ` y_runtime_event(evt, ptr, value)
_dispatch_introspection_runtime_event(
dispatch_introspection_runtime_event_##evt, ptr, valueX p 1 Z)
依据代码能够知道,体系会获取线程池总数量和能够创立的数量,然后经过两个do while
来进行动态的拓荒线程。
单例dispatch_once
经过dispatch_once
函数检查其底层调用,能够发现其终究调用到dispatch_once_f
办法中。相关的代码如下。
- 首要咱们知道
val
一3 b = u 6 I开始为NULL
,并将其转化为dispatch_once_gate_t
- 经过检查
_dispatch_once_gate_tryenter
源码,咱们知道其z G x & O F在OS底层经过判别l->dgo_once
是否为DLOCK_ONCE_UNLOCKED
状况 - 假如成立,则会履行
_dispatch_once_callA ` y 6out
函数。履行对应的blockU + l O ~,然后将l->dgo_once
置为DLOu @ ; H sCK_ONCE_DONE
,然后保证了只履行一次
DISPATCH_NOINLINE
void
dispatce . E F e ( #h_oncr k E C 5 s Qe_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
// 假如你来过一次 -- 下次就a U n i不来
dispatch_once_gate_t l = (dispa Z % &atch_once_gate_t)val;
//DLOCK_ONCE_DONE
#if !DISPATCH_ONCE_INLINE_FASTPATH || DISPAT9 , 3 0 m e P XCH_ONCE_USE_Qt M 9 j 4 0 o , $UIESCENT_COUQ E A Q %NTER
uintptr_t v = os_atomic_loaz W + k -d(&l->dgo_once, acqu` - j ~ r { Z =ire);
if (likely(v == DLOCK_ONCE_DONE)) {
return;
}
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTERJ X | n | # | {
if (likelyu @ ) _(DISPATCH_ONCE_IS_GEN(v))) {
return _dispatch_on_ $ Oce_mark_done_if_quiesced(l, v);
}
#endif
#endif
// 满意条件 -- 企图进去
if (_dispatch_once] z T m g A Y 4 t_gate_tryenter(l)) {+ 9 s
// 单利调用 -- v->DLOCK_ON} G e , o # r )CE_DONE
return _dispatch_oncK l r E + - ee_callout(l, ctxt, func)R @ 9 ,;
}
return@ ; o I E _dispatch_oncu y [e_wait(l);
}
-----------------------------------------------, O + ~------------------------------------------
DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_once_gate_tryenter(dispatch_once_gate_= Y |t l)
{
// os 对象是否存储过
// unlock
return os_atomic_cmpxchg(&l->dgo_once, DLOCK_ONCE_UNLOCKED,
(uin, + ~ L [ Z vtptr_t)_dispatch_lock_value_for_selT : Xf(), relaxed);
}
-----------------------------------------------------w ; Y P | n )--------------------------L I L L 9 C x (--} B a [--------
_dispatch_once_callout(dispatch_once_gate_t l, void *ctxt,
dispatch_functi! q / Z * yon_t func)
{
// block()
_v I @ ) , @ { *dispatch_client_E X 5callout(ctxt` p T 8, func);
_dispatch_once_gate_broadcast(l);
}
信号量dispatch_semaphoreT n 2
dispatch_semaphore_create
这个办法比较清晰,便是函数式保存,转化: # s ^成了dispatch_semaphori | % Xe_tx s . e [
对象。信号量的1 = | | R P $ *处理都是基于此对象来进行. d Q : C _ c的。
dispatch_semaphore_t
dispatch_semaphore_create(long value)
{
dispatch_semaphore_t dsema;
/z X n y E/ 假如 value 小于 0 直接回来 0
ic c M (f (value < 0) {
return DISPAT- 7 C / V : { ^CH_BAD_INPUT;
}
dsema = _dispatch_object_alloc(DISPATCH_VTABLE(semaphore),
sizeof(struct dispatch_semaphore_s)* i ]);
dsema-&gg / r Rt;d5 1 W ; -o_next = DISPATCH_OBJECT_LISTLESS;
dsema->do_targetq = _dispatch_get_default_queue(* m Z = @ y !false);
dsema->dsema_value = value;
_dispatch_sema4_init(&dsema-&g0 6 A [ ; z [t;dsema_sema, _DSEMA4_POLICY_FIB / 2 4 OFO);
dsema->dsema_orig = value;
retuw Y @ Zrn dsema;
}
dispatch_2 b W 7 Qsemaphore_wait
wait
函数首要进行了3步操作:
- 调用
os_atomic_dec2o
宏。经过对这个宏的检查,咱们发现其便是一个对dsema
进行原子性的-1
操作 - 判别
value
是否>! : X R y S .;= 0
,假如满意条件,则不堵塞,直接履行 - 调用
_d_ + ; Wispatch_semaphore_waj W 2 o 0it_slow
。经过源码,咱们K V o $能够发现其对_ ` k U : Dtimeout
的参数进行了分其他处理
long
dispatch_semaphore_wait(dispatch_semaphoret j / l ^ ) v_t dsema, dispatch_time_t timeout)
{
long value = os_atomic_dec2o(dsema,7 M ( = w _ 2 dsema_value, acquire);
if (likely(value >= 0)) {
return 0;
}
return _dispatch_semaphore_wait_slow(dsema, timeout);
}
#define os_atomic_dec2o(p, f, m)
os_atomic_sub2o(p, f, 1, m)
#define os_atom| u T _ ` - & iic_sub2o(p, f, v, m)
os_atomic_sub(&(p)->B i 2 ~ R L wf, (v), m)
#define os_atomic_sub$ g i(pC ~ l o U 4 K G, v, m)
_os_atomic_c11_op((p), (v), m, sub, -)
_dispatch_semaphore_wait_slow
函数的处理如下:
-
dR A _ i S h Nefault:
首要调用了_dispatch_sema4_tim$ { ) yedwait
办法,这个办法首要是判别当时的操作是否超越指定的超时时刻。 -
DISPATCH_TIME_NOW
中的while
是一定会履行的,假如不满意条件,现已在之前的操作跳出了,不会履行到此。if
操作调用os_! ~ n v H ]atomic_cmpxchgvw2o l . H R
,会将value
进行+1,跳出堵塞,并回来_DSEMA4_TIMEOUT
超时 -
DIL } S D ; K , }SPATCH_TIME_FOREVER
中即调用. r N ^_dispatch_sema4. = Z _ t s g_wait
,表明会一向堵塞,知道比及single
加1变为0停止,跳出堵塞
DISPATCH_NOINLINE
static long
_dispatch_sem] O # 4 l daphore_wait_slow(dispaD - H s M ; htch_semaphore_t dsema,
dispatch_time_t timeout)
{
lo2 p 9 ( 5 j ng orig;
_dispat] d ^ Vch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
switch (timeout) {
default:= / ; { p ) C
if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
break;
}
// Fall through and try to undo what the fast path did to
// dsema->dsema_value
case DISPATCH_TIME_NOW:
orig = dsema->dsema_value;
while (orig < 0) {
if (os_! 8 & ( 9 _ b |atomic_cmpxchgvw2o(dsema, dsema_value, or; % F m 4 {ig, orig + 1,
&orig, relaxed)) {
return _DSEMA4_TIMEOUT();
}
}
// Another thread called semaphore_s( | ` O e ` yignal().
// Fall through and0 : ) E drain the wakeup.
case DISPAT2 P kCH_TIME_FOREVER:
_dispatch_sema4_wait(&dsema->dsema_sema);
break;
}
retus , c ) Z T Z m Qrn 0;
}M & t ( ` B . 3 m
dispatch_semaphore_signal
了解了wait
之后,对sig0 L 2 snal
的了解也很简略。os_atomic_inc2o
宏界说便是对dsema
进行原子性+1
的操作,假如大于0,则持续履行。
long
dispaj r : { k = z L %tch_semaphore_signal(dispatch_semaphore_t dsema)
{
// 取值 + 1 == 0 + 1 = 1
long value = os_atomic_inc2o(dsema, dsema_value, release);
if (likely(value > 0)G j f e 4 @ m S 3) {
return 0;
}
if (unlikely(value == LONG_MIN)) {
DISPATCH_CLIENT_CRASH(value,
"Unbalanm g = |ced call to dispatch_semaphore_signal()");
}
return _dispatch_semaphore_signal0 7 H ^ 1_slow(dsema);
}
总结一下信号的底层原理:
信号量在初始化时要指定 value,随后内部将这个 v` H C J 1 1alue 进行函数式保存。实际操作时会存两个 value,一个是当时的valum } – te,一个是记录初始 value。信号的 wait 和 signal 是互逆的两个操作,wC $ + % a :ait进L m + 1行减1的操作,single进行} f K #加1的操作。初始 value 必须大于等于 0,假如为0或者小于0 并随后调用 wait 办法,线程将被堵塞直到其他线程调用了 signal 办法
调度组dispatch_groN & Q $ +up
其实dispatch_group
的相关函数的底层原理和信号量的底层原理的思维是相同的。都是在底层保护了一个value
的值,进组和出组操作时,对value
的值进行操作,到达0这个临界值的时分,进N * M } P N ; l A行后续的操作。
dispatch_group_create
和信号量相似,创立组后,对其进行了函数式保存dispatchu F q K @ % +_group_te v ( a
,并经过os_atomic_store2o
宏界说,内部保护了一个value
的值
dispatch_group_t
dispatch_group_crez x G P ate(void)
{
return _dispatch_grM C ?oup_create_with_count(0);
}
-----------------------------------------------------------------------------------------
DISPATCH_ALWAYS_INLINE
static inline dispatch_group_t
_dispatch_group_cra g jeate_wit9 H *h_count(uint32_t n)
{
dispatch_group_t dg = _dispa0 O T Ftch_objeX @ 5 xct_alloc(DISPATCH_VTABLE(group),
sizeof(struct dispatch_groug ` N Y ] # E a vp_s));
dg->do_nex/ * ! } ) i _ Xt = DISPATCH_OBJECT_LISTn O p E J N z .LESS;
dg-&2 Q r ~ { x }gt;do_targetq = _dispatch_get_default_queue(false2 O + e T C W k);
if (n) {
os_atomic_store2o(dg, dg_bits,
-n * DISPATCH; Q L_u / ~GROUP_VALV [ 9 mUE_INO ` S 0 | H F ( *TERVAL, relaxed);
os_atomic_stf K * L @ Y _ (ore2o(dg, do_ref_cnt, 1, relaxed); // <rdar://22318411>
}
rG 5 = ` 2 _eturn dg;
}
d; * w 7ispatch_group_enter
经过源码,咱们能5 } ,够知道进组操作,首要是先经– z a }过os_atomic_sub_orig2o
宏界说,对bit
进行了原子性减1的操作,然后又经过位运算& DISPATCH_GROUP_VALUE_MASK
取得实在的value
void
dispatch_group_enter(dispatch_group_t d9 b h p 0 ^ ,g)
{
// The value is decremented on a 32bits wide atomic so that the carry
// for the 0 -> -1 transition is not propagated to theW W w / 5 upper 32bits.
uint32_t old_bits = os_atomic_sub_orig2o(dg, dg_bits,
DISPATCH_GROUP_VALUE_INTERVAL, acquire);
uint32_t old_van 7 q ^ c c !lue = old_bits & DD K H v R k K ` HISPATCH_GROUP_VALUE_MASK;
if (unl@ 5 +ikely(old_value == 0)) {
_dispatch_retain(dg); // <rdar://problem/22318411>
}
if (unlikec p : Uly(old_value == DISPATCH_GROUP_VALB a m o i | TUE_MAX)) {
DISPATCH_CLIENT_CRASH(o@ n r $ Zld_bits,
"Too many nested calls to dispatch_group_enter()");
}
}
dispatch_group_leave
出组的操作即经过os_atomic_add_orig2$ F O Y C Co
的对值进行原子性的加操作,并经过& DISPATCH_GROUP_V[ 2 ` |ALUE_MASK
获取到实在的value
值。假如新旧两个值持平,则履行_dispatch_group_wake
操作,进行后% | 7 A S & s续的操作。
void
dispatch_group_leave(di_ R : c Y 0 # qspatchk _ j v_group_t dg)
{
// ThY } y . . C z / ye value is incremented on a 64bits wide atomic so that the carry for
// the -1 -> 0 transition incrementX G Z O l a v qs thz e Y ) T | Fe generation atomically.
uint64_t new) v t X e_state, old_B Y p # a vstate = o= 7 & V @s_aX 4 5 ttomic_add_orig2oH G M ^ !(dg, dg_p q ) Tstate,
DISPATCH_GROUP_VALUE_INTERVAL, release);
uint32_t old_value = (uint32_t)(old_state & DISPATCH_GROUP_VALUE_MASK);
if (unlikely(old~ w - n ; N N [ d_value == DISPATCH_GROUP_VALU G T ~ xE_y ; P %1)) {
old_state += DISPATCH_GROUP_VAL+ | ( 3 M P v 8 fUE_INTERVAL;
do7 ~ V h @ 8 {
new_state = old_stt F # 9 v ( Pate;
if ((old_state & DISPATCH_G4 A }ROUP_VALK 5 yUE_MASK) == 0) {
new_state &= ~DISPATCH_GROUP_HAS_WAITERS;
new_v 2 ( O v $state &= ~DISPATCH_GROUP_HAS_NOTIFS;K l B r v = I F l
} else {
// If the group was entered again since the atomic_adM V m M , id above,
// we can't c( @ V ! +lear the waiters bit anymore as we don't kQ x v } k gnow for
// which generat} L K Z lion the waiters are f^ - a qor
new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
}
if (old_state == new_state) break;
} while (unlikely(!os_atomic_cmp3 @ o y H 3xchgv2o(dg, dg_state,
o} + S a O b a P =ld_state, new_state, &$ ) w % 1 h m K uamp;old_state, relaxed)));
return _dispatch_group_o h O y jwake(dY N 8 c q Gg, old_state, true);
}
if (unlike/ 5 o e & 9 : g )ly(old_value == 0))^ F V b y 2 {
DISPATCH_CLIENT_CRASH((uintptr_t)old_value,
"Unbalanced call to dispatch_group_leave()");
}
}
dispatch_gro7 L K 4 Cup_async
dispatch_group_async
函数便是对enter
和leave
的封装。经过代码能够看出其和异步调用函数相似,都对block进行a n A d 2 : R的封装保存。然后再内部履行的时分,手工调用了 B [ ~dispatch_group_enter
和dis( v { _ f b m + patch_group_leave
办法。
void
dispa1 _ ? B Xtch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
dispatch_block_t db)
{
dispatch_continuation_t dc = _dispatcho B 2 p t E @ j __cf 2 x (ontinuation_alloc();
uintptr_t dc_flags = DC_FLB K o (AG_CONSUME_ s : S e b e | DC_FLAG_GROUP_ASYNC;
dispatch_qos_t qos;
// 保存使命
qos = _dispatch_conto t @ g * linuation_init(dc, dq, db, 0, dc_flags);
_dispatch_continuo h }ation_gM V , ? b C a 7 kroup_async(dg, dq, dc, qos);
}
static iny x ^ dline void
_dispatch_continuation_u ! -group_async(dispatch_group_t dg, dispatch_queue_t dq,
dispatch_cont3 | ) n 1 & 3 e =inuation_t dc, dispatch_qos_t qos)
{ // 进组
dispatch_group_enter(dg);
dc->dc_data = dg;
_dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}
static inline void
_dispatch_continuation_witht ? ( ? N !_group_invoke(dispatch_continuation_t dc)
{
struI - ` S h j !ct dispatch_object_s *dou = dc->dc_data;
unsigned long type = dx_type(dou);
if7 Q Z a q D T B X (type == DISPATCH_6 g N 3GROUP_TYPE) {
_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
_dispatch_trace_item_complete(dc);
// 出组
dispatch_group_leave((dispatch_group_t)[ J w * ^ , D t 7dou);
} else {
DISPATCH_INTERNAL_CRASH(dx_type(dou), "Unexpected object type");
}
}
dispatch_group_notify
经过源码,咱们能够发现,经 , { | S Y过调用os_atomic_rmw_loop2o
在体系内核中获取到对应的状况,终究仍是调用到了_dispatch_group_wake
DISPATCH_ALWAYS_INLINE
static i; k 7 2 2 ~nline void
_dispatch_group_nW 4 C Z -otify(dispatch_group_t dg, dispatch_queue_t dq,
dispN I : 1atch_continuation_t dsn)
{
uint64_t old_state, new_state;
dispatch_continuation_t prev;
dsn->p b Z 3 s % n 4 r;dc_data = dq;
_disp| / k ?atch_retain(dq);
prev = os_mpsc_push_upd[ H G b { Sate_tail(os_mpsc(dg, dg_nog / itify), dsn, do_next);
if (os_mpsc_push_was_empty(prev)) _dispatch_retain(dg);
os_mpsc_push_update_prev(os_mpsc(dg, dg9 = V o 7_notify), prev, dsn, do_next);
if (os_mpsc_push_was_emp; k n 2 p R Rty(prev)) {
os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, release, {
new_state = old_state | DISPATCH_GROUP_H g b PAS_NOTIFS;
if ((uint[ _ [ G s ? g j32_t)oldF $ / _state == 0) {
os_atomic_rmw_loop_give_up({
return _dispatch_group_wake(dg, new_+ Q ?state, false);
});
}
});
}
}
_dispatch_group_wake
这个函数首要分为两部分,首要循环调用 semap; l m X M h r }hore_signal
告知唤醒当初等候 group 的信号量,因此 dispatch_gro& - E # mup_wait
函数得以回来。
然后获取链表,顺次调用 dispatch_a` o { V I ? { _sync_f
异步履行在 notify
函数中注册的回调。
DISPATCH_NOINLINE
static voidt # 1 ~ d t u
_r 8 y } w M J 6dispatch_group_wake(dispatch_group_t dg, uint64_t dg_state, bool needs_release)
{
uint16_t refs = needs_release ? 1 : 0; // <rdar://problem/22318411>
if (dg_state & DISPATCHn i B x *_GROUP_HAS_NOTIFS) {
dispatc7 B f t xh_continuac U f N [ tion_t dc, nextZ _ { o t P_dc, tailO L r = A c @ b A;
// Snapshot before anything is notified/woken <rdav @ S F y 3 O Ar://problem/8554546>
dc = os_mpsc_capture_snapshot(os_mpsc(dg, dg_notify), &tail);
do {
dispatch_queue_t dsn_queue = (dispatch_queue_t)dc->dc_data;
next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next);
_dispatch_cd 5 & ! j Y Vontinuation_async(dsn_queue, dc,
_diT 7 F ( X tspatchX 9 R Y s 8 ~ O :_qos_from_pp(dc->dc_priority), dc->dc_flags);
_dispatch_release(dsn_queue);
} while ((dc = next_dc));
rex ; k n a h sfs++;
}
if (dg_state &q d 2amp; DISPATCH_GROUP_HAS_WAITERS) {
_dis& ] / V ! Gpatch_wake_by_address(&amU : t T cp;dg->dg_gen);
}
if (refs) _dispatch_release_n(dg, refs);
}
总结
-
disE C E m ) C t -patch_sync
将使命block
经过push
到行列中,然后按照FIFO
去履行。 -
dispatch_sync
形成死锁的首要P R / N g原因是堵塞的tid
和现在运转的tid
为同一个 -
dispatch_async
会把使命包装并保存,之后就会拓荒相应线程去履行已保存的使命。 -
semaphon Z 7 @ K w Are
首要在底层保护一个value
的值,运用signal
进行+ +1
,wait
进行-1
。假如v( _ H alue
的值大于或者等于0,则取消堵塞,不然依据timeout
参数进行超时判别 -
dispatch_group
底层l T H ? z t也是保护了一个value
的1 . C & ( N _ T ,值,等候group
完成实际上便是等候value
康复初始值。而notify
的作用是将一切注册的回调组装成一个链表,在dispatch_as? r & % v o 4 F Pynco . ~ W X 4 %
完成时判别value
是不是q t c L康复初始值,假如是则调用dispatch_async
异步履行一切注册的回调。 -
dispatch_once
经过一个静态变量来符号block
是否已被履行,同时运用加| X | ( L [ _ 8 &锁保证只要一个线程能履行,Z h B 0履行完block
后会唤醒其他一切等候的线程。
参考资料
libdispatch
深化了解GCD
发表回复
要发表评论,您必须先登录。