Sunday, September 8, 2013

An essay about expressiveness, behavior and conciseness in code

This week I came across an interesting tweet, which stated that code is read more often than it is written.

It can only be true, as any tautology is. It's like a book, right? Written once, read many times... until errata or new editions come into play.

That was my point when I replied, trolling (I admit), that Java is becoming a pain in this area... because of its verbosity.
Fortunately, Nicolas Frankel is clever enough to keep a tone of discussion rather than entering an endless war that no one can win...
So we decided to express our points in cross-referenced blog posts, where you'll find your own way of coding.

Reading code

Before starting, I'd first like to ask: why would we have to read code?

From my point of view, there are several relevant cases:
  • learning a new library: code expresses more than docs, which is why it must be opened
  • improving it by adding new features: in that case, we need the big picture before starting any hack
  • debugging
So what I'll claim, based on these three (more? poke me!) cases, is that
  • the first two can be eased by clean code that describes a behavior with the right level of conciseness while keeping expressiveness at its best.
  • the last one can be a real pain when the level of conciseness of the code has been badly chosen

Some words about conciseness and Java

Here I'll quickly brush over some additions that Java has received over the last couple of versions; many of them were about conciseness.

Let's take List and the for-loop.

See? Everything is about making the code more concise without removing any logic or features.

However, not everything is that shiny, mostly when you need to add more logic, or let's say when you need to enhance the behavior. Can you read the following two examples easily?

Don't know about you, but my feeling is that the behavior is not well represented, and thus we need to run our in-brain JVM to catch it.

The problem is that the conciseness introduced at the language level is not flexible enough to express advanced workflows, like early termination or filtering.
Now let's look at a Java 8 version of this example:

The same code, fewer lines... but is that really the interesting feature? Conciseness? Really!?
I don't think so. The most important part is that the behavior can be caught by combining three simple behaviors:

  • filtering
  • mapping (transforming)
  • limiting (taking)
Another good point is that, using the second version, the behavior is easily testable by simply testing that the predicate (referenced from the Test class) returns the right value.
So there is no need to test whether an array is filtered correctly: that is guaranteed by the library.
Also, the behavior is easily extendable, since the filtering is no longer explicitly hardcoded in the behavior -- we're free to instantiate a specific behavior by introducing a partially applied method.

Now, it's true that we have to agree on an API and everybody must know it. But I think that's always the case when you're creating an API: you're fixing names and concepts.
The sad fact, in Java, is that they are trying to reinvent the wheel by renaming well-known behaviors like limit (take) and substream (drop + take) in the Collection API. That's why semantics get hard when someone creates their own taxonomy/ontology of concepts.
Moreover, they took the opportunity to leave some noise, like the stream() call (probably for backward compatibility), which results in a collect call at the other end (used to rebuild a List from the Stream).
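For comparison, here is a minimal sketch of the same three behaviors with the names used by the Scala collections (filter, map, take). The Test case class, its fields and the hasPassed predicate are made up for the illustration; they are not the ones from the Java examples above.

```scala
object PipelineSketch {
  // Hypothetical model, only here to have something to filter on
  final case class Test(name: String, passed: Boolean)

  // The predicate is a plain named value, so it can be unit-tested on its own
  val hasPassed: Test => Boolean = _.passed

  // filtering, then mapping (transforming), then limiting (taking)
  def firstPassedNames(tests: List[Test], n: Int): List[String] =
    tests.filter(hasPassed).map(_.name).take(n)

  // what Java calls a substream is drop followed by take
  def window(tests: List[Test], from: Int, size: Int): List[Test] =
    tests.drop(from).take(size)
}
```

Only hasPassed carries logic of our own, so it is the only piece that really needs a test; filter, map and take are the library's job.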

Conciseness, Expressiveness and Scala

In this section, I won't go into much detail about Scala; I'll just try to show some advantages of the expressiveness that Scala offers... for instance, even implicits (at the call site) are explicit (at the definition site).

So I'll just mention two (among many) features brought by Scala that are really missing in Java, both being related to expressing a behavior based on an implicitly available context.

Implicit parameter

In Scala, a method/function can have several parameter blocks, like so:
def add(a:Int)(b:Int) = a+b.

Ok, fine, but the very last parameter block can be declared as implicit, this way:
def persist(user:User)(implicit s:DBSession) = ....

An implicit parameter block is simply a bunch of parameters that the compiler should be able to find in the scope of the call site. And if it can find such parameters, in a deterministic way, they'll be automatically provided at the call site. An example?
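Here is a minimal sketch of such a call site. User and DBSession are hypothetical stand-ins (a real session would wrap a database connection); only the shape of persist comes from the definition above.

```scala
object ImplicitSketch {
  // Hypothetical types, kept in memory so the sketch stays self-contained
  final case class User(name: String)

  final class DBSession {
    private var stored = List.empty[User]
    def save(user: User): Unit = { stored = user :: stored }
    def all: List[User] = stored.reverse
  }

  // The last parameter block is implicit: explicit here, at the definition site
  def persist(user: User)(implicit s: DBSession): Unit = s.save(user)

  def demo(): List[User] = {
    // The one and only place where the session is named
    implicit val session: DBSession = new DBSession

    persist(User("alice")) // the compiler supplies (session) for us
    persist(User("bob"))
    session.all
  }
}
```

Remove the implicit val and the two persist calls no longer compile, which is exactly the point.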
What has just happened there? It's rather simple: when the compiler comes across the persist call, it sees that the last block is implicit and not provided, so it searches the current scope for an object matching the required type. In this case, it finds session.

Again, I don't know about you, but I prefer this to what Hibernate (for instance) does... which is injecting a session somewhere using an annotation or whatnot. And if something goes wrong? I hope you have integration tests, because it'll only be checked at runtime.
In this case, the compiler will blow up if it cannot find a session object, that's it!

So it's explicit at one single place, and will be implicit at all call sites. And I think it's cool/powerful to have such duality.


For-comprehension

A for-comprehension in Scala is very similar to a for loop in Java when dealing with sequences; however, it's more than this. But before going further, here is what is possible using lists:
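A small sketch with made-up data (the stocks and days lists are only illustrative):

```scala
object ForSketch {
  val stocks: List[String] = List("GOOG", "AAPL")
  val days: List[Int] = List(1, 2, 3)

  // Two generators and a guard; the compiler rewrites this into
  // flatMap / withFilter / map calls on the lists
  val pairs: List[(String, Int)] =
    for {
      stock <- stocks
      day   <- days
      if day % 2 == 1
    } yield (stock, day)
}
```

So far it looks like nested loops with a filter, but since it is only sugar over flatMap, withFilter and map, it works with anything that defines them.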

Those of you who have already tried to use the Future API of Java should find the following really pleasant, because chaining futures becomes terse and straightforward, without pain but with some implicit meanings...
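A minimal sketch with scala.concurrent. The User model, the in-memory users map and the fetch function are assumptions made for the example; a real fetch would go to a database or a remote service.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FutureSketch {
  // Hypothetical model and in-memory "storage"
  final case class User(id: Int, name: String, friendIds: List[Int])

  private val users = Map(
    1 -> User(1, "alice", List(2, 3)),
    2 -> User(2, "bob", Nil),
    3 -> User(3, "carol", Nil)
  )

  // An unknown id throws inside the Future, which makes it a failed Future
  def fetch(id: Int)(implicit ec: ExecutionContext): Future[User] =
    Future(users(id))

  // Only describes what has to happen; nothing blocks here
  def userWithFriends(id: Int)(implicit ec: ExecutionContext): Future[(User, List[User])] =
    for {
      user    <- fetch(id)
      friends <- Future.sequence(user.friendIds.map(fid => fetch(fid)))
    } yield (user, friends)
}
```

With an ExecutionContext in implicit scope (for instance scala.concurrent.ExecutionContext.Implicits.global), FutureSketch.userWithFriends(1) gives the fut value discussed next; Await.result(fut, 5.seconds) is only needed at the very edge of a program or in a test.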

What's going on there? We fetched a user in a future; once it has been fetched, we fetched each friend, one by one, within a sequence; then we yielded both results in a tuple.
The resulting fut variable is yet another Future that will hold this tuple if all the fetches return successfully; otherwise everything fails!
Afterwards, it's still possible to adapt the contained tuple into a new tuple... Note that we're not dealing with the value yet, we're just describing what has to be done when the result arrives... or not.

I'm not courageous enough to write this code using the Future API in Java: it would be too painful for me, too much brain gymnastics for nothing... and I haven't even talked about the number of bugs it'd be prone to.

Oh yes, one last thing on this for-comprehension: fetch should have an implicit parameter (the storage session/metadata access) and the Futures need an ExecutionContext instance to be executable... but it's not the role of this piece of code to create them or even pass them!

Flaws of conciseness in Scala

The flaws mainly arise when the code tries to be concise to such a level that the expressiveness itself is penalized.
For instance, sometimes, I like to tweet an implementation in 140 characters, like this one; this is just fun to do... I wouldn't expect such code to be dropped as is in a project unless there are a lot of reasons (I can't even imagine a single one).

Why? Because it breaks all the good conventions that are becoming the de facto best practices in Scala. For those interested, there is a very good keynote by Martin Odersky on that.

The most important one is probably to keep track of the types along the chain by splitting a line of multiple calls into several lines; the next one is not to overuse the underscore wildcard in inline functions.

So the code in the tweet can be migrated this way from:


All you need to understand are these 5 concepts:
  1. map
  2. flatMap
  3. groupBy
  4. mapValues
  5. flatten
Which one couldn't you guess correctly? Make a guess, then look at these rough and limited descriptions of their behaviors:
  1. List<A> => List<B> OR Map<K,V> => Map<L,W>
  2. List<A> => List<B>, given a function A => List<B>
  3. List<A> => Map<K,List<A>>
  4. Map<K,V> => Map<K,W>
  5. List<List<A>> => List<A> OR List<Map<K,V>> => Map<K,V>
I kept it cryptic on purpose because it has been explained so many times on the web!!
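Still, for the impatient, here is a tiny sketch of the five of them on made-up data (the words list is only illustrative, and this is not the code from the tweet).

```scala
object FiveConcepts {
  val words: List[String] = List("spark", "scala", "akka", "spray")

  // 1. map: transform each element
  val lengths: List[Int] = words.map(_.length)

  // 2. flatMap: transform each element into a list, then concatenate the lists
  val letters: List[Char] = words.flatMap(_.toList)

  // 3. groupBy: index the elements by a computed key
  val byInitial: Map[Char, List[String]] = words.groupBy(_.head)

  // 4. mapValues: transform the values and keep the keys
  //    (deprecated since Scala 2.13 in favour of .view.mapValues; kept here for the name)
  val countByInitial: Map[Char, Int] = byInitial.mapValues(_.size).toMap

  // 5. flatten: remove one level of nesting
  val flat: List[Int] = List(List(1, 2), List(3)).flatten
}
```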

But also note that the readable code (which could be made even more readable while still using non-verbose tools) is, with comments, less than twice as long. It would require hundreds of lines of Java, which I don't even have the energy to write... Or you need to convince me that I should...

Last note: just don't override or create operators that don't have widely known semantics. Haskell is a bad example here, even if this language is awesome!

Monday, August 19, 2013

mid-2013, my Spark Odyssey... Rolling out Streams in the Spray River

In this post, I'm going to relate the last bits of the project using Spark streaming that I started discussing here and continued here.

Up to now, the project, hosted on github, has been updated with a JavaScript web application that can show the streams that were created in the first two posts (see above). This web application is not the purpose of this post; its purpose is the way I was able to combine streams and present the results in a Spray server.


So what we will see here is how Spark Streaming enables two streams to be combined when they somehow contain data about the same topics.

Then we'll see how it is possible to package the amount of data that is coming over the web using an aggregation method like a windowed summarization.

Since data is only interesting when it can be consumed by yet another analytic tool, it's pretty helpful to set up a server that gives some access to it -- to be consumed by an OLAP application for instance -- rather than flushing everything to HDFS again.
For that, we'll use the Spray server that Spark helps us create, and we'll add routes to it that render the aggregated values (RDD) as JSON documents to be sent over the wire.

Rollin' to the river

In the first blog, we configured a DStream that carried data for each tweet matching a list of stocks; these data were associated with a sentiment score and packaged in a dedicated structure named TwitterData.

In the next blog, we created yet another DStream that was responsible for polling the Yahoo! finance API to fetch stock changes in real time; for that, another structure was created: YahooData, which contains information about the actual changes of a given stock.

With that in hand, we can easily figure out that both DStreams are continuously giving information about the same topic: a stock. That's why we can set up a parent trait that defines only one thing: a stock.

Here are the structures and the related streams:
The asInstanceOf[Data] calls are required because DStream is not covariant (probably because it is mutable).

Thus, what we can do now is merge both of them into a single one! I can't wait any longer, let's do it:
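To keep the idea runnable without a Spark cluster, here is a plain-Scala sketch of it. MiniStream is a made-up, invariant stand-in for DStream (it is not the Spark API), and the fields of TwitterData and YahooData are assumptions; only the names of the structures and the shared stock come from the project.

```scala
object StreamMergeSketch {
  // The parent trait defines only one thing: the stock
  sealed trait Data { def stock: String }
  final case class TwitterData(stock: String, sentiment: Int) extends Data
  final case class YahooData(stock: String, change: Double) extends Data

  // Invariant container standing in for DStream: a MiniStream[TwitterData]
  // is NOT a MiniStream[Data], exactly the situation described above
  final class MiniStream[A](val items: List[A]) {
    def map[B](f: A => B): MiniStream[B] = new MiniStream(items.map(f))
    def union(that: MiniStream[A]): MiniStream[A] = new MiniStream(items ++ that.items)
  }

  val tweets = new MiniStream(List(TwitterData("GOOG", 1)))
  val quotes = new MiniStream(List(YahooData("GOOG", -0.4)))

  // Each side is widened to Data first (a safe upcast), then both are merged
  val merged: MiniStream[Data] =
    tweets.map[Data](t => t).union(quotes.map[Data](q => q))
}
```

Widening through map with an explicit type parameter does the same job as a cast, but stays checked by the compiler.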

What? You feel a bit frustrated because it's so simple? Completely understood, I was astonished as well by how easily we can combine streams of packets where time is part of the problem, without having to do anything...

Cool, so now we have access to a full river of data coming in real time for a bunch of given stocks, but in real life (and with even more streams) it can be problematic to keep track of everything, or redistribute the whole thing. Let's see what we could do.

Compaction (aggregation)

A common workaround for the problem mentioned above (the capacity one) is to compute aggregated values for a window of time that slides over the stream.

To give a simple example, if we have a stream of integers that arrive at a very high rate, what can be done is to compute several stats on them on a regular basis. That is to say, we can choose to compute the mean and standard deviation of the integers that came in over the last 30 seconds, and do that every 5s.
This would reduce the storage need to a couple of doubles every 5s, rather than maybe thousands of values. Overlapping sliding windows are also helpful when applying a regression, for instance (more components of the variance are kept).
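The arithmetic can be sketched in plain Scala, without Spark. The representation of events as (timestamp in seconds, value) pairs and all the names below are assumptions made for the illustration.

```scala
object WindowStats {
  final case class Stats(mean: Double, stdDev: Double)

  // Mean and (population) standard deviation; None for an empty window
  def stats(xs: Seq[Int]): Option[Stats] =
    if (xs.isEmpty) None
    else {
      val mean = xs.sum.toDouble / xs.size
      val variance = xs.map(x => math.pow(x - mean, 2)).sum / xs.size
      Some(Stats(mean, math.sqrt(variance)))
    }

  // One result every `slide` seconds, each covering the `window` seconds
  // that end at that instant: the interval (t - window, t]
  def sliding(events: Seq[(Int, Int)], window: Int, slide: Int, end: Int): Seq[(Int, Option[Stats])] =
    (slide to end by slide).map { t =>
      val inWindow = events.collect { case (ts, v) if ts > t - window && ts <= t => v }
      t -> stats(inWindow)
    }
}
```

With a 30 second window and a 5 second slide, a value is counted in six consecutive results; that overlap is the price paid for keeping only two doubles per slide.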

So here, what we are going to do is compute an aggregated value, which is an overall score for each stock present in the stream.
This score will simply be the sum of the sentiments (as -1 or 1) for the Twitter part, added to the sum of the Yahoo changes (converted to -1 or 1) -- I'm not a financial guy, so I've chosen something easy and straightforward to compute... it's not related to the technologies. Let's see how we could achieve that using the Spark API for DStream.

Nah! Okay, I'll start with the trick map(x => (x.stock, List(x))). Actually, this is rather convenient for what will follow: we created a pair composed of a key, the stock, and a value, the singleton list of the initial data. This singleton thing will become really helpful when the "reduce phase" happens.

Exactly: an aggregation over a sliding window being a classical use case, the Spark API has already integrated it for us (thankfully, should I say). So we only have to call the reduceByKeyAndWindow method with the reduce function (List.appendAll) and the window's duration, 60 seconds. Note that the sliding time wasn't given; I've chosen to keep the original slicing time from the DStream creation, 5 seconds.

I have to warn you about something, nevertheless: this function is only available here because we adapted the stream to something that looks like key-value pairs; that's why we can reduce by key as well.
Since the key is the stock, we now have the list of all Data for a 60 second window, grouped by stock.

Now, recalling what was said earlier, we only want to keep track of a score, not of every Data. For that, we mapped the values to something smaller: the overall score and the number of tweets and Yahoo entries that participated in this score.
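The same steps can be sketched on the content of a single window with plain Scala collections. This is not the Spark code of the project: the fields of the data classes, the Score class and the way a zero change is counted (as 0) are assumptions.

```scala
object ScoreSketch {
  sealed trait Data { def stock: String }
  final case class TwitterData(stock: String, sentiment: Int) extends Data // -1 or 1
  final case class YahooData(stock: String, change: Double) extends Data

  // The overall score plus the number of entries of each source behind it
  final case class Score(total: Int, tweets: Int, quotes: Int)

  // A tweet counts for its sentiment, a Yahoo change for its sign
  // (assumption: a change of exactly 0 counts for 0)
  def contribution(d: Data): Int = d match {
    case TwitterData(_, sentiment) => sentiment
    case YahooData(_, change)      => math.signum(change).toInt
  }

  def scoreWindow(window: Seq[Data]): Map[String, Score] =
    window
      .map(d => (d.stock, List(d))) // the singleton trick: (key, one-element list)
      .groupBy(_._1)                // "by key"
      .map { case (stock, pairs) =>
        val all = pairs.map(_._2).reduce(_ ++ _) // the reduce function: list concatenation
        stock -> Score(
          all.map(contribution).sum,
          all.count(_.isInstanceOf[TwitterData]),
          all.count(_.isInstanceOf[YahooData])
        )
      }
}
```

The singleton lists are what make the reduce function trivial: concatenating lists is associative, so the partial results can be combined in any grouping.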

Tada! We're done with the Spark thing... yeah, actually, for this post at least (next will come the cluster deployment... but that's for the future!).

True, we're now collecting real-time information about a list of stocks, each associated with a score and the participation of each source.

However, they remain in memory... that's not really helpful, right? Actually, one would request these data to perform their own analysis on them, so we need to open them to the outside world.


Mmmmh, this part is a bit tricky because, looking at the DStream API, you don't have that much choice. We can persist (to memory, ...), we can save to an object file or a text file, but redistributing these persisted data would be costly for me, and I'm lazy.

But I like to read code, so I read the Spark code again, and I fell again on the AkkaUtils class used in my previous post to create the actor system. And, in fact, this helper is able to create a Spray server as well -- you know, this super-powered server thing that is able to deliver resources at an insane rate thanks to Akka!? If not, open this link in a new tab and read it afterwards (or now), you'll love it!

Given that, I thought to myself that it'd open some doors to laziness, which I liked a lot.

The idea is to start such a Spray server that exposes an actor holding a representation of the aggregated information grouped by stock, the value being the list of these aggregated data.
WARN: This is certainly not the most efficient way to do this, because the state of the actor grows monotonically, but it's sufficient for now.

To do so, I've reused the trick from the previous post: creating a class in the spark package that starts the Spray server using AkkaUtils. Afterwards, the routes are declared to render the data accumulated in the following actor:

This actor will be simply populated by the DStream itself using the transform function:

I wanted to show it to you because of the FIXME, which is still not fixed. Actually, the function that is given to transform will be part of the lineage of the RDDs, so it has to be serializable. And if I simply reuse an actor in a closure, Akka will tell me (actually, the way Spray uses the Akka serialization) that it's not possible to serialize the actor since there isn't any actor system in the context... (when Spray calls the serialization).
To counter that, I took the hard way, which is simply "recreating" the actor system and the actor within each RDD's transform call! And it's very bad. Sorry about that...

If you want more information about this service, just check out this little class here, where you'll find the JSON serialization for the "REST" operations as well.


Since reading JSON on the fly is neither my passion nor part of my resume (which should be somewhere there! or maybe there? Arf, I dunno, anyway...), I've created a very lightweight and not really user-friendly interface that shows the results in real time.

It's all located in the web folder and is intensively using Bootstrap from Twitter, AngularJs from Google and D3.js from Mike.

To start it, you'll need to start the whole thing as described in the README; afterwards you'll be shown this kind of stuff:
My fairly UI

What should come next...

... When I'll have some time.
The next logical step from here, for me at least, is to deploy everything on a cluster. Because, for now, as set up back in the first post, the context is created locally, so everything is going fine... maybe due to this fact.

So that'll be the next thing in this series, deploy on Amazon or on a local Vagrantized cluster of VMs ^^.

If anyone is willing to help, give me pointers or make any comments, please do!

I hope you liked the read, if anyone reached this point without getting bored. If you did, a small tweet to poke me would be awesome (@noootsab).

Friday, August 16, 2013

mid-2013, my Spark Odyssey... when Akka sparks Yahoo! finance

This entry is the second of a series that relates some work/tests I'm doing using Spark. All information can be found in the very first post of this series.

In the previous post, I explained how easy it is to consume a Twitter stream using Spark, and how seamlessly we can attach a sentiment score to each tweet passing a given condition (read: filtered).

In this one, I'm going to tackle another side of my test, which is fetching the current stock information for a given list of companies.
This information will be provided by the Yahoo finance API...

...well, hum, not exactly an API. Actually, thanks to @kennyhelsens, who showed me this "hack", we'll use a Yahoo! URL that produces a CSV file containing (mostly) real-time information about companies' stock changes. Information about this URL's parameters is really hard to find; however, you can find some out there.


In this series, we're trying to use real-time streams rather than batch processing to give a real-time view of the fluctuating health of given companies.
But we've just seen that we're going to consume CSVs that give a snapshot of the stock changes.
A common solution to transform a static API into a stream is to poll it and to create a stream by appending the results one after the other.

Here, we'll try a pretty simple solution for that: we'll delegate to Akka the tasks of consuming the CSV, parsing it, splitting the data by company, modeling each entry and finally pushing everything to a DStream.

It's rather straightforward, and we'll see that it's even simpler than we would expect, since Spark already plays well with Akka by integrating, out of the box, an API to "consume" actors' messages.
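Setting Akka and Spark aside for a moment, the polling idea itself can be sketched with a plain lazy Iterator. Everything here is hypothetical: the Quote model, the fetch function given as a parameter, and above all the two-column "symbol,change" layout, which is not the actual format returned by Yahoo.

```scala
import scala.util.Try

object PollingSketch {
  final case class Quote(stock: String, change: Double)

  // Parses one CSV snapshot and silently skips malformed lines
  // (assumed layout: one "symbol,change" pair per line)
  def parse(csv: String): List[Quote] =
    csv.split("\n").toList.map(_.trim).filter(_.nonEmpty).flatMap { line =>
      line.split(",") match {
        case Array(symbol, change) => Try(Quote(symbol.trim, change.trim.toDouble)).toOption
        case _                     => None
      }
    }

  // Turns a static resource into a stream: poll it again and again and
  // append the results one after the other (lazily, on demand)
  def poll(fetch: () => String, pauseMillis: Long): Iterator[Quote] =
    Iterator.continually {
      val snapshot = parse(fetch())
      Thread.sleep(pauseMillis)
      snapshot
    }.flatMap(snapshot => snapshot)
}
```

Note that this sketch blocks its caller between two polls; handing the scheduling and the dispatching over to actors avoids exactly that.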


In this scene, there will be several actors, each playing its own role (sorry, Peter Sellers fans -- like me btw).
Here they are:
  • scheduler: this one is provided by Akka out-of-the-box and is able to send a "given message" to a given actor at a given interval.
  • feeder: this actor will simply fetch the CSV and produce the data to be sent to the below actor.
  • receiver: this one will receive a message whenever a data is available for a given company.
  • parentReceiver: this one will be created by Spark itself using the StreamingContext and is able to push the received Data to the right RDD in its holding DStream (see ActorReceiver).

Now the scenario:
Sequence from the scheduler to the DStream (current RDD)
Does that make sense? Let's have a short overview on the implementation.

Scheduling the poller

First we have to create a scheduler that will poll the server every 500ms... well, using Akka, separation of concerns and message-passing style, we'll just create a scheduled action that passes a message to a given actor:

But even before that, let's prepare the Akka field:

There are plenty of ways to create an ActorSystem with Akka, but Spark prepares some stuff for us to create one that respects some of its own internal conventions. This helper is available in the `spark` package and is named AkkaUtils; however, it's package protected... That's why I've created my own utils to create the ActorSystem under the spark package -- I'm bad sometimes, but I'm so often lazy :-D.

Actually, the Spark utility for Akka will create an ActorSystem using a host and a port for us, which will be really helpful afterwards... because everything in Spark must be serializable, and thus we cannot so easily package an ActorRef (it needs an ActorSystem, ...). So we'll use URLs to create refs rather than using actors captured within a closure in a Spark action.
Recall: that's how Spark manages to recover from a failure: it takes the lineage of the RDD to rebuild everything from the initial data (not possible in Streams...). But above all, the actions (functions) are dispatched to the machines where the data reside. So the lineage contains the functions, and a function might contain ActorRefs!
Now that we have an actorSystem available, let's create the scheduler (see last section, Drawbacks, for more info about problems with that methodology):
Nothing relevant to say about that, only maybe the definition of FeederActor...

Feeding Spark with Yahoo data

Here comes the feeding phase: we won't see how the CSV is read and parsed, but we'll see what's going on with the message flow.
What's interesting in this actor is twofold:
  1. it can receive a message containing two pieces of data:
    1. an actorRef that it will hold in its state: this one will be the receiver
    2. a list of stocks to follow (by building the according call to Yahoo)
  2. when consuming the data and producing the data by stock, it sends them one by one to the above actor
Actually, this feeder has two responsibilities which are fetching the data and pushing it to Spark.
But how is this receiver constructed and passed?

Receiver -- our spark stuff

The receiver is actually an actor that will be created internally by Spark for us; for that, it must respect yet another trait, Receiver. And a Receiver can only be an Actor (using a self-type definition)!
Four things to point out here:
  1. this actor is well-defined by extending the Receiver trait, which provides the pushBlock method.
  2. it holds a reference to an actor that it creates itself, based on a given URL -- an actor URL... this is where AkkaUtils came in handy! This actor will be the feeder actor.
  3. in the preStart hook, we tell the feeder actor that this ActorRef is the Spark end-point where data must be sent to be published in the stream.
  4. when data arrives, we check in the cache whether it's already there (see below); if not, we call Receiver#pushBlock with it.
Why do we have to cache? Actually, when the stock market has closed, Yahoo will continually return the last change before closing... so, here, we just avoid telling our health checker that the mood is monotonically increasing or decreasing (or even just flat if zero).
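The caching part of point 4 can be sketched independently of Akka and Spark. In this sketch the cache is keyed by stock and remembers the last value seen, which is an assumption about the actual implementation; the push function stands in for pushBlock, and the fields of YahooData are made up.

```scala
object DedupSketch {
  final case class YahooData(stock: String, change: Double)

  // Forwards a value only when it differs from the last one seen for the same stock
  final class Deduplicator(push: YahooData => Unit) {
    private var last = Map.empty[String, YahooData]

    def receive(d: YahooData): Unit =
      if (!last.get(d.stock).contains(d)) {
        last = last.updated(d.stock, d)
        push(d) // stands in for pushBlock
      }
  }
}
```

Inside an actor the mutable var is harmless, since an actor processes one message at a time; used elsewhere, this class would need synchronization.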

The pushBlock method is the key to publishing to the underlying DStream managed by Spark; actually, it will wrap our data into an internal representation before telling the parent actor to use it: context.parent ! Data(y).

But what is that context.parent?

parentReceiver -- not our spark stuff

This receiver is totally managed by Spark, so I won't dig into more details than just saying that it's created underneath when doing the following:
Fairly simple, yes, it's true! Actually, the StreamingContext we created at the very start of the project contains everything needed to create a DStream based on Akka; all it needs is a reference to a Receiver (defined in the previous section).

What this actorStream does is create an ActorReceiver (with a Supervisor and blahblah), which will itself create and manage a worker for the given Props; it will also hold the reference to the current RDD in the DStream.

The worker will be able to send it messages of the form Data(d) using pushBlock, which it will then push to the DStream.

Concise, huh? If only the documentation had told us that, I wouldn't have had to dig that deep into the code... the good side effect is that now I understand it pretty well.
So, I hope you do too!

Where are we?

Up to here, we've created a DStream of data for each tweet paired with 'its' company, and we now also have some info about their stock changes.
What will happen in the next blog is that we'll create a multiplexed stream for which we'll compute a value for every window of 60 seconds.

Drawbacks of the/my methodology

At this level, I can see two problems.

The scheduler

Using a scheduler is bad because we cannot ensure that each message is processed sequentially by Spark. In other words, data is pushed to the RDD that corresponds to the time at which the whole CSV has been processed and the messages have been sent by the actors, not to the time of the poll!

I put this aside for the moment since I'm not building a real-life project (no one will use this tool to take decisions on the market ^^).
Moreover, mathematically, let's say at the limit, the value will be correct: reducing an amount by 2 at t⁽¹⁾ or at t⁽²⁾ is not really problematic, it's like swapping two ints in a list that we're summing over.

The consumer

I didn't show its current implementation because it's very simple and bad: I'm simply opening a connection on the URL and consuming everything until the end before applying the process...

That is to say: it's blocking and non-reactive.
It also means that we lose the power of parallelism, since the Akka message dispatcher will have to wait for a CSV to be completely processed before handling the next one.

However, it's just an implementation detail that can easily be worked around without modifying the solution.

Monday, August 12, 2013

mid-2013, my Spark Odyssey...

Short intro

In this post, I'll talk about yet another technology-tester project I've worked on these last days: Spark.
Long story short, Spark is a handy distributed tool that helps in manipulating Big Data from several sources -- even hot streams.


In the first part, I'll put the context in place.

  1. I'll start with a gentle bunch of words about Spark and Spark-streaming.
  2. Then I'll give some bits about the project I've built so far, and why.
The second part will discuss some implementation details.

  1. Setup: spark streaming
  2. Twitter stream filtered by keywords to DStream
  3. Tweets' sentiment analyzer
  4. Yahoo finance CSV data and the actor consumer
  5. Actor consumer publishing on a DStream (back-ended by a dedicated actor)
  6. Aggregation (union) based on a 60 seconds length window
  7. Publishing the results using Spray
What's missing so far (among other business-related things, for instance):
  1. cluster deployment...
  2. ... and configuration
  3. probably a storing strategy for further consumptions (Riak, ...?)
  4. a web application that shows the results or configure the processes



I won't discuss Spark that much because there are already plenty of good blogs/articles on it. However, I'll just give you the intuition you need to get the whole thing:
  • Spark is a collection of functions that work on a Sequence (List) of typed data  --  Dataset = Sequence ; operation = Function
  • Chaining these functions defines a workflow for your data  --  combination
  • Each piece of data will be processed only once, by one or another Spark worker  --  Distributed
  • If it fails for some external reason, the data can be fetched again from the source and the workflow replayed entirely  --  Resilient
Spark defines a ResilientDistributedDataset, which is very different from classical MR: because it is distributed by essence, a workflow can easily be iterative and doesn't require intermediate caching/storage.

Also, Spark has a sub-project, spark-streaming, that allows manipulating streamed data. The idea is very simple: a DStream is defined as a sequence of RDDs, each containing the data for a certain slice of time.
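This slicing can be pictured in plain Scala, with a List of slices playing the role of the DStream and each inner List the role of an RDD. The Event model and the slices function are made up for the illustration; nothing here is the Spark API.

```scala
object MicroBatchSketch {
  final case class Event[A](timestampMillis: Long, value: A)

  // Cuts the events into consecutive slices of `sliceMillis`:
  // the result is a sequence of (slice start, data of that slice)
  // (slices without any event are simply absent in this sketch)
  def slices[A](events: Seq[Event[A]], sliceMillis: Long): List[(Long, List[A])] =
    events
      .groupBy(e => e.timestampMillis / sliceMillis)
      .toList
      .sortBy(_._1)
      .map { case (index, es) =>
        (index * sliceMillis, es.sortBy(_.timestampMillis).map(_.value).toList)
      }
}
```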

Oh, RDD and DStream are typesafe -- the whole thing being Scala code.

The spark-streaming project is really well integrated and can consume a lot of streams like Twitter, File System, Kafka or even an Akka actor.


This blog relates a small project publicly available on github (spark-bd) that helped me grasp the big picture of Spark, after having followed the mini course here.

The project is based on another working group I'm participating in. This project, simply called project 2, has been initiated and is still supported by the great Belgian Big Data user group (
The idea was to co-create, in several workshops, an application that, given a list of companies to analyse:
  • catches the Twitter stream and the Yahoo finance data using Storm;
  • manipulates both by either applying a sentiment analysis or simply taking the changes in price;
  • aggregates both kinds of information by window of time for each company;
  • gives (constantly) information about how a given list of companies is doing on the market.
Fairly simple and really fun... the project is still ongoing but you can check its progress here.

It was really fun, but I wanted to see how my feeling would compare when writing the same application using my current preferred language (Scala) and the next technology in the Big Data field I was eagerly looking at (Spark).

IMPORTANT: since I'm not there yet to make a real comparison (a topic for another blog, later on), I'll only give an insight into my current work.



The setup is very minimal for a Spark application. All we need comes down to two things:
  1. an SBT project declaring Spark and Spark-streaming as dependencies.
  2. a main class/object that configures the spark streaming context.
The SBT project only requires a build.sbt in the root folder that contains these lines only:

Where the only lines of interest are the last two.

Then we can create an object that will start the spark context.
Here we just asked Spark to create a streaming context on the local environment, named Project2, that will slice the DStream into RDDs of 5 seconds each.

Afterwards, we asked the context to start its work... which is nothing up to now, since nothing has been added to the context yet.
So let's do it.

Twitter stream

Here we'll consume the tweet stream provided by Twitter's API. For that, we'll have to do only two things (a pattern emerges...?):
  1. register the 4 pieces of information needed for an OAuth authentication to the Twitter API (so don't forget to add an application to your Twitter development account).
  2. add a tweet stream to the spark context
Actually, the streaming context has convenient methods to add a Twitter stream.
For that, it'll use the Twitter4J library (it's definitely not the best, but at least we can achieve our goal).

This library needs several keys to be set in the
There are plenty of ways to do that; in my project I took something rather straightforward: I've picked the Typesafe config library and added those four configuration keys in an application.conf file, then I load each value into the related System property.

As you may (or may not) see, the application.conf file will look for environment variables giving values for the required key/secret/token.

That being done, we can start listening for tweets:

Dead simple: after having created a model for a company/stock, we used the configured keywords as tags for the Twitter stream.
Behind the scenes, Twitter4J will access the stream end-point with these tags, and ask Twitter to pre-filter all tweets based on those keywords (in hashtag, username, text, ...).

At this stage we've only added the stream, but we haven't yet said anything about how to consume it... So we can simply call print() on the stream to ask Spark to print the first 10 incoming events of each RDD.

However, that's not how we want to deal with those tweets, right? Let's adapt our stream to add relevant information for our use case.

Sentiment analyzer

Now that we have a stream of tweets, it's time to read them and to give them a sentiment score based on a sentiments file. The code that reads it and assigns a score to a tweet is not that important here, but the code is here.

However, what we would like to do is to adapt our stream with this score attached to each tweet.

As said earlier, Spark is a bunch of methods on Sequences of data, so we can now assert that it's true. 

Look at what was done in the map: we picked each tweet, fetched the list of sentiment entries that match the text, and also kept the whole original status for further computation. This results in a DStream[(List[Sentiment], Status)].

Since we only want those tweets that have at least one Sentiment that matches, we then filtered on the length of this matching list.

We combined both actions to get a filtered, sentiment-analyzed stream of tweets.
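The same map-then-filter shape can be tried on a plain Scala List, without Spark. This is only a sketch: `Status` and `Sentiment` are simplified, hypothetical stand-ins for the project's types, and the word matching is deliberately naive.

```scala
object SentimentSketch {
  // Hypothetical stand-ins: the real project works on Twitter4J statuses and its own Sentiment model.
  case class Status(user: String, text: String)
  case class Sentiment(word: String, score: Int)

  val sentiments: List[Sentiment] = List(Sentiment("great", 3), Sentiment("awful", -3))

  // All sentiment entries whose word occurs in the text (naive, case-insensitive matching).
  def matching(text: String): List[Sentiment] = {
    val words = text.toLowerCase.split("\\W+").toSet
    sentiments.filter(s => words.contains(s.word))
  }

  // Same shape as the stream pipeline: map to (matches, status), then keep the non-empty ones.
  def analyze(tweets: List[Status]): List[(List[Sentiment], Status)] =
    tweets
      .map(status => (matching(status.text), status))
      .filter { case (matches, _) => matches.nonEmpty }
}
```

Since a DStream offers map and filter too, the pipeline reads the same once the List is replaced by the stream.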

Okay great, but what we really want is to have such information grouped by Company/Stock right? So that we can compute the participation of the social network to the health of each company.

To do so, we have to group semantically (based on the text's content and the required stocks' keywords).
Note that it's not that straightforward because a tweet could relate to several companies at once!
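One way to handle the "several companies at once" case is to emit one pair per matching company with flatMap. The sketch below does it on plain collections; the `Stock` model, its keywords and the substring matching are assumptions for illustration, not the project's actual code.

```scala
object CompanySketch {
  // Hypothetical model: a stock symbol plus the keywords that identify it in a tweet.
  case class Stock(symbol: String, keywords: List[String])

  val stocks: List[Stock] = List(
    Stock("GOOG", List("google", "goog")),
    Stock("AAPL", List("apple", "aapl"))
  )

  // Every stock whose keywords appear in the text; a tweet may match several.
  def stocksFor(text: String): List[Stock] = {
    val lower = text.toLowerCase
    stocks.filter(_.keywords.exists(k => lower.contains(k)))
  }

  // flatMap emits one (symbol, tweet) pair per matching company,
  // so a tweet can end up in more than one group.
  def byCompany(tweets: List[String]): Map[String, List[String]] =
    tweets
      .flatMap(t => stocksFor(t).map(s => (s.symbol, t)))
      .groupBy(_._1)
      .map { case (symbol, pairs) => symbol -> pairs.map(_._2) }
}
```

A tweet mentioning both Google and Apple lands in both groups, while a tweet matching nothing simply disappears.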

That's all! We're now done with the Twitter part: we are now getting Data instances that gather the information of a Company/Stock, the tweet and its associated sentiment, which opens doors for further aggregations/grouping/reducing.


Let's now pause a bit. I encourage you to try your own stuff and to play with the tweets.
For that, I also encourage you to fork/clone the original repo, where you can already start only the Twitter feed and print everything in the console by running the appropriate command:
$> sbt
sbt> run-main be.bigdata.p2.P2 twitter print GOOG AAPL

This will start Spark and filter the tweets based on the keywords associated with both GOOG and AAPL; those tweets will be analyzed as described in the last section before being printed to the console.
WARN: don't forget to export the required environment variables


In the next blog posts, I'll continue with the explanation of how to use Akka to construct Spark streams and how to combine several streams into a single one.

This multiplexed stream will then be used to compute aggregated data over a window of time.

Having these data in hand, we'll be able to create a Spray server that exposes them in JSON format, helping us create a client web application that simply consumes the JSON messages.

Sunday, July 14, 2013

Function1[-T1, +R] -- What the heck?

In this post, I'll try to cover an important notion of Scala's Type System -- and a Terra Incognita of Java's.

(generic) Type covariance or contravariance.

Oh yeah, I heard you... yet another post... right?
... right! But it seems that a gentler one is needed, where Category Theory is left aside. Because Category Theory should come when you want yet more fun, not when you want to get the basic concepts!

That's why we'll see what these constraints are in a more pragmatic way, using (important) examples.
Afterwards, I'll let you read the blogs using Category Theory... if you like.

I thought that taking the problem from the Java side might be easier to explain, thus the following parts
  • Problem: Assignments failures using Java Generics
    • Solution: Covariance Type in Scala
  • Problem: Useless generics for method parameters in Java
    • Solution: Contravariance Type in Scala

Problem: Java tells me that Kids aren't Humans

Since Java 5, as Java developers, we were really interested in the new breaking feature that was added: Generic Types.
And it was interesting indeed, thanks to the java.util.Collection API, but also the syntactic sugar for (A a: as).
However, we also stumbled really quickly on the problem that the hierarchy of Generics doesn't follow the hierarchy of their type arguments (as explained in the Java tutorial).

Let's clarify the situation with an example.
First we define a model with a classical hierarchy:

Okay, with this setup, one might want to create a list of kids and assign it to a list of human. Why? 
  1. because they are...
  2. because they grow!
Recall: in OO a model (should) represents the domain.

Fine! Here are several attempts:

See? You just can't in Java! Because an ArrayList<Kid> is not considered to be an instance of ArrayList<Human>. This will impact all your further algorithms, unless you use the trick of making the type anonymous with a wildcard and constraining it with the Human type as its bound (ArrayList<? extends Human>).

At least we spotted the real problem... Let's see the solution that Scala offers.

Solution: Covariance

In Scala, a type can be defined to be covariant to one (or several) of its generic type (roughly speaking).
We can simply check the Scala's List definition: 

sealed abstract class List[+A]

Here, we tell the compiler that List is covariant in its generic type A.

In other words? List will vary accordingly with the variance of A!

And life gets simple (respecting the OO concepts):
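As a minimal sketch of what this allows, assuming a hypothetical `Human`/`Kid`/`Adult` model that mirrors the hierarchy of this post (the names and constructor are illustrative only):

```scala
object CovarianceSketch {
  // Hypothetical model mirroring the hierarchy used in this post.
  class Human(val name: String)
  class Kid(name: String) extends Human(name)
  class Adult(name: String) extends Human(name)

  val kids: List[Kid] = List(new Kid("Tom"), new Kid("Ann"))

  // Compiles because List[+A] is covariant: List[Kid] is a subtype of List[Human].
  val humans: List[Human] = kids

  // ...and an Adult can be prepended to that same list of humans.
  val everyone: List[Human] = new Adult("Bob") :: humans

  // An invariant container such as scala.collection.mutable.ArrayBuffer[A] would reject this:
  // val buffer: ArrayBuffer[Human] = ArrayBuffer[Kid]()   // does not compile
}
```

The commented-out line is the Scala counterpart of the Java failure above: without the plus sign, the container does not follow the hierarchy of its element type.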

Following the hierarchy of the generic type is pretty cool and useful, but there are cases where it won't help... let's jump to the next section if you don't believe me...

Problem: Java tells me that Kids aren't Human and Integer isn't a Number...

To illustrate this case, we'll create two helper types that define a no-arg procedure and an action (a function) with one argument.
As simple as that.

We also created two action definitions, one that maps a single integer onto a single human, and another one that maps a number onto an adult.

This will help us define a higher-order operation that, for instance, can map a list of integers into a new list of humans. For that, we'll create the ListMapper class (see below) that will map an input type I to an output type O.

Using this ListMapper, we'll try to convert:
  • a list of integer into a list of humans 
  • a list of numbers into a list of adults
We can easily imagine that the logic should remain in the actions introduced above. Right?
Let's see how it goes in Java, then:

It seems that the mapper from Integer to Human is only able to deal with an action that takes exactly these types as input and output respectively.
But it's weird, because something that works on Number should be able to handle any Integer, shouldn't it?
And, if the returned object is an Adult, it should be ok to assign it to an Human? True?

For sure, but for the same reason as before, Java is not able to deal with such multi-levels hierarchies. crap...

Before going ahead, one might have noticed that the inheritance hierarchies are opposites:
  • Number is extended by Integer
  • Adult extends Human
But the mapper should be able to accept both pairs of input/output types! Damn... the action type doesn't seem to vary the same way for its input as for its output!

Indeed, an action must be contravariant on its Input and covariant with its Output.

Here we are.

Solution: Contravariance

In the example above, we introduced actions that acted as functions taking a single argument. And those actions had to behave differently with respect to the type of the input and the type of the output.

Let's check this out in Scala, looking at what such a function looks like:


Bloody hell, a function type in Scala varies with its argument type in the opposite way to covariant types. That's why a minus sign is used on the T1 type.
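To see both annotations side by side, here is a simplified sketch of such a trait. `Fn1` is a hypothetical stand-in written for this post; the real scala.Function1 carries the same -T1 and +R annotations plus details that are left out here.

```scala
object FunctionSketch {
  // Simplified, hypothetical equivalent of scala.Function1:
  // "-T1" marks the argument type as contravariant, "+R" marks the result type as covariant.
  trait Fn1[-T1, +R] {
    def apply(v1: T1): R
  }

  val length: Fn1[Any, Int] = new Fn1[Any, Int] {
    def apply(v1: Any): Int = v1.toString.length
  }

  // Accepted: Any is a super-type of String (argument) and Int is a sub-type of AnyVal (result).
  val narrowed: Fn1[String, AnyVal] = length
}
```

Removing either the minus or the plus sign from `Fn1` makes the last assignment fail to compile.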

Simply put: a function F can be replaced by any function that respects either or both of these cases:
  • its argument is a super-type of F's argument.
  • its type (output type) is a sub-type of F's type.
Let's see everything in action now:
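A minimal sketch of the idea, assuming a hypothetical `Human`/`Adult` model and an illustrative `mapAll` helper standing in for the ListMapper (none of these names come from the original code):

```scala
object VarianceInAction {
  // Hypothetical model mirroring the hierarchy used in this post.
  class Human(val name: String)
  class Adult(name: String) extends Human(name)

  // A function from AnyVal to Adult...
  val toAdult: AnyVal => Adult = v => new Adult("adult-" + v)

  // ...is accepted wherever a function from Int to Human is expected.
  val toHuman: Int => Human = toAdult

  // A higher-order operation that only asks for an Int => Human action.
  def mapAll(ints: List[Int], action: Int => Human): List[Human] = ints.map(action)

  val humans: List[Human] = mapAll(List(1, 2), toAdult)
}
```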
That was for illustration purposes, because the types weren't optimized for maximal genericity. However, we can see that we were able to use a function that takes an AnyVal argument and results in an Adult object where a function from an Int to a Human was expected!


A lot for nothing? Maybe?
But it's a question that is very common among Scala newcomers. And I think it's one of the most important ones, thus it must be answered in the most precise and accessible way.

Furthermore, it also demonstrates that Scala is even more Object Oriented than Java can be.

Please let me know if it can be made clearer. This blog is meant to be organic, so any comments, concerns or help are more than welcome.