Implement progress messages in the daemon and library.

This implements progress notification messages in the daemon, and
adds a callback in the library to handle them.

No calls are changed so far, so in fact no progress messages can
be generated by this commit.

For more details, see:
https://www.redhat.com/archives/libguestfs/2010-July/msg00003.html
https://www.redhat.com/archives/libguestfs/2010-July/msg00024.html
This commit is contained in:
Richard Jones
2010-08-28 10:33:24 +01:00
parent a8a44cecba
commit e776a46ffc
8 changed files with 226 additions and 10 deletions

View File

@@ -21,6 +21,7 @@
#include <stdio.h>
#include <stdarg.h>
#include <stdint.h>
#include <errno.h>
#include <unistd.h>
@@ -161,6 +162,12 @@ extern int send_file_end (int cancel);
/* only call this if there is a FileOut parameter */
extern void reply (xdrproc_t xdrp, char *ret);
/* Notify progress to caller. This function is self-rate-limiting so
* you can call it as often as necessary. Actions which call this
* should add 'Progress' note in generator.
*/
extern void notify_progress (uint64_t position, uint64_t total);
/* Helper for functions that need a root filesystem mounted.
* NB. Cannot be used for FileIn functions.
*/

View File

@@ -26,6 +26,7 @@
#include <errno.h>
#include <sys/param.h> /* defines MIN */
#include <sys/select.h>
#include <sys/time.h>
#include <rpc/types.h>
#include <rpc/xdr.h>
@@ -43,6 +44,15 @@
int proc_nr;
int serial;
/* Time at which we received the current request. */
static struct timeval start_t;
/* Time at which the last progress notification was sent. */
static struct timeval last_progress_t;
/* Counts the number of progress notifications sent during this call. */
static int count_progress;
/* The daemon communications socket. */
static int sock;
@@ -54,8 +64,6 @@ main_loop (int _sock)
char lenbuf[4];
uint32_t len;
struct guestfs_message_header hdr;
struct timeval start_t, end_t;
int64_t start_us, end_us, elapsed_us;
sock = _sock;
@@ -112,9 +120,9 @@ main_loop (int _sock)
}
#endif
/* In verbose mode, display the time taken to run each command. */
if (verbose)
gettimeofday (&start_t, NULL);
gettimeofday (&start_t, NULL);
last_progress_t = start_t;
count_progress = 0;
/* Decode the message header. */
xdrmem_create (&xdr, buf, len, XDR_DECODE);
@@ -160,11 +168,14 @@ main_loop (int _sock)
/* In verbose mode, display the time taken to run each command. */
if (verbose) {
struct timeval end_t;
gettimeofday (&end_t, NULL);
int64_t start_us, end_us, elapsed_us;
start_us = (int64_t) start_t.tv_sec * 1000000 + start_t.tv_usec;
end_us = (int64_t) end_t.tv_sec * 1000000 + end_t.tv_usec;
elapsed_us = end_us - start_us;
fprintf (stderr, "proc %d (%s) took %d.%02d seconds\n",
proc_nr,
proc_nr >= 0 && proc_nr < GUESTFS_PROC_NR_PROCS
@@ -533,3 +544,78 @@ send_chunk (const guestfs_chunk *chunk)
return err;
}
/* Initial delay before sending notification messages, and
* the period at which we send them thereafter. These times
* are in microseconds.
*/
#define NOTIFICATION_INITIAL_DELAY 2000000
#define NOTIFICATION_PERIOD 333333
void
notify_progress (uint64_t position, uint64_t total)
{
struct timeval now_t;
gettimeofday (&now_t, NULL);
/* Always send a notification at 100%. This simplifies callers by
* allowing them to 'finish' the progress bar at 100% without
* needing special code.
*/
if (count_progress > 0 && position == total)
goto send;
/* Calculate time in microseconds since the last progress message
* was sent out (or since the start of the call).
*/
int64_t last_us, now_us, elapsed_us;
last_us =
(int64_t) last_progress_t.tv_sec * 1000000 + last_progress_t.tv_usec;
now_us = (int64_t) now_t.tv_sec * 1000000 + now_t.tv_usec;
elapsed_us = now_us - last_us;
/* Rate limit. */
if ((count_progress == 0 && elapsed_us < NOTIFICATION_INITIAL_DELAY) ||
(count_progress > 0 && elapsed_us < NOTIFICATION_PERIOD))
return;
send:
/* We're going to send a message now ... */
count_progress++;
last_progress_t = now_t;
/* Send the header word. */
XDR xdr;
char buf[128];
uint32_t i = GUESTFS_PROGRESS_FLAG;
size_t len;
xdrmem_create (&xdr, buf, 4, XDR_ENCODE);
xdr_u_int (&xdr, &i);
xdr_destroy (&xdr);
if (xwrite (sock, buf, 4) == -1) {
fprintf (stderr, "xwrite failed\n");
exit (EXIT_FAILURE);
}
guestfs_progress message = {
.proc = proc_nr,
.serial = serial,
.position = position,
.total = total,
};
xdrmem_create (&xdr, buf, sizeof buf, XDR_ENCODE);
if (!xdr_guestfs_progress (&xdr, &message)) {
fprintf (stderr, "xdr_guestfs_progress: failed to encode message\n");
xdr_destroy (&xdr);
return;
}
len = xdr_getpos (&xdr);
xdr_destroy (&xdr);
if (xwrite (sock, buf, len) == -1) {
fprintf (stderr, "xwrite failed\n");
exit (EXIT_FAILURE);
}
}

View File

@@ -6327,11 +6327,12 @@ and generate_xdr () =
*/
const GUESTFS_PROGRAM = 0x2000F5F5;
const GUESTFS_PROTOCOL_VERSION = 1;
const GUESTFS_PROTOCOL_VERSION = 2;
/* These constants must be larger than any possible message length. */
const GUESTFS_LAUNCH_FLAG = 0xf5f55ff5;
const GUESTFS_CANCEL_FLAG = 0xffffeeee;
const GUESTFS_PROGRESS_FLAG = 0xffff5555;
enum guestfs_message_direction {
GUESTFS_DIRECTION_CALL = 0, /* client -> daemon */
@@ -6370,6 +6371,23 @@ struct guestfs_chunk {
/* data size is 0 bytes if the transfer has finished successfully */
opaque data<GUESTFS_MAX_CHUNK_SIZE>;
};
/* Progress notifications. Daemon self-limits these messages to
* at most one per second. The daemon can send these messages
* at any time, and the caller should discard unexpected messages.
* 'position' and 'total' have undefined units; however they may
* have meaning for some calls.
*
* NB. guestfs___recv_from_daemon assumes the XDR-encoded
* structure is 24 bytes long.
*/
struct guestfs_progress {
guestfs_procedure proc; /* @0: GUESTFS_PROC_x */
unsigned serial; /* @4: message serial number */
unsigned hyper position; /* @8: 0 <= position <= total */
unsigned hyper total; /* @16: total size of operation */
/* @24: size of structure */
};
"
(* Generate the guestfs-structs.h file. *)
@@ -6869,6 +6887,7 @@ and generate_linker_script () =
"guestfs_set_launch_done_callback";
"guestfs_set_log_message_callback";
"guestfs_set_out_of_memory_handler";
"guestfs_set_progress_callback";
"guestfs_set_subprocess_quit_callback";
(* Unofficial parts of the API: the bindings code use these

View File

@@ -122,6 +122,8 @@ struct guestfs_h
void * launch_done_cb_data;
guestfs_close_cb close_cb;
void * close_cb_data;
guestfs_progress_cb progress_cb;
void * progress_cb_data;
int msg_next_serial;

View File

@@ -645,3 +645,11 @@ guestfs_set_close_callback (guestfs_h *g,
g->close_cb = cb;
g->close_cb_data = opaque;
}
void
guestfs_set_progress_callback (guestfs_h *g,
guestfs_progress_cb cb, void *opaque)
{
g->progress_cb = cb;
g->progress_cb_data = opaque;
}

View File

@@ -34,6 +34,8 @@
extern "C" {
#endif
#include <stdint.h>
typedef struct guestfs_h guestfs_h;
/*--- Connection management ---*/
@@ -57,14 +59,15 @@ typedef void (*guestfs_log_message_cb) (guestfs_h *g, void *data, char *buf, int
typedef void (*guestfs_subprocess_quit_cb) (guestfs_h *g, void *data);
typedef void (*guestfs_launch_done_cb) (guestfs_h *g, void *data);
typedef void (*guestfs_close_cb) (guestfs_h *g, void *data);
typedef void (*guestfs_progress_cb) (guestfs_h *g, void *data, int proc_nr, int serial, uint64_t position, uint64_t total);
extern void guestfs_set_log_message_callback (guestfs_h *g, guestfs_log_message_cb cb, void *opaque);
extern void guestfs_set_subprocess_quit_callback (guestfs_h *g, guestfs_subprocess_quit_cb cb, void *opaque);
extern void guestfs_set_launch_done_callback (guestfs_h *g, guestfs_launch_done_cb cb, void *opaque);
extern void guestfs_set_close_callback (guestfs_h *g, guestfs_close_cb cb, void *opaque);
extern void guestfs_set_progress_callback (guestfs_h *g, guestfs_progress_cb cb, void *opaque);
/*--- Structures and actions ---*/
#include <stdint.h>
#include <rpc/types.h>
#include <rpc/xdr.h>
#include <guestfs-structs.h>

View File

@@ -1186,6 +1186,56 @@ languages (eg. if your HLL interpreter has already been cleaned
up by the time this is called, and if your callback then jumps
into some HLL function).
=head2 guestfs_set_progress_callback
typedef void (*guestfs_progress_cb) (guestfs_h *g, void *opaque,
int proc_nr, int serial,
uint64_t position, uint64_t total);
void guestfs_set_progress_callback (guestfs_h *g,
guestfs_progress_cb cb,
void *opaque);
Some long-running operations can generate progress messages. If
this callback is registered, then it will be called each time a
progress message is generated (usually two seconds after the
operation started, and three times per second thereafter until
it completes, although the frequency may change in future versions).
The callback receives two numbers: C<position> and C<total>.
The units of C<total> are not defined, although for some
operations C<total> may relate in some way to the amount of
data to be transferred (eg. in bytes or megabytes), and
C<position> may be the portion which has been transferred.
The only defined and stable parts of the API are:
=over 4
=item *
The callback can display to the user some type of progress bar or
indicator which shows the ratio of C<position>:C<total>.
=item *
0 E<lt>= C<position> E<lt>= C<total>
=item *
If any progress notification is sent during a call, then a final
progress notification is always sent when C<position> = C<total>.
This is to simplify caller code, so callers can easily set the
progress indicator to "100%" at the end of the operation, without
requiring special code to detect this case.
=back
The callback also receives the procedure number and serial number of
the call. These are only useful for debugging protocol issues, and
the callback can normally ignore them. The callback may want to
print these numbers in error messages or debugging messages.
=head1 BLOCK DEVICE NAMING
In the kernel there is now quite a profusion of schemata for naming

View File

@@ -373,7 +373,15 @@ guestfs___send_to_daemon (guestfs_h *g, const void *v_buf, size_t n)
*
* It also checks for EOF (qemu died) and passes that up through the
* child_cleanup function above.
*
* Progress notifications are handled transparently by this function.
* If the callback exists, it is called. The caller of this function
* will not see GUESTFS_PROGRESS_FLAG.
*/
/* Size of guestfs_progress message on the wire. */
#define PROGRESS_MESSAGE_SIZE 24
int
guestfs___recv_from_daemon (guestfs_h *g, uint32_t *size_rtn, void **buf_rtn)
{
@@ -400,7 +408,13 @@ guestfs___recv_from_daemon (guestfs_h *g, uint32_t *size_rtn, void **buf_rtn)
*/
ssize_t nr = -4;
while (nr < (ssize_t) *size_rtn) {
for (;;) {
ssize_t message_size =
*size_rtn != GUESTFS_PROGRESS_FLAG ?
*size_rtn : PROGRESS_MESSAGE_SIZE;
if (nr >= message_size)
break;
rset2 = rset;
int r = select (max_fd+1, &rset2, NULL, NULL, NULL);
if (r == -1) {
@@ -450,6 +464,11 @@ guestfs___recv_from_daemon (guestfs_h *g, uint32_t *size_rtn, void **buf_rtn)
xdr_uint32_t (&xdr, size_rtn);
xdr_destroy (&xdr);
/* *size_rtn changed, recalculate message_size */
message_size =
*size_rtn != GUESTFS_PROGRESS_FLAG ?
*size_rtn : PROGRESS_MESSAGE_SIZE;
if (*size_rtn == GUESTFS_LAUNCH_FLAG) {
if (g->state != LAUNCHING)
error (g, _("received magic signature from guestfsd, but in state %d"),
@@ -463,6 +482,8 @@ guestfs___recv_from_daemon (guestfs_h *g, uint32_t *size_rtn, void **buf_rtn)
}
else if (*size_rtn == GUESTFS_CANCEL_FLAG)
return 0;
else if (*size_rtn == GUESTFS_PROGRESS_FLAG)
/*FALLTHROUGH*/;
/* If this happens, it's pretty bad and we've probably lost
* synchronization.
*/
@@ -473,11 +494,11 @@ guestfs___recv_from_daemon (guestfs_h *g, uint32_t *size_rtn, void **buf_rtn)
}
/* Allocate the complete buffer, size now known. */
*buf_rtn = safe_malloc (g, *size_rtn);
*buf_rtn = safe_malloc (g, message_size);
/*FALLTHROUGH*/
}
size_t sizetoread = *size_rtn - nr;
size_t sizetoread = message_size - nr;
if (sizetoread > BUFSIZ) sizetoread = BUFSIZ;
r = read (g->sock, (char *) (*buf_rtn) + nr, sizetoread);
@@ -524,6 +545,26 @@ guestfs___recv_from_daemon (guestfs_h *g, uint32_t *size_rtn, void **buf_rtn)
}
#endif
if (*size_rtn == GUESTFS_PROGRESS_FLAG) {
if (g->state == BUSY && g->progress_cb) {
guestfs_progress message;
XDR xdr;
xdrmem_create (&xdr, *buf_rtn, PROGRESS_MESSAGE_SIZE, XDR_DECODE);
xdr_guestfs_progress (&xdr, &message);
xdr_destroy (&xdr);
g->progress_cb (g, g->progress_cb_data,
message.proc, message.serial,
message.position, message.total);
}
free (*buf_rtn);
*buf_rtn = NULL;
/* Process next message. */
return guestfs___recv_from_daemon (g, size_rtn, buf_rtn);
}
return 0;
}