feat!: Download and extract from stream

This commit is contained in:
Lucàs
2024-09-24 14:35:30 +02:00
parent 0382499c0d
commit 5a00871319
18 changed files with 249 additions and 251 deletions
+27
View File
@@ -0,0 +1,27 @@
import { createHash } from "node:crypto";
import { join } from "node:path";
import * as fs from "fs-extra";
class CacheService {
public static readonly CACHE_DIR: string = "./cache";
public static generateCacheKey(url: string): string {
return createHash("md5").update(url).digest("hex");
}
public static getCachePath(url: string): string {
const cacheKey = this.generateCacheKey(url);
return join(CacheService.CACHE_DIR, cacheKey);
}
public static isCached(url: string): boolean {
const cacheKey = CacheService.generateCacheKey(url);
const cachedPath = join(CacheService.CACHE_DIR, cacheKey);
return fs.pathExistsSync(cachedPath);
}
}
fs.ensureDirSync(CacheService.CACHE_DIR);
export default CacheService;
+44
View File
@@ -0,0 +1,44 @@
import { extname } from "node:path";
import { ArchiveExtractorFactory, ArchiveType } from "./archive_extractor";
class FileService {
public static async downloadAndExtract(
url: string,
output: string
): Promise<void> {
const fileType: string = FileService.getFileExtension(url);
const archiveExtractor = ArchiveExtractorFactory.getExtractor(
fileType as ArchiveType
);
console.log(`Downloading ${url}`);
const stream = await FileService.getFileStream(url);
return archiveExtractor.extract(stream, output);
}
private static async getFileStream(url: string): Promise<ReadableStream> {
const response = await fetch(url, {
method: "GET",
headers: {
"Content-Type": "application/octet-stream",
},
});
if (!response.ok) {
throw new Error(`Failed to download file: ${response.statusText}`);
}
if (!response.body) {
throw new Error("Response body is not a readable stream");
}
return response.body;
}
private static getFileExtension(url: string): string {
return extname(url).toLowerCase();
}
}
export default FileService;
@@ -0,0 +1,5 @@
interface ArchiveExtractor {
extract(stream: ReadableStream, destinationPath: string): Promise<void>;
}
export default ArchiveExtractor;
@@ -0,0 +1,16 @@
import ArchiveExtractor from "./ArchiveExtractor";
import ZipExtractor from "./ZipExtractor";
enum ArchiveType {
ZIP = ".zip",
}
class ArchiveExtractorFactory {
static getExtractor(archiveType: ArchiveType): ArchiveExtractor {
if (archiveType === ArchiveType.ZIP) return ZipExtractor.instance;
throw new Error("Unsupported archive type");
}
}
export default ArchiveExtractorFactory;
export { ArchiveType };
@@ -0,0 +1,18 @@
import ArchiveExtractor from "./ArchiveExtractor";
import unzipper from "unzipper";
import { pipeline } from "node:stream";
import { promisify } from "node:util";
class ZipExtractor implements ArchiveExtractor {
public static instance: ArchiveExtractor = new ZipExtractor();
async extract(
stream: ReadableStream,
destinationPath: string
): Promise<void> {
const streamPipeline = promisify(pipeline);
await streamPipeline(stream, unzipper.Extract({ path: destinationPath }));
}
}
export default ZipExtractor;
+6
View File
@@ -0,0 +1,6 @@
export { default as ArchiveExtractor } from "./ArchiveExtractor";
export {
default as ArchiveExtractorFactory,
ArchiveType,
} from "./ArchiveExtractorFactory";
export { default as ZipExtractor } from "./ZipExtractor";
@@ -0,0 +1,40 @@
import FileService from "../FileService";
import CacheService from "../CacheService";
import { extname, join } from "node:path";
import { DatasetParserFactory, DatasetType } from "../dataset_parser";
class NudgerDatasetService {
private static URL: string =
"https://files.opendatarchives.fr/data.cquest.org/open4goods/gtin-open-data.zip";
private static SOURCE_FILE: string = "open4goods-full-gtin-dataset.csv";
private static CACHE_PATH: string = CacheService.getCachePath(
NudgerDatasetService.URL
);
public static loadDataset(): Promise<void> {
if (CacheService.isCached(NudgerDatasetService.URL)) {
return Promise.resolve();
}
return FileService.downloadAndExtract(
NudgerDatasetService.URL,
NudgerDatasetService.CACHE_PATH
);
}
public static parse() {
const extension = extname(NudgerDatasetService.SOURCE_FILE).toLowerCase();
const parser = DatasetParserFactory.getParser(extension as DatasetType);
return parser.parse(NudgerDatasetService.getSourcePath());
}
public static getSourcePath(): string {
return join(
NudgerDatasetService.CACHE_PATH,
NudgerDatasetService.SOURCE_FILE
);
}
}
export default NudgerDatasetService;
+17
View File
@@ -0,0 +1,17 @@
import DatasetParser from "./DatasetParser";
import * as fs from "node:fs";
import Papa from "papaparse";
class CsvParser implements DatasetParser {
public static instance: CsvParser = new CsvParser();
async parse(filePath: string) {
const stream = fs.createReadStream(filePath);
Papa.parse(stream, {
worker: true,
step: (res) => console.log("Row:", res.data),
});
}
}
export default CsvParser;
@@ -0,0 +1,5 @@
interface DatasetParser {
parse(filePath: string): unknown;
}
export default DatasetParser;
@@ -0,0 +1,16 @@
import DatasetParser from "./DatasetParser";
import CsvParser from "./CsvParser";
enum DatasetType {
CSV = ".csv",
}
class DatasetParserFactory {
static getParser(fileType: DatasetType): DatasetParser {
if (fileType === DatasetType.CSV) return CsvParser.instance;
throw new Error("Unsupported file type");
}
}
export default DatasetParserFactory;
export { DatasetType };
+6
View File
@@ -0,0 +1,6 @@
export { default as CsvParser } from "./CsvParser";
export { default as DatasetParser } from "./DatasetParser";
export {
default as DatasetParserFactory,
DatasetType,
} from "./DatasetParserFactory";
-131
View File
@@ -1,131 +0,0 @@
import axios from "axios";
import * as unzipper from "unzipper";
import * as fs from "fs-extra";
import * as zlib from "zlib";
import { extname, join, basename } from "path";
import crypto from "crypto"; // Utilisé pour générer des identifiants uniques basés sur l'URL
type SupportedFormats = "zip" | "gz" | "gzip";
class FileService {
private cacheDir: string;
constructor() {
this.cacheDir = "./cache";
fs.ensureDirSync(this.cacheDir);
}
/**
* Télécharger et extraire le fichier à partir de l'URL
* @param url URL du fichier à télécharger
*/
async downloadAndExtract(url: string): Promise<void> {
try {
const fileType = this.getFileExtension(url);
if (!fileType) throw new Error("Unsupported file format");
if (this.isInCache(url)) return;
const response = await axios({
method: "GET",
url,
responseType: "stream",
});
console.log(`Downloading : ${url}`);
// Décompresser et sauvegarder dans le cache
const cacheKey = this.generateCacheKey(url);
const cachedPath = join(this.cacheDir, cacheKey);
fs.ensureDirSync(cachedPath);
if (fileType === "zip") await this.extractZip(response.data, cachedPath);
if (fileType === "gz" || fileType === "gzip")
await this.extractGzip(
response.data,
join(cachedPath, basename(url).replace(/\.(gz|gzip)$/, ""))
);
console.log(`Downloaded and extracted : ${basename(url)}`);
} catch (error) {
console.error(
"An error occurred while downloading and extracting the file",
error
);
}
}
/**
* Vérifier si le fichier est déjà en cache
* @param url URL du fichier
* @private
*/
private isInCache(url: string): boolean {
const cacheKey = this.generateCacheKey(url);
const cachedPath = join(this.cacheDir, cacheKey);
return fs.pathExistsSync(cachedPath);
}
/**
* Extraire les fichiers ZIP et stocker dans le cache
* @param stream
* @param cachePath
* @private
*/
private async extractZip(
stream: NodeJS.ReadableStream,
cachePath: string
): Promise<void> {
return new Promise((resolve, reject) => {
stream
.pipe(unzipper.Extract({ path: cachePath }))
.on("close", resolve)
.on("error", reject);
});
}
/**
* Extraire les fichiers GZ et GZIP et stocker dans le cache
* @param stream Flux du fichier téléchargé
* @param cachePath Chemin où stocker le fichier décompressé
* @private
*/
private async extractGzip(
stream: NodeJS.ReadableStream,
cachePath: string
): Promise<void> {
return new Promise((resolve, reject) => {
// Ajouter une extension correcte (par exemple, si le fichier original est 'file.gz', le résultat sera 'file')
const decompressedFilePath = cachePath.replace(/\.gz$/, "");
const writeStream = fs.createWriteStream(decompressedFilePath);
// Pipeliner le flux du téléchargement et la décompression
stream
.pipe(zlib.createGunzip()) // Décompresser le flux
.pipe(writeStream) // Écrire le fichier décompressé
.on("finish", resolve)
.on("error", reject);
});
}
/**
* Obtenir l'extension du fichier à partir de l'URL
* @param url URL du fichier
*/
private getFileExtension(url: string): SupportedFormats | null {
const extension = extname(url).toLowerCase();
if (extension === ".zip") return "zip";
if (extension === ".gz" || extension === ".gzip") return "gz";
return null;
}
/**
* Générer un identifiant unique pour le fichier basé sur l'URL
* @param url URL du fichier
*/
private generateCacheKey(url: string): string {
return crypto.createHash("md5").update(url).digest("hex");
}
}
export default new FileService();