Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions parquet-variant/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,17 @@
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.UUID;
import org.apache.parquet.Preconditions;

/**
* This Variant class holds the Variant-encoded value and metadata binary values.
Expand All @@ -36,6 +37,12 @@ public final class Variant {
*/
final ByteBuffer metadata;

/** Number of entries in the metadata dictionary, cached from construction time. */
private final int dictSize;

/** Nesting depth of this Variant relative to the top-level value (0 = top-level). */
private final int depth;

/**
* The threshold to switch from linear search to binary search when looking up a field by key in
* an object. This is a performance optimization to avoid the overhead of binary search for a
Expand All @@ -60,13 +67,27 @@ public Variant(ByteBuffer value, ByteBuffer metadata) {
// is not important.
this.value = value.asReadOnlyBuffer();
this.metadata = metadata.asReadOnlyBuffer();
this.depth = 0;
this.dictSize = VariantUtil.validateMetadata(this.metadata);
VariantUtil.validateValueShallow(this.value, this.dictSize);
}

// There is currently only one allowed version.
if ((metadata.get(metadata.position()) & VariantUtil.VERSION_MASK) != VariantUtil.VERSION) {
throw new UnsupportedOperationException(String.format(
"Unsupported variant metadata version: %d",
metadata.get(metadata.position()) & VariantUtil.VERSION_MASK));
}
/**
* Package-private constructor for a child Variant produced by slicing an already-validated
* parent. The metadata buffer and {@code dictSize} are inherited from the parent (no
* re-validation); the value slot is bounds-checked shallowly, and the depth counter is
* propagated so that nesting depth is bounded.
*/
Variant(ByteBuffer value, ByteBuffer metadata, int dictSize, int depth) {
Preconditions.checkArgument(
depth <= VariantUtil.MAX_VARIANT_DEPTH,
"variant nesting depth exceeds maximum %s",
VariantUtil.MAX_VARIANT_DEPTH);
this.value = value.asReadOnlyBuffer();
this.metadata = metadata;
this.dictSize = dictSize;
this.depth = depth;
VariantUtil.validateValueShallow(this.value, dictSize);
}

public ByteBuffer getValueBuffer() {
Expand Down Expand Up @@ -219,7 +240,9 @@ public Variant getFieldByKey(String key) {
info.offsetSize,
value.position() + info.idStartOffset,
value.position() + info.offsetStartOffset,
value.position() + info.dataStartOffset);
value.position() + info.dataStartOffset,
dictSize,
depth + 1);
if (field.key.equals(key)) {
return field.value;
}
Expand All @@ -240,7 +263,9 @@ public Variant getFieldByKey(String key) {
info.offsetSize,
value.position() + info.idStartOffset,
value.position() + info.offsetStartOffset,
value.position() + info.dataStartOffset);
value.position() + info.dataStartOffset,
dictSize,
depth + 1);
int cmp = field.key.compareTo(key);
if (cmp < 0) {
low = mid + 1;
Expand Down Expand Up @@ -286,7 +311,9 @@ public ObjectField getFieldAtIndex(int idx) {
info.offsetSize,
value.position() + info.idStartOffset,
value.position() + info.offsetStartOffset,
value.position() + info.dataStartOffset);
value.position() + info.dataStartOffset,
dictSize,
depth + 1);
return field;
}

Expand All @@ -298,12 +325,14 @@ static ObjectField getFieldAtIndex(
int offsetSize,
int idStart,
int offsetStart,
int dataStart) {
int dataStart,
int dictSize,
int childDepth) {
// idStart, offsetStart, and dataStart are absolute positions in the `value` buffer.
int id = VariantUtil.readUnsigned(value, idStart + idSize * index, idSize);
int offset = VariantUtil.readUnsigned(value, offsetStart + offsetSize * index, offsetSize);
String key = VariantUtil.getMetadataKey(metadata, id);
Variant v = new Variant(VariantUtil.slice(value, dataStart + offset), metadata);
Variant v = new Variant(VariantUtil.slice(value, dataStart + offset), metadata, dictSize, childDepth);
return new ObjectField(key, v);
}

Expand Down Expand Up @@ -334,13 +363,22 @@ public Variant getElementAtIndex(int index) {
metadata,
info.offsetSize,
value.position() + info.offsetStartOffset,
value.position() + info.dataStartOffset);
value.position() + info.dataStartOffset,
dictSize,
depth + 1);
}

private static Variant getElementAtIndex(
int index, ByteBuffer value, ByteBuffer metadata, int offsetSize, int offsetStart, int dataStart) {
int index,
ByteBuffer value,
ByteBuffer metadata,
int offsetSize,
int offsetStart,
int dataStart,
int dictSize,
int childDepth) {
// offsetStart and dataStart are absolute positions in the `value` buffer.
int offset = VariantUtil.readUnsigned(value, offsetStart + offsetSize * index, offsetSize);
return new Variant(VariantUtil.slice(value, dataStart + offset), metadata);
return new Variant(VariantUtil.slice(value, dataStart + offset), metadata, dictSize, childDepth);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.parquet.variant;

import static org.apache.parquet.variant.VariantUtil.MAX_VARIANT_DEPTH;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
Expand All @@ -37,7 +39,7 @@ public final class VariantJsonParser {

private static final JsonFactory JSON_FACTORY = JsonFactory.builder()
.streamReadConstraints(StreamReadConstraints.builder()
.maxNestingDepth(500)
.maxNestingDepth(MAX_VARIANT_DEPTH)
.maxStringLength(10_000_000)
.maxDocumentLength(50_000_000L)
.build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.HashMap;
import org.apache.parquet.Preconditions;

/**
* This class defines constants related to the Variant format and provides functions for
Expand Down Expand Up @@ -188,6 +189,12 @@ class VariantUtil {
// The size (in bytes) of a UUID.
static final int UUID_SIZE = 16;

/**
* Maximum permitted nesting depth of a Variant value.
* VariantJsonParser uses the same value as its maximum JSON nesting depth.
*/
static final int MAX_VARIANT_DEPTH = 500;

// header bytes
static final byte HEADER_NULL = primitiveHeader(NULL);
static final byte HEADER_LONG_STRING = primitiveHeader(LONG_STR);
Expand Down Expand Up @@ -851,6 +858,160 @@ static HashMap<String, Integer> getMetadataMap(ByteBuffer metadata) {
return result;
}

/**
 * Bounds-checks the metadata buffer: the header byte must carry the supported version, and the
 * dictionary-size field plus the dictionary offset table must fit within the buffer extent.
 * The offsets stored in the table and the string bytes they refer to are not inspected here.
 *
 * @param metadata the variant metadata buffer; its position marks the start of the metadata
 * @return the dictionary size
 * @throws IllegalArgumentException if the buffer is empty, the version is unsupported, or the
 *     dictionary-size field or offset table extends past the buffer limit
 */
static int validateMetadata(ByteBuffer metadata) {
  int pos = metadata.position();
  Preconditions.checkArgument(pos >= 0 && pos < metadata.limit(), "variant metadata is empty");
  int header = metadata.get(pos) & 0xFF;
  Preconditions.checkArgument(
      (header & VERSION_MASK) == VERSION, "Unsupported variant metadata version: %s", header & VERSION_MASK);
  // The top two header bits hold (offset size - 1); every table entry is offsetSize bytes wide.
  int offsetSize = ((header >> 6) & 0x3) + 1;
  long remaining = (long) metadata.limit() - pos;
  long offsetListStart = 1L + offsetSize;
  Preconditions.checkArgument(offsetListStart <= remaining, "variant metadata truncated");
  int dictSize = readUnsigned(metadata, pos + 1, offsetSize);
  // Widen to long BEFORE adding 1. With a 4-byte size field dictSize can be Integer.MAX_VALUE;
  // adding in int would wrap negative and let an oversized table pass the check below.
  long offsetBytes = ((long) dictSize + 1) * offsetSize;
  long dataStart = offsetListStart + offsetBytes;
  Preconditions.checkArgument(
      dataStart <= remaining, "variant metadata dictionary table extends past buffer: dictSize=%s", dictSize);
  return dictSize;
}

/**
 * Checks that one Variant value node fits inside its buffer slot, without descending into
 * nested children. Children are checked later, when a caller actually navigates to them.
 *
 * <p>Cost: O(1) for primitives and short strings, O(numElements) for objects and arrays.
 * Deferring the nested checks means a caller does not pay for sub-trees it never reads.
 *
 * @param value the variant value buffer (position/limit define the extent of this node's slot)
 * @param dictSize the metadata dictionary size, used to bound object field ids
 * @throws IllegalArgumentException if the value header or container table does not fit within
 *     the buffer slot, or if any object field id is out of range
 */
static void validateValueShallow(ByteBuffer value, int dictSize) {
  final int start = value.position();
  final int limit = value.limit();
  Preconditions.checkArgument(start >= 0 && start < limit, "variant value is empty");
  final long available = (long) limit - start;
  final int headerByte = value.get(start) & 0xFF;
  final int typeInfo = (headerByte >> BASIC_TYPE_BITS) & PRIMITIVE_TYPE_MASK;
  final int basicType = headerByte & BASIC_TYPE_MASK;

  if (basicType == SHORT_STR) {
    // For a short string the type-info bits are the string length itself.
    Preconditions.checkArgument(1L + typeInfo <= available, "variant short string extends past buffer");
  } else if (basicType == OBJECT || basicType == ARRAY) {
    validateContainerShallow(value, start, available, dictSize, basicType == OBJECT, typeInfo);
  } else {
    validatePrimitiveShallow(value, start, available, typeInfo);
  }
}

/**
 * Bounds-checks the header and lookup tables of a single object or array node. Child values
 * are not visited; only the field-id table (objects) and the offset table are read.
 *
 * @param value the variant value buffer
 * @param s absolute position of this node's header byte in {@code value}
 * @param slot number of bytes available to this node, counted from {@code s}
 * @param dictSize the metadata dictionary size; every object field id must be below it
 * @param isObject {@code true} for an object node, {@code false} for an array node
 * @param typeInfo the type-info bits taken from the node's header byte
 * @throws IllegalArgumentException if the size field or tables extend past the slot, a field id
 *     is not below {@code dictSize}, or an offset points past the data region
 */
private static void validateContainerShallow(
    ByteBuffer value, int s, long slot, int dictSize, boolean isObject, int typeInfo) {
  boolean largeSize;
  int idSize;
  if (isObject) {
    largeSize = ((typeInfo >> 4) & 0x1) != 0;
    idSize = ((typeInfo >> 2) & 0x3) + 1;
  } else {
    largeSize = ((typeInfo >> 2) & 0x1) != 0;
    idSize = 0;
  }
  int offsetSize = (typeInfo & 0x3) + 1;
  int sizeBytes = largeSize ? U32_SIZE : 1;
  Preconditions.checkArgument(1L + sizeBytes <= slot, "variant container header truncated");
  int numElements = readUnsigned(value, s + 1, sizeBytes);
  long idStart = 1L + sizeBytes;
  long idBytes = isObject ? (long) numElements * idSize : 0L;
  long offsetStart = idStart + idBytes;
  // Widen to long BEFORE adding 1. With a 4-byte size field numElements can be
  // Integer.MAX_VALUE; adding in int would wrap negative, shrink dataStart and let an
  // oversized table pass the check below.
  long offsetBytes = ((long) numElements + 1) * offsetSize;
  long dataStart = offsetStart + offsetBytes;
  Preconditions.checkArgument(
      dataStart <= slot, "variant container offset table extends past buffer: numElements=%s", numElements);
  long dataLen = slot - dataStart;
  // From here on dataStart <= slot, so every table position computed below lies inside the
  // slot and the int arithmetic cannot overflow.
  if (isObject) {
    int idBase = s + (int) idStart;
    for (int i = 0; i < numElements; i++) {
      int id = readUnsigned(value, idBase + i * idSize, idSize);
      Preconditions.checkArgument(
          id < dictSize, "variant object key id %s out of range (dictSize=%s)", id, dictSize);
    }
  }
  // Each child offset must lie within the data region. Children may overlap or leave gaps;
  // the trailing terminator offset is range-checked for the same reason.
  int offsetBase = s + (int) offsetStart;
  for (int i = 0; i <= numElements; i++) {
    int off = readUnsigned(value, offsetBase + i * offsetSize, offsetSize);
    Preconditions.checkArgument(
        off <= dataLen, "variant child offset out of range: %s (data length %s)", off, dataLen);
  }
}

/**
 * Checks that a primitive value (header byte plus payload) fits inside its buffer slot.
 *
 * @param value the variant value buffer
 * @param s absolute position of the value's header byte in {@code value}
 * @param slot number of bytes available to this value, counted from {@code s}
 * @param typeInfo the primitive type id taken from the header byte
 * @throws IllegalArgumentException if the type id is unknown or the value does not fit in the slot
 */
private static void validatePrimitiveShallow(ByteBuffer value, int s, long slot, int typeInfo) {
  // Number of bytes that follow the one-byte header for this type.
  final long payload;
  switch (typeInfo) {
    case NULL:
    case TRUE:
    case FALSE:
      payload = 0L;
      break;
    case INT8:
      payload = 1L;
      break;
    case INT16:
      payload = 2L;
      break;
    case INT32:
    case DATE:
    case FLOAT:
      payload = 4L;
      break;
    case DECIMAL4:
      payload = 5L;
      break;
    case INT64:
    case DOUBLE:
    case TIMESTAMP_TZ:
    case TIMESTAMP_NTZ:
    case TIME:
    case TIMESTAMP_NANOS_TZ:
    case TIMESTAMP_NANOS_NTZ:
      payload = 8L;
      break;
    case DECIMAL8:
      payload = 9L;
      break;
    case UUID:
      payload = UUID_SIZE;
      break;
    case DECIMAL16:
      payload = 17L;
      break;
    case BINARY:
    case LONG_STR:
      // The length prefix must itself be readable before it is used to size the value.
      Preconditions.checkArgument(1L + U32_SIZE <= slot, "variant string/binary length field truncated");
      payload = (long) U32_SIZE + readUnsigned(value, s + 1, U32_SIZE);
      break;
    default:
      throw new IllegalArgumentException(String.format("Unknown primitive type in variant: %d", typeInfo));
  }
  Preconditions.checkArgument(1L + payload <= slot, "variant value extends past buffer");
}

/**
* Computes the actual size (in bytes) of the Variant value.
* @param value The Variant value binary
Expand Down
Loading