2015-07-11 1 views
7

Можно ли разделить один и тот же Epoll fd (not socket fd) среди нескольких потоков? И если да, будет ли каждый поток передавать свой собственный массив событий до epoll_wait(2) или они могут поделиться им?Можно ли использовать один и тот же дескриптор файла epoll среди потоков?

Например

void *thread_func(void *thread_args) { 
     // extract socket_fd, epoll_fd, &event, &events_array from 
     //  thread_args 
     // epoll_wait() using epoll_fd and events_array received from main 
     // now all threads would be using same epoll_fd and events array 
    } 

    void main(void) { 
     // create and bind to socket 
     // create events_fd 
     // allocate memory for events array 
     // subscribe to events EPOLLIN and EPOLLET 
     // pack the socket_fd, epoll_fd, &events, &events_array into 
     // thread_args struct. 

     // create multiple threads and pass thread_func and 
     // same thread_args to all threads 
    } 

Или лучше сделать это следующим образом:

void *thread_func(void *socket_fd) { 
     // create events_fd 
     // allocate memory for events array 
     // subscribe to events EPOLLIN and EPOLLET 
     // epoll_wait using own epoll_fd and events_array 
     // now all threads would have a separate epoll_fd with 
     // events populated on its own array 
    } 

    void main(void) { 
    // create and bind to socket 

    //create multiple threads and pass thread_func and socket_fd to 
    // all threads 
    } 

Есть хороший пример того, как сделать это в C? Примеры, которые я видел, запускают цикл событий в main() и порождают новый поток для обработки запроса всякий раз, когда обнаружено событие. То, что я хочу сделать, это создать определенное количество потоков в начале программы и каждый поток, выполняющий цикл цикла и запросы обработки.

ответ

13

Можно ли разделить один и тот же Epoll fd (не сокет fd) среди нескольких потоков .

Да, это безопасно - интерфейс epoll(7) потокобезопасно - но вы должны быть осторожны, делая так, вы должны по крайней мере использовать EPOLLET (режим фронта сигнала, в отличие от по умолчанию уровня срабатывает) чтобы избежать побочных пробуждений в других потоках. Это связано с тем, что режим уровня запускает пробуждение каждого потока, когда новое событие доступно для обработки. Поскольку только один поток будет иметь дело с ним, это просыпает большинство потоков без необходимости.

Если общий э.п.п.м. используется будет каждый поток должен пройти его собственные события массива или общие события массив epoll_wait()

Да, нужны отдельные события массива в каждом потоке, или иначе у вас будут условия гонки, и неприятные вещи могут случиться. Например, у вас может быть поток, который все еще выполняет итерацию через события, возвращаемые epoll_wait(2), и обработку запросов, когда вдруг другой поток вызывает epoll_wait(2) с тем же массивом, а затем события будут перезаписаны в то же время, когда другой поток их читает. Нехорошо! Вам абсолютно нужен отдельный массив для каждого потока.

Предполагая, что у вас есть отдельный массив для каждого потока, либо возможность - ожидание на одном и том же epoll fd, либо отдельная epoll fd для каждого потока - будет работать одинаково хорошо, но обратите внимание, что семантика отличается. С глобально разделяемым epoll fd каждый поток ожидает запроса от любого клиента, потому что все клиенты добавляются к тому же epoll fd. При использовании отдельного epoll fd для каждого потока каждый поток несет основную ответственность за подмножество клиентов (те клиенты, которые были приняты этим потоком).

Это может быть неуместным для вашей системы, или это может иметь огромное значение. Например, может случиться так, что поток достаточно неудачен, чтобы получить группу опытных пользователей, которые делают тяжелые и частые запросы, оставляя этот поток перегруженным, в то время как другие потоки с менее агрессивными клиентами почти бездействуют. Разве это не было бы несправедливо? С другой стороны, возможно, вы хотели бы иметь только некоторые потоки, относящиеся к определенному классу пользователей, и в этом случае, возможно, имеет смысл иметь разные epoll fds для каждого потока. Как обычно, вам нужно рассмотреть обе возможности, оценить компромисс, подумать о своей конкретной проблеме и принять решение.

Ниже приведен пример использования всемирно разделяемой эпохи fd.Я изначально не планировал делать все это, но одна вещь привела к другой, и, ну, это было весело, и я думаю, что это может помочь вам начать работу. Это эхо-сервер, который прослушивает порт 3000 и имеет пул из 20 потоков, используя epoll, чтобы одновременно принимать новых клиентов и обслуживать запросы.

#include <stdio.h> 
#include <stdlib.h> 
#include <inttypes.h> 
#include <errno.h> 
#include <string.h> 
#include <pthread.h> 
#include <assert.h> 
#include <unistd.h> 
#include <sys/types.h> 
#include <sys/socket.h> 
#include <arpa/inet.h> 
#include <sys/epoll.h> 

#define SERVERPORT 3000 
#define SERVERBACKLOG 10 
#define THREADSNO 20 
#define EVENTS_BUFF_SZ 256 

static int serversock; 
static int epoll_fd; 
static pthread_t threads[THREADSNO]; 

int accept_new_client(void) { 

    int clientsock; 
    struct sockaddr_in addr; 
    socklen_t addrlen = sizeof(addr); 
    if ((clientsock = accept(serversock, (struct sockaddr *) &addr, &addrlen)) < 0) { 
     return -1; 
    } 

    char ip_buff[INET_ADDRSTRLEN+1]; 
    if (inet_ntop(AF_INET, &addr.sin_addr, ip_buff, sizeof(ip_buff)) == NULL) { 
     close(clientsock); 
     return -1; 
    } 

    printf("*** [%p] Client connected from %s:%" PRIu16 "\n", (void *) pthread_self(), 
      ip_buff, ntohs(addr.sin_port)); 

    struct epoll_event epevent; 
    epevent.events = EPOLLIN | EPOLLET; 
    epevent.data.fd = clientsock; 

    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, clientsock, &epevent) < 0) { 
     perror("epoll_ctl(2) failed attempting to add new client"); 
     close(clientsock); 
     return -1; 
    } 

    return 0; 
} 

int handle_request(int clientfd) { 
    char readbuff[512]; 
    struct sockaddr_in addr; 
    socklen_t addrlen = sizeof(addr); 
    ssize_t n; 

    if ((n = recv(clientfd, readbuff, sizeof(readbuff)-1, 0)) < 0) { 
     return -1; 
    } 

    if (n == 0) { 
     return 0; 
    } 

    readbuff[n] = '\0'; 

    if (getpeername(clientfd, (struct sockaddr *) &addr, &addrlen) < 0) { 
     return -1; 
    } 

    char ip_buff[INET_ADDRSTRLEN+1]; 
    if (inet_ntop(AF_INET, &addr.sin_addr, ip_buff, sizeof(ip_buff)) == NULL) { 
     return -1; 
    } 

    printf("*** [%p] [%s:%" PRIu16 "] -> server: %s", (void *) pthread_self(), 
      ip_buff, ntohs(addr.sin_port), readbuff); 

    ssize_t sent; 
    if ((sent = send(clientfd, readbuff, n, 0)) < 0) { 
     return -1; 
    } 

    readbuff[sent] = '\0'; 

    printf("*** [%p] server -> [%s:%" PRIu16 "]: %s", (void *) pthread_self(), 
      ip_buff, ntohs(addr.sin_port), readbuff); 

    return 0; 
} 

void *worker_thr(void *args) { 
    struct epoll_event *events = malloc(sizeof(*events)*EVENTS_BUFF_SZ); 
    if (events == NULL) { 
     perror("malloc(3) failed when attempting to allocate events buffer"); 
     pthread_exit(NULL); 
    } 

    int events_cnt; 
    while ((events_cnt = epoll_wait(epoll_fd, events, EVENTS_BUFF_SZ, -1)) > 0) { 
     int i; 
     for (i = 0; i < events_cnt; i++) { 
      assert(events[i].events & EPOLLIN); 

      if (events[i].data.fd == serversock) { 
       if (accept_new_client() == -1) { 
        fprintf(stderr, "Error accepting new client: %s\n", 
         strerror(errno)); 
       } 
      } else { 
       if (handle_request(events[i].data.fd) == -1) { 
        fprintf(stderr, "Error handling request: %s\n", 
         strerror(errno)); 
       } 
      } 
     } 
    } 

    if (events_cnt == 0) { 
     fprintf(stderr, "epoll_wait(2) returned 0, but timeout was not specified...?"); 
    } else { 
     perror("epoll_wait(2) error"); 
    } 

    free(events); 

    return NULL; 
} 

int main(void) { 
    if ((serversock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { 
     perror("socket(2) failed"); 
     exit(EXIT_FAILURE); 
    } 

    struct sockaddr_in serveraddr; 
    serveraddr.sin_family = AF_INET; 
    serveraddr.sin_port = htons(SERVERPORT); 
    serveraddr.sin_addr.s_addr = INADDR_ANY; 

    if (bind(serversock, (const struct sockaddr *) &serveraddr, sizeof(serveraddr)) < 0) { 
     perror("bind(2) failed"); 
     exit(EXIT_FAILURE); 
    } 

    if (listen(serversock, SERVERBACKLOG) < 0) { 
     perror("listen(2) failed"); 
     exit(EXIT_FAILURE); 
    } 

    if ((epoll_fd = epoll_create(1)) < 0) { 
     perror("epoll_create(2) failed"); 
     exit(EXIT_FAILURE); 
    } 

    struct epoll_event epevent; 
    epevent.events = EPOLLIN | EPOLLET; 
    epevent.data.fd = serversock; 

    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, serversock, &epevent) < 0) { 
     perror("epoll_ctl(2) failed on main server socket"); 
     exit(EXIT_FAILURE); 
    } 

    int i; 
    for (i = 0; i < THREADSNO; i++) { 
     if (pthread_create(&threads[i], NULL, worker_thr, NULL) < 0) { 
      perror("pthread_create(3) failed"); 
      exit(EXIT_FAILURE); 
     } 
    } 

    /* main thread also contributes as worker thread */ 
    worker_thr(NULL); 

    return 0; 
} 

Несколько примечаний:

  • main() должен возвращать int, не void (как вы показываете в вашем примере)
  • Всегда иметь дело с ответными коды ошибок. Очень часто их игнорируют, и когда что-то ломается, трудно понять, что произошло.
  • Код предполагает, что запрос не превышает 511 байт (как видно из размера буфера в handle_request()). Если запрос больше этого, возможно, что некоторые данные остаются в сокете в течение очень долгого времени, потому что epoll_wait(2) не сообщит об этом до тех пор, пока в этом дескрипторе файла не возникнет новое событие (поскольку мы используем EPOLLET). В худшем случае клиент никогда не может отправлять какие-либо новые данные и ждать ответа навсегда.
  • Код, который печатает идентификатор потока для каждого запроса, предполагает, что pthread_t является непрозрачным типом указателя. Действительно, pthread_t - это тип указателя в Linux, но он может быть целым типом на других платформах, поэтому это не переносимо. Однако, вероятно, это не проблема, так как epoll является специфичным для Linux, поэтому код все равно не переносится.
  • Предполагается, что другие запросы от одного и того же клиента не поступают, когда поток все еще обслуживает запрос от этого клиента. Если к этому моменту приходит новый запрос, и другой поток начинает его обслуживать, у нас есть условие гонки, и клиент не обязательно получит эхо-сообщения в том же порядке, который он отправил им (однако write(2) является атомарным, так что ответы могут быть не по порядку, они не будут пересекаться).
+0

Спасибо за всесторонний ответ. Это очень помогло. – MiJo

+0

@MiJo Glad Я мог бы помочь. Это был отличный вопрос :) –