Tutorial: Distributed
Overview
In this section, we build a distributed neural network. From the user's perspective, the key question is how the data will be divided when it is passed to the different computing units.
The underlying distributed API is a wrapper over Horovod (https://github.com/uber/horovod), the open-source component of Uber's Michelangelo platform. Michelangelo was designed to deploy and scale distributed deep learning projects over multiple GPUs in a network.
The remainder of the distributed bookkeeping is managed with the MPI package mpi4py (http://mpi4py.scipy.org/docs/). The beauty of MPI is that it handles synchronization both between processes on a single machine and across multiple servers (for the purposes here we'll call both distributed, even though the term is traditionally applied only to the latter).
Schema
In short, the _schema method of the model describes where the data is stored and how to retrieve it. As currently set up, this means we need to create a folder with the data broken up into separate files (shards), and then define a corresponding _schema method that overrides the default.
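Shard Example
Say we originally have the following directory structure:
data/
    full_data.tsv
Then we can divide full_data.tsv into two shards using the Unix head and tail commands. Because the reader shown below expects a header row in every file, the header (line 1) is copied into both shards:
mkdir -p data/shard0 data/shard1
head -n 100 data/full_data.tsv > data/shard0/shard0.tsv
head -n 1 data/full_data.tsv > data/shard1/shard1.tsv
tail -n +101 data/full_data.tsv >> data/shard1/shard1.tsv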
This gives us the following directory structure:
data/
    full_data.tsv
    shard0/
        shard0.tsv
    shard1/
        shard1.tsv
This lets us train on shard0 and shard1 simultaneously.
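The same split can be done for any number of shards with a short script. This is a minimal sketch, not part of tflon: split_tsv is a hypothetical helper that writes contiguous blocks whose sizes differ by at most one row to shard<k>/shard<k>.tsv, and copies the header line into every shard so each file can be parsed on its own. It assumes Python 3 (os.makedirs with exist_ok).

```python
import os


def split_tsv(src_path, out_dir, n_shards, header=True):
    """Split a TSV file into n_shards roughly equal shards (hypothetical helper).

    Shard k is written to <out_dir>/shard<k>/shard<k>.tsv. When header=True,
    the first line of src_path is copied into every shard so that each shard
    can be read independently (e.g. by pd.read_csv).
    """
    with open(src_path) as f:
        lines = f.readlines()
    head, rows = (lines[:1], lines[1:]) if header else ([], lines)
    if n_shards < 1 or n_shards > len(rows):
        raise ValueError("need 1 <= n_shards <= number of data rows")
    # Contiguous blocks whose sizes differ by at most one row
    base, extra = divmod(len(rows), n_shards)
    paths, start = [], 0
    for k in range(n_shards):
        stop = start + base + (1 if k < extra else 0)
        shard_dir = os.path.join(out_dir, "shard%d" % k)
        os.makedirs(shard_dir, exist_ok=True)
        path = os.path.join(shard_dir, "shard%d.tsv" % k)
        with open(path, "w") as f:
            f.writelines(head + rows[start:stop])
        paths.append(path)
        start = stop
    return paths
```

For example, split_tsv('data/full_data.tsv', 'data', 2) produces the two-shard layout above, with the header repeated in both files.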
In our example, we define tsv_reader, which reads a tab-separated file with pandas, keeps the ID column as strings, and uses it as the index. For each table, the descriptors ('desc') and the targets ('targ'), the schema provides three values: the file name, the table class, and a reader function (in our case, tsv_reader).
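import pandas as pd
import tflon
def _schema(self):
    # Read a TSV file, keep IDs as strings, and index rows by ID
    tsv_reader = lambda fpath: pd.read_csv(fpath, sep='\t',
                                           dtype={'ID': str}).set_index('ID')
    # Each entry: table name -> (file name, table class, reader function)
    return {'desc': ('descriptors.tsv', tflon.data.Table, tsv_reader),
            'targ': ('targets.tsv', tflon.data.Table, tsv_reader)}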
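To make the role of each (file name, table class, reader) triple concrete, the sketch below loads every table of one shard directory from such a schema. It is an illustration under stated assumptions, not tflon's implementation: DictTable, load_shard and the csv-based tsv_reader are hypothetical stand-ins (pandas and tflon.data.Table are replaced by the standard library so the sketch runs on its own), and it assumes each shard directory holds one file per schema entry.

```python
import csv
import os


def tsv_reader(fpath):
    """Stdlib stand-in for the pandas reader: map each ID (kept as str) to its row."""
    with open(fpath, newline="") as f:
        return {row["ID"]: row for row in csv.DictReader(f, delimiter="\t")}


class DictTable(object):
    """Hypothetical minimal table class wrapping the reader's output."""

    def __init__(self, data):
        self.data = data

    def index(self):
        return sorted(self.data)


# Same shape as the tutorial's schema: name -> (file name, table class, reader)
SCHEMA = {
    "desc": ("descriptors.tsv", DictTable, tsv_reader),
    "targ": ("targets.tsv", DictTable, tsv_reader),
}


def load_shard(schema, shard_dir):
    """Build one table per schema key from the files in a shard directory."""
    tables = {}
    for key, (fname, table_cls, reader) in schema.items():
        tables[key] = table_cls(reader(os.path.join(shard_dir, fname)))
    return tables
```

Keeping IDs as strings matters for identifiers such as "007": this is why the pandas reader passes dtype={'ID': str}, since otherwise the column may be parsed as integers.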
Distributed Feeds and Distributed Trainer
Fortunately, most of the hard work has already been done; it suffices to use the functionality provided by tflon.distributed.
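Setup
First, we initialize Horovod and the GPU resources, and set a 1000 ms timeout on the tensor queue in case training stalls:
# Give up on a stalled tensor queue after 1000 ms
tflon.data.TensorQueue.DEFAULT_TIMEOUT = 1000
# Initialize Horovod and configure GPU resources; the result is the session config
config = tflon.distributed.init_distributed_resources()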
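The point of a timeout can be illustrated with Python's standard queue module: a consumer that waits for a batch with a bound fails loudly instead of hanging forever. This is only an analogy, assuming TensorQueue waits on its input in a broadly similar way; next_batch is a hypothetical helper, and note that queue.Queue.get takes its timeout in seconds, whereas the tutorial gives DEFAULT_TIMEOUT in milliseconds.

```python
import queue


def next_batch(q, timeout_ms=1000):
    """Wait at most timeout_ms for the next batch (hypothetical helper)."""
    try:
        # queue.Queue.get expects seconds, so convert from milliseconds
        return q.get(timeout=timeout_ms / 1000.0)
    except queue.Empty:
        raise RuntimeError("no batch within %d ms; input pipeline stalled?" % timeout_ms)
```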
Trainer
We use the built-in DistributedTrainer, which has the same interface as the ordinary trainer, OpenOptTrainer. Here it wraps TensorFlow's Adam optimizer. According to its creators (Kingma & Ba, 2014), Adam combines the advantages of "AdaGrad (Duchi et al., 2011), which works well with sparse gradients, and RMSProp (Tieleman & Hinton, 2012), which works well in on-line and non-stationary settings".
This is implemented as:
# Adam with a learning rate of 1e-3, trained for 1000 iterations
trainer = tflon.distributed.DistributedTrainer(tf.train.AdamOptimizer(1e-3),
                                               iterations=1000)
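The trainer's job can be sketched in plain Python. Assuming DistributedTrainer follows Horovod's usual data-parallel pattern (each process computes gradients on its own data, an allreduce averages them, and every process applies the same optimizer update), the sketch below runs Adam on a toy one-parameter problem split across two simulated workers. Adam, allreduce_mean and train are hypothetical stand-ins with no MPI involved; the update follows the Adam paper's algorithm with its default beta1=0.9, beta2=0.999 and eps=1e-8.

```python
import math


class Adam(object):
    """Scalar Adam update (Kingma & Ba, 2014) with the paper's defaults."""

    def __init__(self, lr=1e-3, beta1=0.9, beta2=0.999, eps=1e-8):
        self.lr, self.b1, self.b2, self.eps = lr, beta1, beta2, eps
        self.m = self.v = 0.0
        self.t = 0

    def step(self, x, g):
        self.t += 1
        self.m = self.b1 * self.m + (1 - self.b1) * g      # first moment (momentum)
        self.v = self.b2 * self.v + (1 - self.b2) * g * g  # second moment (RMSProp-like)
        m_hat = self.m / (1 - self.b1 ** self.t)           # bias corrections
        v_hat = self.v / (1 - self.b2 ** self.t)
        return x - self.lr * m_hat / (math.sqrt(v_hat) + self.eps)


def allreduce_mean(values):
    """Stand-in for an MPI/Horovod allreduce: every worker receives the mean."""
    return sum(values) / float(len(values))


def train(shards, lr=0.1, iterations=500):
    """Fit x to minimise the mean of (x - d)^2 over all data points d."""
    x, opt = 0.0, Adam(lr=lr)
    for _ in range(iterations):
        # Each worker computes the gradient of the loss on its own shard
        grads = [sum(2 * (x - d) for d in shard) / len(shard) for shard in shards]
        # Averaged gradient, identical on every worker, drives one shared update
        x = opt.step(x, allreduce_mean(grads))
    return x
```

With equally sized shards, the average of the per-shard gradients equals the full-data gradient, so train([[1.0, 2.0], [3.0, 4.0]]) converges to the global mean, 2.5, just as single-process training would.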
Feed
To define the feed, we use make_distributed_table_feed. The first argument is the path to the directory containing the shards (subdirectories holding partitions of the data). The second argument is the schema. The master_table argument indicates which table in the schema (by its key, here 'desc') provides the index. By default, the feed divides the shards evenly among the available processes; this example instead passes partition_strategy='all', so every process uses the same data (see the comments in the full example below).
from pkg_resources import resource_filename
feed = tflon.distributed.make_distributed_table_feed(
    resource_filename('tflon_test.data', 'distributed'),  # directory of shards
    NN.schema, master_table='desc', partition_strategy='all')
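The default behaviour of dividing the shards evenly can be pictured with a small round-robin assignment. This is a hypothetical sketch, not tflon's partitioning code: assign_shards and the strategy name "even" are invented for illustration, and tflon may split contiguously rather than round-robin. Only "all" corresponds to a partition_strategy value the tutorial actually uses.

```python
def assign_shards(shards, rank, size, strategy="even"):
    """Return the shards that process `rank` (0-based) of `size` loads.

    Hypothetical strategies:
      "even": round-robin, so shard counts per process differ by at most one.
      "all":  every process loads every shard.
    """
    ordered = sorted(shards)
    if strategy == "all":
        return ordered
    if strategy != "even":
        raise ValueError("unknown strategy: %r" % strategy)
    mine = [s for i, s in enumerate(ordered) if i % size == rank]
    if not mine:
        # A process left without data is an error, as the tutorial warns
        raise RuntimeError("rank %d of %d has no shards; use -np <= %d"
                           % (rank, size, len(ordered)))
    return mine
```

With three shards and two processes, rank 0 gets shard0 and shard2 while rank 1 gets shard1; with four processes, rank 3 would have nothing to load.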
Executing the model
Since the distributed table is implemented with MPI, you need to launch the script with mpirun, in our case with two processes:
mpirun -np 2 python ./distributed.py
The table will not work properly, and will throw an error, if an MPI process has no data. The number of processes requested with the -np option must therefore not exceed the number of shards.
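A small pre-flight check can enforce this constraint before launching. The sketch below uses a hypothetical helper, mpirun_command, which is not part of tflon; it assumes each shard is a subdirectory of the data directory, as in the layout used earlier, and it only builds the command list rather than running it.

```python
import os
import sys


def mpirun_command(script, data_dir, np):
    """Build an mpirun argv after checking that -np does not exceed the shard count."""
    # Count shard subdirectories; plain files such as full_data.tsv are ignored
    shards = [d for d in os.listdir(data_dir)
              if os.path.isdir(os.path.join(data_dir, d))]
    if not 1 <= np <= len(shards):
        raise ValueError("-np %d must be between 1 and the %d shards in %s"
                         % (np, len(shards), data_dir))
    # sys.executable launches the same interpreter that runs this helper
    return ["mpirun", "-np", str(np), sys.executable, script]
```

The result can be passed to subprocess.check_call, for example subprocess.check_call(mpirun_command('./distributed.py', 'data', 2)).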
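Warning
Because of the way MPI is designed, do not use is_master to guard anything that involves distributed data, such as any step of the training phase: every process must take part in those synchronized operations, and a step run only on the master would wait for processes that never join it. Only use is_master after training; in our example, we use it for the evaluation.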
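Why guarding a distributed step with is_master goes wrong can be simulated with threads. In this sketch, assumed for illustration only, each thread stands in for an MPI process and threading.Barrier stands in for a collective operation such as the allreduce that synchronizes training; run_ranks, good and bad are hypothetical names. A real MPI collective typically has no timeout, so the lone process would simply block; the barrier timeout here only makes the hang observable.

```python
import threading


def run_ranks(size, body, timeout=1.0):
    """Run body(rank, barrier) on `size` threads standing in for MPI ranks.

    Returns the sorted list of ranks whose collective step failed.
    """
    barrier = threading.Barrier(size, timeout=timeout)
    errors = []

    def worker(rank):
        try:
            body(rank, barrier)
        except threading.BrokenBarrierError:
            errors.append(rank)

    threads = [threading.Thread(target=worker, args=(r,)) for r in range(size)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sorted(errors)


def good(rank, barrier):
    barrier.wait()      # collective step (e.g. training): every rank takes part
    if rank == 0:       # master-only work afterwards, e.g. evaluation
        pass


def bad(rank, barrier):
    if rank == 0:       # collective step guarded by a master check
        barrier.wait()  # rank 0 waits for partners that never arrive
```

Here run_ranks(2, good) reports no failures, while run_ranks(2, bad, timeout=0.2) reports that rank 0 was left waiting.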
Example
Below is an example of putting all these changes together:
from __future__ import print_function
import tflon
import tensorflow as tf
import pandas as pd
from pkg_resources import resource_filename

class NeuralNet(tflon.model.Model):
    def _schema(self):
        tsv_reader = lambda fpath: pd.read_csv(fpath, sep='\t',
                                               dtype={'ID': str}).set_index('ID')
        return {'desc': ('descriptors.tsv', tflon.data.Table, tsv_reader),
                'targ': ('targets.tsv', tflon.data.Table, tsv_reader)}

    def _model(self):
        I = self.add_input('desc', shape=[None, 210])
        T = self.add_target('targ', shape=[None, 1])
        net = tflon.toolkit.WindowInput() |\
              tflon.toolkit.Dense(20, activation=tf.tanh) |\
              tflon.toolkit.Dense(5, activation=tf.tanh) |\
              tflon.toolkit.Dense(1)
        L = net(I)
        self.add_output("pred", tf.nn.sigmoid(L))
        self.add_loss("xent", tflon.toolkit.xent_uniform_sum(T, L))
        self.add_loss("l2", tflon.toolkit.l2_penalty(self.weights))
        self.add_metric('auc', tflon.toolkit.auc(T, L))

if __name__ == '__main__':
    tflon.data.TensorQueue.DEFAULT_TIMEOUT = 1000
    # Initialize Horovod and set up GPU resources
    config = tflon.distributed.init_distributed_resources()
    graph = tf.Graph()
    with graph.as_default():
        # Add a model instance
        NN = NeuralNet(use_gpu=True)
        # Create the distributed trainer
        trainer = tflon.distributed.DistributedTrainer(tf.train.AdamOptimizer(1e-3),
                                                       iterations=1000)
        # Create the data feed, use the same feed for all process instances
        # tflon.distributed.DistributedTable adds MPI synchronization to the Table API min and max ops
        # Usually, different data would be loaded on each process (see tflon.distributed.make_distributed_table_feed)
        feed = tflon.distributed.make_distributed_table_feed(
            resource_filename('tflon_test.data', 'distributed'),
            NN.schema, master_table='desc', partition_strategy='all')
        with tf.Session(graph=graph, config=config):
            # Train with minibatch size 100
            NN.fit(feed.shuffle(batch_size=100), trainer, restarts=2, source_tables=feed)
            # Perform inference on the master process
            if tflon.distributed.is_master():
                metrics = NN.evaluate(feed)
                print("AUC:", metrics['auc'])
Output:
AUC: 0.9991015