run-command API: have run_processes_parallel() take an "opts" struct

As noted in fd3aaf53f7 (run-command: add an "ungroup" option to
run_process_parallel(), 2022-06-07), which added the "ungroup" option,
passing it to "run_processes_parallel()" via the global
"run_processes_parallel_ungroup" variable was a compromise to get the
smallest possible regression fix for "maint" at the time.

This follow-up to that is a start at passing that parameter and others
via a new "struct run_process_parallel_opts", as the earlier
version[1] of what became fd3aaf53f7 did.

Since we need to change all of the occurrences of "n" to
"opts->SOMETHING", let's take the opportunity to rename the terse "n"
to "processes". We could also have picked "max_processes", "jobs",
"threads" etc., but as the API is named "run_processes_parallel()"
let's go with "processes".

Since the new "run_processes_parallel()" function is able to take an
optional "tr2_category" and "tr2_label" via the struct we can at this
point migrate all of the users of "run_processes_parallel_tr2()" over
to it.

But let's not migrate all the API users yet, only the two users that
passed the "ungroup" parameter via the
"run_processes_parallel_ungroup" global.

1. https://lore.kernel.org/git/cover-v2-0.8-00000000000-20220518T195858Z-avarab@gmail.com/

Signed-off-by: Ævar Arnfjörð Bjarmason <avarab@gmail.com>
Signed-off-by: Junio C Hamano <gitster@pobox.com>
This commit is contained in:
Ævar Arnfjörð Bjarmason 2022-10-12 23:02:26 +02:00 committed by Junio C Hamano
parent c333e6f3a8
commit 6e5ba0bae4
4 changed files with 121 additions and 59 deletions

23
hook.c
View File

@ -114,8 +114,20 @@ int run_hooks_opt(const char *hook_name, struct run_hooks_opt *options)
.options = options, .options = options,
}; };
const char *const hook_path = find_hook(hook_name); const char *const hook_path = find_hook(hook_name);
int jobs = 1;
int ret = 0; int ret = 0;
const struct run_process_parallel_opts opts = {
.tr2_category = "hook",
.tr2_label = hook_name,
.processes = 1,
.ungroup = 1,
.get_next_task = pick_next_hook,
.start_failure = notify_start_failure,
.task_finished = notify_hook_finished,
.data = &cb_data,
};
if (!options) if (!options)
BUG("a struct run_hooks_opt must be provided to run_hooks"); BUG("a struct run_hooks_opt must be provided to run_hooks");
@ -137,14 +149,7 @@ int run_hooks_opt(const char *hook_name, struct run_hooks_opt *options)
cb_data.hook_path = abs_path.buf; cb_data.hook_path = abs_path.buf;
} }
run_processes_parallel_ungroup = 1; run_processes_parallel(&opts);
run_processes_parallel_tr2(jobs,
pick_next_hook,
notify_start_failure,
notify_hook_finished,
&cb_data,
"hook",
hook_name);
ret = cb_data.rc; ret = cb_data.rc;
cleanup: cleanup:
strbuf_release(&abs_path); strbuf_release(&abs_path);

View File

@ -1496,7 +1496,6 @@ enum child_state {
GIT_CP_WAIT_CLEANUP, GIT_CP_WAIT_CLEANUP,
}; };
int run_processes_parallel_ungroup;
struct parallel_processes { struct parallel_processes {
void *const data; void *const data;
@ -1558,11 +1557,12 @@ static void handle_children_on_signal(int signo)
} }
static void pp_init(struct parallel_processes *pp, static void pp_init(struct parallel_processes *pp,
get_next_task_fn get_next_task, const struct run_process_parallel_opts *opts)
start_failure_fn start_failure,
task_finished_fn task_finished)
{ {
const size_t n = pp->max_processes; const size_t n = opts->processes;
get_next_task_fn get_next_task = opts->get_next_task;
start_failure_fn start_failure = opts->start_failure;
task_finished_fn task_finished = opts->task_finished;
if (!n) if (!n)
BUG("you must provide a non-zero number of processes!"); BUG("you must provide a non-zero number of processes!");
@ -1769,27 +1769,27 @@ static int pp_collect_finished(struct parallel_processes *pp)
return result; return result;
} }
void run_processes_parallel(size_t n, void run_processes_parallel(const struct run_process_parallel_opts *opts)
get_next_task_fn get_next_task,
start_failure_fn start_failure,
task_finished_fn task_finished,
void *pp_cb)
{ {
int i, code; int i, code;
int output_timeout = 100; int output_timeout = 100;
int spawn_cap = 4; int spawn_cap = 4;
int ungroup = run_processes_parallel_ungroup;
struct parallel_processes pp = { struct parallel_processes pp = {
.max_processes = n, .max_processes = opts->processes,
.data = pp_cb, .data = opts->data,
.buffered_output = STRBUF_INIT, .buffered_output = STRBUF_INIT,
.ungroup = ungroup, .ungroup = opts->ungroup,
}; };
/* options */
const char *tr2_category = opts->tr2_category;
const char *tr2_label = opts->tr2_label;
const int do_trace2 = tr2_category && tr2_label;
/* unset for the next API user */ if (do_trace2)
run_processes_parallel_ungroup = 0; trace2_region_enter_printf(tr2_category, tr2_label, NULL,
"max:%d", opts->processes);
pp_init(&pp, get_next_task, start_failure, task_finished); pp_init(&pp, opts);
while (1) { while (1) {
for (i = 0; for (i = 0;
i < spawn_cap && !pp.shutdown && i < spawn_cap && !pp.shutdown &&
@ -1806,7 +1806,7 @@ void run_processes_parallel(size_t n,
} }
if (!pp.nr_processes) if (!pp.nr_processes)
break; break;
if (ungroup) { if (opts->ungroup) {
for (size_t i = 0; i < pp.max_processes; i++) for (size_t i = 0; i < pp.max_processes; i++)
pp.children[i].state = GIT_CP_WAIT_CLEANUP; pp.children[i].state = GIT_CP_WAIT_CLEANUP;
} else { } else {
@ -1822,19 +1822,27 @@ void run_processes_parallel(size_t n,
} }
pp_cleanup(&pp); pp_cleanup(&pp);
if (do_trace2)
trace2_region_leave(tr2_category, tr2_label, NULL);
} }
void run_processes_parallel_tr2(size_t n, get_next_task_fn get_next_task, void run_processes_parallel_tr2(size_t processes, get_next_task_fn get_next_task,
start_failure_fn start_failure, start_failure_fn start_failure,
task_finished_fn task_finished, void *pp_cb, task_finished_fn task_finished, void *pp_cb,
const char *tr2_category, const char *tr2_label) const char *tr2_category, const char *tr2_label)
{ {
trace2_region_enter_printf(tr2_category, tr2_label, NULL, "max:%d", n); const struct run_process_parallel_opts opts = {
.tr2_category = tr2_category,
.tr2_label = tr2_label,
.processes = processes,
run_processes_parallel(n, get_next_task, start_failure, .get_next_task = get_next_task,
task_finished, pp_cb); .start_failure = start_failure,
.task_finished = task_finished,
};
trace2_region_leave(tr2_category, tr2_label, NULL); run_processes_parallel(&opts);
} }
int run_auto_maintenance(int quiet) int run_auto_maintenance(int quiet)

View File

@ -459,17 +459,64 @@ typedef int (*task_finished_fn)(int result,
void *pp_task_cb); void *pp_task_cb);
/** /**
* Runs up to n processes at the same time. Whenever a process can be * Option used by run_processes_parallel(), { 0 }-initialized means no
* started, the callback get_next_task_fn is called to obtain the data * options.
*/
struct run_process_parallel_opts
{
/**
* tr2_category & tr2_label: sets the trace2 category and label for
* logging. These must either be unset, or both of them must be set.
*
* When both are non-NULL, run_processes_parallel() enters a trace2
* region (annotated with "max:<processes>") before starting and
* leaves it after cleanup. If either one is NULL no region is
* emitted.
*/
const char *tr2_category;
const char *tr2_label;
/**
* processes: see 'processes' in run_processes_parallel() below.
*
* Used as the "max_processes" of the run. Must be non-zero;
* pp_init() calls BUG() on 0.
*/
size_t processes;
/**
* ungroup: see 'ungroup' in run_processes_parallel() below.
*
* When set, the callbacks get a NULL "struct strbuf *out" and are
* responsible for emitting their own output.
*/
unsigned int ungroup:1;
/**
* get_next_task: See get_next_task_fn() above. This must be
* specified.
*/
get_next_task_fn get_next_task;
/**
* start_failure: See start_failure_fn() above. This can be
* NULL to omit any special handling.
*/
start_failure_fn start_failure;
/**
* task_finished: See task_finished_fn() above. This can be
* NULL to omit any special handling.
*/
task_finished_fn task_finished;
/**
* data: user data, will be passed as "pp_cb" to the callback
* parameters. It is not interpreted by the API itself as far as
* the visible code shows; it is only stored and handed back.
*/
void *data;
};
/**
* Options are passed via the "struct run_process_parallel_opts" above.
*
* Runs N 'processes' at the same time. Whenever a process can be
* started, the callback opts.get_next_task is called to obtain the data
* required to start another child process. * required to start another child process.
* *
* The children started via this function run in parallel. Their output * The children started via this function run in parallel. Their output
* (both stdout and stderr) is routed to stderr in a manner that output * (both stdout and stderr) is routed to stderr in a manner that output
* from different tasks does not interleave (but see "ungroup" below). * from different tasks does not interleave (but see "ungroup" below).
* *
* start_failure_fn and task_finished_fn can be NULL to omit any
* special handling.
*
* If the "ungroup" option isn't specified, the API will set the * If the "ungroup" option isn't specified, the API will set the
* "stdout_to_stderr" parameter in "struct child_process" and provide * "stdout_to_stderr" parameter in "struct child_process" and provide
* the callbacks with a "struct strbuf *out" parameter to write output * the callbacks with a "struct strbuf *out" parameter to write output
@ -479,19 +526,10 @@ typedef int (*task_finished_fn)(int result,
* NULL "struct strbuf *out" parameter, and are responsible for * NULL "struct strbuf *out" parameter, and are responsible for
* emitting their own output, including dealing with any race * emitting their own output, including dealing with any race
* conditions due to writing in parallel to stdout and stderr. * conditions due to writing in parallel to stdout and stderr.
* The "ungroup" option can be enabled by setting the global
* "run_processes_parallel_ungroup" to "1" before invoking
* run_processes_parallel(), it will be set back to "0" as soon as the
* API reads that setting.
*/ */
extern int run_processes_parallel_ungroup; void run_processes_parallel(const struct run_process_parallel_opts *opts);
void run_processes_parallel(size_t n, void run_processes_parallel_tr2(size_t processes, get_next_task_fn,
get_next_task_fn, start_failure_fn, task_finished_fn, void *pp_cb,
start_failure_fn,
task_finished_fn,
void *pp_cb);
void run_processes_parallel_tr2(size_t n, get_next_task_fn, start_failure_fn,
task_finished_fn, void *pp_cb,
const char *tr2_category, const char *tr2_label); const char *tr2_category, const char *tr2_label);
/** /**

View File

@ -136,7 +136,7 @@ static const char * const testsuite_usage[] = {
static int testsuite(int argc, const char **argv) static int testsuite(int argc, const char **argv)
{ {
struct testsuite suite = TESTSUITE_INIT; struct testsuite suite = TESTSUITE_INIT;
int max_jobs = 1, i, ret; int max_jobs = 1, i, ret = 0;
DIR *dir; DIR *dir;
struct dirent *d; struct dirent *d;
struct option options[] = { struct option options[] = {
@ -152,6 +152,12 @@ static int testsuite(int argc, const char **argv)
"write JUnit-style XML files"), "write JUnit-style XML files"),
OPT_END() OPT_END()
}; };
struct run_process_parallel_opts opts = {
.get_next_task = next_test,
.start_failure = test_failed,
.task_finished = test_finished,
.data = &suite,
};
argc = parse_options(argc, argv, NULL, options, argc = parse_options(argc, argv, NULL, options,
testsuite_usage, PARSE_OPT_STOP_AT_NON_OPTION); testsuite_usage, PARSE_OPT_STOP_AT_NON_OPTION);
@ -192,8 +198,8 @@ static int testsuite(int argc, const char **argv)
fprintf(stderr, "Running %"PRIuMAX" tests (%d at a time)\n", fprintf(stderr, "Running %"PRIuMAX" tests (%d at a time)\n",
(uintmax_t)suite.tests.nr, max_jobs); (uintmax_t)suite.tests.nr, max_jobs);
run_processes_parallel(max_jobs, next_test, test_failed, opts.processes = max_jobs;
test_finished, &suite); run_processes_parallel(&opts);
if (suite.failed.nr > 0) { if (suite.failed.nr > 0) {
ret = 1; ret = 1;
@ -206,7 +212,7 @@ static int testsuite(int argc, const char **argv)
string_list_clear(&suite.tests, 0); string_list_clear(&suite.tests, 0);
string_list_clear(&suite.failed, 0); string_list_clear(&suite.failed, 0);
return !!ret; return ret;
} }
static uint64_t my_random_next = 1234; static uint64_t my_random_next = 1234;
@ -382,6 +388,9 @@ int cmd__run_command(int argc, const char **argv)
struct child_process proc = CHILD_PROCESS_INIT; struct child_process proc = CHILD_PROCESS_INIT;
int jobs; int jobs;
int ret; int ret;
struct run_process_parallel_opts opts = {
.data = &proc,
};
if (argc > 1 && !strcmp(argv[1], "testsuite")) if (argc > 1 && !strcmp(argv[1], "testsuite"))
return testsuite(argc - 1, argv + 1); return testsuite(argc - 1, argv + 1);
@ -427,7 +436,7 @@ int cmd__run_command(int argc, const char **argv)
if (!strcmp(argv[1], "--ungroup")) { if (!strcmp(argv[1], "--ungroup")) {
argv += 1; argv += 1;
argc -= 1; argc -= 1;
run_processes_parallel_ungroup = 1; opts.ungroup = 1;
} }
jobs = atoi(argv[2]); jobs = atoi(argv[2]);
@ -435,18 +444,20 @@ int cmd__run_command(int argc, const char **argv)
strvec_pushv(&proc.args, (const char **)argv + 3); strvec_pushv(&proc.args, (const char **)argv + 3);
if (!strcmp(argv[1], "run-command-parallel")) { if (!strcmp(argv[1], "run-command-parallel")) {
run_processes_parallel(jobs, parallel_next, NULL, NULL, &proc); opts.get_next_task = parallel_next;
} else if (!strcmp(argv[1], "run-command-abort")) { } else if (!strcmp(argv[1], "run-command-abort")) {
run_processes_parallel(jobs, parallel_next, NULL, opts.get_next_task = parallel_next;
task_finished, &proc); opts.task_finished = task_finished;
} else if (!strcmp(argv[1], "run-command-no-jobs")) { } else if (!strcmp(argv[1], "run-command-no-jobs")) {
run_processes_parallel(jobs, no_job, NULL, task_finished, opts.get_next_task = no_job;
&proc); opts.task_finished = task_finished;
} else { } else {
ret = 1; ret = 1;
fprintf(stderr, "check usage\n"); fprintf(stderr, "check usage\n");
goto cleanup; goto cleanup;
} }
opts.processes = jobs;
run_processes_parallel(&opts);
ret = 0; ret = 0;
cleanup: cleanup:
child_process_clear(&proc); child_process_clear(&proc);