Tutorial: Distributed

Overview

In this section, we build a distributed neural network. The key part that must be handled from the user's perspective is how the data is divided when it is passed to different computing units.

The underlying distributed API is a wrapper over Horovod (https://github.com/uber/horovod), the open source component of Uber's Michelangelo platform. Michelangelo was designed to deploy and scale distributed deep learning projects over multiple GPUs in a network.

The remainder of the distributed book-keeping is managed with the MPI package (http://mpi4py.scipy.org/docs/). The beauty of MPI is that it handles synchronization both across processes on a single machine and across multiple servers (for our purposes we will call both distributed, even though the term is traditionally applied only to the latter).
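
To make the process layout concrete, here is a minimal mpi4py sketch (not part of the tutorial code) showing how each process identifies itself: mpirun starts several copies of the script, and MPI tells each copy its rank.

from mpi4py import MPI

comm = MPI.COMM_WORLD     # communicator spanning every process started by mpirun
rank = comm.Get_rank()    # index of this process; rank 0 is conventionally the master
size = comm.Get_size()    # total number of processes

print("process %d of %d" % (rank, size))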

Schema

In short, the _schema method of the model describes where the data is stored and how to retrieve it. As things are set up right now, this means we need to create a folder with the data broken up into different files (shards), and then define a corresponding _schema which overrides the default.

Shard Example

Say we originally have the following directory structure:

data
-->full_data.tsv

Then, we can divide full_data.tsv into two shards using the unix head and tail commands:

mkdir -p data/shard0 data/shard1
head -n 100 data/full_data.tsv > data/shard0/shard0.tsv
tail -n +101 data/full_data.tsv > data/shard1/shard1.tsv

This gives us the following directory structure:

data
-->full_data.tsv
-->shard0
   -->shard0.tsv
-->shard1
   -->shard1.tsv

This lets us train simultaneously on shard0 and shard1.
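
For more than two shards, the same split can be done programmatically. Below is a small pandas sketch (not part of the tutorial; the shard count and row split are assumptions) that writes each chunk of the full data set into its own shard directory:

import os
import pandas as pd

df = pd.read_csv('data/full_data.tsv', sep='\t')

n_shards = 2                                                # assumed shard count
chunks = [df.iloc[i::n_shards] for i in range(n_shards)]    # round-robin row split

for i, chunk in enumerate(chunks):
    shard_dir = 'data/shard%d' % i
    if not os.path.isdir(shard_dir):
        os.makedirs(shard_dir)
    chunk.to_csv(os.path.join(shard_dir, 'shard%d.tsv' % i), sep='\t', index=False)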

In our example, we define tsv_reader, which loads the data with pandas. For both the descriptors and the targets, the schema provides three items: the file name, the table class, and a reader helper function, in our case tsv_reader.

def _schema(self):
    tsv_reader = lambda fpath: pd.read_csv(fpath, sep='\t',
                               dtype={'ID':str}).set_index('ID')
    return {'desc': ('descriptors.tsv', tflon.data.Table, tsv_reader),
            'targ': ('targets.tsv', tflon.data.Table, tsv_reader)}
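
The reader can be sanity-checked on its own before wiring it into the model. This standalone pandas snippet (the file path is hypothetical) simply confirms that a shard file loads as a table indexed by the string-valued ID column:

import pandas as pd

tsv_reader = lambda fpath: pd.read_csv(fpath, sep='\t', dtype={'ID': str}).set_index('ID')

frame = tsv_reader('data/shard0/descriptors.tsv')   # hypothetical shard file
print(frame.index.name)                             # expected: 'ID'
print(frame.shape)                                  # (rows in this shard, number of columns)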

Distributed Feeds and Distributed Trainer

Thankfully, most of the hard work has already been done. It suffices to interface with the functionality from tflon.distributed.

Setup

First, we need to initialize Horovod and the GPU resources, and set a 1000 ms queue timeout so that training does not hang if a worker stalls:

tflon.data.TensorQueue.DEFAULT_TIMEOUT=1000
config = tflon.distributed.init_distributed_resources()

Trainer

We use the built-in DistributedTrainer, which has the same interface as the ordinary trainers such as OpenOptTrainer. According to its creators, Adam optimization takes advantage of

"AdaGrad (Duchi et al., 2011), which works well with sparse gradients,
and RMSProp (Tieleman & Hinton, 2012), which works well in on-line and non-stationary
settings"

This is implemented explicitly as:

trainer = tflon.distributed.DistributedTrainer( tf.train.AdamOptimizer(1e-3),
            iterations=1000 )
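
Since DistributedTrainer takes the optimizer as an argument, other tf.train optimizers can presumably be substituted without changing the rest of the script. A hypothetical variant using plain stochastic gradient descent (the learning rate here is a guess, not a tutorial value):

# Sketch: swap Adam for SGD, same DistributedTrainer interface
trainer = tflon.distributed.DistributedTrainer( tf.train.GradientDescentOptimizer(1e-2),
            iterations=1000 )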

Feed

To define the feed, we use make_distributed_table_feed. The first argument is the path to the directory containing the shards (subdirectories holding partitions of the data). The second argument is the schema. The master_table argument indicates which table to use as the master index. By default, the feed divides the shards evenly among the available processes.

feed = tflon.distributed.make_distributed_table_feed(
        resource_filename('tflon_test.data', 'distributed'),
        NN.schema, master_table='desc', partition_strategy='all' )

Executing the model

Since the distributed table is implemented with MPI, you will need to run the script with mpirun, in our case with two processes:

mpirun -np 2 python ./distributed.py

The table will not work properly and will throw an error if any MPI process receives no data, so the number of processes requested with the -np option must not exceed the number of shards.
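
A quick pre-flight check (a standalone sketch; the data path is an assumption) counts the shard subdirectories so you know the maximum process count to pass to -np:

import os

shard_root = 'data'    # hypothetical path to the directory containing shard0, shard1, ...
n_shards = sum(os.path.isdir(os.path.join(shard_root, d)) for d in os.listdir(shard_root))
print("use at most %d processes: mpirun -np %d python ./distributed.py" % (n_shards, n_shards))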

Warning

Due to the way MPI is designed, do not use is_master around anything that involves distributed data, for example anything during the training phase. Only use is_master post-training. In our example, we use it for the evaluation.
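
Schematically, using the names from the full example below, the safe pattern is:

# Every process must participate in training; do NOT guard this with is_master()
NN.fit( feed.shuffle(batch_size=100), trainer, restarts=2, source_tables=feed )

# Post-training work can be restricted to the master process
if tflon.distributed.is_master():
    metrics = NN.evaluate( feed )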

Example

Below is an example of putting all these changes together:

import tflon
import tensorflow as tf
import pandas as pd
from pkg_resources import resource_filename

class NeuralNet(tflon.model.Model):
    def _schema(self):
        tsv_reader = lambda fpath: pd.read_csv(fpath, sep='\t',
                                    dtype={'ID':str}).set_index('ID')
        return {'desc': ('descriptors.tsv', tflon.data.Table, tsv_reader),
                'targ': ('targets.tsv', tflon.data.Table, tsv_reader)}

    def _model(self):
        I = self.add_input('desc', shape=[None, 210])
        T = self.add_target('targ', shape=[None, 1])
        net = tflon.toolkit.WindowInput() |\
              tflon.toolkit.Dense(20, activation=tf.tanh) |\
              tflon.toolkit.Dense(5, activation=tf.tanh) |\
              tflon.toolkit.Dense(1)
        L = net(I)

        self.add_output( "pred", tf.nn.sigmoid(L) )
        self.add_loss( "xent", tflon.toolkit.xent_uniform_sum(T, L) )
        self.add_loss( "l2", tflon.toolkit.l2_penalty(self.weights) )
        self.add_metric( 'auc', tflon.toolkit.auc(T, L) )

if __name__=='__main__':
    tflon.data.TensorQueue.DEFAULT_TIMEOUT=1000
    # Initialize horovod and setup gpu resources
    config = tflon.distributed.init_distributed_resources()

    graph = tf.Graph()
    with graph.as_default():
        # Add a model instance
        NN = NeuralNet(use_gpu=True)

        # Create the distributed trainer
        trainer = tflon.distributed.DistributedTrainer( tf.train.AdamOptimizer(1e-3),
                    iterations=1000 )

    # Create the data feed, use the same feed for all process instances
    # tflon.distributed.DistributedTable adds MPI synchronization to the Table API min and max ops
    # Usually, different data would be loaded on each process (see tflon.distributed.make_distributed_table_feed)
    feed = tflon.distributed.make_distributed_table_feed(
            resource_filename('tflon_test.data', 'distributed'),
            NN.schema, master_table='desc', partition_strategy='all' )

    with tf.Session(graph=graph, config=config):
        # Train with minibatch size 100
        NN.fit( feed.shuffle(batch_size=100), trainer, restarts=2, source_tables=feed )

        # Perform inference on the master process
        if tflon.distributed.is_master():
            metrics = NN.evaluate( feed )
            print "AUC:", metrics['auc']

AUC: 0.9991015
