Skip to content

Commit

Permalink
Add --blob-meta and --blob-cache arg use to generate raw blob cache a…
Browse files Browse the repository at this point in the history
…nd meta

generate blob cache and blob meta through the —blob-met and
—blob-cache parameters, so that nydusd can be started directly
from these two files without going to the backend to download.
this can improve the performance of data loading in localfs mode.

Signed-off-by: zyfjeff <[email protected]>
  • Loading branch information
zyfjeff committed Sep 27, 2023
1 parent b777564 commit 274cfba
Show file tree
Hide file tree
Showing 10 changed files with 260 additions and 14 deletions.
2 changes: 2 additions & 0 deletions builder/src/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,8 @@ impl BlobCompactor {
false,
Features::new(),
false,
None,
None,
);
let mut bootstrap_mgr =
BootstrapManager::new(Some(ArtifactStorage::SingleFile(d_bootstrap)), None);
Expand Down
18 changes: 14 additions & 4 deletions builder/src/core/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@
// SPDX-License-Identifier: Apache-2.0

use std::borrow::Cow;
use std::io::Write;
use std::io::{Seek, Write};
use std::mem::size_of;
use std::slice;

use anyhow::{Context, Result};
use nydus_rafs::metadata::RAFS_MAX_CHUNK_SIZE;
use nydus_storage::device::BlobFeatures;
use nydus_storage::meta::{toc, BlobMetaChunkArray};
use nydus_storage::meta::{toc, BlobCompressionContextHeader, BlobMetaChunkArray};
use nydus_utils::digest::{self, DigestHasher, RafsDigest};
use nydus_utils::{compress, crypt};
use nydus_utils::{compress, crypt, try_round_up_4k};
use sha2::digest::Digest;

use super::layout::BlobLayout;
Expand Down Expand Up @@ -194,7 +195,6 @@ impl Blob {
} else if ctx.blob_tar_reader.is_some() {
header.set_separate_blob(true);
};

let mut compressor = Self::get_compression_algorithm_for_meta(ctx);
let (compressed_data, compressed) = compress::compress(ci_data, compressor)
.with_context(|| "failed to compress blob chunk info array".to_string())?;
Expand Down Expand Up @@ -223,6 +223,16 @@ impl Blob {
}

blob_ctx.blob_meta_header = header;
if let Some(meta_writer) = ctx.blob_meta_writer.as_ref() {
let mut meta = meta_writer.lock().unwrap();
let aligned_uncompressed_size = try_round_up_4k(uncompressed_size as u64).unwrap();
meta.set_len(
aligned_uncompressed_size + size_of::<BlobCompressionContextHeader>() as u64,
)?;
meta.write_all(ci_data)?;
meta.seek(std::io::SeekFrom::Start(aligned_uncompressed_size))?;
meta.write_all(header.as_bytes())?;
}
let encrypted_header =
crypt::encrypt_with_context(header.as_bytes(), cipher_obj, cipher_ctx, encrypt)?;
let header_size = encrypted_header.len();
Expand Down
16 changes: 15 additions & 1 deletion builder/src/core/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ impl Write for ArtifactMemoryWriter {
}
}

struct ArtifactFileWriter(ArtifactWriter);
pub struct ArtifactFileWriter(pub ArtifactWriter);

impl RafsIoWrite for ArtifactFileWriter {
fn as_any(&self) -> &dyn Any {
Expand All @@ -215,6 +215,12 @@ impl RafsIoWrite for ArtifactFileWriter {
}
}

impl ArtifactFileWriter {
pub fn set_len(&mut self, s: u64) -> std::io::Result<()> {
self.0.file.get_mut().set_len(s)
}
}

impl Seek for ArtifactFileWriter {
fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
self.0.file.seek(pos)
Expand Down Expand Up @@ -1182,6 +1188,8 @@ pub struct BuildContext {

pub features: Features,
pub configuration: Arc<ConfigV2>,
pub blob_cache_writer: Option<Mutex<ArtifactFileWriter>>,
pub blob_meta_writer: Option<Mutex<ArtifactFileWriter>>,
}

impl BuildContext {
Expand All @@ -1201,6 +1209,8 @@ impl BuildContext {
blob_inline_meta: bool,
features: Features,
encrypt: bool,
blob_cache_writer: Option<Mutex<ArtifactFileWriter>>,
blob_meta_writer: Option<Mutex<ArtifactFileWriter>>,
) -> Self {
// It's a flag for images built with new nydus-image 2.2 and newer.
let mut blob_features = BlobFeatures::CAP_TAR_TOC;
Expand Down Expand Up @@ -1250,6 +1260,8 @@ impl BuildContext {

features,
configuration: Arc::new(ConfigV2::default()),
blob_cache_writer,
blob_meta_writer,
}
}

Expand Down Expand Up @@ -1299,6 +1311,8 @@ impl Default for BuildContext {
blob_inline_meta: false,
features: Features::new(),
configuration: Arc::new(ConfigV2::default()),
blob_cache_writer: None,
blob_meta_writer: None,
}
}
}
Expand Down
14 changes: 13 additions & 1 deletion builder/src/core/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
use std::ffi::{OsStr, OsString};
use std::fmt::{self, Display, Formatter, Result as FmtResult};
use std::fs::{self, File};
use std::io::{Read, Write};
use std::io::{Read, Seek, Write};
use std::ops::Deref;
#[cfg(target_os = "linux")]
use std::os::linux::fs::MetadataExt;
Expand Down Expand Up @@ -462,6 +462,18 @@ impl Node {
chunk.set_compressed(is_compressed);
}

if let Some(writer) = ctx.blob_cache_writer.as_ref() {
let mut guard = writer.lock().unwrap();
let curr_pos = guard.seek(std::io::SeekFrom::End(0))?;
if curr_pos < chunk.uncompressed_offset() + aligned_d_size as u64 {
guard.set_len(chunk.uncompressed_offset() + aligned_d_size as u64)?;
}

guard.seek(std::io::SeekFrom::Start(chunk.uncompressed_offset()))?;
guard
.write_all(&chunk_data)
.context("failed to write blob cache")?;
}
event_tracer!("blob_uncompressed_size", +d_size);

Ok(chunk_info)
Expand Down
14 changes: 12 additions & 2 deletions builder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
extern crate log;

use std::ffi::OsString;
use std::io::Write;
use std::os::unix::ffi::OsStrExt;
use std::path::{Path, PathBuf};

Expand All @@ -26,8 +27,8 @@ pub use self::compact::BlobCompactor;
pub use self::core::bootstrap::Bootstrap;
pub use self::core::chunk_dict::{parse_chunk_dict_arg, ChunkDict, HashChunkDict};
pub use self::core::context::{
ArtifactStorage, ArtifactWriter, BlobContext, BlobManager, BootstrapContext, BootstrapManager,
BuildContext, BuildOutput, ConversionType,
ArtifactFileWriter, ArtifactStorage, ArtifactWriter, BlobContext, BlobManager,
BootstrapContext, BootstrapManager, BuildContext, BuildOutput, ConversionType,
};
pub use self::core::feature::{Feature, Features};
pub use self::core::node::{ChunkSource, NodeChunk};
Expand Down Expand Up @@ -237,6 +238,15 @@ fn finalize_blob(
// blob file.
if !is_tarfs {
blob_writer.finalize(Some(blob_meta_id))?;
if let Some(writer) = ctx.blob_cache_writer.as_ref() {
let mut guard = writer.lock().unwrap();
guard.flush()?;
}

if let Some(writer) = ctx.blob_meta_writer.as_ref() {
let mut guard = writer.lock().unwrap();
guard.flush()?;
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions builder/src/stargz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,8 @@ mod tests {
false,
Features::new(),
false,
None,
None,
);
ctx.fs_version = RafsVersion::V6;
let mut bootstrap_mgr =
Expand Down
4 changes: 4 additions & 0 deletions builder/src/tarball.rs
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,8 @@ mod tests {
false,
Features::new(),
false,
None,
None,
);
let mut bootstrap_mgr =
BootstrapManager::new(Some(ArtifactStorage::FileDir(tmp_dir)), None);
Expand Down Expand Up @@ -721,6 +723,8 @@ mod tests {
false,
Features::new(),
true,
None,
None,
);
let mut bootstrap_mgr =
BootstrapManager::new(Some(ArtifactStorage::FileDir(tmp_dir)), None);
Expand Down
127 changes: 127 additions & 0 deletions smoke/tests/blobcache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package tests

import (
"fmt"
"io"
"io/fs"
"io/ioutil"
"os"
"path/filepath"
"testing"

"github.com/containerd/containerd/log"
"github.com/dragonflyoss/image-service/smoke/tests/texture"
"github.com/dragonflyoss/image-service/smoke/tests/tool"
"github.com/dragonflyoss/image-service/smoke/tests/tool/test"
"github.com/opencontainers/go-digest"
"github.com/stretchr/testify/require"
)

type BlobCacheTestSuite struct {
T *testing.T
}

func (a *BlobCacheTestSuite) compare_two_files(t *testing.T, left, right string) {

leftHash := digest.Digest("")
lf, err := os.Open(left)
require.NoError(t, err)
defer lf.Close()
digester := digest.Canonical.Digester()
_, err = io.Copy(digester.Hash(), lf)
require.NoError(t, err)
leftHash = digester.Digest()

rightHash := digest.Digest("")
rf, err := os.Open(right)
require.NoError(t, err)
defer rf.Close()
digester = digest.Canonical.Digester()
_, err = io.Copy(digester.Hash(), rf)
require.NoError(t, err)
rightHash = digester.Digest()
require.Equal(t, leftHash, rightHash)
}

func (a *BlobCacheTestSuite) TestGenerateBlobcache(t *testing.T) {

ctx := tool.DefaultContext(t)

ctx.PrepareWorkDir(t)
defer ctx.Destroy(t)

rootFs := texture.MakeLowerLayer(t, filepath.Join(ctx.Env.WorkDir, "root-fs"))

rootfsReader := rootFs.ToOCITar(t);

ociBlobDigester := digest.Canonical.Digester()
ociBlob, err := ioutil.TempFile(ctx.Env.BlobDir, "oci-blob-")
require.NoError(t, err)

_, err = io.Copy(io.MultiWriter(ociBlobDigester.Hash(), ociBlob), rootfsReader)
require.NoError(t, err)

ociBlobDigest := ociBlobDigester.Digest()
err = os.Rename(ociBlob.Name(), filepath.Join(ctx.Env.BlobDir, ociBlobDigest.Hex()))
require.NoError(t, err)

// use to generate blob.data and blob.meta
blobcacheDir := filepath.Join(ctx.Env.WorkDir, "blobcache")
err = os.MkdirAll(blobcacheDir, 0755)
require.NoError(t, err)

ctx.Env.BootstrapPath = filepath.Join(ctx.Env.WorkDir, "bootstrap")


tool.Run(t, fmt.Sprintf("%s create -t targz-ref --bootstrap %s --blob-dir %s --blob-cache %s --blob-meta %s %s",
ctx.Binary.Builder, ctx.Env.BootstrapPath, ctx.Env.BlobDir,
filepath.Join(blobcacheDir, "blob.data"), filepath.Join(blobcacheDir, "blob.meta"),
filepath.Join(ctx.Env.BlobDir, ociBlobDigest.Hex())));

nydusd, err := tool.NewNydusd(tool.NydusdConfig{
NydusdPath: ctx.Binary.Nydusd,
BootstrapPath: ctx.Env.BootstrapPath,
ConfigPath: filepath.Join(ctx.Env.WorkDir, "nydusd-config.fusedev.json"),
MountPath: ctx.Env.MountDir,
APISockPath: filepath.Join(ctx.Env.WorkDir, "nydusd-api.sock"),
BackendType: "localfs",
BackendConfig: fmt.Sprintf(`{"dir": "%s"}`, ctx.Env.BlobDir),
EnablePrefetch: ctx.Runtime.EnablePrefetch,
BlobCacheDir: ctx.Env.CacheDir,
CacheType: ctx.Runtime.CacheType,
CacheCompressed: ctx.Runtime.CacheCompressed,
RafsMode: ctx.Runtime.RafsMode,
DigestValidate: false,
})
require.NoError(t, err)

err = nydusd.Mount()
require.NoError(t, err)
defer func() {
if err := nydusd.Umount(); err != nil {
log.L.WithError(err).Errorf("umount")
}
}()

// make sure blobcache ready
err = filepath.WalkDir(ctx.Env.MountDir, func(path string, entry fs.DirEntry, err error) error {
require.Nil(t, err)
if entry.Type().IsRegular() {
targetPath, err := filepath.Rel(ctx.Env.MountDir, path)
require.NoError(t, err)
_, _ = os.ReadFile(targetPath)
}
return nil
})
require.NoError(t, err)

a.compare_two_files(t, filepath.Join(blobcacheDir, "blob.data"), filepath.Join(ctx.Env.CacheDir, fmt.Sprintf("%s.blob.data", ociBlobDigest.Hex())))
a.compare_two_files(t, filepath.Join(blobcacheDir, "blob.meta"), filepath.Join(ctx.Env.CacheDir, fmt.Sprintf("%s.blob.meta", ociBlobDigest.Hex())))
}


func TestBlobCache(t *testing.T) {
os.Setenv("NYDUS_BUILDER_latest", "/root/code/image-service/target/release/nydus-image");
os.Setenv("NYDUS_NYDUSD_latest", "/root/code/image-service/target/release/nydusd");
test.Run(t, &BlobCacheTestSuite{T: t})
}
6 changes: 3 additions & 3 deletions smoke/tests/tool/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (l *Layer) TargetPath(t *testing.T, path string) string {

func (l *Layer) Pack(t *testing.T, packOption converter.PackOption, blobDir string) digest.Digest {
// Output OCI tar stream
ociTar := l.toOCITar(t)
ociTar := l.ToOCITar(t)
defer ociTar.Close()
l.recordFileTree(t)

Expand All @@ -141,7 +141,7 @@ func (l *Layer) Pack(t *testing.T, packOption converter.PackOption, blobDir stri

func (l *Layer) PackRef(t *testing.T, ctx Context, blobDir string, compress bool) (digest.Digest, digest.Digest) {
// Output OCI tar stream
ociTar := l.toOCITar(t)
ociTar := l.ToOCITar(t)
defer ociTar.Close()
l.recordFileTree(t)

Expand Down Expand Up @@ -238,7 +238,7 @@ func (l *Layer) recordFileTree(t *testing.T) {
})
}

func (l *Layer) toOCITar(t *testing.T) io.ReadCloser {
func (l *Layer) ToOCITar(t *testing.T) io.ReadCloser {
return archive.Diff(context.Background(), "", l.workDir)
}

Expand Down
Loading

0 comments on commit 274cfba

Please sign in to comment.