Skip to content
This repository has been archived by the owner on Dec 16, 2023. It is now read-only.

Commit

Permalink
Update cloud
Browse files Browse the repository at this point in the history
  • Loading branch information
Sirherobrine23 committed May 8, 2023
1 parent 627519d commit 2117804
Show file tree
Hide file tree
Showing 5 changed files with 254 additions and 81 deletions.
255 changes: 196 additions & 59 deletions packages/cloud/src/oracleBucket.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import * as ociBucket from "oci-objectstorage";
import * as ociAuth from "oci-common";
import { createReadStream, createWriteStream, promises as fs } from "node:fs";
import { createReadStream, promises as fs } from "node:fs";
import { extendsFS } from "@sirherobrine23/extends";
import { finished } from "node:stream/promises";
import extendsFS from "@sirherobrine23/extends";
import { http } from "@sirherobrine23/http";
import chokidar from "chokidar";
import stream from "node:stream";
import path from "node:path";
import { http } from "@sirherobrine23/http";

type RegionPretty<S extends string> = S extends `${infer T}_${infer U}` ? `${T}-${RegionPretty<U>}` : S
export type oracleRegions = RegionPretty<Lowercase<Exclude<Exclude<keyof typeof ociAuth.Region, typeof ociAuth.Region>, "values"|"enableInstanceMetadata"|"register"|"fromRegionId"|"getRegionIdFromShortCode"|"hasUsedConfigFile"|"prototype"|"REGION_STRING"|"REGION_ID_STRING"|"REGION_ID">>>;
Expand Down Expand Up @@ -129,31 +129,94 @@ export async function oracleBucket(config: oracleOptions) {
if (!(typeof config.namespace === "string" && !!(config.namespace = config.namespace.trim()))) config.namespace = (await client.getNamespace({})).value;
await client.getBucket({bucketName: config.name, namespaceName: config.namespace});


const partialFunctions = {
/**
* Upload file to Oracle cloud bucket.
*
* @param fileName - File location.
* @param storageTier - Optional storage tier, default from seted in the Bucket.
* @returns - Writable stream to Write file (is a PassThrough but for writing only).
*/
uploadFile(fileName: string, storageTier?: "Archive"|"InfrequentAccess"|"Standard"): stream.Writable {
const strm = new stream.PassThrough();
client.putObject({
namespaceName: config.namespace,
bucketName: config.name,
objectName: fileName,
putObjectBody: stream.Readable.from(strm),
storageTier: storageTier === "Archive" ? ociBucket.models.StorageTier.Archive : storageTier === "InfrequentAccess" ? ociBucket.models.StorageTier.InfrequentAccess : storageTier === "Standard" ? ociBucket.models.StorageTier.Standard : undefined,
}).then(() => {}, err => strm.emit("error", err));
return strm;
uploadFile(fileName: string, storageTier?: "Archive"|"InfrequentAccess"|"Standard") {
let uploadId: string, uploadPartNum: string[] = [];
return new stream.Writable({
autoDestroy: true,
emitClose: true,
async write(chunk: Buffer, encoding, callback) {
try {
if (!(Buffer.isBuffer(chunk))) chunk = Buffer.from(chunk, encoding);
if (!uploadId) {
const { multipartUpload } = await client.createMultipartUpload({
namespaceName: config.namespace,
bucketName: config.name,
createMultipartUploadDetails: {
object: fileName,
storageTier: storageTier === "Archive" ? ociBucket.models.StorageTier.Archive : storageTier === "InfrequentAccess" ? ociBucket.models.StorageTier.InfrequentAccess : storageTier === "Standard" ? ociBucket.models.StorageTier.Standard : undefined,
}
});
uploadId = multipartUpload.uploadId;
}
const res = await client.uploadPart({
namespaceName: config.namespace,
bucketName: config.name,
objectName: fileName,
uploadId,
uploadPartBody: chunk,
uploadPartNum: uploadPartNum.length + 1,
});
uploadPartNum.push(res.eTag);
callback();
} catch (e) {
callback(e);
}
},
async destroy(error, callback) {
if (!uploadId) return callback(error);
try {
if (error) {
await client.abortMultipartUpload({
namespaceName: config.namespace,
bucketName: config.name,
objectName: fileName,
uploadId
});
callback(error);
} else {
await client.commitMultipartUpload({
namespaceName: config.namespace,
bucketName: config.name,
objectName: fileName,
uploadId,
commitMultipartUploadDetails: {
partsToCommit: uploadPartNum.map((etag, index) => ({etag, partNum: index + 1}))
}
});
callback(null);
}
} catch (e) {
e["originalError"] = error;
callback(e);
}
},
});
},
async deleteFile(pathLocation: string) {
/**
* Delete file in the Bucket
* @param pathLocation - File location/name.
*/
async deleteObject(pathLocation: string) {
await client.deleteObject({
namespaceName: config.namespace,
bucketName: config.name,
objectName: pathLocation
});
},
/**
* List files and folder in the Bucket
* @param folder - Folder name
* @returns
*/
async listFiles(folder?: string) {
const objects: oracleFileListObject[] = [];
let start: any;
Expand Down Expand Up @@ -182,38 +245,16 @@ export async function oracleBucket(config: oracleOptions) {

return objects;
},
/**
* Get file from bucket.
* @param pathLocation - File path
* @returns return file stream
*/
async getFileStream(pathLocation: string) {
const { value } = await client.getObject({namespaceName: config.namespace, bucketName: config.name, objectName: pathLocation});
if (!value) throw new Error("No file found");
else if (value instanceof stream.Readable) return value;
else return stream.Readable.fromWeb(value as any);
},
async watch(folderPath: string, options?: {downloadFist?: boolean, remoteFolder?: string}) {
if (!options) options = {};
if (!folderPath) throw new TypeError("Folder path is required");
else if (!(await extendsFS.exists(folderPath))) throw new Error("Folder path is not exists");
else if (!(await extendsFS.isDirectory(folderPath))) throw new Error("Folder path is not a directory");
if (options.downloadFist) {
let { remoteFolder = "" } = options;
const filesList = (await partialFunctions!.listFiles(remoteFolder)).map(d => d.name);
const localList = (await extendsFS.readdir(folderPath)).map(file => path.posix.resolve("/", path.relative(folderPath, file)));
for (const local of localList) if (!filesList.includes(local)) await fs.unlink(path.posix.resolve(folderPath, local));
for await (const remote of filesList) await new Promise(async (done, reject) => (await partialFunctions!.getFileStream(remote)).pipe(createWriteStream(path.posix.resolve(folderPath, remote))).on("error", reject).once("done", done));
}

return chokidar.watch(folderPath, {
ignoreInitial: true,
atomic: true,
}).on("add", async (filePath) => {
await finished(createReadStream(filePath).pipe(partialFunctions.uploadFile(path.posix.resolve("/", path.relative(folderPath, filePath)))))
}).on("change", async (filePath) => {
await finished(createReadStream(filePath).pipe(partialFunctions.uploadFile(path.posix.resolve("/", path.relative(folderPath, filePath)))))
}).on("unlink", async (filePath) => {
await partialFunctions!.deleteFile(path.posix.resolve("/", path.relative(folderPath, filePath)));
}).on("unlinkDir", async (filePath) => {
const filesList = (await partialFunctions!.listFiles(path.posix.resolve("/", path.relative(folderPath, filePath)))).map(d => d.name);
for (const remote of filesList) await partialFunctions!.deleteFile(remote);
});
}
};
return partialFunctions;
Expand All @@ -229,6 +270,8 @@ export async function oracleBucket(config: oracleOptions) {
*/
export function oracleBucketPreAuth(region: oracleRegions, namespace: string, name: string, preAuthKey: string) {
getRegion(region); // Check valid region
if (!preAuthKey) throw new Error("Pre auth key is required");
const bucketPath = path.posix.join("/p", preAuthKey, "n", namespace, "b", name);
const funs = {
/**
* Get file from Bucket
Expand All @@ -237,9 +280,11 @@ export function oracleBucketPreAuth(region: oracleRegions, namespace: string, na
* @returns
*/
getFile(filename: string) {
return http.streamRoot(new URL(path.posix.join("/p", preAuthKey, "n", namespace, "b", name, "o", encodeURIComponent(filename)), `https://objectstorage.${region}.oraclecloud.com`), {
disableHTTP2: true
}, true);
if (typeof filename !== "string") throw new Error("Requrie file name!");
filename = filename.trim().slice(-1024).split(path.win32.sep).join(path.posix.sep);
if (filename.startsWith(path.posix.sep)) filename = filename.slice(1);
filename = encodeURIComponent(filename);
return http.streamRoot(new URL(path.posix.join(bucketPath, "o", filename), `https://objectstorage.${region}.oraclecloud.com`), {disableHTTP2: true}, true);
},
/**
* Upload file to bucket
Expand All @@ -248,21 +293,67 @@ export function oracleBucketPreAuth(region: oracleRegions, namespace: string, na
* @param storageTier - Another tier to storage file
* @returns Stream to write file
*/
uploadFile(filename: string, storageTier?: oracleFileListObject["storageTier"]): stream.Writable {
return new class writeFile extends stream.PassThrough {
constructor() {
super();
http.bufferRequest(new URL(path.posix.join("/p", preAuthKey, "n", namespace, "b", name, "o", encodeURIComponent(filename)), `https://objectstorage.${region}.oraclecloud.com`), {
method: "PUT",
body: stream.Readable.from(this),
disableHTTP2: true,
headers: {
...(!!storageTier ? {"storage-tier": storageTier} : {}),
"Content-Type": "application/octet-stream",
uploadFile(filename: string, storageTier?: oracleFileListObject["storageTier"]) {
if (typeof filename !== "string") throw new Error("Requrie file name!");
filename = filename.trim().slice(-1024).split(path.win32.sep).join(path.posix.sep);
if (filename.startsWith(path.posix.sep)) filename = filename.slice(1);
filename = encodeURIComponent(filename);
let uploaduuid: string, uploadPartNum = 1;
return new stream.Writable({
autoDestroy: true,
emitClose: true,
async write(chunk: Buffer, encoding, callback) {
try {
if (!(Buffer.isBuffer(chunk))) chunk = Buffer.from(chunk, encoding);
if (!uploaduuid) {
const { body } = await http.jsonRequest(new URL(path.posix.join(bucketPath, "o", filename), `https://objectstorage.${region}.oraclecloud.com`), {
disableHTTP2: true,
method: "PUT",
headers: {
"opc-multipart": "true",
...(!!storageTier ? {"storage-tier": storageTier} : {}),
},
body: Buffer.from([])
});
uploaduuid = body.uploadId;
}
}).catch(err => this.emit("error", err));
}
}
await http.bufferRequest(new URL(path.posix.join(bucketPath, "u", filename, "id", uploaduuid, uploadPartNum.toString()), `https://objectstorage.${region}.oraclecloud.com`), {
disableHTTP2: true,
method: "PUT",
headers: {
"Content-Lenght": chunk.byteLength.toString(),
},
body: chunk
});
uploadPartNum++;
this.emit("progress", uploadPartNum, chunk.byteLength);
callback();
} catch (err) {
callback(err);
}
},
async destroy(error, callback) {
try {
if (uploaduuid) {
if (error) {
await http.bufferRequest(new URL(path.posix.join(bucketPath, "u", filename, "id", uploaduuid, "/"), `https://objectstorage.${region}.oraclecloud.com`), {
disableHTTP2: true,
method: "DELETE"
});
} else {
await http.bufferRequest(new URL(path.posix.join(bucketPath, "u", filename, "id", uploaduuid, "/"), `https://objectstorage.${region}.oraclecloud.com`), {
disableHTTP2: true,
method: "POST",
body: Buffer.from([])
});
}
}
callback(error);
} catch (err) {
callback(err);
}
},
});
},
/**
* List files in Bucket
Expand All @@ -272,7 +363,7 @@ export function oracleBucketPreAuth(region: oracleRegions, namespace: string, na
const data: oracleFileListObject[] = [];
let startAfter: string;
while (true) {
const response = await http.jsonRequest<{nextStartWith?: string, objects: ociBucket.models.ObjectSummary[]}>(new URL(path.posix.join("/p", preAuthKey, "n", namespace, "b", name, "o"), `https://objectstorage.${region}.oraclecloud.com`), {
const response = await http.jsonRequest<{nextStartWith?: string, objects: ociBucket.models.ObjectSummary[]}>(new URL(path.posix.join(bucketPath, "o"), `https://objectstorage.${region}.oraclecloud.com`), {
method: "GET",
query: {
limit: 1000,
Expand All @@ -299,4 +390,50 @@ export function oracleBucketPreAuth(region: oracleRegions, namespace: string, na
}
};
return funs;
}

export interface watchConfig {
skipSyncFiles?: boolean;
remoteFolder?: string;
listFiles(folderPath?: string): Promise<oracleFileListObject[]>;
uploadFile(filename: string): stream.Writable;
deleteObject?(path: string): Promise<void>;
}

/**
* Sync files and folder from local to bucket
* @param folderPath - Local folder path
* @param config - Settings to sync files and more
*/
export async function watch(folderPath: string, config: watchConfig) {
if (!folderPath) throw new TypeError("Folder path is required");
if (!config) throw new Error("Require configs!");
else if (!(await extendsFS.exists(folderPath))) throw new Error("Folder path is not exists");
else if (!(await extendsFS.isDirectory(folderPath))) throw new Error("Folder path is not a directory");
config.remoteFolder ||= "";
config.deleteObject ||= () => Promise.resolve();
if (config.skipSyncFiles !== true) {
const remoteFiles = await config.listFiles(config.remoteFolder);
await extendsFS.readdirV2(folderPath, true, () => true, async (relativePath, fullPath, stats) => {
if (stats.isDirectory()) return;
const find = remoteFiles.find(item => item.name === path.posix.resolve("/", config.remoteFolder||"", relativePath).slice(1));
if (!find) return finished(createReadStream(fullPath).pipe(config.uploadFile(path.posix.resolve("/", config.remoteFolder||"", relativePath))), {error: true});
else if (stats.size !== find.size) return finished(createReadStream(fullPath).pipe(config.uploadFile(path.posix.resolve("/", config.remoteFolder||"", relativePath))), {error: true});
});
}

async function syncFile(local: string) {
if ((await fs.lstat(local)).isDirectory()) return;
return finished(createReadStream(local).pipe(config.uploadFile(path.posix.resolve("/", config.remoteFolder||"", path.relative(folderPath, local)))))
}
async function deleteFF(local: string) {
return config.deleteObject(path.posix.resolve("/", config.remoteFolder||"", path.relative(folderPath, local)));
}

const watch = chokidar.watch(folderPath, {ignoreInitial: true, atomic: true});
watch.on("add", async (filePath) => syncFile(filePath).catch(err => watch.emit("error", err)));
watch.on("change", async (filePath) => syncFile(filePath).catch(err => watch.emit("error", err)));
watch.on("unlink", async (filePath) => deleteFF(filePath).catch(err => watch.emit("error", err)));
watch.on("unlinkDir", async (filePath) => deleteFF(filePath).catch(err => watch.emit("error", err)));
return watch;
}
1 change: 1 addition & 0 deletions packages/cloud/test/.gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Client auths
token.json
client_secret*.json
oci_secret*.json

# Save files
tree.json
Expand Down
35 changes: 35 additions & 0 deletions packages/cloud/test/ociPre.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { oracleBucketPreAuth, watch } from "../src/oracleBucket.js";
import { randomBytesStream } from "@sirherobrine23/extends/src/crypto.js";
import { readFile, readdir } from "node:fs/promises";
import { fileURLToPath } from "node:url";
import { randomInt } from "node:crypto";
import { finished } from "node:stream/promises";
import path from "node:path";
const __dirname = fileURLToPath(new URL(".", import.meta.url));

const { region, namespace, name, preauth } = JSON.parse(await readFile(path.join(__dirname, String((await readdir(__dirname)).find(r => r.startsWith("oci_secret")))), "utf8"));
const preOCI = oracleBucketPreAuth(region, namespace, name, preauth);

const files = await preOCI.listFiles();
console.log(files);

const size = Math.round(randomInt(1024, 1024 * 256));
let size2 = size;

console.log("Creating random file of size", size, "bytes");
await finished((new randomBytesStream(size)).pipe(preOCI.uploadFile(path.join("test"+files.length+".txt")).on("progress", (_, size) => console.log("Progress: ", size, size2 -= size)))).catch(err => {
console.error(err);
process.exit(1);
});
// console.log("Upload complete\nInit delete");
// await preOCI.deleteObject(path.join("test"+files.length+".txt"));

console.log("Init watch");
const re = await watch(__dirname, {
skipSyncFiles: false,
remoteFolder: "test",
listFiles: preOCI.listFiles,
uploadFile: preOCI.uploadFile,
});

re.once("change", () => re.once("unlink", () => re.close()));
Loading

0 comments on commit 2117804

Please sign in to comment.