uploader-bot/app/core/models/node_storage.py

111 lines
4.5 KiB
Python

from base58 import b58decode
from sqlalchemy import Column, BigInteger, Integer, String, ForeignKey, DateTime, JSON, Boolean
from sqlalchemy.orm import relationship
from datetime import datetime
from app.core.logger import make_log
from app.core._config import UPLOADS_DIR, PROJECT_HOST
import os
from app.core.content.content_id import ContentId
from app.core.models.content.indexation_mixins import NodeStorageIndexationMixin
from .base import AlchemyBase
class StoredContent(AlchemyBase, NodeStorageIndexationMixin):
__tablename__ = 'node_storage'
id = Column(Integer, autoincrement=True, primary_key=True)
type = Column(String(32), nullable=False)
hash = Column(String(64), nullable=False, unique=True) # base58
content_id = Column(String(512), nullable=True) # base58
onchain_index = Column(BigInteger, nullable=True, default=None)
status = Column(String(32), nullable=True)
filename = Column(String(1024), nullable=False)
meta = Column(JSON, nullable=False, default={})
user_id = Column(Integer, ForeignKey('users.id'), nullable=True)
owner_address = Column(String(1024), nullable=True)
btfs_cid = Column(String(1024), nullable=True)
ipfs_cid = Column(String(1024), nullable=True)
telegram_cid = Column(String(1024), nullable=True)
codebase_version = Column(Integer, nullable=True)
created = Column(DateTime, nullable=False, default=0)
updated = Column(DateTime, nullable=False, default=0)
disabled = Column(DateTime, nullable=False, default=0)
disabled_by = Column(Integer, ForeignKey('users.id'), nullable=True, default=None)
encrypted = Column(Boolean, nullable=False, default=False)
decrypted_content_id = Column(Integer, ForeignKey('node_storage.id'), nullable=True, default=None)
key_id = Column(Integer, ForeignKey('known_keys.id'), nullable=True, default=None)
user = relationship('User', uselist=False, foreign_keys=[user_id])
key = relationship('KnownKey', uselist=False, foreign_keys=[key_id])
decrypted_content = relationship('StoredContent', uselist=False, foreign_keys=[decrypted_content_id])
@property
def cid(self) -> ContentId:
return ContentId(
content_hash=b58decode(self.hash),
onchain_index=self.onchain_index,
accept_type=self.meta.get('content_type', 'image/jpeg')
)
@property
def filepath(self) -> str:
return os.path.join(UPLOADS_DIR, self.hash)
@property
def web_url(self) -> str:
return f"{PROJECT_HOST}/api/v1/storage/{self.cid.serialize_v2(include_accept_type=True)}"
@property
def decrypt_possible(self) -> bool:
if self.encrypted is False:
return True
return bool(self.key_id or self.decrypted_content_id)
def json_format(self):
extra_fields = {}
if self.btfs_cid:
extra_fields['btfs_cid'] = self.btfs_cid
if self.ipfs_cid:
extra_fields['ipfs_cid'] = self.ipfs_cid
if self.type.startswith('local'):
extra_fields['filename'] = self.filename
extra_fields['encrypted'] = self.encrypted
elif self.type.startswith('onchain'):
extra_fields['onchain_index'] = self.onchain_index
extra_fields['owner_address'] = self.owner_address
for k in [
'item_address', 'license_type',
'metadata_cid', 'content_cid', 'cover_cid',
]:
extra_fields[k] = self.meta.get(k, None)
return {
**extra_fields,
"hash": self.hash,
"cid": self.cid.serialize_v2(),
"content_type": self.meta.get('content_type', 'application/x-binary'),
"status": self.status,
"updated": self.updated.isoformat() if isinstance(self.updated, datetime) else (make_log("Content.json_format", f"Invalid Content.updated: {self.updated} ({type(self.updated)})", level="error") or None),
"created": self.created.isoformat() if isinstance(self.created, datetime) else (make_log("Content.json_format", f"Invalid Content.created: {self.created} ({type(self.created)})", level="error") or None),
}
@classmethod
def from_cid(cls, db_session, content_id):
if isinstance(content_id, str):
cid = ContentId.deserialize(content_id)
else:
cid = content_id
content = db_session.query(StoredContent).filter(StoredContent.hash == cid.content_hash_b58).first()
assert content, "Content not found"
return content