-
Notifications
You must be signed in to change notification settings - Fork 12
Expand file tree
/
Copy pathDefaultDatabaseWriter.cs
More file actions
171 lines (150 loc) · 7.4 KB
/
DefaultDatabaseWriter.cs
File metadata and controls
171 lines (150 loc) · 7.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
using Kusto.Data;
using KustoSchemaTools.Changes;
using KustoSchemaTools.Model;
using KustoSchemaTools.Parser.KustoLoader;
using KustoSchemaTools.Plugins;
using Microsoft.Extensions.Logging;
using System.Text;
namespace KustoSchemaTools.Parser.KustoWriter
{
public class DefaultDatabaseWriter : IDBEntityWriter
{
/// <summary>
/// Diffs <paramref name="sourceDb"/> against <paramref name="targetDb"/>, applies the resulting
/// changes to the database named <c>targetDb.Name</c>, then does the same for every entry in
/// <c>sourceDb.Followers</c>. Each command result is printed to the console.
/// </summary>
/// <param name="sourceDb">Desired state; also supplies the admins, viewers and the followers to update.</param>
/// <param name="targetDb">State the changes are generated against; its <c>Name</c> selects the database to modify.</param>
/// <param name="client">Client used to load follower metadata for, and run commands against, the target database.</param>
/// <param name="logger">Logger passed on to change generation and script execution.</param>
/// <exception cref="Exception">Exactly one executed command reported the result "Failed".</exception>
/// <exception cref="AggregateException">More than one executed command reported the result "Failed".</exception>
public async Task WriteAsync(Database sourceDb, Database targetDb, KustoClient client, ILogger logger)
{
    const string separator = "---------------------------------------------------------------------------";

    var databaseName = targetDb.Name;
    var currentFollowerState = FollowerLoader.LoadFollower(databaseName, client);

    List<IChange> changes;
    if (!currentFollowerState.IsFollower)
    {
        // Regular database: diff the full schema.
        changes = DatabaseChanges.GenerateChanges(targetDb, sourceDb, databaseName, logger);
    }
    else
    {
        // The target is itself a follower: only admins and viewers are taken from the source
        // definition; modification kind, leader name and cache are carried over from the
        // metadata that was just loaded.
        var desiredPermissions = new FollowerPermissions
        {
            ModificationKind = currentFollowerState.Permissions.ModificationKind,
            Admins = sourceDb.Admins,
            Viewers = sourceDb.Viewers,
            LeaderName = currentFollowerState.Permissions.LeaderName
        };
        var desiredState = new FollowerDatabase
        {
            DatabaseName = databaseName,
            Permissions = desiredPermissions,
            Cache = currentFollowerState.Cache
        };
        changes = DatabaseChanges.GenerateFollowerChanges(currentFollowerState, desiredState, logger);
    }

    var results = await ApplyChangesToDatabase(databaseName, changes, client, logger);
    PrintResults(results);

    foreach (var follower in sourceDb.Followers)
    {
        // The dictionary key is what the follower's client is constructed from.
        var followerCluster = follower.Key;
        var followerTarget = follower.Value;

        var followerClient = new KustoClient(followerCluster);
        var followerCurrent = FollowerLoader.LoadFollower(followerTarget.DatabaseName, followerClient);
        var followerChanges = DatabaseChanges.GenerateFollowerChanges(followerCurrent, followerTarget, logger);
        var followerResults = await ApplyChangesToDatabase(followerTarget.DatabaseName, followerChanges, followerClient, logger);
        results.AddRange(followerResults);

        Console.WriteLine();
        Console.WriteLine($"Follower: {followerCluster}");
        Console.WriteLine(separator);
        Console.WriteLine();
        PrintResults(followerResults);
        Console.WriteLine();
        Console.WriteLine();
    }

    // Surface every failed command (primary database and followers alike) to the caller.
    var failures = results
        .Where(entry => entry.Result == "Failed")
        .Select(entry => new Exception($"Execution failed for command \n{entry.CommandText} \n with reason\n{entry.Reason}"))
        .ToList();

    switch (failures.Count)
    {
        case 0:
            return;
        case 1:
            throw failures[0];
        default:
            throw new AggregateException(failures);
    }

    // Writes one line per command result, each followed by a separator line.
    void PrintResults(IEnumerable<ScriptExecuteCommandResult> commandResults)
    {
        foreach (var entry in commandResults)
        {
            Console.WriteLine($"{entry.CommandType} ({entry.OperationId}): {entry.Result} => {entry.Reason} ({entry.CommandText})");
            Console.WriteLine(separator);
        }
    }
}
/// <summary>
/// Executes the scripts of <paramref name="changes"/> against <paramref name="databaseName"/>.
/// Only scripts with <c>Order &gt;= 0</c> and <c>IsValid == true</c> are run, in ascending <c>Order</c>.
/// Consecutive synchronous scripts are collected and sent together as one batch; a script that is
/// not synchronous first flushes the pending batch and is then executed and awaited on its own.
/// </summary>
/// <param name="databaseName">Database the commands are executed against.</param>
/// <param name="changes">Changes whose scripts should be applied.</param>
/// <param name="client">Client used to run the commands.</param>
/// <param name="logger">Logger passed on to the execution helpers.</param>
/// <returns>The results of all executed commands, in execution order.</returns>
private async Task<List<ScriptExecuteCommandResult>> ApplyChangesToDatabase(string databaseName, List<IChange> changes, KustoClient client, ILogger logger)
{
    var scripts = changes
        .SelectMany(itm => itm.Scripts)
        .Where(itm => itm.Order >= 0 && itm.IsValid == true)
        .OrderBy(itm => itm.Order)
        .ToList();

    var results = new List<ScriptExecuteCommandResult>();
    var batch = new List<DatabaseScriptContainer>();

    foreach (var sc in scripts)
    {
        if (sc.IsAsync == false)
        {
            batch.Add(sc);
            continue;
        }

        // Everything queued so far must run before the async command to preserve ordering.
        results.AddRange(await ExecutePendingSync(databaseName, client, logger, batch));

        // The batch has been sent; empty it so these scripts are not executed (and reported)
        // a second time by the next flush.
        batch.Clear();

        results.Add(await ExecuteAsyncCommand(databaseName, client, logger, sc));
    }

    // Flush whatever synchronous scripts came after the last async command.
    results.AddRange(await ExecutePendingSync(databaseName, client, logger, batch));
    return results;
}
/// <summary>
/// Starts the command in <c>sc.Text</c>, reads the returned scalar as an operation id and polls
/// <c>.show operations</c> for that id until the operation reports a final state.
/// Polling happens every 5 seconds for at most one hour's worth of attempts.
/// </summary>
/// <param name="databaseName">Database the command and the monitoring query are executed against.</param>
/// <param name="client">Client used to start the command and to query its state.</param>
/// <param name="logger">Logger that receives one progress message per polling attempt.</param>
/// <param name="sc">Script whose text is executed.</param>
/// <returns>The final operation state, with <c>CommandText</c> set to the executed script text.</returns>
/// <exception cref="TimeoutException">The operation did not reach a final state within the polling budget.</exception>
private async Task<ScriptExecuteCommandResult> ExecuteAsyncCommand(string databaseName, KustoClient client, ILogger logger, DatabaseScriptContainer sc)
{
    var interval = TimeSpan.FromSeconds(5);
    var iterations = (int)(TimeSpan.FromHours(1) / interval);

    var result = await client.AdminClient.ExecuteControlCommandAsync(databaseName, sc.Text);
    var operationId = result.ToScalar<Guid>();

    // Take the most recent record for the operation and project it onto the column names
    // used by ScriptExecuteCommandResult.
    string monitoringCommand = $".show operations | where OperationId == '{operationId}' " +
        "| summarize arg_max(LastUpdatedOn, *) by OperationId " +
        "| project OperationId, CommandType = Operation, Result = State, Reason = Status";

    for (var attempt = 1; attempt <= iterations; attempt++)
    {
        logger.LogInformation($"Waiting for operation {operationId} to complete... current iteration: {attempt}/{iterations}");

        // Query asynchronously so the calling thread is not blocked, and dispose the reader
        // before sleeping so at most one reader is open at any time.
        using (var monitoringResult = await client.Client.ExecuteQueryAsync(databaseName, monitoringCommand, new Kusto.Data.Common.ClientRequestProperties()))
        {
            var operationState = monitoringResult.As<ScriptExecuteCommandResult>().FirstOrDefault();
            if (operationState?.IsFinal() == true)
            {
                operationState.CommandText = sc.Text;
                return operationState;
            }
        }

        // No point in waiting after the last attempt; fail right away instead.
        if (attempt < iterations)
        {
            await Task.Delay(interval);
        }
    }

    throw new TimeoutException("Operation did not complete in a reasonable time");
}
/// <summary>
/// Sends all <paramref name="scripts"/> to <paramref name="databaseName"/> as a single
/// <c>.execute script with(ContinueOnErrors = true)</c> command, one script text per line.
/// </summary>
/// <param name="databaseName">Database the script is executed against.</param>
/// <param name="client">Client used to run the command.</param>
/// <param name="logger">Logger that receives the full script text before it is sent.</param>
/// <param name="scripts">Scripts to execute; the list is read but not modified.</param>
/// <returns>
/// The command results read from the response, or an empty list when <paramref name="scripts"/>
/// is empty (in which case nothing is sent).
/// </returns>
private static async Task<List<ScriptExecuteCommandResult>> ExecutePendingSync(string databaseName, KustoClient client, ILogger logger, List<DatabaseScriptContainer> scripts)
{
    if (scripts.Count == 0)
    {
        return new List<ScriptExecuteCommandResult>();
    }

    var sb = new StringBuilder();
    sb.AppendLine(".execute script with(ContinueOnErrors = true) <|");
    foreach (var sc in scripts)
    {
        sb.AppendLine(sc.Text);
    }

    var script = sb.ToString();
    logger.LogInformation($"Applying script:\n{script}");

    // The response is converted to a list before the method returns, so the reader can be
    // released as soon as the conversion is done.
    using var result = await client.AdminClient.ExecuteControlCommandAsync(databaseName, script);
    return result.As<ScriptExecuteCommandResult>();
}
}
}