-
Notifications
You must be signed in to change notification settings - Fork 609
fix(server): fix the scheduler and the scheduler selection logic #2937
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -33,6 +33,7 @@ | |
| import java.util.Map; | ||
| import java.util.Objects; | ||
| import java.util.Set; | ||
| import java.util.UUID; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.concurrent.Future; | ||
| import java.util.concurrent.TimeUnit; | ||
|
|
@@ -68,6 +69,7 @@ | |
| import org.apache.hugegraph.config.TypedOption; | ||
| import org.apache.hugegraph.event.EventHub; | ||
| import org.apache.hugegraph.exception.ExistedException; | ||
| import org.apache.hugegraph.exception.NotFoundException; | ||
| import org.apache.hugegraph.exception.NotSupportException; | ||
| import org.apache.hugegraph.io.HugeGraphSONModule; | ||
| import org.apache.hugegraph.k8s.K8sDriver; | ||
|
|
@@ -195,7 +197,17 @@ public final class GraphManager { | |
| public GraphManager(HugeConfig conf, EventHub hub) { | ||
| LOG.info("Init graph manager"); | ||
| E.checkArgumentNotNull(conf, "The config can't be null"); | ||
|
|
||
| // Auto-generate server.id if not configured. | ||
| // Random generation is to prevent duplicate id error reports. This id is currently | ||
| // meaningless; serverInfoManager needs to be completely removed in | ||
| // the future. | ||
| String server = conf.get(ServerOptions.SERVER_ID); | ||
| if (StringUtils.isEmpty(server)) { | ||
| server = "server-" + UUID.randomUUID().toString().substring(0, 8); | ||
| LOG.info("Auto-generated server.id: {}", server); | ||
| conf.setProperty(ServerOptions.SERVER_ID.name(), server); | ||
| } | ||
| String role = conf.get(ServerOptions.SERVER_ROLE); | ||
|
|
||
| this.config = conf; | ||
|
|
@@ -206,10 +218,6 @@ public GraphManager(HugeConfig conf, EventHub hub) { | |
| conf.get(ServerOptions.SERVER_DEPLOY_IN_K8S); | ||
| this.startIgnoreSingleGraphError = conf.get( | ||
| ServerOptions.SERVER_START_IGNORE_SINGLE_GRAPH_ERROR); | ||
| E.checkArgument(server != null && !server.isEmpty(), | ||
| "The server name can't be null or empty"); | ||
| E.checkArgument(role != null && !role.isEmpty(), | ||
| "The server role can't be null or empty"); | ||
| this.graphsDir = conf.get(ServerOptions.GRAPHS); | ||
| this.cluster = conf.get(ServerOptions.CLUSTER); | ||
| this.graphSpaces = new ConcurrentHashMap<>(); | ||
|
|
@@ -1557,6 +1565,14 @@ private void loadGraph(String name, String graphConfPath) { | |
| String raftGroupPeers = this.conf.get(ServerOptions.RAFT_GROUP_PEERS); | ||
| config.addProperty(ServerOptions.RAFT_GROUP_PEERS.name(), | ||
| raftGroupPeers); | ||
|
|
||
| // Transfer `pd.peers` from server config to graph config | ||
| // Only inject if not already configured in graph config | ||
| if (!config.containsKey("pd.peers")) { | ||
| String pdPeers = this.conf.get(ServerOptions.PD_PEERS); | ||
| config.addProperty("pd.peers", pdPeers); | ||
| } | ||
|
|
||
| this.transferRoleWorkerConfig(config); | ||
|
|
||
| Graph graph = GraphFactory.open(config); | ||
|
|
@@ -1637,10 +1653,6 @@ private void checkBackendVersionOrExit(HugeConfig config) { | |
| private void initNodeRole() { | ||
| String id = config.get(ServerOptions.SERVER_ID); | ||
| String role = config.get(ServerOptions.SERVER_ROLE); | ||
| E.checkArgument(StringUtils.isNotEmpty(id), | ||
| "The server name can't be null or empty"); | ||
| E.checkArgument(StringUtils.isNotEmpty(role), | ||
| "The server role can't be null or empty"); | ||
|
|
||
| NodeRole nodeRole = NodeRole.valueOf(role.toUpperCase()); | ||
|
Tsukilc marked this conversation as resolved.
|
||
| boolean supportRoleElection = !nodeRole.computer() && | ||
|
|
@@ -1960,7 +1972,7 @@ public HugeGraph graph(String graphSpace, String name) { | |
| } else if (graph instanceof HugeGraph) { | ||
| return (HugeGraph) graph; | ||
| } | ||
| throw new NotSupportException("graph instance of %s", graph.getClass()); | ||
| throw new NotFoundException(String.format("Graph '%s' does not exist", name)); | ||
|
Tsukilc marked this conversation as resolved.
|
||
| } | ||
|
Comment on lines
1972
to
1976
|
||
|
|
||
| public void dropGraphLocal(String name) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -176,7 +176,6 @@ public class StandardHugeGraph implements HugeGraph { | |
| private final BackendStoreProvider storeProvider; | ||
| private final TinkerPopTransaction tx; | ||
| private final RamTable ramtable; | ||
| private final String schedulerType; | ||
| private volatile boolean started; | ||
| private volatile boolean closed; | ||
| private volatile GraphMode mode; | ||
|
|
@@ -229,7 +228,6 @@ public StandardHugeGraph(HugeConfig config) { | |
| this.closed = false; | ||
| this.mode = GraphMode.NONE; | ||
| this.readMode = GraphReadMode.OLTP_ONLY; | ||
| this.schedulerType = config.get(CoreOptions.SCHEDULER_TYPE); | ||
|
|
||
| LockUtil.init(this.spaceGraphName()); | ||
|
|
||
|
|
@@ -315,6 +313,7 @@ public String backend() { | |
| return this.storeProvider.type(); | ||
| } | ||
|
|
||
| @Override | ||
| public BackendStoreInfo backendStoreInfo() { | ||
| // Just for trigger Tx.getOrNewTransaction, then load 3 stores | ||
| // TODO: pass storeProvider.metaStore() | ||
|
|
@@ -465,6 +464,7 @@ public void updateTime(Date updateTime) { | |
| this.updateTime = updateTime; | ||
| } | ||
|
|
||
| @Override | ||
| public void waitStarted() { | ||
| // Just for trigger Tx.getOrNewTransaction, then load 3 stores | ||
| this.schemaTransaction(); | ||
|
|
@@ -1629,7 +1629,9 @@ public <T> void submitEphemeralJob(EphemeralJob<T> job) { | |
|
|
||
| @Override | ||
| public String schedulerType() { | ||
| return StandardHugeGraph.this.schedulerType; | ||
| // Use the distributed scheduler for the hstore backend; otherwise use the local one. | ||
| // Once rocksdb and hstore are merged, reconsider whether this logic should change. | ||
| return StandardHugeGraph.this.isHstore() ? "distributed" : "local"; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
|
||
| } | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,7 +19,9 @@ | |
|
|
||
| import java.util.Iterator; | ||
| import java.util.concurrent.Callable; | ||
| import java.util.concurrent.CancellationException; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.concurrent.ExecutionException; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.Executors; | ||
| import java.util.concurrent.Future; | ||
|
|
@@ -48,6 +50,7 @@ | |
| import org.slf4j.Logger; | ||
|
|
||
| public class DistributedTaskScheduler extends TaskAndResultScheduler { | ||
|
|
||
| private static final Logger LOG = Log.logger(DistributedTaskScheduler.class); | ||
| private final long schedulePeriod; | ||
| private final ExecutorService taskDbExecutor; | ||
|
|
@@ -118,6 +121,11 @@ private static boolean sleep(long ms) { | |
| public void cronSchedule() { | ||
| // Perform periodic scheduling tasks | ||
|
|
||
| // Check closed flag first to exit early | ||
| if (this.closed.get()) { | ||
| return; | ||
| } | ||
|
|
||
| if (!this.graph.started() || this.graph.closed()) { | ||
| return; | ||
| } | ||
|
|
@@ -253,6 +261,10 @@ public <V> Future<?> schedule(HugeTask<V> task) { | |
| return this.ephemeralTaskExecutor.submit(task); | ||
| } | ||
|
|
||
| // Validate task state before saving to ensure correct exception type | ||
| E.checkState(task.type() != null, "Task type can't be null"); | ||
| E.checkState(task.name() != null, "Task name can't be null"); | ||
|
|
||
| // Process schema task | ||
| // Handle gremlin task | ||
| // Handle OLAP calculation tasks | ||
|
|
@@ -284,14 +296,41 @@ protected <V> void initTaskParams(HugeTask<V> task) { | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Note: This method will update the status of the input task. | ||
| * | ||
| * @param task | ||
| * @param <V> | ||
| */ | ||
| @Override | ||
| public <V> void cancel(HugeTask<V> task) { | ||
| // Update status to CANCELLING | ||
| if (!task.completed()) { | ||
| // Task not completed, can only execute status not CANCELLING | ||
| this.updateStatus(task.id(), null, TaskStatus.CANCELLING); | ||
| E.checkArgumentNotNull(task, "Task can't be null"); | ||
|
|
||
| if (task.completed() || task.cancelling()) { | ||
| return; | ||
| } | ||
|
|
||
| LOG.info("Cancel task '{}' in status {}", task.id(), task.status()); | ||
|
|
||
| // Check if task is running locally, cancel it directly if so | ||
| HugeTask<?> runningTask = this.runningTasks.get(task.id()); | ||
| if (runningTask != null) { | ||
| boolean cancelled = runningTask.cancel(true); | ||
| if (cancelled) { | ||
| task.overwriteStatus(TaskStatus.CANCELLED); | ||
| } | ||
| LOG.info("Cancel local running task '{}' result: {}", task.id(), cancelled); | ||
| return; | ||
| } | ||
|
|
||
| // Task not running locally, update status to CANCELLING | ||
| // for cronSchedule() or other nodes to handle | ||
| TaskStatus currentStatus = task.status(); | ||
| if (!this.updateStatus(task.id(), currentStatus, TaskStatus.CANCELLING)) { | ||
| LOG.info("Failed to cancel task '{}', status may have changed from {}", | ||
| task.id(), currentStatus); | ||
| } else { | ||
| LOG.info("cancel task({}) error, task has completed", task.id()); | ||
| task.overwriteStatus(TaskStatus.CANCELLING); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -316,14 +355,18 @@ protected <V> HugeTask<V> deleteFromDB(Id id) { | |
|
|
||
| @Override | ||
| public <V> HugeTask<V> delete(Id id, boolean force) { | ||
| if (!force) { | ||
| // Change status to DELETING, perform the deletion operation through automatic | ||
| // scheduling. | ||
| HugeTask<?> task = this.taskWithoutResult(id); | ||
|
|
||
| if (!force && !task.completed()) { | ||
| // Check task status: can't delete running tasks without force | ||
| this.updateStatus(id, null, TaskStatus.DELETING); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Deleting a running task can race with the task's own cancellation callbacks and resurrect the task record. In this branch the scheduler cancels the in-memory task and immediately removes the task vertex from DB, but |
||
| return null; | ||
| } else { | ||
| return this.deleteFromDB(id); | ||
| // Already in DELETING status, delete directly from DB | ||
| // Completed tasks can also be deleted directly | ||
| } | ||
|
|
||
| // Delete from DB directly for completed/DELETING tasks or force=true | ||
| return this.deleteFromDB(id); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -353,6 +396,18 @@ public boolean close() { | |
| cronFuture.cancel(false); | ||
| } | ||
|
|
||
| // Wait for cron task to complete to ensure all transactions are closed | ||
| try { | ||
| cronFuture.get(schedulePeriod + 5, TimeUnit.SECONDS); | ||
| } catch (CancellationException e) { | ||
| // Task was cancelled, this is expected | ||
| LOG.debug("Cron task was cancelled"); | ||
| } catch (TimeoutException e) { | ||
| LOG.warn("Cron task did not complete in time when closing scheduler"); | ||
| } catch (ExecutionException | InterruptedException e) { | ||
| LOG.warn("Exception while waiting for cron task to complete", e); | ||
| } | ||
|
|
||
| if (!this.taskDbExecutor.isShutdown()) { | ||
| this.call(() -> { | ||
| try { | ||
|
|
@@ -363,7 +418,10 @@ public boolean close() { | |
| this.graph.closeTx(); | ||
| }); | ||
| } | ||
| return true; | ||
|
|
||
| // TODO: the serverInfoManager section should be removed in the future. | ||
| return this.serverManager().close(); | ||
| //return true; | ||
| } | ||
|
|
||
| @Override | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.