Demo entry 3297545

index_fasta

   

Submitted by anonymous on Dec 07, 2015 at 22:42
Language: Python. Code size: 6.0 kB.

# Utils.
def _starts_with(lst, prefix):
  return lst[:len(prefix)] == prefix

def _drop_prefix(lst, prefix):
  if _starts_with(lst, prefix):
    return lst[len(prefix):]
  return lst

def parse_header_dict(header):
  header = _drop_prefix(header, '>').split(' ')
  header_dict = {'accession': header[0]}
  for entry in header[1:]:
      key, value = entry.split('=')
      header_dict[key] = value
  return header_dict

# Defines a generator that produces fasta dictionaries parsed from the given
# iterable of fasta lines (see tests for examples).
def parse_fasta_gen(fasta_lines):
    entry = None
    for line in fasta_lines:
        if _starts_with(line, '>'):
            if entry:
                yield entry
            entry = parse_header_dict(line)
            entry['sequence'] = ''
        else:
            entry['sequence'] = entry['sequence'] + line
    if entry:
        yield entry

# Indexes the given list of fasta dicts by creating a dict of fasta dicts,
# indexed by the given key. Any fields in collect_fields will be accumulated
# into a list. If a field is not listed in collect_fields, the values of the
# first entry will be used and the values of other entries will be discarded.
# See tests for examples.
def index_fasta(fasta_dicts, key='range', collect_fields=None):

    # Default param (avoiding Python bug due to mutability).
    if collect_fields is None:
        collect_fields = ['accession']

    indexed_dicts = {}
    for fd in fasta_dicts:
        header_key = fd[key]
        if header_key in indexed_dicts:
            entry = indexed_dicts[header_key]
            for field in collect_fields:
                entry[field + '_list'].append(fd[field])
        else:
            entry = fd.copy()
            for field in collect_fields:
                entry[field + '_list'] = [entry[field]]
                del entry[field]
            indexed_dicts[header_key] = entry
    return indexed_dicts

# Similar to index_fasta(..), but loads the fasta data from a file.
def load_indexed_fasta(filename, **kwargs):
    with open(filename, 'r') as file_lines:
        # Stream the file without newlines by making it a generator.
        def without_newlines():
            for line in file_lines:
                yield line.rstrip('\n')

        fasta_dicts = parse_fasta_gen(without_newlines())
        return index_fasta(fasta_dicts, **kwargs)


# Testing code.
def run_tests():
    # Test main workflow.
    fasta_lines = [
        '>mm9_knownGene_uc007amb.1 range=chr1:11-22 5\'pad=0 3\'pad=0 strand=- repeatMasking=lower',
        'aacc',
        'AACC',
        '>mm9_knownGene_uc007aro.1 range=chr1:33-44 5\'pad=0 3\'pad=0 strand=- repeatMasking=lower',
        'ggtt',
        'GGTT',
        '>mm9_knownGene_uc007amb.2 range=chr1:11-22 5\'pad=0 3\'pad=0 strand=- repeatMasking=lower',
        'aacc',
        'AACC',
    ]
    parsed_fasta = [
        {
            'accession': 'mm9_knownGene_uc007amb.1',
            'range': 'chr1:11-22',
            '5\'pad': '0',
            '3\'pad': '0',
            'strand': '-',
            'repeatMasking': 'lower',
            'sequence': 'aaccAACC',
        },
        {
            'accession': 'mm9_knownGene_uc007aro.1',
            'range': 'chr1:33-44',
            '5\'pad': '0',
            '3\'pad': '0',
            'strand': '-',
            'repeatMasking': 'lower',
            'sequence': 'ggttGGTT',
        },
        {
            'accession': 'mm9_knownGene_uc007amb.2',
            'range': 'chr1:11-22',
            '5\'pad': '0',
            '3\'pad': '0',
            'strand': '-',
            'repeatMasking': 'lower',
            'sequence': 'aaccAACC',
        },
    ]
    indexed_fasta = {
        'chr1:11-22': {
            'accession_list': [
                'mm9_knownGene_uc007amb.1',
                'mm9_knownGene_uc007amb.2',
            ],
            'range': 'chr1:11-22',
            '5\'pad': '0',
            '3\'pad': '0',
            'strand': '-',
            'repeatMasking': 'lower',
            'sequence': 'aaccAACC',
        },
        'chr1:33-44': {
            'accession_list': [
                'mm9_knownGene_uc007aro.1',
            ],
            'range': 'chr1:33-44',
            '5\'pad': '0',
            '3\'pad': '0',
            'strand': '-',
            'repeatMasking': 'lower',
            'sequence': 'ggttGGTT',
        },
    }
    assert_equals(parsed_fasta, list(parse_fasta_gen(fasta_lines)))
    assert_equals(indexed_fasta, index_fasta(parsed_fasta))

    # Test indexing.
    input_fasta = [
        {
            'accession': 'mm9_knownGene_uc007amb.1',
            'range': 'chr1:11-22',
            'strand': '-',
        },
        {
            'accession': 'mm9_knownGene_uc007amb.2',
            'range': 'chr1:11-22',
            'strand': '+',
        },
    ]
    indexed_by_range = {
        'chr1:11-22': {
            'accession_list': [
                'mm9_knownGene_uc007amb.1',
                'mm9_knownGene_uc007amb.2',
            ],
            'range': 'chr1:11-22',
            'strand': '-',
        },
    }
    indexed_by_strand = {
        '-': {
            'accession_list': [
                'mm9_knownGene_uc007amb.1',
            ],
            'range_list': [
                'chr1:11-22',
            ],
            'strand': '-',
        },
        '+': {
            'accession_list': [
                'mm9_knownGene_uc007amb.2',
            ],
            'range_list': [
                'chr1:11-22',
            ],
            'strand': '+',
        },
    }
    assert_equals(indexed_by_range, index_fasta(input_fasta))
    assert_equals(indexed_by_strand,
        index_fasta(input_fasta,
                    key='strand',
                    collect_fields=['accession', 'range']))

    print 'All tests pass'

def assert_equals(expected, actual):
    assert expected == actual, \
        'expected <{}>, but got <{}>'.format(expected, actual)

#run_tests()

This snippet took 0.01 seconds to highlight.

Back to the Entry List or Home.

Delete this entry (admin only).