111 lines
4.5 KiB
Python
111 lines
4.5 KiB
Python
from base58 import b58decode
|
|
from sqlalchemy import Column, BigInteger, Integer, String, ForeignKey, DateTime, JSON, Boolean
|
|
from sqlalchemy.orm import relationship
|
|
from datetime import datetime
|
|
|
|
from app.core.logger import make_log
|
|
from app.core._config import UPLOADS_DIR, PROJECT_HOST
|
|
import os
|
|
from app.core.content.content_id import ContentId
|
|
from app.core.models.content.indexation_mixins import NodeStorageIndexationMixin
|
|
from .base import AlchemyBase
|
|
|
|
|
|
class StoredContent(AlchemyBase, NodeStorageIndexationMixin):
    """ORM model mapped to the ``node_storage`` table.

    Every row is identified by a unique base58 ``hash`` and carries optional
    on-chain, ownership, external-CID and encryption related columns. The
    helper properties derive a ``ContentId``, an on-disk path and a public
    URL from those columns.
    """

    __tablename__ = 'node_storage'

    id = Column(Integer, autoincrement=True, primary_key=True)
    type = Column(String(32), nullable=False)
    hash = Column(String(64), nullable=False, unique=True)  # base58
    content_id = Column(String(512), nullable=True)  # base58
    onchain_index = Column(BigInteger, nullable=True, default=None)

    status = Column(String(32), nullable=True)
    filename = Column(String(1024), nullable=False)
    # A callable default builds a fresh dict per row; a literal ``{}`` would be
    # one shared object handed to every insert.
    meta = Column(JSON, nullable=False, default=dict)

    user_id = Column(Integer, ForeignKey('users.id'), nullable=True)
    owner_address = Column(String(1024), nullable=True)

    btfs_cid = Column(String(1024), nullable=True)
    ipfs_cid = Column(String(1024), nullable=True)
    telegram_cid = Column(String(1024), nullable=True)

    codebase_version = Column(Integer, nullable=True)
    # NOTE(review): the Python-side default ``0`` is not a datetime. json_format
    # tolerates non-datetime values in ``created``/``updated`` by logging an
    # error and emitting None. Confirm what the database stores for 0 before
    # changing these defaults.
    created = Column(DateTime, nullable=False, default=0)
    updated = Column(DateTime, nullable=False, default=0)
    # NOTE(review): presumably 0 stands for "not disabled" - TODO confirm.
    disabled = Column(DateTime, nullable=False, default=0)
    disabled_by = Column(Integer, ForeignKey('users.id'), nullable=True, default=None)

    encrypted = Column(Boolean, nullable=False, default=False)
    decrypted_content_id = Column(Integer, ForeignKey('node_storage.id'), nullable=True, default=None)
    key_id = Column(Integer, ForeignKey('known_keys.id'), nullable=True, default=None)

    user = relationship('User', uselist=False, foreign_keys=[user_id])
    key = relationship('KnownKey', uselist=False, foreign_keys=[key_id])
    decrypted_content = relationship('StoredContent', uselist=False, foreign_keys=[decrypted_content_id])

    @property
    def cid(self) -> ContentId:
        """Build a ``ContentId`` from the decoded hash, on-chain index and content type.

        The accepted type falls back to ``'image/jpeg'`` when ``meta`` has no
        ``content_type`` entry.
        """
        # ``meta`` may still be None on an instance whose column default has
        # not been applied yet, so fall back to an empty mapping.
        meta = self.meta or {}
        return ContentId(
            content_hash=b58decode(self.hash),
            onchain_index=self.onchain_index,
            accept_type=meta.get('content_type', 'image/jpeg'),
        )

    @property
    def filepath(self) -> str:
        """Return the path of this content inside ``UPLOADS_DIR`` (named by hash)."""
        return os.path.join(UPLOADS_DIR, self.hash)

    @property
    def web_url(self) -> str:
        """Return the storage API URL for this content on ``PROJECT_HOST``."""
        return f"{PROJECT_HOST}/api/v1/storage/{self.cid.serialize_v2()}"

    @property
    def decrypt_possible(self) -> bool:
        """Tell whether the content is readable in decrypted form.

        Content whose ``encrypted`` flag is exactly ``False`` always qualifies;
        otherwise a key or a linked decrypted row must be referenced.
        """
        # Identity check on purpose: an unset (None) flag must not be treated
        # as "not encrypted".
        if self.encrypted is False:
            return True

        return bool(self.key_id or self.decrypted_content_id)

    @staticmethod
    def _timestamp_or_log(field_name, value):
        """Return ``value.isoformat()``; log an error and return None for non-datetimes."""
        if isinstance(value, datetime):
            return value.isoformat()

        make_log(
            "Content.json_format",
            f"Invalid Content.{field_name}: {value} ({type(value)})",
            level="error",
        )
        return None

    def json_format(self) -> dict:
        """Serialize the row into a plain dict.

        Always includes ``hash``, ``cid``, ``content_type``, ``status``,
        ``updated`` and ``created``. ``btfs_cid``/``ipfs_cid`` are added when
        set, and further fields depend on whether ``type`` starts with
        ``'local'`` or ``'onchain'``. Timestamps that are not ``datetime``
        instances are logged as errors and serialized as None.
        """
        meta = self.meta or {}
        extra_fields = {}

        if self.btfs_cid:
            extra_fields['btfs_cid'] = self.btfs_cid
        if self.ipfs_cid:
            extra_fields['ipfs_cid'] = self.ipfs_cid

        if self.type.startswith('local'):
            extra_fields['filename'] = self.filename
            extra_fields['encrypted'] = self.encrypted
        elif self.type.startswith('onchain'):
            extra_fields['onchain_index'] = self.onchain_index
            extra_fields['owner_address'] = self.owner_address
            # On-chain attributes are copied from ``meta``; absent keys become None.
            onchain_meta_keys = (
                'item_address', 'license_type',
                'metadata_cid', 'content_cid', 'cover_cid',
            )
            extra_fields.update({key: meta.get(key, None) for key in onchain_meta_keys})

        return {
            **extra_fields,
            "hash": self.hash,
            "cid": self.cid.serialize_v2(),
            "content_type": meta.get('content_type', 'application/x-binary'),
            "status": self.status,
            "updated": self._timestamp_or_log("updated", self.updated),
            "created": self._timestamp_or_log("created", self.created),
        }

    @classmethod
    def from_cid(cls, db_session, content_id):
        """Load the row whose hash matches the given content id.

        Args:
            db_session: session object used to run the query.
            content_id: a serialized content id string (passed through
                ``ContentId.deserialize``) or an object exposing
                ``content_hash_b58``.

        Returns:
            The first matching ``StoredContent`` row.

        Raises:
            AssertionError: if no row matches.
        """
        if isinstance(content_id, str):
            cid = ContentId.deserialize(content_id)
        else:
            cid = content_id

        content = (
            db_session.query(StoredContent)
            .filter(StoredContent.hash == cid.content_hash_b58)
            .first()
        )
        # Raised explicitly (not via ``assert``) so the check survives
        # ``python -O``; the exception type callers see is unchanged.
        if not content:
            raise AssertionError("Content not found")
        return content
|
|
|