Concurrent Programming

In a usual sequential program, we do one thing completely after another. For example, if there are two tasks to do, A and B, then we will do A until completion, then do B until completion:

do A completely
do B completely

In a concurrent program, we interleave the two tasks, doing a little of each at a time in an arbitrary order:

work on A for 2ms
work on B for 1ms
work on A for 3ms
work on B for 6ms
work on A for 1ms
...

One of the advantages of writing concurrent programs (even in the presence of only a single operating system process) is that both tasks appear to the user to run simultaneously, so the program feels more fluid and responds with lower latency.

Most programming languages provide building blocks for writing concurrent programs in the form of independent threads of control, and Haskell is no exception. In Haskell, a thread is an I/O action that executes independently of other threads. To create a thread, we import the Control.Concurrent module and use the forkIO function. In the following example, we have one I/O action that writes Hello World! to a new file called it5100a-notes.md, and we use forkIO to execute that I/O action in a separate thread, independently of the thread that then checks whether the file exists.

ghci> import Control.Concurrent
ghci> :t forkIO
forkIO :: IO () -> IO ThreadId
ghci> import System.Directory
ghci> forkIO (writeFile "it5100a-notes.md" "Hello World!") >> doesFileExist "it5100a-notes.md"
False

The Haskell runtime does not specify the order in which threads are executed. Thus, the new thread may or may not have created it5100a-notes.md by the time the original thread checks for its existence. If we try this example once, then remove it5100a-notes.md and try again, we may get a different result the second time. In general, concurrent programs are nondeterministic.
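
To see that a forked action really runs in its own thread, the following minimal sketch prints the identifiers of both threads. It uses only forkIO, myThreadId and threadDelay from Control.Concurrent. The 0.1 s delay is an illustrative assumption: it is a crude way to keep the main thread alive long enough for the child to run, not a proper synchronization mechanism.

```haskell
import Control.Concurrent (forkIO, myThreadId, threadDelay)

main :: IO ()
main = do
  mainId <- myThreadId
  -- forkIO returns the new thread's identifier to the caller right away.
  childId <- forkIO $ do
    me <- myThreadId
    putStrLn ("child running as " ++ show me)
  putStrLn ("main is " ++ show mainId ++ ", forked " ++ show childId)
  -- Crude wait (0.1 s) so that main does not return before the child prints.
  threadDelay 100000
```

The relative order of the two output lines is not specified and may differ between runs.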

As stated earlier, concurrent programs can "hide" latency by executing multiple tasks concurrently, which makes applications more responsive. For example, a web browser needs to process user input (such as button clicks) while also loading pages and running JavaScript. If a web browser were programmed sequentially, a page would have to load completely before the user could interact with the browser at all. In addition, JavaScript usually runs in the background, and while it is running, the user would not be able to interact with the browser either. However, since web browsers are (almost always) concurrent, the user can continue to interact with them while background tasks are running, even if the browser is running on only a single CPU core.

A toy example demonstrating this is as follows. We shall write a program that receives some user input and creates a large file with user specified contents:

import Control.Concurrent (forkIO)
import Data.List (intercalate)

-- Write 1,000,000 lines, each containing 100 copies of the given contents.
writeContents :: String -> String -> IO ()
writeContents file_name contents = do
  let c = intercalate "\n" $ replicate 1000000 (intercalate "" $ replicate 100 contents)
  writeFile file_name c
  putStrLn $ file_name ++ " written"

main :: IO ()
main = do
  putStrLn "Enter filename:"
  x <- getLine
  if x == "exit"
  then return ()
  else do putStrLn "Enter file contents:"
          y <- getLine
          -- Fork the slow write so that main can prompt again immediately.
          _ <- forkIO $ writeContents x y
          main

Observe the main I/O action. All main does is read user input. The logic for writing the user input to a file lives in writeContents, which runs on a separate thread. This way, main is ready to read more user input immediately after writeContents is forked, without having to wait for writeContents to complete its task first. If we hadn't used forkIO, main would perform the whole write on the same thread, which may take a while, before being able to read user input again.

Communication Between Threads

The simplest way to share information between two threads is to let them both use the same variable. In our file generation example, the main thread shares both the name of a file and its contents with the other thread. Because Haskell data is immutable by default, this poses no risk: neither thread can modify the other's view of the file's name or contents. However, threads often need to communicate actively with each other, and forkIO alone gives us no way for one thread to send data to another, or to let another thread know whether it is still executing or has terminated.

MVars

The way we do this in Haskell is to use a synchronizing variable called an MVar. An MVar is essentially a mutable variable that is either empty or full, in which case it holds a value. You can put a value into an empty MVar, making it full, and take the value out of a full MVar, making it empty.

ghci> import Control.Concurrent.MVar
ghci> :t putMVar
putMVar :: MVar a -> a -> IO ()
ghci> :t takeMVar
takeMVar :: MVar a -> IO a

Importantly, using putMVar on a full MVar causes the thread to block until the MVar becomes empty; dually, using takeMVar on an empty MVar causes the thread to block until the MVar becomes full.
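
As a minimal sketch of this blocking behaviour, the example below uses an initially empty MVar to hand a result from a forked thread back to its parent. The sumInThread helper is a hypothetical name introduced for illustration; newEmptyMVar, putMVar and takeMVar come from Control.Concurrent.MVar.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)

-- Hypothetical helper: sums a list in a new thread and waits for the result.
sumInThread :: [Int] -> IO Int
sumInThread xs = do
  box <- newEmptyMVar
  -- The child fills the MVar; ($!) forces the sum inside the child thread.
  _ <- forkIO $ putMVar box $! sum xs
  -- The parent blocks here until the MVar becomes full.
  takeMVar box

main :: IO ()
main = do
  total <- sumInThread [1 .. 100]
  print total  -- prints 5050
```

Because takeMVar blocks while the MVar is empty, the parent cannot observe the result before the child has produced it, which also tells the parent that the child's work is done.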

An example of using MVars is as follows. We are going to use the same toy example from earlier, except that we now ensure that only one thread can perform file writing at a time. This is so that we do not hog computing resources from other parts of our system (there are none here, but suppose there were):

{-# LANGUAGE BangPatterns #-}
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar
import Data.List (intercalate)

writeContents :: String -> String -> MVar () -> IO ()
writeContents file_name contents lock = do
  -- Acquire the lock; blocks while another writer holds it.
  takeMVar lock
  putStrLn $ "Write to " ++ file_name ++ " started"
  let !c = intercalate "\n" $ replicate 1000000 (intercalate "" $ replicate 500 contents)
  writeFile file_name c
  putStrLn $ file_name ++ " written"
  -- Release the lock so that the next waiting thread can proceed.
  putMVar lock ()

mainLoop :: MVar () -> IO ()
mainLoop lock = do
  putStrLn "Enter filename:"
  x <- getLine
  if x == "exit"
  then do -- Wait for pending writers to finish before exiting.
          takeMVar lock
          return ()
  else do putStrLn "Enter file contents:"
          y <- getLine
          _ <- forkIO $ writeContents x y lock
          mainLoop lock

main :: IO ()
main = do
  -- The lock starts full, i.e. available.
  lock <- newMVar ()
  mainLoop lock

Upon executing this program, the main I/O action initializes a single MVar, which is then passed on to the other parts of the program. The mainLoop action does the same as before, except that it receives the lock from main. Note the then branch in mainLoop: if the user enters exit, it waits until the lock is full again, signalling the completion of file writing, before exiting the program. Importantly, threads blocked on an MVar are woken up in FIFO order. In other words, provided every forked writeContents thread has already reached its takeMVar call, no writeContents thread will still be waiting at this point, because takeMVar wakes up the mainLoop thread only after all earlier writeContents threads have taken and returned the lock. The writeContents action performs the file writing as usual; however, it first acquires the shared lock, performs the write, and then puts the lock value back. This ensures that only one thread can execute the body of writeContents at any time.
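
One fragility of pairing takeMVar and putMVar by hand is that an exception thrown in between leaves the lock empty forever. A hedged variation, sketched below, uses withMVar from Control.Concurrent.MVar, which takes the value, runs an action, and puts the value back even if the action throws. The worker function and the done MVar are hypothetical names that are not part of the program above, and the critical section only prints a message instead of writing a file.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newEmptyMVar, newMVar, putMVar, takeMVar, withMVar)
import Control.Monad (forM_, replicateM_)

-- Hypothetical worker: runs its critical section under the lock,
-- then signals completion on a second MVar.
worker :: MVar () -> MVar () -> Int -> IO ()
worker lock done n = do
  withMVar lock $ \_ -> putStrLn ("worker " ++ show n ++ " holds the lock")
  putMVar done ()

main :: IO ()
main = do
  lock <- newMVar ()      -- full: the lock is available
  done <- newEmptyMVar    -- empty: no completion signals yet
  forM_ [1 .. 3] $ \n -> forkIO (worker lock done n)
  -- Collect one completion signal per worker before exiting.
  replicateM_ 3 (takeMVar done)
```

The order in which the three workers obtain the lock is not specified, but their messages never overlap because only one thread is inside withMVar at a time.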

Channels

Aside from MVars, we can also provide one-way communication channels via the Chan type. Threads can write to channels (without blocking), and can read from channels (blocking if the channel is empty):

ghci> import Control.Concurrent.Chan
ghci> :t writeChan
writeChan :: Chan a -> a -> IO ()
ghci> :t readChan
readChan :: Chan a -> IO a

An example is as follows:

{-# LANGUAGE BangPatterns #-}
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan
import Data.List (intercalate)

writeContents :: Chan String -> IO ()
writeContents chan = do
  -- Each job arrives as two messages: the file name, then the contents.
  file_name <- readChan chan
  contents  <- readChan chan
  putStrLn $ "Write to " ++ file_name ++ " started"
  let !c = intercalate "\n" $ replicate 1000000 (intercalate "" $ replicate 500 contents)
  writeFile file_name c
  putStrLn $ file_name ++ " written"
  -- Loop to wait for the next job.
  writeContents chan

mainLoop :: Chan String -> IO ()
mainLoop chan = do
  putStrLn "Enter filename:"
  x <- getLine
  if x == "exit"
  then return ()
  else do putStrLn "Enter file contents:"
          y <- getLine
          writeChan chan x
          writeChan chan y
          mainLoop chan

main :: IO ()
main = do
  chan <- newChan :: IO (Chan String)
  _ <- forkIO $ writeContents chan
  mainLoop chan

In this example, only two threads ever exist: (1) the main thread, which runs main and mainLoop and, like before, only reads user input and passes the file name and contents on, and (2) a worker thread, which performs the file writing. main initializes the channel and forks the worker thread to run writeContents. mainLoop receives user input and passes it through the channel, from which writeContents reads it for file writing.
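
A variation on this design, sketched below under stated assumptions, sends each job as a single (file name, contents) pair so that the two halves can never be separated, and uses Nothing as an end-of-input marker so that the main thread can wait for the worker to finish. The Job type, the consumer function and the done MVar are hypothetical names introduced for illustration, and the worker only reports the length of the contents instead of writing a file.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)

-- Hypothetical job type: a file name paired with its contents.
type Job = (String, String)

-- Handle jobs until Nothing arrives, then report how many were handled.
consumer :: Chan (Maybe Job) -> MVar Int -> Int -> IO ()
consumer chan done count = do
  msg <- readChan chan
  case msg of
    Nothing -> putMVar done count
    Just (name, contents) -> do
      putStrLn (name ++ ": " ++ show (length contents) ++ " characters")
      consumer chan done (count + 1)

main :: IO ()
main = do
  chan <- newChan
  done <- newEmptyMVar
  _ <- forkIO (consumer chan done 0)
  writeChan chan (Just ("a.txt", "hello"))
  writeChan chan (Just ("b.txt", "world!"))
  writeChan chan Nothing          -- end-of-input marker
  n <- takeMVar done              -- blocks until the consumer has finished
  putStrLn ("jobs handled: " ++ show n)
-- Output:
-- a.txt: 5 characters
-- b.txt: 6 characters
-- jobs handled: 2
```

Since a single consumer reads the channel and messages are delivered in the order they were written, the output order here is fixed.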

More Cores!

Thus far, we have only discussed how the Haskell runtime can fork new threads of control. As noted earlier, concurrency alone does not mean that the program actually runs on multiple cores. In fact, by default, the program runs on only a single core. We can check this by printing the numCapabilities value from GHC.Conc:

import GHC.Conc
main :: IO ()
main = print numCapabilities

> ghc Main.hs
> ./Main
1

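There is, in fact, a way to set the number of CPU cores used by the Haskell runtime: the setNumCapabilities function. Both it and getNumCapabilities, which reads the current setting, are exported by Control.Concurrent.

import Control.Concurrent
main :: IO ()
main = do setNumCapabilities 4
          -- Read the current setting back from the runtime.
          n <- getNumCapabilities
          print n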

However, compiling and running this program gives a warning:

> ghc Main.hs
> ./Main
Main: setNumCapabilities: not supported in the non-threaded RTS
1

The reason is that GHC ships two runtime systems: (1) a non-threaded runtime, and (2) a threaded runtime. By default, compiling our program with ghc links the non-threaded runtime, which cannot make use of multiple cores. Therefore, if we want to use multiple cores, we have to use the threaded runtime instead. This can be done by compiling the program with the -threaded option.

> ghc Main.hs -threaded
> ./Main
4

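Another way to specify the number of cores is to pass the +RTS -Nx option when running the program, where x is the number of cores we would like to use. By default, GHC-compiled executables accept only a minimal set of RTS options on the command line, so the program also needs to be linked with -rtsopts for -N to be accepted.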

If we are using cabal to build our project, we can provide -threaded (together with -rtsopts, so that the -N option is accepted at run time) as GHC options via the ghc-options setting:

executable playground
    -- ...
    ghc-options: -threaded -rtsopts

We can then execute it with cabal run -- +RTS -Nx.

Let's give this a try!

{-# LANGUAGE BangPatterns #-}
import Control.Concurrent
import Data.List (intercalate)

writeContents :: String -> String -> Chan () -> IO ()
writeContents file_name contents chan = do
  putStrLn $ "Write to " ++ file_name ++ " started"
  let !c = intercalate "\n" $ replicate 1000000 (intercalate "" $ replicate 500 contents)
  writeFile file_name c
  putStrLn $ file_name ++ " written"
  -- Signal completion to the main thread.
  writeChan chan ()

main :: IO ()
main = do
  n <- getNumCapabilities
  putStrLn $ "Number of cores: " ++ show n
  chan <- newChan :: IO (Chan ())
  _ <- forkIO $ writeContents "abc" "def" chan
  _ <- forkIO $ writeContents "def" "ghi" chan
  -- Wait for one completion signal per forked thread.
  _ <- readChan chan
  _ <- readChan chan
  return ()

This program is simple. The main I/O action creates a channel and passes it to two threads that perform file writing. Once each thread has finished writing its file, it also writes to the channel, signalling completion. The main I/O action terminates only once both threads have completed.

Let's try timing our program with different runtime options. The first execution command runs our program with 4 cores, while the second one only uses 1. We use the time shell command to time the execution of each program:

> time cabal run playground -- +RTS -N4
Number of cores: 4
Write to abc started
Write to def started
abc written
def written

________________________________________________________
Executed in    6.44 secs    fish           external
   usr time   10.76 secs  379.00 micros   10.76 secs
   sys time    1.15 secs  137.00 micros    1.15 secs
> time cabal run
Number of cores: 1
Write to def started
Write to abc started
def written
abc written

________________________________________________________
Executed in   12.02 secs    fish           external
   usr time   10.78 secs    0.00 micros   10.78 secs
   sys time    0.84 secs  560.00 micros    0.84 secs

Notice that the usr and sys times for both runs are roughly similar. This is not surprising, because usr and sys times reflect CPU execution time; loosely, if the CPU spends 1s executing on one core and 1s executing on another core, then the total of usr and sys time reported will be 2s. This is to be expected, because the same amount of work needs to be done to write our files. However, what we really want to measure is the real or wall-clock time, i.e. how much time elapsed on the clock. As you can see, the multicore execution (6.44s) ran nearly twice as fast as the single-core execution (12.02s), since we have two files that we can write in parallel!
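
The same distinction between CPU time and wall-clock time can be observed from inside a program. The sketch below is an illustration under stated assumptions rather than a replacement for the time command: timed is a hypothetical helper, getMonotonicTime comes from GHC.Clock (seconds as a Double), getCPUTime comes from System.CPUTime (picoseconds of CPU time used by the program), and the summation is only a stand-in workload.

```haskell
import Control.Exception (evaluate)
import GHC.Clock (getMonotonicTime)
import System.CPUTime (getCPUTime)

-- Hypothetical helper: runs an action and returns its result together with
-- the elapsed wall-clock seconds and the CPU seconds consumed meanwhile.
timed :: IO a -> IO (a, Double, Double)
timed action = do
  wall0 <- getMonotonicTime
  cpu0  <- getCPUTime
  result <- action
  cpu1  <- getCPUTime
  wall1 <- getMonotonicTime
  let wallSecs = wall1 - wall0
      cpuSecs  = fromIntegral (cpu1 - cpu0) / 1e12  -- picoseconds to seconds
  return (result, wallSecs, cpuSecs)

main :: IO ()
main = do
  -- evaluate forces the sum while the timer is running.
  (total, wall, cpu) <- timed (evaluate (sum [1 .. 5000000 :: Int]))
  putStrLn ("result: " ++ show total)
  putStrLn ("wall-clock: " ++ show wall ++ " s, CPU: " ++ show cpu ++ " s")
```

For a single-threaded workload like this one, the two figures should be close; when work is spread over several cores, the CPU figure can exceed the wall-clock figure, as in the timings above.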