#include "scoreboard.h"
#include "acceptlock.h"
-#include "http_accept.h"
+#include <poll.h>
#include <netinet/tcp.h>
-#include <pthread.h>
+#include <pthread.h>
/*
* Actual definitions of config globals
*/
int ap_threads_per_child=0; /* Worker threads per child */
-int ap_acceptors_per_child=0; /* Accept threads per child */
int ap_max_requests_per_child=0;
static char *ap_pid_fname=NULL;
static char *ap_scoreboard_fname=NULL;
static int ap_daemons_limit=0;
static time_t ap_restart_time=0;
API_VAR_EXPORT int ap_extended_status = 0;
+static int workers_may_exit = 0;
+static int requests_this_child;
+static int num_listenfds = 0;
+static struct pollfd *listenfds_child; /* The listenfds that each thread copies
+ for itself */
+
+/* The structure used to pass unique initialization info to each thread */
+typedef struct {
+ int pid;
+ int tid;
+ int sd;
+ pool *tpool; /* "pthread" would be confusing */
+} proc_info;
+#ifdef SINGLE_LISTEN_UNSERIALIZED_ACCEPT
+#define SAFE_ACCEPT(stmt) do {if (ap_listeners->next != NULL) {stmt;}} while (0)
+#else
+#define SAFE_ACCEPT(stmt) do {stmt;} while (0)
+#endif
/*
* The max child slot ever assigned, preserved across restarts. Necessary
static char ap_coredump_dir[MAX_STRING_LEN];
-int ap_pipe_of_death[2];
+static int pipe_of_death[2];
+static pthread_mutex_t pipe_of_death_mutex;
/* *Non*-shared http_main globals... */
int thread_slot = ti->tid;
pool *tpool = ti->tpool;
struct sockaddr sa_client;
- int csd;
+ int csd = -1;
pool *ptrans; /* Pool for per-transaction stuff */
+ int sd = -1;
+ int srv;
+ int ret;
+ char pipe_read_char;
+ int curr_pollfd, last_pollfd = 0;
+ size_t len = sizeof(struct sockaddr);
+ struct pollfd *listenfds;
free(ti);
worker_thread_count++;
pthread_mutex_unlock(&worker_thread_count_mutex);
- while (1) {
+ /* TODO: Switch to a system where threads reuse the results from earlier
+ poll calls - manoj */
+ /* set up each thread's individual pollfd array */
+ listenfds = ap_palloc(tpool, sizeof(struct pollfd) * (num_listenfds + 1));
+ memcpy(listenfds, listenfds_child, sizeof(struct pollfd) * (num_listenfds + 1));
+ while (!workers_may_exit) {
+ workers_may_exit |= (ap_max_requests_per_child != 0) && (requests_this_child <= 0);
+ if (workers_may_exit) break;
+
(void) ap_update_child_status(process_slot, thread_slot, SERVER_READY,
- (request_rec *) NULL);
- csd = get_connection(&sa_client);
- if (csd < 0) {
+ (request_rec *) NULL);
+ SAFE_ACCEPT(intra_mutex_on(0));
+ if (workers_may_exit) {
+ SAFE_ACCEPT(intra_mutex_off(0));
break;
- }
+ }
+ SAFE_ACCEPT(accept_mutex_on(0));
+ while (!workers_may_exit) {
+ srv = poll(listenfds, num_listenfds + 1, -1);
+ if (srv < 0) {
+ if (errno == EINTR) {
+ continue;
+ }
+
+ /* poll() will only return errors in catastrophic
+ * circumstances. Let's try exiting gracefully, for now. */
+ ap_log_error(APLOG_MARK, APLOG_ERR, (const server_rec *)
+ ap_get_server_conf(), "poll: (listen)");
+ workers_may_exit = 1;
+ }
+
+ if (workers_may_exit) break;
+
+ if (listenfds[0].revents & POLLIN) {
+ /* A process got a signal on the shutdown pipe. Check if we're
+ * the lucky process to die. */
+ pthread_mutex_lock(&pipe_of_death_mutex);
+ if (!workers_may_exit) {
+ ret = read(listenfds[0].fd, &pipe_read_char, 1);
+ if (ret == -1 && errno == EAGAIN) {
+ /* It lost the lottery. It must continue to suffer
+ * through a life of servitude. */
+ pthread_mutex_unlock(&pipe_of_death_mutex);
+ continue;
+ }
+ else {
+ /* It won the lottery (or something else is very
+ * wrong). Embrace death with open arms. */
+ workers_may_exit = 1;
+ pthread_mutex_unlock(&pipe_of_death_mutex);
+ break;
+ }
+ }
+ pthread_mutex_unlock(&pipe_of_death_mutex);
+ }
+
+ if (num_listenfds == 1) {
+ sd = ap_listeners->fd;
+ goto got_fd;
+ }
+ else {
+ /* find a listener */
+ curr_pollfd = last_pollfd;
+ do {
+ curr_pollfd++;
+ if (curr_pollfd > num_listenfds) {
+ curr_pollfd = 1;
+ }
+ /* XXX: Should we check for POLLERR? */
+ if (listenfds[curr_pollfd].revents & POLLIN) {
+ last_pollfd = curr_pollfd;
+ sd = listenfds[curr_pollfd].fd;
+ goto got_fd;
+ }
+ } while (curr_pollfd != last_pollfd);
+ }
+ }
+ got_fd:
+ SAFE_ACCEPT(accept_mutex_off(0));
+ SAFE_ACCEPT(intra_mutex_off(0));
+ if (workers_may_exit) break;
+ csd = ap_accept(sd, &sa_client, &len);
process_socket(ptrans, &sa_client, csd, process_slot, thread_slot);
- ap_clear_pool(ptrans);
+ ap_clear_pool(ptrans);
+ requests_this_child--;
}
ap_destroy_pool(tpool);
int i;
int my_child_num = child_num_arg;
proc_info *my_info = NULL;
+ ap_listen_rec *lr;
my_pid = getpid();
pchild = ap_make_sub_pool(pconf);
clean_child_exit(APEXIT_CHILDFATAL);
}
- accept_child_init(pchild, ap_threads_per_child);
+ SAFE_ACCEPT(intra_mutex_init(pchild, 1));
+ SAFE_ACCEPT(accept_mutex_child_init(pchild));
ap_child_init_hook(pchild, server_conf);
/*done with init critical section */
ap_log_error(APLOG_MARK, APLOG_ALERT, server_conf, "pthread_sigmask");
}
+ requests_this_child = ap_max_requests_per_child;
+
+ /* Set up the pollfd array */
+ listenfds_child = ap_palloc(pchild, sizeof(struct pollfd) * (num_listenfds + 1));
+ listenfds_child[0].fd = pipe_of_death[0];
+ listenfds_child[0].events = POLLIN;
+ listenfds_child[0].revents = 0;
+ for (lr = ap_listeners, i = 1; i <= num_listenfds; lr = lr->next, ++i) {
+ listenfds_child[i].fd = lr->fd;
+ listenfds_child[i].events = POLLIN; /* should we add POLLPRI ?*/
+ listenfds_child[i].revents = 0;
+ }
+
/* Setup worker threads */
worker_thread_count = 0;
pthread_mutex_init(&worker_thread_count_mutex, NULL);
+ pthread_mutex_init(&pipe_of_death_mutex, NULL);
pthread_attr_init(&thread_attr);
pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED);
for (i=0; i < ap_threads_per_child; i++) {
}
pthread_attr_destroy(&thread_attr);
- start_accepting_connections(my_child_num);
/* This thread will be the one responsible for handling signals */
sigemptyset(&sig_mask);
{
int pid;
- if (ap_acceptors_per_child + ap_threads_per_child > HARD_THREAD_LIMIT) {
- ap_log_error(APLOG_MARK, APLOG_ERR, s,
- "Worker threads plus acceptor threads is greater than HARD_THREAD_LIMIT, please correct");
- exit(-1);
- }
-
-
if (slot + 1 > max_daemons_limit) {
max_daemons_limit = slot + 1;
}
if (idle_count_ceil > ap_daemons_max_free) {
/* Kill off one child */
char char_of_death = '!';
- if (write(ap_pipe_of_death[1], &char_of_death, 1) == -1) {
- ap_log_error(APLOG_MARK, APLOG_WARNING, server_conf, "write ap_pipe_of_death");
+ if (write(pipe_of_death[1], &char_of_death, 1) == -1) {
+ ap_log_error(APLOG_MARK, APLOG_WARNING, server_conf, "write pipe_of_death");
}
idle_spawn_rate = 1;
}
if (pid >= 0) {
child_slot = find_child_by_pid(pid);
if (child_slot >= 0) {
- for (i = 0; i < ap_threads_per_child + ap_acceptors_per_child; i++)
+ for (i = 0; i < ap_threads_per_child; i++)
ap_update_child_status(child_slot, i, SERVER_DEAD, (request_rec *) NULL);
if (remaining_children_to_start
int ap_mpm_run(pool *_pconf, pool *plog, server_rec *s)
{
int remaining_children_to_start;
- int listener_count;
pconf = _pconf;
server_conf = s;
- if (pipe(ap_pipe_of_death) == -1) {
+ if (pipe(pipe_of_death) == -1) {
ap_log_error(APLOG_MARK, APLOG_ERR,
(const server_rec*) server_conf,
"pipe: (pipe_of_death)");
exit(1);
}
- ap_note_cleanups_for_fd(pconf, ap_pipe_of_death[0]);
- ap_note_cleanups_for_fd(pconf, ap_pipe_of_death[1]);
- if (fcntl(ap_pipe_of_death[0], F_SETFD, O_NONBLOCK) == -1) {
+ ap_note_cleanups_for_fd(pconf, pipe_of_death[0]);
+ ap_note_cleanups_for_fd(pconf, pipe_of_death[1]);
+ if (fcntl(pipe_of_death[0], F_SETFD, O_NONBLOCK) == -1) {
ap_log_error(APLOG_MARK, APLOG_ERR,
(const server_rec*) server_conf,
"fcntl: O_NONBLOCKing (pipe_of_death)");
exit(1);
}
server_conf = s;
- if ((listener_count = setup_listeners(pconf, server_conf)) < 1) {
+ if ((num_listenfds = setup_listeners(pconf, server_conf)) < 1) {
/* XXX: hey, what's the right way for the mpm to indicate a fatal error? */
ap_log_error(APLOG_MARK, APLOG_NOERRNO|APLOG_ALERT, s,
"no listening sockets available, shutting down");
ap_clear_pool(plog);
ap_open_logs(server_conf, plog);
ap_log_pid(pconf, ap_pid_fname);
- accept_parent_init(pconf, listener_count);
+ SAFE_ACCEPT(accept_mutex_init(pconf, 1));
if (!is_graceful) {
reinit_scoreboard(pconf);
}
/* kill off the idle ones */
for (i = 0; i < ap_daemons_limit; ++i) {
- if (write(ap_pipe_of_death[1], &char_of_death, 1) == -1) {
- ap_log_error(APLOG_MARK, APLOG_WARNING, server_conf, "write ap_pipe_of_death");
+ if (write(pipe_of_death[1], &char_of_death, 1) == -1) {
+ ap_log_error(APLOG_MARK, APLOG_WARNING, server_conf, "write pipe_of_death");
}
}
*/
for (i = 0; i < ap_daemons_limit; ++i) {
- for (j = 0; j < ap_threads_per_child + ap_acceptors_per_child;
- j++) {
+ for (j = 0; j < ap_threads_per_child; j++) {
if (ap_scoreboard_image->servers[i][j].status != SERVER_DEAD) {
ap_scoreboard_image->servers[i][j].status = SERVER_GRACEFUL;
}
}
ap_threads_per_child = atoi(arg);
- if (ap_threads_per_child < 1) {
+ if (ap_threads_per_child > HARD_THREAD_LIMIT) {
+ fprintf(stderr, "WARNING: ThreadsPerChild of %d exceeds compile time"
+ "limit of %d threads,\n", ap_threads_per_child,
+ HARD_THREAD_LIMIT);
+ fprintf(stderr, " lowering ThreadsPerChild to %d. To increase, please"
+ "see the\n", HARD_THREAD_LIMIT);
+ fprintf(stderr, " HARD_THREAD_LIMIT define in src/include/httpd.h.\n");
+ }
+ else if (ap_threads_per_child < 1) {
fprintf(stderr, "WARNING: Require ThreadsPerChild > 0, setting to 1\n");
ap_threads_per_child = 1;
}