@@ -13,6 +13,7 @@ import { UnsuccessfulWorkflowExecution } from '../exception/exception';
13
13
import { standardFormats } from '../formats' ;
14
14
import { DryRunEvent , DryRunSink } from '../sink/dryrun' ;
15
15
import { HostSink } from '../sink/host' ;
16
+ import { Sink } from '../sink/sink' ;
16
17
import { HostTree } from '../tree/host-tree' ;
17
18
import { Tree } from '../tree/interface' ;
18
19
import { optimize } from '../tree/static' ;
@@ -96,6 +97,35 @@ export abstract class BaseWorkflow implements Workflow {
96
97
return this . _lifeCycle . asObservable ( ) ;
97
98
}
98
99
100
+ protected _createSinks ( ) : Sink [ ] {
101
+ let error = false ;
102
+
103
+ const dryRunSink = new DryRunSink ( this . _host , this . _force ) ;
104
+ const dryRunSubscriber = dryRunSink . reporter . subscribe ( event => {
105
+ this . _reporter . next ( event ) ;
106
+ error = error || ( event . kind == 'error' ) ;
107
+ } ) ;
108
+
109
+ // We need two sinks if we want to output what will happen, and actually do the work.
110
+ return [
111
+ dryRunSink ,
112
+ // Add a custom sink that clean ourselves and throws an error if an error happened.
113
+ {
114
+ commit ( ) {
115
+ dryRunSubscriber . unsubscribe ( ) ;
116
+ if ( error ) {
117
+ return throwError ( new UnsuccessfulWorkflowExecution ( ) ) ;
118
+ }
119
+
120
+ return of ( ) ;
121
+ } ,
122
+ } ,
123
+
124
+ // Only add a HostSink if this is not a dryRun.
125
+ ...( ! this . _dryRun ? [ new HostSink ( this . _host , this . _force ) ] : [ ] ) ,
126
+ ] ;
127
+ }
128
+
99
129
execute (
100
130
options : Partial < WorkflowExecutionContext > & RequiredWorkflowExecutionContext ,
101
131
) : Observable < void > {
@@ -112,17 +142,7 @@ export abstract class BaseWorkflow implements Workflow {
112
142
|| ( parentContext && parentContext . collection === options . collection ) ;
113
143
const schematic = collection . createSchematic ( options . schematic , allowPrivate ) ;
114
144
115
- // We need two sinks if we want to output what will happen, and actually do the work.
116
- // Note that fsSink is technically not used if `--dry-run` is passed, but creating the Sink
117
- // does not have any side effect.
118
- const dryRunSink = new DryRunSink ( this . _host , this . _force ) ;
119
- const fsSink = new HostSink ( this . _host , this . _force ) ;
120
-
121
- let error = false ;
122
- const dryRunSubscriber = dryRunSink . reporter . subscribe ( event => {
123
- this . _reporter . next ( event ) ;
124
- error = error || ( event . kind == 'error' ) ;
125
- } ) ;
145
+ const sinks = this . _createSinks ( ) ;
126
146
127
147
this . _lifeCycle . next ( { kind : 'workflow-start' } ) ;
128
148
@@ -141,23 +161,18 @@ export abstract class BaseWorkflow implements Workflow {
141
161
) . pipe (
142
162
map ( tree => optimize ( tree ) ) ,
143
163
concatMap ( ( tree : Tree ) => {
144
- return concat (
145
- dryRunSink . commit ( tree ) . pipe ( ignoreElements ( ) ) ,
146
- of ( tree ) ,
164
+ // Process all sinks.
165
+ return of ( tree ) . pipe (
166
+ ...sinks . map ( sink => {
167
+ return concatMap ( ( tree : Tree ) => {
168
+ return concat (
169
+ sink . commit ( tree ) . pipe ( ignoreElements ( ) ) ,
170
+ of ( tree ) ,
171
+ ) ;
172
+ } ) ;
173
+ } ) ,
147
174
) ;
148
175
} ) ,
149
- concatMap ( ( tree : Tree ) => {
150
- dryRunSubscriber . unsubscribe ( ) ;
151
- if ( error ) {
152
- return throwError ( new UnsuccessfulWorkflowExecution ( ) ) ;
153
- }
154
-
155
- if ( this . _dryRun ) {
156
- return of ( ) ;
157
- }
158
-
159
- return fsSink . commit ( tree ) . pipe ( defaultIfEmpty ( ) , last ( ) ) ;
160
- } ) ,
161
176
concatMap ( ( ) => {
162
177
if ( this . _dryRun ) {
163
178
return of ( ) ;
0 commit comments