zed: allow limiting concurrent jobs

200ms time-out is relatively long, but if we already hit the cap,
then we'll likely be able to spawn multiple new jobs when we wake up

Reviewed-by: Brian Behlendorf <behlendorf1@llnl.gov>
Signed-off-by: Ahelenia Ziemiańska <nabijaczleweli@nabijaczleweli.xyz>
Closes #11807
This commit is contained in:
наб
2021-03-29 15:21:54 +02:00
committed by Brian Behlendorf
parent 02a0fa1999
commit 73218f41b4
8 changed files with 59 additions and 23 deletions
+23 -8
View File
@@ -63,6 +63,7 @@ static pthread_t _reap_children_tid = (pthread_t)-1;
static volatile boolean_t _reap_children_stop;
static avl_tree_t _launched_processes;
static pthread_mutex_t _launched_processes_lock = PTHREAD_MUTEX_INITIALIZER;
static int16_t _launched_processes_limit;
/*
* Create an environment string array for passing to execve() using the
@@ -122,12 +123,18 @@ _zed_exec_fork_child(uint64_t eid, const char *dir, const char *prog,
int fd;
struct launched_process_node *node;
sigset_t mask;
struct timespec launch_timeout =
{ .tv_sec = 0, .tv_nsec = 200 * 1000 * 1000, };
assert(dir != NULL);
assert(prog != NULL);
assert(env != NULL);
assert(zfd >= 0);
while (__atomic_load_n(&_launched_processes_limit,
__ATOMIC_SEQ_CST) <= 0)
(void) nanosleep(&launch_timeout, NULL);
n = snprintf(path, sizeof (path), "%s/%s", dir, prog);
if ((n < 0) || (n >= sizeof (path))) {
zed_log_msg(LOG_WARNING,
@@ -159,6 +166,7 @@ _zed_exec_fork_child(uint64_t eid, const char *dir, const char *prog,
/* parent process */
__atomic_sub_fetch(&_launched_processes_limit, 1, __ATOMIC_SEQ_CST);
zed_log_msg(LOG_INFO, "Invoking \"%s\" eid=%llu pid=%d",
prog, eid, pid);
@@ -216,6 +224,8 @@ _reap_children(void *arg)
free(pnode);
}
(void) pthread_mutex_unlock(&_launched_processes_lock);
__atomic_add_fetch(&_launched_processes_limit, 1,
__ATOMIC_SEQ_CST);
if (WIFEXITED(status)) {
zed_log_msg(LOG_INFO,
@@ -270,20 +280,21 @@ zed_exec_fini(void)
* Process the event [eid] by synchronously invoking all zedlets with a
* matching class prefix.
*
* Each executable in [zedlets] from the directory [dir] is matched against
* the event's [class], [subclass], and the "all" class (which matches
* all events). Every zedlet with a matching class prefix is invoked.
* Each executable in [zcp->zedlets] from the directory [zcp->zedlet_dir]
* is matched against the event's [class], [subclass], and the "all" class
* (which matches all events).
* Every zedlet with a matching class prefix is invoked.
* The NAME=VALUE strings in [envs] will be passed to the zedlet as
* environment variables.
*
* The file descriptor [zfd] is the zevent_fd used to track the
* The file descriptor [zcp->zevent_fd] is the zevent_fd used to track the
* current cursor location within the zevent nvlist.
*
* Return 0 on success, -1 on error.
*/
int
zed_exec_process(uint64_t eid, const char *class, const char *subclass,
const char *dir, zed_strings_t *zedlets, zed_strings_t *envs, int zfd)
struct zed_conf *zcp, zed_strings_t *envs)
{
const char *class_strings[4];
const char *allclass = "all";
@@ -292,10 +303,12 @@ zed_exec_process(uint64_t eid, const char *class, const char *subclass,
char **e;
int n;
if (!dir || !zedlets || !envs || zfd < 0)
if (!zcp->zedlet_dir || !zcp->zedlets || !envs || zcp->zevent_fd < 0)
return (-1);
if (_reap_children_tid == (pthread_t)-1) {
_launched_processes_limit = zcp->max_jobs;
if (pthread_create(&_reap_children_tid, NULL,
_reap_children, NULL) != 0)
return (-1);
@@ -321,11 +334,13 @@ zed_exec_process(uint64_t eid, const char *class, const char *subclass,
e = _zed_exec_create_env(envs);
for (z = zed_strings_first(zedlets); z; z = zed_strings_next(zedlets)) {
for (z = zed_strings_first(zcp->zedlets); z;
z = zed_strings_next(zcp->zedlets)) {
for (csp = class_strings; *csp; csp++) {
n = strlen(*csp);
if ((strncmp(z, *csp, n) == 0) && !isalpha(z[n]))
_zed_exec_fork_child(eid, dir, z, e, zfd);
_zed_exec_fork_child(eid, zcp->zedlet_dir,
z, e, zcp->zevent_fd);
}
}
free(e);