有時(shí)您需要將大量數(shù)據(jù)導(dǎo)出為 JSON 到一個(gè)文件。也許是“將所有數(shù)據(jù)導(dǎo)出到 JSON”,或者 GDPR“可移植性權(quán)利”,您實(shí)際上需要這樣做。
與任何大型數(shù)據(jù)集一樣,您不能將其全部放入內(nèi)存并將其寫入文件。這需要一段時(shí)間,它從數(shù)據(jù)庫(kù)中讀取大量條目,您需要小心不要使此類導(dǎo)出使整個(gè)系統(tǒng)過(guò)載或耗盡內(nèi)存。
幸運(yùn)的是,在 ?JacksonSequenceWriter
?和可選的管道流的幫助下,這樣做相當(dāng)簡(jiǎn)單。這是它的樣子:
private ObjectMapper jsonMapper =new ObjectMapper();
private ExecutorService executorService = Executors.newFixedThreadPool(5);
@Async
public ListenableFuture<Boolean> export(UUID customerId) {
try (PipedInputStream in =new PipedInputStream();
PipedOutputStream pipedOut =new PipedOutputStream(in);
GZIPOutputStream out =new GZIPOutputStream(pipedOut)) {
Stopwatch stopwatch = Stopwatch.createStarted();
ObjectWriter writer = jsonMapper.writer().withDefaultPrettyPrinter();
try(SequenceWriter sequenceWriter = writer.writeValues(out)) {
sequenceWriter.init(true);
Future<?> storageFuture = executorService.submit(() ->
storageProvider.storeFile(getFilePath(customerId), in));
int batchCounter =0;
while (true) {
List<Record> batch = readDatabaseBatch(batchCounter++);
for (Record record : batch) {
sequenceWriter.write(entry);
}
if (batch.isEmpty()) {
// if there are no more batches, stop.
break;
}
}
// wait for storing to complete
storageFuture.get();
// send the customer a notification and a download link
notifyCustomer(customerId);
}
logger.info("Exporting took {} seconds", stopwatch.stop().elapsed(TimeUnit.SECONDS));
return AsyncResult.forValue(true);
}catch (Exception ex) {
logger.error("Failed to export data", ex);
return AsyncResult.forValue(false);
}
}
代碼做了幾件事:
- 使用 ?
SequenceWriter
?連續(xù)寫入記錄。它使用 ?OutputStream
?進(jìn)行初始化,所有內(nèi)容都寫入其中。這可以是簡(jiǎn)單的 ?FileOutputStream
?,也可以是下面討論的管道流。請(qǐng)注意,這里的命名有點(diǎn)誤導(dǎo)——?writeValues(out)
?聽(tīng)起來(lái)你是在指示作者現(xiàn)在寫點(diǎn)什么;相反,它將其配置為稍后使用特定的流。 - 用?
SequenceWriter
?初始化?true
?,意思是“包裹在數(shù)組中”。您正在編寫許多相同的記錄,因此它們應(yīng)該在最終的 JSON 中表示一個(gè)數(shù)組。 - 使用?
PipedOutputStream
?和?PipedInputStream
?將?SequenceWriter
?鏈接到?InputStream
?然后傳遞給存儲(chǔ)服務(wù)的 ?an
?。如果我們明確地處理文件,就沒(méi)有必要了——只需傳遞 ?aFileOutputStream
?就可以了。但是,您可能希望以不同的方式存儲(chǔ)文件,例如在 Amazon S3 中,并且 ?putObject
?調(diào)用需要一個(gè) ?InputStream
?,從中讀取數(shù)據(jù)并將其存儲(chǔ)在 ?S3
?中。因此,實(shí)際上,您正在寫入直接寫入 ?InputStream
?的 ?OutputStream
?,當(dāng)嘗試從中讀取時(shí),會(huì)將所有內(nèi)容寫入另一個(gè) ?OutputStream
? - 存儲(chǔ)文件是在單獨(dú)的線程中調(diào)用的,這樣寫入文件不會(huì)阻塞當(dāng)前線程,其目的是從數(shù)據(jù)庫(kù)中讀取。同樣,如果使用簡(jiǎn)單的 ?
FileOutputStream
?,則不需要這樣做。 - 整個(gè)方法被標(biāo)記為?
@Async (spring)
?以便它不會(huì)阻塞執(zhí)行——它在準(zhǔn)備好時(shí)被調(diào)用并完成(使用具有有限線程池的內(nèi)部 Spring 執(zhí)行程序服務(wù)) - 數(shù)據(jù)庫(kù)批量讀取代碼這里沒(méi)有顯示,因?yàn)樗驍?shù)據(jù)庫(kù)而異。關(guān)鍵是,您應(yīng)該批量獲取數(shù)據(jù),而不是 ?
SELECT * FROM X
?。 - ?
OutputStream
?被包裹在 ?GZIPOutputStream
?中,因?yàn)橄?JSON 這樣帶有重復(fù)元素的文本文件可以從壓縮中顯著受益
主要工作是由 Jackson 的 SequenceWriter 完成的,需要清楚的點(diǎn)是 - 不要假設(shè)您的數(shù)據(jù)會(huì)適合內(nèi)存。它幾乎從不這樣做,因此以批處理和增量寫入的方式進(jìn)行所有操作。