// A-New-Day-001's picture
// Upload 1591 files
// be5030f
"use strict";
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
Object.defineProperty(exports, "__esModule", { value: true });
const vector_1 = require("../vector");
const enum_1 = require("../enum");
const file_1 = require("./metadata/file");
const adapters_1 = require("../io/adapters");
const stream_1 = require("../io/stream");
const file_2 = require("../io/file");
const vectorloader_1 = require("../visitor/vectorloader");
const recordbatch_1 = require("../recordbatch");
const interfaces_1 = require("../io/interfaces");
const message_1 = require("./message");
const compat_1 = require("../util/compat");
/**
 * Public entry point for reading Arrow IPC data.
 *
 * A `RecordBatchReader` is a thin facade: it owns one implementation object
 * (`_impl`), chosen by `RecordBatchReader.from()`, and forwards every getter,
 * iterator-protocol call and lifecycle call to it. Whether the reader is sync
 * or async, stream or file, is decided entirely by that impl.
 */
class RecordBatchReader extends interfaces_1.ReadableInterop {
    /**
     * @param impl - the concrete reader implementation to delegate to.
     */
    constructor(impl) {
        super();
        this._impl = impl;
    }
    // ---- state and metadata, read straight from the impl ----
    get closed() { return this._impl.closed; }
    get schema() { return this._impl.schema; }
    get autoDestroy() { return this._impl.autoDestroy; }
    get dictionaries() { return this._impl.dictionaries; }
    get numDictionaries() { return this._impl.numDictionaries; }
    get numRecordBatches() { return this._impl.numRecordBatches; }
    /** The impl's footer when it is a file reader; `null` otherwise. */
    get footer() {
        const impl = this._impl;
        if (!impl.isFile()) {
            return null;
        }
        return impl.footer;
    }
    isSync() { return this._impl.isSync(); }
    isAsync() { return this._impl.isAsync(); }
    isFile() { return this._impl.isFile(); }
    isStream() { return this._impl.isStream(); }
    // ---- iterator protocol, forwarded verbatim ----
    next() { return this._impl.next(); }
    throw(value) { return this._impl.throw(value); }
    return(value) { return this._impl.return(value); }
    /** Forwards to the impl's `cancel()`. */
    cancel() { return this._impl.cancel(); }
    /**
     * Resets the impl (optionally to a new schema) and clears the
     * `_DOMStream` / `_nodeStream` fields.
     * @returns this, for chaining.
     */
    reset(schema) {
        this._impl.reset(schema);
        this._DOMStream = undefined;
        this._nodeStream = undefined;
        return this;
    }
    /**
     * Opens the impl.
     * @returns `this` when the impl opens synchronously, or a Promise that
     *   resolves to `this` when the impl's `open()` returns a Promise.
     */
    open(options) {
        const opened = this._impl.open(options);
        if (compat_1.isPromise(opened)) {
            return opened.then(() => this);
        }
        return this;
    }
    /** Random access by batch index. Only file impls support it; other impls yield `null`. */
    readRecordBatch(index) {
        const impl = this._impl;
        if (!impl.isFile()) {
            return null;
        }
        return impl.readRecordBatch(index);
    }
    [Symbol.iterator]() { return this._impl[Symbol.iterator](); }
    [Symbol.asyncIterator]() { return this._impl[Symbol.asyncIterator](); }
    /**
     * Wraps this reader with `adapters.toDOMStream`. The reader is presented as
     * a sync iterable when `isSync()` is true, and as an async iterable otherwise.
     */
    toDOMStream() {
        const iterableSelf = this.isSync()
            ? { [Symbol.iterator]: () => this }
            : { [Symbol.asyncIterator]: () => this };
        return adapters_1.default.toDOMStream(iterableSelf);
    }
    /**
     * Wraps this reader with `adapters.toNodeStream` in object mode. The reader is
     * presented as a sync or async iterable, matching `isSync()`.
     */
    toNodeStream() {
        const iterableSelf = this.isSync()
            ? { [Symbol.iterator]: () => this }
            : { [Symbol.asyncIterator]: () => this };
        return adapters_1.default.toNodeStream(iterableSelf, { objectMode: true });
    }
    // Placeholder that always throws here; presumably replaced in environments
    // with Node stream support (NOTE(review): confirm where it is patched in).
    /** @nocollapse */
    // @ts-ignore
    static throughNode(options) {
        throw new Error(`"throughNode" not available in this environment`);
    }
    // Placeholder that always throws here; presumably replaced in environments
    // with DOM stream support (NOTE(review): confirm where it is patched in).
    /** @nocollapse */
    static throughDOM(
    // @ts-ignore
    writableStrategy, 
    // @ts-ignore
    readableStrategy) {
        throw new Error(`"throughDOM" not available in this environment`);
    }
    /**
     * Builds a reader for `source`, choosing the impl from the input's shape.
     * Checks run in this order: existing reader (returned as-is), Arrow JSON,
     * file handle, Promise (awaited, then dispatched again), async byte source
     * (fetch Response, DOM/Node readable stream, async iterable). Anything else
     * is treated as a synchronous byte source.
     * @nocollapse
     */
    static from(source) {
        if (source instanceof RecordBatchReader) {
            return source;
        }
        if (compat_1.isArrowJSON(source)) {
            return fromArrowJSON(source);
        }
        if (compat_1.isFileHandle(source)) {
            return fromFileHandle(source);
        }
        if (compat_1.isPromise(source)) {
            return (async () => await RecordBatchReader.from(await source))();
        }
        const isAsyncByteSource = compat_1.isFetchResponse(source)
            || compat_1.isReadableDOMStream(source)
            || compat_1.isReadableNodeStream(source)
            || compat_1.isAsyncIterable(source);
        if (isAsyncByteSource) {
            return fromAsyncByteStream(new stream_1.AsyncByteStream(source));
        }
        return fromByteStream(new stream_1.ByteStream(source));
    }
    /**
     * Returns a (sync or async) iterator that yields the same reader once per
     * successful (re-)open; see `readAllSync` / `readAllAsync`.
     * Existing readers are routed by `isSync()`. Arrow JSON, ArrayBuffer views,
     * sync iterables and iterator results take the sync path; everything else
     * takes the async path.
     * @nocollapse
     */
    static readAll(source) {
        if (source instanceof RecordBatchReader) {
            return source.isSync() ? readAllSync(source) : readAllAsync(source);
        }
        const isSyncSource = compat_1.isArrowJSON(source)
            || ArrayBuffer.isView(source)
            || compat_1.isIterable(source)
            || compat_1.isIteratorResult(source);
        return isSyncSource ? readAllSync(source) : readAllAsync(source);
    }
}
exports.RecordBatchReader = RecordBatchReader;
//
// Since TS is a structural type system, we define the following subclass stubs
// so that concrete types exist to associate with the interfaces below (those
// interfaces live in the TypeScript source and are erased from this compiled file).
//
// The implementation for each RecordBatchReader is hidden away in the set of
// `RecordBatchReaderImpl` classes in the second half of this file. This allows
// us to export a single RecordBatchReader class, and swap out the impl based
// on the io primitives or underlying arrow (JSON, file, or stream) at runtime.
//
// Async/await makes our job a bit harder, since it forces everything to be
// either fully sync or fully async. This is why the logic for the reader impls
// has been duplicated into both sync and async variants. Since the RBR
// delegates to its impl, an RBR with an AsyncRecordBatchFileReaderImpl for
// example will return async/await-friendly Promises, but one with a (sync)
// RecordBatchStreamReaderImpl will always return values. Nothing should be
// different about their logic, aside from the async handling. This is also why
// this code looks highly structured, as it should be nearly identical and easy
// to follow.
//
/**
 * Reader facade for synchronous IPC streams. Sync iteration goes straight to
 * the impl. Async iteration re-yields the sync iterator from inside an async
 * generator, so the same reader also works with `for await`.
 * @ignore
 */
class RecordBatchStreamReader extends RecordBatchReader {
    /** @param impl - a synchronous reader implementation. */
    constructor(impl) {
        super(impl);
        this._impl = impl;
    }
    [Symbol.iterator]() {
        const impl = this._impl;
        return impl[Symbol.iterator]();
    }
    async *[Symbol.asyncIterator]() {
        yield* this[Symbol.iterator]();
    }
}
exports.RecordBatchStreamReader = RecordBatchStreamReader;
/**
 * Reader facade for asynchronous IPC streams. Only async iteration is
 * supported; asking for a sync iterator throws.
 * @ignore
 */
class AsyncRecordBatchStreamReader extends RecordBatchReader {
    /** @param impl - an asynchronous reader implementation. */
    constructor(impl) {
        super(impl);
        this._impl = impl;
    }
    [Symbol.iterator]() {
        throw new Error(`AsyncRecordBatchStreamReader is not Iterable`);
    }
    [Symbol.asyncIterator]() {
        const impl = this._impl;
        return impl[Symbol.asyncIterator]();
    }
}
exports.AsyncRecordBatchStreamReader = AsyncRecordBatchStreamReader;
/**
 * Reader facade for the Arrow IPC file format when read synchronously. It adds
 * no behavior beyond `RecordBatchStreamReader` and exists so that file readers
 * have their own concrete class.
 * @ignore
 */
class RecordBatchFileReader extends RecordBatchStreamReader {
    /** @param impl - a synchronous file-format reader implementation. */
    constructor(impl) {
        super(impl);
        this._impl = impl;
    }
}
exports.RecordBatchFileReader = RecordBatchFileReader;
/**
 * Reader facade for the Arrow IPC file format when read asynchronously. It
 * adds no behavior beyond `AsyncRecordBatchStreamReader` and exists so that
 * async file readers have their own concrete class.
 * @ignore
 */
class AsyncRecordBatchFileReader extends AsyncRecordBatchStreamReader {
    /** @param impl - an asynchronous file-format reader implementation. */
    constructor(impl) {
        super(impl);
        this._impl = impl;
    }
}
exports.AsyncRecordBatchFileReader = AsyncRecordBatchFileReader;
/**
 * Shared state and decoding helpers for every reader implementation.
 *
 * Tracks the closed flag, the `autoDestroy` flag, the current schema, the
 * dictionary map (dictionary id -> dictionary vector), and counters for how
 * many dictionary batches and record batches have been read. Subclasses
 * supply the actual message I/O.
 * @ignore
 */
class RecordBatchReaderImpl {
    /**
     * @param dictionaries - optional initial dictionary map; defaults to a new, empty Map.
     */
    constructor(dictionaries = new Map()) {
        this.closed = false;
        this.autoDestroy = true;
        this._dictionaryIndex = 0;
        this._recordBatchIndex = 0;
        this.dictionaries = dictionaries;
    }
    /** Number of dictionary batches read so far. */
    get numDictionaries() { return this._dictionaryIndex; }
    /** Number of record batches read so far. */
    get numRecordBatches() { return this._recordBatchIndex; }
    isSync() { return false; }
    isAsync() { return false; }
    isFile() { return false; }
    isStream() { return false; }
    /**
     * Switches to `schema` (which may be undefined). The counters are zeroed and
     * `dictionaries` is replaced by a fresh, empty Map.
     * @returns this, for chaining.
     */
    reset(schema) {
        this.schema = schema;
        this.dictionaries = new Map();
        this._dictionaryIndex = 0;
        this._recordBatchIndex = 0;
        return this;
    }
    /** Decodes one record batch message (`header` + `body`) against the current schema. */
    _loadRecordBatch(header, body) {
        const schema = this.schema;
        const columns = this._loadVectors(header, body, schema.fields);
        return new recordbatch_1.RecordBatch(schema, header.length, columns);
    }
    /**
     * Decodes a dictionary batch and returns the dictionary to store under `header.id`.
     * - An id that is already known and not a delta keeps its existing dictionary;
     *   nothing is decoded.
     * - A delta for a known id is appended with `concat`.
     * - Otherwise the freshly decoded vector is returned.
     */
    _loadDictionaryBatch(header, body) {
        const { id, isDelta, data } = header;
        const existing = this.dictionaries.get(id);
        if (existing && !isDelta) {
            return existing;
        }
        const dictionaryType = this.schema.dictionaries.get(id);
        const decoded = vector_1.Vector.new(this._loadVectors(data, body, [dictionaryType])[0]);
        return existing ? existing.concat(decoded) : decoded;
    }
    /** Builds column data for `types` from the message's nodes/buffers and `body`. */
    _loadVectors(header, body, types) {
        const loader = new vectorloader_1.VectorLoader(body, header.nodes, header.buffers, this.dictionaries);
        return loader.visitMany(types);
    }
}
/**
 * Synchronous reader for the Arrow IPC stream format. Arrow JSON sources are
 * read through `JSONMessageReader` instead of the binary `MessageReader`.
 *
 * Messages are pulled one at a time:
 * - Schema messages reset the reader to the new schema.
 * - DictionaryBatch messages update `dictionaries`.
 * - Each RecordBatch message is decoded and returned from `next()`.
 * @ignore
 */
class RecordBatchStreamReaderImpl extends RecordBatchReaderImpl {
    /**
     * @param source - byte source or Arrow JSON object handed to the message reader.
     * @param dictionaries - optional initial dictionary map.
     */
    constructor(source, dictionaries) {
        super(dictionaries);
        this._handle = source;
        this._reader = compat_1.isArrowJSON(source)
            ? new message_1.JSONMessageReader(source)
            : new message_1.MessageReader(source);
    }
    isSync() { return true; }
    isStream() { return true; }
    /** The impl is its own (sync) iterator. */
    [Symbol.iterator]() {
        return this;
    }
    /**
     * Closes the reader immediately, whatever `autoDestroy` is set to. State is
     * reset, the message reader's `return()` is called, and references to the
     * message reader and the dictionaries are dropped. No-op if already closed.
     */
    cancel() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        this.reset()._reader.return();
        this._reader = null;
        this.dictionaries = null;
    }
    /**
     * Applies `options.autoDestroy` when it is a boolean, and reads the schema
     * if none is known yet. If no schema can be read, the reader is cancelled.
     * @returns this
     */
    open(options) {
        if (this.closed) {
            return this;
        }
        this.autoDestroy = shouldAutoDestroy(this, options);
        if (!this.schema) {
            this.schema = this._reader.readSchema();
        }
        if (!this.schema) {
            this.cancel();
        }
        return this;
    }
    /**
     * Iterator `throw`. Closes the reader and forwards to the message reader
     * only when `autoDestroy` is set. Otherwise it reports done and leaves the
     * reader open, so it can be reset and reused.
     */
    throw(value) {
        if (this.closed || !this.autoDestroy) {
            return interfaces_1.ITERATOR_DONE;
        }
        this.closed = true;
        return this.reset()._reader.throw(value);
    }
    /** Iterator `return`. Same `autoDestroy` gating as `throw`. */
    return(value) {
        if (this.closed || !this.autoDestroy) {
            return interfaces_1.ITERATOR_DONE;
        }
        this.closed = true;
        return this.reset()._reader.return(value);
    }
    /**
     * Advances to the next record batch, consuming any Schema and
     * DictionaryBatch messages along the way. If the messages run out before any
     * record batch has been produced but a schema is known, exactly one empty
     * placeholder batch is yielded; the index bump ensures that happens once.
     * After that the reader finishes via `return()`.
     */
    next() {
        if (this.closed) {
            return interfaces_1.ITERATOR_DONE;
        }
        const reader = this._reader;
        let message = this._readNextMessageAndValidate();
        while (message) {
            if (message.isSchema()) {
                this.reset(message.header());
            } else if (message.isRecordBatch()) {
                this._recordBatchIndex++;
                const header = message.header();
                const body = reader.readMessageBody(message.bodyLength);
                return { done: false, value: this._loadRecordBatch(header, body) };
            } else if (message.isDictionaryBatch()) {
                this._dictionaryIndex++;
                const header = message.header();
                const body = reader.readMessageBody(message.bodyLength);
                const dictionary = this._loadDictionaryBatch(header, body);
                this.dictionaries.set(header.id, dictionary);
            }
            message = this._readNextMessageAndValidate();
        }
        if (this.schema && this._recordBatchIndex === 0) {
            this._recordBatchIndex++;
            return { done: false, value: new recordbatch_1._InternalEmptyPlaceholderRecordBatch(this.schema) };
        }
        return this.return();
    }
    /** Reads the next message, optionally restricted to `type`. The file reader overrides this. */
    _readNextMessageAndValidate(type) {
        const message = this._reader.readMessage(type);
        return message;
    }
}
/**
 * Asynchronous reader for the Arrow IPC stream format. It mirrors the sync
 * stream impl message for message, except that every message-reader call is
 * awaited.
 * @ignore
 */
class AsyncRecordBatchStreamReaderImpl extends RecordBatchReaderImpl {
    /**
     * @param source - async byte source handed to `AsyncMessageReader`.
     * @param dictionaries - optional initial dictionary map.
     */
    constructor(source, dictionaries) {
        super(dictionaries);
        this._handle = source;
        this._reader = new message_1.AsyncMessageReader(source);
    }
    isAsync() { return true; }
    isStream() { return true; }
    /** The impl is its own async iterator. */
    [Symbol.asyncIterator]() {
        return this;
    }
    /**
     * Closes the reader immediately, whatever `autoDestroy` is set to. State is
     * reset, the message reader's `return()` is awaited, and references to the
     * message reader and the dictionaries are dropped. No-op if already closed.
     */
    async cancel() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        await this.reset()._reader.return();
        this._reader = null;
        this.dictionaries = null;
    }
    /**
     * Applies `options.autoDestroy` when it is a boolean, and awaits the schema
     * if none is known yet. If no schema can be read, the reader is cancelled.
     * @returns Promise resolving to this.
     */
    async open(options) {
        if (this.closed) {
            return this;
        }
        this.autoDestroy = shouldAutoDestroy(this, options);
        if (!this.schema) {
            this.schema = await this._reader.readSchema();
        }
        if (!this.schema) {
            await this.cancel();
        }
        return this;
    }
    /**
     * Iterator `throw`. Closes the reader and forwards to the message reader
     * only when `autoDestroy` is set; otherwise it reports done and leaves the
     * reader open.
     */
    async throw(value) {
        if (this.closed || !this.autoDestroy) {
            return interfaces_1.ITERATOR_DONE;
        }
        this.closed = true;
        return await this.reset()._reader.throw(value);
    }
    /** Iterator `return`. Same `autoDestroy` gating as `throw`. */
    async return(value) {
        if (this.closed || !this.autoDestroy) {
            return interfaces_1.ITERATOR_DONE;
        }
        this.closed = true;
        return await this.reset()._reader.return(value);
    }
    /**
     * Advances to the next record batch, consuming any Schema and
     * DictionaryBatch messages along the way. If the messages run out before any
     * record batch has been produced but a schema is known, exactly one empty
     * placeholder batch is yielded. After that the reader finishes via `return()`.
     */
    async next() {
        if (this.closed) {
            return interfaces_1.ITERATOR_DONE;
        }
        const reader = this._reader;
        let message = await this._readNextMessageAndValidate();
        while (message) {
            if (message.isSchema()) {
                await this.reset(message.header());
            } else if (message.isRecordBatch()) {
                this._recordBatchIndex++;
                const header = message.header();
                const body = await reader.readMessageBody(message.bodyLength);
                return { done: false, value: this._loadRecordBatch(header, body) };
            } else if (message.isDictionaryBatch()) {
                this._dictionaryIndex++;
                const header = message.header();
                const body = await reader.readMessageBody(message.bodyLength);
                const dictionary = this._loadDictionaryBatch(header, body);
                this.dictionaries.set(header.id, dictionary);
            }
            message = await this._readNextMessageAndValidate();
        }
        if (this.schema && this._recordBatchIndex === 0) {
            this._recordBatchIndex++;
            return { done: false, value: new recordbatch_1._InternalEmptyPlaceholderRecordBatch(this.schema) };
        }
        return await this.return();
    }
    /** Reads the next message, optionally restricted to `type`. The async file reader overrides this. */
    async _readNextMessageAndValidate(type) {
        const message = await this._reader.readMessage(type);
        return message;
    }
}
/**
 * Synchronous reader for the Arrow IPC *file* format.
 *
 * On open it reads the footer, takes the schema from it, and eagerly loads
 * every dictionary batch the footer lists. Record batches are then read by
 * seeking to the block offsets stored in the footer: sequentially through
 * `next()`, or by position through `readRecordBatch(index)`.
 * @ignore
 */
class RecordBatchFileReaderImpl extends RecordBatchStreamReaderImpl {
    /**
     * @param source - a `RandomAccessFile`, or input to wrap in a new `RandomAccessFile`.
     * @param dictionaries - optional initial dictionary map.
     */
    constructor(source, dictionaries) {
        const file = source instanceof file_2.RandomAccessFile ? source : new file_2.RandomAccessFile(source);
        super(file, dictionaries);
    }
    get footer() { return this._footer; }
    /** Dictionary-batch count recorded in the footer; 0 until the footer has been read. */
    get numDictionaries() {
        const footer = this._footer;
        return footer ? footer.numDictionaries : 0;
    }
    /** Record-batch count recorded in the footer; 0 until the footer has been read. */
    get numRecordBatches() {
        const footer = this._footer;
        return footer ? footer.numRecordBatches : 0;
    }
    isSync() { return true; }
    isFile() { return true; }
    /**
     * Reads the footer and all dictionary batches, once, before delegating to
     * the stream reader's `open` for the `autoDestroy` and schema handling.
     */
    open(options) {
        if (!this.closed && !this._footer) {
            const footer = this._footer = this._readFooter();
            this.schema = footer.schema;
            for (const block of footer.dictionaryBatches()) {
                if (block) {
                    this._readDictionaryBatch(this._dictionaryIndex++);
                }
            }
        }
        return super.open(options);
    }
    /**
     * Random access: decodes the record batch at `index` as listed in the
     * footer. Returns `null` in these cases:
     * - the reader is closed;
     * - there is no such block, or the seek fails;
     * - the message found there is not a RecordBatch.
     */
    readRecordBatch(index) {
        if (this.closed) {
            return null;
        }
        if (!this._footer) {
            this.open();
        }
        const footer = this._footer;
        const block = footer && footer.getRecordBatch(index);
        if (!block || !this._handle.seek(block.offset)) {
            return null;
        }
        const reader = this._reader;
        const message = reader.readMessage(enum_1.MessageHeader.RecordBatch);
        if (!message || !message.isRecordBatch()) {
            return null;
        }
        const header = message.header();
        const body = reader.readMessageBody(message.bodyLength);
        return this._loadRecordBatch(header, body);
    }
    /** Seeks to dictionary block `index` from the footer and stores its decoded dictionary. */
    _readDictionaryBatch(index) {
        const footer = this._footer;
        const block = footer && footer.getDictionaryBatch(index);
        if (!block || !this._handle.seek(block.offset)) {
            return;
        }
        const reader = this._reader;
        const message = reader.readMessage(enum_1.MessageHeader.DictionaryBatch);
        if (!message || !message.isDictionaryBatch()) {
            return;
        }
        const header = message.header();
        const body = reader.readMessageBody(message.bodyLength);
        const dictionary = this._loadDictionaryBatch(header, body);
        this.dictionaries.set(header.id, dictionary);
    }
    /**
     * Reads an int32 footer length at `size - magicAndPadding`, then decodes
     * the `length` bytes that immediately precede that position as the Footer.
     */
    _readFooter() {
        const handle = this._handle;
        const lengthPosition = handle.size - message_1.magicAndPadding;
        const footerLength = handle.readInt32(lengthPosition);
        const footerBytes = handle.readAt(lengthPosition - footerLength, footerLength);
        return file_1.Footer.decode(footerBytes);
    }
    /**
     * Sequential access for `next()`. Seeks to the block for the current
     * record-batch index and reads the message there. Returns `null` when no
     * footer is available, every batch has been read, or the seek fails.
     */
    _readNextMessageAndValidate(type) {
        if (!this._footer) {
            this.open();
        }
        // `!(a < b)` (not `a >= b`) keeps the original's behavior if the count is not a number.
        if (!this._footer || !(this._recordBatchIndex < this.numRecordBatches)) {
            return null;
        }
        const block = this._footer.getRecordBatch(this._recordBatchIndex);
        return block && this._handle.seek(block.offset) ? this._reader.readMessage(type) : null;
    }
}
/**
 * Asynchronous reader for the Arrow IPC *file* format, backed by an
 * `AsyncRandomAccessFile`. Like the sync file reader, it reads the footer
 * first, loads every dictionary batch listed there, and then serves record
 * batches by seeking to the offsets stored in the footer. Every I/O step is
 * awaited.
 * @ignore
 */
class AsyncRecordBatchFileReaderImpl extends AsyncRecordBatchStreamReaderImpl {
    /**
     * Accepts either `(source, byteLength?, dictionaries?)` or
     * `(source, dictionaries?)`.
     *
     * @param source - an `AsyncRandomAccessFile`, or a file handle to wrap in one.
     * @param rest - optional numeric `byteLength` (only used when wrapping
     *   `source`), followed by an optional `Map` of pre-loaded dictionaries.
     */
    constructor(source, ...rest) {
        // Treat the first extra argument as `byteLength` only when it really is a
        // number. That way a `dictionaries` Map passed as the second argument is
        // not mistaken for a byte length, and a real numeric byte length is kept.
        const byteLength = typeof rest[0] === 'number' ? rest.shift() : undefined;
        const dictionaries = rest[0] instanceof Map ? rest.shift() : undefined;
        super(source instanceof file_2.AsyncRandomAccessFile ? source : new file_2.AsyncRandomAccessFile(source, byteLength), dictionaries);
    }
    get footer() { return this._footer; }
    /** Dictionary-batch count recorded in the footer; 0 until the footer has been read. */
    get numDictionaries() { return this._footer ? this._footer.numDictionaries : 0; }
    /** Record-batch count recorded in the footer; 0 until the footer has been read. */
    get numRecordBatches() { return this._footer ? this._footer.numRecordBatches : 0; }
    isFile() { return true; }
    isAsync() { return true; }
    /**
     * Reads the footer and all dictionary batches, once, before delegating to
     * the async stream reader's `open` for the `autoDestroy` and schema handling.
     */
    async open(options) {
        if (!this.closed && !this._footer) {
            this.schema = (this._footer = await this._readFooter()).schema;
            for (const block of this._footer.dictionaryBatches()) {
                block && await this._readDictionaryBatch(this._dictionaryIndex++);
            }
        }
        return await super.open(options);
    }
    /**
     * Random access: decodes the record batch at `index` as listed in the
     * footer. Resolves to `null` in these cases:
     * - the reader is closed;
     * - there is no such block, or the seek fails;
     * - the message found there is not a RecordBatch.
     */
    async readRecordBatch(index) {
        if (this.closed) {
            return null;
        }
        if (!this._footer) {
            await this.open();
        }
        const block = this._footer && this._footer.getRecordBatch(index);
        if (block && (await this._handle.seek(block.offset))) {
            const message = await this._reader.readMessage(enum_1.MessageHeader.RecordBatch);
            if (message && message.isRecordBatch()) {
                const header = message.header();
                const buffer = await this._reader.readMessageBody(message.bodyLength);
                const recordBatch = this._loadRecordBatch(header, buffer);
                return recordBatch;
            }
        }
        return null;
    }
    /** Seeks to dictionary block `index` from the footer and stores its decoded dictionary. */
    async _readDictionaryBatch(index) {
        const block = this._footer && this._footer.getDictionaryBatch(index);
        if (block && (await this._handle.seek(block.offset))) {
            const message = await this._reader.readMessage(enum_1.MessageHeader.DictionaryBatch);
            if (message && message.isDictionaryBatch()) {
                const header = message.header();
                const buffer = await this._reader.readMessageBody(message.bodyLength);
                const vector = this._loadDictionaryBatch(header, buffer);
                this.dictionaries.set(header.id, vector);
            }
        }
    }
    /**
     * Reads an int32 footer length at `size - magicAndPadding`, then decodes
     * the `length` bytes that immediately precede that position as the Footer.
     */
    async _readFooter() {
        const { _handle } = this;
        // Wait for any in-flight `_pending` work on the handle before reading
        // `size` (NOTE(review): presumably the handle's size lookup — confirm).
        _handle._pending && await _handle._pending;
        const offset = _handle.size - message_1.magicAndPadding;
        const length = await _handle.readInt32(offset);
        const buffer = await _handle.readAt(offset - length, length);
        return file_1.Footer.decode(buffer);
    }
    /**
     * Sequential access for `next()`. Seeks to the block for the current
     * record-batch index and reads the message there. Resolves to `null` when
     * no footer is available, every batch has been read, or the seek fails.
     */
    async _readNextMessageAndValidate(type) {
        if (!this._footer) {
            await this.open();
        }
        if (this._footer && this._recordBatchIndex < this.numRecordBatches) {
            const block = this._footer.getRecordBatch(this._recordBatchIndex);
            if (block && await this._handle.seek(block.offset)) {
                return await this._reader.readMessage(type);
            }
        }
        return null;
    }
}
/**
 * Stream reader for Arrow JSON sources. It behaves like the synchronous binary
 * stream reader, except that column data is decoded with `JSONVectorLoader`
 * instead of `VectorLoader`.
 * @ignore
 */
class RecordBatchJSONReaderImpl extends RecordBatchStreamReaderImpl {
    /**
     * @param jsonSource - Arrow JSON object to read.
     * @param dictionaries - optional initial dictionary map.
     */
    constructor(jsonSource, dictionaries) {
        super(jsonSource, dictionaries);
    }
    /** Builds column data for `types` with the JSON-specific loader. */
    _loadVectors(header, body, types) {
        const loader = new vectorloader_1.JSONVectorLoader(body, header.nodes, header.buffers, this.dictionaries);
        return loader.visitMany(types);
    }
}
//
// Define some helper functions and static implementations down here. There's
// a bit of branching in the static methods that can lead to the same routines
// being executed, so we've broken those out here for readability.
//
/**
 * Resolves the effective `autoDestroy` flag for an `open()` call.
 * `options.autoDestroy` wins when it is a boolean; otherwise the reader's
 * current `autoDestroy` value is kept.
 * @ignore
 */
function shouldAutoDestroy(self, options) {
    if (options && typeof options['autoDestroy'] === 'boolean') {
        return options['autoDestroy'];
    }
    return self['autoDestroy'];
}
/**
 * Synchronous generator behind `RecordBatchReader.readAll`.
 *
 * It opens the reader with `autoDestroy: false` and yields the same reader
 * object after each successful (re-)open. Once the consumer resumes, the reader
 * is reset and opened again; iteration stops when `open()` leaves it closed.
 * This is presumably meant to walk several back-to-back IPC streams. The
 * reader is always cancelled on exit, including when the consumer stops early.
 * @ignore
 */
function* readAllSync(source) {
    const reader = RecordBatchReader.from(source);
    try {
        let opened = reader.open({ autoDestroy: false });
        while (!opened.closed) {
            yield reader;
            opened = reader.reset().open();
        }
    }
    finally {
        reader.cancel();
    }
}
/**
 * Asynchronous generator behind `RecordBatchReader.readAll`.
 *
 * It awaits the reader from `RecordBatchReader.from`, opens it with
 * `autoDestroy: false`, and yields the same reader after each successful
 * (re-)open. Once the consumer resumes, the reader is reset and re-opened;
 * iteration stops when `open()` leaves it closed. The reader is always
 * cancelled (and awaited) on exit, including on early termination.
 * @ignore
 */
async function* readAllAsync(source) {
    const reader = await RecordBatchReader.from(source);
    try {
        let opened = await reader.open({ autoDestroy: false });
        while (!opened.closed) {
            yield reader;
            opened = await reader.reset().open();
        }
    }
    finally {
        await reader.cancel();
    }
}
/**
 * Wraps an Arrow JSON source in a synchronous stream reader backed by
 * `RecordBatchJSONReaderImpl`.
 * @ignore
 */
function fromArrowJSON(source) {
    const impl = new RecordBatchJSONReaderImpl(source);
    return new RecordBatchStreamReader(impl);
}
/**
 * Chooses a synchronous reader for a `ByteStream` by peeking at its first
 * `(magicLength + 7) & ~7` bytes:
 * - fewer than 4 bytes (or nothing) -> stream reader over an empty generator;
 * - Arrow magic string present -> file reader over the whole stream (`read()`);
 * - otherwise -> stream reader over the byte stream itself.
 * @ignore
 */
function fromByteStream(source) {
    const head = source.peek((message_1.magicLength + 7) & ~7);
    // Written as `!(x >= 4)` rather than `x < 4` so that a missing byteLength
    // still falls into this branch.
    if (!head || !(head.byteLength >= 4)) {
        return new RecordBatchStreamReader(new RecordBatchStreamReaderImpl(function* () { }()));
    }
    if (message_1.checkForMagicArrowString(head)) {
        return new RecordBatchFileReader(new RecordBatchFileReaderImpl(source.read()));
    }
    return new RecordBatchStreamReader(new RecordBatchStreamReaderImpl(source));
}
/**
 * Chooses a reader for an `AsyncByteStream` by peeking at its first
 * `(magicLength + 7) & ~7` bytes:
 * - fewer than 4 bytes (or nothing) -> async stream reader over an empty async generator;
 * - no Arrow magic string -> async stream reader over the byte stream itself;
 * - magic present -> the whole stream is awaited via `read()` and handed to a
 *   *synchronous* file reader.
 * @ignore
 */
function fromAsyncByteStream(source) {
    return (async () => {
        const head = await source.peek((message_1.magicLength + 7) & ~7);
        // Written as `!(x >= 4)` rather than `x < 4` so that a missing byteLength
        // still falls into this branch.
        if (!head || !(head.byteLength >= 4)) {
            return new AsyncRecordBatchStreamReader(new AsyncRecordBatchStreamReaderImpl(async function* () { }()));
        }
        if (!message_1.checkForMagicArrowString(head)) {
            return new AsyncRecordBatchStreamReader(new AsyncRecordBatchStreamReaderImpl(source));
        }
        const wholeFile = await source.read();
        return new RecordBatchFileReader(new RecordBatchFileReaderImpl(wholeFile));
    })();
}
/**
 * Chooses a reader for a file handle. The handle is stat-ed for its size and
 * wrapped in an `AsyncRandomAccessFile`. The leading bytes are checked for the
 * Arrow magic string only when the file is at least `magicX2AndPadding` bytes
 * long. A match gives an async file reader; anything else gives an async
 * stream reader over the same file.
 * @ignore
 */
async function fromFileHandle(source) {
    const { size } = await source.stat();
    const file = new file_2.AsyncRandomAccessFile(source, size);
    const isArrowFile = size >= message_1.magicX2AndPadding
        && message_1.checkForMagicArrowString(await file.readAt(0, (message_1.magicLength + 7) & ~7));
    if (isArrowFile) {
        return new AsyncRecordBatchFileReader(new AsyncRecordBatchFileReaderImpl(file));
    }
    return new AsyncRecordBatchStreamReader(new AsyncRecordBatchStreamReaderImpl(file));
}
//# sourceMappingURL=reader.js.map