feat: Stream download, extract and parse datasets

Lucàs
2024-09-28 16:19:43 +02:00
parent 5a00871319
commit ffc1ad3e84
32 changed files with 501 additions and 791 deletions
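Note on the shape of the change: the diffs below replace the old download-then-extract-to-disk flow with a single back-pressured stream pipeline, piping the HTTP response through an archive extractor and a CSV parser straight into a JSON cache file. A minimal sketch of that composition using the names introduced here (import paths and the example URL are assumptions, not part of the diff):

import { pipeline } from "node:stream/promises";
import FileService from "./services/FileService";                 // path assumed
import { ArchiveFactory, ArchiveType } from "./services/archive"; // path assumed
import { ParserFactory } from "./services/parser";                // path assumed
import { DatasetType } from "./services/dataset";                 // path assumed

async function loadExample(): Promise<void> {
  // Download, pull one CSV out of the ZIP, convert rows to JSON and write the
  // result to the cache, all as one stream with nothing buffered in memory.
  await pipeline(
    await FileService.getFileStream("https://example.org/dataset.zip"),
    ArchiveFactory.getArchive(ArchiveType.ZIP).extract("dataset.csv"),
    ParserFactory.getParser(DatasetType.CSV).parse(),
    FileService.createWriteStream("./cache/dataset.json"),
  );
}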
+193 -608
File diff suppressed because it is too large
+5 -5
@@ -15,18 +15,18 @@
"license": "ISC",
"description": "",
"dependencies": {
"dmn-js": "^16.7.1",
"axios": "^1.7.7",
"csvtojson": "^2.0.10",
"dotenv": "^16.4.5",
"express": "^4.21.0",
"fs-extra": "^11.2.0",
"papaparse": "^5.4.1",
"node-stream-zip": "^1.15.0",
"tar-stream": "^3.1.7",
"unzipper": "^0.12.3"
},
"devDependencies": {
"@types/express": "^4.17.21",
"@types/fs-extra": "^11.0.4",
"@types/node": "^22.5.5",
"@types/papaparse": "^5.3.14",
"@types/tar-stream": "^3.1.3",
"@types/unzipper": "^0.10.10",
"prettier": "3.3.3",
"ts-node": "^10.9.2",
+5 -4
@@ -1,9 +1,10 @@
import dotenv from "dotenv";
import Server from "./Server";
import NudgerDatasetService from "./services/dataset/NudgerDatasetService";
import { DatasetCollection } from "./services/dataset";
dotenv.config();
Promise.all([NudgerDatasetService.loadDataset()])
.then(() => new Server().start())
.catch(console.error);
DatasetCollection.loadAll()
.then(() => console.log("All datasets are loaded"))
.then(() => new Server().start())
.catch(console.error);
+14 -11
@@ -1,27 +1,30 @@
import { createHash } from "node:crypto";
import { join } from "node:path";
import * as fs from "fs-extra";
import { existsSync, mkdirSync } from "node:fs";
class CacheService {
public static readonly CACHE_DIR: string = "./cache";
public static generateCacheKey(url: string): string {
return createHash("md5").update(url).digest("hex");
public static generateCacheKey(name: string): string {
return createHash("md5").update(name).digest("hex");
}
public static getCachePath(url: string): string {
const cacheKey = this.generateCacheKey(url);
return join(CacheService.CACHE_DIR, cacheKey);
public static getCachePath(name: string, extension: string = ""): string {
const cacheKey = this.generateCacheKey(name);
return join(CacheService.CACHE_DIR, `${cacheKey}${extension}`);
}
public static isCached(url: string): boolean {
const cacheKey = CacheService.generateCacheKey(url);
public static isCached(name: string, extension: string = ""): boolean {
const cacheKey = CacheService.generateCacheKey(name);
const cachedPath = join(CacheService.CACHE_DIR, cacheKey);
return fs.pathExistsSync(cachedPath);
const cachedPath = join(CacheService.CACHE_DIR, `${cacheKey}${extension}`);
return existsSync(cachedPath);
}
}
fs.ensureDirSync(CacheService.CACHE_DIR);
if (!existsSync(CacheService.CACHE_DIR)) {
mkdirSync(CacheService.CACHE_DIR);
}
export default CacheService;
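CacheService now keys on an arbitrary name (in practice the dataset URL) and appends an optional extension to the cached path instead of creating a directory per download. A small illustration of the resulting behaviour (the URL and import path are only examples):

import CacheService from "./services/CacheService"; // path assumed

const url = "https://example.org/dataset.zip";

// MD5 hex digest of the name; stable across runs for the same input.
const key = CacheService.generateCacheKey(url);

// "./cache/<md5>.json" - the extension is part of the path, not the key.
const cachePath = CacheService.getCachePath(url, ".json");

// True only once a file exists at that exact path.
console.log(key, cachePath, CacheService.isCached(url, ".json"));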
+10
@@ -0,0 +1,10 @@
import { Dataset } from "./dataset";
import { Data } from "./data";
class DMN {
readonly xmlPath: string;
constructor(xmlPath: string, dataset: Dataset<Data>) {
this.xmlPath = xmlPath;
}
public parseXml() {}
}
+28 -34
@@ -1,43 +1,37 @@
import { extname } from "node:path";
import { ArchiveExtractorFactory, ArchiveType } from "./archive_extractor";
import { Readable } from "node:stream";
import axios from "axios";
import * as fs from "node:fs";
import { WriteStream } from "node:fs";
class FileService {
public static async downloadAndExtract(
url: string,
output: string
): Promise<void> {
const fileType: string = FileService.getFileExtension(url);
const archiveExtractor = ArchiveExtractorFactory.getExtractor(
fileType as ArchiveType
);
console.log(`Downloading ${url}`);
const stream = await FileService.getFileStream(url);
return archiveExtractor.extract(stream, output);
}
private static async getFileStream(url: string): Promise<ReadableStream> {
const response = await fetch(url, {
/**
* Get the compressed file stream from a given url
* @param url - The url of the file
* @return Promise<Readable> - The compressed file stream
*/
public static async getFileStream(url: string): Promise<Readable> {
return axios({
method: "GET",
headers: {
"Content-Type": "application/octet-stream",
},
});
if (!response.ok) {
throw new Error(`Failed to download file: ${response.statusText}`);
}
if (!response.body) {
throw new Error("Response body is not a readable stream");
}
return response.body;
url: url,
responseType: "stream",
}).then((response) => response.data);
}
private static getFileExtension(url: string): string {
return extname(url).toLowerCase();
/**
* Create a write stream to a file
* @param path - The path of the file
*/
public static createWriteStream(path: string): WriteStream {
return fs.createWriteStream(path);
}
/**
* Delete a file from the file system
* @param cachePath - The path of the file to delete
*/
public static deleteFile(cachePath: string): void {
fs.unlinkSync(cachePath);
}
}
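FileService now hands back the raw axios response stream instead of driving the extraction itself, so the caller decides what the bytes flow into. A minimal sketch for a file that needs no extraction (URL, destination and import path are illustrative):

import { pipeline } from "node:stream/promises";
import FileService from "./services/FileService"; // path assumed

async function download(url: string, dest: string): Promise<void> {
  // responseType: "stream" means axios never buffers the whole body.
  await pipeline(
    await FileService.getFileStream(url),
    FileService.createWriteStream(dest),
  );
}

download("https://example.org/plain-data.csv", "./cache/plain-data.csv")
  .catch(console.error);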
+7
@@ -0,0 +1,7 @@
import { Duplex } from "node:stream";
interface Archive {
extract(source: string): Duplex;
}
export default Archive;
+12
@@ -0,0 +1,12 @@
import { Archive, ZipArchive, ArchiveType, GzipArchive } from "./";
class ArchiveFactory {
static getArchive(archiveType: ArchiveType): Archive {
if (archiveType === ArchiveType.ZIP) return ZipArchive.instance;
if ([ArchiveType.GZIP, ArchiveType.GZ].includes(archiveType)) return GzipArchive.instance;
throw new Error("Unsupported archive type");
}
}
export default ArchiveFactory;
+7
@@ -0,0 +1,7 @@
enum ArchiveType {
ZIP = ".zip",
GZIP = ".gzip",
GZ = ".gz",
}
export default ArchiveType;
+12
@@ -0,0 +1,12 @@
import { createGunzip } from "node:zlib";
import { Duplex } from "node:stream";
import { Archive } from "./";
class GzipArchive implements Archive {
public static instance: Archive = new GzipArchive();
public extract(source: string): Duplex {
return createGunzip();
}
}
export default GzipArchive;
+15
@@ -0,0 +1,15 @@
import { Archive } from "./";
import { Duplex } from "node:stream";
import { ParseOne } from "unzipper";
class ZipArchive implements Archive {
public static instance: Archive = new ZipArchive();
public extract(source: string): Duplex {
return ParseOne(new RegExp(source), {
forceStream: true,
});
}
}
export default ZipArchive;
+7
@@ -0,0 +1,7 @@
export { default as ArchiveType } from "./ArchiveType";
export { default as ArchiveFactory } from "./ArchiveFactory";
export { default as Archive } from "./Archive";
export { default as ZipArchive } from "./ZipArchive";
export { default as GzipArchive } from "./GzipArchive";
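Each archive type is now a stateless singleton returning a Duplex transform rather than writing extracted files to disk: GzipArchive wraps zlib's gunzip, and ZipArchive uses unzipper's ParseOne to stream out the single entry whose name matches the given pattern. A short sketch of the factory in use against a local file (file names and import path are illustrative):

import { createReadStream, createWriteStream } from "node:fs";
import { pipeline } from "node:stream/promises";
import { ArchiveFactory, ArchiveType } from "./services/archive"; // path assumed

async function extractCsvFromZip(): Promise<void> {
  // ParseOne(/data.csv/) emits only the matching entry's decompressed bytes.
  const unzip = ArchiveFactory.getArchive(ArchiveType.ZIP).extract("data.csv");
  await pipeline(
    createReadStream("./downloads/archive.zip"), // a local file instead of HTTP
    unzip,
    createWriteStream("./cache/data.csv"),
  );
}

extractCsvFromZip().catch(console.error);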
@@ -1,5 +0,0 @@
interface ArchiveExtractor {
extract(stream: ReadableStream, destinationPath: string): Promise<void>;
}
export default ArchiveExtractor;
@@ -1,16 +0,0 @@
import ArchiveExtractor from "./ArchiveExtractor";
import ZipExtractor from "./ZipExtractor";
enum ArchiveType {
ZIP = ".zip",
}
class ArchiveExtractorFactory {
static getExtractor(archiveType: ArchiveType): ArchiveExtractor {
if (archiveType === ArchiveType.ZIP) return ZipExtractor.instance;
throw new Error("Unsupported archive type");
}
}
export default ArchiveExtractorFactory;
export { ArchiveType };
@@ -1,18 +0,0 @@
import ArchiveExtractor from "./ArchiveExtractor";
import unzipper from "unzipper";
import { pipeline } from "node:stream";
import { promisify } from "node:util";
class ZipExtractor implements ArchiveExtractor {
public static instance: ArchiveExtractor = new ZipExtractor();
async extract(
stream: ReadableStream,
destinationPath: string
): Promise<void> {
const streamPipeline = promisify(pipeline);
await streamPipeline(stream, unzipper.Extract({ path: destinationPath }));
}
}
export default ZipExtractor;
-6
@@ -1,6 +0,0 @@
export { default as ArchiveExtractor } from "./ArchiveExtractor";
export {
default as ArchiveExtractorFactory,
ArchiveType,
} from "./ArchiveExtractorFactory";
export { default as ZipExtractor } from "./ZipExtractor";
+3
@@ -0,0 +1,3 @@
interface Data {}
export default Data;
+28
@@ -0,0 +1,28 @@
import { Data } from "./";
type RawNudgerData = {
code: string; // "3260014791012",
brand: string; // "ALSATEK",
model: string; // "TL33171",
name: string; // "alsatek lg g3 coque protection aluminium rouge bumper tl33171",
last_updated: string; // "1562430134146",
gs1_country: string; // "FR",
offers_count: string; // "0",
min_price: string; // "",
min_price_compensation: string; // "",
currency: string; // "",
categories: string; // "ACCESSOIRES>COQUE SMARTPHONE",
url: string; // ""
};
class NudgerData implements Data {
barcode: string;
country: string;
constructor(rawData: RawNudgerData) {
this.barcode = rawData.code;
this.country = rawData.gs1_country;
}
}
export default NudgerData;
+3
@@ -0,0 +1,3 @@
export {default as Data} from "./Data";
export {default as NudgerData} from "./NudgerData";
+76
@@ -0,0 +1,76 @@
import { pipeline } from "node:stream";
import { promisify } from "node:util";
import CacheService from "../CacheService";
import FileService from "../FileService";
import { ArchiveFactory, ArchiveType } from "../archive";
import { ParserFactory } from "../parser";
import { DatasetType } from "./";
/**
* Represents a dataset that can be loaded and queried
*/
class Dataset<Data> {
readonly url: string;
readonly sourceFile: string;
readonly archiveType: ArchiveType;
readonly datasetType: DatasetType;
readonly cachePath: string;
/**
* Create a new dataset instance
* @param url - The URL of the dataset
* @param sourceFile - The file name of the dataset in the archive
* @param archiveType - The type of the archive
* @param datasetType - The type of the dataset
*/
constructor(
url: string,
sourceFile: string,
archiveType: ArchiveType,
datasetType: DatasetType,
) {
this.url = url;
this.sourceFile = sourceFile;
this.archiveType = archiveType;
this.datasetType = datasetType;
this.cachePath = CacheService.getCachePath(this.url, ".json");
}
/**
* Load the dataset by downloading, extracting, parsing and saving it in cache
* @return Promise<void> - A promise that resolves when the dataset is loaded
* @throws {Error} - If the dataset cannot be loaded
*/
public async load(): Promise<void> {
if (CacheService.isCached(this.url, ".json")) {
console.log(`Already cached: ${this.url}`);
return;
}
const archive = ArchiveFactory.getArchive(this.archiveType);
const parser = ParserFactory.getParser(this.datasetType);
const pipelineAsync = promisify(pipeline);
console.log(`Download: ${this.url}`);
await pipelineAsync(
await FileService.getFileStream(this.url),
archive.extract(this.sourceFile),
parser.parse(),
FileService.createWriteStream(this.cachePath),
);
}
/**
* Get a number of data entries from the dataset
* @param count - The number of data entries to get (default: 10)
*/
public get(count: number = 10): Data[] {
// TODO: Implement the get method
return [];
}
}
export default Dataset;
+20
@@ -0,0 +1,20 @@
import { Data, NudgerData } from "../data";
import { ArchiveType } from "../archive";
import { Dataset, DatasetType } from "./";
class DatasetCollection {
static datasets: Dataset<Data>[] = [
new Dataset<NudgerData>(
"https://files.opendatarchives.fr/data.cquest.org/open4goods/gtin-open-data.zip",
"open4goods-full-gtin-dataset.csv",
ArchiveType.ZIP,
DatasetType.CSV,
),
];
public static loadAll(): Promise<void[]> {
return Promise.all(this.datasets.map((dataset) => dataset.load()));
}
}
export default DatasetCollection;
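DatasetCollection is a static registry, so wiring in a new source is one entry describing where the data lives and how to unwrap it. A hypothetical example for a gzip-compressed CSV (URL, file name, typing and import paths are invented; only the enums and constructor come from this commit):

import { Data } from "./services/data";                    // path assumed
import { ArchiveType } from "./services/archive";          // path assumed
import { Dataset, DatasetType } from "./services/dataset"; // path assumed

// Everything below is illustrative, not an entry that exists in the diff.
const extraDataset = new Dataset<Data>(
  "https://example.org/exports/products.csv.gz",
  "products.csv", // ignored by GzipArchive, which only ever has one member
  ArchiveType.GZ,
  DatasetType.CSV,
);

extraDataset.load().then(() => console.log("extra dataset cached"));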
+10
@@ -0,0 +1,10 @@
enum DatasetType {
CSV = ".csv",
// TSV = ".tsv",
// PARQUET = ".parquet",
// JSONL = ".jsonl",
// XML = ".xml",
// RDF = ".rdf",
}
export default DatasetType;
@@ -1,40 +0,0 @@
import FileService from "../FileService";
import CacheService from "../CacheService";
import { extname, join } from "node:path";
import { DatasetParserFactory, DatasetType } from "../dataset_parser";
class NudgerDatasetService {
private static URL: string =
"https://files.opendatarchives.fr/data.cquest.org/open4goods/gtin-open-data.zip";
private static SOURCE_FILE: string = "open4goods-full-gtin-dataset.csv";
private static CACHE_PATH: string = CacheService.getCachePath(
NudgerDatasetService.URL
);
public static loadDataset(): Promise<void> {
if (CacheService.isCached(NudgerDatasetService.URL)) {
return Promise.resolve();
}
return FileService.downloadAndExtract(
NudgerDatasetService.URL,
NudgerDatasetService.CACHE_PATH
);
}
public static parse() {
const extension = extname(NudgerDatasetService.SOURCE_FILE).toLowerCase();
const parser = DatasetParserFactory.getParser(extension as DatasetType);
return parser.parse(NudgerDatasetService.getSourcePath());
}
public static getSourcePath(): string {
return join(
NudgerDatasetService.CACHE_PATH,
NudgerDatasetService.SOURCE_FILE
);
}
}
export default NudgerDatasetService;
+3
@@ -0,0 +1,3 @@
export {default as DatasetCollection} from "./DatasetCollection";
export {default as DatasetType} from "./DatasetType";
export {default as Dataset} from "./Dataset";
-17
@@ -1,17 +0,0 @@
import DatasetParser from "./DatasetParser";
import * as fs from "node:fs";
import Papa from "papaparse";
class CsvParser implements DatasetParser {
public static instance: CsvParser = new CsvParser();
async parse(filePath: string) {
const stream = fs.createReadStream(filePath);
Papa.parse(stream, {
worker: true,
step: (res) => console.log("Row:", res.data),
});
}
}
export default CsvParser;
@@ -1,5 +0,0 @@
interface DatasetParser {
parse(filePath: string): unknown;
}
export default DatasetParser;
@@ -1,16 +0,0 @@
import DatasetParser from "./DatasetParser";
import CsvParser from "./CsvParser";
enum DatasetType {
CSV = ".csv",
}
class DatasetParserFactory {
static getParser(fileType: DatasetType): DatasetParser {
if (fileType === DatasetType.CSV) return CsvParser.instance;
throw new Error("Unsupported file type");
}
}
export default DatasetParserFactory;
export { DatasetType };
-6
@@ -1,6 +0,0 @@
export { default as CsvParser } from "./CsvParser";
export { default as DatasetParser } from "./DatasetParser";
export {
default as DatasetParserFactory,
DatasetType,
} from "./DatasetParserFactory";
+13
@@ -0,0 +1,13 @@
import { Parser } from "./";
import { Duplex } from "node:stream";
import csv from "csvtojson";
class CsvParser implements Parser {
public static instance: CsvParser = new CsvParser();
public parse(): Duplex {
return csv();
}
}
export default CsvParser;
+10
@@ -0,0 +1,10 @@
import { Duplex } from "node:stream";
interface Parser {
/**
* Parse the content of the stream into JSON objects
*/
parse(): Duplex;
}
export default Parser;
+16
@@ -0,0 +1,16 @@
import { Parser, CsvParser } from "./";
import { DatasetType } from "../dataset";
class ParserFactory {
/**
* Get the parser corresponding to the dataset type
* @param fileType The type of the dataset
* @returns The parser corresponding to the dataset type
*/
static getParser(fileType: DatasetType): Parser {
if (fileType === DatasetType.CSV) return CsvParser.instance;
throw new Error("Unsupported file type");
}
}
export default ParserFactory;
+4
@@ -0,0 +1,4 @@
export { default as ParserFactory } from "./ParserFactory";
export { default as Parser } from "./Parser";
export { default as CsvParser } from "./CsvParser";
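The parser side is likewise just a Duplex: csvtojson takes CSV text in and, with its defaults, pushes one JSON string per row downstream (newline-delimited), which is what ends up in the cache file. A tiny self-contained check of that behaviour (the sample row and import paths are invented):

import { Readable } from "node:stream";
import { createWriteStream } from "node:fs";
import { pipeline } from "node:stream/promises";
import { ParserFactory } from "./services/parser"; // path assumed
import { DatasetType } from "./services/dataset";  // path assumed

async function demo(): Promise<void> {
  const csvText = "code,gs1_country\n3260014791012,FR\n";
  // Expected output line: {"code":"3260014791012","gs1_country":"FR"}
  await pipeline(
    Readable.from([csvText]),
    ParserFactory.getParser(DatasetType.CSV).parse(),
    createWriteStream("./cache/demo.json"),
  );
}

demo().catch(console.error);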