feat: Use MongoDB to save the datalake

Commit: 6975765e18
Author: Lucàs
Date: 2024-11-11 23:09:59 +01:00
Parent: cde872ca55
36 changed files with 862 additions and 517 deletions
+110 -173
@@ -1,72 +1,67 @@
import { pipeline, Transform, Writable } from "node:stream";
import { promisify } from "node:util";
-import { Validator } from "jsonschema";
import FileService from "../FileService";
-import { ArchiveFactory, ArchiveType } from "../archive";
import { ParserFactory, ParserType } from "../parser";
-import { Data, InvalidData } from "../data";
-import { AppDataSource } from "../../AppDataSource";
-import { EntityManager, EntityTarget, Repository } from "typeorm";
-type DatasetParams = {
-  id: string;
-  dataConstructor: (params: any) => Data;
-  dataType: Data;
-  source: string;
-  file: string;
-  archiveType: ArchiveType;
-  parserType: ParserType;
-  options?: DatasetOptions;
-};
-type DatasetOptions = {
-  parser?: any;
-};
+import ExtractorFactory, {
+  ExtractorType,
+} from "../archive_extractor/ExtractorFactory";
+import Extractor from "../archive_extractor/Extractor";
+import Parser from "../parser/Parser";
+import { getDatabaseConnexion } from "../DataLake";
+import { validate } from "jsonschema";
/**
* Represents a dataset that can be loaded and queried
*/
-class Dataset<D extends Data> {
+export default class Dataset {
readonly id: string;
-  readonly source: string;
-  readonly file: string;
-  readonly archiveType: ArchiveType;
-  readonly parserType: ParserType;
-  readonly dataConstructor: (params: any) => Data;
-  readonly dataType: Data;
-  private options?: DatasetOptions;
+  readonly uri: string;
+  readonly endpoint: string;
+  private extractor: Extractor = ExtractorFactory.getExtractor(
+    ExtractorType.NONE
+  );
+  private parser: Parser = ParserFactory.getParser(ParserType.CSV);
+  private extractorOptions: any;
+  private parserOptions: any;
+  private dataTransformer?: Transform;
-  /**
-   * Create a new dataset instance
-   * @param id - The unique identifier of the dataset
-   * @param source - The URL of the dataset
-   * @param file - The name of the file in the archive
-   * @param dataType - The constructor of the data class
-   * @param archiveType - The type of the archive
-   * @param dataConstructor - The type of the dataset
-   * @param parserType
-   * @param options - Additional options for the dataset
-   */
constructor({
id,
-    source,
-    file,
-    dataConstructor,
-    dataType,
-    archiveType,
-    parserType,
-    options,
-  }: DatasetParams) {
+    uri,
+    endpoint,
+  }: {
+    id: string;
+    uri: string;
+    endpoint: string;
+  }) {
this.id = id;
-    this.dataConstructor = dataConstructor;
-    this.source = source;
-    this.file = file;
-    this.dataType = dataType;
-    this.archiveType = archiveType;
-    this.parserType = parserType;
-    this.options = options;
+    this.uri = uri;
+    this.endpoint = endpoint;
}
+  setExtractor(type: ExtractorType, options: any): this {
+    this.extractor = ExtractorFactory.getExtractor(type);
+    this.extractorOptions = options;
+    return this;
+  }
+  setParser(type: ParserType, options: any): this {
+    this.parser = ParserFactory.getParser(type);
+    this.parserOptions = options;
+    return this;
+  }
+  setDataTransformer(dataTransformer: Transform): this {
+    this.dataTransformer = dataTransformer;
+    return this;
+  }
/**
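
As an aside before the second hunk, here is a hedged sketch of how a dataset would now be wired up with the constructor and builder-style setters added above. All concrete values are invented for illustration, the import paths are assumptions, and ExtractorType.ZIP is an assumed enum member (only ExtractorType.NONE and ParserType.CSV appear in this diff):

import { Transform } from "node:stream";
import Dataset from "./Dataset"; // project paths assumed
import { ExtractorType } from "./archive_extractor/ExtractorFactory";
import { ParserType } from "./parser";

// Configure a dataset fluently: each setter returns `this`.
const airports = new Dataset({
  id: "airports", // becomes the MongoDB collection name
  uri: "https://example.com/airports.zip", // assumed source URL
  endpoint: "/datasets/airports",
})
  .setExtractor(ExtractorType.ZIP, { file: "airports.csv" }) // ZIP member assumed
  .setParser(ParserType.CSV, { delimiter: "," })
  .setDataTransformer(
    new Transform({
      objectMode: true,
      transform(chunk, _enc, callback) {
        // Keep only the fields we care about before they reach MongoDB.
        callback(null, { name: chunk.name, iata: chunk.iata });
      },
    })
  );

await airports.load();

Because the setters chain, load() can simply check that a dataTransformer was supplied before it starts streaming.
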
@@ -75,139 +70,81 @@ class Dataset<D extends Data> {
* @throws {Error} - If the dataset cannot be loaded
*/
public async load(): Promise<void> {
-    // const repository: Repository<T> = AppDataSource.getRepository<T>(Data);
+    if (!this.dataTransformer) {
+      throw new Error("Data transformer is not set");
+    }
-    // if ((await repository.count()) > 0) {
-    //   console.log(`Already cached: ${this.source}`);
-    //   return;
-    // }
-    const archive = ArchiveFactory.getArchive(this.archiveType);
-    const parser = ParserFactory.getParser(this.parserType);
+    const db = await getDatabaseConnexion();
+    const collection = db.collection(this.id);
+    const count = await collection.countDocuments();
+    if (count > 0) {
+      console.log(`Dataset ${this.id} already loaded`);
+      return;
+    }
const pipelineAsync = promisify(pipeline);
-    console.log(`Download: ${this.source}`);
+    console.log(`Download: ${this.uri}`);
-    // Start transaction
-    await AppDataSource.manager.transaction(async (manager) => {
-      await pipelineAsync(
-        await FileService.getFileStream(this.source),
-        archive.extract(this.file),
-        parser.parse(this.options?.parser),
-        Dataset.transformToData(this.dataConstructor, manager),
-        new Writable({
-          objectMode: true,
-          write(chunk, _, callback) {
-            callback();
-          },
-        })
-      )
-        .then(() => {
-          console.log(`Loaded: ${this.source}`);
-        })
-        .catch((err) => {
-          console.error(`Failed to load dataset: ${this.source}`);
-          throw err;
-        });
-    });
+    const datasetId = this.id;
+    const batch = 1000;
+    const buffer: any[] = [];
+    await pipelineAsync(
+      await FileService.getFileStream(this.uri),
+      this.extractor.extract(this.extractorOptions),
+      this.parser.parse(this.parserOptions),
+      this.dataTransformer,
+      new Writable({
+        objectMode: true,
+        async write(chunk, _, callback) {
+          buffer.push(chunk);
+          if (buffer.length < batch) return callback();
+          const db = await getDatabaseConnexion();
+          db.collection(datasetId)
+            .insertMany(buffer.splice(0, batch))
+            .then(() => callback())
+            .catch((error) => callback(error));
+        },
+      })
+    )
+      .then(async () => {
+        if (buffer.length > 0) {
+          await db.collection(datasetId).insertMany(buffer);
+        }
+      })
+      .then(() => console.log(`Dataset ${this.id} loaded`))
+      .catch((error) =>
+        console.error(`Error loading dataset ${this.id}: ${error.message}`)
+      );
}
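
The new sink buffers parsed documents and writes them to MongoDB in batches of 1000 via insertMany, then flushes whatever remains once the pipeline resolves. A minimal standalone sketch of the same pattern, assuming the official mongodb driver (batchedSink and its parameters are illustrative names, not from this repo):

import { Writable } from "node:stream";
import type { Db } from "mongodb";

// Object-mode sink that inserts documents into `collection` in batches.
function batchedSink(db: Db, collection: string, batch = 1000): Writable {
  const buffer: object[] = [];
  return new Writable({
    objectMode: true,
    write(chunk, _enc, callback) {
      buffer.push(chunk);
      if (buffer.length < batch) return callback();
      db.collection(collection)
        .insertMany(buffer.splice(0, batch)) // drain exactly one batch
        .then(() => callback())
        .catch(callback);
    },
    // final() runs when the upstream ends: flush the partial tail batch.
    final(callback) {
      if (buffer.length === 0) return callback();
      db.collection(collection)
        .insertMany(buffer)
        .then(() => callback())
        .catch(callback);
    },
  });
}

Here the tail batch is flushed in final(), so the pipeline only resolves once every document is stored; the diff achieves the same effect by flushing the remainder in a .then() after pipelineAsync resolves.
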
-  private static transformToData(
-    dataType: (params: any) => Data,
-    manager: EntityManager
-  ): Transform {
-    return new Transform({
-      objectMode: true,
-      async transform(chunk: object, _, callback) {
-        try {
-          const data: Data = dataType(chunk);
-          await manager.save(data);
-          callback(null, JSON.stringify(data) + "\n");
-        } catch (err: any) {
-          if (err instanceof InvalidData) {
-            callback(null, "");
-          } else callback(err);
-        }
-      },
-    });
-  }
+  async get(length: number = 10, schema: any) {
+    const db = await getDatabaseConnexion();
-  /**
-   * Get a number of data entries from the dataset
-   * @param length - The number of data entries to get (default: 10)
-   * @param schema - Schema of the expected data returned
-   */
-  public async get(length: number = 10, schema: {}): Promise<any[]> {
-    const dataRepository = AppDataSource.manager.getRepository<T>(
-      this.dataType as EntityTarget<T>
-    );
+    // Convert JSON schema to projection
+    const projection = { _id: 0 };
+    if (schema?.properties) {
+      for (const field in schema.properties) {
+        // @ts-ignore
+        projection[field] = 1;
+      }
+    }
-    const datas = await dataRepository
-      .createQueryBuilder("data")
-      .orderBy("RAND()") // RAND() to randomize the order
-      .limit(length) // limit the number of results
-      .getMany();
+    const datas = await db
+      .collection(this.id)
+      .aggregate([{ $project: projection }, { $sample: { size: length } }])
+      .limit(length)
+      .toArray();
-    return new Promise((resolve, reject) => {
-      let count: number = 0;
-      const results: Data[] = [];
-      const validator = new Validator();
-      datas.forEach((data) => {
-        let randomizedData = D.fromRaw(data);
-        // this.dataConstructor(data);
-        if (validator.validate(randomizedData, schema)) {
-          results.push(randomizedData);
-          count++;
-        }
-      });
-      return resolve(results);
-    });
-    // //
-    // // const stream = fs.createReadStream(this.cachePath, { encoding: "utf8" });
-    // // const rl = readline.createInterface({
-    // //   input: stream,
-    // //   crlfDelay: Infinity,
-    // // });
-    // //
-    // //
-    // // rl.on("line", (line) => {
-    // //   if (count < length) {
-    // //     const data: Data = JSON.parse(line) as Data;
-    // //     if (validator.validate(data, schema)) {
-    // //       results.push(data);
-    // //       count++;
-    // //     }
-    // //
-    // //     // // For each object, fetch it and check that the schema is valid
-    // //
-    // //     // schema.input?.forEach((input: string, index: number) => {
-    // //     //   obj[input] = data.input[index];
-    // //     // });
-    // //     // schema.output?.forEach((output, index) => {
-    // //     //   obj[output] = data.output[index];
-    // //     // });
-    // //
-    // //     // // Add the object to the results
-    // //     count++;
-    // //   } else {
-    // //     rl.close(); // close the stream once the n objects have been read
-    // //   }
-    // // });
-    // //
-    // //   // When the stream has finished or been closed.
-    // // rl.on("close", () => {
-    // //   resolve(results); // return the n objects read
-    // // });
-    // //
-    // //   // Handle read-stream errors
-    // // rl.on("error", (err) => {
-    // //   reject(err);
-    // // });
-    // // });
+    return datas
+      .map((data) => {
+        const res = validate(data, schema);
+        if (!res.valid) return null;
+        return data;
+      })
+      .filter((data) => data !== null);
}
}
-export default Dataset;
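
Finally, a sketch of what the new schema-to-projection sampling in get() amounts to at the driver level. This is a standalone illustration using the official mongodb package; the connection string, database name, collection, and projected fields are all placeholders, and the project's own getDatabaseConnexion presumably wraps a connection like this one:

import { MongoClient } from "mongodb";

// Project only the JSON schema's fields, then let MongoDB pick a random
// sample server-side with $sample (the Mongo analogue of ORDER BY RAND()).
async function sampleDataset(size = 10) {
  const client = await MongoClient.connect("mongodb://localhost:27017");
  try {
    const db = client.db("datalake"); // assumed database name
    return await db
      .collection("airports") // assumed dataset id
      .aggregate([
        { $project: { _id: 0, name: 1, iata: 1 } }, // fields from the schema
        { $sample: { size } }, // random sample of `size` documents
      ])
      .toArray();
  } finally {
    await client.close();
  }
}

Note that $sample already bounds the result to `size` documents, so the trailing .limit(length) in the diff's aggregation is redundant.
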