Source code for CanGraph.miscelaneous

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# SPDX-FileCopyrightText: 2022 Pablo Marcos <software@loreak.org>
#
# SPDX-License-Identifier: MIT

"""
A python module that provides a collection of functions to be used across the different
scripts present in the CanGraph package, with various, useful functionalities
"""

# Import external modules necessary for the script
import neo4j                         # The Neo4J python driver
import urllib.request as request     # Extensible library for opening URLs
from zipfile import ZipFile          # Work with ZIP files
import tarfile                       # Work with tar.gz files
import os                            # Integration with the system
import xml.etree.ElementTree as ET   # To parse and split XML files
import re                            # To split XML files with a regex pattern
import time                          # Manage the time, and wait times, in python
import pandas as pd                  # Analysis of tabular data
import subprocess                    # Manage python sub-processes
import logging                       # Make ``verbose`` messages easier to show
import psutil                        # Find and kill lingering neo4j processes
import argparse                      # Arguments parser for Python
from alive_progress import alive_bar # A cute progress bar

# ********* Manage the Neo4J Database Connection and Transactions ********* #

def restart_neo4j(neo4j_home = "neo4j"):
    """
    A simple function that (re)starts a neo4j server and returns its bolt address

    Args:
        neo4j_home (str): the installation directory for the ``neo4j`` program; by default, ``neo4j``

    .. NOTE:: Re-starting is better than starting, as it tries to kill old sessions
        (a task at which it fails miserably, thus the need for
        :obj:`~CanGraph.miscelaneous.kill_neo4j`), and, most importantly,
        because it returns the currently used bolt port
    """
    result = subprocess.run([f"{os.path.abspath(neo4j_home)}/bin/neo4j", "restart"],
                            stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    # Decode and show the server's startup message (which includes the bolt address)
    neo4j_message = result.stdout.decode("utf-8")
    print(neo4j_message)
def get_import_path(driver):
    """
    A function that runs an autocommit transaction to get Neo4J's Import Path

    .. NOTE:: By doing the Neo4JImportPath search this way (in two functions), we are able
        to run the query as an ``execute_read``, which, unlike autocommit transactions,
        allows the query to be better controlled, and repeated in case it fails.

    Args:
        driver (neo4j.Driver): Neo4J's Bolt Driver currently in use

    Returns:
        str: Neo4J's Import Path, i.e., where Neo4J will pick up files to be imported
        using the ``file:///`` schema
    """
    result = manage_transaction(
        """
        CALL dbms.listConfig()
        YIELD name, value
        WHERE name = 'dbms.directories.import'
        RETURN value
        """, driver)
    try:
        return result[0]["value"]
    except (IndexError, KeyError, TypeError):
        raise RuntimeError("Couldn't connect to Neo4j. Please, check the auths")
def connect_to_neo4j(port = "bolt://localhost:7687", username = "neo4j", password = "neo4j"):
    """
    A function that establishes a connection to the neo4j server and returns a
    :obj:`~neo4j.Driver` into which transactions can be passed

    Args:
        port (str): The URL where the database is available to be queried.
            It must be of ``bolt://`` format
        username (str): the username for your neo4j database; by default, ``neo4j``
        password (str): the password for your database; by default, ``neo4j``

    Returns:
        neo4j.Driver: An instance of Neo4J's Bolt Driver that can be used

    .. NOTE:: Since this is a really short function, it doesn't really simplify the code
        that much, but it makes it much more re-usable and understandable
    """
    try:
        driver = neo4j.GraphDatabase.driver(port, auth=(username, password))
        return driver
    except Exception as E:
        exit(f"Could not connect to Neo4J due to error: {E}")
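# A minimal usage sketch for the two functions above, assuming a local Neo4j
# server; the bolt URL and the credentials are hypothetical placeholders:
def _example_connect():
    driver = connect_to_neo4j("bolt://localhost:7687", "neo4j", "my-password")
    import_path = get_import_path(driver) # Where ``file:///`` imports are read from
    print(f"Connected; Neo4J's import path is: {import_path}")
    driver.close()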
def kill_neo4j(neo4j_home = "neo4j"):
    """
    A simple function that kills any process that was started using a cmd argument
    including "neo4j"

    Args:
        neo4j_home (str): the installation directory for the ``neo4j`` program; by default, ``neo4j``

    .. WARNING:: This function may unintentionally kill any command run from the ``neo4j``
        folder. This is unfortunate, but the creation of this function was essential,
        given that ``neo4j stop`` does not work properly; instead of dying, the process
        lingers on, interfering with :obj:`~CanGraph.setup.find_neo4j_installation_status`
        and hindering the main program
    """
    neo4j_home = os.path.abspath(neo4j_home)
    neo4j_dead = False

    # If a PID file exists, terminate the process it points to
    if os.path.exists(f"{neo4j_home}/run/neo4j.pid"):
        with open(f"{neo4j_home}/run/neo4j.pid") as f:
            neo4j_pid = f.readline().rstrip()
        for proc in psutil.process_iter():
            if proc.pid == int(neo4j_pid): # The PID file holds a string; psutil uses ints
                proc.terminate()
                neo4j_dead = True
        os.remove(f"{neo4j_home}/run/neo4j.pid")

    # Ask the server itself to stop, too
    if os.path.exists(neo4j_home):
        subprocess.run([f"{neo4j_home}/bin/neo4j", "stop"],
                       stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
        neo4j_dead = True

    # And kill any process whose command line mentions the neo4j folder
    for proc in psutil.process_iter():
        if neo4j_home in " ".join(proc.cmdline()):
            proc.kill()
            neo4j_dead = True

    if neo4j_dead:
        sleep_with_counter(5, message = "Killing existing Neo4j sessions...")
def manage_transaction(tx, driver, num_retries = 10, neo4j_home = "neo4j", **kwargs):
    """
    A function that repeats transactions whenever an error is found. This may make an
    incorrect script repeat unnecessarily; however, since the error is printed, one can
    discriminate those out, and the function remains helpful to prevent SPARQL Read
    Time-Outs. It will also re-start neo4j in case it randomly dies while executing a query.

    Args:
        tx (str): The transaction that we desire to run, specified as a CYPHER query
        driver (neo4j.Driver): Neo4J's Bolt Driver currently in use
        num_retries (int): The number of times that we wish the transaction to be retried
        neo4j_home (str): the installation directory for the ``neo4j`` program; by default, ``neo4j``
        **kwargs: Any number of arbitrary keyword arguments

    Raises:
        Exception: An exception telling the user that the maximum number of retries
            has been exceeded, if such a thing happens

    Returns:
        list: The response from the Neo4J Database

    .. NOTE:: This function does not accept args, but only kwargs (named keyword arguments).
        Thus, if you wish to add a parameter (say, ``number``), you should add it as:
        ``number=33``
    """
    # For as many times as has been specified
    for attempt in range(num_retries):
        try:
            # We try to open a session and obtain a graph response
            session = driver.session()
            graph_response = session.run(tx, **kwargs)
            graph_response_list = [record for record in list(graph_response)]
            # Once we have the graph response, we can close the session
            session.close()
            if attempt > 0:
                print(f"Error solved on attempt #{attempt}")
            if any(graph_response_list):
                if hasattr(graph_response_list[0], "data"):
                    return [record.data() for record in graph_response_list]
                else:
                    return []
            else:
                return []

        # Else, if Neo4J decides to DIE
        except (OSError, neo4j.exceptions.ServiceUnavailable) as error:
            subprocess.run([f"{os.path.abspath(neo4j_home)}/bin/neo4j", "start"],
                           stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
            print(f"Neo4J has died. Restarting Neo4J & Retrying... ({attempt + 1}/{num_retries})")
            time.sleep(20)

        # Else, if there is another kind of error
        except Exception as error:
            graph_response = []
            error_code = error.code if "code" in vars(error).keys() else "Unknown"
            if attempt < (num_retries - 1):
                print(f"An error with error code: {error_code} was found.")
                print(f"Retrying... ({attempt + 1}/{num_retries})")
            # If the error is a ProcedureCallFailed error and we already tried num_retries times, skip
            elif error_code == "Neo.ClientError.Procedure.ProcedureCallFailed":
                print("Failed to invoke a procedure, most likely due to a read time-out")
                print("Skipping this function...")
                return []
            # If it's any other error, exit, as it seems it could be a really bad error
            else:
                raise Exception(f"{num_retries} consecutive attempts were made on a function. Aborting...")
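# A sketch of how manage_transaction forwards keyword arguments as CYPHER
# parameters, as the NOTE above describes; the query and values are illustrative:
def _example_transaction(driver):
    results = manage_transaction("MATCH (n) RETURN n.Name AS name LIMIT $limit",
                                 driver, limit=10)
    for record in results: # Each record comes back as a plain dict
        print(record["name"])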
# ********* Interact with the Neo4J Database ********* #
def call_db_schema_visualization():
    """
    Shows the DB Schema. This function is intended to be run only in Neo4J's console,
    since it produces no output when called from the driver.

    Returns:
        str: A CYPHER query that calls the schema visualization procedure

    .. TODO:: Make it download the image
    """
    return ("""
        CALL db.schema.visualization()
        """)
def clean_database():
    """
    A CYPHER query that gets all the nodes in a Neo4J database and removes them,
    in transactions of 100 rows to alleviate memory load

    Returns:
        str: A text chain that represents the CYPHER query with the desired output.
        This can be run using :obj:`neo4j.Session.run`

    .. NOTE:: This is an **autocommit transaction**. This means that, in order not to keep
        data in memory (and to make running it with a huge amount of data more efficient),
        you will need to prepend ``:auto`` when calling it from the Neo4J browser, or run
        it using :obj:`neo4j.Session.run` from the driver.
    """
    return """
        MATCH (n)
        CALL { WITH n
        DETACH DELETE n
        } IN TRANSACTIONS OF 100 ROWS
        """
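# The query-builder functions in this block return CYPHER strings rather than
# running anything; a minimal sketch of the intended pattern, pairing one of
# them with manage_transaction:
def _example_clean(driver):
    manage_transaction(clean_database(), driver) # Empties the whole database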
def create_n10s_graphconfig():
    """
    A CYPHER query that creates a *neosemantics* (n10s) constraint to hold all the RDF
    we will import.

    Returns:
        str: A CYPHER query that modifies the database according to the statement
        contained in the function, when run against it.

    .. seealso:: More information on this approach can be found in
        `Neosemantics' 101 Guide <https://neo4j.com/labs/neosemantics/>`_ and in
        `Neo4J's guide on how to import data from Wikidata
        <https://neo4j.com/labs/neosemantics/how-to-guide/>`_ ,
        where this approach was taken from

    .. deprecated:: 0.9
        Since we are importing based on apoc.load.jsonParams, this is not needed anymore
    """
    return ("""
        CALL n10s.graphconfig.init({ handleVocabUris: 'MAP',
                                     handleMultival: 'ARRAY',
                                     keepLangTag: true,
                                     keepCustomDataTypes: true,
                                     applyNeo4jNaming: true })
        """)
def remove_n10s_graphconfig():
    """
    Removes the "_GraphConfig" node, which is necessary for querying SPARQL endpoints
    but not at all useful in our final export

    Returns:
        str: A CYPHER query that removes the node, when run against the database.

    .. deprecated:: 0.9
        Since we are importing based on apoc.load.jsonParams, this is not needed anymore
    """
    return ("""
        MATCH (n:`_GraphConfig`)
        DETACH DELETE n
        """)
def remove_ExternalEquivalent():
    """
    Removes all nodes of type :ExternalEquivalent from the database; since these do not
    add new info, one might consider them not useful.

    Returns:
        str: A CYPHER query that removes the nodes, when run against the database.
    """
    return ("""
        MATCH (e:ExternalEquivalent)
        DETACH DELETE e
        """)
def remove_duplicate_relationships():
    """
    Removes duplicated relationships between ANY existing pair of nodes.

    Returns:
        str: A CYPHER query that merges the duplicates, when run against the database.

    .. NOTE:: Only deletes DIRECTED relationships between THE SAME nodes,
        combining their properties

    .. seealso:: This way of working has been taken from
        `StackOverflow #18724939
        <https://stackoverflow.com/questions/18724939/neo4j-cypher-merge-duplicate-relationships>`_
    """
    return ("""
        MATCH (s)-[r]->(e)
        WITH s, e, type(r) as typ, collect(r) as rels
        CALL apoc.refactor.mergeRelationships(rels, {properties:"combine"})
        YIELD rel
        RETURN rel
        """)
def merge_duplicate_nodes(node_types, node_property, optional_condition="", more_props=""):
    """
    Merges any two nodes of the given ``node_types`` that share the same ``node_property``.

    Args:
        node_types (str): The labels of the nodes that will be selected for merging;
            i.e. ``n:Fruit OR n:Vegetable``. If empty, all nodes are allowed.
        node_property (str): The node property used for collecting, if not using all properties.
        optional_condition (str): An optional Neo4J statement, starting with "AND",
            to be added after the ``WHERE`` clause.
        more_props (str): Optional expressions to group by in the ``WITH`` clause,
            instead of the default collection; i.e. ``n.Gender as gender``

    Returns:
        str: A CYPHER query that merges the duplicates, when run against the database.

    .. WARNING:: When using, take good care of how the key names are written:
        sometimes, if a key is not present, all nodes will be merged!
    """
    if not more_props:
        condition_any = (f"AND ( ANY(x IN n.{node_property} "
                         f"WHERE x IN m.{node_property}) )")
        more_props = f"HEAD(COLLECT(DISTINCT [n, m])) AS alt_thing"
    else:
        condition_any = ""
    if node_types == "":
        node_types = "1=1" # Allow all nodes

    return (f"""
        MATCH (n), (m)
        WHERE ({node_types}) AND ({node_types.replace("n:", "m:")})
            AND (n.{node_property} IS NOT null) AND (m.{node_property} IS NOT null)
            AND ID(n) < ID(m) {condition_any} {optional_condition}
        WITH {more_props}, HEAD(COLLECT(DISTINCT [n, m])) AS ns
        WHERE size(ns) > 1
        CALL apoc.refactor.mergeNodes(ns, {{properties:"combine", mergeRels:true}})
        YIELD node
        RETURN node
        """)
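# A sketch of merge_duplicate_nodes in use, mirroring the calls made in
# purge_database below; the labels and property name are just one example:
def _example_merge(driver):
    # Merge Metabolites and Drugs that share the same InChIKey
    manage_transaction(merge_duplicate_nodes("n:Metabolite OR n:Drug", "InChIKey"), driver)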
def purge_database(driver, method = ["merge", "delete"]):
    """
    A series of commands that purge a database, removing unnecessary, duplicated or empty
    nodes and merging those without required properties. This has been converted into a
    common function to standardize the way the nodes are merged.

    Args:
        driver (neo4j.Driver): Neo4J's Bolt Driver currently in use
        method (list): The part of the function that we want to execute; if ["delete"],
            only call queries that delete nodes; if ["merge"], only call those that merge;
            if both, do both

    Returns:
        This function modifies the Neo4J Database as desired, but does not produce
        any particular return.

    .. WARNING:: When modifying, take good care of how the key names are written: with
        :obj:`~CanGraph.miscelaneous.merge_duplicate_nodes`, sometimes, if a key is not
        present, all nodes will be merged!
    """
    method = list(method)
    session = driver.session()

    if "merge" in method:
        # First, we purge Publications by Pubmed_ID, using the abstract to merge those that have no Pubmed_ID
        manage_transaction(merge_duplicate_nodes("n:Publication", "Pubmed_ID"), driver)
        manage_transaction(merge_duplicate_nodes("n:Publication", "Abstract",
                                                 "AND (n.Pubmed_ID IS null)"), driver)

        # Now, we work on Proteins/Metabolites/Drugs:
        # We merge Proteins by UniProt_ID, and, when there is none, by Name:
        manage_transaction(merge_duplicate_nodes("n:Protein", "UniProt_ID"), driver)
        manage_transaction(merge_duplicate_nodes("n:Protein", "Name",
                                                 "AND (n.UniProt_ID IS null)"), driver)

        # We merge by HMDB_ID, ChEBI_ID, Name, InChI and InChIKey (normally, they should be unique):
        manage_transaction(merge_duplicate_nodes(
            "n:Protein OR n:Metabolite OR n:Drug OR n:OriginalMetabolite", "HMDB_ID"), driver)
        manage_transaction(merge_duplicate_nodes(
            "n:Protein OR n:Metabolite OR n:Drug OR n:OriginalMetabolite", "ChEBI_ID"), driver)
        manage_transaction(merge_duplicate_nodes(
            "n:Protein OR n:Metabolite OR n:Drug OR n:OriginalMetabolite", "Name"), driver)
        manage_transaction(merge_duplicate_nodes(
            "n:Protein OR n:Metabolite OR n:Drug OR n:OriginalMetabolite", "InChI"), driver)
        manage_transaction(merge_duplicate_nodes(
            "n:Protein OR n:Metabolite OR n:Drug OR n:OriginalMetabolite", "InChIKey"), driver)

        # WikiData_IDs should also be unique:
        manage_transaction(merge_duplicate_nodes("", "WikiData_ID"), driver)

        # We also remove all non-unique Subjects. We do this by passing all three parameters
        # these nodes may have on to apoc.mergeNodes
        # .. NOTE:: This concerns only those nodes that DO NOT COME from Exposome_Explorer
        manage_transaction(merge_duplicate_nodes("n:Subject", "Age_Mean",
                                                 more_props="n.Gender as gender, n.Information as inf",
                                                 optional_condition="AND (n.Exposome_Explorer_ID IS null)"), driver)

        # We can do the same for the different Dosages; this has to be done manually,
        # because otherwise the database somehow crashes (no idea why)
        manage_transaction("""MATCH (n:Dosage)
                              WITH n.Form as frm, n.Stength as str, n.Route as rt, COLLECT(n) AS ns
                              WHERE size(ns) > 1
                              CALL apoc.refactor.mergeNodes(ns, {properties:"combine", mergeRels:True})
                              YIELD node RETURN node""", driver)

        # For Products, we merge all those with the same EMA_MA_Number or FDA_Application_Number
        # to try to minimize duplicates, although this is not the best approach
        manage_transaction(merge_duplicate_nodes("n:Product", "EMA_MA_Number"), driver)
        manage_transaction(merge_duplicate_nodes("n:Product", "FDA_Application_Number"), driver)

        # For CelularLocations and BioSpecimens, we merge those with the same Name:
        manage_transaction(merge_duplicate_nodes("n:CelularLocation", "Name"), driver)
        manage_transaction(merge_duplicate_nodes("n:BioSpecimen", "Name"), driver)

    if "delete" in method:
        # Finally, we delete all empty nodes. These shouldn't be created in the first place,
        # but, in case any escapes, this makes the DB cleaner.
        # .. NOTE:: In the case of Taxonomies, these "empty nodes" are
        #     actually created on purpose. Here, they are removed.
        manage_transaction("MATCH (n) WHERE size(keys(properties(n))) < 1 CALL "
                           "{ WITH n DETACH DELETE n } IN TRANSACTIONS OF 1000 ROWS", driver)

        # For Measurements and Sequences, 2 properties are the minimum, since they are always booleans
        manage_transaction("MATCH (m:Measurement) "
                           "WHERE size(keys(properties(m))) < 2 DETACH DELETE m", driver)
        manage_transaction("MATCH (s:Sequence) "
                           "WHERE size(keys(properties(s))) < 2 DETACH DELETE s", driver)

        # And those that do not match our Schema - Be careful with the \" character
        manage_transaction("""
            MATCH (n:Metabolite)
            WHERE (n.ChEBI_ID IS NULL OR n.ChEBI_ID = ""
                   OR n.CAS_Number IS NULL OR n.CAS_Number = "")
            DETACH DELETE n""", driver)

        # We will also remove all disconnected nodes (they give no useful information)
        manage_transaction("MATCH (n) WHERE NOT (n)--() DETACH DELETE n", driver)

        # At last, we may remove any duplicate relationships which, since we have
        # merged nodes, will surely be there:
        manage_transaction(remove_duplicate_relationships(), driver)

    session.close()
# ********* Work with Files ********* #
def check_file(filepath):
    """
    Checks for the presence of a file or folder. If it exists, it returns the filepath;
    if it doesn't, it raises an :obj:`argparse.ArgumentTypeError`, which tells argparse
    how to process file exclusion

    .. NOTE:: Perhaps it's not ideal, but I will be using this also to check for file
        existence throughout the CanGraph project, although the error type might not be correct

    Args:
        filepath (str): The path of the file or folder whose existence is being checked

    Returns:
        str: The original filepath, which now is sure to exist

    Raises:
        argparse.ArgumentTypeError: If the file does not exist
    """
    # First, turn the path into an abspath for consistency
    filepath = os.path.abspath(filepath)

    # Check if the filepath does exist
    if not os.path.exists(filepath):
        raise argparse.ArgumentTypeError(f"Missing file: {filepath}. "
                                          "Please add the file and run the script again")

    return filepath # Return the same string for argparse to work
def check_neo4j_protocol(string):
    """
    Checks that a given ``string`` starts with any of the protocols accepted by
    the :obj:`neo4j.Driver`

    Args:
        string (str): A string, which will normally represent the neo4j address

    Returns:
        str: The same string that was provided as an argument
        (required by :obj:`argparse.ArgumentParser`)

    Raises:
        argparse.ArgumentTypeError: If the string is not of the correct protocol
    """
    # First, declare a list of accepted formats
    accepted_formats = ['bolt', 'bolt+ssc', 'bolt+s', 'neo4j', 'neo4j+ssc', 'neo4j+s']

    # This must be converted into a tuple for :obj:`str.startswith`
    if not string.startswith(tuple(accepted_formats)):
        msg = f"Invalid format. Your string must start with one of {accepted_formats}"
        raise argparse.ArgumentTypeError(msg)

    return string # Return the same string for argparse to work
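# A sketch of how the two checkers above plug into argparse as ``type=``
# validators; the argument names shown are hypothetical:
def _example_argparser():
    parser = argparse.ArgumentParser()
    parser.add_argument("inputfile", type=check_file,
                        help="a file that must already exist")
    parser.add_argument("--address", type=check_neo4j_protocol,
                        default="bolt://localhost:7687",
                        help="a neo4j address starting with a valid protocol")
    return parser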
def export_graphml(exportname):
    """
    Exports a Neo4J graph to GraphML format. The graph will be exported to Neo4JImportPath

    Args:
        exportname (str): The name for the exported file, which will be saved
            under ./Neo4JImportPath/

    Returns:
        str: A CYPHER query that exports the file, using batch optimizations and
        smaller batch sizes to try to keep the impact on memory use low

    .. NOTE:: For this to work, you HAVE TO have APOC available on your Neo4J installation
    """
    return (f"""
        CALL apoc.export.graphml.all("{exportname}", {{batchSize: 5, useTypes:true,
            storeNodeIds:false, useOptimizations:
            {{type: "UNWIND_BATCH", unwindBatchSize: 5}} }})
        """)
def import_graphml(importname):
    """
    Imports a GraphML file into a Neo4J graph. The file has to be located in Neo4JImportPath

    Args:
        importname (str): The name for the file to be imported, which must be
            under ./Neo4JImportPath/

    Returns:
        str: A CYPHER query that imports the file, using batch optimizations and
        smaller batch sizes to try to keep the impact on memory use low

    .. NOTE:: For this to work, you HAVE TO have APOC available on your Neo4J installation
    """
    return (f"""
        CALL apoc.import.graphml("{importname}", {{batchSize: 5, useTypes:true,
            storeNodeIds:false, readLabels:true, useOptimizations:
            {{type: "UNWIND_BATCH", unwindBatchSize: 5}} }})
        """)
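# A sketch pairing the two functions above through manage_transaction; the file
# name is illustrative, and lives under Neo4JImportPath, as the docstrings explain:
def _example_graphml(driver):
    manage_transaction(export_graphml("whole_database.graphml"), driver)
    # ... and later, possibly against an empty database:
    manage_transaction(import_graphml("whole_database.graphml"), driver)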
def download(url, folder):
    """
    Downloads a file from the internet into a given folder

    Args:
        url (str): The Uniform Resource Locator for the file to be downloaded
        folder (str): The folder under which the file will be stored.

    Returns:
        str: The path where the file we just downloaded has been stored
    """
    folder = os.path.abspath(folder) # Set the folder to be an absolute path

    # If the folder does not exist, we create it
    if not os.path.exists(f"{folder}"):
        os.makedirs(f"{folder}")

    # Create some naming variables and request the file from the URL
    filename = url.split('/')[-1].split('.')[0]
    file_ext = url.split('.')[-1]
    file_path = f"{folder}/{filename}.{file_ext}"
    request.urlretrieve(url, file_path)

    return file_path
def unzip(file_path, folder):
    """
    Unzips a file present at a given ``file_path`` into a given ``folder``

    Args:
        file_path (str): The path to the Zipfile to be unzipped
        folder (str): The folder under which the file will be stored.

    Returns:
        str: The base name of the file we just unzipped
    """
    folder = os.path.abspath(folder) # Set the folder to be an absolute path
    filename = file_path.split('/')[-1].split('.')[0] # Create some naming variables

    # If the folder does not exist, we create it
    if not os.path.exists(f"{folder}"):
        os.makedirs(f"{folder}")

    # And we unzip the file
    zf = ZipFile(file_path)
    zf.extractall(path = f"{folder}/")
    zf.close()

    return filename
def untargz(file_path, folder):
    """
    Un-targz's a file present at a given ``file_path`` into a given ``folder``

    Args:
        file_path (str): The path to the Tarfile to be extracted
        folder (str): The folder under which the file will be stored.

    Returns:
        str: The base name of the file we just extracted
    """
    folder = os.path.abspath(folder) # Set the folder to be an absolute path
    filename = file_path.split('/')[-1].split('.')[0] # Create some naming variables

    # If the folder does not exist, we create it
    if not os.path.exists(f"{folder}"):
        os.makedirs(f"{folder}")

    # And we un-targz the file
    tf = tarfile.open(file_path)
    tf.extractall(path = f"{folder}/")
    tf.close()

    return filename
def download_and_unzip(url, folder):
    """
    Downloads and unzips a given Zipfile from the internet; useful for databases
    which provide zip access.

    Args:
        url (str): The Uniform Resource Locator for the Zipfile to be downloaded and unzipped
        folder (str): The folder under which the file will be stored.

    Returns:
        This function downloads and unzips the file in the desired folder,
        but does not produce any particular return.

    .. seealso:: Code snippets for this function were taken from
        `Shyamal Vaderia's Github
        <https://svaderia.github.io/articles/downloading-and-unzipping-a-zipfile/>`_
        and from `StackOverflow #32123394
        <https://stackoverflow.com/questions/32123394/workflow-to-create-a-folder-if-it-doesnt-exist-already>`_
    """
    file_path = download(url, "/tmp")
    unzip(file_path, folder)
    os.remove(file_path)
def download_and_untargz(url, folder):
    """
    Downloads and unzips a given ``tar.gz`` from the internet

    Args:
        url (str): The Uniform Resource Locator for the ``tar.gz`` to be downloaded and unzipped
        folder (str): The folder under which the file will be stored.

    Returns:
        This function downloads and unzips the file in the desired folder,
        but does not produce any particular return.
    """
    file_path = download(url, "/tmp")
    untargz(file_path, folder)
    os.remove(file_path)
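# A sketch of the two download helpers above; both URLs are hypothetical
# placeholders for whatever dumps a given database distributes:
def _example_downloads():
    download_and_unzip("https://example.org/some_database.zip", "DataBases/SomeDB")
    download_and_untargz("https://example.org/other_database.tar.gz", "DataBases/OtherDB")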
def split_xml(filepath, splittag, bigtag):
    """
    Splits a given .xml file into n smaller XML files, one for each ``splittag`` section
    that is present in the original file, which should be of type ``bigtag``. For example,
    we might have an ``<hmdb>`` file which we want to split based on the ``<metabolite>``
    items therein contained. This is so that Neo4J does not crash when processing it.

    Args:
        filepath (str): The path to the file that needs to be split
        splittag (str): The tag based on which the file will be split
        bigtag (str): The main tag of the file, which needs to be re-added.

    Returns:
        int: The number of files that have been produced from the original

    .. WARNING:: The original file will be removed
    """
    # Set the filepath to be an absolute path
    filepath = os.path.abspath(filepath)
    # Set counters and text variables to null
    current_text = ""; current_line = 0; num_files = 0

    # Open the file that we wish to split (but without reading! neat!)
    with open(f'{filepath}', "r") as f:
        for line in f:
            # For each line, scan and recount
            current_text += line; current_line += 1
            # Whenever we find a closing tag, we know the info is over
            # and we generate a new file
            if line.startswith(f"</{splittag}>"):
                newfile = filepath.split(".")[0] + "_" + str(num_files) + ".xml"
                # Use a different name for the output handle, so as not to
                # shadow the file we are still iterating over
                with open(newfile, "w+") as out:
                    # Skip already-present headers for the first file
                    if num_files > 1:
                        out.write("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n")
                        out.write(f'<{bigtag}>\n')
                    out.write(current_text)
                    # This is the same, since processing just stops at the last tag
                    out.write(f'</{bigtag}>\n')
                num_files += 1
                current_text = ""

    # Remove the original file
    os.remove(f"{filepath}")

    return num_files - 1
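# A sketch of how split_xml is meant to be called for the <hmdb>/<metabolite>
# case its docstring mentions; the file path is a hypothetical placeholder:
def _example_split_xml():
    # Writes one <hmdb>-wrapped file per <metabolite> next to the original
    num_files = split_xml("DataBases/HMDB/hmdb_metabolites.xml", "metabolite", "hmdb")
    print(f"split_xml reported {num_files} files")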
def split_csv(filename, folder, sep=",", sep_out=",", startFrom=0, withStepsOf=1):
    """
    Splits a given .csv/.tsv file into n smaller csv files, one for every ``withStepsOf``
    rows of the original file, so that it does not crash when processing it. It also
    allows reading to start from line ``startFrom`` onwards

    Args:
        filename (str): The name of the file that needs to be split
        folder (str): The folder under which the file is stored
        sep (str): The field separator of the original file; by default, ``,``
        sep_out (str): The field separator of the produced files; by default, ``,``
        startFrom (int): The number of lines to skip at the start of the file
        withStepsOf (int): The number of rows each of the produced files will contain

    Returns:
        int: The number of files that have been produced from the original

    .. WARNING:: The original file will be removed
    """
    # Set the filepath to be an absolute path
    filepath = os.path.abspath(f"{folder}/{filename}")

    # Read the file using pandas (and hope it does not crash)
    bigfile = pd.read_csv(filepath, sep=sep, skiprows=startFrom)

    filenumber = 0
    for index in range(0, len(bigfile), withStepsOf):
        new_df = bigfile.iloc[index:(index + withStepsOf)]
        new_df.to_csv(f"{folder}/{os.path.splitext(filename)[0]}_{filenumber}.csv",
                      index = False, sep=sep_out)
        filenumber += 1

    os.remove(filepath) # And remove the file after finishing

    return filenumber
def scan_folder(folder_path):
    """
    Scans a folder and finds all the files present in it

    Args:
        folder_path (str): The folder that is to be scanned

    Returns:
        list: A list of all the files in the folder, listed by their absolute path
    """
    all_files = []
    for root, dirs, files in os.walk(folder_path):
        for filename in files:
            all_files.append(os.path.abspath(os.path.join(root, filename)))

    return all_files
def countlines(start, header=True, lines=0, begin_start=None):
    """
    A function that counts all the lines of code present in a given directory;
    useful to show off in Sphinx Docs

    Args:
        start (str): The directory from which to start the line counting
        header (bool): whether to print a header, or not
        lines (int): Number of lines already counted; do not fill, only for recursion
        begin_start (str): The subdirectory currently in use; do not fill, only for recursion

    Returns:
        int: The number of lines present in ``start``

    .. seealso:: This function was taken from
        `StackOverflow #38543709
        <https://stackoverflow.com/questions/38543709/count-lines-of-code-in-directory-using-python/>`_
    """
    if header:
        print('{:>10} |{:>10} | {:<20}'.format('ADDED', 'TOTAL', 'FILE'))
        print('{:->11}|{:->11}|{:->20}'.format('', '', ''))

    for thing in os.listdir(start):
        thing = os.path.join(start, thing)
        if os.path.isfile(thing):
            if thing.endswith('.py'):
                with open(thing, 'r') as f:
                    newlines = len(f.readlines())
                lines += newlines
                if begin_start is not None:
                    reldir_of_thing = '.' + thing.replace(begin_start, '')
                else:
                    reldir_of_thing = '.' + thing.replace(start, '')
                print('{:>10} |{:>10} | {:<20}'.format(newlines, lines, reldir_of_thing))

    for thing in os.listdir(start):
        thing = os.path.join(start, thing)
        if os.path.isdir(thing):
            lines = countlines(thing, header=False, lines=lines, begin_start=start)

    return lines
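# A sketch of how countlines is typically invoked; the directory is whichever
# package root one wants to measure:
def _example_countlines():
    total = countlines(os.path.abspath(".")) # Prints a per-file table as a side effect
    print(f"Total lines of python code: {total}")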
# ********* Other useful functions ********* #
def sleep_with_counter(seconds, step = 20, message = "Waiting..."):
    """
    A function that waits while showing a cute animation

    Args:
        seconds (int): The number of seconds that we would like the program to wait for
        step (int): The number of times the counter wheel will turn in a second; by default, 20
        message (str): An optional text message to add to the waiting period
    """
    with alive_bar(seconds*step, title=message) as bar:
        for i in range(seconds*step):
            time.sleep(1/step); bar()
def old_sleep_with_counter(seconds, step = 20, message = "Waiting..."):
    """
    A function that waits while showing a cute animation, but **without using the
    ``alive_progress`` module**

    .. NOTE:: This function interacts weirdly with slurm; I'd recommend not using it on the HPC

    Args:
        seconds (int): The number of seconds that we would like the program to wait for
        step (int): The number of times the counter wheel will turn in a second; by default, 20
        message (str): An optional text message to add to the waiting period
    """
    animation = "|/-\\" # The animation vector
    index = 0 # Initialize the index
    total_steps = seconds*step # Calculate the number of steps

    while index < total_steps:
        # Calculate the current step on base 100
        current_step_100 = round(index*100/total_steps)
        print(f"{message} {animation[index % len(animation)]}\t{current_step_100}/100", end="\r")
        index += 1; time.sleep(1/step)

    print(f"{message}\t100/100 [COMPLETED]")