Skip to content
This repository was archived by the owner on Aug 4, 2023. It is now read-only.

Commit 031a80a

Browse files
authored
Merge pull request #6 from jamesdbrock/improve
Read from zero-length Readable.
2 parents 53909d5 + c5b4981 commit 031a80a

File tree

6 files changed

+99
-41
lines changed

6 files changed

+99
-41
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,12 @@ Bugfixes:
1212

1313
Other improvements:
1414

15+
## [Unreleased]
16+
17+
Bugfixes:
18+
19+
* Read from zero-length `Readable`.
20+
1521
# v3.0.0
1622

1723
Breaking changes:

spago.dhall

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ to generate this file without the comments in this block.
1717
, "exceptions"
1818
, "node-buffer"
1919
, "node-streams"
20+
, "nullable"
2021
, "st"
22+
, "refs"
2123
, "arrays"
2224
, "either"
2325
, "maybe"

src/Node/Stream/Aff.purs

Lines changed: 33 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@ module Node.Stream.Aff
8787
import Prelude
8888

8989
import Control.Monad.ST.Class (liftST)
90-
import Control.Monad.ST.Ref as ST.Ref
9190
import Data.Array as Array
9291
import Data.Array.ST as Array.ST
9392
import Data.Either (Either(..))
@@ -98,6 +97,7 @@ import Effect.Aff (effectCanceler, makeAff, nonCanceler)
9897
import Effect.Aff.Class (class MonadAff, liftAff)
9998
import Effect.Class (class MonadEffect, liftEffect)
10099
import Effect.Exception (catchException)
100+
import Effect.Ref as Ref
101101
import Node.Buffer (Buffer)
102102
import Node.Buffer as Buffer
103103
import Node.Encoding as Encoding
@@ -110,7 +110,6 @@ import Node.Stream.Aff.Internal (onceDrain, onceEnd, onceError, onceReadable, re
110110
-- |
111111
-- | This function is useful for streams like __stdin__ which never
112112
-- | reach End-Of-File.
113-
-- |
114113
readSome
115114
:: forall m r
116115
. MonadAff m
@@ -121,6 +120,11 @@ readSome r = liftAff <<< makeAff $ \res -> do
121120

122121
removeError <- onceError r $ res <<< Left
123122

123+
removeEnd <- onceEnd r do
124+
removeError
125+
ret <- liftST $ Array.ST.unsafeFreeze bufs
126+
res (Right (Tuple ret false))
127+
124128
-- try to read right away.
125129
catchException (res <<< Left) do
126130
ifM (readable r)
@@ -133,15 +137,19 @@ readSome r = liftAff <<< makeAff $ \res -> do
133137
pure false
134138
do
135139
removeError
140+
removeEnd
136141
res (Right (Tuple [] false))
137142

138143

139144
ret1 <- liftST $ Array.ST.unsafeFreeze bufs
140-
removeReadable <- if Array.length ret1 == 0 then do
141-
-- if we couldn't read anything right away, then wait until the stream is readable.
145+
readagain <- readable r
146+
removeReadable <- if readagain && Array.length ret1 == 0 then do
147+
-- if still readable and we couldn't read anything right away,
148+
-- then wait for the readable event.
142149
-- “The 'readable' event will also be emitted once the end of the
143150
-- stream data has been reached but before the 'end' event is emitted.”
144-
-- We already checked the `readable` property so we don't have to check again.
151+
-- if not readable then this was a zero-length Readable stream.
152+
-- https://nodejs.org/api/stream.html#event-readable
145153
onceReadable r do
146154
catchException (res <<< Left) do
147155
untilE do
@@ -152,23 +160,24 @@ readSome r = liftAff <<< makeAff $ \res -> do
152160
pure false
153161
ret2 <- liftST $ Array.ST.unsafeFreeze bufs
154162
removeError
155-
readagain <- readable r
156-
res (Right (Tuple ret2 readagain))
163+
removeEnd
164+
readagain2 <- readable r
165+
res (Right (Tuple ret2 readagain2))
157166

158167
-- return what we read right away
159168
else do
160169
removeError
161-
readagain <- readable r
170+
removeEnd
162171
res (Right (Tuple ret1 readagain))
163172
pure (pure unit) -- dummy canceller
164173

165174
-- canceller might by called while waiting for `onceReadable`
166175
pure $ effectCanceler do
167176
removeError
177+
removeEnd
168178
removeReadable
169179

170180

171-
172181
-- | Read all data until the end of the stream.
173182
-- |
174183
-- | Note that __stdin__ will never end.
@@ -179,7 +188,7 @@ readAll
179188
-> m (Tuple (Array Buffer) Boolean)
180189
readAll r = liftAff <<< makeAff $ \res -> do
181190
bufs <- liftST $ Array.ST.new
182-
removeReadable <- liftST $ ST.Ref.new (pure unit :: Effect Unit)
191+
removeReadable <- Ref.new (pure unit :: Effect Unit)
183192

184193
removeError <- onceError r $ res <<< Left
185194

@@ -192,7 +201,7 @@ readAll r = liftAff <<< makeAff $ \res -> do
192201
cleanupRethrow err = do
193202
removeError
194203
removeEnd
195-
join $ liftST $ ST.Ref.read removeReadable
204+
join $ Ref.read removeReadable
196205
res (Left err)
197206

198207
-- try to read right away.
@@ -224,15 +233,15 @@ readAll r = liftAff <<< makeAff $ \res -> do
224233
_ <- liftST $ Array.ST.push chunk bufs
225234
pure false
226235
waitToRead -- this is not recursion
227-
void $ liftST $ ST.Ref.write removeReadable' removeReadable
236+
Ref.write removeReadable' removeReadable
228237

229238
waitToRead
230239

231240
-- canceller might by called while waiting for `onceReadable`
232241
pure $ effectCanceler do
233242
removeError
234243
removeEnd
235-
join $ liftST $ ST.Ref.read removeReadable
244+
join $ Ref.read removeReadable
236245

237246

238247
-- | Wait for *N* bytes to become available from the stream.
@@ -249,9 +258,9 @@ readN
249258
-> Int
250259
-> m (Tuple (Array Buffer) Boolean)
251260
readN r n = liftAff <<< makeAff $ \res -> do
252-
redRef <- liftST $ ST.Ref.new 0
261+
redRef <- Ref.new 0
253262
bufs <- liftST $ Array.ST.new
254-
removeReadable <- liftST $ ST.Ref.new (pure unit :: Effect Unit)
263+
removeReadable <- Ref.new (pure unit :: Effect Unit)
255264

256265
-- TODO on error, we're not calling removeEnd...
257266
removeError <- onceError r $ res <<< Left
@@ -260,22 +269,21 @@ readN r n = liftAff <<< makeAff $ \res -> do
260269
-- if there are more bytes in the stream?
261270
removeEnd <- onceEnd r do
262271
removeError
263-
-- join $ liftST $ ST.Ref.read removeReadable
264272
ret <- liftST $ Array.ST.unsafeFreeze bufs
265273
res (Right (Tuple ret false))
266274

267275
let
268276
cleanupRethrow err = do
269277
removeError
270278
removeEnd
271-
join $ liftST $ ST.Ref.read removeReadable
279+
join $ Ref.read removeReadable
272280
res (Left err)
273281

274282
-- try to read N bytes and then either return N bytes or run a continuation
275283
tryToRead continuation = do
276284
catchException cleanupRethrow do
277285
untilE do
278-
red <- liftST $ ST.Ref.read redRef
286+
red <- Ref.read redRef
279287
-- https://nodejs.org/docs/latest-v15.x/api/stream.html#stream_readable_read_size
280288
-- “If size bytes are not available to be read, null will be returned
281289
-- unless the stream has ended, in which case all of the data remaining
@@ -285,12 +293,12 @@ readN r n = liftAff <<< makeAff $ \res -> do
285293
Just chunk -> do
286294
_ <- liftST $ Array.ST.push chunk bufs
287295
s <- Buffer.size chunk
288-
red' <- liftST $ ST.Ref.modify (_+s) redRef
296+
red' <- Ref.modify (_+s) redRef
289297
if red' >= n then
290298
pure true
291299
else
292300
pure false
293-
red <- liftST $ ST.Ref.read redRef
301+
red <- Ref.read redRef
294302
if red >= n then do
295303
removeError
296304
removeEnd
@@ -314,14 +322,14 @@ readN r n = liftAff <<< makeAff $ \res -> do
314322
waitToRead _ = do
315323
removeReadable' <- onceReadable r do
316324
tryToRead waitToRead
317-
void $ liftST $ ST.Ref.write removeReadable' removeReadable
325+
Ref.write removeReadable' removeReadable
318326
waitToRead unit
319327

320328
-- canceller might by called while waiting for `onceReadable`
321329
pure $ effectCanceler do
322330
removeError
323331
removeEnd
324-
join $ liftST $ ST.Ref.read removeReadable
332+
join $ Ref.read removeReadable
325333

326334

327335
-- | Write to a stream.
@@ -335,7 +343,7 @@ write
335343
-> m Unit
336344
write w bs = liftAff <<< makeAff $ \res -> do
337345
bufs <- liftST $ Array.ST.thaw bs
338-
removeDrain <- liftST $ ST.Ref.new (pure unit :: Effect Unit)
346+
removeDrain <- Ref.new (pure unit :: Effect Unit)
339347

340348
removeError <- onceError w $ res <<< Left
341349

@@ -366,15 +374,15 @@ write w bs = liftAff <<< makeAff $ \res -> do
366374
pure false
367375
else do
368376
removeDrain' <- onceDrain w oneWrite
369-
void $ liftST $ ST.Ref.write removeDrain' removeDrain
377+
void $ Ref.write removeDrain' removeDrain
370378
pure true
371379

372380
oneWrite
373381

374382
-- canceller might be called while waiting for `onceDrain`
375383
pure $ effectCanceler do
376384
removeError
377-
join $ liftST $ ST.Ref.read removeDrain
385+
join $ Ref.read removeDrain
378386

379387
-- | Signal that no more data will be written to the `Writable`. Will complete
380388
-- | after all data is written and flushed.

src/Node/Stream/Internal.js

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import stream from 'stream';
2+
13
export const onceReadable = s => f => () => {
24
s.once('readable', f);
35
return () => {s.removeListener('readable', f);};
@@ -19,5 +21,13 @@ export const onceError = s => f => () => {
1921
}
2022

2123
export const readable = s => () => {
22-
return s.readable;
24+
return s.readable;
25+
}
26+
27+
export const push = s => buf => () => {
28+
return s.push(buf);
2329
}
30+
31+
export const newReadable = () => {
32+
return new stream.Readable();
33+
}

src/Node/Stream/Internal.purs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,20 @@ module Node.Stream.Aff.Internal
77
, onceError
88
, onceReadable
99
, readable
10+
, push
11+
, newReadable
12+
, newReadableStringUTF8
1013
)
1114
where
1215

1316
import Prelude
1417

18+
import Data.Nullable (Nullable, notNull, null)
1519
import Effect (Effect)
1620
import Effect.Exception (Error)
21+
import Node.Buffer (Buffer)
22+
import Node.Buffer as Buffer
23+
import Node.Encoding as Encoding
1724
import Node.Stream (Readable, Stream, Writable)
1825

1926
-- | Listen for one `readable` event, call the callback, then remove
@@ -69,3 +76,27 @@ foreign import readable
6976
:: forall r
7077
. Readable r
7178
-> Effect Boolean
79+
80+
81+
-- | [`readable.push(chunk[, encoding])`](https://nodejs.org/api/stream.html#readablepushchunk-encoding)
82+
foreign import push
83+
:: forall r
84+
. Readable r
85+
-> Nullable Buffer
86+
-> Effect Boolean
87+
88+
-- | `new stream.Readable()`
89+
foreign import newReadable
90+
:: forall r
91+
. Effect (Readable r)
92+
93+
-- | Construct a `Readable` from a `String`.
94+
newReadableStringUTF8
95+
:: forall r
96+
. String
97+
-> Effect (Readable r)
98+
newReadableStringUTF8 strng = do
99+
rstream <- newReadable
100+
_ <- push rstream =<< (notNull <$> Buffer.fromString strng Encoding.UTF8)
101+
_ <- push rstream null -- the end of the stream
102+
pure rstream

test/Main.purs

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,34 +10,27 @@ module Test.Main where
1010
import Prelude
1111

1212
import Data.Array as Array
13-
import Data.Either (Either(..))
1413
import Data.Maybe (Maybe(..))
1514
import Data.Tuple (Tuple(..), fst)
1615
import Effect (Effect)
17-
import Effect.Aff (Error, Milliseconds(..), runAff_)
16+
import Effect.Aff (Milliseconds(..), launchAff_)
1817
import Effect.Class (liftEffect)
19-
import Effect.Class.Console as Console
2018
import Node.Buffer (Buffer, concat)
2119
import Node.Buffer as Buffer
2220
import Node.Encoding (Encoding(..))
2321
import Node.FS.Stream (createReadStream, createWriteStream)
24-
import Node.Stream.Aff (end, readAll, readN, readSome, write)
22+
import Node.Stream.Aff (end, readAll, readN, readSome, toStringUTF8, write)
23+
import Node.Stream.Aff.Internal (newReadableStringUTF8)
2524
import Partial.Unsafe (unsafePartial)
2625
import Test.Spec (describe, it)
2726
import Test.Spec.Assertions (expectError, shouldEqual)
2827
import Test.Spec.Reporter (consoleReporter)
29-
import Test.Spec.Runner (defaultConfig, runSpecT)
30-
import Unsafe.Coerce (unsafeCoerce)
31-
32-
completion :: Either Error (Effect Unit) -> Effect Unit
33-
completion = case _ of
34-
Left e -> Console.error (unsafeCoerce e)
35-
Right f -> f
28+
import Test.Spec.Runner (defaultConfig, runSpec')
3629

3730
main :: Effect Unit
3831
main = unsafePartial $ do
39-
runAff_ completion do
40-
void $ runSpecT (defaultConfig {timeout = Just (Milliseconds 10000.0)}) [consoleReporter] do
32+
launchAff_ do
33+
runSpec' (defaultConfig {timeout = Just (Milliseconds 20000.0)}) [consoleReporter] do
4134
describe "Node.Stream.Aff" do
4235
it "writes and reads" do
4336
let outfilename = "/tmp/test1.txt"
@@ -56,13 +49,21 @@ main = unsafePartial $ do
5649
let inputs = input1 <> input2 <> input3
5750
input :: Buffer <- liftEffect $ concat inputs
5851
inputSize <- liftEffect $ Buffer.size input
59-
shouldEqual (10 * magnitude) inputSize
52+
shouldEqual inputSize (10 * magnitude)
6053
it "writes and closes" do
6154
let outfilename = "/tmp/test2.txt"
6255
outfile <- liftEffect $ createWriteStream outfilename
6356
b <- liftEffect $ Buffer.fromString "test" UTF8
6457
write outfile [b]
6558
end outfile
6659
expectError $ write outfile [b]
60+
it "reads from a zero-length Readable" do
61+
r <- liftEffect $ newReadableStringUTF8 ""
62+
b1 <- toStringUTF8 =<< (fst <$> readSome r)
63+
shouldEqual "" b1
64+
b2 <- toStringUTF8 =<< (fst <$> readAll r)
65+
shouldEqual "" b2
66+
b3 <- toStringUTF8 =<< (fst <$> readN r 0)
67+
shouldEqual "" b3
6768

68-
pure (pure unit)
69+
pure unit

0 commit comments

Comments
 (0)