Skip to content

Commit 9427eef

Browse files
author
Oren (electricessence)
committed
Updated ObjectPool impementions.
1 parent f607de0 commit 9427eef

21 files changed

Lines changed: 1059 additions & 429 deletions
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
using Microsoft.VisualStudio.TestTools.UnitTesting;
2+
using System;
3+
using System.Collections.Concurrent;
4+
using System.Collections.Generic;
5+
using System.Linq;
6+
using System.Threading.Tasks;
7+
using System.Threading.Tasks.Dataflow;
8+
9+
namespace Open.Disposable
10+
{
11+
[TestClass]
12+
public class ObjectPoolBenchmarks
13+
{
14+
const int TEST_COUNT = 200;
15+
const int TEST_LOOPS = 1000;
16+
17+
static void Benchmark<T>(IObjectPool<T> pool)
18+
where T : class
19+
{
20+
var tank = new ConcurrentBag<T>(); // This will have an effect on performance measurement, but hopefully consistently.
21+
int remaining = 0;
22+
using (pool)
23+
{
24+
Parallel.For(0, TEST_COUNT, i => tank.Add(pool.Take()));
25+
26+
Parallel.ForEach(tank, e => pool.Give(e));
27+
28+
while (pool.TryTake() != null) { remaining++; }
29+
}
30+
//Console.WriteLine("Remaining: {0}", remaining);
31+
Assert.IsTrue(remaining!=0);
32+
}
33+
34+
static void Benchmark<T>(IEnumerable<IObjectPool<T>> pools)
35+
where T : class
36+
{
37+
foreach (var p in pools)
38+
Benchmark(p);
39+
}
40+
41+
OptimisticArrayObjectPool<object>[] OptimisticArrayObjectPools
42+
= Enumerable.Range(0, TEST_LOOPS)
43+
.Select(i => OptimisticArrayObjectPool.Create<object>(TEST_COUNT * 2))
44+
.ToArray();
45+
46+
[TestMethod]
47+
public void OptimisticArrayObjectPool_Benchmark()
48+
{
49+
Benchmark(OptimisticArrayObjectPools);
50+
}
51+
52+
ConcurrentBagObjectPool<object>[] ConcurrentBagObjectPools
53+
= Enumerable.Range(0, TEST_LOOPS)
54+
.Select(i => ConcurrentBagObjectPool.Create<object>(TEST_COUNT * 2))
55+
.ToArray();
56+
57+
[TestMethod]
58+
public void ConcurrentBagObjectPool_Benchmark()
59+
{
60+
Benchmark(ConcurrentBagObjectPools);
61+
}
62+
63+
BufferBlockObjectPool<object>[] BufferBlockObjectPools
64+
= Enumerable.Range(0, TEST_LOOPS)
65+
.Select(i => BufferBlockObjectPool.Create<object>(TEST_COUNT * 2))
66+
.ToArray();
67+
68+
[TestMethod]
69+
public void BufferBlockObjectPool_Benchmark()
70+
{
71+
Benchmark(BufferBlockObjectPools);
72+
}
73+
74+
75+
LinkedListObjectPool<object>[] LinkedListObjectPools
76+
= Enumerable.Range(0, TEST_LOOPS)
77+
.Select(i => LinkedListObjectPool.Create<object>(TEST_COUNT * 2))
78+
.ToArray();
79+
80+
[TestMethod]
81+
public void LinkedListObjectPool_Benchmark()
82+
{
83+
Benchmark(LinkedListObjectPools);
84+
}
85+
}
86+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
using Microsoft.VisualStudio.TestTools.UnitTesting;
2+
3+
namespace Open.Disposable
4+
{
5+
[TestClass]
6+
public class ObjectPoolSmokeTests
7+
{
8+
class IdContainer
9+
{
10+
public int ID;
11+
}
12+
13+
[TestMethod]
14+
public void BufferBlockObjectPool_FactoryTest()
15+
{
16+
int i = 0;
17+
var pool = BufferBlockObjectPool.Create(()=>new IdContainer { ID = ++i });
18+
Assert.AreEqual(1, pool.Take().ID);
19+
}
20+
21+
[TestMethod]
22+
public void ConcurrentBagObjectPool_FactoryTest()
23+
{
24+
int i = 0;
25+
var pool = ConcurrentBagObjectPool.Create(() => new IdContainer { ID = ++i });
26+
Assert.AreEqual(1, pool.Take().ID);
27+
}
28+
29+
[TestMethod]
30+
public void OptimisticArrayObjectPool_FactoryTest()
31+
{
32+
int i = 0;
33+
var pool = OptimisticArrayObjectPool.Create(() => new IdContainer { ID = ++i });
34+
Assert.AreEqual(1, pool.Take().ID);
35+
}
36+
37+
[TestMethod]
38+
public void LinkedListObjectPool_FactoryTest()
39+
{
40+
int i = 0;
41+
var pool = LinkedListObjectPool.Create(() => new IdContainer { ID = ++i });
42+
Assert.AreEqual(1, pool.Take().ID);
43+
}
44+
}
45+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFramework>netcoreapp1.1</TargetFramework>
5+
<RootNamespace>Open.Disposable</RootNamespace>
6+
</PropertyGroup>
7+
8+
<ItemGroup>
9+
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.0.0" />
10+
<PackageReference Include="MSTest.TestAdapter" Version="1.1.11" />
11+
<PackageReference Include="MSTest.TestFramework" Version="1.1.11" />
12+
</ItemGroup>
13+
14+
<ItemGroup>
15+
<ProjectReference Include="..\source\Open.Disposable.ObjectPools.csproj" />
16+
</ItemGroup>
17+
18+
<ItemGroup>
19+
<Service Include="{82a7f48d-3b50-4b1e-b82e-3ada8210c358}" />
20+
</ItemGroup>
21+
22+
</Project>

source/BufferBlockObjectPool.cs

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Diagnostics;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using System.Threading.Tasks.Dataflow;
7+
8+
namespace Open.Disposable
9+
{
10+
/// <summary>
11+
/// An ObjectPool that when .Take() is called will return the first possible item even if one is returned to the pool before the generator function completes.
12+
/// </summary>
13+
/// <typeparam name="T">The reference type contained.</typeparam>
14+
[DebuggerDisplay("Count = {Count}")]
15+
public class BufferBlockObjectPool<T> : TrimmableObjectPoolBase<T>
16+
where T : class
17+
{
18+
/// <summary>
19+
/// Constructs an ObjectPool that when .Take() is called will return the first possible item even if one is returned to the pool before the generator function completes.
20+
/// </summary>
21+
/// <param name="factory">The generator function that creates the items.</param>
22+
/// <param name="maxSize">The maximum size of the object pool. Default is ushort.MaxValue (65535).</param>
23+
public BufferBlockObjectPool(
24+
Func<T> factory,
25+
int capacity = DEFAULT_CAPACITY) : base(factory, capacity)
26+
{
27+
_pool = new BufferBlock<T>(new DataflowBlockOptions()
28+
{
29+
BoundedCapacity = capacity
30+
});
31+
}
32+
33+
protected BufferBlock<T> _pool;
34+
35+
Task<bool> Generate(out Task<T> actual)
36+
{
37+
actual = Task.Run(Factory);
38+
return actual.ContinueWith(t =>
39+
t.Status == TaskStatus.RanToCompletion && (_pool?.Post(t.Result) ?? false) // .Post is synchronous but should return immediately if bounded capacity is met.
40+
);
41+
}
42+
43+
public override int Count => _pool?.Count ?? 0;
44+
45+
protected override sealed T TryTakeInternal()
46+
{
47+
var p = _pool;
48+
if (p == null) return null;
49+
p.TryReceive(out T item);
50+
return item;
51+
}
52+
53+
protected override async Task<T> TakeAsyncInternal()
54+
{
55+
CancellationTokenSource ts = null; // If something goes wrong we need a way to cancel.
56+
var taken = _pool?.ReceiveAsync((ts = new CancellationTokenSource()).Token);
57+
if (taken != null)
58+
{
59+
while (taken.Status != TaskStatus.RanToCompletion)
60+
{
61+
// Ok we need to push into the pool...
62+
var generated = Generate(out Task<T> actual);
63+
64+
if (await Task.WhenAny(taken, generated) == generated && !generated.Result)
65+
{
66+
// ^^^ Not received yet and was not added to pool? Uh-oh...
67+
ts.Cancel(); // Since the generator failed or was unable to be added, then cancel waiting to recieve it.
68+
69+
// Was it actually cancelled?
70+
if (await taken.ContinueWith(t => t.IsCanceled)) // || t.IsFaulted ... .ReceiveAsync should never fault.
71+
{
72+
if (actual.IsFaulted)
73+
throw actual.Exception; // Possible generator failure.
74+
if (actual.Status == TaskStatus.RanToCompletion)
75+
return actual.Result; // Don't let it go to waste.
76+
77+
// This is a rare edge case where there's no fault but did not complete. Effectively erroneous.
78+
Debug.Fail("Somehow the generate task did not complete and had no fault.");
79+
return base.Take();
80+
}
81+
}
82+
}
83+
84+
OnTakenFrom();
85+
return taken.Result;
86+
}
87+
88+
return base.Take(); // Pool is closed/completed. Just do this here without queueing.
89+
}
90+
91+
public override sealed T Take()
92+
{
93+
// See if there's one available already.
94+
if (TryTake(out T firstTry))
95+
return firstTry;
96+
97+
if (_pool == null)
98+
return Factory();
99+
100+
return TakeAsync().Result;
101+
}
102+
103+
protected override void OnDispose(bool calledExplicitly)
104+
{
105+
var pool = Nullify(ref _pool);
106+
if (pool != null)
107+
{
108+
pool.Complete(); // No more... You're done...
109+
if(pool.TryReceiveAll(out IList<T> items)) // Empty out...
110+
items.Clear();
111+
}
112+
}
113+
114+
protected override bool GiveInternal(T item)
115+
{
116+
return (item == null ? null : _pool)
117+
?.Post(item)
118+
?? false;
119+
}
120+
121+
protected override Task<bool> GiveInternalAsync(T item)
122+
{
123+
return (item == null ? null : _pool)
124+
?.SendAsync(item)
125+
?? Task.FromResult(false);
126+
}
127+
128+
}
129+
130+
public static class BufferBlockObjectPool
131+
{
132+
public static BufferBlockObjectPool<T> Create<T>(Func<T> factory, int capacity = Constants.DEFAULT_CAPACITY)
133+
where T : class
134+
{
135+
return new BufferBlockObjectPool<T>(factory, capacity);
136+
}
137+
138+
public static BufferBlockObjectPool<T> Create<T>(int capacity = Constants.DEFAULT_CAPACITY)
139+
where T : class, new()
140+
{
141+
return Create(() => new T(), capacity);
142+
}
143+
}
144+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Text;
5+
using System.Threading.Tasks;
6+
7+
namespace Open.Disposable
8+
{
9+
public class CollectionWrapperObjectPool<T, TCollection> : TrimmableObjectPoolBase<T>
10+
where T : class
11+
where TCollection : class, ICollection<T>
12+
{
13+
public CollectionWrapperObjectPool(TCollection pool, Func<T> factory, int capacity = DEFAULT_CAPACITY) : base(factory, capacity)
14+
{
15+
Pool = pool;
16+
}
17+
18+
protected TCollection Pool;
19+
20+
public override int Count => Pool?.Count ?? 0;
21+
22+
protected override bool GiveInternal(T item)
23+
{
24+
if (item != null)
25+
{
26+
var p = Pool;
27+
if (p != null && p.Count < MaxSize)
28+
{
29+
lock (p) p.Add(item); // It's possible that the count could exceed MaxSize here, but the risk is negligble as a few over the limit won't hurt.
30+
return true;
31+
}
32+
}
33+
34+
return false;
35+
}
36+
37+
protected override T TryTakeInternal()
38+
{
39+
retry:
40+
41+
var p = Pool;
42+
var item = p?.FirstOrDefault();
43+
if (item != null)
44+
{
45+
/* Removing the first item is typically horribly inefficient but we can't make assumptions about the implementation here.
46+
* It's a trade off between potentially iterating the entire collection before removing the last item, or relying on the underlying implementation.
47+
* This implementation is in place for reference more than practice. Sub classes should override. */
48+
49+
bool wasRemoved = false;
50+
lock (p) wasRemoved = p.Remove(item);
51+
if (!wasRemoved) goto retry;
52+
}
53+
54+
return item;
55+
}
56+
57+
protected override void OnDispose(bool calledExplicitly)
58+
{
59+
Nullify(ref Pool)?.Clear();
60+
}
61+
}
62+
63+
public class CollectionWrapperObjectPool<T> : CollectionWrapperObjectPool<T, ICollection<T>>
64+
where T : class
65+
{
66+
public CollectionWrapperObjectPool(ICollection<T> pool, Func<T> factory, int capacity = DEFAULT_CAPACITY) : base(pool, factory, capacity)
67+
{
68+
}
69+
}
70+
}

0 commit comments

Comments
 (0)