uploader-bot/app/core/models/node_storage.py

145 lines
6.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from base58 import b58decode
from sqlalchemy import Column, BigInteger, Integer, String, ForeignKey, DateTime, JSON, Boolean
from sqlalchemy.orm import relationship
from datetime import datetime
from app.core.logger import make_log
from app.core._config import UPLOADS_DIR, PROJECT_HOST
import os
from app.core.content.content_id import ContentId
from app.core.content.audio import AudioContentMixin
from app.core.content.image import ImageContentMixin
# from app.core.models.content.indexation_mixins import NodeStorageIndexationMixin
from .base import AlchemyBase
class StoredContent(AlchemyBase, AudioContentMixin):
    """ORM model for one row of the ``node_storage`` table.

    A row is identified by its base58 ``hash``. An encrypted row may point at
    its decrypted counterpart through ``decrypted_content_id`` and at the key
    through ``key_id``.
    """

    __tablename__ = 'node_storage'

    id = Column(Integer, autoincrement=True, primary_key=True)
    type = Column(String(32), nullable=False)
    hash = Column(String(64), nullable=False, unique=True)  # base58
    content_id = Column(String(512), nullable=True)  # base58
    onchain_index = Column(BigInteger, nullable=True, default=None)
    status = Column(String(32), nullable=True)
    filename = Column(String(1024), nullable=False)
    # Callable default so that each new row gets its own dict instead of all
    # rows sharing (and possibly mutating) one module-level ``{}`` object.
    meta = Column(JSON, nullable=False, default=dict)
    user_id = Column(Integer, ForeignKey('users.id'), nullable=True)
    owner_address = Column(String(1024), nullable=True)
    btfs_cid = Column(String(1024), nullable=True)
    ipfs_cid = Column(String(1024), nullable=True)
    telegram_cid = Column(String(1024), nullable=True)
    codebase_version = Column(Integer, nullable=True)
    # NOTE(review): ``default=0`` is not a datetime value; it is kept as-is
    # because existing rows/callers may rely on it -- confirm before changing.
    created = Column(DateTime, nullable=False, default=0)
    updated = Column(DateTime, nullable=False, default=0)
    disabled = Column(DateTime, nullable=False, default=0)
    disabled_by = Column(Integer, ForeignKey('users.id'), nullable=True, default=None)
    encrypted = Column(Boolean, nullable=False, default=False)
    decrypted_content_id = Column(Integer, ForeignKey('node_storage.id'), nullable=True, default=None)
    key_id = Column(Integer, ForeignKey('known_keys.id'), nullable=True, default=None)

    user = relationship('User', uselist=False, foreign_keys=[user_id])
    key = relationship('KnownKey', uselist=False, foreign_keys=[key_id])
    decrypted_content = relationship('StoredContent', uselist=False, foreign_keys=[decrypted_content_id])

    @property
    def cid(self) -> ContentId:
        """Build a ``ContentId`` from this row's hash, on-chain index and content type."""
        # NOTE(review): the fallback type here ('image/jpeg') differs from the
        # one used in json_format ('application/x-binary'). Left unchanged
        # because it may affect already-issued serialized ids -- confirm.
        return ContentId(
            content_hash=b58decode(self.hash),
            onchain_index=self.onchain_index,
            accept_type=self.meta.get('content_type', 'image/jpeg'),
        )

    @property
    def filepath(self) -> str:
        """Path of the stored file: ``UPLOADS_DIR`` joined with the row's hash."""
        return os.path.join(UPLOADS_DIR, self.hash)

    @property
    def web_url(self) -> str:
        """Storage API URL for this content, built from ``PROJECT_HOST`` and the serialized cid."""
        return f"{PROJECT_HOST}/api/v1/storage/{self.cid.serialize_v2(include_accept_type=True)}"

    @property
    def decrypt_possible(self) -> bool:
        """Return ``True`` if the row is not encrypted, or has a key or a decrypted counterpart."""
        if self.encrypted is False:
            return True
        return bool(self.key_id or self.decrypted_content_id)

    def open_content(self, db_session, content_type=None):
        """Resolve the encrypted/decrypted pair this row belongs to.

        Returns the StoredContent in its usable form together with its data
        type, so that it can be converted to other formats.

        Args:
            db_session: SQLAlchemy session used to look up the counterpart row.
            content_type: Optional ``"type/subtype"`` string. When omitted, the
                ``content_type`` reported by the decrypted row's
                ``json_format()`` is used.

        Returns:
            A dict with ``encrypted_content``, ``decrypted_content`` and
            ``content_type`` (only the part before the first ``/``).

        Raises:
            AssertionError: if either side of the pair cannot be found.
        """
        try:
            if self.encrypted:
                encrypted_content = self
                decrypted_content = db_session.query(StoredContent).filter(
                    StoredContent.id == self.decrypted_content_id
                ).first()
            else:
                decrypted_content = self
                encrypted_content = db_session.query(StoredContent).filter(
                    StoredContent.decrypted_content_id == self.id
                ).first()

            # Explicit raises instead of ``assert`` so the checks survive
            # ``python -O``; the exception type and messages are unchanged.
            if not decrypted_content:
                raise AssertionError("Can't get decrypted content")
            if not encrypted_content:
                raise AssertionError("Can't get encrypted content")

            # The caller-supplied value takes precedence; previously it was
            # overwritten with None before reaching this point.
            content_type = content_type or decrypted_content.json_format()['content_type']
            # partition() tolerates values with no '/' or more than one.
            main_type = content_type.partition('/')[0]
            return {
                'encrypted_content': encrypted_content,
                'decrypted_content': decrypted_content,
                'content_type': main_type or 'application/x-binary',
            }
        except BaseException as e:
            # Log and re-raise unchanged; bare ``raise`` keeps the traceback.
            make_log("NodeStorage.open_content", f"Can't open content: {self.id} {e}", level='error')
            raise

    @staticmethod
    def _iso_timestamp(value):
        """Return *value* as an ISO-8601 string, or ``None`` if it is neither a datetime nor a str."""
        if isinstance(value, datetime):
            return value.isoformat()
        if isinstance(value, str):
            return datetime.fromisoformat(value).isoformat()
        return None

    def json_format(self):
        """Return a JSON-serializable dict describing this row.

        Always includes ``hash``, ``cid``, ``content_type``, ``status``,
        ``created`` and ``updated``. ``btfs_cid`` / ``ipfs_cid`` are added when
        set; further fields depend on whether ``type`` starts with ``local``
        or ``onchain``.
        """
        extra_fields = {}
        if self.btfs_cid:
            extra_fields['btfs_cid'] = self.btfs_cid
        if self.ipfs_cid:
            extra_fields['ipfs_cid'] = self.ipfs_cid

        if self.type.startswith('local'):
            extra_fields['filename'] = self.filename
            extra_fields['encrypted'] = self.encrypted
        elif self.type.startswith('onchain'):
            extra_fields['onchain_index'] = self.onchain_index
            extra_fields['owner_address'] = self.owner_address
            for k in (
                'item_address', 'license_type',
                'metadata_cid', 'content_cid', 'cover_cid', 'license',
            ):
                extra_fields[k] = self.meta.get(k, None)

        return {
            **extra_fields,
            "hash": self.hash,
            "cid": self.cid.serialize_v2(),
            "content_type": self.meta.get('content_type', 'application/x-binary'),
            "status": self.status,
            "created": self._iso_timestamp(self.created),
            "updated": self._iso_timestamp(self.updated),
        }

    @classmethod
    def from_cid(cls, db_session, content_id):
        """Load the row whose hash matches *content_id*.

        Args:
            db_session: SQLAlchemy session used for the lookup.
            content_id: Either a serialized content id string (parsed with
                ``ContentId.deserialize``) or an object exposing
                ``content_hash_b58``.

        Raises:
            AssertionError: if no matching row exists.
        """
        if isinstance(content_id, str):
            cid = ContentId.deserialize(content_id)
        else:
            cid = content_id

        content = db_session.query(StoredContent).filter(StoredContent.hash == cid.content_hash_b58).first()
        # Explicit raise instead of ``assert`` so the check survives ``python -O``.
        if not content:
            raise AssertionError("Content not found")
        return content