import os
import time
import json
from typing import List, Dict, Generator, Callable, Union, Optional, TypeVar, Any, cast
import grpc
import pandas as pd
from google.protobuf.json_format import MessageToJson
from google.protobuf.timestamp_pb2 import Timestamp
from python_pachyderm.client.pfs.pfs_pb2 import (
Repo, Commit, Branch, File,
ListRepoRequest, InspectRepoRequest, ListCommitRequest, ListBranchRequest, GlobFileRequest,
CreateRepoRequest, DeleteRepoRequest,
CreateBranchRequest, DeleteBranchRequest,
StartCommitRequest, FinishCommitRequest, DeleteCommitRequest, FlushCommitRequest,
GetFileRequest, PutFileRequest, DeleteFileRequest,
RESERVED as FILETYPE_RESERVED, FILE as FILETYPE_FILE, DIR as FILETYPE_DIR,
NONE as DELIMITER_NONE, JSON as DELIMITER_JSON, LINE as DELIMITER_LINE, SQL as DELIMITER_SQL, CSV as DELIMITER_CSV
)
from python_pachyderm.client.pfs.pfs_pb2_grpc import APIStub as PfsAPIStub
from python_pachyderm.client.pps.pps_pb2 import (
Pipeline, Job, Input, Transform, Datum,
ListPipelineRequest, ListJobRequest, ListDatumRequest, GetLogsRequest, DeleteJobRequest,
CreatePipelineRequest, DeletePipelineRequest, StartPipelineRequest, StopPipelineRequest, RunPipelineRequest,
InspectPipelineRequest, InspectJobRequest, InspectDatumRequest,
FAILED as DATUM_FAILED, SUCCESS as DATUM_SUCCESS, SKIPPED as DATUM_SKIPPED, STARTING as DATUM_STARTING,
JOB_STARTING, JOB_RUNNING, JOB_MERGING, JOB_SUCCESS, JOB_FAILURE, JOB_KILLED,
PIPELINE_STARTING, PIPELINE_RUNNING, PIPELINE_RESTARTING, PIPELINE_FAILURE, PIPELINE_PAUSED, PIPELINE_STANDBY,
)
from python_pachyderm.client.pps.pps_pb2_grpc import APIStub as PpsAPIStub
from python_pachyderm.client.version.versionpb.version_pb2 import Version
from python_pachyderm.client.version.versionpb.version_pb2_grpc import APIStub as VersionAPIStub
from .utils import invert_dict, to_timestamp, to_timedelta
T = TypeVar('T')
pipeline_state_mapping = {
PIPELINE_STARTING: 'starting',
PIPELINE_RUNNING: 'running',
PIPELINE_RESTARTING: 'restarting',
PIPELINE_FAILURE: 'failure',
PIPELINE_PAUSED: 'paused',
PIPELINE_STANDBY: 'standby',
'PIPELINE_STARTING': 'starting',
'PIPELINE_RUNNING': 'running',
'PIPELINE_RESTARTING': 'restarting',
'PIPELINE_FAILURE': 'failure',
'PIPELINE_PAUSED': 'paused',
'PIPELINE_STANDBY': 'standby',
}
job_state_mapping = {
JOB_STARTING: 'starting',
JOB_RUNNING: 'running',
JOB_MERGING: 'merging',
JOB_FAILURE: 'failure',
JOB_SUCCESS: 'success',
JOB_KILLED: 'killed',
'JOB_STARTING': 'starting',
'JOB_RUNNING': 'running',
'JOB_MERGING': 'merging',
'JOB_FAILURE': 'failure',
'JOB_SUCCESS': 'success',
'JOB_KILLED': 'killed',
}
datum_state_mapping = {
DATUM_FAILED: 'failed',
DATUM_SUCCESS: 'success',
DATUM_SKIPPED: 'skipped',
DATUM_STARTING: 'starting',
'FAILED': 'failed',
'SUCCESS': 'success',
'SKIPPED': 'skipped',
'STARTING': 'starting',
}
file_type_mapping = {
FILETYPE_RESERVED: 'reserved',
FILETYPE_FILE: 'file',
FILETYPE_DIR: 'dir',
'RESERVED': 'reserved',
'FILE': 'file',
'DIR': 'dir',
}
class PachydermError(Exception):
def __init__(self, message: str, code=None):
super().__init__(message)
try:
self.status_code = code.value[0]
self.status = code.value[1]
except (AttributeError, KeyError):
self.status_code = None
self.status = None
def retry(f: T) -> T:
def retry_wrapper(self, *args, **kwargs):
adapter = self.adapter if hasattr(self, 'adapter') else self
try:
return f(self, *args, **kwargs)
except grpc._channel._Rendezvous as e:
if e.code().value[1] == 'unavailable' and adapter._retries < adapter._max_retries:
if adapter.check_connectivity():
adapter._retries += 1
return retry_wrapper(self, *args, **kwargs)
raise PachydermError(e.details(), e.code())
else:
adapter._retries = 0
adapter._connectable = True
return cast(T, retry_wrapper)
class PachydermAdapter:
"""Adapter class handling communication with Pachyderm.
It is effectively a wrapper around the python_pachyderm package.
"""
def __init__(self, host: Optional[str] = None, port: Optional[int] = None):
if host is None:
host = os.getenv('PACHD_ADDRESS')
if host is None:
try:
with open(os.path.expanduser('~/.pachyderm/config.json'), 'r') as f:
config = json.load(f)
host = config['v1']['pachd_address']
except (json.JSONDecodeError, KeyError):
pass
if host is not None and ':' in host:
host_split = host.split(':')
host = host_split[0]
port = int(host_split[1])
self.host = host or 'localhost'
self.port = port or 30650
self.channel = grpc.insecure_channel(f'{self.host}:{self.port}')
self.pfs_stub = PfsAPIStub(self.channel)
self.pps_stub = PpsAPIStub(self.channel)
self.version_stub = VersionAPIStub(self.channel)
self._retries = 0
self._max_retries = 1
self._connectable: Optional[bool] = None
def check_connectivity(self, timeout: float = 10.0) -> bool:
"""Checks the connectivity to pachd. Tries to connect if not currently connected.
The gRPC channel connectivity knows 5 states:
0 = idle, 1 = connecting, 2 = ready, 3 = transient failure, 4 = shutdown.
Args:
timeout: Timeout in seconds.
Returns:
True if the connectivity state is ready (2), False otherwise.
"""
connectivity = 0
timeout = time.time() + timeout
connectivity = self.channel._channel.check_connectivity_state(True)
while connectivity < 2:
if time.time() > timeout:
connectivity = 5
break
time.sleep(0.001)
connectivity = self.channel._channel.check_connectivity_state(False)
self._connectable = connectivity == 2
return self._connectable
@retry
def list_repos(self) -> pd.DataFrame:
res = []
for repo in self.pfs_stub.ListRepo(ListRepoRequest()).repo_info:
res.append({
'repo': repo.repo.name,
'size_bytes': repo.size_bytes,
'branches': [b.name for b in repo.branches],
'created': to_timestamp(repo.created.seconds, repo.created.nanos),
})
return pd.DataFrame(res, columns=['repo', 'size_bytes', 'branches', 'created']) \
.astype({'size_bytes': 'int', 'created': 'datetime64[ns, UTC]'})
@retry
def list_repo_names(self) -> List[str]:
repos = self.pfs_stub.ListRepo(ListRepoRequest()).repo_info
return [r.repo.name for r in repos]
@retry
def list_branch_heads(self, repo: str) -> Dict[str, str]:
branches = self.pfs_stub.ListBranch(ListBranchRequest(repo=Repo(name=repo))).branch_info
return {b.branch.name: b.head.id for b in branches}
@retry
def list_commits(self, repo: str, n: int = 20) -> pd.DataFrame:
i = 1
res = []
commit_branches = invert_dict(self.list_branch_heads(repo))
stream = self.pfs_stub.ListCommitStream(ListCommitRequest(repo=Repo(name=repo)))
for commit in stream:
res.append({
'repo': commit.commit.repo.name,
'commit': commit.commit.id,
'parent_commit': commit.parent_commit.id,
'branches': commit_branches.get(commit.commit.id, []),
'size_bytes': commit.size_bytes,
'started': to_timestamp(commit.started.seconds, commit.started.nanos),
'finished': to_timestamp(commit.finished.seconds, commit.finished.nanos),
})
i += 1
if n is not None and i > n:
stream.cancel()
break
return pd.DataFrame(res, columns=['repo', 'commit', 'branches', 'size_bytes', 'started', 'finished', 'parent_commit']) \
.astype({'size_bytes': 'int', 'started': 'datetime64[ns, UTC]', 'finished': 'datetime64[ns, UTC]'})
@retry
def list_files(self, repo: str, branch: Optional[str] = 'master', commit: Optional[str] = None, glob: str = '**') -> pd.DataFrame:
if branch is None and commit is None:
raise ValueError('branch and commit cannot both be None')
res = []
branch_heads = self.list_branch_heads(repo)
commit_branches = invert_dict(branch_heads)
if commit is None and branch is not None:
commit = branch_heads.get(branch)
if commit is not None:
commit = Commit(repo=Repo(name=repo), id=commit)
for file in self.pfs_stub.GlobFileStream(GlobFileRequest(commit=commit, pattern=glob)):
res.append({
'repo': file.file.commit.repo.name,
'commit': file.file.commit.id,
'branches': commit_branches.get(file.file.commit.id, []),
'path': file.file.path,
'type': file_type_mapping.get(file.file_type, 'unknown'),
'size_bytes': file.size_bytes,
'committed': to_timestamp(file.committed.seconds, file.committed.nanos),
})
return pd.DataFrame(res, columns=['repo', 'commit', 'branches', 'path', 'type', 'size_bytes', 'committed']) \
.astype({'size_bytes': 'int', 'committed': 'datetime64[ns, UTC]'})
@retry
def list_pipelines(self) -> pd.DataFrame:
def input_string(i: Input) -> str:
if i.cross:
return '(' + ' ⨯ '.join([input_string(j) for j in i.cross]) + ')'
elif i.union:
return '(' + ' ∪ '.join([input_string(j) for j in i.union]) + ')'
elif i.pfs.name:
name = i.pfs.name + ('/' + i.pfs.branch if i.pfs.branch != 'master' else '')
return str(name + ':' + i.pfs.glob)
elif i.cron.name:
return str(i.cron.name)
elif i.git.name:
return str(i.git.name + ('/' + i.git.branch if i.git.branch != 'master' else ''))
else:
return '?'
def input_repos(i: Input) -> Generator[str, None, None]:
cross_or_union = i.cross or i.union
if cross_or_union:
for j in cross_or_union:
yield from input_repos(j)
elif i.pfs.repo:
yield i.pfs.repo
res = []
pipelines = self.pps_stub.ListPipeline(ListPipelineRequest()).pipeline_info
for pipeline in pipelines:
res.append({
'pipeline': pipeline.pipeline.name,
'image': pipeline.transform.image,
'cron_spec': ', '.join([cron['spec'] for cron in self._get_pipeline_input_cron_specs(pipeline.input)]),
'input': input_string(pipeline.input),
'input_repos': list(input_repos(pipeline.input)),
'output_branch': pipeline.output_branch,
'parallelism_constant': pipeline.parallelism_spec.constant,
'parallelism_coefficient': pipeline.parallelism_spec.coefficient,
'datum_tries': pipeline.datum_tries,
'max_queue_size': pipeline.max_queue_size,
'jobs_running': pipeline.job_counts[JOB_RUNNING],
'jobs_success': pipeline.job_counts[JOB_SUCCESS],
'jobs_failure': pipeline.job_counts[JOB_FAILURE],
'created': to_timestamp(pipeline.created_at.seconds, pipeline.created_at.nanos),
'state': pipeline_state_mapping.get(pipeline.state, 'unknown'),
})
return pd.DataFrame(res, columns=[
'pipeline', 'state', 'image', 'cron_spec', 'input', 'input_repos', 'output_branch',
'parallelism_constant', 'parallelism_coefficient', 'datum_tries', 'max_queue_size',
'jobs_running', 'jobs_success', 'jobs_failure', 'created',
]).astype({
'parallelism_constant': 'int',
'parallelism_coefficient': 'float',
'datum_tries': 'int',
'jobs_running': 'int',
'jobs_success': 'int',
'jobs_failure': 'int',
'created': 'datetime64[ns, UTC]',
})
@retry
def list_pipeline_names(self) -> List[str]:
pipelines = self.pps_stub.ListPipeline(ListPipelineRequest()).pipeline_info
return [p.pipeline.name for p in pipelines]
@retry
def list_jobs(self, pipeline: Optional[str] = None, n: int = 20) -> pd.DataFrame:
i = 1
res = []
pipeline_obj = Pipeline(name=pipeline) if pipeline is not None else None
jobs_stream = self.pps_stub.ListJobStream(ListJobRequest(pipeline=pipeline_obj))
for job in jobs_stream:
res.append({
'job': job.job.id,
'pipeline': job.pipeline.name,
'state': job_state_mapping.get(job.state, 'unknown'),
'started': to_timestamp(job.started.seconds, job.started.nanos),
'finished': to_timestamp(job.finished.seconds, job.finished.nanos),
'restart': job.restart,
'data_processed': job.data_processed,
'data_skipped': job.data_skipped,
'data_total': job.data_total,
'download_time': to_timedelta(job.stats.download_time.seconds, job.stats.download_time.nanos),
'process_time': to_timedelta(job.stats.process_time.seconds, job.stats.process_time.nanos),
'upload_time': to_timedelta(job.stats.upload_time.seconds, job.stats.upload_time.nanos),
'download_bytes': job.stats.download_bytes,
'upload_bytes': job.stats.upload_bytes,
'output_commit': job.output_commit.id,
})
i += 1
if n is not None and i > n:
jobs_stream.cancel()
break
return pd.DataFrame(res, columns=[
'job', 'pipeline', 'state', 'started', 'finished', 'restart',
'data_processed', 'data_skipped', 'data_total',
'download_time', 'process_time', 'upload_time',
'download_bytes', 'upload_bytes', 'output_commit'
]).astype({
'started': 'datetime64[ns, UTC]',
'finished': 'datetime64[ns, UTC]',
'restart': 'int',
'data_processed': 'int',
'data_skipped': 'int',
'data_total': 'int',
'download_time': 'timedelta64[ns]',
'process_time': 'timedelta64[ns]',
'upload_time': 'timedelta64[ns]',
'download_bytes': 'float',
'upload_bytes': 'float',
})
@retry
def list_datums(self, job: str) -> pd.DataFrame:
file_type_mapping = {
FILETYPE_RESERVED: 'reserved',
FILETYPE_FILE: 'file',
FILETYPE_DIR: 'dir',
}
res = []
datums_stream = self.pps_stub.ListDatumStream(ListDatumRequest(job=Job(id=job)))
for datum in datums_stream:
for data in datum.datum_info.data:
res.append({
'job': datum.datum_info.datum.job.id,
'datum': datum.datum_info.datum.id,
'state': datum_state_mapping.get(datum.datum_info.state, 'unknown'),
'repo': data.file.commit.repo.name,
'commit': data.file.commit.id,
'path': data.file.path,
'type': file_type_mapping.get(data.file_type, 'unknown'),
'size_bytes': data.size_bytes,
'committed': to_timestamp(data.committed.seconds, data.committed.nanos),
})
return pd.DataFrame(res, columns=['job', 'datum', 'state', 'repo', 'path', 'type', 'size_bytes', 'commit', 'committed']) \
.astype({'size_bytes': 'int', 'committed': 'datetime64[ns, UTC]'})
@retry
def get_logs(self, pipeline: Optional[str] = None, job: Optional[str] = None, master: bool = False, tail: int = 0) -> pd.DataFrame:
pipeline = Pipeline(name=pipeline) if pipeline else None
job = Job(id=job) if job else None
if pipeline is None and job is None:
raise ValueError('One of `pipeline` or `job` must be specified')
res = []
logs = self.pps_stub.GetLogs(GetLogsRequest(pipeline=pipeline, job=job, master=master, tail=tail))
for msg in logs:
message = msg.message.strip()
if message:
res.append({
'pipeline': msg.pipeline_name,
'job': msg.job_id,
'ts': to_timestamp(msg.ts.seconds, msg.ts.nanos),
'message': message,
'worker': msg.worker_id,
'datum': msg.datum_id,
'user': msg.user,
})
return pd.DataFrame(res, columns=[
'pipeline', 'job', 'ts', 'message',
'worker', 'datum', 'user'
]).astype({
'ts': 'datetime64[ns, UTC]',
'user': 'bool',
})
@retry
def inspect_repo(self, repo: str) -> Dict[str, Any]:
res = self.pfs_stub.InspectRepo(InspectRepoRequest(repo=Repo(name=repo)))
info = dict(json.loads(MessageToJson(res)))
for k in ['created']:
if k in info:
info[k] = pd.to_datetime(info[k]).to_pydatetime(warn=False)
for k in ['sizeBytes']:
if k in info:
info[k] = int(info[k])
return info
@retry
def inspect_pipeline(self, pipeline: str) -> Dict[str, Any]:
res = self.pps_stub.InspectPipeline(InspectPipelineRequest(pipeline=Pipeline(name=pipeline)))
info = dict(json.loads(MessageToJson(res)))
for k in ['version', 'maxQueueSize', 'datumTries']:
if k in info:
info[k] = int(info[k])
for k in ['createdAt']:
if k in info:
info[k] = pd.to_datetime(info[k]).to_pydatetime(warn=False)
if 'input' in info:
info['input'] = self._transform_cron_start(info['input'], lambda x: pd.to_datetime(x).to_pydatetime(warn=False))
if 'state' in info:
info['state'] = pipeline_state_mapping[info['state']]
if 'jobCounts' in info:
info['jobCounts'] = {job_state_mapping[int(k)]: v for k, v in info['jobCounts'].items()}
if 'lastJobState' in info:
info['lastJobState'] = job_state_mapping[info['lastJobState']]
return info
@retry
def inspect_job(self, job: str) -> Dict[str, Any]:
res = self.pps_stub.InspectJob(InspectJobRequest(job=Job(id=job)))
info = dict(json.loads(MessageToJson(res)))
for k in ['pipelineVersion', 'dataProcessed', 'dataSkipped', 'dataTotal', 'datumTries']:
if k in info:
info[k] = int(info[k])
for k in ['started', 'finished']:
if k in info:
info[k] = pd.to_datetime(info[k]).to_pydatetime(warn=False)
if 'input' in info:
info['input'] = self._transform_cron_start(info['input'], lambda x: pd.to_datetime(x).to_pydatetime(warn=False))
if 'state' in info:
info['state'] = job_state_mapping[info['state']]
if 'stats' in info:
for k in ['downloadTime', 'processTime', 'uploadTime']:
if k in info['stats']:
info['stats'][k] = float(info['stats'][k][:-1])
for k in ['downloadBytes', 'uploadBytes']:
if k in info['stats']:
info['stats'][k] = int(info['stats'][k])
return info
@retry
def inspect_datum(self, job: str, datum: str) -> Dict[str, Any]:
res = self.pps_stub.InspectDatum(InspectDatumRequest(datum=Datum(id=datum, job=Job(id=job))))
info = dict(json.loads(MessageToJson(res)))
if 'state' in info:
info['state'] = datum_state_mapping[info['state']]
if 'stats' in info:
for k in ['downloadTime', 'processTime', 'uploadTime']:
if k in info['stats']:
info['stats'][k] = float(info['stats'][k][:-1])
for file in info['data']:
file['committed'] = pd.to_datetime(file['committed']).to_pydatetime(warn=False)
file['fileType'] = file_type_mapping[file['fileType']]
return info
@retry
def create_pipeline(self, pipeline_spec: dict) -> None:
fields = {f.name for f in CreatePipelineRequest.DESCRIPTOR.fields}
transform_fields = {f.name for f in Transform.DESCRIPTOR.fields}
pipeline_spec = {k: v for k, v in pipeline_spec.items() if k in fields}
pipeline_spec['transform'] = {k: v for k, v in pipeline_spec['transform'].items() if k in transform_fields}
def protobuf_timestamp(x):
dt = pd.to_datetime(x)
if dt.tzinfo:
dt = dt.tz_convert('utc').tz_localize(None)
ts = Timestamp()
ts.FromDatetime(dt.to_pydatetime())
return ts
pipeline_spec['input'] = self._transform_cron_start(pipeline_spec['input'], protobuf_timestamp)
self.pps_stub.CreatePipeline(CreatePipelineRequest(**pipeline_spec))
def update_pipeline(self, pipeline_spec: dict, reprocess: bool = False) -> None:
pipeline_spec['update'] = True
pipeline_spec['reprocess'] = reprocess
self.create_pipeline(pipeline_spec)
@retry
def delete_pipeline(self, pipeline: str) -> None:
self.pps_stub.DeletePipeline(DeletePipelineRequest(pipeline=Pipeline(name=pipeline)))
@retry
def start_pipeline(self, pipeline: str) -> None:
self.pps_stub.StartPipeline(StartPipelineRequest(pipeline=Pipeline(name=pipeline)))
@retry
def stop_pipeline(self, pipeline: str) -> None:
self.pps_stub.StopPipeline(StopPipelineRequest(pipeline=Pipeline(name=pipeline)))
@retry
def get_pipeline_cron_specs(self, pipeline: str) -> List[Dict[str, Any]]:
res = self.pps_stub.InspectPipeline(InspectPipelineRequest(pipeline=Pipeline(name=pipeline)))
return list(self._get_pipeline_input_cron_specs(res.input))
@retry
def create_repo(self, repo: str, description: Optional[str] = None) -> None:
self.pfs_stub.CreateRepo(CreateRepoRequest(repo=Repo(name=repo), description=description))
@retry
def delete_repo(self, repo: str, force: bool = False) -> None:
self.pfs_stub.DeleteRepo(DeleteRepoRequest(repo=Repo(name=repo), force=force))
@retry
def delete_commit(self, repo: str, commit: str) -> None:
self.pfs_stub.DeleteCommit(DeleteCommitRequest(commit=Commit(repo=Repo(name=repo), id=commit)))
@retry
def create_branch(self, repo: str, commit: str, branch: str) -> None:
repo = Repo(name=repo)
commit = Commit(repo=repo, id=commit)
branch = Branch(repo=repo, name=branch)
self.pfs_stub.CreateBranch(CreateBranchRequest(head=commit, branch=branch))
@retry
def delete_branch(self, repo: str, branch: str) -> None:
self.pfs_stub.DeleteBranch(DeleteBranchRequest(branch=Branch(repo=Repo(name=repo), name=branch)))
@retry
def delete_job(self, job: str) -> None:
self.pps_stub.DeleteJob(DeleteJobRequest(job=Job(id=job)))
@retry
def get_file(self, repo: str, path: str, branch: Optional[str] = 'master', commit: Optional[str] = None) -> Generator[bytes, None, None]:
if commit is None and branch is not None:
commit = self.list_branch_heads(repo).get(branch)
file = File(commit=Commit(repo=Repo(name=repo), id=commit), path=path)
response = self.pfs_stub.GetFile(GetFileRequest(file=file))
for content in response:
yield content.value
@retry
def run_pipeline(self, pipeline: str) -> None:
self.pps_stub.RunPipeline(RunPipelineRequest(pipeline=Pipeline(name=pipeline)))
@retry
def get_version(self) -> str:
version = self.version_stub.GetVersion(Version())
return f'{version.major}.{version.minor}.{version.micro}'
@classmethod
def _get_pipeline_input_cron_specs(cls, i: Input) -> Generator[Dict[str, Any], None, None]:
if i.cron.spec != '':
yield {
'name': str(i.cron.name),
'spec': str(i.cron.spec),
'repo': str(i.cron.repo),
'overwrite': bool(i.cron.overwrite),
}
cross_or_union = i.cross or i.union
if cross_or_union:
for j in cross_or_union:
yield from cls._get_pipeline_input_cron_specs(j)
@classmethod
def _transform_cron_start(cls, i: dict, transformer: Callable) -> dict:
for k, v in i.items():
if k == 'cron' and 'start' in v:
v['start'] = transformer(v['start'])
elif k in ('cross', 'union'):
for j in v:
j = cls._transform_cron_start(j, transformer)
return i
class PachydermCommitAdapter:
def __init__(self, adapter: PachydermAdapter, repo: str, branch: Optional[str] = 'master', parent_commit: Optional[str] = None, flush: bool = False):
self.adapter = adapter
self.repo = repo
self.branch = branch
self.parent_commit = parent_commit
self.flush_ = flush
self._commit = None
self._finished = False
self._buffer_size = 3 * 1024 * 1024 # 3 MB
def __enter__(self):
self.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is not None:
self.delete()
else:
self.finish()
if self.flush_:
self.flush()
@property
def commit(self) -> Optional[str]:
"""Commit ID."""
if self._commit is not None:
return str(self._commit.id)
else:
return None
@property
def finished(self) -> bool:
"""Whether the commit is finished."""
return self._finished
@retry
def start(self):
"""Start the commit."""
self._raise_if_finished()
parent_commit = Commit(repo=Repo(name=self.repo), id=self.parent_commit)
self._commit = self.adapter.pfs_stub.StartCommit(StartCommitRequest(parent=parent_commit, branch=self.branch))
@retry
def finish(self):
"""Finish the commit."""
self._raise_if_finished()
self.adapter.pfs_stub.FinishCommit(FinishCommitRequest(commit=self._commit))
self._finished = True
@retry
def flush(self, to_repos=None):
"""Blocks until all jobs triggered by this commit have finished.
Args:
to_repos: If specified, only the commits up to and including those repos
will be considered, otherwise all repos are considered.
"""
if to_repos:
to_repos = [to_repos] if isinstance(to_repos, str) else to_repos
to_repos = [Repo(name=repo) for repo in to_repos]
res = self.adapter.pfs_stub.FlushCommit(FlushCommitRequest(commits=[self._commit], to_repos=to_repos or []))
for _ in res:
if res.done():
return
time.sleep(0.25)
@retry
def delete(self):
"""Delete the commit."""
self.adapter.pfs_stub.DeleteCommit(DeleteCommitRequest(commit=self._commit))
self._finished = True
@retry
def put_file_bytes(self, value: Union[str, bytes], path: str, encoding: str = 'utf-8',
delimiter: Optional[str] = None, target_file_datums: int = 0, target_file_bytes: int = 0) -> None:
"""Uploads a string or bytes `value` to a file in the given `path` in PFS.
Args:
value: The value to upload. If a string is given, it will be encoded using `encoding` (default UTF-8).
path: PFS path to upload file to. Needs to be the full PFS path including filename.
delimiter: Causes data to be broken up into separate files with `path` as a prefix.
Possible values are 'none' (default), 'json', 'line', 'sql' and 'csv'.
target_file_datum: Specifies the target number of datums in each written file.
It may be lower if data does not split evenly, but will never be higher, unless the value is 0 (default).
target_file_bytes: Specifies the target number of bytes in each written file.
Files may have more or fewer bytes than the target.
"""
self._raise_if_finished()
if isinstance(value, str):
value = value.encode(encoding)
delimiter = {
None: DELIMITER_NONE,
'none': DELIMITER_NONE,
'json': DELIMITER_JSON,
'line': DELIMITER_LINE,
'sql': DELIMITER_SQL,
'csv': DELIMITER_CSV
}[delimiter.lower() if delimiter is not None else None]
file = File(commit=self._commit, path=path)
def _blocks(v):
for i in range(0, len(v) + 1, self._buffer_size):
yield PutFileRequest(file=file, value=v[i:i + self._buffer_size], delimiter=delimiter,
target_file_datums=target_file_datums, target_file_bytes=target_file_bytes)
self.adapter.pfs_stub.PutFile(_blocks(value))
@retry
def put_file_url(self, url: str, path: str, recursive: bool = False) -> None:
"""Uploads a file using the content found at a URL.
The URL is sent to the server which performs the request.
Args:
url: The URL to download content from.
path: PFS path to upload file to. Needs to be the full PFS path including filename.
recursive: Allow recursive scraping of some URL types, e.g. on s3:// URLs.
"""
self._raise_if_finished()
file = File(commit=self._commit, path=path)
self.adapter.pfs_stub.PutFile(iter([PutFileRequest(file=file, url=url, recursive=recursive)]))
@retry
def delete_file(self, path: str) -> None:
"""Deletes the file found in a given `path` in PFS, if it exists.
Args:
path: PFS path of file to delete.
"""
self._raise_if_finished()
self.adapter.pfs_stub.DeleteFile(DeleteFileRequest(file=File(commit=self._commit, path=path)))
@retry
def create_branch(self, branch: str) -> None:
"""Sets this commit as a branch.
Args:
branch: Name of the branch.
"""
branch = Branch(repo=Repo(name=self.repo), name=branch)
self.adapter.pfs_stub.CreateBranch(CreateBranchRequest(head=self._commit, branch=branch))
@retry
def _list_file_paths(self, glob: str) -> List[str]:
return [str(f.file.path) for f in self.adapter.pfs_stub.GlobFileStream(GlobFileRequest(commit=self._commit, pattern=glob))]
def _raise_if_finished(self):
if self.finished:
raise PachydermError(f'Commit {self.commit} is already finished')