[Draft] Storing data in MariaDB with TypeORM

Lucàs
2024-10-23 17:07:23 +02:00
parent c95c92e987
commit cde872ca55
14 changed files with 323 additions and 144 deletions
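The diff below imports AppDataSource, which implies a TypeORM DataSource configured for MariaDB. That file is not part of this excerpt, so the following is only a minimal sketch of what it might look like, assuming environment-variable credentials and a hypothetical import path for the persisted entities.

import { DataSource } from "typeorm";
import { Data } from "./services/data"; // hypothetical path to the persisted entities

export const AppDataSource = new DataSource({
  type: "mariadb",
  host: process.env.DB_HOST ?? "localhost",
  port: Number(process.env.DB_PORT ?? 3306),
  username: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
  database: process.env.DB_NAME,
  entities: [Data],   // every entity class saved by the datasets must be registered here
  synchronize: true,  // acceptable for a draft; prefer migrations once the schema settles
});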
+119 -75
@@ -1,18 +1,19 @@
import { pipeline, Transform } from "node:stream";
import { pipeline, Transform, Writable } from "node:stream";
import { promisify } from "node:util";
import * as fs from "node:fs";
import * as readline from "node:readline";
import { Validator } from "jsonschema";
import CacheService from "../CacheService";
import FileService from "../FileService";
import { ArchiveFactory, ArchiveType } from "../archive";
import { ParserFactory, ParserType } from "../parser";
import { Data, DataConstructor } from "../data";
import { Data, InvalidData } from "../data";
import { AppDataSource } from "../../AppDataSource";
import { EntityManager, EntityTarget, Repository } from "typeorm";
type DatasetParams = {
id: string;
dataType: DataConstructor<Data>;
dataConstructor: (params: any) => Data;
dataType: Data;
source: string;
file: string;
archiveType: ArchiveType;
@@ -27,14 +28,14 @@ type DatasetOptions = {
/**
* Represents a dataset that can be loaded and queried
*/
class Dataset {
class Dataset<D extends Data> {
readonly id: string;
readonly source: string;
readonly file: string;
readonly archiveType: ArchiveType;
readonly parserType: ParserType;
readonly cachePath: string;
private dataType: DataConstructor<Data>;
readonly dataConstructor: (params: any) => Data;
readonly dataType: Data;
private options?: DatasetOptions;
/**
@@ -44,27 +45,28 @@ class Dataset {
* @param file - The name of the file in the archive
* @param dataType - The data entity type, used as the TypeORM entity target
* @param archiveType - The type of the archive
* @param datasetType - The type of the dataset
* @param dataConstructor - Factory function that builds a Data instance from a raw parsed record
* @param parserType - The type of the parser
* @param options - Additional options for the dataset
*/
constructor({
id,
source,
file,
dataConstructor,
dataType,
archiveType,
parserType,
options,
}: DatasetParams) {
this.id = id;
this.dataType = dataType;
this.dataConstructor = dataConstructor;
this.source = source;
this.file = file;
this.dataType = dataType;
this.archiveType = archiveType;
this.parserType = parserType;
this.options = options;
this.cachePath = CacheService.getCachePath(this.source, ".json");
}
/**
@@ -73,10 +75,12 @@ class Dataset {
* @throws {Error} - If the dataset cannot be loaded
*/
public async load(): Promise<void> {
if (CacheService.isCached(this.source, ".json")) {
console.log(`Already cached: ${this.source}`);
return;
}
// const repository: Repository<T> = AppDataSource.getRepository<T>(Data);
// if ((await repository.count()) > 0) {
// console.log(`Already cached: ${this.source}`);
// return;
// }
const archive = ArchiveFactory.getArchive(this.archiveType);
const parser = ParserFactory.getParser(this.parserType);
@@ -84,31 +88,47 @@ class Dataset {
const pipelineAsync = promisify(pipeline);
console.log(`Download: ${this.source}`);
await pipelineAsync(
await FileService.getFileStream(this.source),
archive.extract(this.file),
parser.parse(this.options?.parser),
Dataset.transformToData(this.dataType),
FileService.createWriteStream(this.cachePath)
)
.then(() => {
console.log(`Loaded: ${this.source}`);
})
.catch((err) => {
console.error(`Failed to load dataset: ${this.source}`);
FileService.deleteFile(this.cachePath);
throw err;
});
// Start transaction
await AppDataSource.manager.transaction(async (manager) => {
await pipelineAsync(
await FileService.getFileStream(this.source),
archive.extract(this.file),
parser.parse(this.options?.parser),
Dataset.transformToData(this.dataConstructor, manager),
new Writable({
objectMode: true,
write(chunk, _, callback) {
callback();
},
})
)
.then(() => {
console.log(`Loaded: ${this.source}`);
})
.catch((err) => {
console.error(`Failed to load dataset: ${this.source}`);
throw err;
});
});
}
private static transformToData(dataType: DataConstructor<Data>): Transform {
private static transformToData(
dataConstructor: (params: any) => Data,
manager: EntityManager
): Transform {
return new Transform({
objectMode: true,
transform(chunk: object, _, callback) {
const data: Data = new dataType(chunk);
this.push(JSON.stringify(data) + "\n");
callback(null, JSON.stringify(data) + "\n");
async transform(chunk: object, _, callback) {
try {
const data: Data = dataConstructor(chunk);
await manager.save(data);
callback(null, JSON.stringify(data) + "\n");
} catch (err: any) {
if (err instanceof InvalidData) {
callback(null, "");
} else callback(err);
}
},
});
}
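transformToData() now saves each constructed row through the transaction's EntityManager and drops rows whose construction throws InvalidData, while any other error reaches the callback and aborts the pipeline, and therefore the whole transaction. The ../data module is not shown in this diff; a possible sketch of InvalidData and of a matching dataConstructor, using a hypothetical SentencePair entity, is given below.

import { Entity, PrimaryGeneratedColumn, Column } from "typeorm";

// Marker error: rows that raise it are skipped instead of failing the whole import.
export class InvalidData extends Error {
  constructor(message = "invalid data row") {
    super(message);
    this.name = "InvalidData";
  }
}

// Hypothetical entity; the real Data subclasses live elsewhere in the repository.
@Entity()
export class SentencePair {
  @PrimaryGeneratedColumn()
  id!: number;

  @Column("text")
  input!: string;

  @Column("text")
  output!: string;
}

// A dataConstructor as declared in DatasetParams: validate the parsed chunk and
// throw InvalidData so transformToData() silently discards the row.
export function sentencePairConstructor(params: any): SentencePair {
  if (typeof params?.input !== "string" || typeof params?.output !== "string") {
    throw new InvalidData();
  }
  const pair = new SentencePair();
  pair.input = params.input;
  pair.output = params.output;
  return pair;
}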
@@ -118,51 +138,75 @@ class Dataset {
* @param length - The number of data entries to get (default: 10)
* @param schema - Schema of the expected data returned
*/
public get(
length: number = 10,
schema: { input: string[] | undefined; output: string[] | undefined }
): Promise<any[]> {
public async get(length: number = 10, schema: {}): Promise<any[]> {
const dataRepository = AppDataSource.manager.getRepository<D>(
this.dataType as EntityTarget<D>
);
const datas = await dataRepository
.createQueryBuilder("data")
.orderBy("RAND()") // RAND() to randomize the row order
.limit(length) // Limit the number of results
.getMany();
return new Promise((resolve, reject) => {
let count: number = 0;
const results: any[] = [];
const results: Data[] = [];
const validator = new Validator();
const stream = fs.createReadStream(this.cachePath, { encoding: "utf8" });
const rl = readline.createInterface({
input: stream,
crlfDelay: Infinity,
});
datas.forEach((data) => {
const randomizedData = this.dataConstructor(data);
rl.on("line", (line) => {
if (count < length) {
const data: Data = JSON.parse(line) as Data;
// Create an object with the input and output values according to the schema
const obj: any = {};
schema.input?.forEach((input: string, index: number) => {
obj[input] = data.input[index];
});
schema.output?.forEach((output, index) => {
obj[output] = data.output[index];
});
// Add the object to the results
results.push(obj);
if (validator.validate(randomizedData, schema).valid) {
results.push(randomizedData);
count++;
} else {
rl.close(); // Close the stream once we have read the n objects
}
});
// When the stream has ended or been closed.
rl.on("close", () => {
resolve(results); // Return the n objects that were read
});
// Handle read-stream errors
rl.on("error", (err) => {
reject(err);
});
return resolve(results);
});
// //
// // const stream = fs.createReadStream(this.cachePath, { encoding: "utf8" });
// // const rl = readline.createInterface({
// // input: stream,
// // crlfDelay: Infinity,
// // });
// //
// //
// // rl.on("line", (line) => {
// // if (count < length) {
// // const data: Data = JSON.parse(line) as Data;
// // if (validator.validate(data, schema)) {
// // results.push(data);
// // count++;
// // }
// //
// // // // For each object, retrieve it and check that it matches the schema
// //
// // // schema.input?.forEach((input: string, index: number) => {
// // // obj[input] = data.input[index];
// // // });
// // // schema.output?.forEach((output, index) => {
// // // obj[output] = data.output[index];
// // // });
// //
// // // // Add the object to the results
// // // count++;
// // } else {
// // rl.close(); // Close the stream once we have read the n objects
// // }
// // });
// //
// // // When the stream has ended or been closed.
// // rl.on("close", () => {
// // resolve(results); // Return the n objects read
// // });
// //
// // // Handle read-stream errors
// // rl.on("error", (err) => {
// // reject(err);
// // });
// // });
}
}
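For context, a hypothetical end-to-end use of the reworked class could look like the sketch below. The enum members, the SentencePair helpers and the source URL are illustrative assumptions, not identifiers taken from this commit.

import { AppDataSource } from "./AppDataSource";

async function main() {
  // The MariaDB connection must be open before load() starts its transaction.
  await AppDataSource.initialize();

  const dataset = new Dataset<SentencePair>({
    id: "sentence-pairs",
    source: "https://example.org/exports/pairs.tar.bz2", // illustrative URL
    file: "pairs.tsv",
    dataType: new SentencePair(),             // entity target later used by get()
    dataConstructor: sentencePairConstructor, // row factory used during load()
    archiveType: ArchiveType.TarBz2,          // hypothetical enum member
    parserType: ParserType.Tsv,               // hypothetical enum member
  });

  await dataset.load();                    // stream, parse and persist rows to MariaDB
  const sample = await dataset.get(5, {}); // five random rows via ORDER BY RAND()
  console.log(sample);

  await AppDataSource.destroy();
}

main().catch(console.error);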