From c3aa375bf945582478c47fb16188151534e4e1a2 Mon Sep 17 00:00:00 2001 From: thib Date: Sat, 30 Sep 2000 11:57:16 +0000 Subject: [PATCH] struct exe now includes the pid of the job executed in order to permit multiple executions of a job simultaneously struct CL includes the number of time the job is include in a queue --- database.c | 454 +++++++++++++++++++++++++++-------------------------- global.h | 21 ++- 2 files changed, 250 insertions(+), 225 deletions(-) diff --git a/database.c b/database.c index 24366cd..53b7439 100644 --- a/database.c +++ b/database.c @@ -22,16 +22,17 @@ * `LICENSE' that comes with the fcron source distribution. */ - /* $Id: database.c,v 1.30 2000-09-17 20:08:37 thib Exp $ */ + /* $Id: database.c,v 1.31 2000-09-30 11:59:00 thib Exp $ */ #include "fcron.h" int is_leap_year(int year); int get_nb_mdays(int year, int mon); void goto_non_matching(CL *line, struct tm *tm); +void run_normal_job(CL *line); void run_serial_job(void); -void run_queue_job(CL *line); void run_lavg_job(int i); +void run_queue_job(CL *line); void @@ -53,80 +54,47 @@ test_jobs(time_t t2) j->j_line->cl_remain = j->j_line->cl_runfreq; - if (j->j_line->cl_pid > 0) { - warn(" process already running: %s '%s'", - j->j_line->cl_file->cf_user, - j->j_line->cl_shell - ); - } - else if ( is_lavg(j->j_line->cl_option) ) + if ( is_lavg(j->j_line->cl_option) ) add_lavg_job(j->j_line); else if ( is_serial(j->j_line->cl_option) ) add_serial_job(j->j_line); else - run_queue_job(j->j_line); + run_normal_job(j->j_line); } } + void -add_serial_job(CL *line) - /* add the next queued job in serial queue */ +run_normal_job(CL *line) { - short int i; - -#if SERIAL_ONCE - /* check if the line is already in the serial queue */ - if ( line->cl_pid != -1 ) { -#endif /* SERIAL_ONCE */ - -/* // */ - debug("inserting in serial queue %s", line->cl_shell); -/* // */ - - line->cl_pid = -1; - - if ( serial_num >= serial_array_size ) { - if ( serial_num >= SERIAL_QUEUE_MAX ) - /* run next job in the queue before adding the new one */ - run_serial_job(); - else { - CL **ptr = NULL; - short int old_size = serial_array_size; - - debug("Resizing serial_array"); - serial_array_size = (serial_array_size + SERIAL_GROW_SIZE); - - if ( (ptr = calloc(serial_array_size, sizeof(CL *))) == NULL ) - die_e("could not calloc serial_array"); - - /* copy lines in order to have the first line at the index 0 */ - memcpy(ptr + serial_array_index, serial_array, - (sizeof(CL*) * (old_size - serial_array_index)) ); - memcpy(ptr, serial_array + (old_size - serial_array_index), - (sizeof(CL*) * serial_array_index)); - serial_array_index = 0; - free(serial_array); - serial_array = ptr; - } - } - if ( (i = serial_array_index + serial_num) >= serial_array_size ) - i -= serial_array_size; - - serial_array[i] = line; + if (line->cl_numexe <= 0 || + (is_exe_sev(line->cl_option) && line->cl_numexe < UCHAR_MAX)) { + line->cl_numexe += 1; + run_queue_job(line); + } + else { + warn(" process already running: %s '%s'", + line->cl_file->cf_user, + line->cl_shell + ); + } - serial_num++; +} - debug("num: %d size:%d index:%d curline:%d running:%d", serial_num, - serial_array_size, serial_array_index, i, serial_running); +void +run_lavg_job(int i) +{ + run_queue_job(lavg_array[i].l_line); -#if SERIAL_ONCE + if (i < --lavg_num) { + lavg_array[i] = lavg_array[lavg_num]; + lavg_array[lavg_num].l_line = NULL; } else - debug("already in serial queue %s", line->cl_shell); -#endif /* SERIAL_ONCE */ + lavg_array[i].l_line = NULL; } @@ -165,33 +133,210 @@ run_queue_job(CL *line) /* append job to the list of executed job */ if ( exe_num >= exe_array_size ) { - CL **ptr = NULL; + struct exe *ptr = NULL; short int old_size = exe_array_size; debug("Resizing exe_array"); exe_array_size = (exe_array_size + EXE_GROW_SIZE); - if ( (ptr = calloc(exe_array_size, sizeof(CL *))) == NULL ) + if ( (ptr = calloc(exe_array_size, sizeof(struct exe))) == NULL ) die_e("could not calloc exe_array"); - memcpy(ptr, exe_array, (sizeof(CL *) * old_size)); + memcpy(ptr, exe_array, (sizeof(struct exe) * old_size)); free(exe_array); exe_array = ptr; } - exe_array[exe_num++] = line; + exe_array[exe_num].e_line = line; - run_job(line); + run_job(&exe_array[exe_num++]); } +void +insert_nextexe(CL *line) + /* insert a job the queue list */ +{ + struct job *newjob; + + if (queue_base != NULL) { + struct job *j; + struct job *jprev = NULL; + struct job *old_entry = NULL; + + /* find the job in the list */ + for (j = queue_base; j != NULL ; j = j->j_next) + if ( j->j_line == line ) { + old_entry = j; + /* remove it from the list */ + if (jprev != NULL) { + jprev->j_next = j->j_next; + j = jprev; + } + else + /* first element of the list */ + j = queue_base = j->j_next; + + break; + } + else + jprev = j; + + jprev = NULL; + if (j == NULL || line->cl_nextexe < j->j_line->cl_nextexe) + j = queue_base; + while (j != NULL && (line->cl_nextexe >= j->j_line->cl_nextexe)) { + jprev = j; + j = j->j_next; + } + + if (old_entry == NULL) { + /* this job wasn't in the queue : we append it */ + Alloc(newjob, job); + newjob->j_line = line; + } + else + /* this job was already in the queue : we move it */ + newjob = old_entry; + + newjob->j_next = j; + + if (jprev == NULL) + queue_base = newjob; + else + jprev->j_next = newjob; + + } + else { + /* no job in queue */ + Alloc(newjob, job); + newjob->j_line = line; + queue_base = newjob; + } + +} + +void +add_serial_job(CL *line) + /* add the next queued job in serial queue */ +{ + short int i; + + /* check if the line is already in the serial queue */ + if ( (is_serial_once(line->cl_option) && line->cl_numexe > 0) || + line->cl_numexe >= UCHAR_MAX ) { + debug("already in serial queue %s", line->cl_shell); + return; + } + +/* // */ + debug("inserting in serial queue %s", line->cl_shell); +/* // */ + + if ( serial_num >= serial_array_size ) { + if ( serial_num >= SERIAL_QUEUE_MAX ) + /* run next job in the queue before adding the new one */ + run_serial_job(); + else { + CL **ptr = NULL; + short int old_size = serial_array_size; + + debug("Resizing serial_array"); + serial_array_size = (serial_array_size + SERIAL_GROW_SIZE); + + if ( (ptr = calloc(serial_array_size, sizeof(CL *))) == NULL ) + die_e("could not calloc serial_array"); + + /* copy lines in order to have the first line at the index 0 */ + memcpy(ptr + serial_array_index, serial_array, + (sizeof(CL*) * (old_size - serial_array_index)) ); + memcpy(ptr, serial_array + (old_size - serial_array_index), + (sizeof(CL*) * serial_array_index)); + serial_array_index = 0; + free(serial_array); + serial_array = ptr; + } + } + + if ( (i = serial_array_index + serial_num) >= serial_array_size ) + i -= serial_array_size; + + serial_array[i] = line; + + serial_num++; + line->cl_numexe += 1; + + debug("num: %d size:%d index:%d curline:%d running:%d", serial_num, + serial_array_size, serial_array_index, i, serial_running); + + +} + + +void +add_lavg_job(CL *line) + /* add the next queued job in lavg queue */ +{ + +#if LAVG_ONCE + /* check if the line is already in the lavg queue */ + if ( line->cl_numexe <= 0) { +#else + if ( line->cl_numexe < UCHAR_MAX ) { +#endif /* LAVG_ONCE */ + +/* // */ + debug("inserting in lavg queue %s", line->cl_shell); +/* // */ + + /* append job to the list of lavg job */ + if ( lavg_num >= lavg_array_size ) { + if ( lavg_num >= LAVG_QUEUE_MAX ) { + /* run the next lavg job (the oldest one) */ + register int i; + int j = 0; + + for (i = 1; i < lavg_num; i++) + if ( lavg_array[i].l_until < lavg_array[j].l_until ) + j = i; + run_lavg_job(j); + } + else { + struct lavg *ptr = NULL; + short int old_size = lavg_array_size; + + debug("Resizing lavg_array"); + lavg_array_size = (lavg_array_size + LAVG_GROW_SIZE); + + if ( (ptr = calloc(lavg_array_size, sizeof(lavg))) == NULL ) + die_e("could not calloc lavg_array"); + + memcpy(ptr, lavg_array, (sizeof(lavg) * old_size)); + free(lavg_array); + lavg_array = ptr; + } + } + + lavg_array[lavg_num].l_line = line; + line->cl_numexe += 1; + lavg_array[lavg_num++].l_until = + (line->cl_until > 0) ? now + line->cl_until : 0; + + } + else + debug("already in lavg queue %s", line->cl_shell); + +} + + void wait_chld(void) /* wait_chld() - check for job completion */ { short int i = 0; int pid; + CL *line = NULL; /* // */ @@ -201,28 +346,29 @@ wait_chld(void) while ( (pid = wait3(NULL, WNOHANG, NULL)) > 0 ) { i = 0; while ( i < exe_num ) { - if (pid == exe_array[i]->cl_pid) { - debug("job finished: %s", exe_array[i]->cl_shell); - exe_array[i]->cl_pid = 0; - exe_array[i]->cl_file->cf_running -= 1; - - if ( is_serial_once(exe_array[i]->cl_option) ) { - clear_serial_once(exe_array[i]->cl_option); - if ( --serial_running <= 0 ) - run_serial_job(); + if (pid == exe_array[i].e_pid) { + line = exe_array[i].e_line; + debug("job finished: %s", line->cl_shell); + line->cl_numexe -= 1; + line->cl_file->cf_running -= 1; + + if ( is_serial_once(line->cl_option) ) { + clear_serial_once(line->cl_option); + if ( --serial_running <= 0 ) + run_serial_job(); } - else if ( is_serial(exe_array[i]->cl_option) - && ! is_lavg(exe_array[i]->cl_option) ) { - if (--serial_running <= 0) - run_serial_job(); + else if ( is_serial(line->cl_option) + && ! is_lavg(line->cl_option) ) { + if (--serial_running <= 0) + run_serial_job(); } if (i < --exe_num) { exe_array[i] = exe_array[exe_num]; - exe_array[exe_num] = NULL; + exe_array[exe_num].e_line = NULL; } else - exe_array[i] = NULL; + exe_array[i].e_line = NULL; goto nextloop; } @@ -248,15 +394,20 @@ wait_all(int *counter) while ( (*counter > 0) && (pid = wait3(NULL, 0, NULL)) > 0 ) { i = 0; while ( i < exe_num ) { - if (pid == exe_array[i]->cl_pid) { - exe_array[i]->cl_pid = 0; - exe_array[i]->cl_file->cf_running -= 1; + if (pid == exe_array[i].e_pid) { + debug("job finished: %s", exe_array[i].e_line->cl_shell); + exe_array[i].e_line->cl_numexe -= 1; + exe_array[i].e_line->cl_file->cf_running -= 1; + + if ( is_serial_once(exe_array[i].e_line->cl_option) ) + clear_serial_once(exe_array[i].e_line->cl_option); + if (i < --exe_num) { exe_array[i] = exe_array[exe_num]; - exe_array[exe_num] = NULL; + exe_array[exe_num].e_line = NULL; } else - exe_array[i] = NULL; + exe_array[i].e_line = NULL; goto nextloop; } @@ -575,141 +726,6 @@ set_next_exe(CL *line, char is_new_line) } -void -insert_nextexe(CL *line) - /* insert a job the queue list */ -{ - struct job *newjob; - - if (queue_base != NULL) { - struct job *j; - struct job *jprev = NULL; - struct job *old_entry = NULL; - - /* find the job in the list */ - for (j = queue_base; j != NULL ; j = j->j_next) - if ( j->j_line == line ) { - old_entry = j; - /* remove it from the list */ - if (jprev != NULL) { - jprev->j_next = j->j_next; - j = jprev; - } - else - /* first element of the list */ - j = queue_base = j->j_next; - - break; - } - else - jprev = j; - - jprev = NULL; - if (j == NULL || line->cl_nextexe < j->j_line->cl_nextexe) - j = queue_base; - while (j != NULL && (line->cl_nextexe >= j->j_line->cl_nextexe)) { - jprev = j; - j = j->j_next; - } - - if (old_entry == NULL) { - /* this job wasn't in the queue : we append it */ - Alloc(newjob, job); - newjob->j_line = line; - } - else - /* this job was already in the queue : we move it */ - newjob = old_entry; - - newjob->j_next = j; - - if (jprev == NULL) - queue_base = newjob; - else - jprev->j_next = newjob; - - } - else { - /* no job in queue */ - Alloc(newjob, job); - newjob->j_line = line; - queue_base = newjob; - } - -} - -void -add_lavg_job(CL *line) - /* add the next queued job in lavg queue */ -{ - -#if LAVG_ONCE - /* check if the line is already in the lavg queue */ - if ( line->cl_pid != -1 ) { -#endif /* LAVG_ONCE */ - -/* // */ - debug("inserting in lavg queue %s", line->cl_shell); -/* // */ - - line->cl_pid = -1; - - /* append job to the list of lavg job */ - if ( lavg_num >= lavg_array_size ) { - if ( lavg_num >= LAVG_QUEUE_MAX ) { - /* run the next lavg job (the oldest one) */ - register int i; - int j = 0; - - for (i = 1; i < lavg_num; i++) - if ( lavg_array[i].l_until < lavg_array[j].l_until ) - j = i; - run_lavg_job(j); - } - else { - struct lavg *ptr = NULL; - short int old_size = lavg_array_size; - - debug("Resizing lavg_array"); - lavg_array_size = (lavg_array_size + LAVG_GROW_SIZE); - - if ( (ptr = calloc(lavg_array_size, sizeof(lavg))) == NULL ) - die_e("could not calloc lavg_array"); - - memcpy(ptr, lavg_array, (sizeof(lavg) * old_size)); - free(lavg_array); - lavg_array = ptr; - } - } - - lavg_array[lavg_num].l_line = line; - lavg_array[lavg_num++].l_until = - (line->cl_until > 0) ? now + line->cl_until : 0; - -#if LAVG_ONCE - } - else - debug("already in lavg queue %s", line->cl_shell); -#endif /* LAVG_ONCE */ - -} - -void -run_lavg_job(int i) -{ - - run_queue_job(lavg_array[i].l_line); - - if (i < --lavg_num) { - lavg_array[i] = lavg_array[lavg_num]; - lavg_array[lavg_num].l_line = NULL; - } - else - lavg_array[i].l_line = NULL; - -} - - time_t check_lavg(time_t lim) /* run a job based on system load average if one should be run diff --git a/global.h b/global.h index b5b3263..d3c498b 100644 --- a/global.h +++ b/global.h @@ -21,7 +21,7 @@ * `LICENSE' that comes with the fcron source distribution. */ - /* $Id: global.h,v 1.19 2000-09-12 19:53:10 thib Exp $ */ + /* $Id: global.h,v 1.20 2000-09-30 11:57:16 thib Exp $ */ /* @@ -88,11 +88,15 @@ #include #endif +#ifdef HAVE_LIMITS_H +#include +#endif + #include "bitstring.h" #include "option.h" -#define FILEVERSION "012" /* syntax's version of fcrontabs : +#define FILEVERSION "014" /* syntax's version of fcrontabs : * must have a length of 3 characters */ /* you should not change this (nor need to do it) */ @@ -127,16 +131,16 @@ typedef struct CL { struct CF *cl_file; /* the file in which the line is */ unsigned short cl_option; /* options for that line (see option.h) */ char *cl_shell; /* shell command */ - char cl_lavg[3]; /* load averages needed (1, 5, 15 mins) */ + unsigned char cl_numexe; /* num of entries in lavg/serial queue */ + unsigned char cl_lavg[3]; /* load averages needed (1, 5, 15 mins) */ time_t cl_until; /* timeout of the wait for a lavg value */ char cl_nice; /* nice value to control priority */ uid_t cl_runas; /* determine permissions of the job */ uid_t cl_mailto; /* mail output to cl_mailto */ - pid_t cl_pid; /* running pid, 0, or armed (-1) */ time_t cl_nextexe; /* time and date of the next execution */ - short int cl_remain; /* remaining until next execution */ + unsigned short cl_remain; /* remaining until next execution */ time_t cl_timefreq; /* Run every n seconds */ - short int cl_runfreq; /* Run once every n matches */ + unsigned short cl_runfreq; /* Run once every n matches */ /* see bitstring(3) man page for more details */ bitstr_t bit_decl(cl_mins, 60); /* 0-59 */ bitstr_t bit_decl(cl_hrs, 24); /* 0-23 */ @@ -155,5 +159,10 @@ typedef struct lavg { time_t l_until; /* the timeout of the wait for load averages */ } lavg; +typedef struct exe { + struct CL *e_line; + pid_t e_pid; +} exe; + #endif /* __GLOBALH__ */ -- 2.40.0