European Soccer Events Analysis with Apache Spark and Databricks

The global sports market is huge, comprising players, teams, leagues, fan clubs, sponsors, and more, and all of these entities interact in myriad ways, generating an enormous amount of data. Some of that data is used internally to help make better decisions, and there are a number of use cases within the media industry that utilize the same data to create better products and attract/retain viewers.

A few ways the sports and media industries have started utilizing big data:

  • Analyze on-field conditions and events (passes, player positions, etc.) that lead to soccer goals, football touchdowns, or baseball home runs.
  • Assess the win-loss percentage with different combinations of players in different on-field positions.
  • Track a sportsperson’s or team’s performance graph over the years/seasons
  • Analyze real-time viewership events and social media streams to provide targeted content, and more.

European Soccer Leagues Data

We all have a favorite sport, and mine is soccer for its global appeal, its amazing combination of skill and strategy, and the passionate fans who add an extra element to the game.

In this article, we’ll see how one could use Databricks to create an end-to-end data pipeline with European soccer games data, covering facets of Data Engineering, Data Analysis, and Machine Learning, to help answer business questions. We’ll utilize a dataset from Kaggle that provides a granular view of 9,074 games from the five biggest European soccer leagues (England, Spain, Germany, Italy, and France) across the 2011 to 2016 seasons.

The primary dataset contains the individual events from each game in chronological order, including key information like:

  • id_odsp – unique identifier of the game
  • time – minute of the game
  • event_type – primary event type
  • event_team – team that produced the event
  • player – name of the player involved in the main event
  • shot_place – placement of the shot (13 possible placement locations)
  • shot_outcome – outcome of the shot (4 possible outcomes)
  • location – location on the pitch where the event happened (19 possible locations)
  • is_goal – binary indicator of whether the shot resulted in a goal (own goals included)
  • and more...

The second, smaller dataset includes high-level information and advanced stats, with one record per game. Key attributes are “League”, “Season”, “Country”, “Home Team”, “Away Team”, and various market odds.

Data Engineering

We start out by creating an ETL notebook, where the two CSV datasets are transformed and joined into a single Parquet data layer, which enables us to utilize the DBIO caching feature for high-performance big data queries.

Extraction

The first task is to create a DataFrame schema for the larger game events dataset, so the read operation doesn’t spend time inferring it from the data. Once extracted, we’ll replace “null” values in the fields of interest with data-type-specific constants.

[Figure: Game events data extraction and null replacement]
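A rough PySpark sketch of this step could look like the following; the file path, the exact set of columns, and the fill constants are illustrative assumptions based on the field descriptions above:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Explicit schema for the event fields of interest (illustrative subset)
eventsSchema = StructType([
    StructField("id_odsp", StringType(), True),
    StructField("time", IntegerType(), True),
    StructField("event_type", IntegerType(), True),
    StructField("event_team", StringType(), True),
    StructField("player", StringType(), True),
    StructField("shot_place", IntegerType(), True),
    StructField("shot_outcome", IntegerType(), True),
    StructField("location", IntegerType(), True),
    StructField("is_goal", IntegerType(), True)
])

# Read the events CSV (assumed path) and replace nulls with type-specific constants
eventsDf = (spark.read
    .csv("/data/eu-soccer-events/input/events.csv", schema=eventsSchema, header=True)
    .na.fill({"player": "NA", "event_team": "NA"})
    .na.fill({"shot_place": 0, "shot_outcome": 0, "location": 0}))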

This is what the raw data (with some nulls replaced) looks like:

[Figure: Display of the game events data]

We also read the second dataset into a DataFrame, as it includes the country name, which we’ll use later during analysis.

[Figure: Extract and display high-level game information]
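A corresponding sketch for the game-info dataset (path and file name again assumed); this dataset is small enough that letting Spark infer the schema is fine:

# Read the high-level game info (one record per game) and inspect it
gameInfDf = (spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("/data/eu-soccer-events/input/game_info.csv"))

display(gameInfDf)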

Transformation

The next step is to transform and join the DataFrames into one. Many fields of interest in the game events DataFrame contain numeric IDs, so we define a generic UDF that uses look-up tables to map IDs to descriptions.

[Figure: Generic UDF for look-ups]
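One way to write such a generic look-up UDF in PySpark is sketched below; the dictionaries hold the code-to-description mappings, and the entries shown are only illustrative samples rather than the full tables:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def mapKeyToVal(mapping):
    """Return a UDF that maps a numeric code to its description via the given dict."""
    def mapKeyToVal_(code):
        return mapping.get(code, "Unknown")
    return udf(mapKeyToVal_, StringType())

# Illustrative look-up tables (only a few sample codes shown)
evtTypeMap = {1: "Attempt", 2: "Corner", 3: "Foul", 4: "Yellow card"}
shotPlaceMap = {3: "Bottom left corner", 4: "Bottom right corner", 5: "Centre of the goal"}
locationMap = {3: "Centre of the box", 9: "Left side of the box", 15: "Outside the box"}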

The mapped descriptions are stored in new columns in the DataFrame, so once the two DataFrames are joined, we filter out the original numeric ID columns to keep the result as lean as possible. We also use QuantileDiscretizer to add a categorical “time_bin” column based on the “time” field.

[Figure: Look-up using the UDF and joining of the DataFrames]
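Continuing the sketch: the UDF is applied column by column, the two DataFrames are joined on the game identifier, the original numeric ID columns are dropped, and QuantileDiscretizer buckets “time” into “time_bin”. The country-name-to-code mapping and the bucket count are assumptions:

from pyspark.ml.feature import QuantileDiscretizer

# Map numeric IDs to readable descriptions in new *_str columns
eventsMappedDf = (eventsDf
    .withColumn("event_type_str", mapKeyToVal(evtTypeMap)("event_type"))
    .withColumn("shot_place_str", mapKeyToVal(shotPlaceMap)("shot_place"))
    .withColumn("location_str", mapKeyToVal(locationMap)("location")))

# Join with the high-level game info and drop the original numeric ID columns
countryCodeMap = {"england": "ENG", "spain": "ESP", "germany": "GER", "italy": "ITA", "france": "FRA"}
joinedDf = (eventsMappedDf
    .join(gameInfDf, "id_odsp", "inner")
    .withColumn("country_code", mapKeyToVal(countryCodeMap)("country"))  # assumes a "country" column
    .drop("event_type", "shot_place", "location"))

# Bucket the continuous "time" column into a categorical "time_bin"
joinedDf = (QuantileDiscretizer(numBuckets=10, inputCol="time", outputCol="time_bin")
    .fit(joinedDf)
    .transform(joinedDf))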

Loading

Once the data is in the desired shape, we load it as Parquet into a Spark/Databricks table that resides in a domain-specific database. The database and table are registered with the internal Databricks metastore, and the data is stored in DBFS. We partition the Parquet data by “country_code” during the write.

[Figure: Load the final DataFrame into a Spark SQL table]
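A sketch of the load step, with illustrative database and table names:

# Database and table names are illustrative
spark.sql("CREATE DATABASE IF NOT EXISTS EURO_SOCCER_DB")

(joinedDf.write
    .partitionBy("country_code")
    .format("parquet")
    .mode("overwrite")
    .saveAsTable("EURO_SOCCER_DB.GAME_EVENTS"))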

Data Analysis

Now that the data shape and format are all set, it’s time to dig in and try to answer a few business questions. We’ll use plain old, super-strong SQL (Spark SQL) for that purpose, and create a second notebook from the perspective of data analysts.

For example, if one wants to see the distribution of goals by shot placement, it could look like this simple query and the resulting pie chart (alternatively viewable as a data grid):

[Figure: Distribution of goals by shot placement]
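A sketch of such a query against the illustrative table from the ETL sketches; in a Databricks notebook, the pie chart is simply a different rendering of the same result set:

display(spark.sql("""
    SELECT shot_place_str, COUNT(1) AS goals
    FROM EURO_SOCCER_DB.GAME_EVENTS
    WHERE is_goal = 1
    GROUP BY shot_place_str
    ORDER BY goals DESC
"""))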

Or if the requirement is to see the distribution of goals by country/league, it could look like this map visualization (which requires a column of ISO country codes or US state codes):

[Figure: Distribution of goals by country/league]

Once we observe that the Spanish league has had the most goals over the span of this data, we could find the top three goal locations per shot placement in the Spanish games by writing a more involved query using window functions in Spark SQL. It would be a stepwise nested query:

[Figure: Top 3 goal locations per shot place in the Spanish league]
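A sketch of the nested query, assuming ‘ESP’ is the code stored for Spain in “country_code”:

display(spark.sql("""
    SELECT shot_place_str, location_str, goals
    FROM (
        SELECT shot_place_str, location_str, COUNT(1) AS goals,
               RANK() OVER (PARTITION BY shot_place_str ORDER BY COUNT(1) DESC) AS rnk
        FROM EURO_SOCCER_DB.GAME_EVENTS
        WHERE is_goal = 1 AND country_code = 'ESP'
        GROUP BY shot_place_str, location_str
    ) ranked
    WHERE rnk <= 3
    ORDER BY shot_place_str, goals DESC
"""))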

We could do time-based analysis as well, e.g. by observing the total number of goals over the course of a game (0-90+ minutes) across all games in the five leagues. Here we use the “time_bin” column created as part of the earlier transformation process, rather than a continuous variable like “time”:

[Figure: Goals per time bin per country/league]

Machine Learning

As we saw above, descriptive analysis on big data is made easy with Spark SQL and Databricks. But what if you’re a data scientist looking at the same data to find combinations of on-field playing conditions that lead to goals?

We’ll now create a third notebook from that perspective and see how one could fit a Spark ML GBT (gradient-boosted trees) classifier on the game events training dataset. In this case, our binary classification label is the “is_goal” field, and we’ll use a mix of categorical features like “event_type_str”, “event_team”, “shot_place_str”, “location_str”, “assist_method_str”, “situation_str” and “country_code”.

[Figure: Spark ML imports for the use case]
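The imports used in the sketches that follow (assuming Spark 3.x, where OneHotEncoder accepts multiple input columns):

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator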

Then the following three-step process is required to convert our categorical feature columns to a single binary vector:

[Figure: Spark ML pre-modeling transformations]
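A sketch of the three steps: index each categorical string column, one-hot encode the indices into binary vectors, and assemble those vectors into a single feature vector. The *_str columns are assumed to come from the look-up step in the ETL sketch:

# Categorical feature columns listed in the article
catCols = ["event_type_str", "event_team", "shot_place_str", "location_str",
           "assist_method_str", "situation_str", "country_code"]

# Step 1: map each categorical string column to a numeric index
indexers = [StringIndexer(inputCol=c, outputCol=c + "_idx", handleInvalid="keep")
            for c in catCols]

# Step 2: one-hot encode the indexed columns into binary vectors
encoder = OneHotEncoder(inputCols=[c + "_idx" for c in catCols],
                        outputCols=[c + "_vec" for c in catCols])

# Step 3: assemble the binary vectors into a single "features" vector
assembler = VectorAssembler(inputCols=[c + "_vec" for c in catCols], outputCol="features")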

Finally, we’ll create a Spark ML Pipeline using the above transformers and the GBT classifier. We’ll divide the game events data into training and test datasets, and fit the pipeline to the former.

[Figure: Create the Spark ML pipeline and fit it to the training data]
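A sketch of the pipeline and fit, reading back the table created in the ETL step; the split ratio and GBT parameters are arbitrary choices here:

# Read the curated game events back from the table created during loading
gameEventsDf = spark.table("EURO_SOCCER_DB.GAME_EVENTS")

gbt = GBTClassifier(labelCol="is_goal", featuresCol="features", maxIter=10)
pipeline = Pipeline(stages=indexers + [encoder, assembler, gbt])

# Hold out a test set and fit the full pipeline on the training portion
trainDf, testDf = gameEventsDf.randomSplit([0.75, 0.25], seed=12345)
model = pipeline.fit(trainDf)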

Now we can validate our classification model by running inference on the test dataset. We could compare the predicted label with the actual label one by one, but that would be a painful process for a large amount of test data. For scalable model evaluation, we can instead use BinaryClassificationEvaluator with the area under ROC metric. One could also use the area under the precision-recall curve as the evaluation metric. If this were a multi-class classification problem, we could use MulticlassClassificationEvaluator.

[Figure: Validate the Spark ML model]
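A sketch of the evaluation, assuming a Spark version in which GBTClassificationModel emits a rawPrediction column:

# Run inference on the held-out test data and compute area under ROC
predictions = model.transform(testDf)

evaluator = BinaryClassificationEvaluator(labelCol="is_goal",
                                          rawPredictionCol="rawPrediction",
                                          metricName="areaUnderROC")
print("Area under ROC = %g" % evaluator.evaluate(predictions))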

Conclusion

The above end-to-end data pipeline showcases how different sets of users, i.e. data engineers, data analysts, and data scientists, can work together to find hidden value in big data from any sport.

But this is just the first step. A sports or media organization could do more by running model-based inference on real-time streaming data processed using Structured Streaming, in order to provide targeted content to its users. And there are many other ways to combine different Spark/Databricks technologies to solve different big data problems in the sports and media industries.

Try Databricks on AWS or Azure Databricks today.

Amazon (AWS) SQS with Spring Cloud

Amazon Simple Queue Service (SQS) is a highly available cloud messaging service that can be used to decouple parts of a single system or to integrate multiple disparate systems. An SQS queue acts as a buffer between a message producer and a consumer when the former’s message generation rate exceeds the latter’s processing throughput. It also provides durable point-to-point messaging: messages are stored in a fail-safe queue and can be processed once a consumer application comes back up after scheduled maintenance or unexpected downtime. For the publish-subscribe design pattern, AWS provides another cloud service called Simple Notification Service (SNS).

[Diagram: Integration with Amazon SQS]

The above diagram shows a subset of possible producer-consumer scenarios that can utilize SQS.

Developers who use JMS to connect to messaging providers/brokers like ActiveMQ, WebSphere MQ, JBoss Messaging, etc. can use the Amazon SQS Java Messaging Library to work with SQS, together with the AWS SDK, to write JMS 1.1-compliant applications. But for real-world applications, writing directly against the AWS-provided library can be cumbersome. That’s where Spring Cloud AWS comes to the rescue. In this post, I’ll show how one can use Spring Cloud with Spring Boot to write an SQS message producer and a consumer. Before that, a few important points to note when working with SQS:

  • The maximum payload of an SQS message is 256 KB. Applications that need to send more data in a single message can use the SQS Extended Client Library, which utilizes AWS S3 as the message store (with a reference being sent in the actual SQS message). Another option is to compress the message before sending it over SQS.
  • Messages can be sent synchronously, but received either synchronously or asynchronously (using AmazonSQSAsyncClient). If required, one can send messages in a separate child thread so as not to block the main thread on SQS I/O.
  • Messages can be sent or received in batches to increase throughput (at most 10 messages per batch). Note that the maximum payload limit for an entire batch is also 256 KB, so batching can only be used for lighter payloads.
  • There can be multiple producer and consumer threads interacting with the same queue, just like any other messaging provider/broker. Multiple consumer threads provide load balancing for message processing.
  • Consumers can poll for messages using long polling or standard polling. Long polling is recommended to avoid too many empty responses (the wait timeout is configurable).
  • Each message producer or consumer needs AWS credentials to work with SQS. AWS provides different implementations of AWSCredentialsProvider for this purpose; DefaultAWSCredentialsProviderChain is used by default if no provider is configured explicitly.

Now, how to use Spring Cloud with SQS. First of all, the dependencies below should be declared in the Maven project’s pom.xml or the Gradle build file. The AWS SDK and other Spring dependencies are pulled onto the classpath automatically (unless already specified explicitly).

<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-aws-autoconfigure</artifactId>
	<version>latest.version</version>
</dependency>
<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-aws-messaging</artifactId>
	<version>latest.version</version>
</dependency>

From a configuration perspective, we first need to wire up the appropriate AWS SQS client. Assuming asynchronous interaction with SQS, we configure an AmazonSQSAsyncClient. There are a couple of things to note. First, if you’re behind a proxy, you’ll need to specify the proxy host and port using ClientConfiguration. Second, DefaultAWSCredentialsProviderChain looks for the AWS access key, secret access key, and default region in this order: environment variables, Java system properties, the properties file, and the instance profile (only on EC2). We’ve explicitly set the region below (which is the same as the default, us-east-1).

@Lazy
@Bean(name = "amazonSQS", destroyMethod = "shutdown")
public AmazonSQSAsync amazonSQSClient() {
    AmazonSQSAsyncClient awsSQSAsyncClient;
    if (useProxy.equalsIgnoreCase("Y")) {
        ClientConfiguration clientConfig = new ClientConfiguration();
        clientConfig.setProxyHost("proxy.xyz.com");
        clientConfig.setProxyPort(8085);

        awsSQSAsyncClient = new AmazonSQSAsyncClient(new DefaultAWSCredentialsProviderChain(), clientConfig);
    } else {
        awsSQSAsyncClient = new AmazonSQSAsyncClient(new DefaultAWSCredentialsProviderChain());
    }

    awsSQSAsyncClient.setRegion(Region.getRegion(Regions.fromName("us-east-1")));
    return awsSQSAsyncClient;
}

Next we look at configuring a QueueMessagingTemplate, which will be used to send messages to a SQS queue.

@Configuration
public class ProducerAWSSQSConfig {
    ...

    @Bean
    public QueueMessagingTemplate queueMessagingTemplate() {
        return new QueueMessagingTemplate(amazonSQSClient());
    }

    @Lazy
    @Bean(name = "amazonSQS", destroyMethod = "shutdown")
    public AmazonSQSAsync amazonSQSClient() {
    ...
}

@Component
public class MessageProducer {

    private QueueMessagingTemplate sqsMsgTemplate;
    ...

    public void sendMessage(String payload) {
        ...
        this.sqsMsgTemplate.convertAndSend("randomQName", payload);
        ...
    }
}

We’re sending a String message, which is converted internally using a StringMessageConverter. One can also send a complete data object as JSON, if the Jackson library is available on the classpath (using MappingJackson2MessageConverter).

Finally, we look at configuring a SimpleMessageListenerContainer and QueueMessageHandler, which are used to create a message listener for the SQS queue. Note that internally Spring Cloud creates a SynchronousQueue-based thread pool to listen to the different queues and to process received messages with the corresponding handlers. The number of core threads for the thread pool is 2 * the number of queues (or configured listeners), and the “maxNumMsgs” property determines the maximum number of threads. The “waitTimeout” property determines the timeout period for long polling (the default with Spring Cloud).

@Configuration
public class ConsumerAWSSQSConfig {
    ...

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer() {
        SimpleMessageListenerContainer msgListenerContainer = simpleMessageListenerContainerFactory().createSimpleMessageListenerContainer();
        msgListenerContainer.setMessageHandler(queueMessageHandler());

        return msgListenerContainer;
    }

    @Bean
    public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory() {
        SimpleMessageListenerContainerFactory msgListenerContainerFactory = new SimpleMessageListenerContainerFactory();
        msgListenerContainerFactory.setAmazonSqs(amazonSQSClient());
        // Keep messages on the queue if the listener throws an exception
        msgListenerContainerFactory.setDeleteMessageOnException(false);
        msgListenerContainerFactory.setMaxNumberOfMessages(maxNumMsgs);
        msgListenerContainerFactory.setWaitTimeOut(waitTimeout);

        return msgListenerContainerFactory;
    }

    @Bean
    public QueueMessageHandler queueMessageHandler() {
        QueueMessageHandlerFactory queueMsgHandlerFactory = new QueueMessageHandlerFactory();
        queueMsgHandlerFactory.setAmazonSqs(amazonSQSClient());

        QueueMessageHandler queueMessageHandler = queueMsgHandlerFactory.createQueueMessageHandler();

        return queueMessageHandler;
    }

    @Lazy
    @Bean(name = "amazonSQS", destroyMethod = "shutdown")
    public AmazonSQSAsync amazonSQSClient() {
    ...
}

@Component
public class MessageConsumer {
    ...

    @MessageMapping("randomQName")
    public void onRandomQMessage(String payload) {
        doSomething(payload);
    }
}

One can also send custom message attributes (headers) with an SQS message to provide contextual information to the consumer. In addition, there are the important concepts of visibility timeout and delay queues, which should be explored before getting your hands dirty with SQS.

Hope you have fun with Spring Cloud and AWS SQS.