Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/dependabot.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ updates:
groups:
github-actions:
patterns: ["*"]
cooldown:
default-days: 7
- package-ecosystem: "uv"
directory: "/"
schedule:
interval: "weekly"
groups:
python-uv-lock:
patterns: ["*"]
cooldown:
default-days: 7
234 changes: 234 additions & 0 deletions bench_fixes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
#!/usr/bin/env python3
"""
Compare the two PR fixes independently and combined:
A) Baseline — list() + StringIO (V2, no declare_fields)
B) list-fix — no list() via declare_fields, still StringIO (V2)
C) spool-fix — list() still present, but SpooledTemporaryFile (V3)
D) both — declare_fields + V3

Measures wall-clock, tracemalloc peak heap, and RSS delta.
Uses GeneratingCommand path so list() materialisation in _execute_chunk_v2
is actually exercised.
"""
import gc
import resource
import sys
import time
import tracemalloc
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent))

from splunklib.searchcommands.internals import (
DiskBufferSettings,
RecordWriterV2,
RecordWriterV3,
)

# Binary size units, expressed in bytes.
GB = 1024 ** 3
MB = 1024 * 1024

# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
RECORD_BYTES = 1_000  # length of the "payload" string carried by every record
GB_TARGET = 2.0  # total payload volume pushed through each variant, in units of GB
CHUNK_ROWS = 50_000  # rows per chunk; also passed as the writers' second constructor argument
SPOOL_SIZE = 4 * MB  # spool_size given to DiskBufferSettings for the V3 writer

# Derived values: how many records reach GB_TARGET, and the payload string
# that every generated record shares.
N_RECORDS = int(GB_TARGET * GB / RECORD_BYTES)
PAYLOAD = "x" * RECORD_BYTES


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
class NullFile:
    """Write-only sink that throws away everything handed to it."""

    def flush(self):
        """Do nothing; no data is ever held back."""
        return None

    def write(self, d):
        """Discard ``d`` and report its full length as written."""
        written = len(d)
        return written


def rss_bytes() -> int:
    """Return ``ru_maxrss`` for the current process, converted to bytes.

    The raw value from ``getrusage(RUSAGE_SELF)`` is returned unchanged on
    macOS (``sys.platform == "darwin"``) and multiplied by 1024 on every other
    platform, where it is treated as a kilobyte count.
    """
    peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    if sys.platform == "darwin":
        return peak
    return peak * 1024


def record_gen(n: int):
    """Lazily yield ``n`` synthetic records.

    Every record is a new dict holding the stringified sequence number under
    ``"index"`` and the module-level ``PAYLOAD`` string under ``"payload"``.
    """
    for seq in map(str, range(n)):
        yield {"index": seq, "payload": PAYLOAD}


# ---------------------------------------------------------------------------
# Simulate GeneratingCommand._execute_chunk_v2 for each case
# ---------------------------------------------------------------------------
def run_baseline(n: int) -> tuple[float, int, int]:
    """A: list() accumulation + StringIO (original behaviour).

    Each chunk of up to ``CHUNK_ROWS`` rows is first collected in a list and
    only then written record by record through ``RecordWriterV2``.

    Returns ``(wall_seconds, tracemalloc_peak_bytes, rss_growth_bytes)``, where
    the last value is the rise in ``rss_bytes()`` over the run, floored at 0.
    """
    source = record_gen(n)
    writer = RecordWriterV2(NullFile(), CHUNK_ROWS)

    gc.collect()
    rss_start = rss_bytes()
    tracemalloc.start()
    started = time.perf_counter()

    done = False
    while not done:
        # Materialise the whole chunk before any row reaches the writer.
        batch = []
        for row in source:
            batch.append(row)
            if len(batch) == CHUNK_ROWS:
                break
        for row in batch:
            writer.write_record(row)
        # A short batch means the generator ran dry, so this chunk is the last.
        done = len(batch) < CHUNK_ROWS
        writer.write_chunk(finished=done)

    elapsed = time.perf_counter() - started
    _current, peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    return elapsed, peak, max(0, rss_bytes() - rss_start)


def run_list_fix(n: int) -> tuple[float, int, int]:
    """B: declare_fields removes list(), still StringIO.

    The writer's ``custom_fields`` are filled in and ``fields_declared`` is set
    up front, then rows go straight from the generator into ``RecordWriterV2``
    with no intermediate list.

    Returns ``(wall_seconds, tracemalloc_peak_bytes, rss_growth_bytes)``, where
    the last value is the rise in ``rss_bytes()`` over the run, floored at 0.
    """
    source = record_gen(n)
    writer = RecordWriterV2(NullFile(), CHUNK_ROWS)
    writer.custom_fields.update(["index", "payload"])
    writer.fields_declared = True

    gc.collect()
    rss_start = rss_bytes()
    tracemalloc.start()
    started = time.perf_counter()

    done = False
    while not done:
        written = 0
        # fields_declared path: stream directly
        for written, row in enumerate(source, 1):
            writer.write_record(row)
            if written == CHUNK_ROWS:
                break
        # Fewer rows than a full chunk means the generator is exhausted.
        done = written < CHUNK_ROWS
        writer.write_chunk(finished=done)

    elapsed = time.perf_counter() - started
    _current, peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    return elapsed, peak, max(0, rss_bytes() - rss_start)


def run_spool_fix(n: int) -> tuple[float, int, int]:
    """C: list() still used, but SpooledTemporaryFile (V3).

    Chunks are collected in a list exactly as in the baseline, but written
    through ``RecordWriterV3`` configured with
    ``DiskBufferSettings(spool_size=SPOOL_SIZE)``.

    Returns ``(wall_seconds, tracemalloc_peak_bytes, rss_growth_bytes)``, where
    the last value is the rise in ``rss_bytes()`` over the run, floored at 0.
    """
    source = record_gen(n)
    buffering = DiskBufferSettings(spool_size=SPOOL_SIZE)
    writer = RecordWriterV3(NullFile(), CHUNK_ROWS, disk_buffer=buffering)

    gc.collect()
    rss_start = rss_bytes()
    tracemalloc.start()
    started = time.perf_counter()

    done = False
    while not done:
        # Materialise the whole chunk before any row reaches the writer.
        batch = []
        for row in source:
            batch.append(row)
            if len(batch) == CHUNK_ROWS:
                break
        for row in batch:
            writer.write_record(row)
        # A short batch means the generator ran dry, so this chunk is the last.
        done = len(batch) < CHUNK_ROWS
        writer.write_chunk(finished=done)

    elapsed = time.perf_counter() - started
    _current, peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    return elapsed, peak, max(0, rss_bytes() - rss_start)


def run_both(n: int) -> tuple[float, int, int]:
    """D: declare_fields + V3.

    Combines both changes: fields are declared on the writer up front so rows
    stream without an intermediate list, and the writer is ``RecordWriterV3``
    configured with ``DiskBufferSettings(spool_size=SPOOL_SIZE)``.

    Returns ``(wall_seconds, tracemalloc_peak_bytes, rss_growth_bytes)``, where
    the last value is the rise in ``rss_bytes()`` over the run, floored at 0.
    """
    source = record_gen(n)
    buffering = DiskBufferSettings(spool_size=SPOOL_SIZE)
    writer = RecordWriterV3(NullFile(), CHUNK_ROWS, disk_buffer=buffering)
    writer.custom_fields.update(["index", "payload"])
    writer.fields_declared = True

    gc.collect()
    rss_start = rss_bytes()
    tracemalloc.start()
    started = time.perf_counter()

    done = False
    while not done:
        written = 0
        # Rows go from the generator to the writer one at a time.
        for written, row in enumerate(source, 1):
            writer.write_record(row)
            if written == CHUNK_ROWS:
                break
        # Fewer rows than a full chunk means the generator is exhausted.
        done = written < CHUNK_ROWS
        writer.write_chunk(finished=done)

    elapsed = time.perf_counter() - started
    _current, peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    return elapsed, peak, max(0, rss_bytes() - rss_start)


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
    """Run the four variants in order and print a comparison report.

    Prints one row per variant (wall time, tracemalloc peak, RSS delta), then
    each non-baseline variant's difference from variant A, then the share of
    baseline heap saved by the list-fix and the spool-fix individually.

    NOTE(review): every variant runs in this one process and ``rss_bytes()``
    reads ``ru_maxrss``. If that counter is a process-lifetime maximum, later
    variants only report RSS growth beyond the earlier peak - confirm before
    relying on the RSS columns.
    """
    variants = (
        ("A baseline (list + StringIO)", run_baseline),
        ("B list-fix (no list, StringIO)", run_list_fix),
        ("C spool-fix (list + SpoolFile)", run_spool_fix),
        ("D both (no list + SpoolFile)", run_both),
    )

    print(f"\nFix comparison: {GB_TARGET:.1f} GB payload "
          f"({N_RECORDS:,} records × {RECORD_BYTES} B) "
          f"chunk_rows={CHUNK_ROWS:,} spool={SPOOL_SIZE // MB} MB\n")

    header = f"{'Variant':<35} {'Wall (s)':>8} {'Heap peak':>11} {'RSS delta':>11}"
    print(header)
    print("-" * len(header))

    # Measurements are kept in execution order: index 0 is the baseline.
    measured = []
    for name, runner in variants:
        seconds, heap_peak, rss_delta = runner(N_RECORDS)
        measured.append((name, (seconds, heap_peak, rss_delta)))
        print(f"{name:<35} {seconds:>8.2f} {heap_peak / MB:>9.1f} MB {rss_delta / MB:>9.1f} MB")
        gc.collect()

    (_, (base_secs, base_heap, base_rss)), *others = measured
    print()
    print("Savings vs baseline:")
    for name, (seconds, heap_peak, rss_delta) in others:
        d_secs = seconds - base_secs
        d_heap = heap_peak - base_heap
        d_rss = rss_delta - base_rss
        heap_pct = d_heap / base_heap * 100
        print(f" {name:<35} wall {d_secs:>+7.2f}s heap {d_heap / MB:>+7.1f} MB ({heap_pct:>+5.1f}%) "
              f"rss {d_rss / MB:>+7.1f} MB")

    print()
    # Which fix dominates heap savings?
    list_heap = measured[1][1][1]  # list-fix
    spool_heap = measured[2][1][1]  # spool-fix
    list_saving = (base_heap - list_heap) / base_heap * 100
    spool_saving = (base_heap - spool_heap) / base_heap * 100
    print(f"Heap: list-fix alone saves {list_saving:.1f}%, spool-fix alone saves {spool_saving:.1f}%")


if __name__ == "__main__":
    main()
148 changes: 148 additions & 0 deletions bench_writers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
#!/usr/bin/env python3
"""
Standalone benchmark: RecordWriterV2 (StringIO) vs RecordWriterV3 (SpooledFile).

Streams N GB of synthetic records through each writer into /dev/null.
Measures wall-clock time, peak tracemalloc heap, and peak RSS.

Usage:
python bench_writers.py [--gb 10] [--record-bytes 1000] [--chunk-rows 50000]
"""
import argparse
import gc
import os
import resource
import shutil
import sys
import time
import tracemalloc
from io import BytesIO, TextIOWrapper
from pathlib import Path

# Make sure we can import splunklib from the worktree
sys.path.insert(0, str(Path(__file__).parent))

from splunklib.searchcommands.internals import (
DiskBufferSettings,
RecordWriterV2,
RecordWriterV3,
)


# ---------------------------------------------------------------------------
# /dev/null sink (binary)
# ---------------------------------------------------------------------------
class NullFile:
    """Binary sink: takes bytes and throws them away."""

    def flush(self):
        """No-op; nothing is ever buffered."""
        return None

    def write(self, data: bytes) -> int:
        """Discard ``data`` and report every byte as written."""
        n_written = len(data)
        return n_written


# ---------------------------------------------------------------------------
# Synthetic record generator (never materialises all records)
# ---------------------------------------------------------------------------
def record_stream(n_records: int, record_bytes: int):
    """Lazily produce ``n_records`` dicts with a ``record_bytes``-long payload.

    Each dict has the stringified sequence number under ``"index"`` and the
    same ``"x" * record_bytes`` string under ``"payload"``.
    """
    filler = "x" * record_bytes
    for seq in map(str, range(n_records)):
        yield {"index": seq, "payload": filler}


# ---------------------------------------------------------------------------
# Benchmark runner
# ---------------------------------------------------------------------------
def rss_bytes() -> int:
    """Return ``ru_maxrss`` for the current process, converted to bytes.

    The raw value from ``getrusage(RUSAGE_SELF)`` is returned unchanged on
    macOS (``sys.platform == "darwin"``) and multiplied by 1024 on every other
    platform, where it is treated as a kilobyte count.
    """
    usage = resource.getrusage(resource.RUSAGE_SELF)
    scale = 1 if sys.platform == "darwin" else 1024
    return usage.ru_maxrss * scale


def run_benchmark(writer, records_iter, chunk_rows: int) -> tuple[float, int, int]:
    """
    Pump records through `writer` in chunks of `chunk_rows`.

    Every record from `records_iter` goes to `writer.write_record`; after each
    `chunk_rows` records `writer.write_chunk(finished=False)` is called, and a
    final `writer.write_chunk(finished=True)` is always issued once the
    iterator is exhausted.

    Returns (wall_seconds, peak_tracemalloc_bytes, peak_rss_delta_bytes), where
    the last value is the rise in `rss_bytes()` across the run, floored at 0.

    Any exception raised by the writer or the iterator propagates to the
    caller; tracemalloc tracing is stopped either way.
    """
    gc.collect()
    rss_before = rss_bytes()

    tracemalloc.start()
    try:
        t0 = time.perf_counter()

        count = 0
        for record in records_iter:
            writer.write_record(record)
            count += 1
            if count == chunk_rows:
                writer.write_chunk(finished=False)
                count = 0

        writer.write_chunk(finished=True)

        wall = time.perf_counter() - t0
        # get_traced_memory() yields (current, peak); only the peak is reported.
        _, peak_heap = tracemalloc.get_traced_memory()
    finally:
        # Tracing is process-wide state: never leave it on after a failure.
        tracemalloc.stop()

    rss_after = rss_bytes()
    peak_rss_delta = max(0, rss_after - rss_before)

    return wall, peak_heap, peak_rss_delta


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
    """Parse CLI options, benchmark both writers, and print a comparison table.

    Options: ``--gb`` (total payload), ``--record-bytes`` (payload bytes per
    record), ``--chunk-rows`` (rows per chunk) and ``--spool-size`` (V3
    ``spool_size`` in bytes). Invalid sizes are rejected through
    ``parser.error`` (usage message, exit status 2) instead of crashing or
    silently distorting the run.

    NOTE(review): both writers run in this one process and ``rss_bytes()``
    reads ``ru_maxrss``. If that counter is a process-lifetime maximum, the V3
    row only reports RSS growth beyond V2's peak - confirm before relying on
    the RSS column.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--gb", type=float, default=10.0, help="Total payload GB")
    parser.add_argument("--record-bytes", type=int, default=1_000, help="Bytes per record")
    parser.add_argument("--chunk-rows", type=int, default=50_000, help="Rows per chunk (maxresultrows)")
    parser.add_argument("--spool-size", type=int, default=4 * 1024 * 1024, help="V3 spool_size bytes")
    args = parser.parse_args()

    # --record-bytes is a divisor below; zero would raise ZeroDivisionError.
    if args.record_bytes <= 0:
        parser.error("--record-bytes must be a positive integer")
    # A non-positive chunk size never matches the running row count, so the
    # whole payload would be buffered as a single chunk.
    if args.chunk_rows <= 0:
        parser.error("--chunk-rows must be a positive integer")
    # A negative total gives a negative record count and throughput.
    if args.gb < 0:
        parser.error("--gb must not be negative")

    total_bytes = int(args.gb * 1024 ** 3)
    n_records = total_bytes // args.record_bytes
    actual_gb = (n_records * args.record_bytes) / 1024 ** 3

    mb = 1024 * 1024
    gb = 1024 ** 3

    print(f"\nBenchmark: {actual_gb:.2f} GB payload "
          f"({n_records:,} records × {args.record_bytes} B) "
          f"chunk_rows={args.chunk_rows:,} spool_size={args.spool_size // mb} MB\n")
    print(f"{'Writer':<30} {'Wall (s)':>10} {'Heap peak':>12} {'RSS delta':>12} {'Throughput':>14}")
    print("-" * 84)

    results = {}

    # Writers are built lazily so each one exists only for its own run.
    for label, make_writer in [
        ("RecordWriterV2 (StringIO)", lambda: RecordWriterV2(NullFile(), args.chunk_rows)),
        (f"RecordWriterV3 (spool={args.spool_size // mb}MB)", lambda: RecordWriterV3(
            NullFile(), args.chunk_rows, disk_buffer=DiskBufferSettings(spool_size=args.spool_size)
        )),
    ]:
        writer = make_writer()
        gen = record_stream(n_records, args.record_bytes)
        wall, heap, rss = run_benchmark(writer, gen, args.chunk_rows)
        throughput = (n_records * args.record_bytes) / wall / gb
        results[label] = (wall, heap, rss, throughput)
        print(f"{label:<30} {wall:>10.2f} {heap / mb:>10.1f} MB {rss / mb:>10.1f} MB {throughput:>12.2f} GB/s")
        gc.collect()

    # Delta row
    labels = list(results)
    w2, h2, r2, tp2 = results[labels[0]]
    w3, h3, r3, tp3 = results[labels[1]]
    print("-" * 84)
    print(f"{'Delta (V3 - V2)':<30} {w3 - w2:>+10.2f} {(h3 - h2) / mb:>+10.1f} MB "
          f"{(r3 - r2) / mb:>+10.1f} MB {tp3 - tp2:>+12.2f} GB/s")
    print()

    # Guard the percentage against a zero V2 heap peak.
    heap_reduction_pct = (h2 - h3) / h2 * 100 if h2 else 0
    print(f"V3 heap reduction: {heap_reduction_pct:.1f}% vs V2")
    print(f"Total CSV written: ~{n_records * args.record_bytes * 2 / gb:.1f} GB "  # ~2x due to __mv_ columns
          f"(raw payload × ~2 for __mv_ encoding)")


if __name__ == "__main__":
    main()
Loading
Loading