Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions datafusion/functions/src/string/concat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ impl ScalarUDFImpl for ConcatFunc {
columns
.iter()
.for_each(|column| builder.write::<true>(column, i));
builder.append_offset();
builder.append_offset()?;
}

let string_array = builder.finish(None)?;
Expand All @@ -271,7 +271,7 @@ impl ScalarUDFImpl for ConcatFunc {
columns
.iter()
.for_each(|column| builder.write::<true>(column, i));
builder.append_offset();
builder.append_offset()?;
}

let string_array = builder.finish(None)?;
Expand Down
8 changes: 4 additions & 4 deletions datafusion/functions/src/string/concat_ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ impl ScalarUDFImpl for ConcatWsFunc {
let mut builder = LargeStringArrayBuilder::with_capacity(len, data_size);
for i in 0..len {
if !sep.is_valid(i) {
builder.append_offset();
builder.append_offset()?;
continue;
}
let mut first = true;
Expand All @@ -348,15 +348,15 @@ impl ScalarUDFImpl for ConcatWsFunc {
first = false;
}
}
builder.append_offset();
builder.append_offset()?;
}
Ok(ColumnarValue::Array(Arc::new(builder.finish(sep.nulls())?)))
}
_ => {
let mut builder = StringArrayBuilder::with_capacity(len, data_size);
for i in 0..len {
if !sep.is_valid(i) {
builder.append_offset();
builder.append_offset()?;
continue;
}
let mut first = true;
Expand All @@ -369,7 +369,7 @@ impl ScalarUDFImpl for ConcatWsFunc {
first = false;
}
}
builder.append_offset();
builder.append_offset()?;
}
Ok(ColumnarValue::Array(Arc::new(builder.finish(sep.nulls())?)))
}
Expand Down
96 changes: 66 additions & 30 deletions datafusion/functions/src/strings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ use datafusion_common::{Result, exec_datafusion_err, internal_err};

use arrow::array::{
Array, ArrayAccessor, ArrayDataBuilder, BinaryArray, ByteView, LargeStringArray,
StringArray, StringViewArray, StringViewBuilder, make_view,
StringArray, StringViewArray, make_view,
};
use arrow::buffer::{MutableBuffer, NullBuffer};
use arrow::buffer::{Buffer, MutableBuffer, NullBuffer, ScalarBuffer};
use arrow::datatypes::DataType;

/// Optimized version of the StringBuilder in Arrow that:
Expand Down Expand Up @@ -106,13 +106,14 @@ impl StringArrayBuilder {
}
}

pub fn append_offset(&mut self) {
pub fn append_offset(&mut self) -> Result<()> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is an API change we should call out (I'll add a api-change label)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason why StringViewArrayBuilder is pub, rather than pub(crate)? It is a fairly special-purpose API.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TLDR is I don't know -- it got added for some very special purpose reason I can't remember (I think ideally we would use the upstream one in arrow-rs, but for some reason we needed a special copy. I can't remember why)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wrote out a quick plan for what I'd like to do here, see #21684 ; comments very welcome

let next_offset: i32 = self
.value_buffer
.len()
.try_into()
.expect("byte array offset overflow");
.map_err(|_| exec_datafusion_err!("byte array offset overflow"))?;
self.offsets_buffer.push(next_offset);
Ok(())
}

/// Finalize the builder into a concrete [`StringArray`].
Expand Down Expand Up @@ -150,18 +151,25 @@ impl StringArrayBuilder {
}
}

/// Optimized version of Arrow's [`StringViewBuilder`]. Rather than adding NULLs
/// on a row-by-row basis, the caller should provide nulls when calling
/// [`finish`](Self::finish). This allows callers to compute nulls more
/// efficiently (e.g., via bulk bitmap operations).
///
/// [`StringViewBuilder`]: arrow::array::StringViewBuilder
pub struct StringViewArrayBuilder {
builder: StringViewBuilder,
views: Vec<u128>,
data: Vec<u8>,
block: Vec<u8>,
/// If true, a safety check is required during the `append_offset` call
tainted: bool,
}

impl StringViewArrayBuilder {
pub fn with_capacity(item_capacity: usize, _data_capacity: usize) -> Self {
let builder = StringViewBuilder::with_capacity(item_capacity);
pub fn with_capacity(item_capacity: usize, data_capacity: usize) -> Self {
Self {
builder,
views: Vec::with_capacity(item_capacity),
data: Vec::with_capacity(data_capacity),
block: vec![],
tainted: false,
}
Expand Down Expand Up @@ -214,16 +222,29 @@ impl StringViewArrayBuilder {
}
}

/// Finalizes the current row by converting the accumulated data into a
/// StringView and appending it to the views buffer.
pub fn append_offset(&mut self) -> Result<()> {
let block_str = if self.tainted {
if self.tainted {
std::str::from_utf8(&self.block)
.map_err(|_| exec_datafusion_err!("invalid UTF-8 in binary literal"))?
.map_err(|_| exec_datafusion_err!("invalid UTF-8 in binary literal"))?;
}

let v = &self.block;
if v.len() > 12 {
let offset: u32 = self
.data
.len()
.try_into()
.map_err(|_| exec_datafusion_err!("byte array offset overflow"))?;
self.data.extend_from_slice(v);
self.views.push(make_view(v, 0, offset));
} else {
// SAFETY: all data that was appended was valid UTF8
unsafe { std::str::from_utf8_unchecked(&self.block) }
};
self.builder.append_value(block_str);
self.views.push(make_view(v, 0, 0));
}

self.block.clear();
self.tainted = false;
Ok(())
}

Expand All @@ -233,21 +254,35 @@ impl StringViewArrayBuilder {
///
/// Returns an error when:
///
/// - the provided `null_buffer` does not match amount of `append_offset` calls.
pub fn finish(mut self, null_buffer: Option<NullBuffer>) -> Result<StringViewArray> {
let array = self.builder.finish();
match null_buffer {
Some(nulls) if nulls.len() != array.len() => {
internal_err!("Null buffer and views buffer must be the same length")
}
Some(nulls) => {
let array_builder = array.into_data().into_builder().nulls(Some(nulls));
// SAFETY: the underlying data is valid; we are only adding a null buffer
let array_data = unsafe { array_builder.build_unchecked() };
Ok(StringViewArray::from(array_data))
}
None => Ok(array),
/// - the provided `null_buffer` length does not match the row count.
pub fn finish(self, null_buffer: Option<NullBuffer>) -> Result<StringViewArray> {
if let Some(ref nulls) = null_buffer
&& nulls.len() != self.views.len()
{
return internal_err!(
"Null buffer length ({}) must match row count ({})",
nulls.len(),
self.views.len()
);
}

let buffers: Vec<Buffer> = if self.data.is_empty() {
vec![]
} else {
vec![Buffer::from(self.data)]
};

// SAFETY: views were constructed with correct lengths, offsets, and
// prefixes. UTF-8 validity was checked in append_offset() for any row
// where tainted data (e.g., binary literals) was appended.
let array = unsafe {
StringViewArray::new_unchecked(
ScalarBuffer::from(self.views),
buffers,
null_buffer,
)
};
Ok(array)
}
}

Expand Down Expand Up @@ -328,13 +363,14 @@ impl LargeStringArrayBuilder {
}
}

pub fn append_offset(&mut self) {
pub fn append_offset(&mut self) -> Result<()> {
let next_offset: i64 = self
.value_buffer
.len()
.try_into()
.expect("byte array offset overflow");
.map_err(|_| exec_datafusion_err!("byte array offset overflow"))?;
self.offsets_buffer.push(next_offset);
Ok(())
}

/// Finalize the builder into a concrete [`LargeStringArray`].
Expand Down
Loading