Рубрики
Uncategorized

Анализ мультиплексирования ввода-вывода и механизма событий в красном цвете

Автор оригинала: David Wong.

байюнь

Вводить

Прежде чем прочитать эту статью, пожалуйста, прочитайте: Анализ способа повышения производительности параллельного ввода – вывода сервера-от сетевого программирования до epoll, чтобы лучше понять содержание этой статьи, спасибо. Мы знаем , что при использовании redis мы можем получить данные, возвращаемые сервером redis, отправив команду get от клиента. Redis основан на традиционной архитектуре C/S. Он получает соединение от клиента, прослушивая TCP-порт (6379), а затем выполняет последующие команды и возвращает результаты выполнения клиенту.

Redis-это квалифицированная серверная программа

Давайте начнем с вопроса: Как квалифицированная серверная программа, как сервер redis обрабатывает команду get в командной строке и возвращает результат клиенту? Чтобы ответить на этот вопрос, мы сначала рассмотрим то, что мы сказали в предыдущей статье. Клиентам и серверам необходимо создать сокет, чтобы указать свой сетевой адрес и номер порта соответственно, а затем обмениваться данными между сокетами на основе протокола TCP. Обычно поток связи сокетов серверной программы выглядит следующим образом:

int main(int argc, char *argv[]) {
    ListenSocket = socket (); // Call socket () System call to create a listener socket descriptor
    Bind (listenSocket); // Bind Address and Port
    Listen (listenSocket); // Convert from the default active socket to the server-applicable passive socket
    While (1) {// Continuous Loop to Listen for Client Connection Events
        ConnSocket = accept ($listenSocket); // Accept client connection
        Read (connsocket); // Read data from the client, only one client can be processed at the same time
        Write (connsocket); // Return to client data, only one client can be processed at the same time
    }
    return 0;
}

В красном-это те же шаги, которые требуются. После установления соединения с клиентом команда, отправленная клиентом, будет прочитана, а затем команда будет выполнена. Наконец, результат выполнения команды будет возвращен клиенту путем вызова системного вызова записи. Но такой процесс может обрабатывать только события подключения и чтения-записи одного клиента одновременно. Чтобы серверные приложения с одним процессом могли одновременно обрабатывать несколько клиентских событий, мы используем механизм мультиплексирования ввода-вывода. Лучший механизм мультиплексирования ввода-вывода-epoll. Просмотрите код сервера, который мы наконец создали с помощью epoll, в нашей последней статье:

int main(int argc, char *argv[]) {

    ListenSocket = socket (AF_INET, SOCK_STREAM, 0); //Ibid., create a listener socket descriptor
    
    Bid (listenSocket) // Ibid., Bind Address and Port
    
    Listen (listenSocket) // Ibid., from the default active socket to the server-applicable passive socket
    
    EPFD = epoll_create (EPOLL_SIZE); // Create an epoll instance
    
    Ep_events= (epoll_event*) malloc (sizeof (epoll_event)* EPOLL_SIZE); // Create an epoll_event structure to store Socket Sets
    event.events = EPOLLIN;
    event.data.fd = listenSocket;
    
    Epoll_ctl (epfd, EPOLL_CTL_ADD, listenSocket, & event); // Add listening sockets to the listening list
    
    while (1) {
    
        Event_cnt = epoll_wait (epfd, ep_events, EPOLL_SIZE, -1); /// Waiting to return the socket descriptors that are ready
        
        For (int I = 0; I < event_cnt; ++i) {// traverse all ready socket descriptors
            If (ep_events[i]. data.fd== listenSocket) {// If the listener socket descriptor is ready, a new client is connected to it
            
                ConnSocket = accept (listenSocket); // Call accept () to establish a connection
                
                event.events = EPOLLIN;
                event.data.fd = connSocket;
                
                Epoll_ctl (epfd, EPOLL_CTL_ADD, connSocket, & event); // Add listening to newly established connection socket descriptors to listen for subsequent read and write events on connection descriptors
                
            } else {// If the connection socket descriptor event is ready, you can read and write
            
                Strlen = read (ep_events [i]. data.fd, buf, BUF_SIZE); // Read data from the connection socket descriptor, at this time it will read the data without blocking.
                If (strlen = 0) {// can't read data from the connection socket anymore, you need to remove the socket listening
                
                    Epoll_ctl (ep fd, EPOLL_CTL_DEL, ep_events[i], data.fd, NULL); //Delete the monitoring of this descriptor
                    
                    close(ep_events[i].data.fd);
                } else {
                    Write (ep_events [i]. data.fd, buf, str_len); // / If the client can write and write the data back to the client
                }
            }
        }
    }
    close(listenSocket);
    close(epfd);
    return 0;
}

Redis, основанный на оригинальном механизме отбора, опроса и epoll, в сочетании со своими уникальными бизнес-потребностями, инкапсулировал свой собственный набор функций обработки событий, мы называем его AE (простая библиотека программирования, управляемая событиями). В то время как redis использует технологию select, epoll или kqueue на mac, redis сначала оценит, а затем выберет ту, которая обладает наилучшей производительностью:

/* Include the best multiplexing layer supported by this system.
 * The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif

Поскольку функция select является системным вызовом в стандарте POSIX, она может быть реализована в разных версиях операционной системы, поэтому используется в качестве восходящего решения. Для удобства во всех следующих статьях для объяснения используется механизм epoll.

Мультиплексирование ввода-вывода в redis

Когда мы запускаем redis-сервер в командной строке, redis фактически делает что-то похожее на сервер epoll, о котором мы писали ранее. Существует три вызова ключевых функций:

int main(int argc, char **argv) {
    ...
    InitServerConfig (); // Initializes the structure for storing server-side information
    ...
    InitServer (); // Initializes redis event loops and calls epoll_create and epoll_ctl. Create socket, bind, listen, accept to call in this function, and register the listener descriptor and connection descriptor returned after the call
    ...
    AeMain (); // Execute while (1) Event Loop and call epoll_wait to get the ready descriptor and call the corresponding handler
    ...
}

Далее давайте рассмотрим каждый из них:

initServerConfig()

Вся информация на сервере redis хранится в структуре сервера redis, которая содержит множество полей, таких как информация о сокете (например, адрес и порт) на стороне сервера, а также множество сведений о конфигурации, поддерживающих другие функции redis, такие как кластеризация, сохраняемость и так далее. Этот вызов функции инициализирует все поля структуры сервера redis и присваивает начальное значение. Поскольку мы говорим о применении механизма мультиплексирования событий и ввода-вывода в redis, мы сосредоточимся только на нескольких областях.

initServer()

Этот вызов функции является нашим главным приоритетом. После инициализации соответствующей информации сервера необходимо создать, привязать, отслеживать сокеты и устанавливать соединения с клиентом. В этой функции мы вызываем сокет, привязку, прослушивание, принятие, epoll_create, epoll_ctl. Мы можем шаг за шагом изучить механизм событий redis с сервера epoll, упомянутого выше. Основные вызовы функций initServer () заключаются в следующем:

void initServer(void) {
    ...
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR); 
    ...

    if (server.port != 0 && listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
        exit(1);
    ...

    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR){
                serverPanic("Unrecoverable error creating server.ipfd file event.");
       }
    }
    ...
}

Мы интерпретируем эти строки кода ключа в порядке сверху вниз:

aeCreateEventLoop()

В redis существует концепция aeEventLoop, которая управляет всеми соответствующими полями описания событий, хранит зарегистрированные события и готовые события:

typedef struct aeEventLoop {
    Int stop; // Identify whether the event loop (that is, while (1)) ends
    
    AeFileEvent * events; // Stores registered file events (file events, client connection and read-write events)
    AeFiredEvent * fired; // Store ready file events
    AeTimeEvent * timeEventHead; // Storage time events (more on time events later)
    
    Void * apidata; /* stores epoll-related information*/
    
    AeBeforeSleepProc * beforesleep; // Functions that need to be called before an event occurs
    AeBeforeSleepProc * aftersleep; // Functions to be called after an event occurs
} aeEventLoop;

Redis сохраняет все готовые дескрипторы, возвращенные через epoll_wait (), в первом массиве, затем пересекает массив и вызывает соответствующий обработчик событий для обработки всех событий сразу. В функции aeCreateEventLoop () инициализируется поле структуры, управляющее всей информацией о событиях, которое также включает вызов функции epoll_create() для инициализации EPFD epoll:

aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;

    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    eventLoop->setsize = setsize;
    eventLoop->lastTime = time(NULL);
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    eventLoop->aftersleep = NULL;
    If (aeApiCreate (eventLoop) == - 1) go to err; // Call aeApiCreate (), internal call epoll_create ()
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    return eventLoop;
}

В функции ae Api Create () вызывается функция epoll_create (), и созданный EPFD сохраняется в поле данных api структуры цикла событий:

typedef struct aeApiState {
    int epfd;
    struct epoll_event *events;
} aeApiState;

static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
    State - > EPFD = epoll_create (1024); /* Initialize EPFD of epoll by calling epoll_create*/
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    EvetLoop - > apidata = state; // Keep the created EPFD in the apidata field of the eventLoop structure
    return 0;
}

прослушайте Порт()

После создания epfd мы займемся созданием сокетов, привязкой и мониторингом. Эти действия выполняются в функции прослушивания порта ():

int listenToPort(int port, int *fds, int *count) {
    if (server.bindaddr_count == 0) server.bindaddr[0] = NULL;
    For (j = 0; J < server. bindaddr_count | J = 0; j + +) {// traversal of all IP addresses
        If (server. bindaddr [j]== NULL) {// No binding address yet
           ...
        } Other if (strchr (server. bindaddr [j],':'){//bind IPv6 address)
            ...
        } Other {// Bind IPv4 Address, usually into this if branch
            FDS [* count] = anetTcpServer (server. neterr, port, server. bindaddr [j], server. tcp_backlog); // true binding logic
        }
        ...
    }
    return C_OK;
}

Redis сначала определит тип IP-адреса привязки, мы, как правило, IPv4, поэтому мы, как правило, переходим в третью ветвь, вызываем функцию anetTcpServer () для конкретной логики привязки:

static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog)
{
   ...
    if ((rv = getaddrinfo(bindaddr,_port,&hints,&servinfo)) != 0) {
        anetSetError(err, "%s", gai_strerror(rv));
        return ANET_ERR;
    }
    for (p = servinfo; p != NULL; p = p->ai_next) {
        If ((s = socket (p - > ai_family, p - > ai_socktype, p - > ai_protocol) == - 1) // Call socket () to create a listening socket
            continue;

        if (af == AF_INET6 && anetV6Only(err,s) == ANET_ERR) goto error;
        if (anetSetReuseAddr(err,s) == ANET_ERR) goto error;
        If (anetListen (err, s, p-> ai_addr, p-> ai_addrlen, backlog) == ANET_ERR) s = ANET_ERR; //Call bind () to listen () to bind the port and convert it into a passive socket on the server side
        goto end;
    }
}

После вызова системных вызовов socket () для создания сокетов необходимы дальнейшие вызовы bind () и listen (). Эти два шага реализованы в функции anet Listen ():

static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len, int backlog) {
    If (bind (s, sa, len) == - 1) {// Call bind () Bind Port
        anetSetError(err, "bind: %s", strerror(errno));
        close(s);
        return ANET_ERR;
    }

    If (listen (s, backlog) = - 1 {// call listen () converts an active socket to a passive listening socket
        anetSetError(err, "listen: %s", strerror(errno));
        close(s);
        return ANET_ERR;
    }
    return ANET_OK;
}

Видя это, мы знаем, что red, как и серверы epoll, о которых мы писали, – это процесс, требующий создания сокетов, привязки и мониторинга.

aeCreateFileEvent

В redis события подключения клиента и события чтения-записи в совокупности называются событиями файлов. Мы только что завершили процесс создания сокета, привязки и прослушивания. Теперь, когда у нас есть дескриптор прослушивателя, нам нужно сначала добавить дескриптор прослушивателя в список прослушивателей epoll для прослушивания событий подключения клиента. В initServer () событие подключения клиента обрабатывается вызовом aeCreateFileEvent () и указанием его функции обработчика событий acceptTcpHandler ():

    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR){
                serverPanic("Unrecoverable error creating server.ipfd file event.");
        }
    }

Следуя функции aeCreateFileEvent (), мы обнаружили, что функция aeApiAddEvent () была дополнительно вызвана внутри:

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) {
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; 
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    mask |= eventLoop->events[fd].mask;
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    If (epoll_ctl (state - > epfd, op, fd, & ee) == - 1) return - 1; // Call epoll_ctl to add client connection events
    return 0;
}

Функция aeApiAddEvent вызывает функцию epoll_ctl() для добавления событий подключения клиента в список прослушивателей. В то же время redis помещает обработчик событий в структуру aeFileEvent для хранения:

typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    AeFileProc * rfileProc; // Read Event Handler
    AeFileProc * wfileProc; // Write event handler
    Void * clientData; // client data
} aeFileEvent;

По сравнению с серверной программой epoll, которую мы писали ранее, мы выполнили следующие шаги:

int main(int argc, char *argv[]) {

    ListenSocket = socket (AF_INET, SOCK_STREAM, 0); // Create a listener socket descriptor
    
    Bid (listenSocket) // Binding Address and Port
    
    Listen (listenSocket) // Convert from the default active socket to the server-applicable passive socket
    
    EPFD = epoll_create (EPOLL_SIZE); // Create an epoll instance
    
    Ep_events= (epoll_event*) malloc (sizeof (epoll_event)* EPOLL_SIZE); // Create an epoll_event structure to store Socket Sets
    event.events = EPOLLIN;
    event.data.fd = listenSocket;
    
    Epoll_ctl (epfd, EPOLL_CTL_ADD, listenSocket, & event); // Add listening sockets to the listening list
   ...
}

Мы реализовали создание сокета, привязку, прослушивание, создание EPFD с помощью функции epoll_create () и добавили событие начального дескриптора сокета прослушивания в список прослушивания epoll и указали для него функцию обработки событий. Затем пришло время вызвать функцию epoll_wait() в цикле while (1). При вызове функции epoll_wait() через блокировку возвращаются все готовые дескрипторы сокетов, запускаются соответствующие события, а затем события обрабатываются.

остаться()

Наконец, через цикл while (1), ожидая прибытия событий подключения клиента:

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}

В Eventloop флаг остановки используется для определения того, заканчивается цикл или нет. Если нет, цикл вызывает processevents (). Мы предполагаем, что здесь вызывается функция epoll_wait (), блокирующая ожидание прибытия событий, затем обход всех готовых дескрипторов сокетов, а затем вызов соответствующей функции обработчика событий.

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
        Numevents = aeApiPoll (eventLoop, tvp); //Call epoll_wait()
        ...
}

Давайте проследим за aeApiPoll, чтобы увидеть, как вызывается функция epoll_wait():

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata; //
    int retval, numevents = 0;

    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize, tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    if (retval > 0) {
        int j;
        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

Во-первых, EPFD и зарегистрированный набор событий, созданный в aeApiCreate (), извлекаются из EvetLoop, и вызывается функция epoll_wait() для ожидания прибытия событий, и возвращается набор дескрипторов всех готовых событий. Позже все готовые наборы дескрипторов просматриваются, чтобы определить, какой это тип дескриптора, является ли он читаемым или доступным для записи, а затем все события, готовые к обработке, сохраняются в запущенных массивах в EvetLoop вместе с читаемыми или доступными для записи тегами в соответствующих расположениях массива. Возвращаясь к внешнему вызывающему объекту, мы теперь поместили все события, которые могут быть обработаны, в запущенный массив, чтобы мы могли пройти по массиву, чтобы получить все события, которые могут быть обработаны, а затем вызвать соответствующую функцию обработчика событий:

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
        Numevents = aeApiPoll (eventLoop, tvp); //Call epoll_wait()

        for (j = 0; j < numevents; j++) {
            AeFileEvent * Fe = & & eventLoop - > events [eventLoop - > fired [j]. fd]; // loop out all ready events
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; 

            if (!invert && fe->mask & mask & AE_READABLE) {
                Fe - > rfileProc (eventLoop, fd, Fe - > clientData, mask); /// If the event is a read event, call the read event handler
                fired++;
            }

            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    Fe - > wfileProc (eventLoop, fd, Fe - > clientData, mask); /// If the event is a write event, call the write event handler
                    fired++;
                }
            }
        }
    }
    ...
}

Что касается того, как различать события подключения клиента и события чтения-записи, redis указывает различные обработчики событий (например, события принятия являются обработчиками событий acceptTcpHandler), а события чтения-записи являются другими обработчиками событий. Инкапсулируя этот уровень, можно избежать процедуры определения типа дескриптора сокета, и можно напрямую вызвать ранее зарегистрированный обработчик событий. Оглядываясь назад на сервер epoll, который мы писали ранее, очень ли он похож на этот код?

while (1) {
    
        Event_cnt = epoll_wait (epfd, ep_events, EPOLL_SIZE, -1); /// Waiting to return the socket descriptors that are ready
        
        For (int I = 0; I < event_cnt; ++i) {// traverse all ready socket descriptors
            If (ep_events[i]. data.fd== listenSocket) {// If the listener socket descriptor is ready, a new client is connected to it
            
                ConnSocket = accept (listenSocket); // Call accept () to establish a connection
                
                event.events = EPOLLIN;
                event.data.fd = connSocket;
                
                Epoll_ctl (epfd, EPOLL_CTL_ADD, connSocket, & event); // Add listening to newly established connection socket descriptors to listen for subsequent read and write events on connection descriptors
                
            } else {// If the connection socket descriptor event is ready, you can read and write
            
                Strlen = read (ep_events [i]. data.fd, buf, BUF_SIZE); // Read data from the connection socket descriptor, at this time it will read the data without blocking.
                If (strlen = 0) {// can't read data from the connection socket anymore, you need to remove the socket listening
                
                    Epoll_ctl (ep fd, EPOLL_CTL_DEL, ep_events[i], data.fd, NULL); //Delete the monitoring of this descriptor
                    
                    close(ep_events[i].data.fd);
                } else {
                    Write (ep_events [i]. data.fd, buf, str_len); // / If the client can write and write the data back to the client
                }
            }
        }
    }

резюме

До сих пор мы освоили сценарий мультиплексирования ввода-вывода в redis. Redis централизует все соединения с событиями чтения-записи и событиями времени, о которых мы не упоминали, и инкапсулирует базовый механизм мультиплексирования ввода-вывода. Наконец, один процесс может обрабатывать несколько подключений и событий чтения-записи. Это приложение мультиплексирования ввода-вывода в красном цвете.