[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [PATCH v5 4/5] migration: Introduce 'qatzip' compression method
From: Fabiano Rosas
Subject: Re: [PATCH v5 4/5] migration: Introduce 'qatzip' compression method
Date: Fri, 12 Jul 2024 11:17:29 -0300
Yichen Wang <yichen.wang@bytedance.com> writes:
> From: Bryan Zhang <bryan.zhang@bytedance.com>
>
> Adds support for 'qatzip' as an option for the multifd compression
> method parameter, and implements using QAT for 'qatzip' compression and
> decompression.
>
> Signed-off-by: Bryan Zhang <bryan.zhang@bytedance.com>
> Signed-off-by: Hao Xiang <hao.xiang@linux.dev>
> Signed-off-by: Yichen Wang <yichen.wang@bytedance.com>
> ---
> hw/core/qdev-properties-system.c | 6 +-
> migration/meson.build | 1 +
> migration/multifd-qatzip.c | 403 +++++++++++++++++++++++++++++++
> migration/multifd.h | 5 +-
> qapi/migration.json | 3 +
> tests/qtest/meson.build | 4 +
> 6 files changed, 419 insertions(+), 3 deletions(-)
> create mode 100644 migration/multifd-qatzip.c
>
> diff --git a/hw/core/qdev-properties-system.c
> b/hw/core/qdev-properties-system.c
> index f13350b4fb..eb50d6ec5b 100644
> --- a/hw/core/qdev-properties-system.c
> +++ b/hw/core/qdev-properties-system.c
> @@ -659,7 +659,11 @@ const PropertyInfo qdev_prop_fdc_drive_type = {
> const PropertyInfo qdev_prop_multifd_compression = {
> .name = "MultiFDCompression",
> .description = "multifd_compression values, "
> - "none/zlib/zstd/qpl/uadk",
> + "none/zlib/zstd/qpl/uadk"
> +#ifdef CONFIG_QATZIP
> + "/qatzip"
> +#endif
It seems the other accelerators don't need the ifdef. What's different
here?
> + ,
> .enum_table = &MultiFDCompression_lookup,
> .get = qdev_propinfo_get_enum,
> .set = qdev_propinfo_set_enum,
> diff --git a/migration/meson.build b/migration/meson.build
> index 5ce2acb41e..c9454c26ae 100644
> --- a/migration/meson.build
> +++ b/migration/meson.build
> @@ -41,6 +41,7 @@ system_ss.add(when: rdma, if_true: files('rdma.c'))
> system_ss.add(when: zstd, if_true: files('multifd-zstd.c'))
> system_ss.add(when: qpl, if_true: files('multifd-qpl.c'))
> system_ss.add(when: uadk, if_true: files('multifd-uadk.c'))
> +system_ss.add(when: qatzip, if_true: files('multifd-qatzip.c'))
>
> specific_ss.add(when: 'CONFIG_SYSTEM_ONLY',
> if_true: files('ram.c',
> diff --git a/migration/multifd-qatzip.c b/migration/multifd-qatzip.c
> new file mode 100644
> index 0000000000..d01d51de8f
> --- /dev/null
> +++ b/migration/multifd-qatzip.c
> @@ -0,0 +1,403 @@
> +/*
> + * Multifd QATzip compression implementation
> + *
> + * Copyright (c) Bytedance
> + *
> + * Authors:
> + * Bryan Zhang <bryan.zhang@bytedance.com>
> + * Hao Xiang <hao.xiang@bytedance.com>
> + * Yichen Wang <yichen.wang@bytedance.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or later.
> + * See the COPYING file in the top-level directory.
> + */
> +
> +#include "qemu/osdep.h"
> +#include "exec/ramblock.h"
> +#include "qapi/error.h"
> +#include "qemu/error-report.h"
> +#include "qapi/qapi-types-migration.h"
> +#include "options.h"
> +#include "multifd.h"
> +#include <qatzip.h>
> +
> +typedef struct {
> + /*
> + * Unique session for use with QATzip API
> + */
> + QzSession_T sess;
> +
> + /*
> + * For compression: Buffer for pages to compress
> + * For decompression: Buffer for data to decompress
> + */
> + uint8_t *in_buf;
> + uint32_t in_len;
> +
> + /*
> + * For compression: Output buffer of compressed data
> + * For decompression: Output buffer of decompressed data
> + */
> + uint8_t *out_buf;
> + uint32_t out_len;
> +} QatzipData;
> +
> +/**
> + * qatzip_send_setup: Set up QATzip session and private buffers.
> + *
> + * @param p Multifd channel params
> + * @param errp Pointer to error, which will be set in case of error
> + * @return 0 on success, -1 on error (and *errp will be set)
> + */
> +static int qatzip_send_setup(MultiFDSendParams *p, Error **errp)
> +{
> + QatzipData *q;
> + QzSessionParamsDeflate_T params;
> + const char *err_msg;
> + int ret;
> +
> + q = g_new0(QatzipData, 1);
> + p->compress_data = q;
> + /* We need one extra place for the packet header */
> + p->iov = g_new0(struct iovec, 2);
> +
> + /* Prefer without sw_fallback because of bad performance with
> sw_fallback.
> + * Warn if sw_fallback needs to be used. */
Please run scripts/checkpatch.pl on your series. This style of comments
should have been flagged as non-conformant with our guidelines.
> + ret = qzInit(&q->sess, false);
> + if (ret != QZ_OK && ret != QZ_DUPLICATE) {
> + /* Warn, and try with sw_fallback. */
> + warn_report("Initilizing QAT with sw_fallback...");
This will warn for each multifd channel, maybe use warn_report_once
instead. Also s/Initilizing/Initializing/ and let's spell out "software
fallback".
> + ret = qzInit(&q->sess, true);
> + if (ret != QZ_OK && ret != QZ_DUPLICATE) {
> + /* Warn, and try with sw_fallback. */
> + err_msg = "qzInit failed";
> + goto err_free_q;
> + }
> + }
> +
> + ret = qzGetDefaultsDeflate(&params);
> + if (ret != QZ_OK) {
> + err_msg = "qzGetDefaultsDeflate failed";
> + goto err_close;
> + }
> +
> + /* Make sure to use configured QATzip compression level. */
> + params.common_params.comp_lvl = migrate_multifd_qatzip_level();
> +
> + ret = qzSetupSessionDeflate(&q->sess, &params);
> + if (ret != QZ_OK && ret != QZ_DUPLICATE) {
> + err_msg = "qzSetupSessionDeflate failed";
> + goto err_close;
> + }
> +
> + if (MULTIFD_PACKET_SIZE > UINT32_MAX) {
> + err_msg = "packet size too large for QAT";
> + goto err_close;
> + }
> +
> + q->in_len = MULTIFD_PACKET_SIZE;
> + q->in_buf = qzMalloc(q->in_len, 0, PINNED_MEM);
> + if (!q->in_buf) {
> + err_msg = "qzMalloc failed";
> + goto err_close;
> + }
> +
> + q->out_len = qzMaxCompressedLength(MULTIFD_PACKET_SIZE, &q->sess);
> + q->out_buf = qzMalloc(q->out_len, 0, PINNED_MEM);
> + if (!q->out_buf) {
> + err_msg = "qzMalloc failed";
> + goto err_free_inbuf;
> + }
> +
> + return 0;
> +
> +err_free_inbuf:
> + qzFree(q->in_buf);
> +err_close:
> + qzClose(&q->sess);
> +err_free_q:
> + g_free(q);
> + g_free(p->iov);
> + p->iov = NULL;
> + p->compress_data = NULL;
> + error_setg(errp, "multifd %u: %s", p->id, err_msg);
> + return -1;
> +}
> +
> +/**
> + * qatzip_send_cleanup: Tear down QATzip session and release private buffers.
> + *
> + * @param p Multifd channel params
> + * @param errp Pointer to error, which will be set in case of error
> + * @return None
> + */
> +static void qatzip_send_cleanup(MultiFDSendParams *p, Error **errp)
> +{
> + QatzipData *q = p->compress_data;
> + const char *err_msg;
> + int ret;
> +
> + ret = qzTeardownSession(&q->sess);
> + if (ret != QZ_OK) {
> + err_msg = "qzTeardownSession failed";
> + goto err;
> + }
> +
> + ret = qzClose(&q->sess);
> + if (ret != QZ_OK) {
> + err_msg = "qzClose failed";
> + goto err;
> + }
Can qzClose() be called twice on the same session pointer? It's possible
that we have already failed at multifd_send_setup() and still reach
here.
And what about qzTeardownSession()? Can it cope with an already closed
session?
And what about the sessions that never got created because we might have
exited early at the ops->send_setup() loop?
> +
> + qzFree(q->in_buf);
> + q->in_buf = NULL;
> + qzFree(q->out_buf);
> + q->out_buf = NULL;
These will be double-freed here if send_setup has already freed them.
> + g_free(p->iov);
> + p->iov = NULL;
> + g_free(p->compress_data);
> + p->compress_data = NULL;
> + return;
> +
> +err:
> + error_setg(errp, "multifd %u: %s", p->id, err_msg);
> +}
> +
> +/**
> + * qatzip_send_prepare: Compress pages and update IO channel info.
> + *
> + * @param p Multifd channel params
> + * @param errp Pointer to error, which will be set in case of error
> + * @return 0 on success, -1 on error (and *errp will be set)
> + */
> +static int qatzip_send_prepare(MultiFDSendParams *p, Error **errp)
> +{
> + MultiFDPages_t *pages = p->pages;
> + QatzipData *q = p->compress_data;
> + int ret;
> + unsigned int in_len, out_len;
> +
> + if (!multifd_send_prepare_common(p)) {
> + goto out;
> + }
> +
> + /* Unlike other multifd compression implementations, we use a
> + * non-streaming API and place all the data into one buffer, rather than
> + * sending each page to the compression API at a time. */
> + for (int i = 0; i < pages->normal_num; i++) {
> + memcpy(q->in_buf + (i * p->page_size),
> + p->pages->block->host + pages->offset[i],
Use pages->block->host here; the local 'pages' already holds p->pages.
> + p->page_size);
> + }
> +
> + in_len = pages->normal_num * p->page_size;
> + if (in_len > q->in_len) {
> + error_setg(errp, "multifd %u: unexpectedly large input", p->id);
> + return -1;
> + }
> + out_len = q->out_len;
> +
> + /*
> + * Unlike other multifd compression implementations, we use a
> non-streaming
> + * API and place all the data into one buffer, rather than sending each
> page
> + * to the compression API at a time. Based on initial benchmarks, the
> + * non-streaming API outperforms the streaming API. Plus, the logic in
> QEMU
> + * is friendly to using the non-streaming API anyway. If either of these
> + * statements becomes no longer true, we can revisit adding a streaming
> + * implementation.
> + */
> + ret = qzCompress(&q->sess, q->in_buf, &in_len, q->out_buf, &out_len, 1);
> + if (ret != QZ_OK) {
> + error_setg(errp, "multifd %u: QATzip returned %d instead of QZ_OK",
> + p->id, ret);
> + return -1;
> + }
> + if (in_len != pages->normal_num * p->page_size) {
> + error_setg(errp, "multifd %u: QATzip failed to compress all input",
> + p->id);
> + return -1;
> + }
> +
> + p->iov[p->iovs_num].iov_base = q->out_buf;
> + p->iov[p->iovs_num].iov_len = out_len;
> + p->iovs_num++;
> + p->next_packet_size = out_len;
> +
> +out:
> + p->flags |= MULTIFD_FLAG_QATZIP;
> + multifd_send_fill_packet(p);
> + return 0;
> +}
> +
> +/**
> + * qatzip_recv_setup: Set up QATzip session and allocate private buffers.
> + *
> + * @param p Multifd channel params
> + * @param errp Pointer to error, which will be set in case of error
> + * @return 0 on success, -1 on error (and *errp will be set)
> + */
> +static int qatzip_recv_setup(MultiFDRecvParams *p, Error **errp)
> +{
> + QatzipData *q;
> + QzSessionParamsDeflate_T params;
> + const char *err_msg;
> + int ret;
> +
> + q = g_new0(QatzipData, 1);
> + p->compress_data = q;
> +
> + /* Prefer without sw_fallback because of bad performance with
> sw_fallback.
> + * Warn if sw_fallback needs to be used. */
> + ret = qzInit(&q->sess, false);
> + if (ret != QZ_OK && ret != QZ_DUPLICATE) {
> + /* Warn, and try with sw_fallback. */
> + warn_report("Initilizing QAT with sw_fallback...");
Same here. Also please add a hint that this is recv and the other one is
send. It helps with debugging.
> + ret = qzInit(&q->sess, true);
> + if (ret != QZ_OK && ret != QZ_DUPLICATE) {
> + /* Warn, and try with sw_fallback. */
> + err_msg = "qzInit failed";
> + goto err_free_q;
> + }
> + }
> +
> + ret = qzGetDefaultsDeflate(&params);
> + if (ret != QZ_OK) {
> + err_msg = "qzGetDefaultsDeflate failed";
> + goto err_close;
> + }
> +
> + ret = qzSetupSessionDeflate(&q->sess, &params);
> + if (ret != QZ_OK && ret != QZ_DUPLICATE) {
> + err_msg = "qzSetupSessionDeflate failed";
> + goto err_close;
> + }
> +
> + /*
> + * Mimic multifd-zlib, which reserves extra space for the
> + * incoming packet.
I'd put the actual rationale here. It will also help in the future to
spot that this implementation doesn't send uncompressed pages in case
the compressed data gets too big.
> + */
> + q->in_len = MULTIFD_PACKET_SIZE * 2;
> + /* PINNED_MEM is an enum from qatzip headers, which means to use
> + * kzalloc_node() to allocate memory for QAT DMA purposes. */
> + q->in_buf = qzMalloc(q->in_len, 0, PINNED_MEM);
> + if (!q->in_buf) {
> + err_msg = "qzMalloc failed";
> + goto err_close;
> + }
> +
> + q->out_len = MULTIFD_PACKET_SIZE;
> + q->out_buf = qzMalloc(q->out_len, 0, PINNED_MEM);
> + if (!q->out_buf) {
> + err_msg = "qzMalloc failed";
> + goto err_free_inbuf;
> + }
> +
> + return 0;
> +
> +err_free_inbuf:
> + qzFree(q->in_buf);
> +err_close:
> + qzClose(&q->sess);
> +err_free_q:
> + g_free(q);
> + p->compress_data = NULL;
> + error_setg(errp, "multifd %u: %s", p->id, err_msg);
Or maybe put the recv/send information in this string.
> + return -1;
> +}
> +
> +/**
> + * qatzip_recv_cleanup: Tear down QATzip session and release private buffers.
> + *
> + * @param p Multifd channel params
> + * @return None
> + */
> +static void qatzip_recv_cleanup(MultiFDRecvParams *p)
> +{
> + QatzipData *q = p->compress_data;
> +
> + /* Ignoring return values here due to function signature. */
> + qzTeardownSession(&q->sess);
> + qzClose(&q->sess);
> + qzFree(q->in_buf);
> + qzFree(q->out_buf);
> + g_free(p->compress_data);
> +}
> +
> +
> +/**
> + * qatzip_recv: Decompress pages and copy them to the appropriate
> + * locations.
> + *
> + * @param p Multifd channel params
> + * @param errp Pointer to error, which will be set in case of error
> + * @return 0 on success, -1 on error (and *errp will be set)
> + */
> +static int qatzip_recv(MultiFDRecvParams *p, Error **errp)
> +{
> + QatzipData *q = p->compress_data;
> + int ret;
> + unsigned int in_len, out_len;
> + uint32_t in_size = p->next_packet_size;
> + uint32_t expected_size = p->normal_num * p->page_size;
> + uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
> +
> + if (in_size > q->in_len) {
> + error_setg(errp, "multifd %u: received unexpectedly large packet",
> + p->id);
> + return -1;
> + }
> +
> + if (flags != MULTIFD_FLAG_QATZIP) {
> + error_setg(errp, "multifd %u: flags received %x flags expected %x",
> + p->id, flags, MULTIFD_FLAG_QATZIP);
> + return -1;
> + }
> +
> + multifd_recv_zero_page_process(p);
> + if (!p->normal_num) {
> + assert(in_size == 0);
> + return 0;
> + }
> +
> + ret = qio_channel_read_all(p->c, (void *)q->in_buf, in_size, errp);
> + if (ret != 0) {
> + return ret;
> + }
> +
> + in_len = in_size;
> + out_len = q->out_len;
> + ret = qzDecompress(&q->sess, q->in_buf, &in_len, q->out_buf, &out_len);
> + if (ret != QZ_OK) {
> + error_setg(errp, "multifd %u: qzDecompress failed", p->id);
> + return -1;
> + }
> + if (out_len != expected_size) {
> + error_setg(errp, "multifd %u: packet size received %u size expected
> %u",
> + p->id, out_len, expected_size);
> + return -1;
> + }
> +
> + /* Copy each page to its appropriate location. */
> + for (int i = 0; i < p->normal_num; i++) {
> + memcpy(p->host + p->normal[i],
> + q->out_buf + p->page_size * i,
> + p->page_size);
> + }
> + return 0;
> +}
> +
> +static MultiFDMethods multifd_qatzip_ops = {
> + .send_setup = qatzip_send_setup,
> + .send_cleanup = qatzip_send_cleanup,
> + .send_prepare = qatzip_send_prepare,
> + .recv_setup = qatzip_recv_setup,
> + .recv_cleanup = qatzip_recv_cleanup,
> + .recv = qatzip_recv
> +};
> +
> +static void multifd_qatzip_register(void)
> +{
> + multifd_register_ops(MULTIFD_COMPRESSION_QATZIP, &multifd_qatzip_ops);
> +}
> +
> +migration_init(multifd_qatzip_register);
> diff --git a/migration/multifd.h b/migration/multifd.h
> index 0ecd6f47d7..adceb65050 100644
> --- a/migration/multifd.h
> +++ b/migration/multifd.h
> @@ -34,14 +34,15 @@ MultiFDRecvData *multifd_get_recv_data(void);
> /* Multifd Compression flags */
> #define MULTIFD_FLAG_SYNC (1 << 0)
>
> -/* We reserve 4 bits for compression methods */
> -#define MULTIFD_FLAG_COMPRESSION_MASK (0xf << 1)
> +/* We reserve 5 bits for compression methods */
> +#define MULTIFD_FLAG_COMPRESSION_MASK (0x1f << 1)
> /* we need to be compatible. Before compression value was 0 */
> #define MULTIFD_FLAG_NOCOMP (0 << 1)
> #define MULTIFD_FLAG_ZLIB (1 << 1)
> #define MULTIFD_FLAG_ZSTD (2 << 1)
> #define MULTIFD_FLAG_QPL (4 << 1)
> #define MULTIFD_FLAG_UADK (8 << 1)
> +#define MULTIFD_FLAG_QATZIP (16 << 1)
>
> /* This value needs to be a multiple of qemu_target_page_size() */
> #define MULTIFD_PACKET_SIZE (512 * 1024)
> diff --git a/qapi/migration.json b/qapi/migration.json
> index cd08f2f710..42b5363449 100644
> --- a/qapi/migration.json
> +++ b/qapi/migration.json
> @@ -558,6 +558,8 @@
> #
> # @zstd: use zstd compression method.
> #
> +# @qatzip: use qatzip compression method. (Since 9.1)
> +#
> # @qpl: use qpl compression method. Query Processing Library(qpl) is
> # based on the deflate compression algorithm and use the Intel
> # In-Memory Analytics Accelerator(IAA) accelerated compression
> @@ -570,6 +572,7 @@
> { 'enum': 'MultiFDCompression',
> 'data': [ 'none', 'zlib',
> { 'name': 'zstd', 'if': 'CONFIG_ZSTD' },
> + { 'name': 'qatzip', 'if': 'CONFIG_QATZIP'},
> { 'name': 'qpl', 'if': 'CONFIG_QPL' },
> { 'name': 'uadk', 'if': 'CONFIG_UADK' } ] }
>
> diff --git a/tests/qtest/meson.build b/tests/qtest/meson.build
> index 6508bfb1a2..3068d73e08 100644
> --- a/tests/qtest/meson.build
> +++ b/tests/qtest/meson.build
> @@ -327,6 +327,10 @@ if gnutls.found()
> endif
> endif
>
> +if qatzip.found()
> + migration_files += [qatzip]
> +endif
> +
> qtests = {
> 'bios-tables-test': [io, 'boot-sector.c', 'acpi-utils.c', 'tpm-emu.c'],
> 'cdrom-test': files('boot-sector.c'),
- [PATCH v5 0/5] Implement QATzip compression method, Yichen Wang, 2024/07/10
- [PATCH v5 1/5] docs/migration: add qatzip compression feature, Yichen Wang, 2024/07/10
- [PATCH v5 3/5] migration: Add migration parameters for QATzip, Yichen Wang, 2024/07/10
- [PATCH v5 5/5] tests/migration: Add integration test for 'qatzip' compression method, Yichen Wang, 2024/07/10
- [PATCH v5 2/5] meson: Introduce 'qatzip' feature to the build system, Yichen Wang, 2024/07/10
- [PATCH v5 4/5] migration: Introduce 'qatzip' compression method, Yichen Wang, 2024/07/10
- Re: [PATCH v5 4/5] migration: Introduce 'qatzip' compression method,
Fabiano Rosas <=
- Re: [PATCH v5 0/5] Implement QATzip compression method, Peter Xu, 2024/07/11