Skip to content

Commit

Permalink
Use modes in SequenceWriter (biopython#4850)
Browse files Browse the repository at this point in the history
Also use a temporary file for GenBank output test
  • Loading branch information
mdehoon authored Sep 23, 2024
1 parent e6d24c5 commit 0b57f76
Show file tree
Hide file tree
Showing 12 changed files with 73 additions and 36 deletions.
2 changes: 2 additions & 0 deletions Bio/SeqIO/FastaIO.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,8 @@ class FastaWriter(SequenceWriter):
``Bio.SeqIO.write()`` function instead using ``format="fasta"``.
"""

modes = "t"

def __init__(self, target, wrap=60, record2title=None):
"""Create a Fasta writer (OBSOLETE).
Expand Down
2 changes: 2 additions & 0 deletions Bio/SeqIO/InsdcIO.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,8 @@ class _InsdcWriter(SequenceWriter):
"transl_table",
)

modes = "t"

def _write_feature_qualifier(self, key, value=None, quote=None):
if not _allowed_table_component_name_chars.issuperset(key):
warnings.warn(
Expand Down
62 changes: 35 additions & 27 deletions Bio/SeqIO/Interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,37 +169,45 @@ class SequenceWriter:
the number of records.
"""

def __init__(self, target: _IOSource, mode: str = "w") -> None:
@property
@abstractmethod
def modes(self):
"""File modes (binary or text) that the writer can handle.
This property must be "t" (for text mode only), "b" (for binary mode
only), "tb" (if both text and binary mode are accepted, but text mode
is preferred), or "bt" (if both text and binary mode are accepted, but
binary mode is preferred).
"""
pass

def __init__(self, target: _IOSource) -> None:
"""Create the writer object."""
if mode == "w":
if isinstance(target, _PathLikeTypes):
# target is a path
handle = open(target, mode)
else:
if isinstance(target, _PathLikeTypes):
mode = self.modes[0]
stream = open(target, "w" + mode)
else:
stream = target
modes = "tb"
values = ("", b"")
for mode, value in zip(modes, values):
try:
handle = target
target.write("")
stream.write(value)
except TypeError:
# target was opened in binary mode
raise StreamModeError("File must be opened in text mode.") from None
elif mode == "wb":
if isinstance(target, _PathLikeTypes):
# target is a path
handle = open(target, mode)
continue
else:
break
else:
handle = target
try:
target.write(b"")
except TypeError:
raise RuntimeError("Failed to read from input data") from None
if mode not in self.modes:
if mode == "t":
# target was opened in text mode
raise StreamModeError(
"File must be opened in binary mode."
) from None
else:
raise RuntimeError(f"Unknown mode '{mode}'")

self._target = target
self.handle = handle
raise StreamModeError("File must be opened in binary mode.")
elif mode == "b":
# target was opened in binary mode
raise StreamModeError("File must be opened in text mode.")
self.target = target
self.handle = stream

def clean(self, text: str) -> str:
"""Use this to avoid getting newlines in the output."""
Expand Down Expand Up @@ -269,7 +277,7 @@ def write_file(self, records, mincount=0, maxcount=None):
count = self.write_records(records, maxcount)
self.write_footer()
finally:
if self.handle is not self._target:
if self.handle is not self.target:
self.handle.close()
if count < mincount:
if mincount == 1: # Common case
Expand Down
4 changes: 3 additions & 1 deletion Bio/SeqIO/NibIO.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,16 @@ def __next__(self):
class NibWriter(SequenceWriter):
"""Nib file writer."""

modes = "b"

def __init__(self, target):
"""Initialize a Nib writer object.
Arguments:
- target - output stream opened in binary mode, or a path to a file
"""
super().__init__(target, mode="wb")
super().__init__(target)

def write_header(self):
"""Write the file header."""
Expand Down
2 changes: 2 additions & 0 deletions Bio/SeqIO/PhdIO.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ def __next__(self):
class PhdWriter(SequenceWriter):
"""Class to write Phd format files."""

modes = "t"

def __init__(self, handle: _IOSource) -> None:
"""Initialize the class."""
super().__init__(handle)
Expand Down
2 changes: 2 additions & 0 deletions Bio/SeqIO/PirIO.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ def __next__(self):
class PirWriter(SequenceWriter):
"""Class to write PIR format files."""

modes = "t"

def __init__(self, handle, wrap=60, record2title=None, code=None):
"""Create a PIR writer.
Expand Down
8 changes: 8 additions & 0 deletions Bio/SeqIO/QualityIO.py
Original file line number Diff line number Diff line change
Expand Up @@ -1588,6 +1588,8 @@ class FastqPhredWriter(SequenceWriter):
>>> os.remove("Quality/temp.fastq")
"""

modes = "t"

def write_record(self, record: SeqRecord) -> None:
"""Write a single FASTQ record to the file."""
self._record_written = True
Expand Down Expand Up @@ -1667,6 +1669,8 @@ class QualPhredWriter(SequenceWriter):
>>> os.remove("Quality/temp.qual")
"""

modes = "t"

def __init__(
self,
handle: _TextIOSource,
Expand Down Expand Up @@ -1844,6 +1848,8 @@ class FastqSolexaWriter(SequenceWriter):
>>> os.remove("Quality/temp.fastq")
"""

modes = "t"

def write_record(self, record: SeqRecord) -> None:
"""Write a single FASTQ record to the file."""
self._record_written = True
Expand Down Expand Up @@ -1926,6 +1932,8 @@ class FastqIlluminaWriter(SequenceWriter):
warning is issued.
"""

modes = "t"

def write_record(self, record: SeqRecord) -> None:
"""Write a single FASTQ record to the file."""
self._record_written = True
Expand Down
4 changes: 3 additions & 1 deletion Bio/SeqIO/SeqXmlIO.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,8 @@ class SeqXmlWriter(SequenceWriter):
"protein".
"""

modes = "b"

def __init__(
self, target, source=None, source_version=None, species=None, ncbiTaxId=None
):
Expand All @@ -536,7 +538,7 @@ def __init__(
- ncbiTaxId - The NCBI taxonomy identifier of the species of origin.
"""
super().__init__(target, "wb")
super().__init__(target)
handle = self.handle
self.xml_generator = XMLGenerator(handle, "utf-8")
self.xml_generator.startDocument()
Expand Down
4 changes: 3 additions & 1 deletion Bio/SeqIO/SffIO.py
Original file line number Diff line number Diff line change
Expand Up @@ -1121,6 +1121,8 @@ def __init__(self, source):
class SffWriter(SequenceWriter):
"""SFF file writer."""

modes = "b"

def __init__(self, target, index=True, xml=None):
"""Initialize an SFF writer object.
Expand All @@ -1132,7 +1134,7 @@ def __init__(self, target, index=True, xml=None):
reading this data).
"""
super().__init__(target, "wb")
super().__init__(target)
self._xml = xml
if index:
self._index = []
Expand Down
2 changes: 2 additions & 0 deletions Bio/SeqIO/TabIO.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ class TabWriter(SequenceWriter):
with ``format="tab"``.
"""

modes = "t"

def write_record(self, record):
"""Write a single tab line to the file."""
assert self._header_written
Expand Down
4 changes: 3 additions & 1 deletion Bio/SeqIO/XdnaIO.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,14 +224,16 @@ def __next__(self):
class XdnaWriter(SequenceWriter):
"""Write files in the Xdna format."""

modes = "b"

def __init__(self, target):
"""Initialize an Xdna writer object.
Arguments:
- target - Output stream opened in binary mode, or a path to a file.
"""
super().__init__(target, mode="wb")
super().__init__(target)

def write_file(self, records):
"""Write the specified record to a Xdna file.
Expand Down
13 changes: 8 additions & 5 deletions Tests/test_GenBank.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import os
import sys
import tempfile
import unittest
import warnings
from datetime import datetime
Expand Down Expand Up @@ -7811,15 +7812,17 @@ def test_qualifier_escaping_read(self):
def test_qualifier_escaping_write(self):
"""Check qualifier escaping is preserved when writing."""
# Write some properly escaped qualifiers and test
genbank_out = "GenBank/qualifier_escaping_write.gb"
record = SeqIO.read(genbank_out, "gb")
genbank_in = "GenBank/qualifier_escaping_write.gb"
record = SeqIO.read(genbank_in, "gb")
f1 = record.features[0]
f2 = record.features[1]
f1.qualifiers["note"][0] = '"Should" now "be" escaped in "file"'
f2.qualifiers["note"][0] = '"Should also be escaped in file"'
SeqIO.write(record, genbank_out, "gb")
# Read newly escaped qualifiers and test
record = SeqIO.read(genbank_out, "gb")
with tempfile.NamedTemporaryFile("w+") as genbank_out:
SeqIO.write(record, genbank_out, "gb")
genbank_out.seek(0)
# Read newly escaped qualifiers and test
record = SeqIO.read(genbank_out, "gb")
f1 = record.features[0]
f2 = record.features[1]
self.assertEqual(
Expand Down

0 comments on commit 0b57f76

Please sign in to comment.