clojure – core.async – basic channel operations: creating, putting, taking, closing, and buffers.

core.async is about making processes that take values from and put values into channels.

(require [clojure.core.async :as a])

Creating channels with chan

You create a channel using the chan function:

(def chan-0 (a/chan)) ;; unbuffered channel: acts as a rendez-vous point.
(def chan-1 (a/chan 3)) ;; channel with a buffer of size 3. 
(def chan-2 (a/chan (a/dropping-buffer 3)) ;; channel with a *dropping* buffer of size 3
(def chan-3 (a/chan (a/sliding-buffer 3)) ;; channel with a *sliding* buffer of size 3

Putting values into channels with >!! and >!

You put values into a channel with >!!:

(a/>!! my-channel :an-item)

You can put any value (Strings, numbers, maps, collections, objects, even other channels, etc.) into a channel, except nil:

;; WON'T WORK
(a/>!! my-channel nil)
=> IllegalArgumentException Can't put nil on channel

Depending on the channel’s buffer, >!! may block the current thread.

(let [ch (a/chan)] ;; unbuffered channel
  (a/>!! ch :item) 
  ;; the above call blocks, until another process 
  ;; takes the item from the channel.
  )
(let [ch (a/chan 3)] ;; channel with 3-size buffer
  (a/>!! ch :item-1) ;; => true
  (a/>!! ch :item-2) ;; => true
  (a/>!! ch :item-3) ;; => true
  (a/>!! ch :item-4) 
  ;; now the buffer is full; blocks until :item-1 is taken from ch.
  )

From inside a (go ...) block, you can – and should – use a/>! instead of a/>!!:

 (a/go (a/>! ch :item))

The logical behaviour will be the same as a/>!!, but only the logical process of the goroutine will block instead of the actual OS thread.

Using a/>!! inside of a (go ...) block is an anti-pattern:

;; NEVER DO THIS
(a/go 
  (a/>!! ch :item))

Taking values from channels with <!!

You take a value from a channel using <!!:

;; creating a channel
(def ch (a/chan 3))
;; putting some items in it
(do 
  (a/>!! ch :item-1)
  (a/>!! ch :item-2)
  (a/>!! ch :item-3))
;; taking a value
(a/<!! ch) ;; => :item-1
(a/<!! ch) ;; => :item-2

If no item is available in the channel, a/<!! will block the current Thread until a value is put in the channel (or the channel is closed, see later):

(def ch (a/chan))
(a/<!! ch) ;; blocks until another process puts something into ch or closes it

From inside a (go ...) block, you can – and should – use a/<! instead of a/<!!:

 (a/go (let [x (a/<! ch)] ...))

The logical behaviour will be the same as a/<!!, but only the logical process of the goroutine will block instead of the actual OS thread.

Using a/<!! inside of a (go ...) block is an anti-pattern:

;; NEVER DO THIS
(a/go 
  (a/<!! ch))

Closing channels

You close a channel with a/close!:

(a/close! ch)

Once a channel is closed, and the all data in the channel has been exhausted, takes will always return nil:

(def ch (a/chan 5))

;; putting 2 values in the channel, then closing it
(a/>!! ch :item-1)
(a/>!! ch :item-2)
(a/close! ch)

;; taking from ch will return the items that were put in it, then nil
(a/<!! ch) ;; => :item-1
(a/<!! ch) ;; => :item-2
(a/<!! ch) ;; => nil
(a/<!! ch) ;; => nil
(a/<!! ch) ;; => nil

;; once the channel is closed, >!! will have no effect on the channel:
(a/>!! ch :item-3)
=> false ;; false means the put did not succeed
(a/<!! ch) ;; => nil

Asynchronous puts with put!

As an alternative to a/>!! (which may block), you can call a/put! to put a value in a channel in another thread, with an optional callback.

(a/put! ch :item)
(a/put! ch :item (fn once-put [closed?] ...)) ;; callback function, will receive 

In ClojureScript, since blocking the current Thread is not possible, a/>!! is not supported, and put! is the only way to put data into a channel from outside of a (go) block.

Asynchronous takes with take!

As an alternative to a/<!! (which may block the current thread), you may use a/take! to take a value from a channel asynchronously, passing it to a callback.

(a/take! ch (fn [x] (do something with x)))

Using dropping and sliding buffers

With dropping and sliding buffers, puts never block, however, when the buffer is full, you lose data. Dropping buffer lose the last data added, whereas sliding buffers lose the first data added.

Dropping buffer example:

(def ch (a/chan (a/dropping-buffer 2)))

;; putting more items than buffer size
(a/>!! ch :item-1)
=> true ;; put succeeded
(a/>!! ch :item-2)
=> true
(a/>!! ch :item-3)
=> false ;; put failed

;; no we take from the channel
(a/<!! ch)
=> :item-1
(a/<!! ch)
=> :item-2
(a/<!! ch)
;; blocks! :item-3 is lost

Sliding buffer example:

(def ch (a/chan (a/sliding-buffer 2)))

;; putting more items than buffer size
(a/>!! ch :item-1)
=> true
(a/>!! ch :item-2)
=> true
(a/>!! ch :item-3)
=> true

;; no when we take from the channel:
(a/<!! ch)
=> :item-2
(a/<!! ch)
=> :item-3
;; :item-1 was lost

if you want to reproduce, please indicate the source:
clojure – core.async – basic channel operations: creating, putting, taking, closing, and buffers. - CodeDay