Следующий код основан на swool 4.4.5-alpha, php 7.1.26
Мы анализируем реализацию всего процесса шаг за шагом в соответствии с процессом выполнения. Программа PHP выглядит следующим образом:
Go на самом деле является псевдонимом swool ﹣ сопрограмма ﹣ создать:
PHP_FALIAS(go, swoole_coroutine_create, arginfo_swoole_coroutine_create);
Во-первых, для создания процесса будет выполнена сопрограмма ZIF [swool] [создать].:
//Functions actually executed
PHP_FUNCTION(swoole_coroutine_create)
{
...
//Analytic parameters
ZEND_PARSE_PARAMETERS_START(1, -1)
Z_PARAM_FUNC(fci, fci_cache)
Z_PARAM_VARIADIC('*', fci.params, fci.param_count)
ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE);
...
long cid = PHPCoroutine::create(&fci_cache, fci.param_count, fci.params);
if (sw_likely(cid > 0))
{
RETURN_LONG(cid);
}
else
{
RETURN_FALSE;
}
}
long PHPCoroutine::create(zend_fcall_info_cache *fci_cache, uint32_t argc, zval *argv)
{
...
//Save anonymous function parameters and execution structure
php_coro_args php_coro_args;
php_coro_args.fci_cache = fci_cache;
php_coro_args.argv = argv;
php_coro_args.argc = argc;
Save_task (get_task()); // save the PHP stack to the current task
//Create coroutine
return Coroutine::create(main_func, (void*) &php_coro_args);
}Основные аргументы PHP-это структура, используемая для хранения информации о функциях обратного вызова:
//Save the structure of go() callback
struct php_coro_args
{
Zend? FCall? Info? Cache * FCI? Cache; // anonymous function information
Zval * argv; // parameter
Uint32_t argc; // number of parameters
};Php_corine:: get_task() используется для получения текущей выполняемой задачи. При первом выполнении он получает инициализированную задачу main_task:
php_coro_task PHPCoroutine::main_task = {0};
//Get the current task, otherwise it is the main task
static inline php_coro_task* get_task()
{
php_coro_task *task = (php_coro_task *) Coroutine::get_current_task();
return task ? task : &main_task;
}
static inline void* get_current_task()
{
return sw_likely(current) ? current->get_task() : nullptr;
}
inline void* get_task()
{
return task;
}Задача сохранения сохранит текущую информацию о стеке PHP в используемой в данный момент задаче. Текущая задача является основной задачей, поэтому информация будет сохранена в основной задаче:
void PHPCoroutine::save_task(php_coro_task *task)
{
Save VM stack (task); // save PHP stack
...
}
inline void PHPCoroutine::save_vm_stack(php_coro_task *task)
{
task->bailout = EG(bailout);
Task - > VM stack top = eg (VM stack top); // current stack top
Task - > VM ABCD stack end = eg (VM ABCD stack end); // stack bottom
Task - > vm_stack = eg (vm_stack); // the whole stack structure
task->vm_stack_page_size = EG(vm_stack_page_size);
task->error_handling = EG(error_handling);
task->exception_class = EG(exception_class);
task->exception = EG(exception);
}Эта структура используется для сохранения стека PHP текущей задачи:
struct php_coro_task
{
JMP buf * bailout; // internal exception use
Zval * vm_stack_top; // stack top
Zval * vm_stack_end; // stack bottom
Zend VM stack VM stack; // execution stack
size_t vm_stack_page_size;
zend_execute_data *execute_data;
zend_error_handling_t error_handling;
zend_class_entry *exception_class;
zend_object *exception;
zend_output_globals *output_ptr;
/* for array_walk non-reentrancy */
php_swoole_fci *array_walk_fci;
Swoole:: coroutine * Co; // which coroutine does it belong to
std::stack *defer_tasks;
long pcid;
zend_object *context;
int64_t last_msec;
zend_bool enable_scheduler;
}; После сохранения текущего стека PHP вы можете приступить к созданию сопрограммы:
static inline long create(coroutine_func_t fn, void* args = nullptr)
{
return (new Coroutine(fn, args))->run();
}
Coroutine(coroutine_func_t fn, void *private_data) :
CTX (stack size, FN, private data) // default stack size 2m
{
CID = + + last_cid; // assign the process ID
Coroutines [CID] = this; // the current object pointer is stored on the global coroutines static attribute
If (SW ﹣ unlikely (count() > peak ﹣ Num)) // update peak value
{
peak_num = count();
}
}Во-первых, будет создан объект ActX, и объект контекста в основном используется для управления стеком C
#define SW_DEFAULT_C_STACK_SIZE (2 *1024 * 1024)
size_t Coroutine::stack_size = SW_DEFAULT_C_STACK_SIZE;
ctx(stack_size, fn, private_data)
Context::Context(size_t stack_size, coroutine_func_t fn, void* private_data) :
fn_(fn), stack_size_(stack_size), private_data_(private_data)
{
End = false; // mark whether the process has been completed
swap_ctx_ = nullptr;
Stack (char *) sw_malloc (stack_size "; // allocate a piece of memory to store stack C, 2m by default
...
Void * SP = (void *) ((char *) stack "+ stack" [size]; // the stack top address is the highest address
CTX = make ﹐ fcontext (SP, stack ﹐ size ﹐ (void (*) (IntPtr ﹐) & context ﹐ func); // build context
}Функция make context предусмотрена в библиотеке boost.context и написана сборкой. Разные платформы имеют разные реализации. Мы используем файл make x86 64 SYSV elf gas. С здесь:
Регистры, используемые для передачи параметров: RDI, RSI, RDX, RCX, R8, R9
make_fcontext:
/* first arg of make_fcontext() == top of context-stack */
/* rax = sp */
movq %rdi, %rax
/* shift address in RAX to lower 16 byte boundary */
/*Rax = rax & - 16 = > rax = rax & (~ 0x10000 + 1) = > rax = rax - rax% 16, in fact, align by 16*/
andq $-16, %rax
/* reserve space for context-data on context-stack */
/* size for fc_mxcsr .. RIP + return-address for context-function */
/* on context-function entry: (RSP -0x8) % 16 == 0 */
/*Lea is the abbreviation of "load effective address",
In short, the lea instruction can be used to assign a memory address directly to the destination operand,
For example, Lea eax, [ebx + 8] assigns the value of ebx + 8 directly to eax, rather than the data in the memory address at ebx + 8 to eax.
The MOV instruction, for example: mov eax, [ebx + 8] assigns the data at the memory address ebx + 8 to eax. * /
/*Rax = rax - 0x48, 0x48 bytes reserved*/
leaq -0x48(%rax), %rax
/* third arg of make_fcontext() == address of context-function */
/*Context_func function address is placed at rax + 0x38*/
movq %rdx, 0x38(%rax)
/* save MMX control- and status-word */
stmxcsr (%rax)
/* save x87 control-word */
fnstcw 0x4(%rax)
/* compute abs address of label finish */
/*
https://sourceware.org/binutils/docs/as/i386_002dMemory.html
The x86-64 architecture adds an RIP (instruction pointer relative) addressing.
This addressing mode is specified by using 'rip' as a base register. Only constant offsets are valid. For example:
AT&T: '1234(%rip)', Intel: '[rip + 1234]'
Points to the address 1234 bytes past the end of the current instruction.
AT&T: 'symbol(%rip)', Intel: '[rip + symbol]'
Points to the symbol in RIP relative way, this is shorter than the default absolute addressing.
*/
/* rcx = finish */
leaq finish(%rip), %rcx
/* save address of finish as return-address for context-function */
/* will be entered after context-function returns */
/*The address of finish function is placed at rax + 0x40*/
movq %rcx, 0x40(%rax)
/*return rax*/
ret /* return pointer to context-data */
finish:
/* exit code is zero */
xorq %rdi, %rdi
/* exit application */
call [email protected]
hltПосле выполнения функции make context схема памяти, используемая для сохранения контекста, выглядит следующим образом:
/****************************************************************************************
* |<- ctx_
---------------------------------------------------------------------------------- *
* | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | *
* ---------------------------------------------------------------------------------- *
* | 0x0 | 0x4 | 0x8 | 0xc | 0x10 | 0x14 | 0x18 | 0x1c | *
* ---------------------------------------------------------------------------------- *
* | fc_mxcsr|fc_x87_cw| | | | *
* ---------------------------------------------------------------------------------- *
* ---------------------------------------------------------------------------------- *
* | 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 | *
* ---------------------------------------------------------------------------------- *
* | 0x20 | 0x24 | 0x28 | 0x2c | 0x30 | 0x34 | 0x38 | 0x3c | *
* ---------------------------------------------------------------------------------- *
* | | | | context_func | *
* ---------------------------------------------------------------------------------- *
* ---------------------------------------------------------------------------------- *
* | 16 | 17 | | *
* ---------------------------------------------------------------------------------- *
* | 0x40 | 0x44 | | *
* ---------------------------------------------------------------------------------- *
* | finish | | *
* ---------------------------------------------------------------------------------- *
* *
****************************************************************************************/
После создания экземпляра объекта сопрограммы метод run выполнит метод run. Метод run сохранит объект сопрограммы последнего связанного метода в источнике и установит current в качестве текущего объекта:
static sw_co_thread_local Coroutine* current;
Coroutine *origin;
inline long run()
{
long cid = this->cid;
Origin = current; // orign saves the original object
Current = this; // set current as the current object
CTX. Swap in(); // swap in
...
}Далее следует основной метод переключения стека C, подкачки и подкачки. Нижний слой также предоставляется библиотекой boost.context. Во-первых, давайте посмотрим на переключатель в:
bool Context::swap_in()
{
jump_fcontext(&swap_ctx_, ctx_, (intptr_t) this, true);
return true;
}
// jump_x86_64_sysv_elf_gas.S
jump_fcontext:
/*The current register is pushed into the stack. Note that there is actually a rip above RBP, because call jump ﹣ fcontext is equivalent to push rip, JMP jump ﹣ fcontext*/
/*Rip holds the next instruction to be executed. In this case, it is the return true after jump ﹐ fcontext*/
pushq %rbp /* save RBP */
pushq %rbx /* save RBX */
pushq %r15 /* save R15 */
pushq %r14 /* save R14 */
pushq %r13 /* save R13 */
pushq %r12 /* save R12 */
/* prepare stack for FPU */
leaq -0x8(%rsp), %rsp
/* test for flag preserve_fpu */
cmp $0, %rcx
je 1f
/* save MMX control- and status-word */
stmxcsr (%rsp)
/* save x87 control-word */
fnstcw 0x4(%rsp)
1:
/* store RSP (pointing to context-data) in RDI */
/** swap? CTX? RSP, save the top of the stack*/
movq %rsp, (%rdi)
/* restore RSP (pointing to context-data) from RSI */
/*RSP = CTX UU, which points the current execution stack to the stack just built through make ﹣ fcontext*/
movq %rsi, %rsp
/* test for flag preserve_fpu */
cmp $0, %rcx
je 2f
/* restore MMX control- and status-word */
ldmxcsr (%rsp)
/* restore x87 control-word */
fldcw 0x4(%rsp)
2:
/* prepare stack for FPU */
leaq 0x8(%rsp), %rsp
/*Restore the register to the value pushed in from the new stack. This time, it is still empty*/
popq %r12 /* restrore R12 */
popq %r13 /* restrore R13 */
popq %r14 /* restrore R14 */
popq %r15 /* restrore R15 */
popq %rbx /* restrore RBX */
popq %rbp /* restrore RBP */
/* restore return-address */
/*R8 = make ﹣ fcontext (look up at the memory layout after make ﹣ fcontext)*/
popq %r8
/* use third arg as return-value after jump */
/* rax = this */
movq %rdx, %rax
/* use third arg as first arg in context function */
/* rdi = this */
movq %rdx, %rdi
/* indirect jump to context */
/*Execute context func*/
jmp *%r8Исходная компоновка стековой памяти после выполнения jump ﹣ fcontext выглядит следующим образом:
/**************************************************************************************** * |<-swap_ctx_ * * ---------------------------------------------------------------------------------- * * | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | * * ---------------------------------------------------------------------------------- * * | 0x0 | 0x4 | 0x8 | 0xc | 0x10 | 0x14 | 0x18 | 0x1c | * * ---------------------------------------------------------------------------------- * * | fc_mxcsr|fc_x87_cw| R12 | R13 | R14 | * * ---------------------------------------------------------------------------------- * * ---------------------------------------------------------------------------------- * * | 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 | * * ---------------------------------------------------------------------------------- * * | 0x20 | 0x24 | 0x28 | 0x2c | 0x30 | 0x34 | 0x38 | 0x3c | * * ---------------------------------------------------------------------------------- * * | R15 | RBX | RBP | RIP/return true | * * ---------------------------------------------------------------------------------- * * * ****************************************************************************************/
Функция контекста имеет параметр. Это записанное в RBI после выполнения контекста перехода будет использоваться в качестве параметра функции контекста. FN, личные данные-это параметр, передаваемый при построении CTX:
void Context::context_func(void *arg)
{
Context *_this = (Context *) arg;
_this->fn_(_this->private_data_); // main_func(php_coro_args)
_this->end_ = true;
_this->swap_out();
}
Основная функция выделяет новый стек выполнения для текущего процесса, привязывает его к только что созданной сопрограмме, а затем выполняет функцию обратного вызова процесса:
void PHPCoroutine::main_func(void *arg)
{
...
//Create a new vmstack on eg to execute the callback function in go(). The previous execution stack has been saved on the main task
vm_stack_init();
call = (zend_execute_data *) (EG(vm_stack_top));
task = (php_coro_task *) EG(vm_stack_top);
Eg (VM? Stack? Top) = (zval *) ((char *) call + PHP? Coro? Task? Slot * sizeof (zval)); // reserve a place for task
Call = Zend? VM? Stack? Push? Call? Frame (call? Info, func, argc, object? Or? Called? Scope); // allocate stack space for parameters
EG(bailout) = NULL;
EG(current_execute_data) = call;
EG(error_handling) = EH_NORMAL;
EG(exception_class) = NULL;
EG(exception) = NULL;
Save VM stack (task); // save vmstack to the current task
Record? Last? Msec (task); // record the time
task->output_ptr = NULL;
task->array_walk_fci = NULL;
Task - > co = coroutine:: get Fu current(); // record the current coroutine
Task - > co - > set task ((void *) task); // coroutine is bound to the current task
task->defer_tasks = nullptr;
Task - > pcid = task - > co - > get \ origin \ cid(); // record the last process ID
task->context = nullptr;
task->enable_scheduler = 1;
if (EXPECTED(func->type == ZEND_USER_FUNCTION))
{
...
//Initialize execute? Data
zend_init_func_execute_data(call, &func->op_array, retval);
//Execute user functions in the process
zend_execute_ex(EG(current_execute_data));
}
...
}Затем будет выполнен код операции, сгенерированный функцией обратного вызова пользователя. Когда код операции будет выполнен в CO:: sleep (1), будет вызвана system:: sleep (секунды). В этом случае для текущей сопрограммы будет зарегистрировано событие синхронизации. Функция обратного вызова-это сон? Перерыв:
int System::sleep(double sec)
{
Coroutine * co = coroutine:: get Fu current Fu safe(); // get the current coroutine
If (swoole? Timer? Add ((long) (SEC * 1000), SW? False, sleep? Timeout, CO) = = null) // add a timing event for the current courroute
{
return -1;
}
Co - > yield(); // switch
return 0;
}
//Callback for timed event registration
static void sleep_timeout(swTimer *timer, swTimer_node *tnode)
{
((Coroutine *) tnode->data)->resume();
}Функция Yield отвечает за переключение между стеком PHP и стеком C
void Coroutine::yield()
{
SW_ASSERT(current == this || on_bailout != nullptr);
State = SW ﹣ cor ﹣ waiting; // the process state changes to waiting
if (sw_likely(on_yield))
{
On_yield (task); // PHP stack switch
}
Current = origin; // switch the current process to the previous one
CTX. Swap out(); // C stack switch
}Во-первых, давайте посмотрим на переключатель стека PHP. On yield-это зарегистрированная функция во время инициализации
void PHPCoroutine::init()
{
Coroutine::set_on_yield(on_yield);
Coroutine::set_on_resume(on_resume);
Coroutine::set_on_close(on_close);
}
void PHPCoroutine::on_yield(void *arg)
{
PHP ABCD task * task = (PHP ABCD task *) Arg; // current task
PHP ﹣ Coro ﹣ task * origin ﹣ task = get ﹣ origin ﹣ task (task); // get the previous task
Save task (task); // save the current task
Restore? Task (origin? Task); // restore the previous task
}После получения последней задачи вы можете восстановить, например, информацию о выполнении, сохраненную выше. Программа очень проста. Просто замените vmstack и текущие данные выполнения:
void PHPCoroutine::restore_task(php_coro_task *task)
{
restore_vm_stack(task);
...
}
inline void PHPCoroutine::restore_vm_stack(php_coro_task *task)
{
EG(bailout) = task->bailout;
EG(vm_stack_top) = task->vm_stack_top;
EG(vm_stack_end) = task->vm_stack_end;
EG(vm_stack) = task->vm_stack;
EG(vm_stack_page_size) = task->vm_stack_page_size;
EG(current_execute_data) = task->execute_data;
EG(error_handling) = task->error_handling;
EG(exception_class) = task->exception_class;
EG(exception) = task->exception;
...
}
В это время состояние выполнения стека PHP было восстановлено до состояния, в котором только что была вызвана функция go () (основная задача). Затем давайте посмотрим, как обрабатывается переключатель стека C:
bool Context::swap_out()
{
jump_fcontext(&ctx_, swap_ctx_, (intptr_t) this, true);
return true;
}
Вспомните функцию подкачки. Swap CTX сохраняет RSP при выполнении swap in. CTX сохраняет верхнюю позицию стека, инициализированную контекстом make. См. раздел Выполнение контекста перехода еще раз
// jump_x86_64_sysv_elf_gas.S
jump_fcontext:
/*The current register is pushed into the stack. Note that there is actually a rip above RBP, because call jump ﹣ fcontext is equivalent to push rip, JMP jump ﹣ fcontext*/
/*Rip holds the next instruction to be executed. Here is the return true after jump fcontext in swap out*/
pushq %rbp /* save RBP */
pushq %rbx /* save RBX */
pushq %r15 /* save R15 */
pushq %r14 /* save R14 */
pushq %r13 /* save R13 */
pushq %r12 /* save R12 */
/* prepare stack for FPU */
leaq -0x8(%rsp), %rsp
/* test for flag preserve_fpu */
cmp $0, %rcx
je 1f
/* save MMX control- and status-word */
stmxcsr (%rsp)
/* save x87 control-word */
fnstcw 0x4(%rsp)
1:
/* store RSP (pointing to context-data) in RDI */
/** CTX = RSP, save the top position of the stack*/
movq %rsp, (%rdi)
/* restore RSP (pointing to context-data) from RSI */
/*RSP = swap ﹣ CTX ﹣, which points the current execution stack to the RSP of the previous execution of swap ﹣ in*/
movq %rsi, %rsp
/* test for flag preserve_fpu */
cmp $0, %rcx
je 2f
/* restore MMX control- and status-word */
ldmxcsr (%rsp)
/* restore x87 control-word */
fldcw 0x4(%rsp)
2:
/* prepare stack for FPU */
leaq 0x8(%rsp), %rsp
/*Restore the register to the status when executing swap in*/
popq %r12 /* restrore R12 */
popq %r13 /* restrore R13 */
popq %r14 /* restrore R14 */
popq %r15 /* restrore R15 */
popq %rbx /* restrore RBX */
popq %rbp /* restrore RBP */
/* restore return-address */
/* r8 = Context::swap_in::return true */
popq %r8
/* use third arg as return-value after jump */
/* rax = this */
movq %rdx, %rax
/* use third arg as first arg in context function */
/* rdi = this */
movq %rdx, %rdi
/* indirect jump to context */
/*Next, continue to execute at the location of the last swap in*/
jmp *%r8В это время стеки PHP и C были восстановлены в состояние выполнения подкачки, и код полностью возвращается к сопрограмме ZIF [swoole] [создать]
bool Context::swap_in()
{
jump_fcontext(&swap_ctx_, ctx_, (intptr_t) this, true);
Return true; // from here, continue execution, back to the function before calling it.
}
inline long run()
{
...
CTX. Swap_in(); // return
Check_end(); // check whether the execution of the cooperation process has been completed. You need to clean up after the execution
return cid;
}
static inline long create(coroutine_func_t fn, void* args = nullptr)
{
return (new Coroutine(fn, args))->run();
}
long PHPCoroutine::create(zend_fcall_info_cache *fci_cache, uint32_t argc, zval *argv)
{
...
return Coroutine::create(main_func, (void*) &php_coro_args);
}
PHP_FUNCTION(swoole_coroutine_create)
{
...
long cid = PHPCoroutine::create(&fci_cache, fci.param_count, fci.params);
...
Return_long (CID); // return the process ID
}Поскольку данные выполнения были переключены обратно в код операции основной задачи, следующий код операции – “эхо” a “, что эквивалентно пропуску кода после сна
Подождите до определенного времени, таймер вызовет функцию обратного вызова sleep [тайм-аут], зарегистрированную функцией sleep, и процесс пробуждения продолжится:
//Callback for timed event registration
static void sleep_timeout(swTimer *timer, swTimer_node *tnode)
{
((Coroutine *) tnode->data)->resume();
}
//Restore the entire execution environment
void Coroutine::resume()
{
...
State = SW ABCD Coro ABCD running; // the process state is changed to in progress
if (sw_likely(on_resume))
{
On "resume (task); // resume PHP execution status
}
origin = current;
current = this;
CTX. Swap_in(); // recover C stack
...
}
//Restore task
void PHPCoroutine::on_resume(void *arg)
{
php_coro_task *task = (php_coro_task *) arg;
php_coro_task *current_task = get_task();
Save? Task (current? Task); // save the current task
Restore_task (task); // restore task
Record? Last? Msec (task); // record the time
}Зенд? Виртуальная машина прочитает код операции “echo” a ” и продолжит выполнение
После выполнения всех кодов операций в текущем обратном вызове функция phpcoroutine:: Main выполнит ранее зарегистрированную функцию отсрочки один раз в порядке Filo, а затем очистит ресурсы
void PHPCoroutine::main_func(void *arg)
{
...
if (EXPECTED(func->type == ZEND_USER_FUNCTION))
{
...
//After the execution of the coordination callback function, return
zend_execute_ex(EG(current_execute_data));
}
if (task->defer_tasks)
{
std::stack *tasks = task->defer_tasks;
while (!tasks->empty())
{
php_swoole_fci *defer_fci = tasks->top();
tasks->pop(); // FILO
//Call the function registered by defer
if (UNEXPECTED(sw_zend_call_function_anyway(&defer_fci->fci, &defer_fci->fci_cache) != SUCCESS))
{
...
}
}
}
// resources release
...
} Когда основная функция будет завершена, вернитесь к методу context:: context func. Отметьте текущий процесс как завершенный. Сделайте еще один своп и вернитесь в то место, где только что произошел своп, то есть возобновите метод. Затем проверьте, завершен ли процесс пробуждения. Необходимо определить только конечный атрибут
void Context::context_func(void *arg)
{
Context *_this = (Context *) arg;
_This - > FN (this - > private data); // main func (close) Returns
_This - > end = true; // the current process is marked as closed
_This - > swap out(); // switch back to main C stack
}
void Coroutine::resume()
{
...
CTX. Swap_in(); // switch back here
Check_end(); // check whether the cooperation process has ended
}
inline void check_end()
{
if (ctx.is_end())
{
close();
}
}
inline bool is_end()
{
return end_;
}Метод close очистит стек виртуальных машин, созданный для этого сотрудничества, и вернется к основной задаче. В это время и стек C, и стек PHP вернулись к основному сотрудничеству
void Coroutine::close()
{
...
State = SW ﹐ Coro ﹐ end; // state changed to closed
if (on_close)
{
on_close(task);
}
current = origin;
Coroutines. Erase (CID); // remove the current process
delete this;
}
void PHPCoroutine::on_close(void *arg)
{
php_coro_task *task = (php_coro_task *) arg;
php_coro_task *origin_task = get_origin_task(task);
VM? Stack? Destroy(); // destroy VM? Stack
Restore? Task (origin? Task); // restore main? Task
}Когда будут выполняться события по времени? Это достигается за счет внутреннего цикла событий реактора. Смотрите конкретную реализацию ниже:
При создании контракта он определит, был ли реактор инициализирован. Если он не был инициализирован, он вызовет функцию активации для инициализации реактора. Функция активации состоит из следующих шагов:
1. Initialize reactor structure and register various callback functions (the most efficient multiplex API corresponding to the platform is used for reading and writing events, which is encapsulated into a unified callback function to help shield the implementation details of different APIs) Инициализируйте структуру реактора и зарегистрируйте различные функции обратного вызова (наиболее эффективный мультиплексный API, соответствующий платформе, используется для чтения и записи событий, который инкапсулирован в единую функцию обратного вызова, чтобы помочь скрыть детали реализации различных API).
2. Register a function called in the request shutdown phase (recall the PHP life cycle, which will be called at the end of the script) through PHP [swoole] register [shutdown] function (“swoole \ event:: rshutdown”). In fact, the event cycle is executed in this phase Зарегистрируйте функцию, вызываемую на этапе завершения запроса (вспомните жизненный цикл PHP, который будет вызван в конце сценария), с помощью функции PHP [swool] register [shutdown] (“swool \ событие:: rshutdown”). Фактически, цикл событий выполняется на этой фазе
3. Enable preemptive scheduling thread (later on) Включить поток упреждающего планирования (позже)
long PHPCoroutine::create(zend_fcall_info_cache *fci_cache, uint32_t argc, zval *argv)
{
...
if (sw_unlikely(!active))
{
activate();
}
...
}
inline void PHPCoroutine::activate()
{
...
/* init reactor and register event wait */
php_swoole_check_reactor();
/* replace interrupt function */
Orig ﹐ interrupt ﹐ function = Zend ﹐ interrupt ﹐ function; // save the original interrupt callback function
Zend? Interrupt? Function = cor? Interrupt? Function; // replace the interrupt function
//Start preemptive scheduling
if (SWOOLE_G(enable_preemptive_scheduler) || config.enable_preemptive_scheduler)
{
/* create a thread to interrupt the coroutine that takes up too much time */
interrupt_thread_start();
}
...
active = true;
}
static sw_inline int php_swoole_check_reactor()
{
...
if (sw_unlikely(!SwooleG.main_reactor))
{
return php_swoole_reactor_init() == SW_OK ? 1 : -1;
}
...
}
int php_swoole_reactor_init()
{
...
if (!SwooleG.main_reactor)
{
swoole_event_init();
SwooleG.main_reactor->wait_exit = 1;
//Register rshutdown function
php_swoole_register_shutdown_function("Swoole\Event::rshutdown");
}
...
}
#define sw_reactor() (SwooleG.main_reactor)
#define SW_REACTOR_MAXEVENTS 4096
int swoole_event_init()
{
SwooleG.main_reactor = (swReactor *) sw_malloc(sizeof(swReactor));
if (swReactor_create(sw_reactor(), SW_REACTOR_MAXEVENTS) < 0)
{
...
}
...
}
int swReactor_create(swReactor *reactor, int max_event)
{
int ret;
bzero(reactor, sizeof(swReactor));
#ifdef HAVE_EPOLL
ret = swReactorEpoll_create(reactor, max_event);
#elif defined(HAVE_KQUEUE)
ret = swReactorKqueue_create(reactor, max_event);
#elif defined(HAVE_POLL)
ret = swReactorPoll_create(reactor, max_event);
#else
ret = swReactorSelect_create(reactor);
#endif
...
Reactor - > ontimeout = reactor Ou timeout; // callback triggered when timer timeout
...
Socket::init_reactor(reactor);
...
}
int swReactorEpoll_create(swReactor *reactor, int max_event_num)
{
...
//binding method
reactor->add = swReactorEpoll_add;
reactor->set = swReactorEpoll_set;
reactor->del = swReactorEpoll_del;
reactor->wait = swReactorEpoll_wait;
reactor->free = swReactorEpoll_free;
}В запросе? Фаза выключения, будет выполнена зарегистрированная функция swool \ event:: shutdown, и swool? Событие? Завершение работы приведет к выполнению ранее зарегистрированной функции ожидания:
static PHP_FUNCTION(swoole_event_rshutdown)
{
/* prevent the program from jumping out of the rshutdown */
zend_try
{
PHP_FN(swoole_event_wait)(INTERNAL_FUNCTION_PARAM_PASSTHRU);
}
zend_end_try();
}
int swoole_event_wait()
{
int retval = sw_reactor()->wait(sw_reactor(), NULL);
swoole_event_free();
return retval;
}
Давайте взглянем на регистрацию событий по времени. Во-первых, таймер будет инициализирован:
int System::sleep(double sec)
{
Coroutine * co = coroutine:: get Fu current Fu safe(); // get the current coroutine
if (swoole_timer_add((long) (sec * 1000), SW_FALSE, sleep_timeout, co) == NULL)
{
...
}
}
swTimer_node* swoole_timer_add(long ms, uchar persistent, swTimerCallback callback, void *private_data)
{
return swTimer_add(sw_timer(), ms, persistent, private_data, callback);
}
swTimer_node* swTimer_add(swTimer *timer, long _msec, int interval, void *data, swTimerCallback callback)
{
if (sw_unlikely(!timer->initialized))
{
If (SW? Unlikely (SW timer? Init (timer,? Msec)! = SW? OK)) // initialize timer
{
return NULL;
}
}
...
}
static int swTimer_init(swTimer *timer, long msec)
{
...
Timer - > heap = swheap UU new (1024, SW UU min UU heap); // initialize the minimum heap
timer->map = swHashMap_new(SW_HASHMAP_INIT_BUCKET_N, NULL);
Timer - > "current" id = - 1; // current timer ID
Timer - >
timer->_next_id = 1;
timer->round = 0;
ret = swReactorTimer_init(SwooleG.main_reactor, timer, msec);
...
}
static int swReactorTimer_init(swReactor *reactor, swTimer *timer, long exec_msec)
{
reactor->check_timer = SW_TRUE;
Reactor - > timeout ﹣ msec = exec ﹣ msec; // the shortest timeout in the timer
reactor->timer = timer;
timer->reactor = reactor;
timer->set = swReactorTimer_set;
timer->close = swReactorTimer_close;
...
}Затем добавьте событие. Примечание:
1. Time. Next msec and reactor. Timeout msec keep the shortest timeout (relative value) in all timers Время. Следующая секунда и реактор. Тайм-аут мсек сохраняйте самый короткий тайм-аут (относительное значение) во всех таймерах
2. For tnode.exec ﹣ msec and tnode Minimum heap To save, so that the element at the top of the heap is the earliest timeout element Для tnode.exec ﹣ msec и tnode
swTimer_node* swTimer_add(swTimer *timer, long _msec, int interval, void *data, swTimerCallback callback)
{
swTimer_node *tnode = sw_malloc(sizeof(swTimer_node));
int64_t now_msec = swTimer_get_relative_msec();
tnode->data = data;
tnode->type = SW_TIMER_TYPE_KERNEL;
Tnode - > exec ﹣ msec = now ﹣ msec + ﹣ msec; // absolute time
Tnode - > interval = interval? _msec: 0; // do you want to call all the time
tnode->removed = 0;
tnode->callback = callback;
tnode->round = timer->round;
tnode->dtor = NULL;
If (Timer - >
{
timer->set(timer, _msec);
timer->_next_msec = _msec;
}
tnode->id = timer->_next_id++;
Tnode - > heap [node = swheap [push (Timer - > heap, tnode - > exec [msec, tnode); // put it in the heap, priority = tnode - > exec [msec]
If (SW? Unlikely (SW HashMap? Add? Int (Timer - > map, tnode - > ID, tnode)! = SW? OK)) // HashMap saves the tnodeid and tnode mapping relationship
{
...
}
...
}После регистрации времени синхронизации вы можете дождаться выполнения цикла событий. Давайте возьмем epoll в качестве примера:
Использовать epoll? Подождите, чтобы дождаться события чтения/записи FD, передайте в reactor – > тайм-аут? Мсек и дождитесь прибытия события FD
1. If no FD read-write event has been obtained after the timeout, execute the ontimeout function to handle the timing event Если после истечения тайм-аута не было получено событие чтения-записи FD, выполните функцию ontimeout для обработки события синхронизации
2. If there is an FD event, the FD read-write event will be handled. After handling the triggered event, the next cycle will be started Если есть событие FD, будет обработано событие FD для чтения и записи. После обработки инициированного события будет запущен следующий цикл
static int swReactorEpoll_wait(swReactor *reactor, struct timeval *timeo)
{
...
reactor->running = 1;
reactor->start = 1;
while (reactor->running > 0)
{
...
n = epoll_wait(epoll_fd, events, max_event_num, reactor->timeout_msec);
if (n < 0)
{
...
//Error handling
}
else if (n == 0)
{
reactor->onTimeout(reactor);
}
for (i = 0; i < n; i++)
{
...
//FD read write event handling
}
...
}
return 0;
}Если в течение этого периода не произойдет события FD, будет выполнено событие синхронизации. Время ожидания-это ранее зарегистрированный функциональный реактор [тайм-аут]. Функция таймера sw [Выбрать] выполнит текущее событие с истекшим сроком действия, а затем завершит цикл. Когда мы выполним функцию сна [тайм-аут], зарегистрированную выше, мы разбудим процесс сна и продолжим выполнение:
static void reactor_timeout(swReactor *reactor)
{
reactor_finish(reactor);
...
}
static void reactor_finish(swReactor *reactor)
{
//check timer
if (reactor->check_timer)
{
swTimer_select(reactor->timer);
}
...
//the event loop is empty
If (reactor - > wait [Exit & & reactor - > is [empty (reactor)) // no task, exit the loop
{
reactor->running = 0;
}
}
int swTimer_select(swTimer *timer)
{
Int64? T now? Msec = swtimer? Get? Relative? Msec(); // current time
While ((TMP = swheap_top (Timer - > heap))) // get the earliest expired event
{
tnode = tmp->data;
If (tnode - > exec > now // not yet
{
break;
}
if (!tnode->removed)
{
Tnode - > callback (timer, tnode); // execute the callback function registered by the timing event
}
timer->num--;
swHeap_pop(timer->heap);
swHashMap_del_int(timer->map, tnode->id);
}
...
}До сих пор весь процесс был представлен и обобщен следующим образом:
- В отсутствие активного вмешательства в планирование, При выполнении событий ввода-вывода/синхронизации процессы взаимодействия активно отказываются, регистрируют соответствующие события, а затем ожидают, пока события пройдут цикл событий в фазе запроса “завершение работы”, запуская возобновление процесса сотрудничества, чтобы добиться одновременного эффекта нескольких процессов сотрудничества
- Ввод-вывод/события синхронизации могут быть не вовремя
Из вышесказанного мы можем знать, что если в процессе нет события ввода-вывода/синхронизации, то на самом деле для процесса нет возможности переключения. В сценариях с интенсивным использованием процессора некоторые процессы будут голодать, потому что они не смогут получить срез времени процессора. Цель упреждающего планирования, введенного в swool 4.4, состоит в том, чтобы решить эту проблему
Прерывание виртуальной машины-это механизм выполнения, введенный после php7.1.0. Swool-это упреждающее планирование, реализованное с помощью этой функции:
1. Zend? VM? Interrupt? Check jump and call When Зенд? ВМ? Прервать? Проверка
2. Zend? VM? Interrupt? Check will check the flag bit eg (VM? Interrupt). If it is 1, Zend? Interrupt? Function will be triggered Зенд? ВМ? Прервать? Проверка проверит бит флага, например (VM? Прервать). Если это 1, Зенд? Прервать? Функция будет запущена
// php 7.1.26 src
#define ZEND_VM_INTERRUPT_CHECK() do { \
if (UNEXPECTED(EG(vm_interrupt))) { \
ZEND_VM_INTERRUPT(); \
} \
} while (0)
#define ZEND_VM_INTERRUPT() ZEND_VM_TAIL_CALL(zend_interrupt_helper_SPEC(ZEND_OPCODE_HANDLER_ARGS_PASSTHRU));
static ZEND_OPCODE_HANDLER_RET ZEND_FASTCALL zend_interrupt_helper_SPEC(ZEND_OPCODE_HANDLER_ARGS)
{
...
EG(vm_interrupt) = 0;
if (zend_interrupt_function) {
zend_interrupt_function(execute_data);
}
}
Давайте посмотрим на конкретную реализацию:
Инициализация:
1. Save the original interrupt function, and replace zend_interrupt_function with a new interrupt function Сохраните исходную функцию прерывания и замените функцию zend_interrupt_function новой функцией прерывания
2. Enable thread execution interrupt? Thread? Loop Включить прерывание выполнения потока? Нить? Петля
3. Set eg (VM · interrupt) to 1 every 5ms in interrupt · thread · loop Установите, например, (VM · прерывание) на 1 каждые 5 мс в цикле прерывания · поток ·
inline void PHPCoroutine::activate()
{
...
/* replace interrupt function */
Orig ﹐ interrupt ﹐ function = Zend ﹐ interrupt ﹐ function; // save the original interrupt callback function
Zend? Interrupt? Function = cor? Interrupt? Function; // replace the interrupt function
//Start preemptive scheduling
If (swoole? G (enable? Preemptive? Scheduler) |? Config. Enable? Preemptive? Scheduler) // configure to turn on the enable? Preemptive? Scheduler option
{
/* create a thread to interrupt the coroutine that takes up too much time */
interrupt_thread_start();
}
}
void PHPCoroutine::interrupt_thread_start()
{
zend_vm_interrupt = &EG(vm_interrupt);
interrupt_thread_running = true;
if (pthread_create(&interrupt_thread_id, NULL, (void * (*)(void *)) interrupt_thread_loop, NULL) < 0)
{
...
}
}
static const uint8_t MAX_EXEC_MSEC = 10;
void PHPCoroutine::interrupt_thread_loop()
{
static const useconds_t interval = (MAX_EXEC_MSEC / 2) * 1000;
while (interrupt_thread_running)
{
*zend_vm_interrupt = 1; // EG(vm_interrupt) = 1
Usleep (interval); // sleep for 5ms
}
pthread_exit(0);
}Функция прерывания проверит, можно ли запланировать текущий процесс (более 10 мс с момента последней передачи). Если это так, он напрямую откажется от текущего процесса и завершит упреждающее планирование
static void coro_interrupt_function(zend_execute_data *execute_data)
{
php_coro_task *task = PHPCoroutine::get_task();
if (task && task->co && PHPCoroutine::is_schedulable(task))
{
Task - > co - > yield(); // give up the current process
}
if (orig_interrupt_function)
{
Orig? Interrupt? Function (execute? Data); // execute the original interrupt function
}
}
static const uint8_t MAX_EXEC_MSEC = 10;
static inline bool is_schedulable(php_coro_task *task)
{
//Enable ﹣ the scheduler property is 1 and has been running for more than 10ms
return task->enable_scheduler && (swTimer_get_absolute_msec() - task->last_msec > MAX_EXEC_MSEC);
}