Concurrent Programming
In a usual sequential program, we do one thing completely after another. For example, if there are two tasks to do, A
and B
, then we will do A
until completion, then do B
until completion:
do A completely
do B completely
In a concurrent program, we do a little bit of either, arbitrarily:
work on A for 2ms
work on B for 1ms
work on A for 3ms
work on B for 6ms
work on A for 1ms
...
One of the advantages of writing concurrent programs (even in the presence of only a single operating system process) is that it would appear to the user that both tasks are executed simultaneously, making the program feel more fluid and have lower latency.
Typically, most programming languages provide building blocks for writing concurrent programs in the form of independent threads of control. Haskell is no exception. In Haskell, a thread is an I/O action that executes independently of other threads. To create a thread, we import the Control.Concurrent
module and use the forkIO
function. In the following example, we have one I/O action that writes Hello World!
to a new file called it5100a-notes.md
, and use forkIO
to independently execute that I/O action in a separate thread immediately.
ghci> import Control.Concurrent
ghci> :t forkIO
forkIO :: IO () -> IO ThreadId
ghci> import System.Directory
ghci> forkIO (writeFile "it5100a-notes.md" "Hello World!") >> doesFileExist "it5100a-notes.md"
False
The Haskell runtime does not specify an order in which threads are executed. Thus, the file it5100a-notes.md
created by the new thread may or may not have been created by the time the original thread checks for its existence. If we try this example once, and then remove it5100a-notes.md
and try again, we may get a different result the second time. In general, concurrent programs are nondeterministic.
From earlier, we stated that concurrent programs can "hide" the latency of programs by executing multiple tasks concurrently. This makes applications more responsive. For example, a web browser needs to process user input (like button clicks etc.) and page loads or running JavaScript processes. If a web browser were programmed sequentially, then the page must load completely before the user can interact with the browser at all. In addition, JavaScript processes will usually be running in the background, and while it is doing so, the user cannot interact with the browser either. However, since web browsers are (almost always) concurrent, the user can continue to interact with them while background processes are running, even if the browser is only running on a single CPU core.
A toy example demonstrating this is as follows. We shall write a program that receives some user input and creates a large file with user specified contents:
import Data.List
writeContents :: String -> String -> IO ()
writeContents file_name contents = do
let c = intercalate "\n" $ replicate 1000000 (intercalate "" $ replicate 100 contents)
writeFile file_name c
putStrLn $ file_name ++ " written"
main :: IO ()
main = do
putStrLn "Enter filename:"
x <- getLine
if x == "exit"
then return ()
else do putStrLn "Enter file contents:"
y <- getLine
forkIO $ writeContents x y
main
Observe the main
I/O action. All main
does is to read user input. The logic for writing the user input to a file is done by writeContents
, which is done on a separate thread. This way, main
can read user input immediately after writeContents
is forked, and is ready to read more user input again, without having to wait for writeContents
to complete its task first. If we hadn't used forkIO
, main
would perform the writing on the same thread completely, which may take a while, before being able to read user input again.
Communication Between Threads
The simplest way to share information between two threads is to let them both use a variable. In our file generation, the main thread shares both the name of a file and its contents with the other thread. Because Haskell data is immutable by default, this poses no risks: neither thread can modify the other's view of the file's name or contents. However, we will often need to have threads actively communicating with each other. For example, GHC does not provide a way for one thread to emit data to another thread, or let another thread know that it is still executing or has terminated.
MVar
s
The way we do this in Haskell is to use a synchronizing variable called the MVar
. An MVar
is essentially a mutable variable holding a value. You can put something in a variable making it full, and take out the value from a full MVar
, making it empty.
ghci> import Control.Concurrent.MVar
ghci> :t putMVar
putMVar :: MVar a -> a -> IO ()
ghci> :t takeMVar
takeMVar :: MVar a -> IO a
Importantly, using putMVar
on a full MVar
causes the thread to block until the MVar
becomes empty; dually, using takeMVar
on an empty MVar
causes the thread to block until the MVar
becomes full.
An example of using MVar
s is as follows. We are going to use the same toy example from earlier, except we are going to ensure that only one thread can perform file writing. This is so that we do not hog computing resources from other parts of our system (which there aren't, but suppose there are):
import Data.List
import Control.Concurrent.MVar
writeContents :: String -> String -> MVar () -> IO ()
writeContents file_name contents lock = do
takeMVar lock
putStrLn $ "Write to " ++ file_name ++ " started"
let !c = intercalate "\n" $ replicate 1000000 (intercalate "" $ replicate 500 contents)
writeFile file_name c
putStrLn $ file_name ++ " written"
putMVar lock ()
mainLoop :: MVar () -> IO ()
mainLoop lock = do
putStrLn "Enter filename:"
x <- getLine
if x == "exit"
then do takeMVar lock
return ()
else do putStrLn "Enter file contents:"
y <- getLine
forkIO $ writeContents x y lock
mainLoop lock
main :: IO ()
main = do
lock <- newMVar ()
mainLoop lock
Upon executing this program, the main
I/O action initializes a single MVar
which is then passed onto other parts of the program. The mainLoop
action does the same as before, except that it receives the lock
from main
. Note the then
branch in mainLoop
—if the user enters exit
, it waits for the lock
to be filled with a value, signalling the completion of file writing, before exiting the program. Importantly, takeMVar
wakes up in FIFO order. In other words, we are guaranteed that no more threads of writeContents
will be waiting to be executed at this point, because takeMVar
will only wake up the mainLoop
thread once all earlier writeContents
threads have been executed. The writeContents
action performs the actual file writing as per usual; however, it first acquires the shared lock
before executing the file writing operation, before putting back the lock
value. This is so that only one thread can perform writeContents
at any time.
Channels
Aside from MVar
s, we can also provide one-way communication channels via the Chan
type. Threads can write to channels (without blocking), and can read from channels (blocking if the channel is empty):
ghci> import Control.Concurrent.Chan
ghci> :t writeChan
writeChan :: Chan a -> a -> IO ()
ghci> :t readChan
readChan :: Chan a -> IO a
An example is as follows:
import Data.List
import Control.Concurrent.Chan
writeContents :: Chan String -> IO ()
writeContents chan = do
file_name <- readChan chan
contents <- readChan chan
putStrLn $ "Write to " ++ file_name ++ " started"
let !c = intercalate "\n" $ replicate 1000000 (intercalate "" $ replicate 500 contents)
writeFile file_name c
putStrLn $ file_name ++ " written"
writeContents chan
mainLoop :: Chan String -> IO ()
mainLoop chan = do
putStrLn "Enter filename:"
x <- getLine
if x == "exit"
then return ()
else do putStrLn "Enter file contents:"
y <- getLine
writeChan chan x
writeChan chan y
mainLoop chan
main :: IO ()
main = do
chan <- newChan :: IO (Chan String)
forkIO $ writeContents chan
mainLoop chan
In this example, only two threads are ever spawned: (1) the main thread which runs main
and mainLoop
, which, like before, only reads user input and passes the file name and contents to the other thread, which performs file writing. main
initializes the channel and forks another thread to writeContents
. mainLoop
receives user input and passes them through the channel, which is read by writeContents
for file writing.
More Cores!
Thus far, we have only discussed how the Haskell runtime is able to spark new threads of control. As said before, this does not guarantee that the program is actually running on multiple cores. In fact, by default, the program only runs on a single core. We can inspect this by using the numCapabilities
function:
import GHC.Conc
main :: IO ()
main = print numCapabilities
> ghc Main.hs
> ./Main
1
There is in fact, a way to set the number of CPU cores being used by the Haskell runtime. This can be done using the setNumCapabilities
function.
import Control.Concurrent
main :: IO ()
main = do setNumCapabilities 4
print numCapabilities
However, compiling and running this program gives a warning:
ghc Main.hs
./Main
Main: setNumCapabilities: not supported in the non-threaded RTS
1
The reason for this is because Haskell uses two runtime systems: (1) a non-threaded runtime, and (2) a threaded runtime. By default, compiling our program with ghc
links the non-threaded runtime, which is not able to leverage multiple cores. Therefore, if we want to use multiple cores, we have to use the threaded runtime instead. This can be done by compiling the program with the -threaded
option.
ghc Main.hs -threaded
./Main
4
Another way to specify the number of cores being used is to provide the +RTS -Nx
option, where x
is the number of cores we would like to use.
If we are using cabal
to build our project, we can provided -threaded
as a GHC option via the ghc-options
setting:
executable playground
-- ...
ghc-options: -threaded
And execute it with cabal run -- +RTS -Nx
.
Let's give this a try!
import Data.List
import Control.Concurrent
writeContents :: String -> String -> Chan () -> IO ()
writeContents file_name contents chan = do
putStrLn $ "Write to " ++ file_name ++ " started"
let !c = intercalate "\n" $ replicate 1000000 (intercalate "" $ replicate 500 contents)
writeFile file_name c
putStrLn $ file_name ++ " written"
writeChan chan ()
main :: IO ()
main = do
n <- getNumCapabilities
putStrLn $ "Number of cores: " ++ show n
chan <- newChan :: IO (Chan ())
forkIO $ writeContents "abc" "def" chan
forkIO $ writeContents "def" "ghi" chan
_ <- readChan chan
_ <- readChan chan
return ()
This program is simple. The main
I/O action creates a channel and passes them to two threads that perform file writing. Once each thread has completed writing the file, they will also write to the channel, signalling completion. The main
I/O action will only terminate once both threads have completed.
Let's try timing our program with different runtime options. The first execution command runs our program with 4 cores, while the second one only uses 1. We use the time
shell command to time the execution of each program:
> time cabal run playground -- +RTS -N4
Number of cores: 4
Write to abc started
Write to def started
abc written
def written
________________________________________________________
Executed in 6.44 secs fish external
usr time 10.76 secs 379.00 micros 10.76 secs
sys time 1.15 secs 137.00 micros 1.15 secs
> time cabal run
Number of cores: 1
Write to def started
Write to abc started
def written
abc written
________________________________________________________
Executed in 12.02 secs fish external
usr time 10.78 secs 0.00 micros 10.78 secs
sys time 0.84 secs 560.00 micros 0.84 secs
Notice that the usr
and sys
times for both are roughly similar. This is not surprising, because usr
and sys
times reflect CPU execution time; loosely, if the CPU spends 1s executing on one core and 1s executing on another core, then the total of usr
and sys
time reported with be 2s. This is to be expected, because the same amount of work needs to be done to write to our files. However, what we really want to profile is the real or wall clock time, i.e. how much time had elapsed on the clock. As you can see, the multicore execution ran roughly 2x faster than the single core execution, since we have two files we can write in parallel!