parallel: Propagate errors from worker threads and exit with error code.

In virt-df and virt-alignment-scan, ensure that errors that happen in
worker threads are propagated all the way up and result in
exit(EXIT_FAILURE).

Note that this makes the align/test-virt-alignment-scan-guests.sh test
fail (for a genuine reason).  This is fixed in the following commit.

This updates commit 8b90f55dc7.
This commit is contained in:
Richard W.M. Jones
2013-03-30 12:52:12 +00:00
parent 30702c7f54
commit 429ffda329
6 changed files with 87 additions and 41 deletions

View File

@@ -50,7 +50,7 @@ static pthread_mutex_t worst_alignment_mutex = PTHREAD_MUTEX_INITIALIZER;
static int scan (guestfs_h *g, const char *prefix, FILE *fp);
#ifdef HAVE_LIBVIRT
static void scan_work (guestfs_h *g, size_t i, FILE *fp);
static int scan_work (guestfs_h *g, size_t i, FILE *fp);
#endif
/* These globals are shared with options.c. */
@@ -131,6 +131,7 @@ main (int argc, char *argv[])
int option_index;
int exit_code;
size_t max_threads = 0;
int r;
g = guestfs_create ();
if (g == NULL) {
@@ -226,16 +227,16 @@ main (int argc, char *argv[])
if (drvs == NULL) {
#if defined(HAVE_LIBVIRT) && defined(HAVE_LIBXML2)
get_all_libvirt_domains (libvirt_uri);
start_threads (max_threads, g, scan_work);
r = start_threads (max_threads, g, scan_work);
free_domains ();
if (r == -1)
exit (EXIT_FAILURE);
#else
fprintf (stderr, _("%s: compiled without support for libvirt and/or libxml2.\n"),
program_name);
exit (EXIT_FAILURE);
#endif
} else { /* Single guest. */
int r;
if (uuid) {
fprintf (stderr, _("%s: --uuid option cannot be used with -a or -d\n"),
program_name);
@@ -353,7 +354,7 @@ scan (guestfs_h *g, const char *prefix, FILE *fp)
* in "parallel.c".
*/
static void
static int
scan_work (guestfs_h *g, size_t i, FILE *fp)
{
struct guestfs___add_libvirt_dom_argv optargs;
@@ -365,12 +366,12 @@ scan_work (guestfs_h *g, size_t i, FILE *fp)
optargs.readonlydisk = "read";
if (guestfs___add_libvirt_dom (g, domains[i].dom, &optargs) == -1)
return;
return -1;
if (guestfs_launch (g) == -1)
return;
return -1;
(void) scan (g, !uuid ? domains[i].name : domains[i].uuid, fp);
return scan (g, !uuid ? domains[i].name : domains[i].uuid, fp);
}
#endif /* HAVE_LIBVIRT */

View File

@@ -92,7 +92,7 @@ df_on_handle (guestfs_h *g, const char *name, const char *uuid, FILE *fp)
* in "parallel.c".
*/
void
int
df_work (guestfs_h *g, size_t i, FILE *fp)
{
struct guestfs___add_libvirt_dom_argv optargs;
@@ -103,13 +103,14 @@ df_work (guestfs_h *g, size_t i, FILE *fp)
optargs.readonly = 1;
optargs.readonlydisk = "read";
/* Traditionally we have ignored errors from adding disks in virt-df. */
if (guestfs___add_libvirt_dom (g, domains[i].dom, &optargs) == -1)
return;
return 0;
if (guestfs_launch (g) == -1)
return;
return -1;
(void) df_on_handle (g, domains[i].name, domains[i].uuid, fp);
return df_on_handle (g, domains[i].name, domains[i].uuid, fp);
}
#endif /* HAVE_LIBVIRT */

View File

@@ -130,6 +130,7 @@ main (int argc, char *argv[])
int c;
int option_index;
size_t max_threads = 0;
int err;
g = guestfs_create ();
if (g == NULL) {
@@ -274,7 +275,7 @@ main (int argc, char *argv[])
#if defined(HAVE_LIBVIRT)
get_all_libvirt_domains (libvirt_uri);
print_title ();
start_threads (max_threads, g, df_work);
err = start_threads (max_threads, g, df_work);
free_domains ();
#else
fprintf (stderr, _("%s: compiled without support for libvirt.\n"),
@@ -302,7 +303,7 @@ main (int argc, char *argv[])
* guestfs_add_domain so the UUID is not available easily for
* single '-d' command-line options.
*/
(void) df_on_handle (g, name, NULL, stdout);
err = df_on_handle (g, name, NULL, stdout);
/* Free up data structures, no longer needed after this point. */
free_drives (drvs);
@@ -310,7 +311,7 @@ main (int argc, char *argv[])
guestfs_close (g);
exit (EXIT_SUCCESS);
exit (err == 0 ? EXIT_SUCCESS : EXIT_FAILURE);
}
/* Generate a display name for the single guest mode. See comments in

View File

@@ -68,30 +68,25 @@ static size_t next_domain_to_retire = 0;
static pthread_mutex_t retire_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t retire_cond = PTHREAD_COND_INITIALIZER;
static void thread_failure (const char *fn, int err) __attribute__((noreturn));
static void thread_failure (const char *fn, int err);
static void *worker_thread (void *arg);
struct thread_data {
int trace, verbose; /* Flags from the options_handle. */
work_fn work;
int r; /* Used to store the error status. */
};
/* Start threads. */
void
int
start_threads (size_t option_P, guestfs_h *options_handle, work_fn work)
{
struct thread_data thread_data = { .trace = 0, .verbose = 0, .work = work };
size_t i, nr_threads;
int err;
int err, errors;
void *status;
if (nr_domains == 0) /* Nothing to do. */
return;
if (options_handle) {
thread_data.trace = guestfs_get_trace (options_handle);
thread_data.verbose = guestfs_get_verbose (options_handle);
}
return 0;
/* If the user selected the -P option, then we use up to that many threads. */
if (option_P > 0)
@@ -99,21 +94,41 @@ start_threads (size_t option_P, guestfs_h *options_handle, work_fn work)
else
nr_threads = MIN (nr_domains, MIN (MAX_THREADS, estimate_max_threads ()));
struct thread_data thread_data[nr_threads];
pthread_t threads[nr_threads];
for (i = 0; i < nr_threads; ++i) {
if (options_handle) {
thread_data[i].trace = guestfs_get_trace (options_handle);
thread_data[i].verbose = guestfs_get_verbose (options_handle);
}
else {
thread_data[i].trace = 0;
thread_data[i].verbose = 0;
}
thread_data[i].work = work;
}
/* Start the worker threads. */
for (i = 0; i < nr_threads; ++i) {
err = pthread_create (&threads[i], NULL, worker_thread, &thread_data);
err = pthread_create (&threads[i], NULL, worker_thread, &thread_data[i]);
if (err != 0)
error (EXIT_FAILURE, err, "pthread_create [%zu]", i);
}
/* Wait for the threads to exit. */
errors = 0;
for (i = 0; i < nr_threads; ++i) {
err = pthread_join (threads[i], &status);
if (err != 0)
error (EXIT_FAILURE, err, "pthread_join [%zu]", i);
if (err != 0) {
error (0, err, "pthread_join [%zu]", i);
errors++;
}
if (*(int *)status == -1)
errors++;
}
return errors == 0 ? 0 : -1;
}
/* Worker thread. */
@@ -122,6 +137,8 @@ worker_thread (void *thread_data_vp)
{
struct thread_data *thread_data = thread_data_vp;
thread_data->r = 0;
while (1) {
size_t i; /* The current domain we're working on. */
FILE *fp;
@@ -132,10 +149,18 @@ worker_thread (void *thread_data_vp)
/* Take the next domain from the list. */
err = pthread_mutex_lock (&take_mutex);
if (err != 0) thread_failure ("pthread_mutex_lock", err);
if (err != 0) {
thread_failure ("pthread_mutex_lock", err);
thread_data->r = -1;
return &thread_data->r;
}
i = next_domain_to_take++;
err = pthread_mutex_unlock (&take_mutex);
if (err != 0) thread_failure ("pthread_mutex_unlock", err);
if (err != 0) {
thread_failure ("pthread_mutex_unlock", err);
thread_data->r = -1;
return &thread_data->r;
}
if (i >= nr_domains) /* Work finished. */
break;
@@ -146,14 +171,16 @@ worker_thread (void *thread_data_vp)
fp = open_memstream (&output, &output_len);
if (fp == NULL) {
perror ("open_memstream");
_exit (EXIT_FAILURE);
thread_data->r = -1;
return &thread_data->r;
}
/* Create a guestfs handle. */
g = guestfs_create ();
if (g == NULL) {
perror ("guestfs_create");
_exit (EXIT_FAILURE);
thread_data->r = -1;
return &thread_data->r;
}
/* Copy some settings from the options guestfs handle. */
@@ -161,7 +188,8 @@ worker_thread (void *thread_data_vp)
guestfs_set_verbose (g, thread_data->verbose);
/* Do work. */
thread_data->work (g, i, fp);
if (thread_data->work (g, i, fp) == -1)
thread_data->r = -1;
fclose (fp);
guestfs_close (g);
@@ -170,10 +198,18 @@ worker_thread (void *thread_data_vp)
* may mean waiting for another thread to finish here.
*/
err = pthread_mutex_lock (&retire_mutex);
if (err != 0) thread_failure ("pthread_mutex_lock", err);
if (err != 0) {
thread_failure ("pthread_mutex_lock", err);
thread_data->r = -1;
return &thread_data->r;
}
while (next_domain_to_retire != i) {
err = pthread_cond_wait (&retire_cond, &retire_mutex);
if (err != 0) thread_failure ("pthread_cond_wait", err);
if (err != 0) {
thread_failure ("pthread_cond_wait", err);
thread_data->r = -1;
return &thread_data->r;
}
}
if (DEBUG_PARALLEL)
@@ -186,20 +222,23 @@ worker_thread (void *thread_data_vp)
next_domain_to_retire = i+1;
pthread_cond_broadcast (&retire_cond);
err = pthread_mutex_unlock (&retire_mutex);
if (err != 0) thread_failure ("pthread_mutex_unlock", err);
if (err != 0) {
thread_failure ("pthread_mutex_unlock", err);
thread_data->r = -1;
return &thread_data->r;
}
}
if (DEBUG_PARALLEL)
printf ("thread exiting\n");
return NULL;
return &thread_data->r;
}
static void
thread_failure (const char *fn, int err)
{
fprintf (stderr, "%s: %s: %s\n", program_name, fn, strerror (err));
_exit (EXIT_FAILURE);
}
#endif /* HAVE_LIBVIRT */

View File

@@ -27,8 +27,9 @@
* on domain index 'i'. However it MUST NOT print out any result
* directly. Instead it prints anything it needs to the supplied
* 'FILE *'.
* Returns 0 on success or -1 on error.
*/
typedef void (*work_fn) (guestfs_h *g, size_t i, FILE *fp);
typedef int (*work_fn) (guestfs_h *g, size_t i, FILE *fp);
/* Run the threads and work through the global list of libvirt
* domains. 'option_P' is whatever the user passed in the '-P'
@@ -36,8 +37,11 @@ typedef void (*work_fn) (guestfs_h *g, size_t i, FILE *fp);
* the number of threads is chosen heuristically. 'options_handle'
* (which may be NULL) is the global guestfs handle created by the
* options mini-library.
*
* Returns 0 if all work items completed successfully, or -1 if there
* was an error.
*/
extern void start_threads (size_t option_P, guestfs_h *options_handle, work_fn work);
extern int start_threads (size_t option_P, guestfs_h *options_handle, work_fn work);
#endif /* HAVE_LIBVIRT */

View File

@@ -27,7 +27,7 @@ extern int uuid; /* --uuid */
/* df.c */
extern int df_on_handle (guestfs_h *g, const char *name, const char *uuid, FILE *fp);
#if defined(HAVE_LIBVIRT)
extern void df_work (guestfs_h *g, size_t i, FILE *fp);
extern int df_work (guestfs_h *g, size_t i, FILE *fp);
#endif
/* output.c */