@@ -29,8 +29,12 @@ import (
2929 "github.com/compose-spec/compose-go/v2/types"
3030 containerType "github.com/moby/moby/api/types/container"
3131 "github.com/moby/moby/client"
32+ "go.opentelemetry.io/otel"
33+ "go.opentelemetry.io/otel/attribute"
34+ "go.opentelemetry.io/otel/codes"
3235 "golang.org/x/sync/errgroup"
3336
37+ "github.com/docker/compose/v5/internal/tracing"
3438 "github.com/docker/compose/v5/pkg/api"
3539)
3640
@@ -148,19 +152,15 @@ func (es *executionState) resolveSharedNamespaces(service *types.ServiceConfig)
148152// ExecutePlan executes a reconciliation plan using DAG traversal similar to
149153// graphTraversal.visit() in dependencies.go. Operations are executed
150154// concurrently, respecting dependency ordering.
151- func (s * composeService ) ExecutePlan (ctx context.Context , project * types.Project , plan * ReconciliationPlan ) error {
155+ func (s * composeService ) ExecutePlan (ctx context.Context , project * types.Project , plan * ReconciliationPlan , observed * ObservedState ) error {
152156 if plan .IsEmpty () {
153157 return nil
154158 }
155159
156160 // Pre-populate execution state with existing containers so that
157161 // resolveServiceReferences can find containers for services not
158162 // included in the plan (e.g. --no-deps scenarios).
159- allContainers , err := s .getContainers (ctx , project .Name , oneOffExclude , true )
160- if err != nil {
161- return err
162- }
163- state := newExecutionStateFrom (allContainers )
163+ state := newExecutionStateFrom (observed .allContainers ())
164164
165165 // Build dependency count map: number of unsatisfied deps per operation.
166166 // The consumer goroutine is single-threaded, so no mutex is needed for depCount.
@@ -219,6 +219,30 @@ func (s *composeService) ExecutePlan(ctx context.Context, project *types.Project
219219}
220220
221221func (s * composeService ) executeOperation (ctx context.Context , project * types.Project , op * Operation , state * executionState ) error {
222+ spanName := op .Type .String ()
223+ opts := tracing.SpanOptions {}
224+ if op .ContainerOp != nil {
225+ opts = tracing .ServiceOptions (op .ContainerOp .Service )
226+ }
227+ ctx , span := otel .Tracer ("" ).Start (ctx , spanName , opts .SpanStartOptions ()... )
228+ defer span .End ()
229+ span .SetAttributes (
230+ attribute .String ("operation.id" , op .ID ),
231+ attribute .String ("operation.resource" , op .Resource ),
232+ attribute .String ("operation.reason" , op .Reason ),
233+ )
234+
235+ err := s .dispatchOperation (ctx , project , op , state )
236+ if err != nil {
237+ span .SetStatus (codes .Error , err .Error ())
238+ span .RecordError (err )
239+ } else {
240+ span .SetStatus (codes .Ok , "" )
241+ }
242+ return err
243+ }
244+
245+ func (s * composeService ) dispatchOperation (ctx context.Context , project * types.Project , op * Operation , state * executionState ) error {
222246 switch op .Type {
223247 case OpCreateNetwork :
224248 return s .executePlanCreateNetwork (ctx , project , op , state )
@@ -397,7 +421,10 @@ func (s *composeService) executePlanRunPlugin(ctx context.Context, project *type
397421// DisplayPlan performs a topological sort of operations and displays them
398422// grouped by resource type.
399423func DisplayPlan (plan * ReconciliationPlan , w io.Writer ) error {
400- ops := topologicalSort (plan )
424+ ops , err := topologicalSort (plan )
425+ if err != nil {
426+ return err
427+ }
401428
402429 // Group operations by category
403430 var networkOps , volumeOps []* Operation
@@ -497,7 +524,8 @@ func opVerb(t OperationType) string {
497524}
498525
499526// topologicalSort returns operations in dependency order using Kahn's algorithm.
500- func topologicalSort (plan * ReconciliationPlan ) []* Operation {
527+ // Returns an error if the dependency graph contains a cycle.
528+ func topologicalSort (plan * ReconciliationPlan ) ([]* Operation , error ) {
501529 inDegree := make (map [string ]int , len (plan .Operations ))
502530 for _ , op := range plan .Operations {
503531 inDegree [op .ID ] = len (op .DependsOn )
@@ -529,5 +557,16 @@ func topologicalSort(plan *ReconciliationPlan) []*Operation {
529557 queue = append (queue , next ... )
530558 }
531559
532- return sorted
560+ if len (sorted ) != len (plan .Operations ) {
561+ var cycled []string
562+ for id , degree := range inDegree {
563+ if degree > 0 {
564+ cycled = append (cycled , id )
565+ }
566+ }
567+ sort .Strings (cycled )
568+ return nil , fmt .Errorf ("dependency cycle detected involving operations: %v" , cycled )
569+ }
570+
571+ return sorted , nil
533572}
0 commit comments