Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,21 +58,22 @@
static Schema convert(List<String> names, List<TypeInfo> typeInfos, List<String> comments, boolean autoConvert,
Map<String, String> defaultValues) {
HiveSchemaConverter converter = new HiveSchemaConverter(autoConvert);
return new Schema(converter.convertInternal(names, typeInfos, defaultValues, comments));
return new Schema(converter.convertInternal(names, typeInfos, defaultValues, comments, false));
}

public static Type convert(TypeInfo typeInfo, boolean autoConvert, String defaultValue) {
public static Type convert(TypeInfo typeInfo, boolean autoConvert, String defaultValue,
boolean shouldAddInitialDefault) {
HiveSchemaConverter converter = new HiveSchemaConverter(autoConvert);
return converter.convertType(typeInfo, defaultValue);
return converter.convertType(typeInfo, defaultValue, shouldAddInitialDefault);
}

List<Types.NestedField> convertInternal(List<String> names, List<TypeInfo> typeInfos,

Check failure on line 70 in iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaConverter.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 19 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ8ksWpw4c972omve0om&open=AZ8ksWpw4c972omve0om&pullRequest=6565
Map<String, String> defaultValues, List<String> comments) {
Map<String, String> defaultValues, List<String> comments, boolean shouldAddInitialDefault) {
List<Types.NestedField> result = Lists.newArrayListWithExpectedSize(names.size());
int outerId = id + names.size();
id = outerId;
for (int i = 0; i < names.size(); ++i) {
Type type = convertType(typeInfos.get(i), defaultValues.get(names.get(i)));
Type type = convertType(typeInfos.get(i), defaultValues.get(names.get(i)), shouldAddInitialDefault);
String columnName = names.get(i);
Types.NestedField.Builder fieldBuilder =
Types.NestedField.builder()
Expand All @@ -87,6 +88,9 @@
Object icebergDefaultValue = HiveSchemaUtil.getDefaultValue(defaultValues.get(columnName), type);
if (icebergDefaultValue != null) {
fieldBuilder.withWriteDefault(Expressions.lit(icebergDefaultValue));
if (shouldAddInitialDefault) {
fieldBuilder.withInitialDefault(Expressions.lit(icebergDefaultValue));
}
}
} else if (!type.isStructType()) {
throw new UnsupportedOperationException(
Expand All @@ -99,7 +103,7 @@
return result;
}

Type convertType(TypeInfo typeInfo, String defaultValue) {
Type convertType(TypeInfo typeInfo, String defaultValue, boolean shouldAddInitialDefault) {

Check warning on line 106 in iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaConverter.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 84 to 64, Complexity from 25 to 14, Nesting Level from 4 to 2, Number of Variables from 16 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ8ksWpw4c972omve0on&open=AZ8ksWpw4c972omve0on&pullRequest=6565
switch (typeInfo.getCategory()) {
case PRIMITIVE:
switch (((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()) {
Expand Down Expand Up @@ -161,21 +165,24 @@
}
case STRUCT:
StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
List<Types.NestedField> fields =
convertInternal(structTypeInfo.getAllStructFieldNames(), structTypeInfo.getAllStructFieldTypeInfos(),
HiveSchemaUtil.getDefaultValuesMap(null, defaultValue), Collections.emptyList());
List<Types.NestedField> fields = convertInternal(
structTypeInfo.getAllStructFieldNames(),
structTypeInfo.getAllStructFieldTypeInfos(),
HiveSchemaUtil.getDefaultValuesMap(null, defaultValue),
Collections.emptyList(),
shouldAddInitialDefault);
return Types.StructType.of(fields);
case MAP:
MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
int keyId = id++;
Type keyType = convertType(mapTypeInfo.getMapKeyTypeInfo(), defaultValue);
Type keyType = convertType(mapTypeInfo.getMapKeyTypeInfo(), defaultValue, shouldAddInitialDefault);
int valueId = id++;
Type valueType = convertType(mapTypeInfo.getMapValueTypeInfo(), defaultValue);
Type valueType = convertType(mapTypeInfo.getMapValueTypeInfo(), defaultValue, shouldAddInitialDefault);
return Types.MapType.ofOptional(keyId, valueId, keyType, valueType);
case LIST:
ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
int listId = id++;
Type listType = convertType(listTypeInfo.getListElementTypeInfo(), defaultValue);
Type listType = convertType(listTypeInfo.getListElementTypeInfo(), defaultValue, shouldAddInitialDefault);
return Types.ListType.ofOptional(listId, listType);
case VARIANT:
return Types.VariantType.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
Expand Down Expand Up @@ -148,10 +149,11 @@ public static TypeInfo convert(Type type) {
*
* @param typeInfo The Hive type
* @param defaultValue the default value for the column, if any
* @param shouldAddInitialDefault whether to set initial default for the column or not
* @return The Iceberg type
*/
public static Type convert(TypeInfo typeInfo, String defaultValue) {
return HiveSchemaConverter.convert(typeInfo, false, defaultValue);
public static Type convert(TypeInfo typeInfo, String defaultValue, boolean shouldAddInitialDefault) {
return HiveSchemaConverter.convert(typeInfo, false, defaultValue, shouldAddInitialDefault);
}

/**
Expand Down Expand Up @@ -418,7 +420,8 @@ public static void setDefaultValues(Record record, List<Types.NestedField> missi
for (Types.NestedField field : missingFields) {
if (field.type().isStructType()) {
// Attempt to build the nested struct with its defaults
Record nestedRecord = buildStructWithDefaults(field.type().asStructType());
Record nestedRecord = buildStructFromDefaults(
field.type().asStructType(), Types.NestedField::writeDefault);
if (nestedRecord != null) {
record.setField(field.name(), nestedRecord);
}
Expand All @@ -430,34 +433,64 @@ public static void setDefaultValues(Record record, List<Types.NestedField> missi
}

/**
* Recursively builds a struct populated with write defaults.
* * @return A populated Record, or null if no nested fields have defaults.
* Backfills struct column that is null on read using nested {@code initialDefault} metadata.
* This applies to rows written before {@code ADD COLUMNS} added the struct.
* Spec allows struct defaults as {@code {}} (see https://iceberg.apache.org/spec/#default-values), but
* {@code UpdateSchema} add column only supports defaults of underlying primitives and keeping the
* struct default as null due to which we need to backfill that nested default record while reading;
* if empty structs are allowed, this backfill can be removed.
*/
private static Record buildStructWithDefaults(Types.StructType structType) {
public static void backfillStructInitialDefaults(
Record iceRecord, Map<String, Record> initialDefaultStructsByColumn) {
for (Map.Entry<String, Record> columnAndInitialDefaultStruct : initialDefaultStructsByColumn.entrySet()) {
String columnName = columnAndInitialDefaultStruct.getKey();
if (iceRecord.getField(columnName) == null) {
iceRecord.setField(columnName, columnAndInitialDefaultStruct.getValue());
}
}
}

/**
* Recursively builds a struct populated with underlying field defaults.
* @return A populated Record, or null if no nested fields have defaults.
*/
public static Record buildStructFromDefaults(
Types.StructType structType, Function<Types.NestedField, Object> defaultForField) {
Record nestedRecord = GenericRecord.create(structType);
boolean hasAnyDefault = false;

for (Types.NestedField field : structType.fields()) {
if (field.writeDefault() != null) {
Object defaultValue = convertToWriteType(field.writeDefault(), field.type());
nestedRecord.setField(field.name(), defaultValue);
Object defaultValue = defaultForField.apply(field);
if (defaultValue != null) {
nestedRecord.setField(field.name(), convertToWriteType(defaultValue, field.type()));
hasAnyDefault = true;
} else if (field.type().isStructType()) {
// Recursively process deeper nested structs
Record deeperRecord = buildStructWithDefaults(field.type().asStructType());

// If the deeper struct has defaults, attach it and flag this current struct as populated
Record deeperRecord = buildStructFromDefaults(field.type().asStructType(), defaultForField);
if (deeperRecord != null) {
nestedRecord.setField(field.name(), deeperRecord);
hasAnyDefault = true;
}
}
}

// If no fields (or nested fields) had defaults, return null to avoid an empty struct
return hasAnyDefault ? nestedRecord : null;
}

public static Map<String, Object> getStructInitialDefaults(Types.StructType structType) {
Map<String, Object> result = Maps.newHashMap();
for (Types.NestedField field : structType.fields()) {
if (field.initialDefault() != null) {
result.put(field.name(), field.initialDefault());
} else if (field.type().isStructType()) {
Map<String, Object> nested = getStructInitialDefaults(field.type().asStructType());
if (!nested.isEmpty()) {
result.put(field.name(), nested);
}
}
}
return result;
}

/**
* Sets a value into a {@link Record} using a struct-only field path (top-level column or nested
* through structs). Intermediate struct records are created as needed.
Expand Down Expand Up @@ -496,21 +529,6 @@ private static Record getOrCreateStructRecord(
return record;
}

// Special method for nested structs that always applies defaults to null fields
private static void setDefaultValuesForNestedStruct(Record record, List<Types.NestedField> fields) {
for (Types.NestedField field : fields) {
Object fieldValue = record.getField(field.name());

if (field.writeDefault() != null) {
Object defaultValue = convertToWriteType(field.writeDefault(), field.type());
record.setField(field.name(), defaultValue);
} else if (field.type().isStructType()) {
// Recursively process nested structs
setDefaultValuesForNestedStruct((Record) fieldValue, field.type().asStructType().fields());
}
}
}

public static Object convertToWriteType(Object value, Type type) {
if (value == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ private void checkConvert(TypeInfo typeInfo, Type type) {
// Convert to TypeInfo
assertThat(HiveSchemaUtil.convert(type)).isEqualTo(typeInfo);
// Convert to Type
assertEquals(type, HiveSchemaUtil.convert(typeInfo, null));
assertEquals(type, HiveSchemaUtil.convert(typeInfo, null, true));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -741,16 +741,16 @@ private void handleAddColumns(org.apache.hadoop.hive.metastore.api.Table hmsTabl
(List<SQLDefaultConstraint>) SessionStateUtil.getResource(conf, SessionStateUtil.COLUMN_DEFAULTS).orElse(null);
Map<String, String> defaultValues = Stream.ofNullable(sqlDefaultConstraints).flatMap(Collection::stream)
.collect(Collectors.toMap(SQLDefaultConstraint::getColumn_name, SQLDefaultConstraint::getDefault_value));
boolean isORc = isOrcFileFormat(hmsTable);
boolean isOrc = isOrcFileFormat(hmsTable);
for (FieldSchema addedCol : addedCols) {
String defaultValue = defaultValues.get(addedCol.getName());
Type type = HiveSchemaUtil.convert(TypeInfoUtils.getTypeInfoFromTypeString(addedCol.getType()), defaultValue);
Type type = HiveSchemaUtil.convert(TypeInfoUtils.getTypeInfoFromTypeString(addedCol.getType()), defaultValue,
!isOrc);
Literal<Object> defaultVal = Optional.ofNullable(defaultValue).filter(v -> !type.isStructType())
.map(v -> Expressions.lit(HiveSchemaUtil.getDefaultValue(v, type))).orElse(null);

// ORC doesn't have support for initialDefault from iceberg layer, we only need to set default for writeDefault.
updateSchema.addColumn(addedCol.getName(), type, addedCol.getComment(), isORc ? null : defaultVal);
if (isORc && defaultVal != null) {
updateSchema.addColumn(addedCol.getName(), type, addedCol.getComment(), isOrc ? null : defaultVal);
if (isOrc && defaultVal != null) {
updateSchema.updateColumnDefault(addedCol.getName(), defaultVal);
}
}
Expand Down Expand Up @@ -933,7 +933,7 @@ private void handlePartitionRename(HiveSchemaUtil.SchemaDifference schemaDiffere
}

private Type.PrimitiveType getPrimitiveTypeOrThrow(FieldSchema field) throws MetaException {
Type newType = HiveSchemaUtil.convert(TypeInfoUtils.getTypeInfoFromTypeString(field.getType()), null);
Type newType = HiveSchemaUtil.convert(TypeInfoUtils.getTypeInfoFromTypeString(field.getType()), null, true);
if (!(newType instanceof Type.PrimitiveType)) {
throw new MetaException(String.format("Cannot promote type of column: '%s' to a non-primitive type: %s.",
field.getName(), newType));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.hive.HiveSchemaUtil;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.mr.InputFormatConfig;
Expand Down Expand Up @@ -187,6 +188,12 @@ static Map<String, Object> getInitialColumnDefaults(List<Types.NestedField> colu
for (Types.NestedField column : columns) {
if (column.initialDefault() != null) {
columnDefaults.put(column.name(), column.initialDefault());
} else if (column.type().isStructType()) {
Map<String, Object> structDefaults =
HiveSchemaUtil.getStructInitialDefaults(column.type().asStructType());
if (!structDefaults.isEmpty()) {
columnDefaults.put(column.name(), structDefaults);
}
Comment on lines 188 to +196

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
}
return columnDefaults;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.hive.HiveSchemaUtil;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
Expand All @@ -63,6 +64,7 @@
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
Expand Down Expand Up @@ -173,7 +175,29 @@ private CloseableIterable openGeneric(FileScanTask task, Schema readSchema) {
default -> throw new UnsupportedOperationException(
String.format("Cannot read %s file: %s", file.format().name(), file.location()));
};
return applyResidualFiltering(iterable, residual, readSchema);
return applyResidualFiltering(withStructInitialDefaultsBackfill(iterable, readSchema), residual, readSchema);
}

private CloseableIterable<T> withStructInitialDefaultsBackfill(CloseableIterable<T> iterable, Schema readSchema) {
Map<String, Record> initialDefaultStructsByColumn = Maps.newHashMap();
for (Types.NestedField column : readSchema.columns()) {
if (column.type().isStructType()) {
Record initialDefaultStruct = HiveSchemaUtil
.buildStructFromDefaults(column.type().asStructType(), Types.NestedField::initialDefault);
if (initialDefaultStruct != null) {
initialDefaultStructsByColumn.put(column.name(), initialDefaultStruct);
}
}
}
if (initialDefaultStructsByColumn.isEmpty()) {
return iterable;
}
return CloseableIterable.transform(iterable, row -> {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The initialDefault value will be same for all the records, missing the field. buildStructWithInitialDefaults recursively parses the schema and calls convertToWriteType() to build the default record from scratch for every single missing struct on every single row. For a table with millions of rows, this continuous object creation and recursive schema parsing will severely degrade read performance.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my bad, optimized it to build default struct only once and then set the field of the row record for that particular field

if (row instanceof Record curIceRecord) {
HiveSchemaUtil.backfillStructInitialDefaults(curIceRecord, initialDefaultStructsByColumn);
}
return row;
});
}

private CloseableIterable<T> newAvroIterable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,14 @@ ALTER TABLE ice_parq ADD COLUMNS (point STRUCT<x:INT, y:INT> DEFAULT '{"x":100,"
created_date DATE DEFAULT '2024-01-01',
created_ts TIMESTAMP DEFAULT '2024-01-01T10:00:00',
score DECIMAL(5,2) DEFAULT 100.00,
category STRING DEFAULT 'general');
category STRING DEFAULT 'general',
person STRUCT<
name: STRING,
address: STRUCT<
street: STRING,
city: STRING
>
> DEFAULT '{"name":"John","address":{"street":"Main St","city":"New York"}}');

INSERT INTO ice_parq (id) VALUES (2);

Expand Down Expand Up @@ -45,7 +52,14 @@ ALTER TABLE ice_avro ADD COLUMNS (point STRUCT<x:INT, y:INT> DEFAULT '{"x":100,"
created_date DATE DEFAULT '2024-01-01',
created_ts TIMESTAMP DEFAULT '2024-01-01T10:00:00',
score DECIMAL(5,2) DEFAULT 100.00,
category STRING DEFAULT 'general');
category STRING DEFAULT 'general',
person STRUCT<
name: STRING,
address: STRUCT<
street: STRING,
city: STRING
>
> DEFAULT '{"name":"John","address":{"street":"Main St","city":"New York"}}');

INSERT INTO ice_avro (id) VALUES (2);

Expand Down Expand Up @@ -77,7 +91,14 @@ ALTER TABLE ice_orc ADD COLUMNS (point STRUCT<x:INT, y:INT> DEFAULT '{"x":100,"y
created_date DATE DEFAULT '2024-01-01',
created_ts TIMESTAMP DEFAULT '2024-01-01T10:00:00',
score DECIMAL(5,2) DEFAULT 100.00,
category STRING DEFAULT 'general');
category STRING DEFAULT 'general',
person STRUCT<
name: STRING,
address: STRUCT<
street: STRING,
city: STRING
>
> DEFAULT '{"name":"John","address":{"street":"Main St","city":"New York"}}');

INSERT INTO ice_orc (id) VALUES (2);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ POSTHOOK: query: SELECT * FROM ice_t ORDER BY id
POSTHOOK: type: QUERY
POSTHOOK: Input: default@ice_t
POSTHOOK: Output: hdfs://### HDFS PATH ###
1 NULL unknown 25 50000.0 true 2024-01-01 2024-01-01 10:00:00 100.00 general
1 {"x":100,"y":99} unknown 25 50000.0 true 2024-01-01 2024-01-01 10:00:00 100.00 general
2 {"x":100,"y":99} unknown 25 50000.0 true 2024-01-01 2024-01-01 10:00:00 100.00 general
PREHOOK: query: ALTER TABLE ice_t REPLACE COLUMNS (id INT,
point STRUCT<x:INT, y:INT> DEFAULT '{"x":100,"y":99}',
Expand Down Expand Up @@ -94,7 +94,7 @@ POSTHOOK: query: SELECT * FROM ice_t ORDER BY id
POSTHOOK: type: QUERY
POSTHOOK: Input: default@ice_t
POSTHOOK: Output: hdfs://### HDFS PATH ###
1 NULL unknown 25 50000.0 true 2024-01-01 2024-01-01 10:00:00 general
1 {"x":100,"y":99} unknown 25 50000.0 true 2024-01-01 2024-01-01 10:00:00 general
2 {"x":100,"y":99} unknown 25 50000.0 true 2024-01-01 2024-01-01 10:00:00 general
PREHOOK: query: ALTER TABLE ice_t CHANGE COLUMN point point STRUCT<x:INT, y:INT> DEFAULT '{"x":100,"y":88}'
PREHOOK: type: ALTERTABLE_RENAMECOL
Expand Down Expand Up @@ -128,7 +128,7 @@ POSTHOOK: query: SELECT * FROM ice_t ORDER BY id
POSTHOOK: type: QUERY
POSTHOOK: Input: default@ice_t
POSTHOOK: Output: hdfs://### HDFS PATH ###
1 NULL unknown 25 50000.0 true 2024-01-01 2024-01-01 10:00:00 general
1 {"x":100,"y":99} unknown 25 50000.0 true 2024-01-01 2024-01-01 10:00:00 general
2 {"x":100,"y":99} unknown 25 50000.0 true 2024-01-01 2024-01-01 10:00:00 general
3 {"x":100,"y":88} unknown 21 50000.0 true 2024-01-01 2024-01-01 10:00:00 general
PREHOOK: query: ALTER TABLE ice_t CHANGE COLUMN point point_new STRUCT<x:INT, y:INT> DEFAULT '{"x":55,"y":88}'
Expand All @@ -155,7 +155,7 @@ POSTHOOK: query: SELECT * FROM ice_t ORDER BY id
POSTHOOK: type: QUERY
POSTHOOK: Input: default@ice_t
POSTHOOK: Output: hdfs://### HDFS PATH ###
1 NULL unknown 25 50000.0 true 2024-01-01 2024-01-01 10:00:00 general
1 {"x":100,"y":99} unknown 25 50000.0 true 2024-01-01 2024-01-01 10:00:00 general
2 {"x":100,"y":99} unknown 25 50000.0 true 2024-01-01 2024-01-01 10:00:00 general
3 {"x":100,"y":88} unknown 21 50000.0 true 2024-01-01 2024-01-01 10:00:00 general
4 {"x":55,"y":88} unknown 21 50000.0 true 2024-01-01 2024-01-01 10:00:00 general
Loading
Loading