Interview Question: Write a CSV parser
I recently had to run some interviews to help build a new team in my organization. So naturally, I gave the question a try myself.
The Question
You will design a program that consumes high-frequency temperature data and
puts it through a pipeline for various kinds of data processing.
The system should be flexible where a pipeline can be composed into a chain
of processing units. Each processor should ideally do one thing or bit of
processing before passing the data on to the next processor.
The data will be sourced from a large CSV file with 2 columns in the order:
DateTime, Temperature
Develop a pipeline of processors chained together that do the following:
- Parses the CSV data into records (say objects with properties, one for
each column).
- Filters bad records by ignoring those that may be out of order (timewise).
- Calculates the average temperature for each day.
- Prints the result of the calculation.
As you work through this task, you should demonstrate that your code is
working.
You are welcome to use any frameworks or libraries you like as well as refer
to online resources, documentation, and perform web searches.
The Answer
I did it in two languages (F# and Clojure), and it took me just over 20 minutes each. But of course I would expect it to take longer for interviewees under the pressure of an interview.
I did NOT implement escapable commas: I attempted to write the solution as soon as I learned of the question (to emulate a timed interview experience), and I did not remember the gross ways CSV handles quotes and commas. To get around this, I just added the ability to pass your own comma-splitting function to each implementation, as sketched below.
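Because the splitter is pluggable, quote handling can be added later without touching the rest of either pipeline. Here's a rough after-the-fact F# sketch of a quote-aware splitter matching the string -> string seq shape the F# solution expects (the name and exact escaping rules are mine; this was not part of either timed answer):

let splitRespectingQuotes (line: string) : string seq =
    let fields = ResizeArray<string>()
    let field = System.Text.StringBuilder()
    let mutable inQuotes = false
    let mutable i = 0

    while i < line.Length do
        match line.[i] with
        | '"' when inQuotes && i + 1 < line.Length && line.[i + 1] = '"' ->
            // "" inside a quoted field is an escaped quote
            field.Append '"' |> ignore
            i <- i + 2
        | '"' ->
            // Toggle quoted state and drop the quote itself
            inQuotes <- not inQuotes
            i <- i + 1
        | ',' when not inQuotes ->
            // An unquoted comma terminates the current field
            fields.Add(field.ToString())
            field.Clear() |> ignore
            i <- i + 1
        | c ->
            field.Append c |> ignore
            i <- i + 1

    fields.Add(field.ToString())
    fields :> string seq

With this, a line like 1,"hello, world",2 splits into three fields instead of four.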
Total Time Spent For Both Answers: 48 minutes
F#
My F# solution was pretty simple to reach. I started out by outlining a generic Pipeline type to capture the entire CSV processing pipeline and then parameterized all the required functionality via dependency parameterization. After that, it was just a matter of creating concrete implementations of all the functions needed to fulfill the Pipeline.
My only complaint about this answer is that it is not super composable. While it fulfills all the asks of the original question, I find the inability to capture the entire processing pipeline as a composition of functions somewhat annoying. I suppose this is the price you pay for keeping the solution type-safe. We could build a fully generic pipeline like type Pipeline = (Object -> Object) seq, but I quite frankly find that disgusting.
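To make the complaint concrete, here's roughly what that erased pipeline would look like (an illustrative sketch with names of my own choosing; not part of the timed answer):

// Illustrative sketch only: the type-erased pipeline described above
type UntypedPipeline = (obj -> obj) seq

// Run a chain of erased stages over an erased input
let runUntyped (stages: UntypedPipeline) (input: obj) : obj =
    stages |> Seq.fold (fun acc stage -> stage acc) input

// Each stage has to downcast its input and box its result back up, so the
// compiler can no longer catch a mis-ordered chain; mistakes surface as
// InvalidCastException at runtime instead.
let stages: UntypedPipeline =
    Seq.ofList
        [ fun (o: obj) -> (o :?> string seq) |> Seq.map (fun s -> s.Split(',')) |> box
          fun (o: obj) -> (o :?> seq<string[]>) |> Seq.length |> box ]

Anyway, here is the full F# solution: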
/// You will design a program that consumes high-frequency temperature data and
/// puts it through a pipeline for various kinds of data processing.
/// The system should be flexible where a pipeline can be composed into a chain
/// of processing units. Each processor should ideally do one thing or bit of
/// processing before passing the data on to the next processor.
/// The data will be sourced from a large CSV file with 2 columns in the order:
/// DateTime, Temperature
/// Develop a pipeline of processors chained together that do the following:
/// - Parses the CSV data into records (say objects with properties, one for
/// each column).
/// - Filters bad records by ignoring those that may be out of order (timewise).
/// - Calculates the average temperature for each day.
/// - Prints the result of the calculation.
/// As you work through this task, you should demonstrate that your code is
/// working.
/// You are welcome to use any frameworks or libraries you like as well as refer
/// to online resources, documentation, and perform web searches.
open System

module CsvParser =
    /// Function for splitting a single CSV line by comma into a sequence of
    /// string entries
    type CsvSplitStringByCommaFn = string -> string seq

    /// Turns a sequence of CSV lines into a sequence of generic Maps
    type CsvToMapParser<'Fail> =
        string seq -> Result<Map<string, string>, 'Fail> seq

    /// Converts generic maps into a strict type
    type MapToRecordFn<'Record, 'Fail> =
        Map<string, string> -> Result<'Record, 'Fail>

    /// A filter predicate to determine if a parsed CSV entry is valid
    type Validator<'Record> = 'Record -> bool

    /// A reducing function to reduce your sequence of records into a usable type
    type ReduceFn<'Acc, 'Input> = 'Acc -> 'Input -> 'Acc

    /// Error handler which, when provided a Result of case Error, will decide
    /// whether or not 'A should be discarded.
    type ErrorHandler<'A, 'Fail> = Result<'A, 'Fail> -> 'A option

    /// Generic CSV parsing/processing pipeline
    type Pipeline<'Record, 'Fail, 'Reduction> =
        CsvToMapParser<'Fail> // Convert CSV lines into maps
            -> MapToRecordFn<'Record, 'Fail> // Convert generic maps to the strict type
            -> ErrorHandler<'Record, 'Fail> // Determine whether on 'Fail the record should be discarded
            -> Validator<'Record> seq // Sequence of validators to determine if a 'Record is valid
            -> ReduceFn<'Reduction, 'Record> // Reducing function to pass to fold
            -> 'Reduction // Initial reduction value for fold
            -> string seq // Sequence of CSV lines to process
            -> Result<'Reduction, 'Fail> // Result of the reduction
    /// Generic `Pipeline` implementation
    let pipeline: Pipeline<'A, 'Fail, 'Reduction> =
        fun parser mapToRecordFn errorHandler validators reducingFn reduceInit lines ->
            let parseResults =
                lines
                |> parser
                |> Seq.map (Result.bind mapToRecordFn)

            // Check if there are any errors in the parse results
            let parsingError =
                parseResults
                |> Seq.choose (function
                    | Error ex -> Some ex
                    | Ok _ -> None)
                |> Seq.tryHead

            match parsingError with
            | Some err -> err |> Error
            | None ->
                lines
                |> parser
                |> Seq.map (Result.bind mapToRecordFn)
                |> Seq.choose (fun result ->
                    match result with
                    | Ok record -> Some record
                    | Error _ -> errorHandler result)
                |> Seq.filter (fun record ->
                    validators
                    |> Seq.map (fun validator -> validator record)
                    |> Seq.reduce (fun result validation -> result && validation))
                |> Seq.fold reducingFn reduceInit
                |> Ok
    /// Turns a sequence of CSV line strings into a sequence of
    /// `Map<string, string>`.
    /// Accepts a comma splitting function.
    let genericCsvParser: (string -> string seq) -> CsvToMapParser<'Fail> =
        fun commaSplitter lines ->
            let headers = lines |> Seq.tryHead |> Option.map commaSplitter

            let contentRows =
                try
                    lines
                    |> Seq.removeAt 0
                    |> Seq.map commaSplitter
                    |> Some
                with
                | _ -> None

            match headers, contentRows with
            | _, None -> Seq.empty
            | None, _ -> Seq.empty
            | Some headers, Some rows ->
                rows
                |> Seq.map (fun row ->
                    row
                    |> Seq.mapi (fun idx colStr ->
                        match headers |> Seq.tryItem idx with
                        | None -> failwithf $"Column index %d{idx} out of header range"
                        | Some header -> (header, colStr))
                    |> Map.ofSeq
                    |> Ok)
module CseCsvRecord =
    type CseCsvRecord =
        { Temperature: Double
          DateTime: DateTimeOffset }

    let private fromMap: CsvParser.MapToRecordFn<CseCsvRecord, Exception> =
        fun map ->
            let tryFindAndParse key parser map =
                map
                |> Map.tryFind key
                |> function
                    | Some v -> Ok v
                    | None ->
                        Exception($"'%s{key}' not found in %A{map}")
                        |> Error
                |> Result.bind (fun str ->
                    try
                        parser str |> Ok
                    with
                    | ex ->
                        Exception($"Failed to parse '%s{key}' from %A{map}", ex)
                        |> Error)

            tryFindAndParse "Temperature" Double.Parse map
            |> Result.bind (fun temperature ->
                tryFindAndParse "DateTime" DateTimeOffset.Parse map
                |> Result.map (fun dt ->
                    { Temperature = temperature
                      DateTime = dt }))
            |> Result.mapError (fun ex ->
                Exception(
                    $"Failed to convert %A{map} into a %s{nameof CseCsvRecord}",
                    ex
                ))
    /// Validator that only accepts records whose DateTime is strictly after the
    /// last accepted one. State lives in a ref cell because F# closures cannot
    /// capture a `let mutable` local.
    let temporalValidator initialDateTime : CsvParser.Validator<CseCsvRecord> =
        let lastDateTime = ref initialDateTime

        fun record ->
            let currDateTime = record.DateTime

            match lastDateTime.Value with
            | None ->
                lastDateTime.Value <- Some currDateTime
                true
            | Some prevDateTime ->
                match DateTimeOffset.Compare(prevDateTime, currDateTime) with
                | offset when offset < 0 ->
                    lastDateTime.Value <- Some currDateTime
                    true
                | _ -> false
    let dailyAverageTempReducingFn: CsvParser.ReduceFn<Map<DateTimeOffset, int * double>, CseCsvRecord> =
        fun acc record ->
            let date = record.DateTime.Date |> DateTimeOffset

            let count, totalTemp =
                acc
                |> Map.tryFind date
                |> Option.defaultValue (0, 0.0)

            acc
            |> Map.add date ((count + 1), (totalTemp + record.Temperature))

    let private splitByCommaNoDelimiter (s: string) = s.Split(",") |> Seq.ofArray

    /// A fulfilled implementation of CsvParser.Pipeline.
    let pipelineMain =
        CsvParser.pipeline
            (CsvParser.genericCsvParser splitByCommaNoDelimiter)
            fromMap
            (fun result ->
                printfn $"Error occurred converting Map to record: %A{result}"
                None)
            [ temporalValidator None ]
            dailyAverageTempReducingFn
            Map.empty
[<EntryPoint>]
let main args =
    /// Split string into lines and trim each line
    let prepCsvString (csvString: string) =
        csvString.Trim().Split("\n")
        |> Seq.ofArray
        |> Seq.map (fun line -> line.Trim())

    """
    DateTime,Temperature
    2022-04-11T18:00:00+00:00,1
    2022-04-11T18:05:00+00:00,2
    2022-04-11T18:10:00+00:00,3
    2022-04-11T18:15:00+00:00,4
    2022-04-11T18:20:00+00:00,5
    2022-04-11T18:25:00+00:00,6
    2022-04-11T18:20:00+00:00,7
    2022-04-11T18:20:00+00:00,100
    2022-04-12T18:00:00+00:00,10
    2022-04-12T18:01:00+00:00,1000
    """
    |> prepCsvString
    |> CseCsvRecord.pipelineMain
    |> Result.map (fun map ->
        map
        |> Map.toSeq
        |> Seq.iter (fun (k, (count, totalTemp)) ->
            printfn
                $"The average temperature on %04d{k.Year}-%02d{k.Month}-%02d{k.Day} was %f{totalTemp / (double count)}ºC"))
    |> ignore

    0
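To demonstrate it working: the temporal validator drops the two out-of-order 18:20:00 readings (7 and 100), leaving readings 1 through 6 for April 11 (average (1+2+3+4+5+6)/6 = 3.5) and the two April 12 readings (average (10+1000)/2 = 505). Running the program should therefore print something like:

The average temperature on 2022-04-11 was 3.500000ºC
The average temperature on 2022-04-12 was 505.000000ºC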
Clojure
Overall I prefer my Clojure answer quite a bit more as it's very succinct, is pretty much as high-performance as you can get while staying idiomatic, and does what Clojure is best at: information processing.
The entire process is a transducer/eduction, which really fulfills the want of a composable data pipeline. Whereas in an OO language you would need to (should?) follow a chain-of-responsibility pattern to make a composable pipeline, in Clojure it just comes so much more naturally... and looks so much prettier.
(ns com.microsoft.cse.hiring.csv
  "You will design a program that consumes high-frequency temperature data and
  puts it through a pipeline for various kinds of data processing.
  The system should be flexible where a pipeline can be composed into a chain
  of processing units. Each processor should ideally do one thing or bit of
  processing before passing the data on to the next processor.
  The data will be sourced from a large CSV file with 2 columns in the order:
  DateTime, Temperature
  Develop a pipeline of processors chained together that do the following:
  - Parses the CSV data into records (say objects with properties, one for each
  column).
  - Filters bad records by ignoring those that may be out of order (timewise).
  - Calculates the average temperature for each day.
  - Prints the result of the calculation.
  As you work through this task, you should demonstrate that your code is
  working.
  You are welcome to use any frameworks or libraries you like as well as refer
  to online resources, documentation, and perform web searches."
  (:require
   [clojure.java.io :as io]
   [clojure.string :as string])
  ;; LocalDateTime and DateTimeFormatter are used below, so they must be imported
  (:import
   (java.time LocalDateTime)
   (java.time.format DateTimeFormatter)))
(defn process-lines
  "Processes a sequence of CSV lines into a sequence of records, removes invalid
  ones (out of order), and calculates the average temperature for each day."
  [csv-lines-seq
   & {:keys [comma-split-fn
             k-fn-parser-map]
      :or {comma-split-fn #(string/split % #",")
           k-fn-parser-map {:DateTime #(LocalDateTime/parse % DateTimeFormatter/ISO_DATE_TIME)
                            :Temperature parse-double}}}]
  (let [csv-lines-seq (seq csv-lines-seq)
        header-coll (->> (comma-split-fn (first csv-lines-seq))
                         (into [] (map keyword)))]
    (->> (rest csv-lines-seq)
         (eduction
          ;; Convert CSV lines into maps
          (comp
           (map comma-split-fn)
           (map (fn [col-vals]
                  (->> col-vals
                       (into {}
                             (map-indexed
                              (fn [idx col-str]
                                (let [header (nth header-coll idx)
                                      parser (or (k-fn-parser-map header)
                                                 identity)
                                      parsed (parser col-str)]
                                  [header parsed]))))))))
          ;; Ensure all records are sorted by DateTime
          (comp
           ;; Ensure all records do have a DateTime
           (filter (fn [{:keys [DateTime]}] (some? DateTime)))
           ;; Filter out records which are out of order.
           ;; We need a custom stateful transducer which can keep track of
           ;; the previous max LocalDateTime.
           (fn [rf]
             (let [prev-date-time-v (volatile! nil)]
               (fn
                 ([] (rf))
                 ([result] (rf result))
                 ([result {^LocalDateTime curr-date-time :DateTime :as input}]
                  (let [^LocalDateTime prev-date-time @prev-date-time-v]
                    (if (or (nil? prev-date-time)
                            (and (some? prev-date-time)
                                 (-> prev-date-time (.isBefore curr-date-time))))
                      (do
                        (vreset! prev-date-time-v curr-date-time)
                        (rf result input))
                      result)))))))
          ;; Process daily information
          (comp
           ;; Partition by date (NOT datetime); we can only do this
           ;; because records are ordered
           (partition-by (fn [{:keys [^LocalDateTime DateTime]}]
                           (-> DateTime (.toLocalDate))))
           ;; Convert partitions into maps of daily info
           (map (fn [record-partition-by-date]
                  (let [record-count (count record-partition-by-date)
                        sum-temp (->> record-partition-by-date
                                      (transduce (map #(:Temperature %)) +))
                        avg-temp (/ sum-temp record-count)
                        ^LocalDateTime
                        date-time (->> (first record-partition-by-date)
                                       :DateTime)
                        date (-> date-time (.toLocalDate))]
                    {:readings record-count
                     :sum-temp sum-temp
                     :avg-temp avg-temp
                     :day date}))))))))
(defn process-string
  "Process a CSV string"
  [csv-string]
  (process-lines (string/split-lines csv-string)))

(defn process-file
  "Process a CSV file and run the reducing function `rf` on the resulting
  sequence of the pipeline.
  NOTE Access to the resulting sequence must be done in `rf`, as the reader
  stream for the file will be closed when the function exits."
  [filepath rf]
  (with-open [rdr (io/reader filepath)]
    (when-some [lines (line-seq rdr)]
      (reduce rf (process-lines lines)))))
(defn -main [& args]
  (let [sample-csv-string
        "DateTime,Temperature
         2022-04-09T19:00:42Z,1
         2022-04-09T19:05:42Z,2
         2022-04-09T19:10:42Z,3
         2022-04-09T19:15:42Z,4
         2022-04-09T19:20:42Z,5
         2022-04-09T19:25:42Z,6
         2022-04-09T19:30:42Z,7
         2022-04-09T19:25:42Z,8
         2022-05-09T19:25:42Z,9
         2022-05-10T19:30:42Z,10"
        csv-str-seq (->> sample-csv-string
                         (string/split-lines)
                         (map string/trim))]
    ;; doseq rather than a lazy `for`, so the printing side effects actually run
    (doseq [result (process-lines csv-str-seq)]
      (println result))))
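To demonstrate this one working as well: the stateful filter drops the out-of-order 19:25:42 reading of 8, so the doseq prints three daily summary maps: 7 readings summing to 28.0 for an average of 4.0 on 2022-04-09, a single reading of 9.0 on 2022-05-09, and a single reading of 10.0 on 2022-05-10.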