# Source code for galaxywitness.datasets

import os
import csv
import requests
from tqdm import tqdm
import pyvo as vo


class Dataset:
    """
    Class to handle prepared datasets

    Keeps a registry of known dataset names, the URL each one is fetched
    from and the local CSV file name it is stored under (inside ``data/``
    relative to the current working directory).

    :param name: name of dataset
    :type name: str
    """

    # Per-dataset pieces of the ADQL query used by ``download_via_tap``.
    # Value layout: (object id column, redshift column, table, extra filter).
    _TAP_QUERIES = {
        "rcsed": ("objid", "z", "specphot.rcsed", ""),
        "simbad": ("main_id", "rvz_redshift", "basic",
                   "AND otype = 'galaxy..'"),
        "ned": ("prefname", "z", "objdir",
                "AND (pretype = 'G' OR pretype = 'QSO')"),
    }

    # Directory (relative to the current working directory) for downloads.
    _DATA_DIR = "data"

    def __init__(self, name: str):
        self.inner_names = {
            "Galaxies_400K": "Galaxies_400K.csv",
            "Galaxies_1KK": "Galaxies_1KK.csv",
            "rcsed": "rcsed.csv",
            "simbad": "simbad.csv",
            "ned": "ned.csv",
        }
        self.addresses = {
            "Galaxies_400K": "https://raw.githubusercontent.com/Arrrtemiron/galaxy_witness_datasets/main/result_glist_s.csv",
            "Galaxies_1KK": "https://raw.githubusercontent.com/Arrrtemiron/galaxy_witness_datasets/main/result_rcsed_vo.csv",
            "rcsed": "http://rcsed-vo.sai.msu.ru/tap/",
            "simbad": "http://simbad.u-strasbg.fr/simbad/sim-tap/",
            "ned": "http://ned.ipac.caltech.edu/tap/",
        }
        self._select(name)

    def _select(self, name: str) -> None:
        """Make *name* the current dataset and refresh ``url``/``dataset_prepared``."""
        self.name = name
        self.dataset_prepared = name in self.addresses
        # An unknown name leaves an empty URL rather than a stale one.
        self.url = self.addresses[name] if self.dataset_prepared else ''
        if not self.dataset_prepared:
            print("Incorrect name of dataset")

    def _require_prepared(self) -> None:
        """Raise if the current dataset name is not a registered one."""
        if not self.dataset_prepared:
            # Raised explicitly instead of via ``assert`` so the check is not
            # stripped under ``python -O``; the exception type is kept for
            # backward compatibility with the previous ``assert``.
            raise AssertionError(
                f"Dataset {self.name!r} is not prepared: unknown dataset name"
            )

    def _output_path(self) -> str:
        """Create the data directory if needed and return the target CSV path."""
        os.makedirs(self._DATA_DIR, exist_ok=True)
        return os.path.join(self._DATA_DIR, self.inner_names[self.name])

    def download(self, chunk_size=1024) -> None:
        """
        Download current prepared dataset

        The file is streamed into ``data/<inner name>`` with a progress bar.

        :param chunk_size: number of bytes read from the response per iteration
        :type chunk_size: int
        :raises AssertionError: if the current dataset name is unknown
        :raises requests.HTTPError: if the server answers with an error status
        """
        self._require_prepared()

        # The context manager releases the streamed connection even on error.
        with requests.get(self.url, stream=True, timeout=60) as resp:
            # Do not store an HTTP error page as if it were the dataset.
            resp.raise_for_status()
            total = int(resp.headers.get('content-length', 0))
            with open(self._output_path(), 'wb') as file, tqdm(
                desc=self.name,
                total=total,
                unit='iB',
                unit_scale=True,
                # Binary prefixes (KiB, MiB, ...) scale by 1024 regardless of
                # the chunk size used for reading.
                unit_divisor=1024,
            ) as bar_:
                for data in resp.iter_content(chunk_size=chunk_size):
                    bar_.update(file.write(data))

    def download_via_tap(self, size: int = 100000) -> None:
        """
        Download current prepared dataset via TAP

        Queries object id, ``ra``, ``dec`` and redshift for rows with a
        positive redshift, ordered by redshift, and writes them as CSV
        (header row first) into ``data/<inner name>``.

        :param size: size of dataset (maximum number of records)
        :type size: int
        :raises AssertionError: if the current dataset name is unknown
        :raises ValueError: if the current dataset has no TAP query mapping
        """
        self._require_prepared()

        try:
            oid, redshift, table, otype = self._TAP_QUERIES[self.name]
        except KeyError:
            # Without a column mapping the query below would be malformed.
            raise ValueError(
                f"Dataset {self.name!r} cannot be downloaded via TAP"
            ) from None

        tap_service = vo.dal.TAPService(self.url)
        tap_service.describe()

        query = (
            f"SELECT {oid}, ra, dec, {redshift} FROM {table} "
            f"WHERE ra is not NULL AND dec is not NULL "
            f"AND {redshift} > 0 {otype} ORDER BY {redshift}"
        )
        tap_results = tap_service.run_async(query, maxrec=size)

        header = [oid, 'ra', 'dec', redshift]
        # The service may return fewer than ``size`` records; never index
        # past what was actually received.
        available = min(size, len(tap_results))
        rows = []
        for index in range(available):
            record = tap_results[index]
            rows.append([record[column] for column in header])

        with open(self._output_path(), 'w', encoding='UTF8', newline='') as f:
            writer = csv.writer(f)
            # write the header
            writer.writerow(header)
            # write the data
            writer.writerows(rows)

    def add_new_dataset(self, name: str, url: str) -> None:
        """
        Add new dataset by name and by URL where it can be retrieved

        The current dataset is not changed; call ``change_dataset_to`` to
        switch to the newly added one.

        :param name: name of dataset
        :type name: str
        :param url: URL
        :type url: str
        """
        self.inner_names[name] = name + '.csv'
        self.addresses[name] = url
        print("New dataset added successfully:", name, "from", url)
        print("You can change current dataset to this one by calling change_dataset_to method")

    def change_dataset_to(self, name: str) -> None:
        """
        Change current dataset to another by name

        If the name is unknown a message is printed and the dataset is
        marked as not prepared.

        :param name: name of dataset
        :type name: str
        """
        self._select(name)