Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .nix/php/lib/php.ini.dist
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,7 @@ file_uploads = On
max_file_uploads = 20
short_open_tag = off
opcache.enable=1
opcache.enable_cli=0
opcache.enable_cli=0
apc.enabled=1
apc.enable_cli=1
apc.shm_size=128M
1 change: 1 addition & 0 deletions .nix/pkgs/flow-php/package.nix
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ let
with all;
enabled
++ [
apcu
bcmath
dom
mbstring
Expand Down
2 changes: 2 additions & 0 deletions src/core/etl/src/Flow/ETL/Config.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Flow\ETL;

use Flow\ETL\Config\Aggregation\AggregationConfig;
use Flow\ETL\Config\Cache\CacheConfig;
use Flow\ETL\Config\ConfigBuilder;
use Flow\ETL\Config\Sort\SortConfig;
Expand Down Expand Up @@ -36,6 +37,7 @@ public function __construct(
public SortConfig $sort,
private ?Analyze $analyze,
public TelemetryConfig $telemetry,
public AggregationConfig $aggregation,
) {}

public static function builder(): ConfigBuilder
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Config\Aggregation;

use Flow\ETL\Dataset\Memory\Unit;
use Flow\ETL\GroupBy\AggregationStorageStrategy;
use Flow\ETL\GroupBy\Storage\AggregationStorage;

final readonly class AggregationConfig
{
public const string AGGREGATION_MAX_MEMORY_ENV = 'FLOW_AGGREGATION_MAX_MEMORY';

public function __construct(
public AggregationStorageStrategy $strategy,
public Unit $memoryLimit,
public ?AggregationStorage $storage = null,
public string $filesystemProtocol = 'file',
) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Config\Aggregation;

use Flow\ETL\Dataset\Memory\Unit;
use Flow\ETL\GroupBy\AggregationStorageStrategy;
use Flow\ETL\GroupBy\Storage\AggregationStorage;

use function getenv;
use function ini_get;
use function is_string;

use const PHP_INT_MAX;

final class AggregationConfigBuilder
{
public const int DEFAULT_AGGREGATION_MEMORY_PERCENTAGE = 70;

private string $filesystemProtocol = 'file';

private ?Unit $memoryLimit = null;

private ?AggregationStorage $storage = null;

private AggregationStorageStrategy $strategy = AggregationStorageStrategy::MEMORY;

public function build(): AggregationConfig
{
if ($this->memoryLimit === null) {
$aggregationMemory = getenv(AggregationConfig::AGGREGATION_MAX_MEMORY_ENV);

if (is_string($aggregationMemory)) {
$this->memoryLimit = Unit::fromString($aggregationMemory);
} else {
$memoryLimit = ini_get('memory_limit');

if ($memoryLimit === false || $memoryLimit === '-1') {
$this->memoryLimit = Unit::fromBytes(PHP_INT_MAX);
} else {
$this->memoryLimit = Unit::fromString(
$memoryLimit,
)->percentage(self::DEFAULT_AGGREGATION_MEMORY_PERCENTAGE);
}
}
}

return new AggregationConfig($this->strategy, $this->memoryLimit, $this->storage, $this->filesystemProtocol);
}

public function filesystemProtocol(string $protocol): self
{
$this->filesystemProtocol = $protocol;

return $this;
}

public function memoryLimit(Unit $memoryLimit): self
{
$this->memoryLimit = $memoryLimit;

return $this;
}

public function storage(AggregationStorage $storage): self
{
$this->storage = $storage;

return $this;
}

public function strategy(AggregationStorageStrategy $strategy): self
{
$this->strategy = $strategy;

return $this;
}
}
35 changes: 35 additions & 0 deletions src/core/etl/src/Flow/ETL/Config/ConfigBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@
use Flow\ETL\Analyze;
use Flow\ETL\Cache;
use Flow\ETL\Config;
use Flow\ETL\Config\Aggregation\AggregationConfigBuilder;
use Flow\ETL\Config\Cache\CacheConfigBuilder;
use Flow\ETL\Config\Sort\SortConfigBuilder;
use Flow\ETL\Config\Telemetry\TelemetryConfig;
use Flow\ETL\Config\Telemetry\TelemetryOptions;
use Flow\ETL\Dataset\Memory\Unit;
use Flow\ETL\Filesystem\FilesystemStreams;
use Flow\ETL\GroupBy\AggregationStorageStrategy;
use Flow\ETL\GroupBy\Storage\AggregationStorage;
use Flow\ETL\NativePHPRandomValueGenerator;
use Flow\ETL\Pipeline\Optimizer;
use Flow\ETL\Pipeline\Optimizer\BatchSizeOptimization;
Expand All @@ -34,6 +37,8 @@

final class ConfigBuilder
{
public readonly AggregationConfigBuilder $aggregation;

public readonly CacheConfigBuilder $cache;

public readonly SortConfigBuilder $sort;
Expand Down Expand Up @@ -69,6 +74,7 @@ public function __construct()
$this->putInputIntoRows = false;
$this->optimizer = null;
$this->clock = null;
$this->aggregation = new AggregationConfigBuilder();
$this->cache = new CacheConfigBuilder();
$this->sort = new SortConfigBuilder();
$this->randomValueGenerator = new NativePHPRandomValueGenerator();
Expand All @@ -79,6 +85,34 @@ public function __construct()
: PackageVersion::get('flow-php/etl');
}

public function aggregationFilesystem(string $protocol): self
{
$this->aggregation->filesystemProtocol($protocol);

return $this;
}

public function aggregationMemoryLimit(Unit $unit): self
{
$this->aggregation->memoryLimit($unit);

return $this;
}

public function aggregationStorage(AggregationStorageStrategy $strategy): self

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so storage or strategy? 😅

{
$this->aggregation->strategy($strategy);

return $this;
}

public function aggregationStore(AggregationStorage $storage): self

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those two methods, aggregationStorage and aggregationStore are a bit confusing

{
$this->aggregation->storage($storage);

return $this;
}

public function analyze(Analyze $analyze): self
{
$this->analyze = $analyze;
Expand Down Expand Up @@ -111,6 +145,7 @@ public function build(EntryFactory $entryFactory = new EntryFactory()): Config
$this->sort->build(),
$this->analyze,
$this->telemetryConfig ?? TelemetryConfig::default($this->getClock()),
$this->aggregation->build(),
);
}

Expand Down
12 changes: 11 additions & 1 deletion src/core/etl/src/Flow/ETL/Function/Average.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
use function is_int;
use function is_numeric;

final class Average implements AggregatingFunction, WindowFunction
final class Average implements AggregatingFunction, MergeableAggregatingFunction, WindowFunction
{
private int $count;

Expand Down Expand Up @@ -82,6 +82,16 @@ public function apply(Row $row, Rows $partition, FlowContext $context): mixed
return (new Calculator())->divide($sum, $count, $this->scale, $this->rounding);
}

public function merge(MergeableAggregatingFunction $other): void
{
if (!$other instanceof self) {
throw new InvalidArgumentException('Cannot merge ' . self::class . ' with ' . $other::class);
}

$this->sum += $other->sum;
$this->count += $other->count;
}

public function over(Window $window): WindowFunction
{
$this->window = $window;
Expand Down
12 changes: 11 additions & 1 deletion src/core/etl/src/Flow/ETL/Function/Collect.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@
use Flow\ETL\Row\EntryFactory;
use Flow\ETL\Row\Reference;

use function array_merge;
use function current;
use function Flow\ETL\DSL\to_entry;

final class Collect implements AggregatingFunction
final class Collect implements AggregatingFunction, MergeableAggregatingFunction
{
/**
* @var array<mixed>
Expand All @@ -41,6 +42,15 @@ public function aggregate(Row $row, FlowContext $context): void
}
}

public function merge(MergeableAggregatingFunction $other): void
{
if (!$other instanceof self) {
throw new InvalidArgumentException('Cannot merge ' . self::class . ' with ' . $other::class);
}

$this->collection = array_merge($this->collection, $other->collection);
}

/**
* @return Entry<mixed>
*/
Expand Down
16 changes: 15 additions & 1 deletion src/core/etl/src/Flow/ETL/Function/CollectUnique.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use function Flow\ETL\DSL\to_entry;
use function in_array;

final class CollectUnique implements AggregatingFunction
final class CollectUnique implements AggregatingFunction, MergeableAggregatingFunction
{
/**
* @var array<mixed>
Expand Down Expand Up @@ -49,6 +49,20 @@ public function aggregate(Row $row, FlowContext $context): void
}
}

public function merge(MergeableAggregatingFunction $other): void
{
if (!$other instanceof self) {
throw new InvalidArgumentException('Cannot merge ' . self::class . ' with ' . $other::class);
}

/** @var mixed $value */
foreach ($other->collection as $value) {
if (!in_array($value, $this->collection, true)) {
$this->collection[] = $value;
}
}
}

/**
* @return Entry<mixed>
*/
Expand Down
11 changes: 10 additions & 1 deletion src/core/etl/src/Flow/ETL/Function/Count.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

use function Flow\ETL\DSL\int_entry;

final class Count implements AggregatingFunction, WindowFunction
final class Count implements AggregatingFunction, MergeableAggregatingFunction, WindowFunction
{
private int $count;

Expand Down Expand Up @@ -80,6 +80,15 @@ public function apply(Row $row, Rows $partition, FlowContext $context): mixed
return $count;
}

public function merge(MergeableAggregatingFunction $other): void
{
if (!$other instanceof self) {
throw new InvalidArgumentException('Cannot merge ' . self::class . ' with ' . $other::class);
}

$this->count += $other->count;
}

public function over(Window $window): WindowFunction
{
$this->window = $window;
Expand Down
11 changes: 10 additions & 1 deletion src/core/etl/src/Flow/ETL/Function/First.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

use function Flow\ETL\DSL\string_entry;

final class First implements AggregatingFunction
final class First implements AggregatingFunction, MergeableAggregatingFunction
{
/**
* @var null|Entry<mixed>
Expand All @@ -37,6 +37,15 @@ public function aggregate(Row $row, FlowContext $context): void
}
}

public function merge(MergeableAggregatingFunction $other): void
{
if (!$other instanceof self) {
throw new InvalidArgumentException('Cannot merge ' . self::class . ' with ' . $other::class);
}

$this->first ??= $other->first;
}

/**
* @return Entry<mixed>
*/
Expand Down
13 changes: 12 additions & 1 deletion src/core/etl/src/Flow/ETL/Function/Last.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

use function Flow\ETL\DSL\string_entry;

final class Last implements AggregatingFunction
final class Last implements AggregatingFunction, MergeableAggregatingFunction
{
/**
* @var null|Entry<mixed>
Expand All @@ -35,6 +35,17 @@ public function aggregate(Row $row, FlowContext $context): void
}
}

public function merge(MergeableAggregatingFunction $other): void
{
if (!$other instanceof self) {
throw new InvalidArgumentException('Cannot merge ' . self::class . ' with ' . $other::class);
}

if ($other->last !== null) {
$this->last = $other->last;
}
}

/**
* @return Entry<mixed>
*/
Expand Down
Loading
Loading