Skip to content

Latest commit

 

History

History

flask

http://michal.karzynski.pl/blog/2016/06/19/building-beautiful-restful-apis-using-flask-swagger-ui-flask-restplus/
nice example

throw exception

                # raise werkzeug.exceptions.NotFound('LawFirms not exists by id')
                lawfirm_namespace.abort(404, 'LawFirm not exists by provided id')

flask namespace

# !!! don't use braces in description !!!
yelp_namespace = Namespace('yelp', description='Yelp review, communication with Yelp API')

flask view, flask complex return view

    'pgroups': fields.List(cls_or_instance=fields.String(required=True, description='id of groups'),
                           attribute=lambda x: x["pgroups"].split(",")),
    'practices': fields.List(cls_or_instance=fields.String(required=True, description='practices'),
                             attribute=lambda x: x["practices"].split(","))

    def parse_json_string(record: Dict) -> str:
        try:
            return json.loads(record.answer_options)
        except:
            return record.answer_options

    'answer_options': fields.Raw(description='answer_options', attribute=parse_json_string),

    'practices': fields.Nested(name="practice", as_list=True,
                               model={'practice': fields.Integer(attribute=lambda x: x)},
                               attribute=lambda row: [row['practice1'], row['practice2'], row['practice3']],
                               description="practices, delimited by comma"),			     

header, header request

    @namespace.response(200, "by user ")
    # !!! DON'T USER UNDERSCORE !!!
    @namespace.param(name="userid", description="id of user", _in="header")
    def get(self):
        input_arg = reqparse.RequestParser()\
            .add_argument(name="userid", type=int, location="headers")\
            .parse_args()
        user_id = input_arg["userid"]

No API definition provided. need to check all 'object_view'

@namespace.marshal_with(object_view)

initialization

    @app.before_first_request
    def load_db_settings(engine=None):
        if engine is None:
            engine = db.engine
        with engine.connect() as connection:
            settings.load_database_config(connection)

DateTime, datetime, default datetime, onupdate

    # added = Column(DateTime, nullable=False, server_default=sqlalchemy.sql.func.now())
    # added = Column(DateTime, nullable=False, server_default=text('NOW()'))

    first_created = Column(DateTime(), default=datetime.datetime.now)
    last_modified = Column(DateTime(), onupdate=datetime.datetime.now)

Download file

# for inpath parameter name avoid char '/'
# @image_namespace.route('/item/<int:image_id>/<string:name>')
#
@image_namespace.route('/item/<int:image_id>/<int:size_id>')
@image_namespace.response(404, 'Image(s) by Listing id not found.')
class Image(Resource):

    def __init__(self, api=None, *args, **kwargs):
        super().__init__(api, *args, **kwargs)
        self._log = logging.getLogger(self.__class__.__name__)

    @image_namespace.doc("get one image by id")
    @image_namespace.response(200, "file from external storage")
    @image_namespace.response(404, "can't find image")
    @image_namespace.response(500, "can't download image from external storage ")
    @image_namespace.produces(['image/png', 'image/bmp', 'image/jpeg', 'image/gif', 'image/tiff', 'image/webp'])
    def get(self, image_id: int, size_id: int):
        """
        Returns image by image_id ( ask /items endpoint )
        and number of size_id ( 75, 250, 500, 1000 )
        """
        image: OrmImage = session_aware(lambda session:
                                        session.query(OrmImage).filter_by(image_id=image_id).one_or_none())
        if not image:
            return {"message": "image not found"}, 404

        client_download_file_path = StorageUtils.get_path_to_download_file(image.image_name, size_id)
        server_uploaded_file_path = StorageUtils.get_path_to_upload_file(image.image_name, size_id)
        if not StorageClass().is_exist(server_uploaded_file_path):
            return {"message": "can't find image in external storage "}, 404
        if not StorageClass().load(server_uploaded_file_path, client_download_file_path):
            return {"message": "can't download image from external storage "}, 500
        image_file_path = unarchive_file(client_download_file_path)
        image_format = image_utils.get_file_format(image_file_path)
        return send_file(image_file_path,
                         mimetype=f"image/{image_format}",
                         as_attachment=True,
                         attachment_filename=image.image_name)

Upload file https://flask.palletsprojects.com/en/1.1.x/patterns/fileuploads/

image_upload_parser = reqparse.RequestParser() \
    .add_argument('image_body', type=FileStorage, location='files', required=True, help='image file')
""" request parser for image """


@image_namespace.route('/items/<int:id>')
class ImageItems(Resource):

    def __init__(self, api=None, *args, **kwargs):
        super().__init__(api, *args, **kwargs)
        self._log = logging.getLogger(self.__class__.__name__)

    @image_namespace.response(201, 'image was uploaded and processed')
    @image_namespace.response(400, "request error")
    @image_namespace.response(500, "can't save data in storage ( internal or external ) ")
    @image_namespace.doc(params={'image_body': "image itself", 'listing_id': "id of listing"})
    @image_namespace.marshal_with(image_item_view)
    @image_namespace.expect(image_upload_parser)
    def post(self, listing_id: int):
        """
        add new image to listing
        """

        full_path_to_image, extension = self._save_client_image_to_tempstorage(dir_name, image_name)



    def _save_client_image_to_tempstorage(self, directory_title: str, image_name: str) -> List[str]:
        """
        :param directory_title: directory name
        :param image_name: image name
        :return: [full path to image in temp storage,  file format (jpg, png, gif ....) ]
        """
        try:
            arguments = image_upload_parser.parse_args()
        except Exception as e:
            self._log.warning(f"unexpected user request {e}")
            image_namespace.abort(400, "unexpected user request")
        if not arguments["image_body"]:
            image_namespace.abort(400, "no image found in request ")
        full_path_to_image = StorageUtils.get_path_to_temp_upload(directory_title, image_name)

        file_format = ImageItems._file_extension_from_content_type(arguments['image_body'].content_type.lower())
        full_path_to_image = f"{full_path_to_image}.{file_format}"
        try:
            arguments["image_body"].save(full_path_to_image)
            self._log.info(f"uploaded file saved: {full_path_to_image}")
        except FileNotFoundError as e:
            self._log.error(f"can't save data into temp folder {full_path_to_image}")
            image_namespace.abort(500, "internal storage (temp) error")
        return full_path_to_image, file_format
    app.config['MAX_CONTENT_LENGTH'] = settings.FLASK_MAX_CONTENT_LENGTH

    @app.before_request
    def check_directory(request: Request):
        if (client_origin := get_client_url(request)):


    @app.after_request
    def add_cors_headers(response: Response) -> Response:
        # nginx settings:
        # proxy_set_header X-Real-IP $remote_addr
        remote_client = request.referrer[0:request.referrer.find("/", 9)] if request.referrer else request.remote_addr


    @app.before_first_request
    def load_db_settings(engine=None):
        if engine is None:


    @app.route('/')
    def get():
    	@after_this_request
	def add_header(response):
		return response

example of reading all htto request parameters

    @app.before_request
    def check_directory():
        if (client_origin := get_client_url(request)):
            if settings.DEFAULT_ID:
                g.directory_id = settings.DEFAULT_DIRECTORY_ID
            else:
                print("   ".join([f"{each_key}:{request.environ.get(each_key)}" for each_key in request.environ.keys()]))
                print(
                    f">>> {request.url_root}  {request.remote_addr}  {request.referrer}  {request.host_url}  {request.host}  {request.base_url}")
                g.directory_id = settings.LIST.get(client_origin, None)
                if not g.directory_id:
                    log.warning(f"current id is unknown for {client_origin}")
def null_int(field_name: str) -> Callable[[RowProxy], Optional[int]]:
    '''
    wrapper for 'fields.Integer' - avoid null value
    ```attribute=null_int('lead_id'))```
    :param field_name: name of Integer field
    :return: function for assigning to 'attribute'
    '''
    def avoid_null(row: RowProxy) -> Optional[int]:
        integer_value = row[field_name]
        if isinstance(integer_value, int):
            return integer_value
        if integer_value and isinstance(integer_value, str) and len(integer_value) and integer_value.isdigit():
            return int(integer_value)
        else:
            return None

    return avoid_null


def position_decoder(field_name: str) -> Callable[[RowProxy], Optional[str]]:
    '''
    wrapper for 'fields.Integer' - avoid null value
    ```attribute=null_int('lead_id'))```
    :param field_name: name of Integer field
    :return: function for assigning to 'attribute'
    '''
    def decode_decimal(row: RowProxy) -> Optional[str]:
        if field_name not in row or row[field_name] is None:
            return ""
        return "%.8f" % row[field_name]
    return decode_decimal