|
77 | 77 |
|
78 | 78 | Reads in a thread to avoid blocking a go block thread."
|
79 | 79 | ([input] (input-stream->input-chan input {}))
|
80 |
| - ([^java.io.InputStream input {:keys [close? keyword-function] |
81 |
| - :or {close? true, keyword-function csk/->kebab-case-keyword}}] |
82 |
| - (let [msgs (async/chan 1)] |
| 80 | + ([^java.io.InputStream input |
| 81 | + {:keys [close? keyword-function buf-or-n] |
| 82 | + :or {close? true |
| 83 | + keyword-function csk/->kebab-case-keyword |
| 84 | + ;; By default buffer a few read messages, so that message parsing is |
| 85 | + ;; less likely to block processing. Processing is still sequential |
| 86 | + ;; because pipeline isn't parallel. |
| 87 | + buf-or-n 8}}] |
| 88 | + (let [msgs (async/chan buf-or-n)] |
83 | 89 | (async/thread
|
84 | 90 | (loop [headers {}]
|
85 | 91 | (let [line (read-header-line input)]
|
|
101 | 107 | channel is closed, closes the output.
|
102 | 108 |
|
103 | 109 | Writes in a thread to avoid blocking a go block thread."
|
104 |
| - [^java.io.OutputStream output] |
105 |
| - (let [messages (async/chan 1)] |
106 |
| - (binding [*out* (io/writer output)] |
107 |
| - (async/thread |
108 |
| - (loop [] |
109 |
| - (if-let [msg (async/<!! messages)] |
110 |
| - (do |
111 |
| - (write-message msg) |
112 |
| - (recur)) |
| 110 | + ([output] (output-stream->output-chan output {})) |
| 111 | + ([^java.io.OutputStream output |
| 112 | + {:keys [buf-or-n] |
| 113 | + :or {;; By default buffer a few messages to write, so that message delivery is |
| 114 | + ;; less likely to block processing. Processing is still sequential |
| 115 | + ;; because pipeline isn't parallel. |
| 116 | + buf-or-n 8}}] |
| 117 | + (let [messages (async/chan buf-or-n)] |
| 118 | + (binding [*out* (io/writer output)] |
| 119 | + (async/thread |
| 120 | + (loop [] |
| 121 | + (if-let [msg (async/<!! messages)] |
| 122 | + (do |
| 123 | + (write-message msg) |
| 124 | + (recur)) |
113 | 125 | ;; channel closed; also close output
|
114 |
| - (.close output))))) |
115 |
| - messages)) |
| 126 | + (.close output))))) |
| 127 | + messages))) |
0 commit comments