import tempfile
try:
from urllib.request import urlopen
from urllib.error import HTTPError, URLError
except:
from urllib2 import urlopen, HTTPError, URLError
import os.path
from biokit.rtools import bool2R, RSession
from distutils.version import StrictVersion
from easydev import Logging, TempFile
__all__ = ["get_R_version", "biocLite", "RPackage",
'install_package', 'RPackageManager']
[docs]def install_package(query, dependencies=False, verbose=True,
repos = "http://cran.univ-paris1.fr/"):
"""Install a R package
:param str query: It can be a valid URL to a R package (tar ball), a CRAN
package, a path to a R package (tar ball), or simply the directory
containing a R package source.
:param bool dependencies:
:param repos: if provided, install_packages automatically select the
provided repositories otherwise a popup window will ask you to select a repo
::
>>> rtools.install_package("path_to_a_valid_Rpackage.tar.gz")
>>> rtools.install_package("http://URL_to_a_valid_Rpackage.tar.gz")
>>> rtools.install_package("hash") # a CRAN package
>>> rtools.install_package("path to a valid R package directory")
.. seealso:: :class:`biokit.rtools.RPackageManager`
"""
session = RSession(verbose=verbose)
# Is it a local file?
if os.path.exists(query):
repos = 'NULL'
else:
repos = '"{0}"'.format(repos) # we want the " to be part of the string later on
try:
# PART for fetching a file on the web, download and install locally
if verbose:
print("Trying from the web ?")
data = urlopen(query)
fh = TempFile(suffix=".tar.gz")
with open(fh.name, 'w') as fh:
for x in data.readlines():
fh.write(x)
code = """install.packages("%s", dependencies=%s """ % \
(fh.name, bool2R(dependencies))
code += """ , repos=NULL) """
session.run(code)
except Exception as err:
if verbose:
print(err)
print("trying local or from repos")
print("RTOOLS warning: URL provided does not seem to exist %s. Trying from CRAN" % query)
code = """install.packages("%s", dependencies=%s """ % \
(query, bool2R(dependencies))
code += """ , repos=%s) """ % repos
session.run(code)
return
[docs]def get_R_version():
"""Return R version"""
r = RSession()
r.run("version")
return r.version
[docs]def biocLite(package=None, suppressUpdates=True, verbose=True):
"""Install a bioconductor package
This function does not work like the R function. Only a few options are
implemented so far. However, you can use rcode function directly if needed.
:param str package: name of the bioconductor package to install. If None, no
package is installed but installed packages are updated. If not provided,
biocLite itself may be updated if needed.
:param bool suppressUpdates: updates the dependencies if needed (default is
False)
:return: True if update is required or the required package is installed and
could be imported. False otherwise.
::
>>> from biokit.viz.rtools import biocLite
>>> biocLite("CellNOptR")
"""
code = """source("http://bioconductor.org/biocLite.R")\n"""
# without a package, biocLite performs an update of the installed packages
if package is None:
code += """biocLite(suppressUpdates=%s) """ % (
bool2R(suppressUpdates))
else:
# if not found, no error is returned...
code += """biocLite("%s", suppressUpdates=%s) """ % (
package,
bool2R(suppressUpdates))
r = RSession(verbose=verbose)
r.run(code)
[docs]class RPackage(object):
"""
::
>>> from biokit.rtools import package
>>> p = package.RPackage('CellNOptR')
>>> p.isinstalled
True
>>> p.version
'1.11.3'
.. todo:: do we need the version_required attribute/parameter anywhere ?
.. note:: R version includes dashes, which are not recognised
by distutils so they should be replaced.
"""
def __init__(self, name, version_required=None, install=False, verbose=False):
self.name = name
self.version_required = version_required # the required version
if self.version_required and isinstance(self.version_required, str) is False:
raise TypeError("version_required argument must be a string e.g., 2.0, 2.0.1")
if version_required and "." not in self.version_required:
# trying to infer correct version
self.version_required += '.0'
self.session = RSession()
code = """rvar_version = as.character(packageVersion("%s"))"""
self.session.run(code % (self.name))
try:
self._version = self.session.rvar_version
except:
self._version = None
if self.version is None and install is True:
self.install(name)
if self.version and self.version_required:
if self._get_val_version(self.version) >= self._get_val_version(self.version_required):
pass
else:
print("Found %s (version %s) but version %s required." % (
self.name, self.version, self.version_required))
def _get_val_version(self, version):
return StrictVersion(version.replace("-", "a"))
[docs] def install(self):
install_package(self.name)
def _get_isinstalled(self):
if self.version:
return True
else:
return False
isinstalled = property(_get_isinstalled)
def _get_version(self):
return self._version
version = property(_get_version)
def __str__(self):
if self.version:
txt = self.name + ": " + self.version
else:
txt = self.name
return txt
[docs]class RPackageManager(object):
"""Implements a R package manager from Python
So far you can install a package (from source, or CRAN, or biocLite)
::
pm = PackageManager()
[(x, pm.installed[x][2]) for x in pm.installed.keys()]
You can access to all information within a dataframe called **packages** where
indices are the name packages. Some aliases are provided as attributes (e.g., available,
installed)
"""
cran_repos = "http://cran.univ-lyon1.fr/"
def __init__(self, verbose=True):
self.session = RSession()
self.logging = Logging("INFO")
self.logging.info('Fetching package information')
self.update()
def _update(self):
# local import ?
import numpy
import pandas
# figure out the installed packages first
code = """rvar_packages = as.data.frame(installed.packages())"""
self.session.run(code)
s = self.session.rvar_packages
# FIXME. these 4 lines are needed as a hack related to pyper.
try:
s = s.replace("\n", "")
df = eval(s)
except:
df = s
df.set_index('Package', inplace=True)
self._packages = df.copy()
# Now, fetch was is possible to install from the default cran repo
code = """rvar_status=packageStatus(repos="%s/src/contrib")"""
code = code % self.cran_repos
self.session.run(code)
s = self.session.rvar_status
# FIXME.
try:
s = s.replace("\n", "")
res = eval(s)
except:
res = s
res['inst'].set_index('Package', inplace=True)
res['avail'].set_index('Package', inplace=True)
self._status = res
[docs] def update(self):
"""If you install/remove packages yourself elsewhere, you may need to
call this function to update the package manager"""
try:
#self.session.reconnect()
self._update()
except:
self.logging.warning("Could not update the packages. Call update() again")
def _compat_version(self, version):
return version.replace("-", "a")
def _get_installed(self):
# we do not buffer because packages may be removed manually or from R of
# using remove_packages method, ....
#self._package_status()
return self._status['inst']
installed = property(_get_installed, "returns list of packages installed as a dataframe")
def _get_available(self):
# we do not buffer because packages may be removed manually or from R of
# using remove_packages method, ....
#self._package_status()
return self._status['avail']
available = property(_get_available, "returns list of packages available as a dataframe")
def _get_packages(self):
# do not buffer since it may change in many places
return self._packages
packages = property(_get_packages)
[docs] def get_package_latest_version(self, package):
"""Get latest version available of a package"""
return self.available['Version'].ix[package]
[docs] def get_package_version(self, package):
"""Get version of an install package"""
if package not in self.installed.index:
self.logging.error("package {0} not installed".format(package))
return self.installed['Version'].ix[package]
[docs] def biocLite(self, package=None, suppressUpdates=True, verbose=False):
"""Installs one or more biocLite packages
:param package: a package name (string) or list of package names (list of
strings) that will be installed from BioConductor. If package is set
to None, all packages already installed will be updated.
"""
if isinstance(package, str):
if package not in self.installed.index:
biocLite(package, suppressUpdates, verbose=verbose)
elif isinstance(package, list):
for pkg in package:
self.logging.info("Installing %s" % pkg)
if self.is_installed(pkg) is False:
biocLite(pkg, suppressUpdates, verbose=verbose)
else: # trying other cases (e.g., None updates biocLite itself).
biocLite(package, suppressUpdates, verbose=verbose)
self.update()
def _isLocal(self, pkg):
if os.path.exists(pkg):
return True
else:
return False
[docs] def remove(self, package):
"""Remove a package (or list) from local repository"""
rcode ="""remove.packages("%s")"""
if isinstance(package, str):
package = [package]
for pkg in package:
if pkg in self.installed.index:
self.session(rcode % pkg)
else:
self.logging.warning("Package not found. Nothing to remove")
self.update()
[docs] def require(self, pkg, version):
"Check if a package with given version is available"
if pkg not in self.installed.index:
self.logging.info("Package %s not installed" % pkg)
return False
currentVersion = self.packageVersion(pkg)
if self._get_version(currentVersion) >= self._get_version(version):
return True
else:
return False
def _install_package(self, packageName, dependencies=True):
"""Installs one or more CRAN packages
.. todo:: check if it is already available to prevent renstallation ?
"""
repos = self.cran_repos
# if this is a source file we want to reset the repo
if isinstance(packageName, str):
packageName = [packageName]
for pkg in packageName:
if self.is_installed(pkg) is False:
self.logging.info("Package not found. Installing %s..." % pkg)
install_package(pkg, dependencies=dependencies,
repos=repos)
else:
self.logging.info("Package %s found. " % pkg)
install_package(pkg, dependencies=dependencies,
repos=repos)
self.update()
[docs] def install(self, pkg, require=None, update=True, reinstall=False):
"""install a package automatically scanning CRAN and biocLite repos
if require is not set and update is True, when a newest version of a package
is available, it is installed
"""
from easydev import to_list
pkgs = to_list(pkg)
for pkg in pkgs:
self._install(pkg, require=require, update=update, reinstall=reinstall)
def _install(self, pkg, require=None, update=update, reinstall=False):
# LOCAL file
if self._isLocal(pkg):
# if a local file, we do not want to jump to biocLite or CRAN. Let
# us install it directly. We cannot check version yet so we will
# overwrite what is already installed
self.logging.warning("Installing from source")
self._install_package(pkg)
return
# From CRAN
if self.is_installed(pkg):
currentVersion = self.get_package_version(pkg)
# if not provided, require should be the latest version
if require is None and update is True:
try:
require = self.get_package_latest_version(pkg)
except:
# a non-cran package (bioclite maybe)
pass
if require is None:
self.logging.info("%s already installed with version %s" % \
(pkg, currentVersion))
return
# if require is not none, is it the required version ?
if self._get_version(currentVersion) >= self._get_version(require) and reinstall is False:
self.logging.info("%s already installed with required version %s" \
% (pkg, currentVersion))
# if so, nothing to do
else:
# Try updating
self.logging.info("Updating")
self._install_package(pkg)
if require is None:
return
currentVersion = self.get_package_version(pkg)
if self._get_version(currentVersion) < self._get_version(require):
self.logging.warning("%s installed but current version (%s) does not fulfill your requirement" % \
(pkg, currentVersion))
elif pkg in self.available.index:
self._install_package(pkg)
else:
# maybe a biocLite package ?
# require is ignored. The latest will be installed
self.logging.info("Trying to find the package on bioconductor")
self.biocLite(pkg)
if require is None:
return
currentVersion = self.get_package_version(pkg)
if self._get_version(currentVersion) >= self._get_version(require):
self.logging.warning("%s installed but version is %s too small (even after update)" % \
(pkg, currentVersion, require))
def _get_version(self, version):
# some pacakge do not use the correct version convention
try:
return StrictVersion(version)
except:
try:
return StrictVersion(version.replace("-", "a"))
except:
# snowfall package example was 1.86-6.1
# This becomes 1.86a61 which is not great but not workaround
# for now
left, right = version.split("-")
version = left + "a" + right.replace('.', '')
return StrictVersion(version)
[docs] def is_installed(self, pkg_name):
if pkg_name in self.installed.index:
return True
else:
return False