байюнь
Вводить
Прежде чем прочитать эту статью, пожалуйста, прочитайте: Анализ способа повышения производительности параллельного ввода – вывода сервера-от сетевого программирования до epoll, чтобы лучше понять содержание этой статьи, спасибо. Мы знаем , что при использовании redis мы можем получить данные, возвращаемые сервером redis, отправив команду get от клиента. Redis основан на традиционной архитектуре C/S. Он получает соединение от клиента, прослушивая TCP-порт (6379), а затем выполняет последующие команды и возвращает результаты выполнения клиенту.
Redis-это квалифицированная серверная программа
Давайте начнем с вопроса: Как квалифицированная серверная программа, как сервер redis обрабатывает команду get в командной строке и возвращает результат клиенту? Чтобы ответить на этот вопрос, мы сначала рассмотрим то, что мы сказали в предыдущей статье. Клиентам и серверам необходимо создать сокет, чтобы указать свой сетевой адрес и номер порта соответственно, а затем обмениваться данными между сокетами на основе протокола TCP. Обычно поток связи сокетов серверной программы выглядит следующим образом:
int main(int argc, char *argv[]) {
ListenSocket = socket (); // Call socket () System call to create a listener socket descriptor
Bind (listenSocket); // Bind Address and Port
Listen (listenSocket); // Convert from the default active socket to the server-applicable passive socket
While (1) {// Continuous Loop to Listen for Client Connection Events
ConnSocket = accept ($listenSocket); // Accept client connection
Read (connsocket); // Read data from the client, only one client can be processed at the same time
Write (connsocket); // Return to client data, only one client can be processed at the same time
}
return 0;
}В красном-это те же шаги, которые требуются. После установления соединения с клиентом команда, отправленная клиентом, будет прочитана, а затем команда будет выполнена. Наконец, результат выполнения команды будет возвращен клиенту путем вызова системного вызова записи. Но такой процесс может обрабатывать только события подключения и чтения-записи одного клиента одновременно. Чтобы серверные приложения с одним процессом могли одновременно обрабатывать несколько клиентских событий, мы используем механизм мультиплексирования ввода-вывода. Лучший механизм мультиплексирования ввода-вывода-epoll. Просмотрите код сервера, который мы наконец создали с помощью epoll, в нашей последней статье:
int main(int argc, char *argv[]) {
ListenSocket = socket (AF_INET, SOCK_STREAM, 0); //Ibid., create a listener socket descriptor
Bid (listenSocket) // Ibid., Bind Address and Port
Listen (listenSocket) // Ibid., from the default active socket to the server-applicable passive socket
EPFD = epoll_create (EPOLL_SIZE); // Create an epoll instance
Ep_events= (epoll_event*) malloc (sizeof (epoll_event)* EPOLL_SIZE); // Create an epoll_event structure to store Socket Sets
event.events = EPOLLIN;
event.data.fd = listenSocket;
Epoll_ctl (epfd, EPOLL_CTL_ADD, listenSocket, & event); // Add listening sockets to the listening list
while (1) {
Event_cnt = epoll_wait (epfd, ep_events, EPOLL_SIZE, -1); /// Waiting to return the socket descriptors that are ready
For (int I = 0; I < event_cnt; ++i) {// traverse all ready socket descriptors
If (ep_events[i]. data.fd== listenSocket) {// If the listener socket descriptor is ready, a new client is connected to it
ConnSocket = accept (listenSocket); // Call accept () to establish a connection
event.events = EPOLLIN;
event.data.fd = connSocket;
Epoll_ctl (epfd, EPOLL_CTL_ADD, connSocket, & event); // Add listening to newly established connection socket descriptors to listen for subsequent read and write events on connection descriptors
} else {// If the connection socket descriptor event is ready, you can read and write
Strlen = read (ep_events [i]. data.fd, buf, BUF_SIZE); // Read data from the connection socket descriptor, at this time it will read the data without blocking.
If (strlen = 0) {// can't read data from the connection socket anymore, you need to remove the socket listening
Epoll_ctl (ep fd, EPOLL_CTL_DEL, ep_events[i], data.fd, NULL); //Delete the monitoring of this descriptor
close(ep_events[i].data.fd);
} else {
Write (ep_events [i]. data.fd, buf, str_len); // / If the client can write and write the data back to the client
}
}
}
}
close(listenSocket);
close(epfd);
return 0;
}Redis, основанный на оригинальном механизме отбора, опроса и epoll, в сочетании со своими уникальными бизнес-потребностями, инкапсулировал свой собственный набор функций обработки событий, мы называем его AE (простая библиотека программирования, управляемая событиями). В то время как redis использует технологию select, epoll или kqueue на mac, redis сначала оценит, а затем выберет ту, которая обладает наилучшей производительностью:
/* Include the best multiplexing layer supported by this system.
* The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endifПоскольку функция select является системным вызовом в стандарте POSIX, она может быть реализована в разных версиях операционной системы, поэтому используется в качестве восходящего решения. Для удобства во всех следующих статьях для объяснения используется механизм epoll.
Мультиплексирование ввода-вывода в redis
Когда мы запускаем redis-сервер в командной строке, redis фактически делает что-то похожее на сервер epoll, о котором мы писали ранее. Существует три вызова ключевых функций:
int main(int argc, char **argv) {
...
InitServerConfig (); // Initializes the structure for storing server-side information
...
InitServer (); // Initializes redis event loops and calls epoll_create and epoll_ctl. Create socket, bind, listen, accept to call in this function, and register the listener descriptor and connection descriptor returned after the call
...
AeMain (); // Execute while (1) Event Loop and call epoll_wait to get the ready descriptor and call the corresponding handler
...
}Далее давайте рассмотрим каждый из них:
initServerConfig()
Вся информация на сервере redis хранится в структуре сервера redis, которая содержит множество полей, таких как информация о сокете (например, адрес и порт) на стороне сервера, а также множество сведений о конфигурации, поддерживающих другие функции redis, такие как кластеризация, сохраняемость и так далее. Этот вызов функции инициализирует все поля структуры сервера redis и присваивает начальное значение. Поскольку мы говорим о применении механизма мультиплексирования событий и ввода-вывода в redis, мы сосредоточимся только на нескольких областях.
initServer()
Этот вызов функции является нашим главным приоритетом. После инициализации соответствующей информации сервера необходимо создать, привязать, отслеживать сокеты и устанавливать соединения с клиентом. В этой функции мы вызываем сокет, привязку, прослушивание, принятие, epoll_create, epoll_ctl. Мы можем шаг за шагом изучить механизм событий redis с сервера epoll, упомянутого выше. Основные вызовы функций initServer () заключаются в следующем:
void initServer(void) {
...
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
...
if (server.port != 0 && listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
exit(1);
...
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR){
serverPanic("Unrecoverable error creating server.ipfd file event.");
}
}
...
}Мы интерпретируем эти строки кода ключа в порядке сверху вниз:
aeCreateEventLoop()
В redis существует концепция aeEventLoop, которая управляет всеми соответствующими полями описания событий, хранит зарегистрированные события и готовые события:
typedef struct aeEventLoop {
Int stop; // Identify whether the event loop (that is, while (1)) ends
AeFileEvent * events; // Stores registered file events (file events, client connection and read-write events)
AeFiredEvent * fired; // Store ready file events
AeTimeEvent * timeEventHead; // Storage time events (more on time events later)
Void * apidata; /* stores epoll-related information*/
AeBeforeSleepProc * beforesleep; // Functions that need to be called before an event occurs
AeBeforeSleepProc * aftersleep; // Functions to be called after an event occurs
} aeEventLoop;Redis сохраняет все готовые дескрипторы, возвращенные через epoll_wait (), в первом массиве, затем пересекает массив и вызывает соответствующий обработчик событий для обработки всех событий сразу. В функции aeCreateEventLoop () инициализируется поле структуры, управляющее всей информацией о событиях, которое также включает вызов функции epoll_create() для инициализации EPFD epoll:
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
eventLoop->lastTime = time(NULL);
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
eventLoop->aftersleep = NULL;
If (aeApiCreate (eventLoop) == - 1) go to err; // Call aeApiCreate (), internal call epoll_create ()
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;
}В функции ae Api Create () вызывается функция epoll_create (), и созданный EPFD сохраняется в поле данных api структуры цикла событий:
typedef struct aeApiState {
int epfd;
struct epoll_event *events;
} aeApiState;
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}
State - > EPFD = epoll_create (1024); /* Initialize EPFD of epoll by calling epoll_create*/
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}
EvetLoop - > apidata = state; // Keep the created EPFD in the apidata field of the eventLoop structure
return 0;
}прослушайте Порт()
После создания epfd мы займемся созданием сокетов, привязкой и мониторингом. Эти действия выполняются в функции прослушивания порта ():
int listenToPort(int port, int *fds, int *count) {
if (server.bindaddr_count == 0) server.bindaddr[0] = NULL;
For (j = 0; J < server. bindaddr_count | J = 0; j + +) {// traversal of all IP addresses
If (server. bindaddr [j]== NULL) {// No binding address yet
...
} Other if (strchr (server. bindaddr [j],':'){//bind IPv6 address)
...
} Other {// Bind IPv4 Address, usually into this if branch
FDS [* count] = anetTcpServer (server. neterr, port, server. bindaddr [j], server. tcp_backlog); // true binding logic
}
...
}
return C_OK;
}Redis сначала определит тип IP-адреса привязки, мы, как правило, IPv4, поэтому мы, как правило, переходим в третью ветвь, вызываем функцию anetTcpServer () для конкретной логики привязки:
static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog)
{
...
if ((rv = getaddrinfo(bindaddr,_port,&hints,&servinfo)) != 0) {
anetSetError(err, "%s", gai_strerror(rv));
return ANET_ERR;
}
for (p = servinfo; p != NULL; p = p->ai_next) {
If ((s = socket (p - > ai_family, p - > ai_socktype, p - > ai_protocol) == - 1) // Call socket () to create a listening socket
continue;
if (af == AF_INET6 && anetV6Only(err,s) == ANET_ERR) goto error;
if (anetSetReuseAddr(err,s) == ANET_ERR) goto error;
If (anetListen (err, s, p-> ai_addr, p-> ai_addrlen, backlog) == ANET_ERR) s = ANET_ERR; //Call bind () to listen () to bind the port and convert it into a passive socket on the server side
goto end;
}
}После вызова системных вызовов socket () для создания сокетов необходимы дальнейшие вызовы bind () и listen (). Эти два шага реализованы в функции anet Listen ():
static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len, int backlog) {
If (bind (s, sa, len) == - 1) {// Call bind () Bind Port
anetSetError(err, "bind: %s", strerror(errno));
close(s);
return ANET_ERR;
}
If (listen (s, backlog) = - 1 {// call listen () converts an active socket to a passive listening socket
anetSetError(err, "listen: %s", strerror(errno));
close(s);
return ANET_ERR;
}
return ANET_OK;
}Видя это, мы знаем, что red, как и серверы epoll, о которых мы писали, – это процесс, требующий создания сокетов, привязки и мониторинга.
aeCreateFileEvent
В redis события подключения клиента и события чтения-записи в совокупности называются событиями файлов. Мы только что завершили процесс создания сокета, привязки и прослушивания. Теперь, когда у нас есть дескриптор прослушивателя, нам нужно сначала добавить дескриптор прослушивателя в список прослушивателей epoll для прослушивания событий подключения клиента. В initServer () событие подключения клиента обрабатывается вызовом aeCreateFileEvent () и указанием его функции обработчика событий acceptTcpHandler ():
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR){
serverPanic("Unrecoverable error creating server.ipfd file event.");
}
}Следуя функции aeCreateFileEvent (), мы обнаружили, что функция aeApiAddEvent () была дополнительно вызвана внутри:
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) {
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
aeFileEvent *fe = &eventLoop->events[fd];
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0};
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
mask |= eventLoop->events[fd].mask;
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
If (epoll_ctl (state - > epfd, op, fd, & ee) == - 1) return - 1; // Call epoll_ctl to add client connection events
return 0;
}Функция aeApiAddEvent вызывает функцию epoll_ctl() для добавления событий подключения клиента в список прослушивателей. В то же время redis помещает обработчик событий в структуру aeFileEvent для хранения:
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
AeFileProc * rfileProc; // Read Event Handler
AeFileProc * wfileProc; // Write event handler
Void * clientData; // client data
} aeFileEvent;По сравнению с серверной программой epoll, которую мы писали ранее, мы выполнили следующие шаги:
int main(int argc, char *argv[]) {
ListenSocket = socket (AF_INET, SOCK_STREAM, 0); // Create a listener socket descriptor
Bid (listenSocket) // Binding Address and Port
Listen (listenSocket) // Convert from the default active socket to the server-applicable passive socket
EPFD = epoll_create (EPOLL_SIZE); // Create an epoll instance
Ep_events= (epoll_event*) malloc (sizeof (epoll_event)* EPOLL_SIZE); // Create an epoll_event structure to store Socket Sets
event.events = EPOLLIN;
event.data.fd = listenSocket;
Epoll_ctl (epfd, EPOLL_CTL_ADD, listenSocket, & event); // Add listening sockets to the listening list
...
}Мы реализовали создание сокета, привязку, прослушивание, создание EPFD с помощью функции epoll_create () и добавили событие начального дескриптора сокета прослушивания в список прослушивания epoll и указали для него функцию обработки событий. Затем пришло время вызвать функцию epoll_wait() в цикле while (1). При вызове функции epoll_wait() через блокировку возвращаются все готовые дескрипторы сокетов, запускаются соответствующие события, а затем события обрабатываются.
остаться()
Наконец, через цикл while (1), ожидая прибытия событий подключения клиента:
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}В Eventloop флаг остановки используется для определения того, заканчивается цикл или нет. Если нет, цикл вызывает processevents (). Мы предполагаем, что здесь вызывается функция epoll_wait (), блокирующая ожидание прибытия событий, затем обход всех готовых дескрипторов сокетов, а затем вызов соответствующей функции обработчика событий.
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
Numevents = aeApiPoll (eventLoop, tvp); //Call epoll_wait()
...
}Давайте проследим за aeApiPoll, чтобы увидеть, как вызывается функция epoll_wait():
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata; //
int retval, numevents = 0;
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize, tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}Во-первых, EPFD и зарегистрированный набор событий, созданный в aeApiCreate (), извлекаются из EvetLoop, и вызывается функция epoll_wait() для ожидания прибытия событий, и возвращается набор дескрипторов всех готовых событий. Позже все готовые наборы дескрипторов просматриваются, чтобы определить, какой это тип дескриптора, является ли он читаемым или доступным для записи, а затем все события, готовые к обработке, сохраняются в запущенных массивах в EvetLoop вместе с читаемыми или доступными для записи тегами в соответствующих расположениях массива. Возвращаясь к внешнему вызывающему объекту, мы теперь поместили все события, которые могут быть обработаны, в запущенный массив, чтобы мы могли пройти по массиву, чтобы получить все события, которые могут быть обработаны, а затем вызвать соответствующую функцию обработчика событий:
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
Numevents = aeApiPoll (eventLoop, tvp); //Call epoll_wait()
for (j = 0; j < numevents; j++) {
AeFileEvent * Fe = & & eventLoop - > events [eventLoop - > fired [j]. fd]; // loop out all ready events
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int fired = 0;
if (!invert && fe->mask & mask & AE_READABLE) {
Fe - > rfileProc (eventLoop, fd, Fe - > clientData, mask); /// If the event is a read event, call the read event handler
fired++;
}
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
Fe - > wfileProc (eventLoop, fd, Fe - > clientData, mask); /// If the event is a write event, call the write event handler
fired++;
}
}
}
}
...
}Что касается того, как различать события подключения клиента и события чтения-записи, redis указывает различные обработчики событий (например, события принятия являются обработчиками событий acceptTcpHandler), а события чтения-записи являются другими обработчиками событий. Инкапсулируя этот уровень, можно избежать процедуры определения типа дескриптора сокета, и можно напрямую вызвать ранее зарегистрированный обработчик событий. Оглядываясь назад на сервер epoll, который мы писали ранее, очень ли он похож на этот код?
while (1) {
Event_cnt = epoll_wait (epfd, ep_events, EPOLL_SIZE, -1); /// Waiting to return the socket descriptors that are ready
For (int I = 0; I < event_cnt; ++i) {// traverse all ready socket descriptors
If (ep_events[i]. data.fd== listenSocket) {// If the listener socket descriptor is ready, a new client is connected to it
ConnSocket = accept (listenSocket); // Call accept () to establish a connection
event.events = EPOLLIN;
event.data.fd = connSocket;
Epoll_ctl (epfd, EPOLL_CTL_ADD, connSocket, & event); // Add listening to newly established connection socket descriptors to listen for subsequent read and write events on connection descriptors
} else {// If the connection socket descriptor event is ready, you can read and write
Strlen = read (ep_events [i]. data.fd, buf, BUF_SIZE); // Read data from the connection socket descriptor, at this time it will read the data without blocking.
If (strlen = 0) {// can't read data from the connection socket anymore, you need to remove the socket listening
Epoll_ctl (ep fd, EPOLL_CTL_DEL, ep_events[i], data.fd, NULL); //Delete the monitoring of this descriptor
close(ep_events[i].data.fd);
} else {
Write (ep_events [i]. data.fd, buf, str_len); // / If the client can write and write the data back to the client
}
}
}
}резюме
До сих пор мы освоили сценарий мультиплексирования ввода-вывода в redis. Redis централизует все соединения с событиями чтения-записи и событиями времени, о которых мы не упоминали, и инкапсулирует базовый механизм мультиплексирования ввода-вывода. Наконец, один процесс может обрабатывать несколько подключений и событий чтения-записи. Это приложение мультиплексирования ввода-вывода в красном цвете.