@@ -92,7 +92,7 @@ abstract protected static function schedule(self $response, array &$runningRespo
92
92
/**
93
93
* Performs all pending non-blocking operations.
94
94
*/
95
- abstract protected static function perform (ClientState $ multi , array & $ responses ): void ;
95
+ abstract protected static function perform (ClientState $ multi , array $ responses ): void ;
96
96
97
97
/**
98
98
* Waits for network activity.
@@ -150,10 +150,15 @@ public static function stream(iterable $responses, ?float $timeout = null): \Gen
150
150
$ lastActivity = hrtime (true ) / 1E9 ;
151
151
$ elapsedTimeout = 0 ;
152
152
153
- if ($ fromLastTimeout = 0.0 === $ timeout && '-0 ' === (string ) $ timeout ) {
154
- $ timeout = null ;
155
- } elseif ($ fromLastTimeout = 0 > $ timeout ) {
156
- $ timeout = -$ timeout ;
153
+ if ((0.0 === $ timeout && '-0 ' === (string ) $ timeout ) || 0 > $ timeout ) {
154
+ $ timeout = $ timeout ? -$ timeout : null ;
155
+
156
+ /** @var ClientState $multi */
157
+ foreach ($ runningResponses as [$ multi ]) {
158
+ if (null !== $ multi ->lastTimeout ) {
159
+ $ elapsedTimeout = max ($ elapsedTimeout , $ lastActivity - $ multi ->lastTimeout );
160
+ }
161
+ }
157
162
}
158
163
159
164
while (true ) {
@@ -162,35 +167,33 @@ public static function stream(iterable $responses, ?float $timeout = null): \Gen
162
167
$ timeoutMin = $ timeout ?? \INF ;
163
168
164
169
/** @var ClientState $multi */
165
- foreach ($ runningResponses as $ i => [$ multi ]) {
166
- $ responses = &$ runningResponses [$ i ][1 ];
170
+ foreach ($ runningResponses as $ i => [$ multi , &$ responses ]) {
167
171
self ::perform ($ multi , $ responses );
168
172
169
173
foreach ($ responses as $ j => $ response ) {
170
174
$ timeoutMax = $ timeout ?? max ($ timeoutMax , $ response ->timeout );
171
175
$ timeoutMin = min ($ timeoutMin , $ response ->timeout , 1 );
172
176
$ chunk = false ;
173
177
174
- if ($ fromLastTimeout && null !== $ multi ->lastTimeout ) {
175
- $ elapsedTimeout = hrtime (true ) / 1E9 - $ multi ->lastTimeout ;
176
- }
177
-
178
178
if (isset ($ multi ->handlesActivity [$ j ])) {
179
179
$ multi ->lastTimeout = null ;
180
+ $ elapsedTimeout = 0 ;
180
181
} elseif (!isset ($ multi ->openHandles [$ j ])) {
182
+ $ hasActivity = true ;
181
183
unset($ responses [$ j ]);
182
184
continue ;
183
185
} elseif ($ elapsedTimeout >= $ timeoutMax ) {
184
186
$ multi ->handlesActivity [$ j ] = [new ErrorChunk ($ response ->offset , sprintf ('Idle timeout reached for "%s". ' , $ response ->getInfo ('url ' )))];
185
187
$ multi ->lastTimeout ??= $ lastActivity ;
188
+ $ elapsedTimeout = $ timeoutMax ;
186
189
} else {
187
190
continue ;
188
191
}
189
192
190
- while ($ multi ->handlesActivity [$ j ] ?? false ) {
191
- $ hasActivity = true ;
192
- $ elapsedTimeout = 0 ;
193
+ $ lastActivity = null ;
194
+ $ hasActivity = true ;
193
195
196
+ while ($ multi ->handlesActivity [$ j ] ?? false ) {
194
197
if (\is_string ($ chunk = array_shift ($ multi ->handlesActivity [$ j ]))) {
195
198
if (null !== $ response ->inflate && false === $ chunk = @inflate_add ($ response ->inflate , $ chunk )) {
196
199
$ multi ->handlesActivity [$ j ] = [null , new TransportException (sprintf ('Error while processing content unencoding for "%s". ' , $ response ->getInfo ('url ' )))];
@@ -227,7 +230,6 @@ public static function stream(iterable $responses, ?float $timeout = null): \Gen
227
230
}
228
231
} elseif ($ chunk instanceof ErrorChunk) {
229
232
unset($ responses [$ j ]);
230
- $ elapsedTimeout = $ timeoutMax ;
231
233
} elseif ($ chunk instanceof FirstChunk) {
232
234
if ($ response ->logger ) {
233
235
$ info = $ response ->getInfo ();
@@ -274,10 +276,12 @@ public static function stream(iterable $responses, ?float $timeout = null): \Gen
274
276
if ($ chunk instanceof ErrorChunk && !$ chunk ->didThrow ()) {
275
277
// Ensure transport exceptions are always thrown
276
278
$ chunk ->getContent ();
279
+ throw new \LogicException ('A transport exception should have been thrown. ' );
277
280
}
278
281
}
279
282
280
283
if (!$ responses ) {
284
+ $ hasActivity = true ;
281
285
unset($ runningResponses [$ i ]);
282
286
}
283
287
@@ -291,7 +295,7 @@ public static function stream(iterable $responses, ?float $timeout = null): \Gen
291
295
}
292
296
293
297
if ($ hasActivity ) {
294
- $ lastActivity = hrtime (true ) / 1E9 ;
298
+ $ lastActivity ?? = hrtime (true ) / 1E9 ;
295
299
continue ;
296
300
}
297
301
0 commit comments