fix: Stream does not close on end

This commit is contained in:
Lucàs
2024-10-08 13:32:14 +02:00
parent 5e53d9d914
commit 93ee52ddc2
7 changed files with 137 additions and 56 deletions
+3 -5
View File
@@ -10,11 +10,9 @@ class FileService {
* @return Promise<Readable> - The compressed file stream
*/
public static async getFileStream(url: string): Promise<Readable> {
return axios({
method: "GET",
url: url,
responseType: "stream",
}).then((response) => response.data);
return axios({ method: "GET", url, responseType: "stream" }).then(
(response) => response.data
);
}
/**
+23 -5
View File
@@ -11,13 +11,18 @@ import { ParserFactory } from "../parser";
import { DatasetType } from "./";
import { Data, DataConstructor } from "../data";
type DatasetOptions = {
type DatasetParams = {
id: string;
dataType: DataConstructor<Data>;
source: string;
file: string;
archiveType: ArchiveType;
datasetType: DatasetType;
options?: DatasetOptions;
};
type DatasetOptions = {
parser?: any;
};
/**
@@ -31,6 +36,7 @@ class Dataset {
readonly datasetType: DatasetType;
readonly cachePath: string;
private dataType: DataConstructor<Data>;
private options?: DatasetOptions;
/**
* Create a new dataset instance
@@ -40,6 +46,7 @@ class Dataset {
* @param dataType - The constructor of the data class
* @param archiveType - The type of the archive
* @param datasetType - The type of the dataset
* @param options - Additional options for the dataset
*/
constructor({
id,
@@ -48,13 +55,15 @@ class Dataset {
dataType,
archiveType,
datasetType,
}: DatasetOptions) {
options,
}: DatasetParams) {
this.id = id;
this.dataType = dataType;
this.source = source;
this.file = file;
this.archiveType = archiveType;
this.datasetType = datasetType;
this.options = options;
this.cachePath = CacheService.getCachePath(this.source, ".json");
}
@@ -79,18 +88,27 @@ class Dataset {
await pipelineAsync(
await FileService.getFileStream(this.source),
archive.extract(this.file),
parser.parse(),
parser.parse(this.options?.parser),
Dataset.transformToData(this.dataType),
FileService.createWriteStream(this.cachePath)
);
)
.then(() => {
console.log(`Loaded: ${this.source}`);
})
.catch((err) => {
console.error(`Failed to load dataset: ${this.source}`);
FileService.deleteFile(this.cachePath);
throw err;
});
}
private static transformToData(dataType: DataConstructor<Data>): Transform {
return new Transform({
objectMode: true,
transform(chunk: object, _, callback) {
const data: Data = new dataType(JSON.parse(chunk.toString()));
const data: Data = new dataType(chunk);
this.push(JSON.stringify(data) + "\n");
callback(null, JSON.stringify(data) + "\n");
},
});
+11
View File
@@ -12,6 +12,11 @@ class DatasetCollection {
dataType: NudgerData,
archiveType: ArchiveType.ZIP,
datasetType: DatasetType.CSV,
options: {
parser: {
delimiter: ",",
},
},
}),
new Dataset({
id: "openfoodfacts",
@@ -21,6 +26,12 @@ class DatasetCollection {
dataType: OpenFoodFactsData,
archiveType: ArchiveType.GZIP,
datasetType: DatasetType.CSV,
options: {
parser: {
delimiter: "\t",
quote: null,
},
},
}),
];
+11 -4
View File
@@ -1,14 +1,21 @@
import { Parser } from "./";
import { Duplex } from "node:stream";
import csv from "csvtojson";
// import csv from "csvtojson";
import * as csv from "fast-csv";
class CsvParser implements Parser {
public static instance: CsvParser = new CsvParser();
public parse(): Duplex {
return csv({
delimiter: "auto",
public parse(options: any): Duplex {
return csv.parse({
headers: true,
objectMode: true,
trim: true,
...options,
});
// return csv({
// delimiter: "auto",
// });
}
}
+1 -1
View File
@@ -4,7 +4,7 @@ interface Parser {
/**
* Parse the content of the stream into JSON objects
*/
parse(): Duplex;
parse(options: any): Duplex;
}
export default Parser;